Skip to main content

palimpsest_dataflow/dynamic/
pointstamp.rs

1//! A timestamp type as in Naiad, where vectors of timestamps of different lengths are comparable.
2//!
3//! This type compares using "standard" tuple logic as if each timestamp were extended indefinitely with minimal elements.
4//!
5//! The path summary for this type allows *run-time* rather than *type-driven* iterative scopes.
6//! Each summary represents some journey within and out of some number of scopes, followed by entry
7//! into and iteration within some other number of scopes.
8//!
9//! As a result, summaries describe some number of trailing coordinates to truncate, and some increments
10//! to the resulting vector. Structurally, the increments can only be to one non-truncated coordinate
11//! (as iteration within a scope requires leaving contained scopes), and then to any number of appended
12//! default coordinates (which is effectively just *setting* the coordinate).
13
14use columnar::Columnar;
15use serde::{Deserialize, Serialize};
16
/// A sequence of timestamps, partially ordered by the product order.
///
/// Each entry corresponds to the timestamp of one scope in a sequence of nested scopes.
/// Sequences of different lengths are compared as if extended indefinitely by `T::minimum()`.
/// Sequences built through `PointStamp::new` are "minimal": they never end with
/// `T::minimum()` entries.
///
/// The derived `PartialEq`, `Ord`, and `PartialOrd` compare the stored vectors directly;
/// the product order itself is supplied by the `PartialOrder` implementations in this file.
#[derive(
    Hash, Default, Clone, Eq, PartialEq, Ord, PartialOrd, Debug, Serialize, Deserialize, Columnar,
)]
#[columnar(derive(Eq, PartialEq, Ord, PartialOrd))]
pub struct PointStamp<T> {
    /// A sequence of timestamps corresponding to timestamps in a sequence of nested scopes.
    ///
    /// Kept private so that callers go through `PointStamp::new`, which trims trailing
    /// `T::minimum()` entries.
    vector: Vec<T>,
}
29
30impl<T: Timestamp> PartialEq<[T]> for PointStamp<T> {
31    fn eq(&self, other: &[T]) -> bool {
32        self.vector
33            .iter()
34            .zip(other.iter().chain(std::iter::repeat(&T::minimum())))
35            .all(|(t1, t2)| t1.eq(t2))
36    }
37}
38
39impl<T: Timestamp> PartialEq<PointStamp<T>> for [T] {
40    fn eq(&self, other: &PointStamp<T>) -> bool {
41        self.iter()
42            .zip(other.vector.iter().chain(std::iter::repeat(&T::minimum())))
43            .all(|(t1, t2)| t1.eq(t2))
44    }
45}
46
47impl<T: Timestamp> PartialOrder<[T]> for PointStamp<T> {
48    fn less_equal(&self, other: &[T]) -> bool {
49        self.vector
50            .iter()
51            .zip(other.iter().chain(std::iter::repeat(&T::minimum())))
52            .all(|(t1, t2)| t1.less_equal(t2))
53    }
54}
55
56impl<T: Timestamp> PartialOrder<PointStamp<T>> for [T] {
57    fn less_equal(&self, other: &PointStamp<T>) -> bool {
58        self.iter()
59            .zip(other.vector.iter().chain(std::iter::repeat(&T::minimum())))
60            .all(|(t1, t2)| t1.less_equal(t2))
61    }
62}
63
64impl<T: Timestamp> PointStamp<T> {
65    /// Create a new sequence.
66    ///
67    /// This method will modify `vector` to ensure it does not end with `T::minimum()`.
68    pub fn new(mut vector: Vec<T>) -> Self {
69        while vector.last() == Some(&T::minimum()) {
70            vector.pop();
71        }
72        PointStamp { vector }
73    }
74    /// Returns the wrapped vector.
75    ///
76    /// This method is the support way to mutate the contents of `self`, by extracting
77    /// the vector and then re-introducing it with `PointStamp::new` to re-establish
78    /// the invariant that the vector not end with `T::minimum`.
79    pub fn into_vec(self) -> Vec<T> {
80        self.vector
81    }
82}
83
84impl<T> std::ops::Deref for PointStamp<T> {
85    type Target = [T];
86    fn deref(&self) -> &Self::Target {
87        &self.vector
88    }
89}
90
91// Implement timely dataflow's `PartialOrder` trait.
92use timely::order::PartialOrder;
93impl<T: PartialOrder + Timestamp> PartialOrder for PointStamp<T> {
94    fn less_equal(&self, other: &Self) -> bool {
95        // Every present coordinate must be less-equal the corresponding coordinate,
96        // where absent corresponding coordinates are `T::minimum()`. Coordinates
97        // absent from `self.vector` are themselves `T::minimum()` and are less-equal
98        // any corresponding coordinate in `other.vector`.
99        self.vector
100            .iter()
101            .zip(other.vector.iter().chain(std::iter::repeat(&T::minimum())))
102            .all(|(t1, t2)| t1.less_equal(t2))
103    }
104}
105
106use timely::progress::timestamp::Refines;
107impl<T: Timestamp> Refines<()> for PointStamp<T> {
108    fn to_inner(_outer: ()) -> Self {
109        Self { vector: Vec::new() }
110    }
111    fn to_outer(self) -> () {
112        ()
113    }
114    fn summarize(_summary: <Self>::Summary) -> () {
115        ()
116    }
117}
118
119// Implement timely dataflow's `PathSummary` trait.
120// This is preparation for the `Timestamp` implementation below.
121use timely::progress::PathSummary;
122
/// Describes an action on a `PointStamp`: truncation to `retain` leading coordinates,
/// followed by the coordinate-wise application of `actions`.
#[derive(Hash, Default, Clone, Eq, PartialEq, Ord, PartialOrd, Debug, Serialize, Deserialize)]
pub struct PointStampSummary<TS> {
    /// Number of leading coordinates to retain.
    ///
    /// A `None` value indicates that all coordinates should be retained.
    pub retain: Option<usize>,
    /// Summary actions, applied position by position to the retained coordinates.
    ///
    /// Retained coordinates beyond `actions.len()` are left unchanged. If `actions.len()`
    /// exceeds the number of retained coordinates, the timestamp is extended by
    /// `T::minimum()` so that the remaining actions have a coordinate to act on.
    pub actions: Vec<TS>,
}
136
137impl<T: Timestamp> PathSummary<PointStamp<T>> for PointStampSummary<T::Summary> {
138    fn results_in(&self, timestamp: &PointStamp<T>) -> Option<PointStamp<T>> {
139        // Get a slice of timestamp coordinates appropriate for consideration.
140        let timestamps = if let Some(retain) = self.retain {
141            if retain < timestamp.vector.len() {
142                &timestamp.vector[..retain]
143            } else {
144                &timestamp.vector[..]
145            }
146        } else {
147            &timestamp.vector[..]
148        };
149
150        let mut vector = Vec::with_capacity(std::cmp::max(timestamps.len(), self.actions.len()));
151        // Introduce elements where both timestamp and action exist.
152        let min_len = std::cmp::min(timestamps.len(), self.actions.len());
153        for (action, timestamp) in self.actions.iter().zip(timestamps.iter()) {
154            vector.push(action.results_in(timestamp)?);
155        }
156        // Any remaining timestamps should be copied in.
157        for timestamp in timestamps.iter().skip(min_len) {
158            vector.push(timestamp.clone());
159        }
160        // Any remaining actions should be applied to the empty timestamp.
161        for action in self.actions.iter().skip(min_len) {
162            vector.push(action.results_in(&T::minimum())?);
163        }
164
165        Some(PointStamp::new(vector))
166    }
167    fn followed_by(&self, other: &Self) -> Option<Self> {
168        // The output `retain` will be the minimum of the two inputs.
169        let retain = match (self.retain, other.retain) {
170            (Some(x), Some(y)) => Some(std::cmp::min(x, y)),
171            (Some(x), None) => Some(x),
172            (None, Some(y)) => Some(y),
173            (None, None) => None,
174        };
175
176        // The output `actions` will depend on the relative sizes of the input `retain`s.
177        let self_actions = if let Some(retain) = other.retain {
178            if retain < self.actions.len() {
179                &self.actions[..retain]
180            } else {
181                &self.actions[..]
182            }
183        } else {
184            &self.actions[..]
185        };
186
187        let mut actions =
188            Vec::with_capacity(std::cmp::max(self_actions.len(), other.actions.len()));
189        // Introduce actions where both input actions apply.
190        let min_len = std::cmp::min(self_actions.len(), other.actions.len());
191        for (action1, action2) in self_actions.iter().zip(other.actions.iter()) {
192            actions.push(action1.followed_by(action2)?);
193        }
194        // Append any remaining self actions.
195        actions.extend(self_actions.iter().skip(min_len).cloned());
196        // Append any remaining other actions.
197        actions.extend(other.actions.iter().skip(min_len).cloned());
198
199        Some(Self { retain, actions })
200    }
201}
202
203impl<TS: PartialOrder> PartialOrder for PointStampSummary<TS> {
204    fn less_equal(&self, other: &Self) -> bool {
205        // If the `retain`s are not the same, there is some coordinate which
206        // could either be bigger or smaller as the timestamp or the replacement.
207        // In principle, a `T::minimum()` extension could break this rule, and
208        // we could tighten this logic if needed; I think it is fine not to though.
209        self.retain == other.retain
210            && self.actions.len() <= other.actions.len()
211            && self
212                .actions
213                .iter()
214                .zip(other.actions.iter())
215                .all(|(t1, t2)| t1.less_equal(t2))
216    }
217}
218
219// Implement timely dataflow's `Timestamp` trait.
220use timely::progress::Timestamp;
221impl<T: Timestamp> Timestamp for PointStamp<T> {
222    fn minimum() -> Self {
223        Self::new(Vec::new())
224    }
225    type Summary = PointStampSummary<T::Summary>;
226}
227
228// Implement differential dataflow's `Lattice` trait.
229// This extends the `PartialOrder` implementation with additional structure.
230use crate::lattice::Lattice;
231impl<T: Lattice + Timestamp + Clone> Lattice for PointStamp<T> {
232    fn join(&self, other: &Self) -> Self {
233        let min_len = ::std::cmp::min(self.vector.len(), other.vector.len());
234        let max_len = ::std::cmp::max(self.vector.len(), other.vector.len());
235        let mut vector = Vec::with_capacity(max_len);
236        // For coordinates in both inputs, apply `join` to the pair.
237        for index in 0..min_len {
238            vector.push(self.vector[index].join(&other.vector[index]));
239        }
240        // Only one of the two vectors will have remaining elements; copy them.
241        for time in &self.vector[min_len..] {
242            vector.push(time.clone());
243        }
244        for time in &other.vector[min_len..] {
245            vector.push(time.clone());
246        }
247        Self::new(vector)
248    }
249    fn meet(&self, other: &Self) -> Self {
250        let min_len = ::std::cmp::min(self.vector.len(), other.vector.len());
251        let mut vector = Vec::with_capacity(min_len);
252        // For coordinates in both inputs, apply `meet` to the pair.
253        for index in 0..min_len {
254            vector.push(self.vector[index].meet(&other.vector[index]));
255        }
256        // Remaining coordinates are `T::minimum()` in one input, and so in the output.
257        Self::new(vector)
258    }
259}
260
261mod columnation {
262    use columnation::{Columnation, Region};
263
264    use crate::dynamic::pointstamp::PointStamp;
265
266    impl<T: Columnation> Columnation for PointStamp<T> {
267        type InnerRegion = PointStampStack<T::InnerRegion>;
268    }
269
270    /// Stack for PointStamp. Part of Columnation implementation.
271    pub struct PointStampStack<R: Region<Item: Columnation>>(
272        <Vec<R::Item> as Columnation>::InnerRegion,
273    );
274
275    impl<R: Region<Item: Columnation>> Default for PointStampStack<R> {
276        #[inline]
277        fn default() -> Self {
278            Self(Default::default())
279        }
280    }
281
282    impl<R: Region<Item: Columnation>> Region for PointStampStack<R> {
283        type Item = PointStamp<R::Item>;
284
285        #[inline]
286        unsafe fn copy(&mut self, item: &Self::Item) -> Self::Item {
287            Self::Item {
288                vector: self.0.copy(&item.vector),
289            }
290        }
291
292        fn clear(&mut self) {
293            self.0.clear();
294        }
295
296        fn reserve_items<'a, I>(&mut self, items: I)
297        where
298            Self: 'a,
299            I: Iterator<Item = &'a Self::Item> + Clone,
300        {
301            self.0.reserve_items(items.map(|x| &x.vector));
302        }
303
304        fn reserve_regions<'a, I>(&mut self, regions: I)
305        where
306            Self: 'a,
307            I: Iterator<Item = &'a Self> + Clone,
308        {
309            self.0.reserve_regions(regions.map(|r| &r.0));
310        }
311
312        fn heap_size(&self, callback: impl FnMut(usize, usize)) {
313            self.0.heap_size(callback);
314        }
315    }
316}