Skip to main content

citadel_sql/executor/
mod.rs

1//! SQL executor: DDL and DML operations.
2
3mod aggregate;
4pub(crate) mod compile;
5mod correlated;
6mod cte;
7mod ddl;
8mod dml;
9mod explain;
10pub(crate) mod helpers;
11mod join;
12pub(crate) mod matviews;
13mod scan;
14mod select;
15pub(crate) mod triggers;
16mod view;
17mod window;
18pub(crate) mod write;
19pub use compile::{compile, CompiledPlan};
20use cte::*;
21use ddl::*;
22pub use dml::exec_insert_in_txn;
23use dml::*;
24use explain::*;
25use join::*;
26use scan::*;
27use select::*;
28use view::*;
29use window::*;
30use write::*;
31
32use citadel::Database;
33use citadel_txn::read_txn::ReadTxn;
34use rustc_hash::FxHashMap;
35
36use crate::error::{Result, SqlError};
37use crate::parser::*;
38use crate::schema::SchemaManager;
39use crate::types::*;
40
/// Query results keyed by CTE name; lookups in this module use the
/// ASCII-lowercased name as the key.
type CteContext = FxHashMap<String, QueryResult>;
/// Mutable callback that takes a table name and returns that table's schema
/// together with its rows.
type ScanTableFn<'a> = &'a mut dyn FnMut(&str) -> Result<(TableSchema, Vec<Vec<Value>>)>;
43
44pub fn execute(
45    db: &Database,
46    schema: &mut SchemaManager,
47    stmt: &Statement,
48    params: &[Value],
49) -> Result<ExecutionResult> {
50    match stmt {
51        Statement::CreateTable(ct) => exec_create_table(db, schema, ct),
52        Statement::DropTable(dt) => exec_drop_table(db, schema, dt),
53        Statement::CreateIndex(ci) => exec_create_index(db, schema, ci),
54        Statement::DropIndex(di) => exec_drop_index(db, schema, di),
55        Statement::CreateView(cv) => exec_create_view(db, schema, cv),
56        Statement::DropView(dv) => exec_drop_view(db, schema, dv),
57        Statement::AlterTable(at) => exec_alter_table(db, schema, at),
58        Statement::Insert(ins) => exec_insert(db, schema, ins, params),
59        Statement::Select(sq) => exec_select_query(db, schema, sq),
60        Statement::Update(upd) => exec_update(db, schema, upd),
61        Statement::Delete(del) => exec_delete(db, schema, del),
62        Statement::Truncate(t) => exec_truncate(db, schema, t),
63        Statement::Explain(inner) => explain(schema, inner),
64        Statement::CreateTrigger(ct) => triggers::exec_create_trigger(db, schema, ct),
65        Statement::DropTrigger(dt) => triggers::exec_drop_trigger(db, schema, dt),
66        Statement::CreateMaterializedView(mv) => matviews::exec_create_matview(db, schema, mv),
67        Statement::RefreshMaterializedView(rmv) => matviews::exec_refresh_matview(db, schema, rmv),
68        Statement::DropMaterializedView(dmv) => matviews::exec_drop_matview(db, schema, dmv),
69        Statement::Begin { .. }
70        | Statement::Commit
71        | Statement::Rollback
72        | Statement::Savepoint(_)
73        | Statement::ReleaseSavepoint(_)
74        | Statement::RollbackTo(_)
75        | Statement::SetTimezone(_) => Err(SqlError::Unsupported(
76            "transaction / session control handled by Connection".into(),
77        )),
78    }
79}
80
81pub fn execute_with_read(
82    rtx: &mut citadel_txn::read_txn::ReadTxn<'_>,
83    schema: &SchemaManager,
84    stmt: &Statement,
85    _params: &[Value],
86) -> Result<ExecutionResult> {
87    match stmt {
88        Statement::Select(sq) => cte::exec_select_query_with_read(rtx, schema, sq),
89        Statement::Explain(inner) => explain(schema, inner),
90        Statement::CreateTable(_)
91        | Statement::DropTable(_)
92        | Statement::CreateIndex(_)
93        | Statement::DropIndex(_)
94        | Statement::CreateView(_)
95        | Statement::DropView(_)
96        | Statement::AlterTable(_)
97        | Statement::Insert(_)
98        | Statement::Update(_)
99        | Statement::Delete(_)
100        | Statement::Truncate(_)
101        | Statement::CreateTrigger(_)
102        | Statement::DropTrigger(_)
103        | Statement::CreateMaterializedView(_)
104        | Statement::RefreshMaterializedView(_)
105        | Statement::DropMaterializedView(_) => Err(SqlError::Unsupported(
106            "cannot execute mutating statement inside a read-only transaction".into(),
107        )),
108        Statement::Begin { .. }
109        | Statement::Commit
110        | Statement::Rollback
111        | Statement::Savepoint(_)
112        | Statement::ReleaseSavepoint(_)
113        | Statement::RollbackTo(_)
114        | Statement::SetTimezone(_) => Err(SqlError::Unsupported(
115            "transaction / session control handled by Connection".into(),
116        )),
117    }
118}
119
120/// Execute a parsed SQL statement within an existing write transaction.
121pub fn execute_in_txn(
122    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
123    schema: &mut SchemaManager,
124    stmt: &Statement,
125    params: &[Value],
126) -> Result<ExecutionResult> {
127    match stmt {
128        Statement::CreateTable(ct) => exec_create_table_in_txn(wtx, schema, ct),
129        Statement::DropTable(dt) => exec_drop_table_in_txn(wtx, schema, dt),
130        Statement::CreateIndex(ci) => exec_create_index_in_txn(wtx, schema, ci),
131        Statement::DropIndex(di) => exec_drop_index_in_txn(wtx, schema, di),
132        Statement::CreateView(cv) => exec_create_view_in_txn(wtx, schema, cv),
133        Statement::DropView(dv) => exec_drop_view_in_txn(wtx, schema, dv),
134        Statement::AlterTable(at) => exec_alter_table_in_txn(wtx, schema, at),
135        Statement::Insert(ins) => exec_insert_in_txn(wtx, schema, ins, params),
136        Statement::Select(sq) => exec_select_query_in_txn(wtx, schema, sq),
137        Statement::Update(upd) => exec_update_in_txn(wtx, schema, upd),
138        Statement::Delete(del) => exec_delete_in_txn(wtx, schema, del),
139        Statement::Truncate(t) => exec_truncate_in_txn(wtx, schema, t),
140        Statement::Explain(inner) => explain(schema, inner),
141        Statement::CreateTrigger(ct) => triggers::exec_create_trigger_in_txn(wtx, schema, ct),
142        Statement::DropTrigger(dt) => triggers::exec_drop_trigger_in_txn(wtx, schema, dt),
143        Statement::CreateMaterializedView(mv) => {
144            matviews::exec_create_matview_in_txn(wtx, schema, mv)
145        }
146        Statement::RefreshMaterializedView(rmv) => {
147            matviews::exec_refresh_matview_in_txn(wtx, schema, rmv)
148        }
149        Statement::DropMaterializedView(dmv) => {
150            matviews::exec_drop_matview_in_txn(wtx, schema, dmv)
151        }
152        Statement::Begin { .. }
153        | Statement::Commit
154        | Statement::Rollback
155        | Statement::Savepoint(_)
156        | Statement::ReleaseSavepoint(_)
157        | Statement::RollbackTo(_)
158        | Statement::SetTimezone(_) => {
159            Err(SqlError::Unsupported("nested transaction control".into()))
160        }
161    }
162}
163
164pub(super) fn scan_table_with_read(
165    rtx: &mut ReadTxn<'_>,
166    schema: &SchemaManager,
167    name: &str,
168) -> Result<(TableSchema, Vec<Vec<Value>>)> {
169    let table_schema = schema
170        .get(name)
171        .ok_or_else(|| SqlError::TableNotFound(name.to_string()))?;
172    let (rows, _) = collect_rows_with_read(rtx, table_schema, &None, None)?;
173    Ok((table_schema.clone(), rows))
174}
175
176pub(super) fn scan_table_with_read_or_view(
177    rtx: &mut ReadTxn<'_>,
178    schema: &SchemaManager,
179    name: &str,
180) -> Result<(TableSchema, Vec<Vec<Value>>)> {
181    if let Some(ts) = schema.get(name) {
182        let (rows, _) = collect_rows_with_read(rtx, ts, &None, None)?;
183        return Ok((ts.clone(), rows));
184    }
185    if let Some(vd) = schema.get_view(name) {
186        let qr = exec_view_with_read(rtx, schema, vd)?;
187        let vs = build_view_schema(name, &qr);
188        return Ok((vs, qr.rows));
189    }
190    if let Some(vt) = schema.get_virtual(name) {
191        let qr = vt.scan(schema)?;
192        let vs = build_view_schema(name, &qr);
193        return Ok((vs, qr.rows));
194    }
195    Err(SqlError::TableNotFound(name.to_string()))
196}
197
198pub(super) fn scan_table_write(
199    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
200    schema: &SchemaManager,
201    name: &str,
202) -> Result<(TableSchema, Vec<Vec<Value>>)> {
203    let table_schema = schema
204        .get(name)
205        .ok_or_else(|| SqlError::TableNotFound(name.to_string()))?;
206    let (rows, _) = collect_rows_write(wtx, table_schema, &None, None)?;
207    Ok((table_schema.clone(), rows))
208}
209
210pub(super) fn scan_table_write_or_view(
211    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
212    schema: &SchemaManager,
213    name: &str,
214) -> Result<(TableSchema, Vec<Vec<Value>>)> {
215    if let Some(ts) = schema.get(name) {
216        let (rows, _) = collect_rows_write(wtx, ts, &None, None)?;
217        return Ok((ts.clone(), rows));
218    }
219    if let Some(vd) = schema.get_view(name) {
220        let qr = exec_view_write(wtx, schema, vd)?;
221        let vs = build_view_schema(name, &qr);
222        return Ok((vs, qr.rows));
223    }
224    Err(SqlError::TableNotFound(name.to_string()))
225}
226
227pub(super) fn resolve_table_or_cte(
228    name: &str,
229    ctes: &CteContext,
230    scan_table: ScanTableFn<'_>,
231) -> Result<(TableSchema, Vec<Vec<Value>>)> {
232    let lower = name.to_ascii_lowercase();
233    if let Some(cte_result) = ctes.get(&lower) {
234        let schema = build_cte_schema(&lower, cte_result);
235        Ok((schema, cte_result.rows.clone()))
236    } else {
237        scan_table(&lower)
238    }
239}
240
241pub(super) fn exec_select_join_with_ctes(
242    stmt: &SelectStmt,
243    ctes: &CteContext,
244    scan_table: ScanTableFn<'_>,
245) -> Result<ExecutionResult> {
246    let (from_schema, from_rows) = resolve_table_or_cte(&stmt.from, ctes, scan_table)?;
247    let from_alias = table_alias_or_name(&stmt.from, &stmt.from_alias);
248
249    let mut tables: Vec<(String, TableSchema)> = vec![(from_alias.clone(), from_schema)];
250    let mut join_rows: Vec<Vec<Vec<Value>>> = Vec::new();
251
252    for join in &stmt.joins {
253        let jname = &join.table.name;
254        let (js, jrows) = resolve_table_or_cte(jname, ctes, scan_table)?;
255        let jalias = table_alias_or_name(jname, &join.table.alias);
256        tables.push((jalias, js));
257        join_rows.push(jrows);
258    }
259
260    let mut outer_rows = from_rows;
261    let mut cur_tables: Vec<(String, &TableSchema)> = vec![(from_alias.clone(), &tables[0].1)];
262
263    for (ji, join) in stmt.joins.iter().enumerate() {
264        let inner_schema = &tables[ji + 1].1;
265        let inner_alias = &tables[ji + 1].0;
266        let inner_rows = &mut join_rows[ji];
267
268        let mut preview_tables = cur_tables.clone();
269        preview_tables.push((inner_alias.clone(), inner_schema));
270        let combined_cols = build_joined_columns(&preview_tables);
271
272        let outer_col_count = if outer_rows.is_empty() {
273            cur_tables.iter().map(|(_, s)| s.columns.len()).sum()
274        } else {
275            outer_rows[0].len()
276        };
277        let inner_col_count = inner_schema.columns.len();
278
279        let (equi_pairs, is_pure_equi) =
280            compute_equi_join_meta(join, &combined_cols, outer_col_count);
281        outer_rows = exec_join_step(
282            outer_rows,
283            inner_rows,
284            join,
285            &combined_cols,
286            outer_col_count,
287            inner_col_count,
288            None,
289            None,
290            &equi_pairs,
291            is_pure_equi,
292        );
293        cur_tables.push((inner_alias.clone(), inner_schema));
294    }
295
296    let joined_cols = build_joined_columns(&cur_tables);
297    process_select(&joined_cols, outer_rows, stmt, false)
298}