spacetimedb_query/
lib.rs

1use anyhow::{bail, Result};
2use spacetimedb_execution::{
3    dml::{MutDatastore, MutExecutor},
4    pipelined::ProjectListExecutor,
5    Datastore, DeltaStore,
6};
7use spacetimedb_expr::{
8    check::{parse_and_type_sub, SchemaView},
9    expr::ProjectList,
10    rls::{resolve_views_for_sql, resolve_views_for_sub},
11    statement::{parse_and_type_sql, Statement, DML},
12};
13use spacetimedb_lib::{identity::AuthCtx, metrics::ExecutionMetrics, ProductValue};
14use spacetimedb_physical_plan::{
15    compile::{compile_dml_plan, compile_select, compile_select_list},
16    plan::{ProjectListPlan, ProjectPlan},
17};
18use spacetimedb_primitives::TableId;
19
/// DIRTY HACK ALERT: Maximum allowed length, in UTF-8 bytes, of SQL queries.
///
/// Any query whose byte length (`str::len`) is greater than this is rejected
/// before it is parsed; see the checks in `compile_subscription` and
/// `compile_sql_stmt`.
///
/// The limit is a blunt stand-in for a nesting-depth limit: it prevents a stack
/// overflow when compiling queries with deeply-nested `AND` and `OR` conditions.
const MAX_SQL_LENGTH: usize = 50_000;
24
25pub fn compile_subscription(
26    sql: &str,
27    tx: &impl SchemaView,
28    auth: &AuthCtx,
29) -> Result<(Vec<ProjectPlan>, TableId, Box<str>, bool)> {
30    if sql.len() > MAX_SQL_LENGTH {
31        bail!("SQL query exceeds maximum allowed length: \"{sql:.120}...\"")
32    }
33
34    let (plan, mut has_param) = parse_and_type_sub(sql, tx, auth)?;
35
36    let Some(return_id) = plan.return_table_id() else {
37        bail!("Failed to determine TableId for query")
38    };
39
40    let Some(return_name) = tx.schema_for_table(return_id).map(|schema| schema.table_name.clone()) else {
41        bail!("TableId `{return_id}` does not exist")
42    };
43
44    // Resolve any RLS filters
45    let plan_fragments = resolve_views_for_sub(tx, plan, auth, &mut has_param)?
46        .into_iter()
47        .map(compile_select)
48        .collect();
49
50    Ok((plan_fragments, return_id, return_name, has_param))
51}
52
53/// A utility for parsing and type checking a sql statement
54pub fn compile_sql_stmt(sql: &str, tx: &impl SchemaView, auth: &AuthCtx) -> Result<Statement> {
55    if sql.len() > MAX_SQL_LENGTH {
56        bail!("SQL query exceeds maximum allowed length: \"{sql:.120}...\"")
57    }
58
59    match parse_and_type_sql(sql, tx, auth)? {
60        stmt @ Statement::DML(_) => Ok(stmt),
61        Statement::Select(expr) => Ok(Statement::Select(resolve_views_for_sql(tx, expr, auth)?)),
62    }
63}
64
65/// A utility for executing a sql select statement
66pub fn execute_select_stmt<Tx: Datastore + DeltaStore>(
67    stmt: ProjectList,
68    tx: &Tx,
69    metrics: &mut ExecutionMetrics,
70    check_row_limit: impl Fn(ProjectListPlan) -> Result<ProjectListPlan>,
71) -> Result<Vec<ProductValue>> {
72    let plan = compile_select_list(stmt).optimize()?;
73    let plan = check_row_limit(plan)?;
74    let plan = ProjectListExecutor::from(plan);
75    let mut rows = vec![];
76    plan.execute(tx, metrics, &mut |row| {
77        rows.push(row);
78        Ok(())
79    })?;
80    Ok(rows)
81}
82
83/// A utility for executing a sql dml statement
84pub fn execute_dml_stmt<Tx: MutDatastore>(stmt: DML, tx: &mut Tx, metrics: &mut ExecutionMetrics) -> Result<()> {
85    let plan = compile_dml_plan(stmt).optimize()?;
86    let plan = MutExecutor::from(plan);
87    plan.execute(tx, metrics)
88}