spacetimedb_vm/
rel_ops.rs

1use core::iter;
2
3use crate::relation::RelValue;
4use spacetimedb_data_structures::map::HashMap;
5use spacetimedb_sats::AlgebraicValue;
6use spacetimedb_schema::relation::ColExpr;
7
8/// A trait for dealing with fallible iterators for the database.
9pub trait RelOps<'a> {
10    /// Advances the `iterator` and returns the next [RelValue].
11    fn next(&mut self) -> Option<RelValue<'a>>;
12
13    /// Creates an `Iterator` which uses a closure to determine if a [RelValueRef] should be yielded.
14    ///
15    /// Given a [RelValueRef] the closure must return true or false.
16    /// The returned iterator will yield only the elements for which the closure returns true.
17    ///
18    /// Note:
19    ///
20    /// It is the equivalent of a `WHERE` clause on SQL.
21    #[inline]
22    fn select<P>(self, predicate: P) -> Select<Self, P>
23    where
24        P: FnMut(&RelValue<'_>) -> bool,
25        Self: Sized,
26    {
27        Select::new(self, predicate)
28    }
29
30    /// Creates an `Iterator` which uses a closure that projects to a new [RelValue] extracted from the current.
31    ///
32    /// Given a [RelValue] the closure must return a subset of the current one.
33    ///
34    /// The [Header] is pre-checked that all the fields exist and return a error if any field is not found.
35    ///
36    /// Note:
37    ///
38    /// It is the equivalent of a `SELECT` clause on SQL.
39    #[inline]
40    fn project<'b, P>(self, cols: &'b [ColExpr], extractor: P) -> Project<'b, Self, P>
41    where
42        P: for<'c> FnMut(&[ColExpr], RelValue<'c>) -> RelValue<'c>,
43        Self: Sized,
44    {
45        Project::new(self, cols, extractor)
46    }
47
48    /// Intersection between the left and the right, both (non-sorted) `iterators`.
49    ///
50    /// The hash join strategy requires the right iterator can be collected to a `HashMap`.
51    /// The left iterator can be arbitrarily long.
52    ///
53    /// It is therefore asymmetric (you can't flip the iterators to get a right_outer join).
54    ///
55    /// Note:
56    ///
57    /// It is the equivalent of a `INNER JOIN` clause on SQL.
58    #[inline]
59    fn join_inner<Pred, Proj, KeyLhs, KeyRhs, Rhs>(
60        self,
61        with: Rhs,
62        key_lhs: KeyLhs,
63        key_rhs: KeyRhs,
64        predicate: Pred,
65        project: Proj,
66    ) -> JoinInner<'a, Self, Rhs, KeyLhs, KeyRhs, Pred, Proj>
67    where
68        Self: Sized,
69        Pred: FnMut(&RelValue<'a>, &RelValue<'a>) -> bool,
70        Proj: FnMut(RelValue<'a>, RelValue<'a>) -> RelValue<'a>,
71        KeyLhs: FnMut(&RelValue<'a>) -> AlgebraicValue,
72        KeyRhs: FnMut(&RelValue<'a>) -> AlgebraicValue,
73        Rhs: RelOps<'a>,
74    {
75        JoinInner::new(self, with, key_lhs, key_rhs, predicate, project)
76    }
77
78    /// Collect all the rows in this relation into a `Vec<T>` given a function `RelValue<'a> -> T`.
79    #[inline]
80    fn collect_vec<T>(mut self, mut convert: impl FnMut(RelValue<'a>) -> T) -> Vec<T>
81    where
82        Self: Sized,
83    {
84        let mut result = Vec::new();
85        while let Some(row) = self.next() {
86            result.push(convert(row));
87        }
88        result
89    }
90
91    fn iter(&mut self) -> impl Iterator<Item = RelValue<'a>>
92    where
93        Self: Sized,
94    {
95        iter::from_fn(move || self.next())
96    }
97}
98
99impl<'a, I: RelOps<'a> + ?Sized> RelOps<'a> for Box<I> {
100    fn next(&mut self) -> Option<RelValue<'a>> {
101        (**self).next()
102    }
103}
104
105/// `RelOps` iterator which never returns any rows.
106///
107/// Used to compile queries with unsatisfiable bounds, like `WHERE x < 5 AND x > 5`.
108#[derive(Clone, Debug)]
109pub struct EmptyRelOps;
110
111impl<'a> RelOps<'a> for EmptyRelOps {
112    fn next(&mut self) -> Option<RelValue<'a>> {
113        None
114    }
115}
116
117#[derive(Clone, Debug)]
118pub struct Select<I, P> {
119    pub(crate) iter: I,
120    pub(crate) predicate: P,
121}
122
123impl<I, P> Select<I, P> {
124    pub fn new(iter: I, predicate: P) -> Select<I, P> {
125        Select { iter, predicate }
126    }
127}
128
129impl<'a, I, P> RelOps<'a> for Select<I, P>
130where
131    I: RelOps<'a>,
132    P: FnMut(&RelValue<'a>) -> bool,
133{
134    fn next(&mut self) -> Option<RelValue<'a>> {
135        let filter = &mut self.predicate;
136        while let Some(v) = self.iter.next() {
137            if filter(&v) {
138                return Some(v);
139            }
140        }
141        None
142    }
143}
144
145#[derive(Clone, Debug)]
146pub struct Project<'a, I, P> {
147    pub(crate) cols: &'a [ColExpr],
148    pub(crate) iter: I,
149    pub(crate) extractor: P,
150}
151
152impl<'a, I, P> Project<'a, I, P> {
153    pub fn new(iter: I, cols: &'a [ColExpr], extractor: P) -> Project<'a, I, P> {
154        Project { iter, cols, extractor }
155    }
156}
157
158impl<'a, I, P> RelOps<'a> for Project<'_, I, P>
159where
160    I: RelOps<'a>,
161    P: FnMut(&[ColExpr], RelValue<'a>) -> RelValue<'a>,
162{
163    fn next(&mut self) -> Option<RelValue<'a>> {
164        self.iter.next().map(|v| (self.extractor)(self.cols, v))
165    }
166}
167
168#[derive(Clone, Debug)]
169pub struct JoinInner<'a, Lhs, Rhs, KeyLhs, KeyRhs, Pred, Proj> {
170    pub(crate) lhs: Lhs,
171    pub(crate) rhs: Rhs,
172    pub(crate) key_lhs: KeyLhs,
173    pub(crate) key_rhs: KeyRhs,
174    pub(crate) predicate: Pred,
175    pub(crate) projection: Proj,
176    map: HashMap<AlgebraicValue, Vec<RelValue<'a>>>,
177    filled_rhs: bool,
178    left: Option<RelValue<'a>>,
179}
180
181impl<Lhs, Rhs, KeyLhs, KeyRhs, Pred, Proj> JoinInner<'_, Lhs, Rhs, KeyLhs, KeyRhs, Pred, Proj> {
182    pub fn new(lhs: Lhs, rhs: Rhs, key_lhs: KeyLhs, key_rhs: KeyRhs, predicate: Pred, projection: Proj) -> Self {
183        Self {
184            map: HashMap::default(),
185            lhs,
186            rhs,
187            key_lhs,
188            key_rhs,
189            predicate,
190            projection,
191            filled_rhs: false,
192            left: None,
193        }
194    }
195}
196
197impl<'a, Lhs, Rhs, KeyLhs, KeyRhs, Pred, Proj> RelOps<'a> for JoinInner<'a, Lhs, Rhs, KeyLhs, KeyRhs, Pred, Proj>
198where
199    Lhs: RelOps<'a>,
200    Rhs: RelOps<'a>,
201    KeyLhs: FnMut(&RelValue<'a>) -> AlgebraicValue,
202    KeyRhs: FnMut(&RelValue<'a>) -> AlgebraicValue,
203    Pred: FnMut(&RelValue<'a>, &RelValue<'a>) -> bool,
204    Proj: FnMut(RelValue<'a>, RelValue<'a>) -> RelValue<'a>,
205{
206    fn next(&mut self) -> Option<RelValue<'a>> {
207        // Consume `Rhs`, building a map `KeyRhs => Rhs`.
208        if !self.filled_rhs {
209            self.map = HashMap::default();
210            while let Some(row_rhs) = self.rhs.next() {
211                let key_rhs = (self.key_rhs)(&row_rhs);
212                self.map.entry(key_rhs).or_default().push(row_rhs);
213            }
214            self.filled_rhs = true;
215        }
216
217        loop {
218            // Consume a row in `Lhs` and project to `KeyLhs`.
219            let lhs = match &self.left {
220                Some(left) => left,
221                None => self.left.insert(self.lhs.next()?),
222            };
223            let k = (self.key_lhs)(lhs);
224
225            // If we can relate `KeyLhs` and `KeyRhs`, we have candidate.
226            // If that candidate still has rhs elements, test against the predicate and yield.
227            if let Some(rvv) = self.map.get_mut(&k) {
228                if let Some(rhs) = rvv.pop() {
229                    if (self.predicate)(lhs, &rhs) {
230                        return Some((self.projection)(lhs.clone(), rhs));
231                    }
232                }
233            }
234            self.left = None;
235            continue;
236        }
237    }
238}