Skip to main content

dbsp/time/
product.rs

1use crate::{
2    Scope,
3    algebra::{Lattice, PartialOrder},
4    time::Timestamp,
5    trace::Batch,
6};
7use feldera_macros::IsNone;
8use rkyv::{Archive, Deserialize, Serialize};
9use size_of::SizeOf;
10use std::fmt::{Debug, Display, Formatter};
11
12/// A nested pair of timestamps, one outer and one inner.
13#[derive(
14    Copy,
15    Clone,
16    Hash,
17    Eq,
18    PartialEq,
19    Default,
20    Ord,
21    PartialOrd,
22    SizeOf,
23    Archive,
24    Serialize,
25    Deserialize,
26    IsNone,
27)]
28#[archive_attr(derive(Ord, Eq, PartialEq, PartialOrd))]
29#[archive(bound(
30    archive = "TOuter: Archive, TInner: Archive, <TOuter as Archive>::Archived: Ord, <TInner as Archive>::Archived: Ord"
31))]
32#[archive(compare(PartialEq, PartialOrd))]
33pub struct Product<TOuter, TInner> {
34    /// Outer timestamp.
35    pub outer: TOuter,
36    /// Inner timestamp.
37    pub inner: TInner,
38}
39
40impl<TOuter, TInner> Product<TOuter, TInner> {
41    /// Creates a new product from outer and inner coordinates.
42    pub fn new(outer: TOuter, inner: TInner) -> Product<TOuter, TInner> {
43        Product { outer, inner }
44    }
45}
46
47impl<T1: Lattice, T2: Lattice> Lattice for Product<T1, T2> {
48    #[inline]
49    fn join(&self, other: &Product<T1, T2>) -> Product<T1, T2> {
50        Product {
51            outer: self.outer.join(&other.outer),
52            inner: self.inner.join(&other.inner),
53        }
54    }
55
56    #[inline]
57    fn meet(&self, other: &Product<T1, T2>) -> Product<T1, T2> {
58        Product {
59            outer: self.outer.meet(&other.outer),
60            inner: self.inner.meet(&other.inner),
61        }
62    }
63}
64
65/// Debug implementation to avoid seeing fully qualified path names.
66impl<TOuter: Debug, TInner: Debug> Debug for Product<TOuter, TInner> {
67    fn fmt(&self, f: &mut Formatter) -> Result<(), std::fmt::Error> {
68        f.write_str(&format!("({:?}, {:?})", self.outer, self.inner))
69    }
70}
71
72impl<TOuter: Display, TInner: Display> Display for Product<TOuter, TInner> {
73    fn fmt(&self, f: &mut Formatter) -> Result<(), std::fmt::Error> {
74        f.write_str(&format!("({}, {})", self.outer, self.inner))
75    }
76}
77
78impl<TOuter: PartialOrder, TInner: PartialOrder> PartialOrder for Product<TOuter, TInner> {
79    #[inline(always)]
80    fn less_equal(&self, other: &Self) -> bool {
81        self.outer.less_equal(&other.outer) && self.inner.less_equal(&other.inner)
82    }
83}
84
85impl<TOuter, TInner> Timestamp for Product<TOuter, TInner>
86where
87    TOuter: Timestamp,
88    TInner: Timestamp,
89{
90    const NESTING_DEPTH: usize = TOuter::NESTING_DEPTH + 1;
91
92    type Nested = Product<Self, u32>;
93
94    type TimedBatch<B: Batch<Time = ()>> = B::Timed<Self>;
95
96    fn minimum() -> Self {
97        Self::new(TOuter::minimum(), TInner::minimum())
98    }
99
100    fn clock_start() -> Self {
101        Self::new(TOuter::clock_start(), TInner::clock_start())
102    }
103
104    fn advance(&self, scope: Scope) -> Self {
105        if scope == 0 {
106            Self::new(self.outer.clone(), self.inner.advance(0))
107        } else {
108            Self::new(self.outer.advance(scope - 1), TInner::minimum())
109        }
110    }
111
112    fn recede(&self, scope: Scope) -> Self {
113        if scope == 0 {
114            Self::new(self.outer.clone(), self.inner.recede(0))
115        } else {
116            Self::new(self.outer.recede(scope - 1), self.inner.clone())
117        }
118    }
119
120    fn checked_recede(&self, scope: Scope) -> Option<Self> {
121        if scope == 0 {
122            self.inner
123                .checked_recede(0)
124                .map(|inner| Self::new(self.outer.clone(), inner))
125        } else {
126            self.outer
127                .checked_recede(scope - 1)
128                .map(|outer| Self::new(outer, self.inner.clone()))
129        }
130    }
131
132    fn epoch_start(&self, scope: Scope) -> Self {
133        if scope == 0 {
134            Self::new(self.outer.clone(), TInner::minimum())
135        } else {
136            Self::new(self.outer.epoch_start(scope - 1), TInner::minimum())
137        }
138    }
139
140    fn epoch_end(&self, scope: Scope) -> Self {
141        if scope == 0 {
142            Self::new(self.outer.clone(), self.inner.epoch_end(0))
143        } else {
144            Self::new(self.outer.epoch_end(scope - 1), self.inner.epoch_end(0))
145        }
146    }
147}