declarative_dataflow/timestamp/
pair.rs

//! Code by Frank McSherry, customized here. Original source:
//! github.com/TimelyDataflow/differential-dataflow/blob/master/experiments/src/bin/multitemporal.rs
//!
//! This module contains a definition of a new timestamp type, a
//! "pair" or product.
//!
//! This is a minimal self-contained implementation, in that it
//! doesn't borrow anything from the rest of the library other than
//! the traits it needs to implement. With this type and its
//! implementations, you can use it as a timestamp type.
11
12/// A pair of timestamps, partially ordered by the product order.
13#[derive(Hash, Default, Clone, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize)]
14pub struct Pair<S, T> {
15    /// First timestamp coordinate, e.g. system time.
16    pub first: S,
17    /// Second timestamp coordinate, e.g. event time.
18    pub second: T,
19}
20
21impl<S, T> Pair<S, T> {
22    /// Create a new pair.
23    pub fn new(first: S, second: T) -> Self {
24        Pair { first, second }
25    }
26}
27
28// Implement timely dataflow's `PartialOrder` trait.
29use timely::order::PartialOrder;
30impl<S: PartialOrder, T: PartialOrder> PartialOrder for Pair<S, T> {
31    fn less_equal(&self, other: &Self) -> bool {
32        self.first.less_equal(&other.first) && self.second.less_equal(&other.second)
33    }
34}
35
36use timely::progress::timestamp::Refines;
37impl<S: Timestamp, T: Timestamp> Refines<()> for Pair<S, T> {
38    fn to_inner(_outer: ()) -> Self {
39        Default::default()
40    }
41    fn to_outer(self) {}
42    fn summarize(_summary: <Self>::Summary) {}
43}
44
45// Implement timely dataflow's `PathSummary` trait.
46// This is preparation for the `Timestamp` implementation below.
47use timely::progress::PathSummary;
48
49impl<S: Timestamp, T: Timestamp> PathSummary<Pair<S, T>> for () {
50    fn results_in(&self, timestamp: &Pair<S, T>) -> Option<Pair<S, T>> {
51        Some(timestamp.clone())
52    }
53    fn followed_by(&self, other: &Self) -> Option<Self> {
54        Some(other.clone())
55    }
56}
57
58// Implement timely dataflow's `Timestamp` trait.
59use timely::progress::Timestamp;
60impl<S: Timestamp, T: Timestamp> Timestamp for Pair<S, T> {
61    type Summary = ();
62}
63
64// Implement differential dataflow's `Lattice` trait.
65// This extends the `PartialOrder` implementation with additional structure.
66use differential_dataflow::lattice::Lattice;
67impl<S: Lattice, T: Lattice> Lattice for Pair<S, T> {
68    fn minimum() -> Self {
69        Pair {
70            first: S::minimum(),
71            second: T::minimum(),
72        }
73    }
74    fn join(&self, other: &Self) -> Self {
75        Pair {
76            first: self.first.join(&other.first),
77            second: self.second.join(&other.second),
78        }
79    }
80    fn meet(&self, other: &Self) -> Self {
81        Pair {
82            first: self.first.meet(&other.first),
83            second: self.second.meet(&other.second),
84        }
85    }
86}
87
88use std::fmt::{Debug, Error, Formatter};
89
90/// Debug implementation to avoid seeing fully qualified path names.
91impl<TOuter: Debug, TInner: Debug> Debug for Pair<TOuter, TInner> {
92    fn fmt(&self, f: &mut Formatter) -> Result<(), Error> {
93        f.write_str(&format!("({:?}, {:?})", self.first, self.second))
94    }
95}
96
97impl<TOuter, TInner> std::ops::Sub for Pair<TOuter, TInner>
98where
99    TOuter: std::ops::Sub<Output = TOuter>,
100    TInner: std::ops::Sub<Output = TInner>,
101{
102    type Output = Self;
103
104    fn sub(self, other: Self) -> Self {
105        Pair {
106            first: self.first.sub(other.first),
107            second: self.second.sub(other.second),
108        }
109    }
110}