1use anyhow::{bail, Result};
2use spacetimedb_execution::{
3 dml::{MutDatastore, MutExecutor},
4 pipelined::ProjectListExecutor,
5 Datastore, DeltaStore,
6};
7use spacetimedb_expr::{
8 check::{parse_and_type_sub, SchemaView},
9 expr::ProjectList,
10 rls::{resolve_views_for_sql, resolve_views_for_sub},
11 statement::{parse_and_type_sql, Statement, DML},
12};
13use spacetimedb_lib::{identity::AuthCtx, metrics::ExecutionMetrics, ProductValue};
14use spacetimedb_physical_plan::{
15 compile::{compile_dml_plan, compile_select, compile_select_list},
16 plan::{ProjectListPlan, ProjectPlan},
17};
18use spacetimedb_primitives::TableId;
19
20const MAX_SQL_LENGTH: usize = 50_000;
24
25pub fn compile_subscription(
26 sql: &str,
27 tx: &impl SchemaView,
28 auth: &AuthCtx,
29) -> Result<(Vec<ProjectPlan>, TableId, Box<str>, bool)> {
30 if sql.len() > MAX_SQL_LENGTH {
31 bail!("SQL query exceeds maximum allowed length: \"{sql:.120}...\"")
32 }
33
34 let (plan, mut has_param) = parse_and_type_sub(sql, tx, auth)?;
35
36 let Some(return_id) = plan.return_table_id() else {
37 bail!("Failed to determine TableId for query")
38 };
39
40 let Some(return_name) = tx.schema_for_table(return_id).map(|schema| schema.table_name.clone()) else {
41 bail!("TableId `{return_id}` does not exist")
42 };
43
44 let plan_fragments = resolve_views_for_sub(tx, plan, auth, &mut has_param)?
46 .into_iter()
47 .map(compile_select)
48 .collect();
49
50 Ok((plan_fragments, return_id, return_name, has_param))
51}
52
53pub fn compile_sql_stmt(sql: &str, tx: &impl SchemaView, auth: &AuthCtx) -> Result<Statement> {
55 if sql.len() > MAX_SQL_LENGTH {
56 bail!("SQL query exceeds maximum allowed length: \"{sql:.120}...\"")
57 }
58
59 match parse_and_type_sql(sql, tx, auth)? {
60 stmt @ Statement::DML(_) => Ok(stmt),
61 Statement::Select(expr) => Ok(Statement::Select(resolve_views_for_sql(tx, expr, auth)?)),
62 }
63}
64
65pub fn execute_select_stmt<Tx: Datastore + DeltaStore>(
67 stmt: ProjectList,
68 tx: &Tx,
69 metrics: &mut ExecutionMetrics,
70 check_row_limit: impl Fn(ProjectListPlan) -> Result<ProjectListPlan>,
71) -> Result<Vec<ProductValue>> {
72 let plan = compile_select_list(stmt).optimize()?;
73 let plan = check_row_limit(plan)?;
74 let plan = ProjectListExecutor::from(plan);
75 let mut rows = vec![];
76 plan.execute(tx, metrics, &mut |row| {
77 rows.push(row);
78 Ok(())
79 })?;
80 Ok(rows)
81}
82
83pub fn execute_dml_stmt<Tx: MutDatastore>(stmt: DML, tx: &mut Tx, metrics: &mut ExecutionMetrics) -> Result<()> {
85 let plan = compile_dml_plan(stmt).optimize()?;
86 let plan = MutExecutor::from(plan);
87 plan.execute(tx, metrics)
88}