Skip to main content

palimpsest_dataflow/
lattice.rs

1//! Partially ordered elements with a least upper bound.
2//!
3//! Lattices form the basis of differential dataflow's efficient execution in the presence of
4//! iterative sub-computations. All logical times in differential dataflow must implement the
//! `Lattice` trait, and all reasoning in operators is done in terms of `Lattice` methods.
6
7use timely::order::PartialOrder;
8use timely::progress::{frontier::AntichainRef, Antichain};
9
10/// A bounded partially ordered type supporting joins and meets.
11pub trait Lattice: PartialOrder {
12    /// The smallest element greater than or equal to both arguments.
13    ///
14    /// # Examples
15    ///
16    /// ```
17    /// # use timely::PartialOrder;
18    /// # use timely::order::Product;
19    /// # use palimpsest_dataflow::lattice::Lattice;
20    /// # fn main() {
21    ///
22    /// let time1 = Product::new(3, 7);
23    /// let time2 = Product::new(4, 6);
24    /// let join = time1.join(&time2);
25    ///
26    /// assert_eq!(join, Product::new(4, 7));
27    /// # }
28    /// ```
29    #[must_use]
30    fn join(&self, other: &Self) -> Self;
31
32    /// Updates `self` to the smallest element greater than or equal to both arguments.
33    ///
34    /// # Examples
35    ///
36    /// ```
37    /// # use timely::PartialOrder;
38    /// # use timely::order::Product;
39    /// # use palimpsest_dataflow::lattice::Lattice;
40    /// # fn main() {
41    ///
42    /// let mut time1 = Product::new(3, 7);
43    /// let time2 = Product::new(4, 6);
44    /// time1.join_assign(&time2);
45    ///
46    /// assert_eq!(time1, Product::new(4, 7));
47    /// # }
48    /// ```
49    fn join_assign(&mut self, other: &Self)
50    where
51        Self: Sized,
52    {
53        *self = self.join(other);
54    }
55
56    /// The largest element less than or equal to both arguments.
57    ///
58    /// # Examples
59    ///
60    /// ```
61    /// # use timely::PartialOrder;
62    /// # use timely::order::Product;
63    /// # use palimpsest_dataflow::lattice::Lattice;
64    /// # fn main() {
65    ///
66    /// let time1 = Product::new(3, 7);
67    /// let time2 = Product::new(4, 6);
68    /// let meet = time1.meet(&time2);
69    ///
70    /// assert_eq!(meet, Product::new(3, 6));
71    /// # }
72    /// ```
73    #[must_use]
74    fn meet(&self, other: &Self) -> Self;
75
76    /// Updates `self` to the largest element less than or equal to both arguments.
77    ///
78    /// # Examples
79    ///
80    /// ```
81    /// # use timely::PartialOrder;
82    /// # use timely::order::Product;
83    /// # use palimpsest_dataflow::lattice::Lattice;
84    /// # fn main() {
85    ///
86    /// let mut time1 = Product::new(3, 7);
87    /// let time2 = Product::new(4, 6);
88    /// time1.meet_assign(&time2);
89    ///
90    /// assert_eq!(time1, Product::new(3, 6));
91    /// # }
92    /// ```
93    fn meet_assign(&mut self, other: &Self)
94    where
95        Self: Sized,
96    {
97        *self = self.meet(other);
98    }
99
100    /// Advances self to the largest time indistinguishable under `frontier`.
101    ///
102    /// This method produces the "largest" lattice element with the property that for every
103    /// lattice element greater than some element of `frontier`, both the result and `self`
104    /// compare identically to the lattice element. The result is the "largest" element in
105    /// the sense that any other element with the same property (compares identically to times
106    /// greater or equal to `frontier`) must be less or equal to the result.
107    ///
108    /// When provided an empty frontier `self` is not modified.
109    ///
110    /// # Examples
111    ///
112    /// ```
113    /// # use timely::PartialOrder;
114    /// # use timely::order::Product;
115    /// # use palimpsest_dataflow::lattice::Lattice;
116    /// # fn main() {
117    ///
118    /// use timely::progress::frontier::{Antichain, AntichainRef};
119    ///
120    /// let time = Product::new(3, 7);
121    /// let mut advanced = Product::new(3, 7);
122    /// let frontier = Antichain::from(vec![Product::new(4, 8), Product::new(5, 3)]);
123    /// advanced.advance_by(frontier.borrow());
124    ///
125    /// // `time` and `advanced` are indistinguishable to elements >= an element of `frontier`
126    /// for i in 0 .. 10 {
127    ///     for j in 0 .. 10 {
128    ///         let test = Product::new(i, j);
129    ///         // for `test` in the future of `frontier` ..
130    ///         if frontier.less_equal(&test) {
131    ///             assert_eq!(time.less_equal(&test), advanced.less_equal(&test));
132    ///         }
133    ///     }
134    /// }
135    ///
136    /// assert_eq!(advanced, Product::new(4, 7));
137    /// # }
138    /// ```
139    #[inline]
140    fn advance_by(&mut self, frontier: AntichainRef<Self>)
141    where
142        Self: Sized,
143    {
144        let mut iter = frontier.iter();
145        if let Some(first) = iter.next() {
146            let mut result = self.join(first);
147            for f in iter {
148                result.meet_assign(&self.join(f));
149            }
150            *self = result;
151        }
152    }
153}
154
155use timely::order::Product;
156
157impl<T1: Lattice, T2: Lattice> Lattice for Product<T1, T2> {
158    #[inline]
159    fn join(&self, other: &Product<T1, T2>) -> Product<T1, T2> {
160        Product {
161            outer: self.outer.join(&other.outer),
162            inner: self.inner.join(&other.inner),
163        }
164    }
165    #[inline]
166    fn meet(&self, other: &Product<T1, T2>) -> Product<T1, T2> {
167        Product {
168            outer: self.outer.meet(&other.outer),
169            inner: self.inner.meet(&other.inner),
170        }
171    }
172}
173
/// A type that has a single greatest element.
pub trait Maximum {
    /// Returns the greatest element of the type.
    fn maximum() -> Self;
}

/// Generates `Maximum` impls for types that expose a `MAX` associated constant.
///
/// Takes a comma-separated list of types; the trailing comma is optional.
macro_rules! implement_maximum {
    ($($with_max:ty),* $(,)?) => {
        $(
            impl Maximum for $with_max {
                fn maximum() -> Self {
                    <$with_max>::MAX
                }
            }
        )*
    };
}

implement_maximum!(
    usize, u128, u64, u32, u16, u8,
    isize, i128, i64, i32, i16, i8,
    Duration,
);

impl Maximum for () {
    // The unit type has exactly one value, so that value is its maximum.
    fn maximum() -> Self {}
}
197
198use timely::progress::Timestamp;
199
200// Tuples have the annoyance that they are only a lattice for `T2` with maximal elements,
201// as the `meet` operator on `(x, _)` and `(y, _)` would be `(x meet y, maximum())`.
202impl<T1: Lattice + Clone, T2: Lattice + Clone + Maximum + Timestamp> Lattice for (T1, T2) {
203    #[inline]
204    fn join(&self, other: &(T1, T2)) -> (T1, T2) {
205        if self.0.eq(&other.0) {
206            (self.0.clone(), self.1.join(&other.1))
207        } else if self.0.less_than(&other.0) {
208            other.clone()
209        } else if other.0.less_than(&self.0) {
210            self.clone()
211        } else {
212            (self.0.join(&other.0), T2::minimum())
213        }
214    }
215    #[inline]
216    fn meet(&self, other: &(T1, T2)) -> (T1, T2) {
217        if self.0.eq(&other.0) {
218            (self.0.clone(), self.1.meet(&other.1))
219        } else if self.0.less_than(&other.0) {
220            self.clone()
221        } else if other.0.less_than(&self.0) {
222            other.clone()
223        } else {
224            (self.0.meet(&other.0), T2::maximum())
225        }
226    }
227}
228
// Implements `Lattice` for a totally ordered `Copy` type: the join is the larger of the
// two values and the meet is the smaller.
//
// The second argument (`$minimum`) must be supplied by callers but is not used by the
// generated impl.
macro_rules! implement_lattice {
    ($ordered:ty, $minimum:expr) => {
        impl Lattice for $ordered {
            #[inline]
            fn join(&self, other: &Self) -> Self {
                ::std::cmp::Ord::max(*self, *other)
            }
            #[inline]
            fn meet(&self, other: &Self) -> Self {
                ::std::cmp::Ord::min(*self, *other)
            }
        }
    };
}
243
244use std::time::Duration;
245
246implement_lattice!(Duration, Duration::new(0, 0));
247implement_lattice!(usize, 0);
248implement_lattice!(u128, 0);
249implement_lattice!(u64, 0);
250implement_lattice!(u32, 0);
251implement_lattice!(u16, 0);
252implement_lattice!(u8, 0);
253implement_lattice!(isize, 0);
254implement_lattice!(i128, 0);
255implement_lattice!(i64, 0);
256implement_lattice!(i32, 0);
257implement_lattice!(i16, 0);
258implement_lattice!(i8, 0);
259implement_lattice!((), ());
260
261/// Returns the "smallest" minimal antichain "greater or equal" to both inputs.
262///
263/// This method is primarily meant for cases where one cannot use the methods
264/// of `Antichain`'s `PartialOrder` implementation, such as when one has only
265/// references rather than owned antichains.
266///
267/// # Examples
268///
269/// ```
270/// # use timely::PartialOrder;
271/// # use timely::order::Product;
272/// # use palimpsest_dataflow::lattice::Lattice;
273/// # use palimpsest_dataflow::lattice::antichain_join;
274/// # fn main() {
275///
276/// let f1 = &[Product::new(3, 7), Product::new(5, 6)];
277/// let f2 = &[Product::new(4, 6)];
278/// let join = antichain_join(f1, f2);
279/// assert_eq!(&*join.elements(), &[Product::new(4, 7), Product::new(5, 6)]);
280/// # }
281/// ```
282pub fn antichain_join<T: Lattice>(one: &[T], other: &[T]) -> Antichain<T> {
283    let mut upper = Antichain::new();
284    antichain_join_into(one, other, &mut upper);
285    upper
286}
287
288/// Returns the "smallest" minimal antichain "greater or equal" to both inputs.
289///
290/// This method is primarily meant for cases where one cannot use the methods
291/// of `Antichain`'s `PartialOrder` implementation, such as when one has only
292/// references rather than owned antichains.
293///
294/// This function is similar to [antichain_join] but reuses an existing allocation.
295/// The provided antichain is cleared before inserting elements.
296///
297/// # Examples
298///
299/// ```
300/// # use timely::PartialOrder;
301/// # use timely::order::Product;
302/// # use timely::progress::Antichain;
303/// # use palimpsest_dataflow::lattice::Lattice;
304/// # use palimpsest_dataflow::lattice::antichain_join_into;
305/// # fn main() {
306///
307/// let mut join = Antichain::new();
308/// let f1 = &[Product::new(3, 7), Product::new(5, 6)];
309/// let f2 = &[Product::new(4, 6)];
310/// antichain_join_into(f1, f2, &mut join);
311/// assert_eq!(&*join.elements(), &[Product::new(4, 7), Product::new(5, 6)]);
312/// # }
313/// ```
314pub fn antichain_join_into<T: Lattice>(one: &[T], other: &[T], upper: &mut Antichain<T>) {
315    upper.clear();
316    for time1 in one {
317        for time2 in other {
318            upper.insert(time1.join(time2));
319        }
320    }
321}
322
323/// Returns the "greatest" minimal antichain "less or equal" to both inputs.
324///
325/// This method is primarily meant for cases where one cannot use the methods
326/// of `Antichain`'s `PartialOrder` implementation, such as when one has only
327/// references rather than owned antichains.
328///
329/// # Examples
330///
331/// ```
332/// # use timely::PartialOrder;
333/// # use timely::order::Product;
334/// # use palimpsest_dataflow::lattice::Lattice;
335/// # use palimpsest_dataflow::lattice::antichain_meet;
336/// # fn main() {
337///
338/// let f1 = &[Product::new(3, 7), Product::new(5, 6)];
339/// let f2 = &[Product::new(4, 6)];
340/// let meet = antichain_meet(f1, f2);
341/// assert_eq!(&*meet.elements(), &[Product::new(3, 7), Product::new(4, 6)]);
342/// # }
343/// ```
344pub fn antichain_meet<T: Lattice + Clone>(one: &[T], other: &[T]) -> Antichain<T> {
345    let mut upper = Antichain::new();
346    for time1 in one {
347        upper.insert(time1.clone());
348    }
349    for time2 in other {
350        upper.insert(time2.clone());
351    }
352    upper
353}
354
355impl<T: Lattice + Clone> Lattice for Antichain<T> {
356    fn join(&self, other: &Self) -> Self {
357        let mut upper = Antichain::new();
358        for time1 in self.elements().iter() {
359            for time2 in other.elements().iter() {
360                upper.insert(time1.join(time2));
361            }
362        }
363        upper
364    }
365    fn meet(&self, other: &Self) -> Self {
366        let mut upper = Antichain::new();
367        for time1 in self.elements().iter() {
368            upper.insert(time1.clone());
369        }
370        for time2 in other.elements().iter() {
371            upper.insert(time2.clone());
372        }
373        upper
374    }
375}