differential_dataflow/dynamic/
pointstamp.rs

//! A timestamp type as in Naiad, where vectors of timestamps of different lengths are comparable.
//!
//! This type compares using "standard" tuple logic as if each timestamp were extended indefinitely with minimal elements.
//!
//! The path summary for this type allows *run-time* rather than *type-driven* iterative scopes.
//! Each summary represents some journey within and out of some number of scopes, followed by entry
//! into and iteration within some other number of scopes.
//!
//! As a result, summaries describe some number of trailing coordinates to truncate, and some increments
//! to the resulting vector. Structurally, the increments can only be to one non-truncated coordinate
//! (as iteration within a scope requires leaving contained scopes), and then to any number of appended
//! default coordinates (which is effectively just *setting* the coordinate).
13
14use abomonation_derive::Abomonation;
15use serde::{Deserialize, Serialize};
16
17/// A sequence of timestamps, partially ordered by the product order.
18///
19/// Sequences of different lengths are compared as if extended indefinitely by `T::minimum()`.
20/// Sequences are not guaranteed to be "minimal", and may end with `T::minimum()` entries.
21#[derive(
22    Hash, Default, Clone, Eq, PartialEq, Ord, PartialOrd, Debug, Serialize, Deserialize, Abomonation,
23)]
24pub struct PointStamp<T> {
25    /// A sequence of timestamps corresponding to timestamps in a sequence of nested scopes.
26    pub vector: Vec<T>,
27}
28
29impl<T> PointStamp<T> {
30    /// Create a new sequence.
31    pub fn new(vector: Vec<T>) -> Self {
32        PointStamp { vector }
33    }
34}
35
36// Implement timely dataflow's `PartialOrder` trait.
37use timely::order::PartialOrder;
38impl<T: PartialOrder + Timestamp> PartialOrder for PointStamp<T> {
39    fn less_equal(&self, other: &Self) -> bool {
40        // Every present coordinate must be less-equal the corresponding coordinate,
41        // where absent corresponding coordinates are `T::minimum()`. Coordinates
42        // absent from `self.vector` are themselves `T::minimum()` and are less-equal
43        // any corresponding coordinate in `other.vector`.
44        self.vector
45            .iter()
46            .zip(other.vector.iter().chain(std::iter::repeat(&T::minimum())))
47            .all(|(t1, t2)| t1.less_equal(t2))
48    }
49}
50
51use timely::progress::timestamp::Refines;
52impl<T: Timestamp> Refines<()> for PointStamp<T> {
53    fn to_inner(_outer: ()) -> Self {
54        Self { vector: Vec::new() }
55    }
56    fn to_outer(self) -> () {
57        ()
58    }
59    fn summarize(_summary: <Self>::Summary) -> () {
60        ()
61    }
62}
63
64// Implement timely dataflow's `PathSummary` trait.
65// This is preparation for the `Timestamp` implementation below.
66use timely::progress::PathSummary;
67
68/// Describes an action on a `PointStamp`: truncation to `length` followed by `actions`.
69#[derive(
70    Hash, Default, Clone, Eq, PartialEq, Ord, PartialOrd, Debug, Serialize, Deserialize, Abomonation
71)]
72pub struct PointStampSummary<TS> {
73    /// Number of leading coordinates to retain.
74    ///
75    /// A `None` value indicates that all coordinates should be retained.
76    pub retain: Option<usize>,
77    /// Summary actions to apply to all coordinates.
78    ///
79    /// If `actions.len()` is greater than `retain`, a timestamp should be extended by
80    /// `T::minimum()` in order to be subjected to `actions`.
81    pub actions: Vec<TS>,
82}
83
84impl<T: Timestamp> PathSummary<PointStamp<T>> for PointStampSummary<T::Summary> {
85    fn results_in(&self, timestamp: &PointStamp<T>) -> Option<PointStamp<T>> {
86        // Get a slice of timestamp coordinates appropriate for consideration.
87        let timestamps = if let Some(retain) = self.retain {
88            if retain < timestamp.vector.len() {
89                &timestamp.vector[..retain]
90            } else {
91                &timestamp.vector[..]
92            }
93        } else {
94            &timestamp.vector[..]
95        };
96
97        let mut vector = Vec::with_capacity(std::cmp::max(timestamps.len(), self.actions.len()));
98        // Introduce elements where both timestamp and action exist.
99        let min_len = std::cmp::min(timestamps.len(), self.actions.len());
100        for (action, timestamp) in self.actions.iter().zip(timestamps.iter()) {
101            vector.push(action.results_in(timestamp)?);
102        }
103        // Any remaining timestamps should be copied in.
104        for timestamp in timestamps.iter().skip(min_len) {
105            vector.push(timestamp.clone());
106        }
107        // Any remaining actions should be applied to the empty timestamp.
108        for action in self.actions.iter().skip(min_len) {
109            vector.push(action.results_in(&T::minimum())?);
110        }
111
112        Some(PointStamp { vector })
113    }
114    fn followed_by(&self, other: &Self) -> Option<Self> {
115        // The output `retain` will be the minimum of the two inputs.
116        let retain = match (self.retain, other.retain) {
117            (Some(x), Some(y)) => Some(std::cmp::min(x, y)),
118            (Some(x), None) => Some(x),
119            (None, Some(y)) => Some(y),
120            (None, None) => None,
121        };
122
123        // The output `actions` will depend on the relative sizes of the input `retain`s.
124        let self_actions = if let Some(retain) = other.retain {
125            if retain < self.actions.len() {
126                &self.actions[..retain]
127            } else {
128                &self.actions[..]
129            }
130        } else {
131            &self.actions[..]
132        };
133
134        let mut actions = Vec::with_capacity(std::cmp::max(self_actions.len(), other.actions.len()));
135        // Introduce actions where both input actions apply.
136        let min_len = std::cmp::min(self_actions.len(), other.actions.len());
137        for (action1, action2) in self_actions.iter().zip(other.actions.iter()) {
138            actions.push(action1.followed_by(action2)?);
139        }
140        // Append any remaining self actions.
141        actions.extend(self_actions.iter().skip(min_len).cloned());
142        // Append any remaining other actions.
143        actions.extend(other.actions.iter().skip(min_len).cloned());
144
145        Some(Self { retain, actions })
146    }
147}
148
149impl<TS: PartialOrder> PartialOrder for PointStampSummary<TS> {
150    fn less_equal(&self, other: &Self) -> bool {
151        // If the `retain`s are not the same, there is some coordinate which
152        // could either be bigger or smaller as the timestamp or the replacemnt.
153        // In principle, a `T::minimum()` extension could break this rule, and
154        // we could tighten this logic if needed; I think it is fine not to though.
155        self.retain == other.retain
156            && self.actions.len() <= other.actions.len()
157            && self
158                .actions
159                .iter()
160                .zip(other.actions.iter())
161                .all(|(t1, t2)| t1.less_equal(t2))
162    }
163}
164
165// Implement timely dataflow's `Timestamp` trait.
166use timely::progress::Timestamp;
167impl<T: Timestamp> Timestamp for PointStamp<T> {
168    fn minimum() -> Self {
169        Self { vector: Vec::new() }
170    }
171    type Summary = PointStampSummary<T::Summary>;
172}
173
174// Implement differential dataflow's `Lattice` trait.
175// This extends the `PartialOrder` implementation with additional structure.
176use crate::lattice::Lattice;
177impl<T: Lattice + Timestamp + Clone> Lattice for PointStamp<T> {
178    fn join(&self, other: &Self) -> Self {
179        let min_len = ::std::cmp::min(self.vector.len(), other.vector.len());
180        let max_len = ::std::cmp::max(self.vector.len(), other.vector.len());
181        let mut vector = Vec::with_capacity(max_len);
182        // For coordinates in both inputs, apply `join` to the pair.
183        for index in 0..min_len {
184            vector.push(self.vector[index].join(&other.vector[index]));
185        }
186        // Only one of the two vectors will have remaining elements; copy them.
187        for time in &self.vector[min_len..] {
188            vector.push(time.clone());
189        }
190        for time in &other.vector[min_len..] {
191            vector.push(time.clone());
192        }
193        Self { vector }
194    }
195    fn meet(&self, other: &Self) -> Self {
196        let min_len = ::std::cmp::min(self.vector.len(), other.vector.len());
197        let mut vector = Vec::with_capacity(min_len);
198        // For coordinates in both inputs, apply `meet` to the pair.
199        for index in 0..min_len {
200            vector.push(self.vector[index].meet(&other.vector[index]));
201        }
202        // Remaining coordinates are `T::minimum()` in one input, and so in the output.
203        Self { vector }
204    }
205}
206
207use timely::container::columnation::{Columnation, Region};
208impl<T: Columnation> Columnation for PointStamp<T> {
209    type InnerRegion = PointStampStack<T::InnerRegion>;
210}
211
212/// Stack for PointStamp. Part of Columnation implementation.
213pub struct PointStampStack<R: Region>(<Vec<R::Item> as Columnation>::InnerRegion)
214where
215    <R as Region>::Item: Columnation;
216
217impl<R: Region> Default for PointStampStack<R>
218    where
219        <R as Region>::Item: Columnation
220{
221    #[inline]
222    fn default() -> Self {
223        Self(Default::default())
224    }
225}
226
227impl<R: Region> Region for PointStampStack<R>
228    where
229        <R as Region>::Item: Columnation
230{
231    type Item = PointStamp<R::Item>;
232
233    #[inline]
234    unsafe fn copy(&mut self, item: &Self::Item) -> Self::Item {
235        Self::Item { vector: self.0.copy(&item.vector) }
236    }
237
238    fn clear(&mut self) {
239        self.0.clear();
240    }
241
242    fn reserve_items<'a, I>(&mut self, items: I) where Self: 'a, I: Iterator<Item=&'a Self::Item> + Clone {
243        self.0.reserve_items(items.map(|x| &x.vector));
244    }
245
246    fn reserve_regions<'a, I>(&mut self, regions: I) where Self: 'a, I: Iterator<Item=&'a Self> + Clone {
247        self.0.reserve_regions(regions.map(|r| &r.0));
248    }
249
250    fn heap_size(&self, callback: impl FnMut(usize, usize)) {
251        self.0.heap_size(callback);
252    }
253}