1use anyhow::{bail, Result};
2use spacetimedb_execution::{
3 dml::{MutDatastore, MutExecutor},
4 pipelined::ProjectListExecutor,
5 Datastore, DeltaStore,
6};
7use spacetimedb_expr::{
8 check::{parse_and_type_sub, SchemaView},
9 expr::ProjectList,
10 statement::{parse_and_type_sql, Statement, DML},
11};
12use spacetimedb_lib::{metrics::ExecutionMetrics, ProductValue};
13use spacetimedb_physical_plan::{
14 compile::{compile_dml_plan, compile_select, compile_select_list},
15 plan::{ProjectListPlan, ProjectPlan},
16};
17use spacetimedb_primitives::TableId;
18
/// Upper bound on the size of a SQL string accepted by this module.
///
/// The limit is compared against `str::len`, so it is measured in UTF-8 bytes,
/// not characters. Both `compile_subscription` and `compile_sql_stmt` reject
/// longer inputs before handing them to the parser.
const MAX_SQL_LENGTH: usize = 50_000;
23
24pub fn compile_subscription(sql: &str, tx: &impl SchemaView) -> Result<(ProjectPlan, TableId, Box<str>)> {
25 if sql.len() > MAX_SQL_LENGTH {
26 bail!("SQL query exceeds maximum allowed length: \"{sql:.120}...\"")
27 }
28
29 let plan = parse_and_type_sub(sql, tx)?;
30
31 let Some(return_id) = plan.return_table_id() else {
32 bail!("Failed to determine TableId for query")
33 };
34
35 let Some(return_name) = tx.schema_for_table(return_id).map(|schema| schema.table_name.clone()) else {
36 bail!("TableId `{return_id}` does not exist")
37 };
38
39 let plan = compile_select(plan);
40
41 Ok((plan, return_id, return_name))
42}
43
44pub fn compile_sql_stmt(sql: &str, tx: &impl SchemaView) -> Result<Statement> {
46 if sql.len() > MAX_SQL_LENGTH {
47 bail!("SQL query exceeds maximum allowed length: \"{sql:.120}...\"")
48 }
49 Ok(parse_and_type_sql(sql, tx)?)
50}
51
52pub fn execute_select_stmt<Tx: Datastore + DeltaStore>(
54 stmt: ProjectList,
55 tx: &Tx,
56 metrics: &mut ExecutionMetrics,
57 check_row_limit: impl Fn(ProjectListPlan) -> Result<ProjectListPlan>,
58) -> Result<Vec<ProductValue>> {
59 let plan = compile_select_list(stmt).optimize()?;
60 let plan = check_row_limit(plan)?;
61 let plan = ProjectListExecutor::from(plan);
62 let mut rows = vec![];
63 plan.execute(tx, metrics, &mut |row| {
64 rows.push(row);
65 Ok(())
66 })?;
67 Ok(rows)
68}
69
70pub fn execute_dml_stmt<Tx: MutDatastore>(stmt: DML, tx: &mut Tx, metrics: &mut ExecutionMetrics) -> Result<()> {
72 let plan = compile_dml_plan(stmt).optimize()?;
73 let plan = MutExecutor::from(plan);
74 plan.execute(tx, metrics)
75}