1use crate::db::relational_db::{RelationalDB, Tx};
2use crate::error::{DBError, SubscriptionError};
3use crate::sql::ast::SchemaViewer;
4use crate::sql::compiler::compile_sql;
5use crate::subscription::subscription::SupportedQuery;
6use once_cell::sync::Lazy;
7use regex::Regex;
8use spacetimedb_lib::identity::AuthCtx;
9use spacetimedb_subscription::SubscriptionPlan;
10use spacetimedb_vm::expr::{self, Crud, CrudExpr, QueryExpr};
11
12use super::execution_unit::QueryHash;
13use super::module_subscription_manager::Plan;
14
15static WHITESPACE: Lazy<Regex> = Lazy::new(|| Regex::new(r"^\s*$").unwrap());
16static SUBSCRIBE_TO_ALL_TABLES_REGEX: Lazy<Regex> =
17 Lazy::new(|| Regex::new(r"^\s*(?i)\bSELECT\s+\*\s+FROM\s+\*\s*$").unwrap());
18
19pub fn is_whitespace_or_empty(sql: &str) -> bool {
21 WHITESPACE.is_match_at(sql, 0)
22}
23
24pub fn is_subscribe_to_all_tables(sql: &str) -> bool {
26 SUBSCRIBE_TO_ALL_TABLES_REGEX.is_match_at(sql, 0)
27}
28
29pub fn compile_read_only_queryset(
41 relational_db: &RelationalDB,
42 auth: &AuthCtx,
43 tx: &Tx,
44 input: &str,
45) -> Result<Vec<SupportedQuery>, DBError> {
46 let input = input.trim();
47 if input.is_empty() {
48 return Err(SubscriptionError::Empty.into());
49 }
50
51 let input = WHITESPACE.replace_all(input, " ");
53
54 let compiled = compile_sql(relational_db, auth, tx, &input)?;
55 let mut queries = Vec::with_capacity(compiled.len());
56 for q in compiled {
57 return Err(SubscriptionError::SideEffect(match q {
58 CrudExpr::Query(x) => {
59 queries.push(x);
60 continue;
61 }
62 CrudExpr::Insert { .. } => Crud::Insert,
63 CrudExpr::Update { .. } => Crud::Update,
64 CrudExpr::Delete { .. } => Crud::Delete,
65 CrudExpr::SetVar { .. } => Crud::Config,
66 CrudExpr::ReadVar { .. } => Crud::Config,
67 })
68 .into());
69 }
70
71 if !queries.is_empty() {
72 Ok(queries
73 .into_iter()
74 .map(|query| SupportedQuery::new(query, input.to_string()))
75 .collect::<Result<_, _>>()?)
76 } else {
77 Err(SubscriptionError::Empty.into())
78 }
79}
80
81pub fn compile_read_only_query(auth: &AuthCtx, tx: &Tx, input: &str) -> Result<Plan, DBError> {
84 if is_whitespace_or_empty(input) {
85 return Err(SubscriptionError::Empty.into());
86 }
87
88 let tx = SchemaViewer::new(tx, auth);
89 let (plans, has_param) = SubscriptionPlan::compile(input, &tx, auth)?;
90 let hash = QueryHash::from_string(input, auth.caller, has_param);
91 Ok(Plan::new(plans, hash, input.to_owned()))
92}
93
94pub fn compile_query_with_hashes(
97 auth: &AuthCtx,
98 tx: &Tx,
99 input: &str,
100 hash: QueryHash,
101 hash_with_param: QueryHash,
102) -> Result<Plan, DBError> {
103 if is_whitespace_or_empty(input) {
104 return Err(SubscriptionError::Empty.into());
105 }
106
107 let tx = SchemaViewer::new(tx, auth);
108 let (plans, has_param) = SubscriptionPlan::compile(input, &tx, auth)?;
109
110 if auth.is_owner() || has_param {
111 return Ok(Plan::new(plans, hash_with_param, input.to_owned()));
116 }
117 Ok(Plan::new(plans, hash, input.to_owned()))
118}
119
/// The kinds of query that [`classify`] recognises.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum Supported {
    /// A query whose operators contain no inner join.
    Select,
    /// A query consisting of exactly one index-join operator.
    Semijoin,
}
130
131pub fn classify(expr: &QueryExpr) -> Option<Supported> {
134 use expr::Query::*;
135 if matches!(&*expr.query, [IndexJoin(_)]) {
136 return Some(Supported::Semijoin);
137 }
138 for op in &expr.query {
139 if let JoinInner(_) = op {
140 return None;
141 }
142 }
143 Some(Supported::Select)
144}
145
146#[cfg(test)]
147mod tests {
148 use super::*;
149 use crate::db::relational_db::tests_utils::{
150 begin_mut_tx, begin_tx, insert, with_auto_commit, with_read_only, TestDB,
151 };
152 use crate::db::relational_db::MutTx;
153 use crate::host::module_host::{DatabaseTableUpdate, DatabaseUpdate, UpdatesRelValue};
154 use crate::sql::execute::collect_result;
155 use crate::sql::execute::tests::run_for_testing;
156 use crate::subscription::module_subscription_manager::QueriedTableIndexIds;
157 use crate::subscription::subscription::{legacy_get_all, ExecutionSet};
158 use crate::subscription::tx::DeltaTx;
159 use crate::vm::tests::create_table_with_rows;
160 use crate::vm::DbProgram;
161 use itertools::Itertools;
162 use spacetimedb_client_api_messages::websocket::{BsatnFormat, Compression};
163 use spacetimedb_datastore::execution_context::Workload;
164 use spacetimedb_lib::bsatn;
165 use spacetimedb_lib::db::auth::{StAccess, StTableType};
166 use spacetimedb_lib::error::ResultTest;
167 use spacetimedb_lib::identity::AuthCtx;
168 use spacetimedb_lib::metrics::ExecutionMetrics;
169 use spacetimedb_lib::Identity;
170 use spacetimedb_primitives::{ColId, TableId};
171 use spacetimedb_sats::{product, AlgebraicType, ProductType, ProductValue};
172 use spacetimedb_schema::relation::FieldName;
173 use spacetimedb_schema::schema::*;
174 use spacetimedb_vm::eval::run_ast;
175 use spacetimedb_vm::eval::test_helpers::{mem_table, mem_table_without_table_name, scalar};
176 use spacetimedb_vm::expr::{Expr, SourceSet};
177 use spacetimedb_vm::operator::OpCmp;
178 use spacetimedb_vm::relation::{MemTable, RelValue};
179 use std::collections::HashMap;
180 use std::sync::Arc;
181
182 fn run_query<const N: usize>(
184 db: &RelationalDB,
185 tx: &Tx,
186 query: &QueryExpr,
187 auth: AuthCtx,
188 sources: SourceSet<Vec<ProductValue>, N>,
189 ) -> Result<Vec<MemTable>, DBError> {
190 let mut tx = tx.into();
191 let p = &mut DbProgram::new(db, &mut tx, auth);
192 let q = Expr::Crud(Box::new(CrudExpr::Query(query.clone())));
193
194 let mut result = Vec::with_capacity(1);
195 let mut updates = Vec::new();
196 collect_result(&mut result, &mut updates, run_ast(p, q, sources).into())?;
197 Ok(result)
198 }
199
200 fn insert_op(table_id: TableId, table_name: &str, row: ProductValue) -> DatabaseTableUpdate {
201 DatabaseTableUpdate {
202 table_id,
203 table_name: table_name.into(),
204 deletes: [].into(),
205 inserts: [row].into(),
206 }
207 }
208
209 fn delete_op(table_id: TableId, table_name: &str, row: ProductValue) -> DatabaseTableUpdate {
210 DatabaseTableUpdate {
211 table_id,
212 table_name: table_name.into(),
213 deletes: [row].into(),
214 inserts: [].into(),
215 }
216 }
217
218 fn insert_row(db: &RelationalDB, tx: &mut MutTx, table_id: TableId, row: ProductValue) -> ResultTest<()> {
219 insert(db, tx, table_id, &row)?;
220 Ok(())
221 }
222
223 fn delete_row(db: &RelationalDB, tx: &mut MutTx, table_id: TableId, row: ProductValue) {
224 db.delete_by_rel(tx, table_id, [row]);
225 }
226
227 fn make_data(
228 db: &RelationalDB,
229 tx: &mut MutTx,
230 table_name: &str,
231 head: &ProductType,
232 row: &ProductValue,
233 access: StAccess,
234 ) -> ResultTest<(Arc<TableSchema>, MemTable, DatabaseTableUpdate, QueryExpr)> {
235 let schema = create_table_with_rows(db, tx, table_name, head.clone(), &[row.clone()], access)?;
236 let table = mem_table(schema.table_id, schema.get_row_type().clone(), [row.clone()]);
237
238 let data = DatabaseTableUpdate {
239 table_id: schema.table_id,
240 table_name: table_name.into(),
241 deletes: [].into(),
242 inserts: [row.clone()].into(),
243 };
244
245 let q = QueryExpr::new(&*schema);
246
247 Ok((schema, table, data, q))
248 }
249
250 fn make_inv(
251 db: &RelationalDB,
252 tx: &mut MutTx,
253 access: StAccess,
254 ) -> ResultTest<(Arc<TableSchema>, MemTable, DatabaseTableUpdate, QueryExpr)> {
255 let head = ProductType::from([("inventory_id", AlgebraicType::U64), ("name", AlgebraicType::String)]);
256 let row = product!(1u64, "health");
257
258 let (schema, table, data, q) = make_data(db, tx, "inventory", &head, &row, access)?;
259
260 let fields = &[0, 1].map(|c| FieldName::new(schema.table_id, c.into()).into());
261 let q = q.with_project(fields.into(), None).unwrap();
262
263 Ok((schema, table, data, q))
264 }
265
266 fn make_player(
267 db: &RelationalDB,
268 tx: &mut MutTx,
269 ) -> ResultTest<(Arc<TableSchema>, MemTable, DatabaseTableUpdate, QueryExpr)> {
270 let table_name = "player";
271 let head = ProductType::from([("player_id", AlgebraicType::U64), ("name", AlgebraicType::String)]);
272 let row = product!(2u64, "jhon doe");
273
274 let (schema, table, data, q) = make_data(db, tx, table_name, &head, &row, StAccess::Public)?;
275
276 let fields = [0, 1].map(|c| FieldName::new(schema.table_id, c.into()).into());
277 let q = q.with_project(fields.into(), None).unwrap();
278
279 Ok((schema, table, data, q))
280 }
281
282 fn query_to_mem_table(
285 mut of: QueryExpr,
286 data: &DatabaseTableUpdate,
287 ) -> (QueryExpr, SourceSet<Vec<ProductValue>, 1>) {
288 let data = data.deletes.iter().chain(data.inserts.iter()).cloned().collect();
289 let mem_table = MemTable::new(of.head().clone(), of.source.table_access(), data);
290 let mut sources = SourceSet::empty();
291 of.source = sources.add_mem_table(mem_table);
292 (of, sources)
293 }
294
295 fn check_query(
296 db: &RelationalDB,
297 table: &MemTable,
298 tx: &Tx,
299 q: &QueryExpr,
300 data: &DatabaseTableUpdate,
301 ) -> ResultTest<()> {
302 let (q, sources) = query_to_mem_table(q.clone(), data);
303 let result = run_query(db, tx, &q, AuthCtx::for_testing(), sources)?;
304
305 assert_eq!(
306 Some(mem_table_without_table_name(table)),
307 result.first().map(mem_table_without_table_name)
308 );
309
310 Ok(())
311 }
312
313 fn check_query_incr(
314 db: &RelationalDB,
315 tx: &Tx,
316 s: &ExecutionSet,
317 update: &DatabaseUpdate,
318 total_tables: usize,
319 rows: &[ProductValue],
320 ) -> ResultTest<()> {
321 let tx = &tx.into();
322 let update = update.tables.iter().collect::<Vec<_>>();
323 let result = s.eval_incr_for_test(db, tx, &update, None);
324 assert_eq!(
325 result.tables.len(),
326 total_tables,
327 "Must return the correct number of tables: {result:#?}"
328 );
329
330 let result = result
331 .tables
332 .iter()
333 .map(|u| &u.updates)
334 .flat_map(|u| {
335 u.deletes
336 .iter()
337 .chain(&*u.inserts)
338 .map(|rv| rv.clone().into_product_value())
339 .collect::<Vec<_>>()
340 })
341 .sorted()
342 .collect::<Vec<_>>();
343
344 assert_eq!(result, rows, "Must return the correct row(s)");
345
346 Ok(())
347 }
348
349 fn check_query_eval(
350 db: &RelationalDB,
351 tx: &Tx,
352 s: &ExecutionSet,
353 total_tables: usize,
354 rows: &[ProductValue],
355 ) -> ResultTest<()> {
356 let result = s.eval::<BsatnFormat>(db, tx, None, Compression::Brotli).tables;
357 assert_eq!(
358 result.len(),
359 total_tables,
360 "Must return the correct number of tables: {result:#?}"
361 );
362
363 let result = result
364 .into_iter()
365 .flat_map(|x| x.updates)
366 .map(|x| x.maybe_decompress())
367 .flat_map(|x| {
368 (&x.deletes)
369 .into_iter()
370 .chain(&x.inserts)
371 .map(|x| x.to_owned())
372 .collect::<Vec<_>>()
373 })
374 .sorted()
375 .collect_vec();
376
377 let rows = rows.iter().map(|r| bsatn::to_vec(r).unwrap()).collect_vec();
378
379 assert_eq!(result, rows, "Must return the correct row(s)");
380
381 Ok(())
382 }
383
384 fn singleton_execution_set(expr: QueryExpr, sql: String) -> ResultTest<ExecutionSet> {
385 Ok(ExecutionSet::from_iter([SupportedQuery::try_from((expr, sql))?]))
386 }
387
388 #[test]
389 fn test_whitespace_regex() -> ResultTest<()> {
390 assert!(is_whitespace_or_empty(""));
391 assert!(is_whitespace_or_empty(" "));
392 assert!(is_whitespace_or_empty("\n \t"));
393 assert!(!is_whitespace_or_empty(" a"));
394 Ok(())
395 }
396
397 #[test]
398 fn test_subscribe_to_all_tables_regex() -> ResultTest<()> {
399 assert!(is_subscribe_to_all_tables("SELECT * FROM *"));
400 assert!(is_subscribe_to_all_tables("Select * From *"));
401 assert!(is_subscribe_to_all_tables("select * from *"));
402 assert!(is_subscribe_to_all_tables("\nselect *\nfrom * "));
403 assert!(!is_subscribe_to_all_tables("select * from * where"));
404 Ok(())
405 }
406
407 #[test]
408 fn test_compile_incr_plan() -> ResultTest<()> {
409 let db = TestDB::durable()?;
410
411 let schema = &[("n", AlgebraicType::U64), ("data", AlgebraicType::U64)];
412 let indexes = &[0.into()];
413 db.create_table_for_test("a", schema, indexes)?;
414 db.create_table_for_test("b", schema, indexes)?;
415
416 let tx = begin_tx(&db);
417 let sql = "SELECT b.* FROM b JOIN a ON b.n = a.n WHERE b.data > 200";
418 let result = compile_read_only_query(&AuthCtx::for_testing(), &tx, sql);
419 assert!(result.is_ok());
420 Ok(())
421 }
422
423 #[test]
424 fn test_eval_incr_for_index_scan() -> ResultTest<()> {
425 let db = TestDB::durable()?;
426
427 let schema = &[("a", AlgebraicType::U64), ("b", AlgebraicType::U64)];
429 let indexes = &[1.into()];
430 let table_id = db.create_table_for_test("test", schema, indexes)?;
431
432 let mut tx = begin_mut_tx(&db);
433 let mut deletes = Vec::new();
434 for i in 0u64..9u64 {
435 insert(&db, &mut tx, table_id, &product!(i, i))?;
436 deletes.push(product!(i + 10, i))
437 }
438
439 let update = DatabaseUpdate {
440 tables: vec![DatabaseTableUpdate {
441 table_id,
442 table_name: "test".into(),
443 deletes: deletes.into(),
444 inserts: [].into(),
445 }],
446 };
447
448 db.commit_tx(tx)?;
449 let tx = begin_tx(&db);
450
451 let sql = "select * from test where b = 3";
452 let mut exp = compile_sql(&db, &AuthCtx::for_testing(), &tx, sql)?;
453
454 let Some(CrudExpr::Query(query)) = exp.pop() else {
455 panic!("unexpected query {:#?}", exp[0]);
456 };
457
458 let query: ExecutionSet = singleton_execution_set(query, sql.into())?;
459
460 let tx = (&tx).into();
461 let update = update.tables.iter().collect::<Vec<_>>();
462 let result = query.eval_incr_for_test(&db, &tx, &update, None);
463
464 assert_eq!(result.tables.len(), 1);
465
466 let update = &result.tables[0].updates;
467
468 assert_eq!(update.inserts.len(), 0);
469 assert_eq!(update.deletes.len(), 1);
470
471 let op = &update.deletes[0];
472
473 assert_eq!(op.clone().into_product_value(), product!(13u64, 3u64));
474 Ok(())
475 }
476
477 #[test]
478 fn test_subscribe() -> ResultTest<()> {
479 let db = TestDB::durable()?;
480
481 let mut tx = begin_mut_tx(&db);
482
483 let (schema, table, data, q) = make_inv(&db, &mut tx, StAccess::Public)?;
484 db.commit_tx(tx)?;
485 assert_eq!(schema.table_type, StTableType::User);
486 assert_eq!(schema.table_access, StAccess::Public);
487
488 let tx = begin_tx(&db);
489 let q_1 = q.clone();
490 check_query(&db, &table, &tx, &q_1, &data)?;
491
492 let q_2 = q
493 .with_select_cmp(OpCmp::Eq, FieldName::new(schema.table_id, 0.into()), scalar(1u64))
494 .unwrap();
495 check_query(&db, &table, &tx, &q_2, &data)?;
496
497 Ok(())
498 }
499
500 #[test]
501 fn test_subscribe_private() -> ResultTest<()> {
502 let db = TestDB::durable()?;
503
504 let mut tx = begin_mut_tx(&db);
505
506 let (schema, table, data, q) = make_inv(&db, &mut tx, StAccess::Private)?;
507 db.commit_tx(tx)?;
508 assert_eq!(schema.table_type, StTableType::User);
509 assert_eq!(schema.table_access, StAccess::Private);
510
511 let row = product!(1u64, "health");
512 let tx = begin_tx(&db);
513 check_query(&db, &table, &tx, &q, &data)?;
514
515 let q_id = QueryExpr::new(&*schema)
517 .with_select_cmp(OpCmp::Eq, FieldName::new(schema.table_id, 0.into()), scalar(1u64))
518 .unwrap();
519
520 let s = singleton_execution_set(q_id, "SELECT * FROM inventory WHERE inventory_id = 1".into())?;
521
522 let data = DatabaseTableUpdate {
523 table_id: schema.table_id,
524 table_name: "inventory".into(),
525 deletes: [].into(),
526 inserts: [row.clone()].into(),
527 };
528
529 let update = DatabaseUpdate {
530 tables: vec![data.clone()],
531 };
532
533 check_query_incr(&db, &tx, &s, &update, 1, &[row])?;
534
535 let q = QueryExpr::new(&*schema);
536
537 let (q, sources) = query_to_mem_table(q, &data);
538 match run_query(
540 &db,
541 &tx,
542 &q,
543 AuthCtx::new(Identity::__dummy(), Identity::from_byte_array([1u8; 32])),
544 sources,
545 ) {
546 Ok(_) => {
547 panic!("it allows to execute against private table")
548 }
549 Err(err) => {
550 if err.get_auth_error().is_none() {
551 panic!("fail to report an `auth` violation for private table, it gets {err}")
552 }
553 }
554 }
555
556 Ok(())
557 }
558
559 #[test]
560 fn test_subscribe_sql() -> ResultTest<()> {
561 let db = TestDB::durable()?;
562
563 let schema = &[
565 ("entity_id", AlgebraicType::U64),
566 ("location_x", AlgebraicType::I32),
567 ("location_z", AlgebraicType::I32),
568 ("destination_x", AlgebraicType::I32),
569 ("destination_z", AlgebraicType::I32),
570 ("is_running", AlgebraicType::Bool),
571 ("timestamp", AlgebraicType::U64),
572 ("dimension", AlgebraicType::U32),
573 ];
574 let indexes = &[0.into(), 1.into(), 2.into()];
575 db.create_table_for_test("MobileEntityState", schema, indexes)?;
576
577 let schema = &[
579 ("entity_id", AlgebraicType::U64),
580 ("herd_id", AlgebraicType::I32),
581 ("status", AlgebraicType::I32),
582 ("type", AlgebraicType::I32),
583 ("direction", AlgebraicType::I32),
584 ];
585 let indexes = &[0.into()];
586 db.create_table_for_test("EnemyState", schema, indexes)?;
587
588 for sql_insert in [
589 "insert into MobileEntityState (entity_id, location_x, location_z, destination_x, destination_z, is_running, timestamp, dimension) values (1, 96001, 96001, 96001, 1867045146, false, 17167179743690094247, 3926297397)",
590 "insert into MobileEntityState (entity_id, location_x, location_z, destination_x, destination_z, is_running, timestamp, dimension) values (2, 96001, 191000, 191000, 1560020888, true, 2947537077064292621, 445019304)",
591 "insert into EnemyState (entity_id, herd_id, status, type, direction) values (1, 1181485940, 1633678837, 1158301365, 132191327)",
592 "insert into EnemyState (entity_id, herd_id, status, type, direction) values (2, 2017368418, 194072456, 34423057, 1296770410)"] {
593 run_for_testing(&db, sql_insert)?;
594 }
595
596 let sql_query = "\
597 SELECT EnemyState.* FROM EnemyState \
598 JOIN MobileEntityState ON MobileEntityState.entity_id = EnemyState.entity_id \
599 WHERE MobileEntityState.location_x > 96000 \
600 AND MobileEntityState.location_x < 192000 \
601 AND MobileEntityState.location_z > 96000 \
602 AND MobileEntityState.location_z < 192000";
603
604 let tx = begin_tx(&db);
605 let qset = compile_read_only_queryset(&db, &AuthCtx::for_testing(), &tx, sql_query)?;
606
607 for q in qset {
608 let result = run_query(
609 &db,
610 &tx,
611 q.as_expr(),
612 AuthCtx::for_testing(),
613 SourceSet::<_, 0>::empty(),
614 )?;
615 assert_eq!(result.len(), 1, "Join query did not return any rows");
616 }
617
618 Ok(())
619 }
620
621 #[test]
622 fn test_subscribe_all() -> ResultTest<()> {
623 let db = TestDB::durable()?;
624
625 let mut tx = begin_mut_tx(&db);
626
627 let (schema_1, _, _, _) = make_inv(&db, &mut tx, StAccess::Public)?;
628 let (schema_2, _, _, _) = make_player(&db, &mut tx)?;
629 db.commit_tx(tx)?;
630 let row_1 = product!(1u64, "health");
631 let row_2 = product!(2u64, "jhon doe");
632 let tx = db.begin_tx(Workload::Subscribe);
633 let s = legacy_get_all(&db, &tx, &AuthCtx::for_testing())?.into();
634 check_query_eval(&db, &tx, &s, 2, &[row_1.clone(), row_2.clone()])?;
635
636 let data1 = DatabaseTableUpdate {
637 table_id: schema_1.table_id,
638 table_name: "inventory".into(),
639 deletes: [row_1].into(),
640 inserts: [].into(),
641 };
642
643 let data2 = DatabaseTableUpdate {
644 table_id: schema_2.table_id,
645 table_name: "player".into(),
646 deletes: [].into(),
647 inserts: [row_2].into(),
648 };
649
650 let update = DatabaseUpdate {
651 tables: vec![data1, data2],
652 };
653
654 let row_1 = product!(1u64, "health");
655 let row_2 = product!(2u64, "jhon doe");
656 check_query_incr(&db, &tx, &s, &update, 2, &[row_1, row_2])?;
657
658 Ok(())
659 }
660
661 #[test]
662 fn test_classify() -> ResultTest<()> {
663 let db = TestDB::durable()?;
664
665 let schema = &[("id", AlgebraicType::U64)];
667 db.create_table_for_test("plain", schema, &[])?;
668
669 let schema = &[("id", AlgebraicType::U64), ("x", AlgebraicType::I32)];
671 let indexes = &[ColId(0), ColId(1)];
672 db.create_table_for_test("lhs", schema, indexes)?;
673
674 let schema = &[("id", AlgebraicType::U64), ("y", AlgebraicType::I32)];
676 let indexes = &[ColId(0), ColId(1)];
677 db.create_table_for_test("rhs", schema, indexes)?;
678
679 let tx = begin_tx(&db);
680
681 let scans = [
683 "SELECT * FROM plain",
684 "SELECT * FROM plain WHERE id > 5",
685 "SELECT plain.* FROM plain",
686 "SELECT plain.* FROM plain WHERE plain.id = 5",
687 "SELECT * FROM lhs",
688 "SELECT * FROM lhs WHERE id > 5",
689 ];
690 for scan in scans {
691 let expr = compile_read_only_queryset(&db, &AuthCtx::for_testing(), &tx, scan)?
692 .pop()
693 .unwrap();
694 assert_eq!(expr.kind(), Supported::Select, "{scan}\n{expr:#?}");
695 }
696
697 let joins = ["SELECT lhs.* FROM lhs JOIN rhs ON lhs.id = rhs.id WHERE rhs.y < 10"];
699 for join in joins {
700 let expr = compile_read_only_queryset(&db, &AuthCtx::for_testing(), &tx, join)?
701 .pop()
702 .unwrap();
703 assert_eq!(expr.kind(), Supported::Semijoin, "{join}\n{expr:#?}");
704 }
705
706 let joins = [
708 "SELECT lhs.* FROM lhs JOIN rhs ON lhs.id = rhs.id",
709 "SELECT * FROM lhs JOIN rhs ON lhs.id = rhs.id",
710 "SELECT * FROM lhs JOIN rhs ON lhs.id = rhs.id WHERE lhs.x < 10",
711 ];
712 for join in joins {
713 match compile_read_only_queryset(&db, &AuthCtx::for_testing(), &tx, join) {
714 Err(DBError::Subscription(SubscriptionError::Unsupported(_)) | DBError::TypeError(_)) => (),
715 x => panic!("Unexpected: {x:?}"),
716 }
717 }
718
719 Ok(())
720 }
721
722 fn create_lhs_table_for_eval_incr(db: &RelationalDB) -> ResultTest<TableId> {
724 const I32: AlgebraicType = AlgebraicType::I32;
725 let lhs_id = db.create_table_for_test("lhs", &[("id", I32), ("x", I32)], &[0.into()])?;
726 with_auto_commit(db, |tx| {
727 for i in 0..5 {
728 let row = product!(i, i + 5);
729 insert(db, tx, lhs_id, &row)?;
730 }
731 Ok(lhs_id)
732 })
733 }
734
735 fn create_rhs_table_for_eval_incr(db: &RelationalDB) -> ResultTest<TableId> {
737 const I32: AlgebraicType = AlgebraicType::I32;
738 let rhs_id = db.create_table_for_test("rhs", &[("rid", I32), ("id", I32), ("y", I32)], &[1.into()])?;
739 with_auto_commit(db, |tx| {
740 for i in 10..20 {
741 let row = product!(i, i - 10, i - 8);
742 insert(db, tx, rhs_id, &row)?;
743 }
744 Ok(rhs_id)
745 })
746 }
747
748 fn compile_query(db: &RelationalDB) -> ResultTest<SubscriptionPlan> {
749 with_read_only(db, |tx| {
750 let auth = AuthCtx::for_testing();
751 let tx = SchemaViewer::new(tx, &auth);
752 let sql = "select lhs.* from lhs join rhs on lhs.id = rhs.id where rhs.y >= 2 and rhs.y <= 4";
754 Ok(SubscriptionPlan::compile(sql, &tx, &auth)
755 .map(|(mut plans, _)| {
756 assert_eq!(plans.len(), 1);
757 plans.pop().unwrap()
758 })
759 .unwrap())
760 })
761 }
762
763 fn run_eval_incr_test<T, F: Fn(&RelationalDB) -> ResultTest<T>>(test_fn: F) -> ResultTest<T> {
764 TestDB::durable().map(|db| test_fn(&db))??;
765 TestDB::durable().map(|db| test_fn(&db.with_row_count(Arc::new(|_, _| 5))))?
766 }
767
768 #[test]
769 fn test_eval_incr_for_left_semijoin() -> ResultTest<()> {
773 fn compile_query(db: &RelationalDB) -> ResultTest<SubscriptionPlan> {
774 with_read_only(db, |tx| {
775 let auth = AuthCtx::for_testing();
776 let tx = SchemaViewer::new(tx, &auth);
777 let sql = "select lhs.* from lhs join rhs on lhs.id = rhs.id where lhs.x >= 5 and lhs.x <= 7";
779 Ok(SubscriptionPlan::compile(sql, &tx, &auth)
780 .map(|(mut plans, _)| {
781 assert_eq!(plans.len(), 1);
782 plans.pop().unwrap()
783 })
784 .unwrap())
785 })
786 }
787
788 fn index_join_case_1(db: &RelationalDB) -> ResultTest<()> {
792 let _ = create_lhs_table_for_eval_incr(db)?;
793 let rhs_id = create_rhs_table_for_eval_incr(db)?;
794 let query = compile_query(db)?;
795
796 let r1 = product!(10, 0, 2);
797 let r2 = product!(10, 0, 3);
798
799 let mut metrics = ExecutionMetrics::default();
800
801 let result = eval_incr(db, &mut metrics, &query, vec![(rhs_id, r1, false), (rhs_id, r2, true)])?;
802
803 assert!(result.is_empty());
805 Ok(())
806 }
807
808 fn index_join_case_2(db: &RelationalDB) -> ResultTest<()> {
812 let _ = create_lhs_table_for_eval_incr(db)?;
813 let rhs_id = create_rhs_table_for_eval_incr(db)?;
814 let query = compile_query(db)?;
815
816 let r1 = product!(13, 3, 5);
817 let r2 = product!(13, 4, 6);
818
819 let mut metrics = ExecutionMetrics::default();
820
821 let result = eval_incr(db, &mut metrics, &query, vec![(rhs_id, r1, false), (rhs_id, r2, true)])?;
822
823 assert!(result.is_empty());
825 Ok(())
826 }
827
828 fn index_join_case_3(db: &RelationalDB) -> ResultTest<()> {
832 let lhs_id = create_lhs_table_for_eval_incr(db)?;
833 let rhs_id = create_rhs_table_for_eval_incr(db)?;
834 let query = compile_query(db)?;
835
836 let r1 = product!(10, 0, 2);
837 let r2 = product!(10, 3, 5);
838
839 let mut metrics = ExecutionMetrics::default();
840
841 let result = eval_incr(db, &mut metrics, &query, vec![(rhs_id, r1, false), (rhs_id, r2, true)])?;
842
843 assert_eq!(result.tables.len(), 1);
845 assert_eq!(result.tables[0], delete_op(lhs_id, "lhs", product!(0, 5)));
846 Ok(())
847 }
848
849 fn index_join_case_4(db: &RelationalDB) -> ResultTest<()> {
853 let lhs_id = create_lhs_table_for_eval_incr(db)?;
854 let rhs_id = create_rhs_table_for_eval_incr(db)?;
855 let query = compile_query(db)?;
856
857 let r1 = product!(13, 3, 5);
858 let r2 = product!(13, 2, 4);
859
860 let mut metrics = ExecutionMetrics::default();
861
862 let result = eval_incr(db, &mut metrics, &query, vec![(rhs_id, r1, false), (rhs_id, r2, true)])?;
863
864 assert_eq!(result.tables.len(), 1);
866 assert_eq!(result.tables[0], insert_op(lhs_id, "lhs", product!(2, 7)));
867 Ok(())
868 }
869
870 fn index_join_case_5(db: &RelationalDB) -> ResultTest<()> {
874 let lhs_id = create_lhs_table_for_eval_incr(db)?;
875 let rhs_id = create_rhs_table_for_eval_incr(db)?;
876 let query = compile_query(db)?;
877
878 let lhs_row = product!(5, 6);
879 let rhs_row = product!(20, 5, 3);
880
881 let mut metrics = ExecutionMetrics::default();
882
883 let result = eval_incr(
884 db,
885 &mut metrics,
886 &query,
887 vec![(lhs_id, lhs_row, true), (rhs_id, rhs_row, true)],
888 )?;
889
890 assert_eq!(result.tables.len(), 1);
892 assert_eq!(result.tables[0], insert_op(lhs_id, "lhs", product!(5, 6)));
893 Ok(())
894 }
895
896 fn index_join_case_6(db: &RelationalDB) -> ResultTest<()> {
900 let lhs_id = create_lhs_table_for_eval_incr(db)?;
901 let rhs_id = create_rhs_table_for_eval_incr(db)?;
902 let query = compile_query(db)?;
903
904 let lhs_row = product!(5, 10);
905 let rhs_row = product!(20, 5, 5);
906
907 let mut metrics = ExecutionMetrics::default();
908
909 let result = eval_incr(
910 db,
911 &mut metrics,
912 &query,
913 vec![(lhs_id, lhs_row, true), (rhs_id, rhs_row, true)],
914 )?;
915
916 assert_eq!(result.tables.len(), 0);
918 Ok(())
919 }
920
921 fn index_join_case_7(db: &RelationalDB) -> ResultTest<()> {
925 let lhs_id = create_lhs_table_for_eval_incr(db)?;
926 let rhs_id = create_rhs_table_for_eval_incr(db)?;
927 let query = compile_query(db)?;
928
929 let lhs_row = product!(0, 5);
930 let rhs_row = product!(10, 0, 2);
931
932 let mut metrics = ExecutionMetrics::default();
933
934 let result = eval_incr(
935 db,
936 &mut metrics,
937 &query,
938 vec![(lhs_id, lhs_row, false), (rhs_id, rhs_row, false)],
939 )?;
940
941 assert_eq!(result.tables.len(), 1);
943 assert_eq!(result.tables[0], delete_op(lhs_id, "lhs", product!(0, 5)));
944 Ok(())
945 }
946
947 fn index_join_case_8(db: &RelationalDB) -> ResultTest<()> {
951 let lhs_id = create_lhs_table_for_eval_incr(db)?;
952 let rhs_id = create_rhs_table_for_eval_incr(db)?;
953 let query = compile_query(db)?;
954
955 let lhs_row = product!(3, 8);
956 let rhs_row = product!(13, 3, 5);
957
958 let mut metrics = ExecutionMetrics::default();
959
960 let result = eval_incr(
961 db,
962 &mut metrics,
963 &query,
964 vec![(lhs_id, lhs_row, false), (rhs_id, rhs_row, false)],
965 )?;
966
967 assert_eq!(result.tables.len(), 0);
969 Ok(())
970 }
971
972 fn index_join_case_9(db: &RelationalDB) -> ResultTest<()> {
976 let lhs_id = create_lhs_table_for_eval_incr(db)?;
977 let rhs_id = create_rhs_table_for_eval_incr(db)?;
978 let query = compile_query(db)?;
979
980 let lhs_old = product!(1, 6);
981 let lhs_new = product!(1, 7);
982 let rhs_old = product!(11, 1, 3);
983 let rhs_new = product!(11, 1, 4);
984
985 let mut metrics = ExecutionMetrics::default();
986
987 let result = eval_incr(
988 db,
989 &mut metrics,
990 &query,
991 vec![
992 (lhs_id, lhs_old, false),
993 (rhs_id, rhs_old, false),
994 (lhs_id, lhs_new, true),
995 (rhs_id, rhs_new, true),
996 ],
997 )?;
998
999 let lhs_old = product!(1, 6);
1000 let lhs_new = product!(1, 7);
1001
1002 assert_eq!(result.tables.len(), 1);
1004 assert_eq!(
1005 result.tables[0],
1006 DatabaseTableUpdate {
1007 table_id: lhs_id,
1008 table_name: "lhs".into(),
1009 deletes: [lhs_old].into(),
1010 inserts: [lhs_new].into(),
1011 },
1012 );
1013 Ok(())
1014 }
1015
1016 run_eval_incr_test(index_join_case_1)?;
1017 run_eval_incr_test(index_join_case_2)?;
1018 run_eval_incr_test(index_join_case_3)?;
1019 run_eval_incr_test(index_join_case_4)?;
1020 run_eval_incr_test(index_join_case_5)?;
1021 run_eval_incr_test(index_join_case_6)?;
1022 run_eval_incr_test(index_join_case_7)?;
1023 run_eval_incr_test(index_join_case_8)?;
1024 run_eval_incr_test(index_join_case_9)?;
1025 Ok(())
1026 }
1027
1028 #[test]
1029 fn test_eval_incr_for_index_join() -> ResultTest<()> {
1030 run_eval_incr_test(index_join_case_1)?;
1034 run_eval_incr_test(index_join_case_2)?;
1038 run_eval_incr_test(index_join_case_3)?;
1042 run_eval_incr_test(index_join_case_4)?;
1046 run_eval_incr_test(index_join_case_5)?;
1050 run_eval_incr_test(index_join_case_6)?;
1054 run_eval_incr_test(index_join_case_7)?;
1058 run_eval_incr_test(index_join_case_8)?;
1062 run_eval_incr_test(index_join_case_9)?;
1066 Ok(())
1067 }
1068
1069 fn eval_incr(
1070 db: &RelationalDB,
1071 metrics: &mut ExecutionMetrics,
1072 plan: &SubscriptionPlan,
1073 ops: Vec<(TableId, ProductValue, bool)>,
1074 ) -> ResultTest<DatabaseUpdate> {
1075 let mut tx = begin_mut_tx(db);
1076
1077 for (table_id, row, insert) in ops {
1078 if insert {
1079 insert_row(db, &mut tx, table_id, row)?;
1080 } else {
1081 delete_row(db, &mut tx, table_id, row);
1082 }
1083 }
1084
1085 let (data, _, tx) = tx.commit_downgrade(Workload::ForTests);
1086 let table_id = plan.subscribed_table_id();
1087 let table_name = (&**plan.subscribed_table_name()).into();
1089 let tx = DeltaTx::new(&tx, &data, &QueriedTableIndexIds::from_iter(plan.index_ids()));
1090
1091 let mut eval_delta = || {
1098 let mut inserts = HashMap::new();
1100 let mut deletes = vec![];
1101
1102 plan.for_each_insert(&tx, metrics, &mut |row| {
1103 inserts
1104 .entry(RelValue::from(row))
1105 .and_modify(|n| *n += 1)
1108 .or_insert(1);
1109 Ok(())
1110 })
1111 .unwrap();
1112
1113 plan.for_each_delete(&tx, metrics, &mut |row| {
1114 let row = RelValue::from(row);
1115 match inserts.get_mut(&row) {
1116 None => {
1119 deletes.push(row);
1120 }
1121 Some(1) => {
1124 inserts.remove(&row);
1125 }
1126 Some(n) => {
1129 *n -= 1;
1130 }
1131 }
1132 Ok(())
1133 })
1134 .unwrap();
1135
1136 UpdatesRelValue {
1137 inserts: inserts.into_keys().collect(),
1138 deletes,
1139 }
1140 };
1141
1142 let updates = eval_delta();
1143
1144 let inserts = updates
1145 .inserts
1146 .into_iter()
1147 .map(RelValue::into_product_value)
1148 .collect::<Arc<_>>();
1149 let deletes = updates
1150 .deletes
1151 .into_iter()
1152 .map(RelValue::into_product_value)
1153 .collect::<Arc<_>>();
1154
1155 let tables = if inserts.is_empty() && deletes.is_empty() {
1156 vec![]
1157 } else {
1158 vec![DatabaseTableUpdate {
1159 table_id,
1160 table_name,
1161 inserts,
1162 deletes,
1163 }]
1164 };
1165 Ok(DatabaseUpdate { tables })
1166 }
1167
1168 fn index_join_case_1(db: &RelationalDB) -> ResultTest<()> {
1172 let _ = create_lhs_table_for_eval_incr(db)?;
1173 let rhs_id = create_rhs_table_for_eval_incr(db)?;
1174 let query = compile_query(db)?;
1175
1176 let r1 = product!(10, 0, 2);
1177 let r2 = product!(10, 0, 3);
1178
1179 let mut metrics = ExecutionMetrics::default();
1180
1181 let result = eval_incr(db, &mut metrics, &query, vec![(rhs_id, r1, false), (rhs_id, r2, true)])?;
1182
1183 assert!(result.is_empty());
1185
1186 assert_eq!(metrics.index_seeks, 2);
1190 Ok(())
1191 }
1192
1193 fn index_join_case_2(db: &RelationalDB) -> ResultTest<()> {
1197 let _ = create_lhs_table_for_eval_incr(db)?;
1198 let rhs_id = create_rhs_table_for_eval_incr(db)?;
1199 let query = compile_query(db)?;
1200
1201 let r1 = product!(13, 3, 5);
1202 let r2 = product!(13, 3, 6);
1203
1204 let mut metrics = ExecutionMetrics::default();
1205
1206 let result = eval_incr(db, &mut metrics, &query, vec![(rhs_id, r1, false), (rhs_id, r2, true)])?;
1207
1208 assert!(result.is_empty());
1210
1211 assert_eq!(metrics.index_seeks, 0);
1215 Ok(())
1216 }
1217
1218 fn index_join_case_3(db: &RelationalDB) -> ResultTest<()> {
1222 let lhs_id = create_lhs_table_for_eval_incr(db)?;
1223 let rhs_id = create_rhs_table_for_eval_incr(db)?;
1224 let query = compile_query(db)?;
1225
1226 let r1 = product!(10, 0, 2);
1227 let r2 = product!(10, 0, 5);
1228
1229 let mut metrics = ExecutionMetrics::default();
1230
1231 let result = eval_incr(db, &mut metrics, &query, vec![(rhs_id, r1, false), (rhs_id, r2, true)])?;
1232
1233 assert_eq!(result.tables.len(), 1);
1235 assert_eq!(result.tables[0], delete_op(lhs_id, "lhs", product!(0, 5)));
1236
1237 assert_eq!(metrics.index_seeks, 1);
1240 Ok(())
1241 }
1242
1243 fn index_join_case_4(db: &RelationalDB) -> ResultTest<()> {
1247 let lhs_id = create_lhs_table_for_eval_incr(db)?;
1248 let rhs_id = create_rhs_table_for_eval_incr(db)?;
1249 let query = compile_query(db)?;
1250
1251 let r1 = product!(13, 3, 5);
1252 let r2 = product!(13, 3, 4);
1253
1254 let mut metrics = ExecutionMetrics::default();
1255
1256 let result = eval_incr(db, &mut metrics, &query, vec![(rhs_id, r1, false), (rhs_id, r2, true)])?;
1257
1258 assert_eq!(result.tables.len(), 1);
1260 assert_eq!(result.tables[0], insert_op(lhs_id, "lhs", product!(3, 8)));
1261
1262 assert_eq!(metrics.index_seeks, 1);
1265 Ok(())
1266 }
1267
1268 fn index_join_case_5(db: &RelationalDB) -> ResultTest<()> {
1272 let lhs_id = create_lhs_table_for_eval_incr(db)?;
1273 let rhs_id = create_rhs_table_for_eval_incr(db)?;
1274 let query = compile_query(db)?;
1275
1276 let lhs_row = product!(5, 10);
1277 let rhs_row = product!(20, 5, 3);
1278
1279 let mut metrics = ExecutionMetrics::default();
1280
1281 let result = eval_incr(
1282 db,
1283 &mut metrics,
1284 &query,
1285 vec![(lhs_id, lhs_row, true), (rhs_id, rhs_row, true)],
1286 )?;
1287
1288 assert_eq!(result.tables.len(), 1);
1290 assert_eq!(result.tables[0], insert_op(lhs_id, "lhs", product!(5, 10)));
1291
1292 assert_eq!(metrics.index_seeks, 3);
1295 Ok(())
1296 }
1297
1298 fn index_join_case_6(db: &RelationalDB) -> ResultTest<()> {
1302 let lhs_id = create_lhs_table_for_eval_incr(db)?;
1303 let rhs_id = create_rhs_table_for_eval_incr(db)?;
1304 let query = compile_query(db)?;
1305
1306 let lhs_row = product!(5, 10);
1307 let rhs_row = product!(20, 5, 5);
1308
1309 let mut metrics = ExecutionMetrics::default();
1310
1311 let result = eval_incr(
1312 db,
1313 &mut metrics,
1314 &query,
1315 vec![(lhs_id, lhs_row, true), (rhs_id, rhs_row, true)],
1316 )?;
1317
1318 assert_eq!(result.tables.len(), 0);
1320
1321 assert_eq!(metrics.index_seeks, 2);
1328 Ok(())
1329 }
1330
1331 fn index_join_case_7(db: &RelationalDB) -> ResultTest<()> {
1335 let lhs_id = create_lhs_table_for_eval_incr(db)?;
1336 let rhs_id = create_rhs_table_for_eval_incr(db)?;
1337 let query = compile_query(db)?;
1338
1339 let lhs_row = product!(0, 5);
1340 let rhs_row = product!(10, 0, 2);
1341
1342 let mut metrics = ExecutionMetrics::default();
1343
1344 let result = eval_incr(
1345 db,
1346 &mut metrics,
1347 &query,
1348 vec![(lhs_id, lhs_row, false), (rhs_id, rhs_row, false)],
1349 )?;
1350
1351 assert_eq!(result.tables.len(), 1);
1353 assert_eq!(result.tables[0], delete_op(lhs_id, "lhs", product!(0, 5)));
1354
1355 assert_eq!(metrics.index_seeks, 3);
1358 Ok(())
1359 }
1360
1361 fn index_join_case_8(db: &RelationalDB) -> ResultTest<()> {
1365 let lhs_id = create_lhs_table_for_eval_incr(db)?;
1366 let rhs_id = create_rhs_table_for_eval_incr(db)?;
1367 let query = compile_query(db)?;
1368
1369 let lhs_row = product!(3, 8);
1370 let rhs_row = product!(13, 3, 5);
1371
1372 let mut metrics = ExecutionMetrics::default();
1373
1374 let result = eval_incr(
1375 db,
1376 &mut metrics,
1377 &query,
1378 vec![(lhs_id, lhs_row, false), (rhs_id, rhs_row, false)],
1379 )?;
1380
1381 assert_eq!(result.tables.len(), 0);
1383
1384 assert_eq!(metrics.index_seeks, 2);
1391 Ok(())
1392 }
1393
1394 fn index_join_case_9(db: &RelationalDB) -> ResultTest<()> {
1398 let lhs_id = create_lhs_table_for_eval_incr(db)?;
1399 let rhs_id = create_rhs_table_for_eval_incr(db)?;
1400 let query = compile_query(db)?;
1401
1402 let lhs_old = product!(1, 6);
1403 let lhs_new = product!(1, 7);
1404 let rhs_old = product!(11, 1, 3);
1405 let rhs_new = product!(11, 1, 4);
1406
1407 let mut metrics = ExecutionMetrics::default();
1408
1409 let result = eval_incr(
1410 db,
1411 &mut metrics,
1412 &query,
1413 vec![
1414 (lhs_id, lhs_old, false),
1415 (rhs_id, rhs_old, false),
1416 (lhs_id, lhs_new, true),
1417 (rhs_id, rhs_new, true),
1418 ],
1419 )?;
1420
1421 let lhs_old = product!(1, 6);
1422 let lhs_new = product!(1, 7);
1423
1424 assert_eq!(result.tables.len(), 1);
1426 assert_eq!(
1427 result.tables[0],
1428 DatabaseTableUpdate {
1429 table_id: lhs_id,
1430 table_name: "lhs".into(),
1431 deletes: [lhs_old].into(),
1432 inserts: [lhs_new].into(),
1433 },
1434 );
1435
1436 assert_eq!(metrics.index_seeks, 8);
1440 Ok(())
1441 }
1442}