Skip to main content

differential_dataflow/
lattice.rs

1//! Partially ordered elements with a least upper bound.
2//!
3//! Lattices form the basis of differential dataflow's efficient execution in the presence of
4//! iterative sub-computations. All logical times in differential dataflow must implement the
5//! `Lattice` trait, and all reasoning in operators are done it terms of `Lattice` methods.
6
7use timely::order::PartialOrder;
8use timely::progress::{Antichain, frontier::AntichainRef};
9
10/// A bounded partially ordered type supporting joins and meets.
11pub trait Lattice : PartialOrder {
12
13    /// The smallest element greater than or equal to both arguments.
14    ///
15    /// # Examples
16    ///
17    /// ```
18    /// # use timely::PartialOrder;
19    /// # use timely::order::Product;
20    /// # use differential_dataflow::lattice::Lattice;
21    /// # fn main() {
22    ///
23    /// let time1 = Product::new(3, 7);
24    /// let time2 = Product::new(4, 6);
25    /// let join = time1.join(&time2);
26    ///
27    /// assert_eq!(join, Product::new(4, 7));
28    /// # }
29    /// ```
30    #[must_use]
31    fn join(&self, other: &Self) -> Self;
32
33    /// Updates `self` to the smallest element greater than or equal to both arguments.
34    ///
35    /// # Examples
36    ///
37    /// ```
38    /// # use timely::PartialOrder;
39    /// # use timely::order::Product;
40    /// # use differential_dataflow::lattice::Lattice;
41    /// # fn main() {
42    ///
43    /// let mut time1 = Product::new(3, 7);
44    /// let time2 = Product::new(4, 6);
45    /// time1.join_assign(&time2);
46    ///
47    /// assert_eq!(time1, Product::new(4, 7));
48    /// # }
49    /// ```
50    fn join_assign(&mut self, other: &Self) where Self: Sized {
51        *self = self.join(other);
52    }
53
54    /// The largest element less than or equal to both arguments.
55    ///
56    /// # Examples
57    ///
58    /// ```
59    /// # use timely::PartialOrder;
60    /// # use timely::order::Product;
61    /// # use differential_dataflow::lattice::Lattice;
62    /// # fn main() {
63    ///
64    /// let time1 = Product::new(3, 7);
65    /// let time2 = Product::new(4, 6);
66    /// let meet = time1.meet(&time2);
67    ///
68    /// assert_eq!(meet, Product::new(3, 6));
69    /// # }
70    /// ```
71    #[must_use]
72    fn meet(&self, other: &Self) -> Self;
73
74    /// Updates `self` to the largest element less than or equal to both arguments.
75    ///
76    /// # Examples
77    ///
78    /// ```
79    /// # use timely::PartialOrder;
80    /// # use timely::order::Product;
81    /// # use differential_dataflow::lattice::Lattice;
82    /// # fn main() {
83    ///
84    /// let mut time1 = Product::new(3, 7);
85    /// let time2 = Product::new(4, 6);
86    /// time1.meet_assign(&time2);
87    ///
88    /// assert_eq!(time1, Product::new(3, 6));
89    /// # }
90    /// ```
91    fn meet_assign(&mut self, other: &Self) where Self: Sized  {
92        *self = self.meet(other);
93    }
94
95    /// Advances self to the largest time indistinguishable under `frontier`.
96    ///
97    /// This method produces the "largest" lattice element with the property that for every
98    /// lattice element greater than some element of `frontier`, both the result and `self`
99    /// compare identically to the lattice element. The result is the "largest" element in
100    /// the sense that any other element with the same property (compares identically to times
101    /// greater or equal to `frontier`) must be less or equal to the result.
102    ///
103    /// When provided an empty frontier `self` is not modified.
104    ///
105    /// # Examples
106    ///
107    /// ```
108    /// # use timely::PartialOrder;
109    /// # use timely::order::Product;
110    /// # use differential_dataflow::lattice::Lattice;
111    /// # fn main() {
112    ///
113    /// use timely::progress::frontier::{Antichain, AntichainRef};
114    ///
115    /// let time = Product::new(3, 7);
116    /// let mut advanced = Product::new(3, 7);
117    /// let frontier = Antichain::from(vec![Product::new(4, 8), Product::new(5, 3)]);
118    /// advanced.advance_by(frontier.borrow());
119    ///
120    /// // `time` and `advanced` are indistinguishable to elements >= an element of `frontier`
121    /// for i in 0 .. 10 {
122    ///     for j in 0 .. 10 {
123    ///         let test = Product::new(i, j);
124    ///         // for `test` in the future of `frontier` ..
125    ///         if frontier.less_equal(&test) {
126    ///             assert_eq!(time.less_equal(&test), advanced.less_equal(&test));
127    ///         }
128    ///     }
129    /// }
130    ///
131    /// assert_eq!(advanced, Product::new(4, 7));
132    /// # }
133    /// ```
134    #[inline]
135    fn advance_by(&mut self, frontier: AntichainRef<Self>) where Self: Sized {
136        match &*frontier {
137            [] => {}
138            [first] => self.join_assign(first),
139            [first, rest @ ..] => {
140                let mut result = self.join(first);
141                for f in rest { result.meet_assign(&self.join(f)); }
142                *self = result;
143            }
144        }
145    }
146}
147
148use timely::order::Product;
149
150impl<T1: Lattice, T2: Lattice> Lattice for Product<T1, T2> {
151    #[inline]
152    fn join(&self, other: &Product<T1, T2>) -> Product<T1, T2> {
153        Product {
154            outer: self.outer.join(&other.outer),
155            inner: self.inner.join(&other.inner),
156        }
157    }
158    #[inline]
159    fn join_assign(&mut self, other: &Self) {
160        self.outer.join_assign(&other.outer);
161        self.inner.join_assign(&other.inner);
162    }
163    #[inline]
164    fn meet(&self, other: &Product<T1, T2>) -> Product<T1, T2> {
165        Product {
166            outer: self.outer.meet(&other.outer),
167            inner: self.inner.meet(&other.inner),
168        }
169    }
170    #[inline]
171    fn meet_assign(&mut self, other: &Self) {
172        self.outer.meet_assign(&other.outer);
173        self.inner.meet_assign(&other.inner);
174    }
175}
176
177/// A type that has a unique maximum element.
178pub trait Maximum {
179    /// The unique maximal element of the set.
180    fn maximum() -> Self;
181}
182
183/// Implements `Maximum` for elements with a `MAX` associated constant.
184macro_rules! implement_maximum {
185    ($($index_type:ty,)*) => (
186        $(
187            impl Maximum for $index_type {
188                fn maximum() -> Self { Self::MAX }
189            }
190        )*
191    )
192}
193
194implement_maximum!(usize, u128, u64, u32, u16, u8, isize, i128, i64, i32, i16, i8, Duration,);
195impl Maximum for () { fn maximum() -> () { () }}
196
197use timely::progress::Timestamp;
198
199// Tuples have the annoyance that they are only a lattice for `T2` with maximal elements,
200// as the `meet` operator on `(x, _)` and `(y, _)` would be `(x meet y, maximum())`.
201impl<T1: Lattice+Clone, T2: Lattice+Clone+Maximum+Timestamp> Lattice for (T1, T2) {
202    #[inline]
203    fn join(&self, other: &(T1, T2)) -> (T1, T2) {
204        if self.0.eq(&other.0) {
205            (self.0.clone(), self.1.join(&other.1))
206        } else if self.0.less_than(&other.0) {
207            other.clone()
208        } else if other.0.less_than(&self.0) {
209            self.clone()
210        } else {
211            (self.0.join(&other.0), T2::minimum())
212        }
213    }
214    #[inline]
215    fn meet(&self, other: &(T1, T2)) -> (T1, T2) {
216        if self.0.eq(&other.0) {
217            (self.0.clone(), self.1.meet(&other.1))
218        } else if self.0.less_than(&other.0) {
219            self.clone()
220        } else if other.0.less_than(&self.0) {
221            other.clone()
222        } else {
223            (self.0.meet(&other.0), T2::maximum())
224        }
225    }
226}
227
228macro_rules! implement_lattice {
229    ($index_type:ty, $minimum:expr) => (
230        impl Lattice for $index_type {
231            #[inline] fn join(&self, other: &Self) -> Self { ::std::cmp::max(*self, *other) }
232            #[inline] fn meet(&self, other: &Self) -> Self { ::std::cmp::min(*self, *other) }
233        }
234    )
235}
236
237use std::time::Duration;
238
239implement_lattice!(Duration, Duration::new(0, 0));
240implement_lattice!(usize, 0);
241implement_lattice!(u128, 0);
242implement_lattice!(u64, 0);
243implement_lattice!(u32, 0);
244implement_lattice!(u16, 0);
245implement_lattice!(u8, 0);
246implement_lattice!(isize, 0);
247implement_lattice!(i128, 0);
248implement_lattice!(i64, 0);
249implement_lattice!(i32, 0);
250implement_lattice!(i16, 0);
251implement_lattice!(i8, 0);
252implement_lattice!((), ());
253
254/// Returns the "smallest" minimal antichain "greater or equal" to both inputs.
255///
256/// This method is primarily meant for cases where one cannot use the methods
257/// of `Antichain`'s `PartialOrder` implementation, such as when one has only
258/// references rather than owned antichains.
259///
260/// # Examples
261///
262/// ```
263/// # use timely::PartialOrder;
264/// # use timely::order::Product;
265/// # use differential_dataflow::lattice::Lattice;
266/// # use differential_dataflow::lattice::antichain_join;
267/// # fn main() {
268///
269/// let f1 = &[Product::new(3, 7), Product::new(5, 6)];
270/// let f2 = &[Product::new(4, 6)];
271/// let join = antichain_join(f1, f2);
272/// assert_eq!(&*join.elements(), &[Product::new(4, 7), Product::new(5, 6)]);
273/// # }
274/// ```
275pub fn antichain_join<T: Lattice>(one: &[T], other: &[T]) -> Antichain<T> {
276    let mut upper = Antichain::new();
277    antichain_join_into(one, other, &mut upper);
278    upper
279}
280
281/// Returns the "smallest" minimal antichain "greater or equal" to both inputs.
282///
283/// This method is primarily meant for cases where one cannot use the methods
284/// of `Antichain`'s `PartialOrder` implementation, such as when one has only
285/// references rather than owned antichains.
286///
287/// This function is similar to [antichain_join] but reuses an existing allocation.
288/// The provided antichain is cleared before inserting elements.
289///
290/// # Examples
291///
292/// ```
293/// # use timely::PartialOrder;
294/// # use timely::order::Product;
295/// # use timely::progress::Antichain;
296/// # use differential_dataflow::lattice::Lattice;
297/// # use differential_dataflow::lattice::antichain_join_into;
298/// # fn main() {
299///
300/// let mut join = Antichain::new();
301/// let f1 = &[Product::new(3, 7), Product::new(5, 6)];
302/// let f2 = &[Product::new(4, 6)];
303/// antichain_join_into(f1, f2, &mut join);
304/// assert_eq!(&*join.elements(), &[Product::new(4, 7), Product::new(5, 6)]);
305/// # }
306/// ```
307pub fn antichain_join_into<T: Lattice>(one: &[T], other: &[T], upper: &mut Antichain<T>) {
308    upper.clear();
309    for time1 in one {
310        for time2 in other {
311            upper.insert(time1.join(time2));
312        }
313    }
314}
315
316/// Returns the "greatest" minimal antichain "less or equal" to both inputs.
317///
318/// This method is primarily meant for cases where one cannot use the methods
319/// of `Antichain`'s `PartialOrder` implementation, such as when one has only
320/// references rather than owned antichains.
321///
322/// # Examples
323///
324/// ```
325/// # use timely::PartialOrder;
326/// # use timely::order::Product;
327/// # use differential_dataflow::lattice::Lattice;
328/// # use differential_dataflow::lattice::antichain_meet;
329/// # fn main() {
330///
331/// let f1 = &[Product::new(3, 7), Product::new(5, 6)];
332/// let f2 = &[Product::new(4, 6)];
333/// let meet = antichain_meet(f1, f2);
334/// assert_eq!(&*meet.elements(), &[Product::new(3, 7), Product::new(4, 6)]);
335/// # }
336/// ```
337pub fn antichain_meet<T: Lattice+Clone>(one: &[T], other: &[T]) -> Antichain<T> {
338    let mut upper = Antichain::new();
339    for time1 in one {
340        upper.insert(time1.clone());
341    }
342    for time2 in other {
343        upper.insert(time2.clone());
344    }
345    upper
346}
347
348impl<T: Lattice+Clone> Lattice for Antichain<T> {
349    fn join(&self, other: &Self) -> Self {
350        let mut upper = Antichain::new();
351        for time1 in self.elements().iter() {
352            for time2 in other.elements().iter() {
353                upper.insert(time1.join(time2));
354            }
355        }
356        upper
357    }
358    fn meet(&self, other: &Self) -> Self {
359        let mut upper = Antichain::new();
360        for time1 in self.elements().iter() {
361            upper.insert(time1.clone());
362        }
363        for time2 in other.elements().iter() {
364            upper.insert(time2.clone());
365        }
366        upper
367    }
368}