Skip to main content

differential_dataflow/dynamic/
pointstamp.rs

//! A timestamp type as in Naiad, where vectors of timestamps of different lengths are comparable.
2//!
3//! This type compares using "standard" tuple logic as if each timestamp were extended indefinitely with minimal elements.
4//!
5//! The path summary for this type allows *run-time* rather than *type-driven* iterative scopes.
6//! Each summary represents some journey within and out of some number of scopes, followed by entry
7//! into and iteration within some other number of scopes.
8//!
9//! As a result, summaries describe some number of trailing coordinates to truncate, and some increments
10//! to the resulting vector. Structurally, the increments can only be to one non-truncated coordinate
11//! (as iteration within a scope requires leaving contained scopes), and then to any number of appended
12//! default coordinates (which is effectively just *setting* the coordinate).
13
14use columnar::Columnar;
15use serde::{Deserialize, Serialize};
16use smallvec::SmallVec;
17
/// A sequence of timestamps, partially ordered by the product order.
///
/// Sequences of different lengths are compared as if extended indefinitely by `T::minimum()`.
/// Sequences are guaranteed to be "minimal", and may not end with `T::minimum()` entries.
///
/// The derived `PartialEq`, `Eq` and `Hash` act on the stored `vector` directly. They agree
/// with the "extended by `T::minimum()`" reading only because of the minimality invariant:
/// two minimal vectors that describe the same extended sequence are identical. The derived
/// `Ord`/`PartialOrd` are a total order on `vector`. They are separate from the product-order
/// `PartialOrder` impl for this type.
///
/// NOTE(review): the derived `Deserialize` fills `vector` directly rather than going through
/// `PointStamp::new`. Deserialized values are therefore not re-checked for trailing
/// `T::minimum()` entries. Confirm that serialized inputs always originate from valid values.
#[derive(Hash, Default, Clone, Eq, PartialEq, Ord, PartialOrd, Debug, Serialize, Deserialize, Columnar)]
// NOTE(review): presumably asks `columnar` to derive these comparison traits for its generated
// companion types as well. Confirm against the `columnar` crate.
#[columnar(derive(Eq, PartialEq, Ord, PartialOrd))]
pub struct PointStamp<T> {
    /// A sequence of timestamps corresponding to timestamps in a sequence of nested scopes.
    ///
    /// Must not end with `T::minimum()`; `PointStamp::new` establishes this.
    vector: SmallVec<[T; 1]>,
}
28
29impl<T: Timestamp> PartialEq<[T]> for PointStamp<T> {
30    fn eq(&self, other: &[T]) -> bool {
31        self.vector.iter()
32            .zip(other.iter().chain(std::iter::repeat(&T::minimum())))
33            .all(|(t1, t2)| t1.eq(t2))
34    }
35}
36
37impl<T: Timestamp> PartialEq<PointStamp<T>> for [T] {
38    fn eq(&self, other: &PointStamp<T>) -> bool {
39        self.iter()
40            .zip(other.vector.iter().chain(std::iter::repeat(&T::minimum())))
41            .all(|(t1, t2)| t1.eq(t2))
42    }
43}
44
45impl<T: Timestamp> PartialOrder<[T]> for PointStamp<T> {
46    fn less_equal(&self, other: &[T]) -> bool {
47        self.vector.iter()
48            .zip(other.iter().chain(std::iter::repeat(&T::minimum())))
49            .all(|(t1, t2)| t1.less_equal(t2))
50    }
51}
52
53impl<T: Timestamp> PartialOrder<PointStamp<T>> for [T] {
54    fn less_equal(&self, other: &PointStamp<T>) -> bool {
55        self.iter()
56            .zip(other.vector.iter().chain(std::iter::repeat(&T::minimum())))
57            .all(|(t1, t2)| t1.less_equal(t2))
58    }
59}
60
61impl<T: Timestamp> PointStamp<T> {
62    /// Create a new sequence.
63    ///
64    /// This method will modify `vector` to ensure it does not end with `T::minimum()`.
65    pub fn new(mut vector: SmallVec<[T; 1]>) -> Self {
66        while vector.last() == Some(&T::minimum()) {
67            vector.pop();
68        }
69        PointStamp { vector }
70    }
71    /// Returns the wrapped small vector.
72    ///
73    /// This method is the support way to mutate the contents of `self`, by extracting
74    /// the vector and then re-introducing it with `PointStamp::new` to re-establish
75    /// the invariant that the vector not end with `T::minimum`.
76    pub fn into_inner(self) -> SmallVec<[T; 1]> {
77        self.vector
78    }
79}
80
81impl<T> std::ops::Deref for PointStamp<T> {
82    type Target = [T];
83    fn deref(&self) -> &Self::Target {
84        &self.vector
85    }
86}
87
88// Implement timely dataflow's `PartialOrder` trait.
89use timely::order::PartialOrder;
90impl<T: PartialOrder + Timestamp> PartialOrder for PointStamp<T> {
91    fn less_equal(&self, other: &Self) -> bool {
92        // Every present coordinate must be less-equal the corresponding coordinate,
93        // where absent corresponding coordinates are `T::minimum()`. Coordinates
94        // absent from `self.vector` are themselves `T::minimum()` and are less-equal
95        // any corresponding coordinate in `other.vector`.
96        self.vector
97            .iter()
98            .zip(other.vector.iter().chain(std::iter::repeat(&T::minimum())))
99            .all(|(t1, t2)| t1.less_equal(t2))
100    }
101}
102
103use timely::progress::timestamp::Refines;
104impl<T: Timestamp> Refines<()> for PointStamp<T> {
105    fn to_inner(_outer: ()) -> Self {
106        Self { vector: Default::default() }
107    }
108    fn to_outer(self) -> () {
109        ()
110    }
111    fn summarize(_summary: <Self>::Summary) -> () {
112        ()
113    }
114}
115
116// Implement timely dataflow's `PathSummary` trait.
117// This is preparation for the `Timestamp` implementation below.
118use timely::progress::PathSummary;
119
120/// Describes an action on a `PointStamp`: truncation to `length` followed by `actions`.
121#[derive(Hash, Default, Clone, Eq, PartialEq, Ord, PartialOrd, Debug, Serialize, Deserialize)]
122pub struct PointStampSummary<TS> {
123    /// Number of leading coordinates to retain.
124    ///
125    /// A `None` value indicates that all coordinates should be retained.
126    pub retain: Option<usize>,
127    /// Summary actions to apply to all coordinates.
128    ///
129    /// If `actions.len()` is greater than `retain`, a timestamp should be extended by
130    /// `T::minimum()` in order to be subjected to `actions`.
131    pub actions: Vec<TS>,
132}
133
134impl<T: Timestamp> PathSummary<PointStamp<T>> for PointStampSummary<T::Summary> {
135    fn results_in(&self, timestamp: &PointStamp<T>) -> Option<PointStamp<T>> {
136        // Get a slice of timestamp coordinates appropriate for consideration.
137        let timestamps = if let Some(retain) = self.retain {
138            if retain < timestamp.vector.len() {
139                &timestamp.vector[..retain]
140            } else {
141                &timestamp.vector[..]
142            }
143        } else {
144            &timestamp.vector[..]
145        };
146
147        let mut vector = Vec::with_capacity(std::cmp::max(timestamps.len(), self.actions.len()));
148        // Introduce elements where both timestamp and action exist.
149        let min_len = std::cmp::min(timestamps.len(), self.actions.len());
150        for (action, timestamp) in self.actions.iter().zip(timestamps.iter()) {
151            vector.push(action.results_in(timestamp)?);
152        }
153        // Any remaining timestamps should be copied in.
154        for timestamp in timestamps.iter().skip(min_len) {
155            vector.push(timestamp.clone());
156        }
157        // Any remaining actions should be applied to the empty timestamp.
158        for action in self.actions.iter().skip(min_len) {
159            vector.push(action.results_in(&T::minimum())?);
160        }
161
162        Some(PointStamp::new(vector.into()))
163    }
164    fn followed_by(&self, other: &Self) -> Option<Self> {
165        // The output `retain` will be the minimum of the two inputs.
166        let retain = match (self.retain, other.retain) {
167            (Some(x), Some(y)) => Some(std::cmp::min(x, y)),
168            (Some(x), None) => Some(x),
169            (None, Some(y)) => Some(y),
170            (None, None) => None,
171        };
172
173        // The output `actions` will depend on the relative sizes of the input `retain`s.
174        let self_actions = if let Some(retain) = other.retain {
175            if retain < self.actions.len() {
176                &self.actions[..retain]
177            } else {
178                &self.actions[..]
179            }
180        } else {
181            &self.actions[..]
182        };
183
184        let mut actions = Vec::with_capacity(std::cmp::max(self_actions.len(), other.actions.len()));
185        // Introduce actions where both input actions apply.
186        let min_len = std::cmp::min(self_actions.len(), other.actions.len());
187        for (action1, action2) in self_actions.iter().zip(other.actions.iter()) {
188            actions.push(action1.followed_by(action2)?);
189        }
190        // Append any remaining self actions.
191        actions.extend(self_actions.iter().skip(min_len).cloned());
192        // Append any remaining other actions.
193        actions.extend(other.actions.iter().skip(min_len).cloned());
194
195        Some(Self { retain, actions })
196    }
197}
198
199impl<TS: PartialOrder> PartialOrder for PointStampSummary<TS> {
200    fn less_equal(&self, other: &Self) -> bool {
201        // If the `retain`s are not the same, there is some coordinate which
202        // could either be bigger or smaller as the timestamp or the replacement.
203        // In principle, a `T::minimum()` extension could break this rule, and
204        // we could tighten this logic if needed; I think it is fine not to though.
205        self.retain == other.retain
206            && self.actions.len() <= other.actions.len()
207            && self
208                .actions
209                .iter()
210                .zip(other.actions.iter())
211                .all(|(t1, t2)| t1.less_equal(t2))
212    }
213}
214
215// Implement timely dataflow's `Timestamp` trait.
216use timely::progress::Timestamp;
217impl<T: Timestamp> Timestamp for PointStamp<T> {
218    fn minimum() -> Self {
219        Self::new(Default::default())
220    }
221    type Summary = PointStampSummary<T::Summary>;
222}
223
224// Implement differential dataflow's `Lattice` trait.
225// This extends the `PartialOrder` implementation with additional structure.
226use crate::lattice::Lattice;
227impl<T: Lattice + Timestamp + Clone> Lattice for PointStamp<T> {
228    fn join(&self, other: &Self) -> Self {
229        let min_len = ::std::cmp::min(self.vector.len(), other.vector.len());
230        let max_len = ::std::cmp::max(self.vector.len(), other.vector.len());
231        let mut vector = SmallVec::with_capacity(max_len);
232        // For coordinates in both inputs, apply `join` to the pair.
233        for index in 0..min_len {
234            vector.push(self.vector[index].join(&other.vector[index]));
235        }
236        // Only one of the two vectors will have remaining elements; copy them.
237        for time in &self.vector[min_len..] {
238            vector.push(time.clone());
239        }
240        for time in &other.vector[min_len..] {
241            vector.push(time.clone());
242        }
243        Self::new(vector)
244    }
245    fn meet(&self, other: &Self) -> Self {
246        let min_len = ::std::cmp::min(self.vector.len(), other.vector.len());
247        let mut vector = SmallVec::with_capacity(min_len);
248        // For coordinates in both inputs, apply `meet` to the pair.
249        for index in 0..min_len {
250            vector.push(self.vector[index].meet(&other.vector[index]));
251        }
252        // Remaining coordinates are `T::minimum()` in one input, and so in the output.
253        Self::new(vector)
254    }
255}
256
// `columnation` support for `PointStamp`: a `PointStamp<T>` is stored via `PointStampStack`,
// which wraps the region that `SmallVec<[T; 1]>` itself uses and forwards every operation to it.
mod columnation {
    use columnation::{Columnation, Region};
    use smallvec::SmallVec;
    use crate::dynamic::pointstamp::PointStamp;

    // The region for `PointStamp<T>` is parameterized by `T`'s own inner region type.
    impl<T: Columnation+Clone> Columnation for PointStamp<T> {
        type InnerRegion = PointStampStack<T::InnerRegion>;
    }

    /// Stack for PointStamp. Part of Columnation implementation.
    ///
    /// A newtype over the `Columnation` region of `SmallVec<[R::Item; 1]>`. Every `Region`
    /// method below forwards to it, operating on each `PointStamp`'s `vector` field.
    pub struct PointStampStack<R: Region<Item: Columnation+Clone>>(<SmallVec<[R::Item; 1]> as Columnation>::InnerRegion);

    impl<R: Region<Item: Columnation+Clone>> Default for PointStampStack<R> {
        // An empty stack: the wrapped region's own default.
        #[inline]
        fn default() -> Self {
            Self(Default::default())
        }
    }

    impl<R: Region<Item: Columnation+Clone>> Region for PointStampStack<R> {
        type Item = PointStamp<R::Item>;

        // SAFETY: this only forwards to the wrapped region's `copy`. Callers' obligations are
        // therefore exactly those of `Region::copy` on that region, and nothing extra is added
        // here. The result is assembled directly rather than through `PointStamp::new`. It is
        // built from `item.vector`, so it presumably has the same contents and keeps `item`'s
        // "no trailing `T::minimum()`" invariant. Confirm against the `columnation` docs.
        #[inline]
        unsafe fn copy(&mut self, item: &Self::Item) -> Self::Item {
            Self::Item { vector: self.0.copy(&item.vector) }
        }

        // Forwards to the wrapped region.
        fn clear(&mut self) {
            self.0.clear();
        }

        // Forwards each item's `vector` to the wrapped region's reservation.
        fn reserve_items<'a, I>(&mut self, items: I) where Self: 'a, I: Iterator<Item=&'a Self::Item> + Clone {
            self.0.reserve_items(items.map(|x| &x.vector));
        }

        // Forwards each region's wrapped inner region.
        fn reserve_regions<'a, I>(&mut self, regions: I) where Self: 'a, I: Iterator<Item=&'a Self> + Clone {
            self.0.reserve_regions(regions.map(|r| &r.0));
        }

        // Heap usage is entirely that of the wrapped region; `callback` is passed through.
        fn heap_size(&self, callback: impl FnMut(usize, usize)) {
            self.0.heap_size(callback);
        }
    }
}