spacetimedb_subscription/lib.rs
1use anyhow::{bail, Result};
2use spacetimedb_execution::{
3 pipelined::{
4 PipelinedExecutor, PipelinedIxDeltaJoin, PipelinedIxDeltaScan, PipelinedIxJoin, PipelinedIxScan,
5 PipelinedProject,
6 },
7 Datastore, DeltaStore, Row,
8};
9use spacetimedb_expr::check::SchemaView;
10use spacetimedb_lib::{identity::AuthCtx, metrics::ExecutionMetrics, query::Delta, AlgebraicValue};
11use spacetimedb_physical_plan::plan::{IxJoin, IxScan, Label, PhysicalPlan, ProjectPlan, Sarg, TableScan, TupleField};
12use spacetimedb_primitives::{ColId, ColList, IndexId, TableId};
13use spacetimedb_query::compile_subscription;
14use std::sync::Arc;
15use std::{collections::HashSet, ops::RangeBounds};
16
17/// A subscription is a view over a particular table.
18/// How do we incrementally maintain that view?
19/// These are the query fragments that are required.
20/// See [Self::compile_from_plan] for how to generate them.
21#[derive(Debug)]
22struct Fragments {
23 /// Plan fragments that return rows to insert.
24 /// For joins there will be 4 fragments,
25 /// but for selects only one.
26 insert_plans: Vec<PipelinedProject>,
27 /// Plan fragments that return rows to delete.
28 /// For joins there will be 4 fragments,
29 /// but for selects only one.
30 delete_plans: Vec<PipelinedProject>,
31}
32
33impl Fragments {
34 /// Returns the index ids from which this fragment reads.
35 fn index_ids(&self) -> impl Iterator<Item = (TableId, IndexId)> {
36 let mut index_ids = HashSet::new();
37 for plan in self.insert_plans.iter().chain(self.delete_plans.iter()) {
38 plan.visit(&mut |plan| match plan {
39 PipelinedExecutor::IxScan(PipelinedIxScan { table_id, index_id, .. })
40 | PipelinedExecutor::IxDeltaScan(PipelinedIxDeltaScan { table_id, index_id, .. })
41 | PipelinedExecutor::IxJoin(PipelinedIxJoin {
42 rhs_table: table_id,
43 rhs_index: index_id,
44 ..
45 })
46 | PipelinedExecutor::IxDeltaJoin(PipelinedIxDeltaJoin {
47 rhs_table: table_id,
48 rhs_index: index_id,
49 ..
50 }) => {
51 index_ids.insert((*table_id, *index_id));
52 }
53 _ => {}
54 });
55 }
56 index_ids.into_iter()
57 }
58
59 /// A subscription is just a view of a particular table.
60 /// Here we compute the rows that are to be inserted into that view,
61 /// and evaluate a closure over each one.
62 fn for_each_insert<'a, Tx: Datastore + DeltaStore>(
63 &self,
64 tx: &'a Tx,
65 metrics: &mut ExecutionMetrics,
66 f: &mut dyn FnMut(Row<'a>) -> Result<()>,
67 ) -> Result<()> {
68 for plan in &self.insert_plans {
69 if !plan.is_empty(tx) {
70 plan.execute(tx, metrics, f)?;
71 }
72 }
73 Ok(())
74 }
75
76 /// A subscription is just a view of a particular table.
77 /// Here we compute the rows that are to be removed from that view,
78 /// and evaluate a closure over each one.
79 fn for_each_delete<'a, Tx: Datastore + DeltaStore>(
80 &self,
81 tx: &'a Tx,
82 metrics: &mut ExecutionMetrics,
83 f: &mut dyn FnMut(Row<'a>) -> Result<()>,
84 ) -> Result<()> {
85 for plan in &self.delete_plans {
86 if !plan.is_empty(tx) {
87 plan.execute(tx, metrics, f)?;
88 }
89 }
90 Ok(())
91 }
92
93 /// Which fragments are required for incrementally updating a subscription?
94 /// This is most interesting in the case of a join.
95 ///
96 /// Let `V` denote the join between tables `R` and `S` at time `t`.
97 /// Let `V'` denote the same join at time `t+1`.
98 ///
99 /// We then have the following equality
100 ///
101 /// ```text
102 /// V' = V U dv
103 /// ```
104 ///
105 /// where `dv` is called the delta of `V`.
106 ///
107 /// So how do we compute `dv` incrementally?
108 /// That is, without evaluating `R' x S'`.
109 /// and without access to the state at time `t`.
110 ///
111 /// Given the following notation:
112 ///
113 /// ```text
114 /// x: The relational join operator
115 /// U: union
116 /// -: difference
117 ///
118 /// dv: The difference or delta between V and V'
119 ///
120 /// dv(+): Rows in V' that are not in V
121 /// dv(-): Rows in V that are not in V'
122 /// ```
123 ///
124 /// we derive the following equations
125 ///
126 /// ```text
127 /// V = R x S
128 /// = RS
129 ///
130 /// V' = V U dv
131 /// = RS U dv
132 ///
133 /// V' = R' x S'
134 /// = (R U dr) x (S U ds)
135 /// = RS U Rds U drS U drds
136 ///
137 /// dv = Rds U drS U drds
138 /// = (R' - dr)ds U dr(S' - ds) U drds
139 /// = R'ds - drds U drS' - drds U drds
140 /// = R'ds U drS' - drds
141 /// = R'(ds(+) - ds(-)) U (dr(+) - dr(-))S' - (dr(+) - dr(-))(ds(+) - ds(-))
142 /// = R'ds(+)
143 /// - R'ds(-)
144 /// U dr(+)S'
145 /// - dr(-)S'
146 /// - dr(+)ds(+)
147 /// U dr(+)ds(-)
148 /// U dr(-)ds(+)
149 /// - dr(-)ds(-)
150 /// = R'ds(+)
151 /// U dr(+)S'
152 /// U dr(+)ds(-)
153 /// U dr(-)ds(+)
154 /// - R'ds(-)
155 /// - dr(-)S'
156 /// - dr(+)ds(+)
157 /// - dr(-)ds(-)
158 ///
159 /// dv(+) = R'ds(+) U dr(+)S' U dr(+)ds(-) U dr(-)ds(+)
160 /// dv(-) = R'ds(-) U dr(-)S' U dr(+)ds(+) U dr(-)ds(-)
161 /// ```
162 fn compile_from_plan(plan: &ProjectPlan, tables: &[Label]) -> Result<Self> {
163 /// Mutate a query plan by turning a table scan into a delta scan
164 fn mut_plan(plan: &mut ProjectPlan, relvar: Label, delta: Delta) {
165 plan.visit_mut(&mut |plan| match plan {
166 PhysicalPlan::TableScan(
167 scan @ TableScan {
168 limit: None,
169 delta: None,
170 ..
171 },
172 alias,
173 ) if alias == &relvar => {
174 scan.delta = Some(delta);
175 }
176 _ => {}
177 });
178 }
179
180 /// Return a new plan with delta scans for the given tables
181 fn new_plan(plan: &ProjectPlan, tables: &[(Label, Delta)]) -> Result<PipelinedProject> {
182 let mut plan = plan.clone();
183 for (alias, delta) in tables {
184 mut_plan(&mut plan, *alias, *delta);
185 }
186 plan.optimize().map(PipelinedProject::from)
187 }
188
189 match tables {
190 [dr] => Ok(Fragments {
191 insert_plans: vec![new_plan(plan, &[(*dr, Delta::Inserts)])?],
192 delete_plans: vec![new_plan(plan, &[(*dr, Delta::Deletes)])?],
193 }),
194 [dr, ds] => Ok(Fragments {
195 insert_plans: vec![
196 new_plan(
197 // dr(+)S'
198 plan,
199 &[(*dr, Delta::Inserts)],
200 )?,
201 new_plan(
202 // R'ds(+)
203 plan,
204 &[(*ds, Delta::Inserts)],
205 )?,
206 new_plan(
207 // dr(+)ds(-)
208 plan,
209 &[(*dr, Delta::Inserts), (*ds, Delta::Deletes)],
210 )?,
211 new_plan(
212 // dr(-)ds(+)
213 plan,
214 &[(*dr, Delta::Deletes), (*ds, Delta::Inserts)],
215 )?,
216 ],
217 delete_plans: vec![
218 new_plan(
219 // dr(-)S'
220 plan,
221 &[(*dr, Delta::Deletes)],
222 )?,
223 new_plan(
224 // R'ds(-)
225 plan,
226 &[(*ds, Delta::Deletes)],
227 )?,
228 new_plan(
229 // dr(+)ds(+)
230 plan,
231 &[(*dr, Delta::Inserts), (*ds, Delta::Inserts)],
232 )?,
233 new_plan(
234 // dr(-)ds(-)
235 plan,
236 &[(*dr, Delta::Deletes), (*ds, Delta::Deletes)],
237 )?,
238 ],
239 }),
240 _ => bail!("Invalid number of tables in subscription: {}", tables.len()),
241 }
242 }
243}
244
/// A table name, stored behind a shared `Arc<str>`.
///
/// Cloning only bumps a reference count; the text itself is never copied.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct TableName(Arc<str>);

impl TableName {
    /// Builds a `TableName` by copying `name` into a new shared allocation.
    pub fn table_name_from_str(name: &str) -> Self {
        Self(Arc::from(name))
    }
}

impl From<Arc<str>> for TableName {
    /// Wraps an already-shared string without copying it.
    fn from(shared: Arc<str>) -> Self {
        Self(shared)
    }
}

impl From<Box<str>> for TableName {
    /// Moves a boxed string into shared storage.
    fn from(boxed: Box<str>) -> Self {
        Self(Arc::from(boxed))
    }
}

impl From<String> for TableName {
    /// Moves an owned string into shared storage.
    fn from(owned: String) -> Self {
        Self(Arc::from(owned))
    }
}

impl std::ops::Deref for TableName {
    type Target = str;

    /// Exposes the name as a plain `&str`.
    fn deref(&self) -> &str {
        self.0.as_ref()
    }
}

impl std::fmt::Display for TableName {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        // Delegate to `str`'s `Display` so width/alignment flags are honoured.
        std::fmt::Display::fmt(&*self.0, f)
    }
}
287
288/// A join edge is used for pruning queries when evaluating subscription updates.
289///
290/// If we have the following subscriptions:
291/// ```sql
292/// SELECT a.* FROM a JOIN b ON a.id = b.id WHERE b.x = 1
293/// SELECT a.* FROM a JOIN b ON a.id = b.id WHERE b.x = 2
294/// ...
295/// SELECT a.* FROM a JOIN b ON a.id = b.id WHERE b.x = n
296/// ```
297///
298/// Whenever `a` is updated, only the relevant queries are evaluated.
299#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
300pub struct JoinEdge {
301 /// The [`TableId`] for `a`
302 pub lhs_table: TableId,
303 /// The [`TableId`] for `b`
304 pub rhs_table: TableId,
305 /// The [`ColId`] for `a.id`
306 pub lhs_join_col: ColId,
307 /// The [`ColId`] for `b.id`
308 pub rhs_join_col: ColId,
309 /// The [`ColId`] for `b.x`
310 pub rhs_col: ColId,
311}
312
313impl JoinEdge {
314 /// A helper method for finding a range of join edges for a particular table in a sorted set.
315 fn min_for_table(lhs_table: TableId) -> Self {
316 Self {
317 lhs_table,
318 rhs_table: TableId(u32::MIN),
319 lhs_join_col: ColId(u16::MIN),
320 rhs_join_col: ColId(u16::MIN),
321 rhs_col: ColId(u16::MIN),
322 }
323 }
324
325 /// A helper method for finding a range of join edges for a particular table in a sorted set.
326 fn max_for_table(lhs_table: TableId) -> Self {
327 Self {
328 lhs_table,
329 rhs_table: TableId(u32::MAX),
330 lhs_join_col: ColId(u16::MAX),
331 rhs_join_col: ColId(u16::MAX),
332 rhs_col: ColId(u16::MAX),
333 }
334 }
335
336 /// A helper method for finding a range of join edges for a particular table in a sorted set.
337 pub fn range_for_table(lhs_table: TableId) -> impl RangeBounds<Self> {
338 Self::min_for_table(lhs_table)..=Self::max_for_table(lhs_table)
339 }
340}
341
342/// A subscription defines a view over a table
343#[derive(Debug)]
344pub struct SubscriptionPlan {
345 /// To which table are we subscribed?
346 return_id: TableId,
347 /// To which table are we subscribed?
348 return_name: TableName,
349 /// A subscription can read from multiple tables.
350 /// From which tables do we read?
351 table_ids: Vec<TableId>,
352 /// The plan fragments for updating the view
353 fragments: Fragments,
354 /// The optimized plan without any delta scans
355 plan_opt: ProjectPlan,
356}
357
358impl SubscriptionPlan {
359 /// Is this a plan for a join?
360 pub fn is_join(&self) -> bool {
361 self.fragments.insert_plans.len() > 1 && self.fragments.delete_plans.len() > 1
362 }
363
364 /// To which table does this plan subscribe?
365 pub fn subscribed_table_id(&self) -> TableId {
366 self.return_id
367 }
368
369 /// To which table does this plan subscribe?
370 pub fn subscribed_table_name(&self) -> &TableName {
371 &self.return_name
372 }
373
374 /// From which tables does this plan read?
375 pub fn table_ids(&self) -> impl Iterator<Item = TableId> + '_ {
376 self.table_ids.iter().copied()
377 }
378
379 /// The optimized plan without any delta scans
380 pub fn optimized_physical_plan(&self) -> &ProjectPlan {
381 &self.plan_opt
382 }
383
384 /// From which indexes does this plan read?
385 pub fn index_ids(&self) -> impl Iterator<Item = (TableId, IndexId)> {
386 self.fragments.index_ids()
387 }
388
389 /// A subscription is just a view of a particular table.
390 /// Here we compute the rows that are to be inserted into that view,
391 /// and evaluate a closure over each one.
392 pub fn for_each_insert<'a, Tx: Datastore + DeltaStore>(
393 &self,
394 tx: &'a Tx,
395 metrics: &mut ExecutionMetrics,
396 f: &mut dyn FnMut(Row<'a>) -> Result<()>,
397 ) -> Result<()> {
398 self.fragments.for_each_insert(tx, metrics, f)
399 }
400
401 /// A subscription is just a view of a particular table.
402 /// Here we compute the rows that are to be removed from that view,
403 /// and evaluate a closure over each one.
404 pub fn for_each_delete<'a, Tx: Datastore + DeltaStore>(
405 &self,
406 tx: &'a Tx,
407 metrics: &mut ExecutionMetrics,
408 f: &mut dyn FnMut(Row<'a>) -> Result<()>,
409 ) -> Result<()> {
410 self.fragments.for_each_delete(tx, metrics, f)
411 }
412
413 /// Returns a join edge for this query if it has one.
414 ///
415 /// Requirements include:
416 /// 1. Unique join index
417 /// 2. Single column index lookup on the rhs table
418 /// 3. No self joins
419 pub fn join_edge(&self) -> Option<(JoinEdge, AlgebraicValue)> {
420 if !self.is_join() {
421 return None;
422 }
423 let mut join_edge = None;
424 self.plan_opt.visit(&mut |op| match op {
425 PhysicalPlan::IxJoin(
426 IxJoin {
427 lhs,
428 rhs,
429 rhs_field: lhs_join_col,
430 lhs_field:
431 TupleField {
432 field_pos: rhs_join_col,
433 ..
434 },
435 ..
436 },
437 _,
438 ) if rhs.table_id == self.return_id => match &**lhs {
439 PhysicalPlan::IxScan(
440 IxScan {
441 schema,
442 prefix,
443 arg: Sarg::Eq(rhs_col, rhs_val),
444 ..
445 },
446 _,
447 ) if schema.table_id != self.return_id
448 && prefix.is_empty()
449 && schema.is_unique(&ColList::new((*rhs_join_col).into())) =>
450 {
451 let lhs_table = self.return_id;
452 let rhs_table = schema.table_id;
453 let rhs_col = *rhs_col;
454 let rhs_val = rhs_val.clone();
455 let lhs_join_col = *lhs_join_col;
456 let rhs_join_col = (*rhs_join_col).into();
457 let edge = JoinEdge {
458 lhs_table,
459 rhs_table,
460 lhs_join_col,
461 rhs_join_col,
462 rhs_col,
463 };
464 join_edge = Some((edge, rhs_val));
465 }
466 _ => {}
467 },
468 _ => {}
469 });
470 join_edge
471 }
472
473 /// Generate a plan for incrementally maintaining a subscription
474 pub fn compile(sql: &str, tx: &impl SchemaView, auth: &AuthCtx) -> Result<(Vec<Self>, bool)> {
475 let (plans, return_id, return_name, has_param) = compile_subscription(sql, tx, auth)?;
476
477 /// Does this plan have any non-index joins?
478 fn has_non_index_join(plan: &PhysicalPlan) -> bool {
479 plan.any(&|op| matches!(op, PhysicalPlan::HashJoin(..) | PhysicalPlan::NLJoin(..)))
480 }
481
482 /// What tables are involved in this plan?
483 fn table_ids_for_plan(plan: &PhysicalPlan) -> (Vec<TableId>, Vec<Label>) {
484 let mut table_aliases = vec![];
485 let mut table_ids = vec![];
486 plan.visit(&mut |plan| match plan {
487 PhysicalPlan::TableScan(
488 TableScan {
489 // What table are we reading?
490 schema,
491 ..
492 },
493 alias,
494 )
495 | PhysicalPlan::IxScan(
496 IxScan {
497 // What table are we reading?
498 schema,
499 ..
500 },
501 alias,
502 ) => {
503 table_aliases.push(*alias);
504 table_ids.push(schema.table_id);
505 }
506 _ => {}
507 });
508 (table_ids, table_aliases)
509 }
510
511 let mut subscriptions = vec![];
512
513 let return_name = TableName::from(return_name);
514
515 for plan in plans {
516 let plan_opt = plan.clone().optimize()?;
517
518 if has_non_index_join(&plan_opt) {
519 bail!("Subscriptions require indexes on join columns")
520 }
521
522 let (table_ids, table_aliases) = table_ids_for_plan(&plan);
523
524 let fragments = Fragments::compile_from_plan(&plan, &table_aliases)?;
525
526 subscriptions.push(Self {
527 return_id,
528 return_name: return_name.clone(),
529 table_ids,
530 plan_opt,
531 fragments,
532 });
533 }
534
535 Ok((subscriptions, has_param))
536 }
537}