declarative_dataflow/timestamp/
altneu.rs

1//! Code by Frank McSherry, customized here. Original source:
2//! github.com/frankmcsherry/differential-dataflow/tree/master/dogsdogsdogs
3//!
4//! A lexicographically ordered pair of timestamps.
5//!
6//! Two timestamps (s1, t1) and (s2, t2) are ordered either if
7//! s1 and s2 are ordered, or if s1 equals s2 and t1 and t2 are
8//! ordered.
9//!
10//! The join of two timestamps should have as its first coordinate
11//! the join of the first coordinates, and for its second coordinate
12//! the join of the second coordinates for elements whose first
13//! coordinate equals the computed join. That may be the minimum
14//! element of the second lattice, if neither first element equals
15//! the join.
16
17/// A pair of timestamps, partially ordered by the product order.
18#[derive(Debug, Hash, Default, Clone, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize)]
19pub struct AltNeu<T> {
20    /// Inner timestamp.
21    pub time: T,
22    /// alt < neu in timestamp comparisons.
23    pub neu: bool,
24}
25
26impl<T> AltNeu<T> {
27    /// Wraps a time in Alt.
28    pub fn alt(time: T) -> Self {
29        AltNeu { time, neu: false }
30    }
31    /// Wraps a time in Neu.
32    pub fn neu(time: T) -> Self {
33        AltNeu { time, neu: true }
34    }
35}
36
37// Implement timely dataflow's `PartialOrder` trait.
38use timely::order::PartialOrder;
39impl<T: PartialOrder> PartialOrder for AltNeu<T> {
40    fn less_equal(&self, other: &Self) -> bool {
41        if self.time.eq(&other.time) {
42            self.neu <= other.neu
43        } else {
44            self.time.less_equal(&other.time)
45        }
46    }
47}
48
49// Implement timely dataflow's `PathSummary` trait.
50// This is preparation for the `Timestamp` implementation below.
51use timely::progress::PathSummary;
52impl<T: Timestamp> PathSummary<AltNeu<T>> for () {
53    fn results_in(&self, timestamp: &AltNeu<T>) -> Option<AltNeu<T>> {
54        Some(timestamp.clone())
55    }
56    fn followed_by(&self, other: &Self) -> Option<Self> {
57        Some(other.clone())
58    }
59}
60
61// Implement timely dataflow's `Timestamp` trait.
62use timely::progress::Timestamp;
63impl<T: Timestamp> Timestamp for AltNeu<T> {
64    type Summary = ();
65}
66
67use timely::progress::timestamp::Refines;
68
69impl<T: Timestamp> Refines<T> for AltNeu<T> {
70    fn to_inner(other: T) -> Self {
71        AltNeu::alt(other)
72    }
73    fn to_outer(self: AltNeu<T>) -> T {
74        self.time
75    }
76    fn summarize(_path: ()) -> <T as Timestamp>::Summary {
77        Default::default()
78    }
79}
80
81// Implement differential dataflow's `Lattice` trait.
82// This extends the `PartialOrder` implementation with additional structure.
83use differential_dataflow::lattice::Lattice;
84impl<T: Lattice> Lattice for AltNeu<T> {
85    fn minimum() -> Self {
86        AltNeu::alt(T::minimum())
87    }
88    fn join(&self, other: &Self) -> Self {
89        let time = self.time.join(&other.time);
90        let mut neu = false;
91        if time == self.time {
92            neu = neu || self.neu;
93        }
94        if time == other.time {
95            neu = neu || other.neu;
96        }
97        AltNeu { time, neu }
98    }
99    fn meet(&self, other: &Self) -> Self {
100        let time = self.time.meet(&other.time);
101        let mut neu = true;
102        if time == self.time {
103            neu = neu && self.neu;
104        }
105        if time == other.time {
106            neu = neu && other.neu;
107        }
108        AltNeu { time, neu }
109    }
110}