1use crate::{
2 Scope,
3 algebra::{Lattice, PartialOrder},
4 time::Timestamp,
5 trace::Batch,
6};
7use feldera_macros::IsNone;
8use rkyv::{Archive, Deserialize, Serialize};
9use size_of::SizeOf;
10use std::fmt::{Debug, Display, Formatter};
11
12#[derive(
14 Copy,
15 Clone,
16 Hash,
17 Eq,
18 PartialEq,
19 Default,
20 Ord,
21 PartialOrd,
22 SizeOf,
23 Archive,
24 Serialize,
25 Deserialize,
26 IsNone,
27)]
28#[archive_attr(derive(Ord, Eq, PartialEq, PartialOrd))]
29#[archive(bound(
30 archive = "TOuter: Archive, TInner: Archive, <TOuter as Archive>::Archived: Ord, <TInner as Archive>::Archived: Ord"
31))]
32#[archive(compare(PartialEq, PartialOrd))]
33pub struct Product<TOuter, TInner> {
34 pub outer: TOuter,
36 pub inner: TInner,
38}
39
40impl<TOuter, TInner> Product<TOuter, TInner> {
41 pub fn new(outer: TOuter, inner: TInner) -> Product<TOuter, TInner> {
43 Product { outer, inner }
44 }
45}
46
47impl<T1: Lattice, T2: Lattice> Lattice for Product<T1, T2> {
48 #[inline]
49 fn join(&self, other: &Product<T1, T2>) -> Product<T1, T2> {
50 Product {
51 outer: self.outer.join(&other.outer),
52 inner: self.inner.join(&other.inner),
53 }
54 }
55
56 #[inline]
57 fn meet(&self, other: &Product<T1, T2>) -> Product<T1, T2> {
58 Product {
59 outer: self.outer.meet(&other.outer),
60 inner: self.inner.meet(&other.inner),
61 }
62 }
63}
64
65impl<TOuter: Debug, TInner: Debug> Debug for Product<TOuter, TInner> {
67 fn fmt(&self, f: &mut Formatter) -> Result<(), std::fmt::Error> {
68 f.write_str(&format!("({:?}, {:?})", self.outer, self.inner))
69 }
70}
71
72impl<TOuter: Display, TInner: Display> Display for Product<TOuter, TInner> {
73 fn fmt(&self, f: &mut Formatter) -> Result<(), std::fmt::Error> {
74 f.write_str(&format!("({}, {})", self.outer, self.inner))
75 }
76}
77
78impl<TOuter: PartialOrder, TInner: PartialOrder> PartialOrder for Product<TOuter, TInner> {
79 #[inline(always)]
80 fn less_equal(&self, other: &Self) -> bool {
81 self.outer.less_equal(&other.outer) && self.inner.less_equal(&other.inner)
82 }
83}
84
85impl<TOuter, TInner> Timestamp for Product<TOuter, TInner>
86where
87 TOuter: Timestamp,
88 TInner: Timestamp,
89{
90 const NESTING_DEPTH: usize = TOuter::NESTING_DEPTH + 1;
91
92 type Nested = Product<Self, u32>;
93
94 type TimedBatch<B: Batch<Time = ()>> = B::Timed<Self>;
95
96 fn minimum() -> Self {
97 Self::new(TOuter::minimum(), TInner::minimum())
98 }
99
100 fn clock_start() -> Self {
101 Self::new(TOuter::clock_start(), TInner::clock_start())
102 }
103
104 fn advance(&self, scope: Scope) -> Self {
105 if scope == 0 {
106 Self::new(self.outer.clone(), self.inner.advance(0))
107 } else {
108 Self::new(self.outer.advance(scope - 1), TInner::minimum())
109 }
110 }
111
112 fn recede(&self, scope: Scope) -> Self {
113 if scope == 0 {
114 Self::new(self.outer.clone(), self.inner.recede(0))
115 } else {
116 Self::new(self.outer.recede(scope - 1), self.inner.clone())
117 }
118 }
119
120 fn checked_recede(&self, scope: Scope) -> Option<Self> {
121 if scope == 0 {
122 self.inner
123 .checked_recede(0)
124 .map(|inner| Self::new(self.outer.clone(), inner))
125 } else {
126 self.outer
127 .checked_recede(scope - 1)
128 .map(|outer| Self::new(outer, self.inner.clone()))
129 }
130 }
131
132 fn epoch_start(&self, scope: Scope) -> Self {
133 if scope == 0 {
134 Self::new(self.outer.clone(), TInner::minimum())
135 } else {
136 Self::new(self.outer.epoch_start(scope - 1), TInner::minimum())
137 }
138 }
139
140 fn epoch_end(&self, scope: Scope) -> Self {
141 if scope == 0 {
142 Self::new(self.outer.clone(), self.inner.epoch_end(0))
143 } else {
144 Self::new(self.outer.epoch_end(scope - 1), self.inner.epoch_end(0))
145 }
146 }
147}