spacetimedb/subscription/
query.rs

1use crate::db::relational_db::{RelationalDB, Tx};
2use crate::error::{DBError, SubscriptionError};
3use crate::sql::ast::SchemaViewer;
4use crate::sql::compiler::compile_sql;
5use crate::subscription::subscription::SupportedQuery;
6use once_cell::sync::Lazy;
7use regex::Regex;
8use spacetimedb_lib::identity::AuthCtx;
9use spacetimedb_subscription::SubscriptionPlan;
10use spacetimedb_vm::expr::{self, Crud, CrudExpr, QueryExpr};
11
12use super::execution_unit::QueryHash;
13use super::module_subscription_manager::Plan;
14
15static WHITESPACE: Lazy<Regex> = Lazy::new(|| Regex::new(r"^\s*$").unwrap());
16static SUBSCRIBE_TO_ALL_TABLES_REGEX: Lazy<Regex> =
17    Lazy::new(|| Regex::new(r"^\s*(?i)\bSELECT\s+\*\s+FROM\s+\*\s*$").unwrap());
18
19/// Is this string all whitespace?
20pub fn is_whitespace_or_empty(sql: &str) -> bool {
21    WHITESPACE.is_match_at(sql, 0)
22}
23
24/// Is this a `SELECT * FROM *` query?
25pub fn is_subscribe_to_all_tables(sql: &str) -> bool {
26    SUBSCRIBE_TO_ALL_TABLES_REGEX.is_match_at(sql, 0)
27}
28
29// TODO: Remove this after the SubscribeSingle migration.
30// TODO: It's semantically wrong to `SELECT * FROM *`
31// as it can only return back the changes valid for the tables in scope *right now*
32// instead of **continuously updating** the db changes
33// with system table modifications (add/remove tables, indexes, ...).
34//
35/// Variant of [`compile_read_only_query`] which appends `SourceExpr`s into a given `SourceBuilder`,
36/// rather than returning a new `SourceSet`.
37///
38/// This is necessary when merging multiple SQL queries into a single query set,
39/// as in [`crate::subscription::module_subscription_actor::ModuleSubscriptions::add_subscriber`].
40pub fn compile_read_only_queryset(
41    relational_db: &RelationalDB,
42    auth: &AuthCtx,
43    tx: &Tx,
44    input: &str,
45) -> Result<Vec<SupportedQuery>, DBError> {
46    let input = input.trim();
47    if input.is_empty() {
48        return Err(SubscriptionError::Empty.into());
49    }
50
51    // Remove redundant whitespace, and in particular newlines, for debug info.
52    let input = WHITESPACE.replace_all(input, " ");
53
54    let compiled = compile_sql(relational_db, auth, tx, &input)?;
55    let mut queries = Vec::with_capacity(compiled.len());
56    for q in compiled {
57        return Err(SubscriptionError::SideEffect(match q {
58            CrudExpr::Query(x) => {
59                queries.push(x);
60                continue;
61            }
62            CrudExpr::Insert { .. } => Crud::Insert,
63            CrudExpr::Update { .. } => Crud::Update,
64            CrudExpr::Delete { .. } => Crud::Delete,
65            CrudExpr::SetVar { .. } => Crud::Config,
66            CrudExpr::ReadVar { .. } => Crud::Config,
67        })
68        .into());
69    }
70
71    if !queries.is_empty() {
72        Ok(queries
73            .into_iter()
74            .map(|query| SupportedQuery::new(query, input.to_string()))
75            .collect::<Result<_, _>>()?)
76    } else {
77        Err(SubscriptionError::Empty.into())
78    }
79}
80
81/// Compile a string into a single read-only query.
82/// This returns an error if the string has multiple queries or mutations.
83pub fn compile_read_only_query(auth: &AuthCtx, tx: &Tx, input: &str) -> Result<Plan, DBError> {
84    if is_whitespace_or_empty(input) {
85        return Err(SubscriptionError::Empty.into());
86    }
87
88    let tx = SchemaViewer::new(tx, auth);
89    let (plans, has_param) = SubscriptionPlan::compile(input, &tx, auth)?;
90    let hash = QueryHash::from_string(input, auth.caller, has_param);
91    Ok(Plan::new(plans, hash, input.to_owned()))
92}
93
94/// Compile a string into a single read-only query.
95/// This returns an error if the string has multiple queries or mutations.
96pub fn compile_query_with_hashes(
97    auth: &AuthCtx,
98    tx: &Tx,
99    input: &str,
100    hash: QueryHash,
101    hash_with_param: QueryHash,
102) -> Result<Plan, DBError> {
103    if is_whitespace_or_empty(input) {
104        return Err(SubscriptionError::Empty.into());
105    }
106
107    let tx = SchemaViewer::new(tx, auth);
108    let (plans, has_param) = SubscriptionPlan::compile(input, &tx, auth)?;
109
110    if auth.is_owner() || has_param {
111        // Note that when generating hashes for queries from owners,
112        // we always treat them as if they were parameterized by :sender.
113        // This is because RLS is not applicable to owners.
114        // Hence owner hashes must never overlap with client hashes.
115        return Ok(Plan::new(plans, hash_with_param, input.to_owned()));
116    }
117    Ok(Plan::new(plans, hash, input.to_owned()))
118}
119
120/// The kind of [`QueryExpr`] currently supported for incremental evaluation.
121#[derive(Clone, Copy, Debug, Eq, Ord, PartialEq, PartialOrd, Hash)]
122pub enum Supported {
123    /// A scan or [`QueryExpr::Select`] of a single table.
124    Select,
125    /// A semijoin of two tables, restricted to [`QueryExpr::IndexJoin`]s.
126    ///
127    /// See [`crate::sql::compiler::try_index_join`].
128    Semijoin,
129}
130
131/// Classify a [`QueryExpr`] into a [`Supported`] kind, or `None` if incremental
132/// evaluation is not currently supported for the expression.
133pub fn classify(expr: &QueryExpr) -> Option<Supported> {
134    use expr::Query::*;
135    if matches!(&*expr.query, [IndexJoin(_)]) {
136        return Some(Supported::Semijoin);
137    }
138    for op in &expr.query {
139        if let JoinInner(_) = op {
140            return None;
141        }
142    }
143    Some(Supported::Select)
144}
145
146#[cfg(test)]
147mod tests {
148    use super::*;
149    use crate::db::relational_db::tests_utils::{
150        begin_mut_tx, begin_tx, insert, with_auto_commit, with_read_only, TestDB,
151    };
152    use crate::db::relational_db::MutTx;
153    use crate::host::module_host::{DatabaseTableUpdate, DatabaseUpdate, UpdatesRelValue};
154    use crate::sql::execute::collect_result;
155    use crate::sql::execute::tests::run_for_testing;
156    use crate::subscription::module_subscription_manager::QueriedTableIndexIds;
157    use crate::subscription::subscription::{legacy_get_all, ExecutionSet};
158    use crate::subscription::tx::DeltaTx;
159    use crate::vm::tests::create_table_with_rows;
160    use crate::vm::DbProgram;
161    use itertools::Itertools;
162    use spacetimedb_client_api_messages::websocket::{BsatnFormat, Compression};
163    use spacetimedb_datastore::execution_context::Workload;
164    use spacetimedb_lib::bsatn;
165    use spacetimedb_lib::db::auth::{StAccess, StTableType};
166    use spacetimedb_lib::error::ResultTest;
167    use spacetimedb_lib::identity::AuthCtx;
168    use spacetimedb_lib::metrics::ExecutionMetrics;
169    use spacetimedb_lib::Identity;
170    use spacetimedb_primitives::{ColId, TableId};
171    use spacetimedb_sats::{product, AlgebraicType, ProductType, ProductValue};
172    use spacetimedb_schema::relation::FieldName;
173    use spacetimedb_schema::schema::*;
174    use spacetimedb_vm::eval::run_ast;
175    use spacetimedb_vm::eval::test_helpers::{mem_table, mem_table_without_table_name, scalar};
176    use spacetimedb_vm::expr::{Expr, SourceSet};
177    use spacetimedb_vm::operator::OpCmp;
178    use spacetimedb_vm::relation::{MemTable, RelValue};
179    use std::collections::HashMap;
180    use std::sync::Arc;
181
182    /// Runs a query that evaluates if the changes made should be reported to the [ModuleSubscriptionManager]
183    fn run_query<const N: usize>(
184        db: &RelationalDB,
185        tx: &Tx,
186        query: &QueryExpr,
187        auth: AuthCtx,
188        sources: SourceSet<Vec<ProductValue>, N>,
189    ) -> Result<Vec<MemTable>, DBError> {
190        let mut tx = tx.into();
191        let p = &mut DbProgram::new(db, &mut tx, auth);
192        let q = Expr::Crud(Box::new(CrudExpr::Query(query.clone())));
193
194        let mut result = Vec::with_capacity(1);
195        let mut updates = Vec::new();
196        collect_result(&mut result, &mut updates, run_ast(p, q, sources).into())?;
197        Ok(result)
198    }
199
200    fn insert_op(table_id: TableId, table_name: &str, row: ProductValue) -> DatabaseTableUpdate {
201        DatabaseTableUpdate {
202            table_id,
203            table_name: table_name.into(),
204            deletes: [].into(),
205            inserts: [row].into(),
206        }
207    }
208
209    fn delete_op(table_id: TableId, table_name: &str, row: ProductValue) -> DatabaseTableUpdate {
210        DatabaseTableUpdate {
211            table_id,
212            table_name: table_name.into(),
213            deletes: [row].into(),
214            inserts: [].into(),
215        }
216    }
217
218    fn insert_row(db: &RelationalDB, tx: &mut MutTx, table_id: TableId, row: ProductValue) -> ResultTest<()> {
219        insert(db, tx, table_id, &row)?;
220        Ok(())
221    }
222
223    fn delete_row(db: &RelationalDB, tx: &mut MutTx, table_id: TableId, row: ProductValue) {
224        db.delete_by_rel(tx, table_id, [row]);
225    }
226
227    fn make_data(
228        db: &RelationalDB,
229        tx: &mut MutTx,
230        table_name: &str,
231        head: &ProductType,
232        row: &ProductValue,
233        access: StAccess,
234    ) -> ResultTest<(Arc<TableSchema>, MemTable, DatabaseTableUpdate, QueryExpr)> {
235        let schema = create_table_with_rows(db, tx, table_name, head.clone(), &[row.clone()], access)?;
236        let table = mem_table(schema.table_id, schema.get_row_type().clone(), [row.clone()]);
237
238        let data = DatabaseTableUpdate {
239            table_id: schema.table_id,
240            table_name: table_name.into(),
241            deletes: [].into(),
242            inserts: [row.clone()].into(),
243        };
244
245        let q = QueryExpr::new(&*schema);
246
247        Ok((schema, table, data, q))
248    }
249
250    fn make_inv(
251        db: &RelationalDB,
252        tx: &mut MutTx,
253        access: StAccess,
254    ) -> ResultTest<(Arc<TableSchema>, MemTable, DatabaseTableUpdate, QueryExpr)> {
255        let head = ProductType::from([("inventory_id", AlgebraicType::U64), ("name", AlgebraicType::String)]);
256        let row = product!(1u64, "health");
257
258        let (schema, table, data, q) = make_data(db, tx, "inventory", &head, &row, access)?;
259
260        let fields = &[0, 1].map(|c| FieldName::new(schema.table_id, c.into()).into());
261        let q = q.with_project(fields.into(), None).unwrap();
262
263        Ok((schema, table, data, q))
264    }
265
266    fn make_player(
267        db: &RelationalDB,
268        tx: &mut MutTx,
269    ) -> ResultTest<(Arc<TableSchema>, MemTable, DatabaseTableUpdate, QueryExpr)> {
270        let table_name = "player";
271        let head = ProductType::from([("player_id", AlgebraicType::U64), ("name", AlgebraicType::String)]);
272        let row = product!(2u64, "jhon doe");
273
274        let (schema, table, data, q) = make_data(db, tx, table_name, &head, &row, StAccess::Public)?;
275
276        let fields = [0, 1].map(|c| FieldName::new(schema.table_id, c.into()).into());
277        let q = q.with_project(fields.into(), None).unwrap();
278
279        Ok((schema, table, data, q))
280    }
281
282    /// Replace the primary (ie. `source`) table of the given [`QueryExpr`] with
283    /// a virtual [`MemTable`] consisting of the rows in [`DatabaseTableUpdate`].
284    fn query_to_mem_table(
285        mut of: QueryExpr,
286        data: &DatabaseTableUpdate,
287    ) -> (QueryExpr, SourceSet<Vec<ProductValue>, 1>) {
288        let data = data.deletes.iter().chain(data.inserts.iter()).cloned().collect();
289        let mem_table = MemTable::new(of.head().clone(), of.source.table_access(), data);
290        let mut sources = SourceSet::empty();
291        of.source = sources.add_mem_table(mem_table);
292        (of, sources)
293    }
294
295    fn check_query(
296        db: &RelationalDB,
297        table: &MemTable,
298        tx: &Tx,
299        q: &QueryExpr,
300        data: &DatabaseTableUpdate,
301    ) -> ResultTest<()> {
302        let (q, sources) = query_to_mem_table(q.clone(), data);
303        let result = run_query(db, tx, &q, AuthCtx::for_testing(), sources)?;
304
305        assert_eq!(
306            Some(mem_table_without_table_name(table)),
307            result.first().map(mem_table_without_table_name)
308        );
309
310        Ok(())
311    }
312
313    fn check_query_incr(
314        db: &RelationalDB,
315        tx: &Tx,
316        s: &ExecutionSet,
317        update: &DatabaseUpdate,
318        total_tables: usize,
319        rows: &[ProductValue],
320    ) -> ResultTest<()> {
321        let tx = &tx.into();
322        let update = update.tables.iter().collect::<Vec<_>>();
323        let result = s.eval_incr_for_test(db, tx, &update, None);
324        assert_eq!(
325            result.tables.len(),
326            total_tables,
327            "Must return the correct number of tables: {result:#?}"
328        );
329
330        let result = result
331            .tables
332            .iter()
333            .map(|u| &u.updates)
334            .flat_map(|u| {
335                u.deletes
336                    .iter()
337                    .chain(&*u.inserts)
338                    .map(|rv| rv.clone().into_product_value())
339                    .collect::<Vec<_>>()
340            })
341            .sorted()
342            .collect::<Vec<_>>();
343
344        assert_eq!(result, rows, "Must return the correct row(s)");
345
346        Ok(())
347    }
348
349    fn check_query_eval(
350        db: &RelationalDB,
351        tx: &Tx,
352        s: &ExecutionSet,
353        total_tables: usize,
354        rows: &[ProductValue],
355    ) -> ResultTest<()> {
356        let result = s.eval::<BsatnFormat>(db, tx, None, Compression::Brotli).tables;
357        assert_eq!(
358            result.len(),
359            total_tables,
360            "Must return the correct number of tables: {result:#?}"
361        );
362
363        let result = result
364            .into_iter()
365            .flat_map(|x| x.updates)
366            .map(|x| x.maybe_decompress())
367            .flat_map(|x| {
368                (&x.deletes)
369                    .into_iter()
370                    .chain(&x.inserts)
371                    .map(|x| x.to_owned())
372                    .collect::<Vec<_>>()
373            })
374            .sorted()
375            .collect_vec();
376
377        let rows = rows.iter().map(|r| bsatn::to_vec(r).unwrap()).collect_vec();
378
379        assert_eq!(result, rows, "Must return the correct row(s)");
380
381        Ok(())
382    }
383
384    fn singleton_execution_set(expr: QueryExpr, sql: String) -> ResultTest<ExecutionSet> {
385        Ok(ExecutionSet::from_iter([SupportedQuery::try_from((expr, sql))?]))
386    }
387
388    #[test]
389    fn test_whitespace_regex() -> ResultTest<()> {
390        assert!(is_whitespace_or_empty(""));
391        assert!(is_whitespace_or_empty(" "));
392        assert!(is_whitespace_or_empty("\n \t"));
393        assert!(!is_whitespace_or_empty(" a"));
394        Ok(())
395    }
396
397    #[test]
398    fn test_subscribe_to_all_tables_regex() -> ResultTest<()> {
399        assert!(is_subscribe_to_all_tables("SELECT * FROM *"));
400        assert!(is_subscribe_to_all_tables("Select * From *"));
401        assert!(is_subscribe_to_all_tables("select * from *"));
402        assert!(is_subscribe_to_all_tables("\nselect *\nfrom * "));
403        assert!(!is_subscribe_to_all_tables("select * from * where"));
404        Ok(())
405    }
406
407    #[test]
408    fn test_compile_incr_plan() -> ResultTest<()> {
409        let db = TestDB::durable()?;
410
411        let schema = &[("n", AlgebraicType::U64), ("data", AlgebraicType::U64)];
412        let indexes = &[0.into()];
413        db.create_table_for_test("a", schema, indexes)?;
414        db.create_table_for_test("b", schema, indexes)?;
415
416        let tx = begin_tx(&db);
417        let sql = "SELECT b.* FROM b JOIN a ON b.n = a.n WHERE b.data > 200";
418        let result = compile_read_only_query(&AuthCtx::for_testing(), &tx, sql);
419        assert!(result.is_ok());
420        Ok(())
421    }
422
423    #[test]
424    fn test_eval_incr_for_index_scan() -> ResultTest<()> {
425        let db = TestDB::durable()?;
426
427        // Create table [test] with index on [b]
428        let schema = &[("a", AlgebraicType::U64), ("b", AlgebraicType::U64)];
429        let indexes = &[1.into()];
430        let table_id = db.create_table_for_test("test", schema, indexes)?;
431
432        let mut tx = begin_mut_tx(&db);
433        let mut deletes = Vec::new();
434        for i in 0u64..9u64 {
435            insert(&db, &mut tx, table_id, &product!(i, i))?;
436            deletes.push(product!(i + 10, i))
437        }
438
439        let update = DatabaseUpdate {
440            tables: vec![DatabaseTableUpdate {
441                table_id,
442                table_name: "test".into(),
443                deletes: deletes.into(),
444                inserts: [].into(),
445            }],
446        };
447
448        db.commit_tx(tx)?;
449        let tx = begin_tx(&db);
450
451        let sql = "select * from test where b = 3";
452        let mut exp = compile_sql(&db, &AuthCtx::for_testing(), &tx, sql)?;
453
454        let Some(CrudExpr::Query(query)) = exp.pop() else {
455            panic!("unexpected query {:#?}", exp[0]);
456        };
457
458        let query: ExecutionSet = singleton_execution_set(query, sql.into())?;
459
460        let tx = (&tx).into();
461        let update = update.tables.iter().collect::<Vec<_>>();
462        let result = query.eval_incr_for_test(&db, &tx, &update, None);
463
464        assert_eq!(result.tables.len(), 1);
465
466        let update = &result.tables[0].updates;
467
468        assert_eq!(update.inserts.len(), 0);
469        assert_eq!(update.deletes.len(), 1);
470
471        let op = &update.deletes[0];
472
473        assert_eq!(op.clone().into_product_value(), product!(13u64, 3u64));
474        Ok(())
475    }
476
477    #[test]
478    fn test_subscribe() -> ResultTest<()> {
479        let db = TestDB::durable()?;
480
481        let mut tx = begin_mut_tx(&db);
482
483        let (schema, table, data, q) = make_inv(&db, &mut tx, StAccess::Public)?;
484        db.commit_tx(tx)?;
485        assert_eq!(schema.table_type, StTableType::User);
486        assert_eq!(schema.table_access, StAccess::Public);
487
488        let tx = begin_tx(&db);
489        let q_1 = q.clone();
490        check_query(&db, &table, &tx, &q_1, &data)?;
491
492        let q_2 = q
493            .with_select_cmp(OpCmp::Eq, FieldName::new(schema.table_id, 0.into()), scalar(1u64))
494            .unwrap();
495        check_query(&db, &table, &tx, &q_2, &data)?;
496
497        Ok(())
498    }
499
500    #[test]
501    fn test_subscribe_private() -> ResultTest<()> {
502        let db = TestDB::durable()?;
503
504        let mut tx = begin_mut_tx(&db);
505
506        let (schema, table, data, q) = make_inv(&db, &mut tx, StAccess::Private)?;
507        db.commit_tx(tx)?;
508        assert_eq!(schema.table_type, StTableType::User);
509        assert_eq!(schema.table_access, StAccess::Private);
510
511        let row = product!(1u64, "health");
512        let tx = begin_tx(&db);
513        check_query(&db, &table, &tx, &q, &data)?;
514
515        // SELECT * FROM inventory WHERE inventory_id = 1
516        let q_id = QueryExpr::new(&*schema)
517            .with_select_cmp(OpCmp::Eq, FieldName::new(schema.table_id, 0.into()), scalar(1u64))
518            .unwrap();
519
520        let s = singleton_execution_set(q_id, "SELECT * FROM inventory WHERE inventory_id = 1".into())?;
521
522        let data = DatabaseTableUpdate {
523            table_id: schema.table_id,
524            table_name: "inventory".into(),
525            deletes: [].into(),
526            inserts: [row.clone()].into(),
527        };
528
529        let update = DatabaseUpdate {
530            tables: vec![data.clone()],
531        };
532
533        check_query_incr(&db, &tx, &s, &update, 1, &[row])?;
534
535        let q = QueryExpr::new(&*schema);
536
537        let (q, sources) = query_to_mem_table(q, &data);
538        //Try access the private table
539        match run_query(
540            &db,
541            &tx,
542            &q,
543            AuthCtx::new(Identity::__dummy(), Identity::from_byte_array([1u8; 32])),
544            sources,
545        ) {
546            Ok(_) => {
547                panic!("it allows to execute against private table")
548            }
549            Err(err) => {
550                if err.get_auth_error().is_none() {
551                    panic!("fail to report an `auth` violation for private table, it gets {err}")
552                }
553            }
554        }
555
556        Ok(())
557    }
558
559    #[test]
560    fn test_subscribe_sql() -> ResultTest<()> {
561        let db = TestDB::durable()?;
562
563        // Create table [MobileEntityState]
564        let schema = &[
565            ("entity_id", AlgebraicType::U64),
566            ("location_x", AlgebraicType::I32),
567            ("location_z", AlgebraicType::I32),
568            ("destination_x", AlgebraicType::I32),
569            ("destination_z", AlgebraicType::I32),
570            ("is_running", AlgebraicType::Bool),
571            ("timestamp", AlgebraicType::U64),
572            ("dimension", AlgebraicType::U32),
573        ];
574        let indexes = &[0.into(), 1.into(), 2.into()];
575        db.create_table_for_test("MobileEntityState", schema, indexes)?;
576
577        // Create table [EnemyState]
578        let schema = &[
579            ("entity_id", AlgebraicType::U64),
580            ("herd_id", AlgebraicType::I32),
581            ("status", AlgebraicType::I32),
582            ("type", AlgebraicType::I32),
583            ("direction", AlgebraicType::I32),
584        ];
585        let indexes = &[0.into()];
586        db.create_table_for_test("EnemyState", schema, indexes)?;
587
588        for sql_insert in [
589        "insert into MobileEntityState (entity_id, location_x, location_z, destination_x, destination_z, is_running, timestamp, dimension) values (1, 96001, 96001, 96001, 1867045146, false, 17167179743690094247, 3926297397)",
590        "insert into MobileEntityState (entity_id, location_x, location_z, destination_x, destination_z, is_running, timestamp, dimension) values (2, 96001, 191000, 191000, 1560020888, true, 2947537077064292621, 445019304)",
591        "insert into EnemyState (entity_id, herd_id, status, type, direction) values (1, 1181485940, 1633678837, 1158301365, 132191327)",
592        "insert into EnemyState (entity_id, herd_id, status, type, direction) values (2, 2017368418, 194072456, 34423057, 1296770410)"] {
593            run_for_testing(&db, sql_insert)?;
594        }
595
596        let sql_query = "\
597        SELECT EnemyState.* FROM EnemyState \
598        JOIN MobileEntityState ON MobileEntityState.entity_id = EnemyState.entity_id  \
599        WHERE MobileEntityState.location_x > 96000 \
600        AND MobileEntityState.location_x < 192000 \
601        AND MobileEntityState.location_z > 96000 \
602        AND MobileEntityState.location_z < 192000";
603
604        let tx = begin_tx(&db);
605        let qset = compile_read_only_queryset(&db, &AuthCtx::for_testing(), &tx, sql_query)?;
606
607        for q in qset {
608            let result = run_query(
609                &db,
610                &tx,
611                q.as_expr(),
612                AuthCtx::for_testing(),
613                SourceSet::<_, 0>::empty(),
614            )?;
615            assert_eq!(result.len(), 1, "Join query did not return any rows");
616        }
617
618        Ok(())
619    }
620
621    #[test]
622    fn test_subscribe_all() -> ResultTest<()> {
623        let db = TestDB::durable()?;
624
625        let mut tx = begin_mut_tx(&db);
626
627        let (schema_1, _, _, _) = make_inv(&db, &mut tx, StAccess::Public)?;
628        let (schema_2, _, _, _) = make_player(&db, &mut tx)?;
629        db.commit_tx(tx)?;
630        let row_1 = product!(1u64, "health");
631        let row_2 = product!(2u64, "jhon doe");
632        let tx = db.begin_tx(Workload::Subscribe);
633        let s = legacy_get_all(&db, &tx, &AuthCtx::for_testing())?.into();
634        check_query_eval(&db, &tx, &s, 2, &[row_1.clone(), row_2.clone()])?;
635
636        let data1 = DatabaseTableUpdate {
637            table_id: schema_1.table_id,
638            table_name: "inventory".into(),
639            deletes: [row_1].into(),
640            inserts: [].into(),
641        };
642
643        let data2 = DatabaseTableUpdate {
644            table_id: schema_2.table_id,
645            table_name: "player".into(),
646            deletes: [].into(),
647            inserts: [row_2].into(),
648        };
649
650        let update = DatabaseUpdate {
651            tables: vec![data1, data2],
652        };
653
654        let row_1 = product!(1u64, "health");
655        let row_2 = product!(2u64, "jhon doe");
656        check_query_incr(&db, &tx, &s, &update, 2, &[row_1, row_2])?;
657
658        Ok(())
659    }
660
661    #[test]
662    fn test_classify() -> ResultTest<()> {
663        let db = TestDB::durable()?;
664
665        // Create table [plain]
666        let schema = &[("id", AlgebraicType::U64)];
667        db.create_table_for_test("plain", schema, &[])?;
668
669        // Create table [lhs] with indexes on [id] and [x]
670        let schema = &[("id", AlgebraicType::U64), ("x", AlgebraicType::I32)];
671        let indexes = &[ColId(0), ColId(1)];
672        db.create_table_for_test("lhs", schema, indexes)?;
673
674        // Create table [rhs] with indexes on [id] and [y]
675        let schema = &[("id", AlgebraicType::U64), ("y", AlgebraicType::I32)];
676        let indexes = &[ColId(0), ColId(1)];
677        db.create_table_for_test("rhs", schema, indexes)?;
678
679        let tx = begin_tx(&db);
680
681        // All single table queries are supported
682        let scans = [
683            "SELECT * FROM plain",
684            "SELECT * FROM plain WHERE id > 5",
685            "SELECT plain.* FROM plain",
686            "SELECT plain.* FROM plain WHERE plain.id = 5",
687            "SELECT * FROM lhs",
688            "SELECT * FROM lhs WHERE id > 5",
689        ];
690        for scan in scans {
691            let expr = compile_read_only_queryset(&db, &AuthCtx::for_testing(), &tx, scan)?
692                .pop()
693                .unwrap();
694            assert_eq!(expr.kind(), Supported::Select, "{scan}\n{expr:#?}");
695        }
696
697        // Only index semijoins are supported
698        let joins = ["SELECT lhs.* FROM lhs JOIN rhs ON lhs.id = rhs.id WHERE rhs.y < 10"];
699        for join in joins {
700            let expr = compile_read_only_queryset(&db, &AuthCtx::for_testing(), &tx, join)?
701                .pop()
702                .unwrap();
703            assert_eq!(expr.kind(), Supported::Semijoin, "{join}\n{expr:#?}");
704        }
705
706        // All other joins are unsupported
707        let joins = [
708            "SELECT lhs.* FROM lhs JOIN rhs ON lhs.id = rhs.id",
709            "SELECT * FROM lhs JOIN rhs ON lhs.id = rhs.id",
710            "SELECT * FROM lhs JOIN rhs ON lhs.id = rhs.id WHERE lhs.x < 10",
711        ];
712        for join in joins {
713            match compile_read_only_queryset(&db, &AuthCtx::for_testing(), &tx, join) {
714                Err(DBError::Subscription(SubscriptionError::Unsupported(_)) | DBError::TypeError(_)) => (),
715                x => panic!("Unexpected: {x:?}"),
716            }
717        }
718
719        Ok(())
720    }
721
722    /// Create table [lhs] with index on [id]
723    fn create_lhs_table_for_eval_incr(db: &RelationalDB) -> ResultTest<TableId> {
724        const I32: AlgebraicType = AlgebraicType::I32;
725        let lhs_id = db.create_table_for_test("lhs", &[("id", I32), ("x", I32)], &[0.into()])?;
726        with_auto_commit(db, |tx| {
727            for i in 0..5 {
728                let row = product!(i, i + 5);
729                insert(db, tx, lhs_id, &row)?;
730            }
731            Ok(lhs_id)
732        })
733    }
734
735    /// Create table [rhs] with index on [id]
736    fn create_rhs_table_for_eval_incr(db: &RelationalDB) -> ResultTest<TableId> {
737        const I32: AlgebraicType = AlgebraicType::I32;
738        let rhs_id = db.create_table_for_test("rhs", &[("rid", I32), ("id", I32), ("y", I32)], &[1.into()])?;
739        with_auto_commit(db, |tx| {
740            for i in 10..20 {
741                let row = product!(i, i - 10, i - 8);
742                insert(db, tx, rhs_id, &row)?;
743            }
744            Ok(rhs_id)
745        })
746    }
747
748    fn compile_query(db: &RelationalDB) -> ResultTest<SubscriptionPlan> {
749        with_read_only(db, |tx| {
750            let auth = AuthCtx::for_testing();
751            let tx = SchemaViewer::new(tx, &auth);
752            // Should be answered using an index semijion
753            let sql = "select lhs.* from lhs join rhs on lhs.id = rhs.id where rhs.y >= 2 and rhs.y <= 4";
754            Ok(SubscriptionPlan::compile(sql, &tx, &auth)
755                .map(|(mut plans, _)| {
756                    assert_eq!(plans.len(), 1);
757                    plans.pop().unwrap()
758                })
759                .unwrap())
760        })
761    }
762
763    fn run_eval_incr_test<T, F: Fn(&RelationalDB) -> ResultTest<T>>(test_fn: F) -> ResultTest<T> {
764        TestDB::durable().map(|db| test_fn(&db))??;
765        TestDB::durable().map(|db| test_fn(&db.with_row_count(Arc::new(|_, _| 5))))?
766    }
767
768    #[test]
769    /// TODO: This test is a slight modification of [test_eval_incr_for_index_join].
770    /// Essentially the WHERE condition is on different tables.
771    /// Should refactor to reduce duplicate logic between the two tests.
772    fn test_eval_incr_for_left_semijoin() -> ResultTest<()> {
773        fn compile_query(db: &RelationalDB) -> ResultTest<SubscriptionPlan> {
774            with_read_only(db, |tx| {
775                let auth = AuthCtx::for_testing();
776                let tx = SchemaViewer::new(tx, &auth);
777                // Should be answered using an index semijion
778                let sql = "select lhs.* from lhs join rhs on lhs.id = rhs.id where lhs.x >= 5 and lhs.x <= 7";
779                Ok(SubscriptionPlan::compile(sql, &tx, &auth)
780                    .map(|(mut plans, _)| {
781                        assert_eq!(plans.len(), 1);
782                        plans.pop().unwrap()
783                    })
784                    .unwrap())
785            })
786        }
787
788        // Case 1:
789        // Delete a row inside the region of lhs,
790        // Insert a row inside the region of lhs.
791        fn index_join_case_1(db: &RelationalDB) -> ResultTest<()> {
792            let _ = create_lhs_table_for_eval_incr(db)?;
793            let rhs_id = create_rhs_table_for_eval_incr(db)?;
794            let query = compile_query(db)?;
795
796            let r1 = product!(10, 0, 2);
797            let r2 = product!(10, 0, 3);
798
799            let mut metrics = ExecutionMetrics::default();
800
801            let result = eval_incr(db, &mut metrics, &query, vec![(rhs_id, r1, false), (rhs_id, r2, true)])?;
802
803            // No updates to report
804            assert!(result.is_empty());
805            Ok(())
806        }
807
808        // Case 2:
809        // Delete a row outside the region of lhs,
810        // Insert a row outside the region of lhs.
811        fn index_join_case_2(db: &RelationalDB) -> ResultTest<()> {
812            let _ = create_lhs_table_for_eval_incr(db)?;
813            let rhs_id = create_rhs_table_for_eval_incr(db)?;
814            let query = compile_query(db)?;
815
816            let r1 = product!(13, 3, 5);
817            let r2 = product!(13, 4, 6);
818
819            let mut metrics = ExecutionMetrics::default();
820
821            let result = eval_incr(db, &mut metrics, &query, vec![(rhs_id, r1, false), (rhs_id, r2, true)])?;
822
823            // No updates to report
824            assert!(result.is_empty());
825            Ok(())
826        }
827
828        // Case 3:
829        // Delete a row inside  the region of lhs,
830        // Insert a row outside the region of lhs.
831        fn index_join_case_3(db: &RelationalDB) -> ResultTest<()> {
832            let lhs_id = create_lhs_table_for_eval_incr(db)?;
833            let rhs_id = create_rhs_table_for_eval_incr(db)?;
834            let query = compile_query(db)?;
835
836            let r1 = product!(10, 0, 2);
837            let r2 = product!(10, 3, 5);
838
839            let mut metrics = ExecutionMetrics::default();
840
841            let result = eval_incr(db, &mut metrics, &query, vec![(rhs_id, r1, false), (rhs_id, r2, true)])?;
842
843            // A single delete from lhs
844            assert_eq!(result.tables.len(), 1);
845            assert_eq!(result.tables[0], delete_op(lhs_id, "lhs", product!(0, 5)));
846            Ok(())
847        }
848
849        // Case 4:
850        // Delete a row outside the region of lhs,
851        // Insert a row inside  the region of lhs.
852        fn index_join_case_4(db: &RelationalDB) -> ResultTest<()> {
853            let lhs_id = create_lhs_table_for_eval_incr(db)?;
854            let rhs_id = create_rhs_table_for_eval_incr(db)?;
855            let query = compile_query(db)?;
856
857            let r1 = product!(13, 3, 5);
858            let r2 = product!(13, 2, 4);
859
860            let mut metrics = ExecutionMetrics::default();
861
862            let result = eval_incr(db, &mut metrics, &query, vec![(rhs_id, r1, false), (rhs_id, r2, true)])?;
863
864            // A single insert into lhs
865            assert_eq!(result.tables.len(), 1);
866            assert_eq!(result.tables[0], insert_op(lhs_id, "lhs", product!(2, 7)));
867            Ok(())
868        }
869
870        // Case 5:
871        // Insert row into rhs,
872        // Insert matching row inside the region of lhs.
873        fn index_join_case_5(db: &RelationalDB) -> ResultTest<()> {
874            let lhs_id = create_lhs_table_for_eval_incr(db)?;
875            let rhs_id = create_rhs_table_for_eval_incr(db)?;
876            let query = compile_query(db)?;
877
878            let lhs_row = product!(5, 6);
879            let rhs_row = product!(20, 5, 3);
880
881            let mut metrics = ExecutionMetrics::default();
882
883            let result = eval_incr(
884                db,
885                &mut metrics,
886                &query,
887                vec![(lhs_id, lhs_row, true), (rhs_id, rhs_row, true)],
888            )?;
889
890            // A single insert into lhs
891            assert_eq!(result.tables.len(), 1);
892            assert_eq!(result.tables[0], insert_op(lhs_id, "lhs", product!(5, 6)));
893            Ok(())
894        }
895
896        // Case 6:
897        // Insert row into rhs,
898        // Insert matching row outside the region of lhs.
899        fn index_join_case_6(db: &RelationalDB) -> ResultTest<()> {
900            let lhs_id = create_lhs_table_for_eval_incr(db)?;
901            let rhs_id = create_rhs_table_for_eval_incr(db)?;
902            let query = compile_query(db)?;
903
904            let lhs_row = product!(5, 10);
905            let rhs_row = product!(20, 5, 5);
906
907            let mut metrics = ExecutionMetrics::default();
908
909            let result = eval_incr(
910                db,
911                &mut metrics,
912                &query,
913                vec![(lhs_id, lhs_row, true), (rhs_id, rhs_row, true)],
914            )?;
915
916            // No updates to report
917            assert_eq!(result.tables.len(), 0);
918            Ok(())
919        }
920
921        // Case 7:
922        // Delete row from rhs,
923        // Delete matching row inside the region of lhs.
924        fn index_join_case_7(db: &RelationalDB) -> ResultTest<()> {
925            let lhs_id = create_lhs_table_for_eval_incr(db)?;
926            let rhs_id = create_rhs_table_for_eval_incr(db)?;
927            let query = compile_query(db)?;
928
929            let lhs_row = product!(0, 5);
930            let rhs_row = product!(10, 0, 2);
931
932            let mut metrics = ExecutionMetrics::default();
933
934            let result = eval_incr(
935                db,
936                &mut metrics,
937                &query,
938                vec![(lhs_id, lhs_row, false), (rhs_id, rhs_row, false)],
939            )?;
940
941            // A single delete from lhs
942            assert_eq!(result.tables.len(), 1);
943            assert_eq!(result.tables[0], delete_op(lhs_id, "lhs", product!(0, 5)));
944            Ok(())
945        }
946
947        // Case 8:
948        // Delete row from rhs,
949        // Delete matching row outside the region of lhs.
950        fn index_join_case_8(db: &RelationalDB) -> ResultTest<()> {
951            let lhs_id = create_lhs_table_for_eval_incr(db)?;
952            let rhs_id = create_rhs_table_for_eval_incr(db)?;
953            let query = compile_query(db)?;
954
955            let lhs_row = product!(3, 8);
956            let rhs_row = product!(13, 3, 5);
957
958            let mut metrics = ExecutionMetrics::default();
959
960            let result = eval_incr(
961                db,
962                &mut metrics,
963                &query,
964                vec![(lhs_id, lhs_row, false), (rhs_id, rhs_row, false)],
965            )?;
966
967            // No updates to report
968            assert_eq!(result.tables.len(), 0);
969            Ok(())
970        }
971
972        // Case 9:
973        // Update row from rhs,
974        // Update matching row inside the region of lhs.
975        fn index_join_case_9(db: &RelationalDB) -> ResultTest<()> {
976            let lhs_id = create_lhs_table_for_eval_incr(db)?;
977            let rhs_id = create_rhs_table_for_eval_incr(db)?;
978            let query = compile_query(db)?;
979
980            let lhs_old = product!(1, 6);
981            let lhs_new = product!(1, 7);
982            let rhs_old = product!(11, 1, 3);
983            let rhs_new = product!(11, 1, 4);
984
985            let mut metrics = ExecutionMetrics::default();
986
987            let result = eval_incr(
988                db,
989                &mut metrics,
990                &query,
991                vec![
992                    (lhs_id, lhs_old, false),
993                    (rhs_id, rhs_old, false),
994                    (lhs_id, lhs_new, true),
995                    (rhs_id, rhs_new, true),
996                ],
997            )?;
998
999            let lhs_old = product!(1, 6);
1000            let lhs_new = product!(1, 7);
1001
1002            // A delete and an insert into lhs
1003            assert_eq!(result.tables.len(), 1);
1004            assert_eq!(
1005                result.tables[0],
1006                DatabaseTableUpdate {
1007                    table_id: lhs_id,
1008                    table_name: "lhs".into(),
1009                    deletes: [lhs_old].into(),
1010                    inserts: [lhs_new].into(),
1011                },
1012            );
1013            Ok(())
1014        }
1015
1016        run_eval_incr_test(index_join_case_1)?;
1017        run_eval_incr_test(index_join_case_2)?;
1018        run_eval_incr_test(index_join_case_3)?;
1019        run_eval_incr_test(index_join_case_4)?;
1020        run_eval_incr_test(index_join_case_5)?;
1021        run_eval_incr_test(index_join_case_6)?;
1022        run_eval_incr_test(index_join_case_7)?;
1023        run_eval_incr_test(index_join_case_8)?;
1024        run_eval_incr_test(index_join_case_9)?;
1025        Ok(())
1026    }
1027
1028    #[test]
1029    fn test_eval_incr_for_index_join() -> ResultTest<()> {
1030        // Case 1:
1031        // Delete a row inside the region of rhs,
1032        // Insert a row inside the region of rhs.
1033        run_eval_incr_test(index_join_case_1)?;
1034        // Case 2:
1035        // Delete a row outside the region of rhs,
1036        // Insert a row outside the region of rhs.
1037        run_eval_incr_test(index_join_case_2)?;
1038        // Case 3:
1039        // Delete a row inside  the region of rhs,
1040        // Insert a row outside the region of rhs.
1041        run_eval_incr_test(index_join_case_3)?;
1042        // Case 4:
1043        // Delete a row outside the region of rhs,
1044        // Insert a row inside  the region of rhs.
1045        run_eval_incr_test(index_join_case_4)?;
1046        // Case 5:
1047        // Insert row into lhs,
1048        // Insert matching row inside the region of rhs.
1049        run_eval_incr_test(index_join_case_5)?;
1050        // Case 6:
1051        // Insert row into lhs,
1052        // Insert matching row outside the region of rhs.
1053        run_eval_incr_test(index_join_case_6)?;
1054        // Case 7:
1055        // Delete row from lhs,
1056        // Delete matching row inside the region of rhs.
1057        run_eval_incr_test(index_join_case_7)?;
1058        // Case 8:
1059        // Delete row from lhs,
1060        // Delete matching row outside the region of rhs.
1061        run_eval_incr_test(index_join_case_8)?;
1062        // Case 9:
1063        // Update row from lhs,
1064        // Update matching row inside the region of rhs.
1065        run_eval_incr_test(index_join_case_9)?;
1066        Ok(())
1067    }
1068
1069    fn eval_incr(
1070        db: &RelationalDB,
1071        metrics: &mut ExecutionMetrics,
1072        plan: &SubscriptionPlan,
1073        ops: Vec<(TableId, ProductValue, bool)>,
1074    ) -> ResultTest<DatabaseUpdate> {
1075        let mut tx = begin_mut_tx(db);
1076
1077        for (table_id, row, insert) in ops {
1078            if insert {
1079                insert_row(db, &mut tx, table_id, row)?;
1080            } else {
1081                delete_row(db, &mut tx, table_id, row);
1082            }
1083        }
1084
1085        let (data, _, tx) = tx.commit_downgrade(Workload::ForTests);
1086        let table_id = plan.subscribed_table_id();
1087        // This awful construction to convert `Arc<str>` into `Box<str>`.
1088        let table_name = (&**plan.subscribed_table_name()).into();
1089        let tx = DeltaTx::new(&tx, &data, &QueriedTableIndexIds::from_iter(plan.index_ids()));
1090
1091        // IMPORTANT: FOR TESTING ONLY!
1092        //
1093        // This utility implements set semantics for incremental updates.
1094        // This is safe because we are only testing PK/FK joins,
1095        // and we don't have to track row multiplicities for PK/FK joins.
1096        // But in general we must assume bag semantics for server side tests.
1097        let mut eval_delta = || {
1098            // Note, we can't determine apriori what capacity to allocate
1099            let mut inserts = HashMap::new();
1100            let mut deletes = vec![];
1101
1102            plan.for_each_insert(&tx, metrics, &mut |row| {
1103                inserts
1104                    .entry(RelValue::from(row))
1105                    // Row already inserted?
1106                    // Increment its multiplicity.
1107                    .and_modify(|n| *n += 1)
1108                    .or_insert(1);
1109                Ok(())
1110            })
1111            .unwrap();
1112
1113            plan.for_each_delete(&tx, metrics, &mut |row| {
1114                let row = RelValue::from(row);
1115                match inserts.get_mut(&row) {
1116                    // This row was not inserted.
1117                    // Add it to the delete set.
1118                    None => {
1119                        deletes.push(row);
1120                    }
1121                    // This row was inserted.
1122                    // Decrement the multiplicity.
1123                    Some(1) => {
1124                        inserts.remove(&row);
1125                    }
1126                    // This row was inserted.
1127                    // Decrement the multiplicity.
1128                    Some(n) => {
1129                        *n -= 1;
1130                    }
1131                }
1132                Ok(())
1133            })
1134            .unwrap();
1135
1136            UpdatesRelValue {
1137                inserts: inserts.into_keys().collect(),
1138                deletes,
1139            }
1140        };
1141
1142        let updates = eval_delta();
1143
1144        let inserts = updates
1145            .inserts
1146            .into_iter()
1147            .map(RelValue::into_product_value)
1148            .collect::<Arc<_>>();
1149        let deletes = updates
1150            .deletes
1151            .into_iter()
1152            .map(RelValue::into_product_value)
1153            .collect::<Arc<_>>();
1154
1155        let tables = if inserts.is_empty() && deletes.is_empty() {
1156            vec![]
1157        } else {
1158            vec![DatabaseTableUpdate {
1159                table_id,
1160                table_name,
1161                inserts,
1162                deletes,
1163            }]
1164        };
1165        Ok(DatabaseUpdate { tables })
1166    }
1167
1168    // Case 1:
1169    // Delete a row inside the region of rhs,
1170    // Insert a row inside the region of rhs.
1171    fn index_join_case_1(db: &RelationalDB) -> ResultTest<()> {
1172        let _ = create_lhs_table_for_eval_incr(db)?;
1173        let rhs_id = create_rhs_table_for_eval_incr(db)?;
1174        let query = compile_query(db)?;
1175
1176        let r1 = product!(10, 0, 2);
1177        let r2 = product!(10, 0, 3);
1178
1179        let mut metrics = ExecutionMetrics::default();
1180
1181        let result = eval_incr(db, &mut metrics, &query, vec![(rhs_id, r1, false), (rhs_id, r2, true)])?;
1182
1183        // No updates to report
1184        assert!(result.is_empty());
1185
1186        // The lhs row must always probe the rhs index.
1187        // The rhs row passes the rhs filter,
1188        // resulting in a probe of the rhs index.
1189        assert_eq!(metrics.index_seeks, 2);
1190        Ok(())
1191    }
1192
1193    // Case 2:
1194    // Delete a row outside the region of rhs,
1195    // Insert a row outside the region of rhs.
1196    fn index_join_case_2(db: &RelationalDB) -> ResultTest<()> {
1197        let _ = create_lhs_table_for_eval_incr(db)?;
1198        let rhs_id = create_rhs_table_for_eval_incr(db)?;
1199        let query = compile_query(db)?;
1200
1201        let r1 = product!(13, 3, 5);
1202        let r2 = product!(13, 3, 6);
1203
1204        let mut metrics = ExecutionMetrics::default();
1205
1206        let result = eval_incr(db, &mut metrics, &query, vec![(rhs_id, r1, false), (rhs_id, r2, true)])?;
1207
1208        // No updates to report
1209        assert!(result.is_empty());
1210
1211        // The lhs row must always probe the rhs index.
1212        // The rhs row doesn't pass the rhs filter,
1213        // hence it doesn't survive to probe the lhs index.
1214        assert_eq!(metrics.index_seeks, 0);
1215        Ok(())
1216    }
1217
1218    // Case 3:
1219    // Delete a row inside  the region of rhs,
1220    // Insert a row outside the region of rhs.
1221    fn index_join_case_3(db: &RelationalDB) -> ResultTest<()> {
1222        let lhs_id = create_lhs_table_for_eval_incr(db)?;
1223        let rhs_id = create_rhs_table_for_eval_incr(db)?;
1224        let query = compile_query(db)?;
1225
1226        let r1 = product!(10, 0, 2);
1227        let r2 = product!(10, 0, 5);
1228
1229        let mut metrics = ExecutionMetrics::default();
1230
1231        let result = eval_incr(db, &mut metrics, &query, vec![(rhs_id, r1, false), (rhs_id, r2, true)])?;
1232
1233        // A single delete from lhs
1234        assert_eq!(result.tables.len(), 1);
1235        assert_eq!(result.tables[0], delete_op(lhs_id, "lhs", product!(0, 5)));
1236
1237        // One row passes the rhs filter, the other does not.
1238        // This results in a single probe of the lhs index.
1239        assert_eq!(metrics.index_seeks, 1);
1240        Ok(())
1241    }
1242
1243    // Case 4:
1244    // Delete a row outside the region of rhs,
1245    // Insert a row inside  the region of rhs.
1246    fn index_join_case_4(db: &RelationalDB) -> ResultTest<()> {
1247        let lhs_id = create_lhs_table_for_eval_incr(db)?;
1248        let rhs_id = create_rhs_table_for_eval_incr(db)?;
1249        let query = compile_query(db)?;
1250
1251        let r1 = product!(13, 3, 5);
1252        let r2 = product!(13, 3, 4);
1253
1254        let mut metrics = ExecutionMetrics::default();
1255
1256        let result = eval_incr(db, &mut metrics, &query, vec![(rhs_id, r1, false), (rhs_id, r2, true)])?;
1257
1258        // A single insert into lhs
1259        assert_eq!(result.tables.len(), 1);
1260        assert_eq!(result.tables[0], insert_op(lhs_id, "lhs", product!(3, 8)));
1261
1262        // One row passes the rhs filter, the other does not.
1263        // This results in a single probe of the lhs index.
1264        assert_eq!(metrics.index_seeks, 1);
1265        Ok(())
1266    }
1267
1268    // Case 5:
1269    // Insert row into lhs,
1270    // Insert matching row inside the region of rhs.
1271    fn index_join_case_5(db: &RelationalDB) -> ResultTest<()> {
1272        let lhs_id = create_lhs_table_for_eval_incr(db)?;
1273        let rhs_id = create_rhs_table_for_eval_incr(db)?;
1274        let query = compile_query(db)?;
1275
1276        let lhs_row = product!(5, 10);
1277        let rhs_row = product!(20, 5, 3);
1278
1279        let mut metrics = ExecutionMetrics::default();
1280
1281        let result = eval_incr(
1282            db,
1283            &mut metrics,
1284            &query,
1285            vec![(lhs_id, lhs_row, true), (rhs_id, rhs_row, true)],
1286        )?;
1287
1288        // A single insert into lhs
1289        assert_eq!(result.tables.len(), 1);
1290        assert_eq!(result.tables[0], insert_op(lhs_id, "lhs", product!(5, 10)));
1291
1292        // Because we only have inserts, only 3 delta queries are evaluated,
1293        // each one an index join, and each one probing the join index exactly once.
1294        assert_eq!(metrics.index_seeks, 3);
1295        Ok(())
1296    }
1297
1298    // Case 6:
1299    // Insert row into lhs,
1300    // Insert matching row outside the region of rhs.
1301    fn index_join_case_6(db: &RelationalDB) -> ResultTest<()> {
1302        let lhs_id = create_lhs_table_for_eval_incr(db)?;
1303        let rhs_id = create_rhs_table_for_eval_incr(db)?;
1304        let query = compile_query(db)?;
1305
1306        let lhs_row = product!(5, 10);
1307        let rhs_row = product!(20, 5, 5);
1308
1309        let mut metrics = ExecutionMetrics::default();
1310
1311        let result = eval_incr(
1312            db,
1313            &mut metrics,
1314            &query,
1315            vec![(lhs_id, lhs_row, true), (rhs_id, rhs_row, true)],
1316        )?;
1317
1318        // No updates to report
1319        assert_eq!(result.tables.len(), 0);
1320
1321        // Because we only have inserts, only 3 delta queries are evaluated,
1322        // each one an index join, and each one probing the join index at most once.
1323        //
1324        // The lhs row always probes the rhs index,
1325        // but the rhs row doesn't pass the rhs filter,
1326        // hence it doesn't survive to probe the lhs index.
1327        assert_eq!(metrics.index_seeks, 2);
1328        Ok(())
1329    }
1330
1331    // Case 7:
1332    // Delete row from lhs,
1333    // Delete matching row inside the region of rhs.
1334    fn index_join_case_7(db: &RelationalDB) -> ResultTest<()> {
1335        let lhs_id = create_lhs_table_for_eval_incr(db)?;
1336        let rhs_id = create_rhs_table_for_eval_incr(db)?;
1337        let query = compile_query(db)?;
1338
1339        let lhs_row = product!(0, 5);
1340        let rhs_row = product!(10, 0, 2);
1341
1342        let mut metrics = ExecutionMetrics::default();
1343
1344        let result = eval_incr(
1345            db,
1346            &mut metrics,
1347            &query,
1348            vec![(lhs_id, lhs_row, false), (rhs_id, rhs_row, false)],
1349        )?;
1350
1351        // A single delete from lhs
1352        assert_eq!(result.tables.len(), 1);
1353        assert_eq!(result.tables[0], delete_op(lhs_id, "lhs", product!(0, 5)));
1354
1355        // Because we only have inserts, only 3 delta queries are evaluated,
1356        // each one an index join, and each one probing the join index exactly once.
1357        assert_eq!(metrics.index_seeks, 3);
1358        Ok(())
1359    }
1360
1361    // Case 8:
1362    // Delete row from lhs,
1363    // Delete matching row outside the region of rhs.
1364    fn index_join_case_8(db: &RelationalDB) -> ResultTest<()> {
1365        let lhs_id = create_lhs_table_for_eval_incr(db)?;
1366        let rhs_id = create_rhs_table_for_eval_incr(db)?;
1367        let query = compile_query(db)?;
1368
1369        let lhs_row = product!(3, 8);
1370        let rhs_row = product!(13, 3, 5);
1371
1372        let mut metrics = ExecutionMetrics::default();
1373
1374        let result = eval_incr(
1375            db,
1376            &mut metrics,
1377            &query,
1378            vec![(lhs_id, lhs_row, false), (rhs_id, rhs_row, false)],
1379        )?;
1380
1381        // No updates to report
1382        assert_eq!(result.tables.len(), 0);
1383
1384        // Because we only have inserts, only 3 delta queries are evaluated,
1385        // each one an index join, and each one probing the join index at most once.
1386        //
1387        // The lhs row always probes the rhs index,
1388        // but the rhs row doesn't pass the rhs filter,
1389        // hence it doesn't survive to probe the lhs index.
1390        assert_eq!(metrics.index_seeks, 2);
1391        Ok(())
1392    }
1393
1394    // Case 9:
1395    // Update row from lhs,
1396    // Update matching row inside the region of rhs.
1397    fn index_join_case_9(db: &RelationalDB) -> ResultTest<()> {
1398        let lhs_id = create_lhs_table_for_eval_incr(db)?;
1399        let rhs_id = create_rhs_table_for_eval_incr(db)?;
1400        let query = compile_query(db)?;
1401
1402        let lhs_old = product!(1, 6);
1403        let lhs_new = product!(1, 7);
1404        let rhs_old = product!(11, 1, 3);
1405        let rhs_new = product!(11, 1, 4);
1406
1407        let mut metrics = ExecutionMetrics::default();
1408
1409        let result = eval_incr(
1410            db,
1411            &mut metrics,
1412            &query,
1413            vec![
1414                (lhs_id, lhs_old, false),
1415                (rhs_id, rhs_old, false),
1416                (lhs_id, lhs_new, true),
1417                (rhs_id, rhs_new, true),
1418            ],
1419        )?;
1420
1421        let lhs_old = product!(1, 6);
1422        let lhs_new = product!(1, 7);
1423
1424        // A delete and an insert into lhs
1425        assert_eq!(result.tables.len(), 1);
1426        assert_eq!(
1427            result.tables[0],
1428            DatabaseTableUpdate {
1429                table_id: lhs_id,
1430                table_name: "lhs".into(),
1431                deletes: [lhs_old].into(),
1432                inserts: [lhs_new].into(),
1433            },
1434        );
1435
1436        // Because we have deletes and inserts for both tables,
1437        // all 8 delta queries are evaluated,
1438        // each one probing the join index exactly once.
1439        assert_eq!(metrics.index_seeks, 8);
1440        Ok(())
1441    }
1442}