Skip to main content

citadel_sql/executor/
mod.rs

1//! SQL executor: DDL and DML operations.
2
3mod aggregate;
4mod ann_topk;
5pub(crate) mod compile;
6mod correlated;
7mod cte;
8mod ddl;
9mod dml;
10mod explain;
11pub(crate) mod helpers;
12mod join;
13pub(crate) mod matviews;
14mod scan;
15mod select;
16pub(crate) mod triggers;
17mod view;
18mod window;
19pub(crate) mod write;
20pub use compile::{compile, CompiledPlan};
21use cte::*;
22use ddl::*;
23pub use dml::exec_insert_in_txn;
24use dml::*;
25use explain::*;
26use join::*;
27use scan::*;
28use select::*;
29use view::*;
30use window::*;
31use write::*;
32
33use citadel::Database;
34use citadel_txn::read_txn::ReadTxn;
35use rustc_hash::FxHashMap;
36
37use crate::error::{Result, SqlError};
38use crate::parser::*;
39use crate::schema::SchemaManager;
40use crate::types::*;
41
42type CteContext = FxHashMap<String, QueryResult>;
43type ScanTableFn<'a> = &'a mut dyn FnMut(&str) -> Result<(TableSchema, Vec<Vec<Value>>)>;
44
45pub fn execute(
46    db: &Database,
47    schema: &mut SchemaManager,
48    stmt: &Statement,
49    params: &[Value],
50) -> Result<ExecutionResult> {
51    match stmt {
52        Statement::CreateTable(ct) => exec_create_table(db, schema, ct),
53        Statement::DropTable(dt) => exec_drop_table(db, schema, dt),
54        Statement::CreateIndex(ci) => exec_create_index(db, schema, ci),
55        Statement::DropIndex(di) => exec_drop_index(db, schema, di),
56        Statement::CreateView(cv) => exec_create_view(db, schema, cv),
57        Statement::DropView(dv) => exec_drop_view(db, schema, dv),
58        Statement::AlterTable(at) => exec_alter_table(db, schema, at),
59        Statement::Insert(ins) => exec_insert(db, schema, ins, params),
60        Statement::Select(sq) => exec_select_query(db, schema, sq),
61        Statement::Update(upd) => exec_update(db, schema, upd),
62        Statement::Delete(del) => exec_delete(db, schema, del),
63        Statement::Truncate(t) => exec_truncate(db, schema, t),
64        Statement::Explain(inner) => explain(schema, inner),
65        Statement::CreateTrigger(ct) => triggers::exec_create_trigger(db, schema, ct),
66        Statement::DropTrigger(dt) => triggers::exec_drop_trigger(db, schema, dt),
67        Statement::CreateMaterializedView(mv) => matviews::exec_create_matview(db, schema, mv),
68        Statement::RefreshMaterializedView(rmv) => matviews::exec_refresh_matview(db, schema, rmv),
69        Statement::DropMaterializedView(dmv) => matviews::exec_drop_matview(db, schema, dmv),
70        Statement::Begin { .. }
71        | Statement::Commit
72        | Statement::Rollback
73        | Statement::Savepoint(_)
74        | Statement::ReleaseSavepoint(_)
75        | Statement::RollbackTo(_)
76        | Statement::SetTimezone(_) => Err(SqlError::Unsupported(
77            "transaction / session control handled by Connection".into(),
78        )),
79    }
80}
81
82pub fn execute_with_read(
83    rtx: &mut citadel_txn::read_txn::ReadTxn<'_>,
84    schema: &SchemaManager,
85    stmt: &Statement,
86    _params: &[Value],
87) -> Result<ExecutionResult> {
88    match stmt {
89        Statement::Select(sq) => cte::exec_select_query_with_read(rtx, schema, sq),
90        Statement::Explain(inner) => explain(schema, inner),
91        Statement::CreateTable(_)
92        | Statement::DropTable(_)
93        | Statement::CreateIndex(_)
94        | Statement::DropIndex(_)
95        | Statement::CreateView(_)
96        | Statement::DropView(_)
97        | Statement::AlterTable(_)
98        | Statement::Insert(_)
99        | Statement::Update(_)
100        | Statement::Delete(_)
101        | Statement::Truncate(_)
102        | Statement::CreateTrigger(_)
103        | Statement::DropTrigger(_)
104        | Statement::CreateMaterializedView(_)
105        | Statement::RefreshMaterializedView(_)
106        | Statement::DropMaterializedView(_) => Err(SqlError::Unsupported(
107            "cannot execute mutating statement inside a read-only transaction".into(),
108        )),
109        Statement::Begin { .. }
110        | Statement::Commit
111        | Statement::Rollback
112        | Statement::Savepoint(_)
113        | Statement::ReleaseSavepoint(_)
114        | Statement::RollbackTo(_)
115        | Statement::SetTimezone(_) => Err(SqlError::Unsupported(
116            "transaction / session control handled by Connection".into(),
117        )),
118    }
119}
120
121/// Execute a parsed SQL statement within an existing write transaction.
122pub fn execute_in_txn(
123    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
124    schema: &mut SchemaManager,
125    stmt: &Statement,
126    params: &[Value],
127) -> Result<ExecutionResult> {
128    match stmt {
129        Statement::CreateTable(ct) => exec_create_table_in_txn(wtx, schema, ct),
130        Statement::DropTable(dt) => exec_drop_table_in_txn(wtx, schema, dt),
131        Statement::CreateIndex(ci) => exec_create_index_in_txn(wtx, schema, ci),
132        Statement::DropIndex(di) => exec_drop_index_in_txn(wtx, schema, di),
133        Statement::CreateView(cv) => exec_create_view_in_txn(wtx, schema, cv),
134        Statement::DropView(dv) => exec_drop_view_in_txn(wtx, schema, dv),
135        Statement::AlterTable(at) => exec_alter_table_in_txn(wtx, schema, at),
136        Statement::Insert(ins) => exec_insert_in_txn(wtx, schema, ins, params),
137        Statement::Select(sq) => exec_select_query_in_txn(wtx, schema, sq),
138        Statement::Update(upd) => exec_update_in_txn(wtx, schema, upd),
139        Statement::Delete(del) => exec_delete_in_txn(wtx, schema, del),
140        Statement::Truncate(t) => exec_truncate_in_txn(wtx, schema, t),
141        Statement::Explain(inner) => explain(schema, inner),
142        Statement::CreateTrigger(ct) => triggers::exec_create_trigger_in_txn(wtx, schema, ct),
143        Statement::DropTrigger(dt) => triggers::exec_drop_trigger_in_txn(wtx, schema, dt),
144        Statement::CreateMaterializedView(mv) => {
145            matviews::exec_create_matview_in_txn(wtx, schema, mv)
146        }
147        Statement::RefreshMaterializedView(rmv) => {
148            matviews::exec_refresh_matview_in_txn(wtx, schema, rmv)
149        }
150        Statement::DropMaterializedView(dmv) => {
151            matviews::exec_drop_matview_in_txn(wtx, schema, dmv)
152        }
153        Statement::Begin { .. }
154        | Statement::Commit
155        | Statement::Rollback
156        | Statement::Savepoint(_)
157        | Statement::ReleaseSavepoint(_)
158        | Statement::RollbackTo(_)
159        | Statement::SetTimezone(_) => {
160            Err(SqlError::Unsupported("nested transaction control".into()))
161        }
162    }
163}
164
165pub(super) fn scan_table_with_read(
166    rtx: &mut ReadTxn<'_>,
167    schema: &SchemaManager,
168    name: &str,
169) -> Result<(TableSchema, Vec<Vec<Value>>)> {
170    let table_schema = schema
171        .get(name)
172        .ok_or_else(|| SqlError::TableNotFound(name.to_string()))?;
173    let (rows, _) = collect_rows_with_read(rtx, table_schema, &None, None)?;
174    Ok((table_schema.clone(), rows))
175}
176
177pub(super) fn scan_table_with_read_or_view(
178    rtx: &mut ReadTxn<'_>,
179    schema: &SchemaManager,
180    name: &str,
181) -> Result<(TableSchema, Vec<Vec<Value>>)> {
182    if let Some(ts) = schema.get(name) {
183        let (rows, _) = collect_rows_with_read(rtx, ts, &None, None)?;
184        return Ok((ts.clone(), rows));
185    }
186    if let Some(vd) = schema.get_view(name) {
187        let qr = exec_view_with_read(rtx, schema, vd)?;
188        let vs = build_view_schema(name, &qr);
189        return Ok((vs, qr.rows));
190    }
191    if let Some(vt) = schema.get_virtual(name) {
192        let qr = vt.scan(schema)?;
193        let vs = build_view_schema(name, &qr);
194        return Ok((vs, qr.rows));
195    }
196    Err(SqlError::TableNotFound(name.to_string()))
197}
198
199pub(super) fn scan_table_write(
200    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
201    schema: &SchemaManager,
202    name: &str,
203) -> Result<(TableSchema, Vec<Vec<Value>>)> {
204    let table_schema = schema
205        .get(name)
206        .ok_or_else(|| SqlError::TableNotFound(name.to_string()))?;
207    let (rows, _) = collect_rows_write(wtx, table_schema, &None, None)?;
208    Ok((table_schema.clone(), rows))
209}
210
211pub(super) fn scan_table_write_or_view(
212    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
213    schema: &SchemaManager,
214    name: &str,
215) -> Result<(TableSchema, Vec<Vec<Value>>)> {
216    if let Some(ts) = schema.get(name) {
217        let (rows, _) = collect_rows_write(wtx, ts, &None, None)?;
218        return Ok((ts.clone(), rows));
219    }
220    if let Some(vd) = schema.get_view(name) {
221        let qr = exec_view_write(wtx, schema, vd)?;
222        let vs = build_view_schema(name, &qr);
223        return Ok((vs, qr.rows));
224    }
225    Err(SqlError::TableNotFound(name.to_string()))
226}
227
228pub(super) fn resolve_table_or_cte(
229    name: &str,
230    ctes: &CteContext,
231    scan_table: ScanTableFn<'_>,
232) -> Result<(TableSchema, Vec<Vec<Value>>)> {
233    let lower = name.to_ascii_lowercase();
234    if let Some(cte_result) = ctes.get(&lower) {
235        let schema = build_cte_schema(&lower, cte_result);
236        Ok((schema, cte_result.rows.clone()))
237    } else {
238        scan_table(&lower)
239    }
240}
241
242pub(super) fn exec_select_join_with_ctes(
243    stmt: &SelectStmt,
244    ctes: &CteContext,
245    scan_table: ScanTableFn<'_>,
246) -> Result<ExecutionResult> {
247    let (from_schema, from_rows) = resolve_table_or_cte(&stmt.from, ctes, scan_table)?;
248    let from_alias = table_alias_or_name(&stmt.from, &stmt.from_alias);
249
250    let mut tables: Vec<(String, TableSchema)> = vec![(from_alias.clone(), from_schema)];
251    let mut join_rows: Vec<Vec<Vec<Value>>> = Vec::new();
252
253    for join in &stmt.joins {
254        let jname = &join.table.name;
255        let (js, jrows) = resolve_table_or_cte(jname, ctes, scan_table)?;
256        let jalias = table_alias_or_name(jname, &join.table.alias);
257        tables.push((jalias, js));
258        join_rows.push(jrows);
259    }
260
261    let mut outer_rows = from_rows;
262    let mut cur_tables: Vec<(String, &TableSchema)> = vec![(from_alias.clone(), &tables[0].1)];
263
264    for (ji, join) in stmt.joins.iter().enumerate() {
265        let inner_schema = &tables[ji + 1].1;
266        let inner_alias = &tables[ji + 1].0;
267        let inner_rows = &mut join_rows[ji];
268
269        let mut preview_tables = cur_tables.clone();
270        preview_tables.push((inner_alias.clone(), inner_schema));
271        let combined_cols = build_joined_columns(&preview_tables);
272
273        let outer_col_count = if outer_rows.is_empty() {
274            cur_tables.iter().map(|(_, s)| s.columns.len()).sum()
275        } else {
276            outer_rows[0].len()
277        };
278        let inner_col_count = inner_schema.columns.len();
279
280        let (equi_pairs, is_pure_equi) =
281            compute_equi_join_meta(join, &combined_cols, outer_col_count);
282        outer_rows = exec_join_step(
283            outer_rows,
284            inner_rows,
285            join,
286            &combined_cols,
287            outer_col_count,
288            inner_col_count,
289            None,
290            None,
291            &equi_pairs,
292            is_pure_equi,
293        );
294        cur_tables.push((inner_alias.clone(), inner_schema));
295    }
296
297    let joined_cols = build_joined_columns(&cur_tables);
298    process_select(&joined_cols, outer_rows, stmt, false)
299}