spacetimedb_subscription/
lib.rs

1use anyhow::{bail, Result};
2use spacetimedb_execution::{
3    pipelined::{
4        PipelinedExecutor, PipelinedIxDeltaJoin, PipelinedIxDeltaScan, PipelinedIxJoin, PipelinedIxScan,
5        PipelinedProject,
6    },
7    Datastore, DeltaStore, Row,
8};
9use spacetimedb_expr::check::SchemaView;
10use spacetimedb_lib::{identity::AuthCtx, metrics::ExecutionMetrics, query::Delta, AlgebraicValue};
11use spacetimedb_physical_plan::plan::{IxJoin, IxScan, Label, PhysicalPlan, ProjectPlan, Sarg, TableScan, TupleField};
12use spacetimedb_primitives::{ColId, ColList, IndexId, TableId};
13use spacetimedb_query::compile_subscription;
14use std::sync::Arc;
15use std::{collections::HashSet, ops::RangeBounds};
16
17/// A subscription is a view over a particular table.
18/// How do we incrementally maintain that view?
19/// These are the query fragments that are required.
20/// See [Self::compile_from_plan] for how to generate them.
21#[derive(Debug)]
22struct Fragments {
23    /// Plan fragments that return rows to insert.
24    /// For joins there will be 4 fragments,
25    /// but for selects only one.
26    insert_plans: Vec<PipelinedProject>,
27    /// Plan fragments that return rows to delete.
28    /// For joins there will be 4 fragments,
29    /// but for selects only one.
30    delete_plans: Vec<PipelinedProject>,
31}
32
33impl Fragments {
34    /// Returns the index ids from which this fragment reads.
35    fn index_ids(&self) -> impl Iterator<Item = (TableId, IndexId)> {
36        let mut index_ids = HashSet::new();
37        for plan in self.insert_plans.iter().chain(self.delete_plans.iter()) {
38            plan.visit(&mut |plan| match plan {
39                PipelinedExecutor::IxScan(PipelinedIxScan { table_id, index_id, .. })
40                | PipelinedExecutor::IxDeltaScan(PipelinedIxDeltaScan { table_id, index_id, .. })
41                | PipelinedExecutor::IxJoin(PipelinedIxJoin {
42                    rhs_table: table_id,
43                    rhs_index: index_id,
44                    ..
45                })
46                | PipelinedExecutor::IxDeltaJoin(PipelinedIxDeltaJoin {
47                    rhs_table: table_id,
48                    rhs_index: index_id,
49                    ..
50                }) => {
51                    index_ids.insert((*table_id, *index_id));
52                }
53                _ => {}
54            });
55        }
56        index_ids.into_iter()
57    }
58
59    /// A subscription is just a view of a particular table.
60    /// Here we compute the rows that are to be inserted into that view,
61    /// and evaluate a closure over each one.
62    fn for_each_insert<'a, Tx: Datastore + DeltaStore>(
63        &self,
64        tx: &'a Tx,
65        metrics: &mut ExecutionMetrics,
66        f: &mut dyn FnMut(Row<'a>) -> Result<()>,
67    ) -> Result<()> {
68        for plan in &self.insert_plans {
69            if !plan.is_empty(tx) {
70                plan.execute(tx, metrics, f)?;
71            }
72        }
73        Ok(())
74    }
75
76    /// A subscription is just a view of a particular table.
77    /// Here we compute the rows that are to be removed from that view,
78    /// and evaluate a closure over each one.
79    fn for_each_delete<'a, Tx: Datastore + DeltaStore>(
80        &self,
81        tx: &'a Tx,
82        metrics: &mut ExecutionMetrics,
83        f: &mut dyn FnMut(Row<'a>) -> Result<()>,
84    ) -> Result<()> {
85        for plan in &self.delete_plans {
86            if !plan.is_empty(tx) {
87                plan.execute(tx, metrics, f)?;
88            }
89        }
90        Ok(())
91    }
92
93    /// Which fragments are required for incrementally updating a subscription?
94    /// This is most interesting in the case of a join.
95    ///
96    /// Let `V`  denote the join between tables `R` and `S` at time `t`.
97    /// Let `V'` denote the same join at time `t+1`.
98    ///
99    /// We then have the following equality
100    ///
101    /// ```text
102    /// V' = V U dv
103    /// ```
104    ///
105    /// where `dv` is called the delta of `V`.
106    ///
107    /// So how do we compute `dv` incrementally?
108    /// That is, without evaluating `R' x S'`.
109    /// and without access to the state at time `t`.
110    ///
111    /// Given the following notation:
112    ///
113    /// ```text
114    /// x: The relational join operator
115    /// U: union
116    /// -: difference
117    ///
118    /// dv: The difference or delta between V and V'
119    ///
120    /// dv(+): Rows in V' that are not in V
121    /// dv(-): Rows in V  that are not in V'
122    /// ```
123    ///
124    /// we derive the following equations
125    ///
126    /// ```text
127    /// V  = R x S
128    ///    = RS
129    ///
130    /// V' = V  U dv
131    ///    = RS U dv
132    ///
133    /// V' = R' x S'
134    ///    = (R U dr) x (S U ds)
135    ///    = RS U Rds U drS U drds
136    ///
137    /// dv = Rds U drS U drds
138    ///    = (R' - dr)ds U dr(S' - ds) U drds
139    ///    = R'ds - drds U drS' - drds U drds
140    ///    = R'ds U drS' - drds
141    ///    = R'(ds(+) - ds(-)) U (dr(+) - dr(-))S' - (dr(+) - dr(-))(ds(+) - ds(-))
142    ///    = R'ds(+)
143    ///         - R'ds(-)
144    ///         U dr(+)S'
145    ///         - dr(-)S'
146    ///         - dr(+)ds(+)
147    ///         U dr(+)ds(-)
148    ///         U dr(-)ds(+)
149    ///         - dr(-)ds(-)
150    ///    = R'ds(+)
151    ///         U dr(+)S'
152    ///         U dr(+)ds(-)
153    ///         U dr(-)ds(+)
154    ///         - R'ds(-)
155    ///         - dr(-)S'
156    ///         - dr(+)ds(+)
157    ///         - dr(-)ds(-)
158    ///
159    /// dv(+) = R'ds(+) U dr(+)S' U dr(+)ds(-) U dr(-)ds(+)
160    /// dv(-) = R'ds(-) U dr(-)S' U dr(+)ds(+) U dr(-)ds(-)
161    /// ```
162    fn compile_from_plan(plan: &ProjectPlan, tables: &[Label]) -> Result<Self> {
163        /// Mutate a query plan by turning a table scan into a delta scan
164        fn mut_plan(plan: &mut ProjectPlan, relvar: Label, delta: Delta) {
165            plan.visit_mut(&mut |plan| match plan {
166                PhysicalPlan::TableScan(
167                    scan @ TableScan {
168                        limit: None,
169                        delta: None,
170                        ..
171                    },
172                    alias,
173                ) if alias == &relvar => {
174                    scan.delta = Some(delta);
175                }
176                _ => {}
177            });
178        }
179
180        /// Return a new plan with delta scans for the given tables
181        fn new_plan(plan: &ProjectPlan, tables: &[(Label, Delta)]) -> Result<PipelinedProject> {
182            let mut plan = plan.clone();
183            for (alias, delta) in tables {
184                mut_plan(&mut plan, *alias, *delta);
185            }
186            plan.optimize().map(PipelinedProject::from)
187        }
188
189        match tables {
190            [dr] => Ok(Fragments {
191                insert_plans: vec![new_plan(plan, &[(*dr, Delta::Inserts)])?],
192                delete_plans: vec![new_plan(plan, &[(*dr, Delta::Deletes)])?],
193            }),
194            [dr, ds] => Ok(Fragments {
195                insert_plans: vec![
196                    new_plan(
197                        // dr(+)S'
198                        plan,
199                        &[(*dr, Delta::Inserts)],
200                    )?,
201                    new_plan(
202                        // R'ds(+)
203                        plan,
204                        &[(*ds, Delta::Inserts)],
205                    )?,
206                    new_plan(
207                        // dr(+)ds(-)
208                        plan,
209                        &[(*dr, Delta::Inserts), (*ds, Delta::Deletes)],
210                    )?,
211                    new_plan(
212                        // dr(-)ds(+)
213                        plan,
214                        &[(*dr, Delta::Deletes), (*ds, Delta::Inserts)],
215                    )?,
216                ],
217                delete_plans: vec![
218                    new_plan(
219                        // dr(-)S'
220                        plan,
221                        &[(*dr, Delta::Deletes)],
222                    )?,
223                    new_plan(
224                        // R'ds(-)
225                        plan,
226                        &[(*ds, Delta::Deletes)],
227                    )?,
228                    new_plan(
229                        // dr(+)ds(+)
230                        plan,
231                        &[(*dr, Delta::Inserts), (*ds, Delta::Inserts)],
232                    )?,
233                    new_plan(
234                        // dr(-)ds(-)
235                        plan,
236                        &[(*dr, Delta::Deletes), (*ds, Delta::Deletes)],
237                    )?,
238                ],
239            }),
240            _ => bail!("Invalid number of tables in subscription: {}", tables.len()),
241        }
242    }
243}
244
/// Newtype wrapper for table names.
///
/// Uses an `Arc` internally, so `Clone` is cheap.
///
/// A `TableName` can be built from an `Arc<str>`, `Box<str>`, `String`, or `&str`
/// via [`From`], and dereferences to `str` for read access.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct TableName(Arc<str>);

impl From<Arc<str>> for TableName {
    /// Wraps an existing shared string without copying it.
    fn from(name: Arc<str>) -> Self {
        TableName(name)
    }
}

impl From<Box<str>> for TableName {
    /// Converts the boxed string into shared storage.
    fn from(name: Box<str>) -> Self {
        TableName(Arc::from(name))
    }
}

impl From<String> for TableName {
    /// Converts the owned string into shared storage.
    fn from(name: String) -> Self {
        TableName(Arc::from(name))
    }
}

impl From<&str> for TableName {
    /// Copies the borrowed string into shared storage.
    fn from(name: &str) -> Self {
        TableName(Arc::from(name))
    }
}

impl std::ops::Deref for TableName {
    type Target = str;

    /// Borrows the table name as a plain string slice.
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl TableName {
    /// Builds a `TableName` by copying `name`.
    ///
    /// Equivalent to `TableName::from(name)`; kept for existing callers.
    pub fn table_name_from_str(name: &str) -> Self {
        Self::from(name)
    }
}

impl std::fmt::Display for TableName {
    /// Formats the name exactly as the underlying `str` would be formatted,
    /// honoring any width, fill, and precision set on the formatter.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        // Fully qualified so the call cannot become ambiguous with `Debug::fmt`.
        std::fmt::Display::fmt(&*self.0, f)
    }
}
287
288/// A join edge is used for pruning queries when evaluating subscription updates.
289///
290/// If we have the following subscriptions:
291/// ```sql
292/// SELECT a.* FROM a JOIN b ON a.id = b.id WHERE b.x = 1
293/// SELECT a.* FROM a JOIN b ON a.id = b.id WHERE b.x = 2
294/// ...
295/// SELECT a.* FROM a JOIN b ON a.id = b.id WHERE b.x = n
296/// ```
297///
298/// Whenever `a` is updated, only the relevant queries are evaluated.
299#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
300pub struct JoinEdge {
301    /// The [`TableId`] for `a`
302    pub lhs_table: TableId,
303    /// The [`TableId`] for `b`
304    pub rhs_table: TableId,
305    /// The [`ColId`] for `a.id`
306    pub lhs_join_col: ColId,
307    /// The [`ColId`] for `b.id`
308    pub rhs_join_col: ColId,
309    /// The [`ColId`] for `b.x`
310    pub rhs_col: ColId,
311}
312
313impl JoinEdge {
314    /// A helper method for finding a range of join edges for a particular table in a sorted set.
315    fn min_for_table(lhs_table: TableId) -> Self {
316        Self {
317            lhs_table,
318            rhs_table: TableId(u32::MIN),
319            lhs_join_col: ColId(u16::MIN),
320            rhs_join_col: ColId(u16::MIN),
321            rhs_col: ColId(u16::MIN),
322        }
323    }
324
325    /// A helper method for finding a range of join edges for a particular table in a sorted set.
326    fn max_for_table(lhs_table: TableId) -> Self {
327        Self {
328            lhs_table,
329            rhs_table: TableId(u32::MAX),
330            lhs_join_col: ColId(u16::MAX),
331            rhs_join_col: ColId(u16::MAX),
332            rhs_col: ColId(u16::MAX),
333        }
334    }
335
336    /// A helper method for finding a range of join edges for a particular table in a sorted set.
337    pub fn range_for_table(lhs_table: TableId) -> impl RangeBounds<Self> {
338        Self::min_for_table(lhs_table)..=Self::max_for_table(lhs_table)
339    }
340}
341
342/// A subscription defines a view over a table
343#[derive(Debug)]
344pub struct SubscriptionPlan {
345    /// To which table are we subscribed?
346    return_id: TableId,
347    /// To which table are we subscribed?
348    return_name: TableName,
349    /// A subscription can read from multiple tables.
350    /// From which tables do we read?
351    table_ids: Vec<TableId>,
352    /// The plan fragments for updating the view
353    fragments: Fragments,
354    /// The optimized plan without any delta scans
355    plan_opt: ProjectPlan,
356}
357
358impl SubscriptionPlan {
359    /// Is this a plan for a join?
360    pub fn is_join(&self) -> bool {
361        self.fragments.insert_plans.len() > 1 && self.fragments.delete_plans.len() > 1
362    }
363
364    /// To which table does this plan subscribe?
365    pub fn subscribed_table_id(&self) -> TableId {
366        self.return_id
367    }
368
369    /// To which table does this plan subscribe?
370    pub fn subscribed_table_name(&self) -> &TableName {
371        &self.return_name
372    }
373
374    /// From which tables does this plan read?
375    pub fn table_ids(&self) -> impl Iterator<Item = TableId> + '_ {
376        self.table_ids.iter().copied()
377    }
378
379    /// The optimized plan without any delta scans
380    pub fn optimized_physical_plan(&self) -> &ProjectPlan {
381        &self.plan_opt
382    }
383
384    /// From which indexes does this plan read?
385    pub fn index_ids(&self) -> impl Iterator<Item = (TableId, IndexId)> {
386        self.fragments.index_ids()
387    }
388
389    /// A subscription is just a view of a particular table.
390    /// Here we compute the rows that are to be inserted into that view,
391    /// and evaluate a closure over each one.
392    pub fn for_each_insert<'a, Tx: Datastore + DeltaStore>(
393        &self,
394        tx: &'a Tx,
395        metrics: &mut ExecutionMetrics,
396        f: &mut dyn FnMut(Row<'a>) -> Result<()>,
397    ) -> Result<()> {
398        self.fragments.for_each_insert(tx, metrics, f)
399    }
400
401    /// A subscription is just a view of a particular table.
402    /// Here we compute the rows that are to be removed from that view,
403    /// and evaluate a closure over each one.
404    pub fn for_each_delete<'a, Tx: Datastore + DeltaStore>(
405        &self,
406        tx: &'a Tx,
407        metrics: &mut ExecutionMetrics,
408        f: &mut dyn FnMut(Row<'a>) -> Result<()>,
409    ) -> Result<()> {
410        self.fragments.for_each_delete(tx, metrics, f)
411    }
412
413    /// Returns a join edge for this query if it has one.
414    ///
415    /// Requirements include:
416    /// 1. Unique join index
417    /// 2. Single column index lookup on the rhs table
418    /// 3. No self joins
419    pub fn join_edge(&self) -> Option<(JoinEdge, AlgebraicValue)> {
420        if !self.is_join() {
421            return None;
422        }
423        let mut join_edge = None;
424        self.plan_opt.visit(&mut |op| match op {
425            PhysicalPlan::IxJoin(
426                IxJoin {
427                    lhs,
428                    rhs,
429                    rhs_field: lhs_join_col,
430                    lhs_field:
431                        TupleField {
432                            field_pos: rhs_join_col,
433                            ..
434                        },
435                    ..
436                },
437                _,
438            ) if rhs.table_id == self.return_id => match &**lhs {
439                PhysicalPlan::IxScan(
440                    IxScan {
441                        schema,
442                        prefix,
443                        arg: Sarg::Eq(rhs_col, rhs_val),
444                        ..
445                    },
446                    _,
447                ) if schema.table_id != self.return_id
448                    && prefix.is_empty()
449                    && schema.is_unique(&ColList::new((*rhs_join_col).into())) =>
450                {
451                    let lhs_table = self.return_id;
452                    let rhs_table = schema.table_id;
453                    let rhs_col = *rhs_col;
454                    let rhs_val = rhs_val.clone();
455                    let lhs_join_col = *lhs_join_col;
456                    let rhs_join_col = (*rhs_join_col).into();
457                    let edge = JoinEdge {
458                        lhs_table,
459                        rhs_table,
460                        lhs_join_col,
461                        rhs_join_col,
462                        rhs_col,
463                    };
464                    join_edge = Some((edge, rhs_val));
465                }
466                _ => {}
467            },
468            _ => {}
469        });
470        join_edge
471    }
472
473    /// Generate a plan for incrementally maintaining a subscription
474    pub fn compile(sql: &str, tx: &impl SchemaView, auth: &AuthCtx) -> Result<(Vec<Self>, bool)> {
475        let (plans, return_id, return_name, has_param) = compile_subscription(sql, tx, auth)?;
476
477        /// Does this plan have any non-index joins?
478        fn has_non_index_join(plan: &PhysicalPlan) -> bool {
479            plan.any(&|op| matches!(op, PhysicalPlan::HashJoin(..) | PhysicalPlan::NLJoin(..)))
480        }
481
482        /// What tables are involved in this plan?
483        fn table_ids_for_plan(plan: &PhysicalPlan) -> (Vec<TableId>, Vec<Label>) {
484            let mut table_aliases = vec![];
485            let mut table_ids = vec![];
486            plan.visit(&mut |plan| match plan {
487                PhysicalPlan::TableScan(
488                    TableScan {
489                        // What table are we reading?
490                        schema,
491                        ..
492                    },
493                    alias,
494                )
495                | PhysicalPlan::IxScan(
496                    IxScan {
497                        // What table are we reading?
498                        schema,
499                        ..
500                    },
501                    alias,
502                ) => {
503                    table_aliases.push(*alias);
504                    table_ids.push(schema.table_id);
505                }
506                _ => {}
507            });
508            (table_ids, table_aliases)
509        }
510
511        let mut subscriptions = vec![];
512
513        let return_name = TableName::from(return_name);
514
515        for plan in plans {
516            let plan_opt = plan.clone().optimize()?;
517
518            if has_non_index_join(&plan_opt) {
519                bail!("Subscriptions require indexes on join columns")
520            }
521
522            let (table_ids, table_aliases) = table_ids_for_plan(&plan);
523
524            let fragments = Fragments::compile_from_plan(&plan, &table_aliases)?;
525
526            subscriptions.push(Self {
527                return_id,
528                return_name: return_name.clone(),
529                table_ids,
530                plan_opt,
531                fragments,
532            });
533        }
534
535        Ok((subscriptions, has_param))
536    }
537}