1use core::iter;
2
3use crate::relation::RelValue;
4use spacetimedb_data_structures::map::HashMap;
5use spacetimedb_sats::AlgebraicValue;
6use spacetimedb_schema::relation::ColExpr;
7
8pub trait RelOps<'a> {
10 fn next(&mut self) -> Option<RelValue<'a>>;
12
13 #[inline]
22 fn select<P>(self, predicate: P) -> Select<Self, P>
23 where
24 P: FnMut(&RelValue<'_>) -> bool,
25 Self: Sized,
26 {
27 Select::new(self, predicate)
28 }
29
30 #[inline]
40 fn project<'b, P>(self, cols: &'b [ColExpr], extractor: P) -> Project<'b, Self, P>
41 where
42 P: for<'c> FnMut(&[ColExpr], RelValue<'c>) -> RelValue<'c>,
43 Self: Sized,
44 {
45 Project::new(self, cols, extractor)
46 }
47
48 #[inline]
59 fn join_inner<Pred, Proj, KeyLhs, KeyRhs, Rhs>(
60 self,
61 with: Rhs,
62 key_lhs: KeyLhs,
63 key_rhs: KeyRhs,
64 predicate: Pred,
65 project: Proj,
66 ) -> JoinInner<'a, Self, Rhs, KeyLhs, KeyRhs, Pred, Proj>
67 where
68 Self: Sized,
69 Pred: FnMut(&RelValue<'a>, &RelValue<'a>) -> bool,
70 Proj: FnMut(RelValue<'a>, RelValue<'a>) -> RelValue<'a>,
71 KeyLhs: FnMut(&RelValue<'a>) -> AlgebraicValue,
72 KeyRhs: FnMut(&RelValue<'a>) -> AlgebraicValue,
73 Rhs: RelOps<'a>,
74 {
75 JoinInner::new(self, with, key_lhs, key_rhs, predicate, project)
76 }
77
78 #[inline]
80 fn collect_vec<T>(mut self, mut convert: impl FnMut(RelValue<'a>) -> T) -> Vec<T>
81 where
82 Self: Sized,
83 {
84 let mut result = Vec::new();
85 while let Some(row) = self.next() {
86 result.push(convert(row));
87 }
88 result
89 }
90
91 fn iter(&mut self) -> impl Iterator<Item = RelValue<'a>>
92 where
93 Self: Sized,
94 {
95 iter::from_fn(move || self.next())
96 }
97}
98
99impl<'a, I: RelOps<'a> + ?Sized> RelOps<'a> for Box<I> {
100 fn next(&mut self) -> Option<RelValue<'a>> {
101 (**self).next()
102 }
103}
104
105#[derive(Clone, Debug)]
109pub struct EmptyRelOps;
110
111impl<'a> RelOps<'a> for EmptyRelOps {
112 fn next(&mut self) -> Option<RelValue<'a>> {
113 None
114 }
115}
116
/// Adaptor pairing a source of values with a filtering predicate.
///
/// [`RelOps::select`] constructs this type.
#[derive(Debug, Clone)]
pub struct Select<I, P> {
    /// The wrapped source that values are pulled from.
    pub(crate) iter: I,
    /// Decides, per value, whether the value is yielded.
    pub(crate) predicate: P,
}

impl<I, P> Select<I, P> {
    /// Builds a `Select` over `iter` that filters with `predicate`.
    pub fn new(iter: I, predicate: P) -> Self {
        Self { iter, predicate }
    }
}
128
129impl<'a, I, P> RelOps<'a> for Select<I, P>
130where
131 I: RelOps<'a>,
132 P: FnMut(&RelValue<'a>) -> bool,
133{
134 fn next(&mut self) -> Option<RelValue<'a>> {
135 let filter = &mut self.predicate;
136 while let Some(v) = self.iter.next() {
137 if filter(&v) {
138 return Some(v);
139 }
140 }
141 None
142 }
143}
144
145#[derive(Clone, Debug)]
146pub struct Project<'a, I, P> {
147 pub(crate) cols: &'a [ColExpr],
148 pub(crate) iter: I,
149 pub(crate) extractor: P,
150}
151
152impl<'a, I, P> Project<'a, I, P> {
153 pub fn new(iter: I, cols: &'a [ColExpr], extractor: P) -> Project<'a, I, P> {
154 Project { iter, cols, extractor }
155 }
156}
157
158impl<'a, I, P> RelOps<'a> for Project<'_, I, P>
159where
160 I: RelOps<'a>,
161 P: FnMut(&[ColExpr], RelValue<'a>) -> RelValue<'a>,
162{
163 fn next(&mut self) -> Option<RelValue<'a>> {
164 self.iter.next().map(|v| (self.extractor)(self.cols, v))
165 }
166}
167
168#[derive(Clone, Debug)]
169pub struct JoinInner<'a, Lhs, Rhs, KeyLhs, KeyRhs, Pred, Proj> {
170 pub(crate) lhs: Lhs,
171 pub(crate) rhs: Rhs,
172 pub(crate) key_lhs: KeyLhs,
173 pub(crate) key_rhs: KeyRhs,
174 pub(crate) predicate: Pred,
175 pub(crate) projection: Proj,
176 map: HashMap<AlgebraicValue, Vec<RelValue<'a>>>,
177 filled_rhs: bool,
178 left: Option<RelValue<'a>>,
179}
180
181impl<Lhs, Rhs, KeyLhs, KeyRhs, Pred, Proj> JoinInner<'_, Lhs, Rhs, KeyLhs, KeyRhs, Pred, Proj> {
182 pub fn new(lhs: Lhs, rhs: Rhs, key_lhs: KeyLhs, key_rhs: KeyRhs, predicate: Pred, projection: Proj) -> Self {
183 Self {
184 map: HashMap::default(),
185 lhs,
186 rhs,
187 key_lhs,
188 key_rhs,
189 predicate,
190 projection,
191 filled_rhs: false,
192 left: None,
193 }
194 }
195}
196
197impl<'a, Lhs, Rhs, KeyLhs, KeyRhs, Pred, Proj> RelOps<'a> for JoinInner<'a, Lhs, Rhs, KeyLhs, KeyRhs, Pred, Proj>
198where
199 Lhs: RelOps<'a>,
200 Rhs: RelOps<'a>,
201 KeyLhs: FnMut(&RelValue<'a>) -> AlgebraicValue,
202 KeyRhs: FnMut(&RelValue<'a>) -> AlgebraicValue,
203 Pred: FnMut(&RelValue<'a>, &RelValue<'a>) -> bool,
204 Proj: FnMut(RelValue<'a>, RelValue<'a>) -> RelValue<'a>,
205{
206 fn next(&mut self) -> Option<RelValue<'a>> {
207 if !self.filled_rhs {
209 self.map = HashMap::default();
210 while let Some(row_rhs) = self.rhs.next() {
211 let key_rhs = (self.key_rhs)(&row_rhs);
212 self.map.entry(key_rhs).or_default().push(row_rhs);
213 }
214 self.filled_rhs = true;
215 }
216
217 loop {
218 let lhs = match &self.left {
220 Some(left) => left,
221 None => self.left.insert(self.lhs.next()?),
222 };
223 let k = (self.key_lhs)(lhs);
224
225 if let Some(rvv) = self.map.get_mut(&k) {
228 if let Some(rhs) = rvv.pop() {
229 if (self.predicate)(lhs, &rhs) {
230 return Some((self.projection)(lhs.clone(), rhs));
231 }
232 }
233 }
234 self.left = None;
235 continue;
236 }
237 }
238}