Skip to main content

citadel_sql/executor/
mod.rs

1//! SQL executor: DDL and DML operations.
2
3mod aggregate;
4pub(crate) mod compile;
5mod correlated;
6mod cte;
7mod ddl;
8mod dml;
9mod explain;
10pub(crate) mod helpers;
11mod join;
12pub(crate) mod matviews;
13mod scan;
14mod select;
15pub(crate) mod triggers;
16mod view;
17mod window;
18pub(crate) mod write;
19pub use compile::{compile, CompiledPlan};
20use cte::*;
21use ddl::*;
22pub use dml::exec_insert_in_txn;
23use dml::*;
24use explain::*;
25use join::*;
26use scan::*;
27use select::*;
28use view::*;
29use window::*;
30use write::*;
31
32use citadel::Database;
33use rustc_hash::FxHashMap;
34
35use crate::error::{Result, SqlError};
36use crate::parser::*;
37use crate::schema::SchemaManager;
38use crate::types::*;
39
40type CteContext = FxHashMap<String, QueryResult>;
41type ScanTableFn<'a> = &'a mut dyn FnMut(&str) -> Result<(TableSchema, Vec<Vec<Value>>)>;
42
43pub fn execute(
44    db: &Database,
45    schema: &mut SchemaManager,
46    stmt: &Statement,
47    params: &[Value],
48) -> Result<ExecutionResult> {
49    match stmt {
50        Statement::CreateTable(ct) => exec_create_table(db, schema, ct),
51        Statement::DropTable(dt) => exec_drop_table(db, schema, dt),
52        Statement::CreateIndex(ci) => exec_create_index(db, schema, ci),
53        Statement::DropIndex(di) => exec_drop_index(db, schema, di),
54        Statement::CreateView(cv) => exec_create_view(db, schema, cv),
55        Statement::DropView(dv) => exec_drop_view(db, schema, dv),
56        Statement::AlterTable(at) => exec_alter_table(db, schema, at),
57        Statement::Insert(ins) => exec_insert(db, schema, ins, params),
58        Statement::Select(sq) => exec_select_query(db, schema, sq),
59        Statement::Update(upd) => exec_update(db, schema, upd),
60        Statement::Delete(del) => exec_delete(db, schema, del),
61        Statement::Truncate(t) => exec_truncate(db, schema, t),
62        Statement::Explain(inner) => explain(schema, inner),
63        Statement::CreateTrigger(ct) => triggers::exec_create_trigger(db, schema, ct),
64        Statement::DropTrigger(dt) => triggers::exec_drop_trigger(db, schema, dt),
65        Statement::CreateMaterializedView(mv) => matviews::exec_create_matview(db, schema, mv),
66        Statement::RefreshMaterializedView(rmv) => matviews::exec_refresh_matview(db, schema, rmv),
67        Statement::DropMaterializedView(dmv) => matviews::exec_drop_matview(db, schema, dmv),
68        Statement::Begin { .. }
69        | Statement::Commit
70        | Statement::Rollback
71        | Statement::Savepoint(_)
72        | Statement::ReleaseSavepoint(_)
73        | Statement::RollbackTo(_)
74        | Statement::SetTimezone(_) => Err(SqlError::Unsupported(
75            "transaction / session control handled by Connection".into(),
76        )),
77    }
78}
79
80/// Execute a parsed SQL statement within an existing write transaction.
81pub fn execute_in_txn(
82    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
83    schema: &mut SchemaManager,
84    stmt: &Statement,
85    params: &[Value],
86) -> Result<ExecutionResult> {
87    match stmt {
88        Statement::CreateTable(ct) => exec_create_table_in_txn(wtx, schema, ct),
89        Statement::DropTable(dt) => exec_drop_table_in_txn(wtx, schema, dt),
90        Statement::CreateIndex(ci) => exec_create_index_in_txn(wtx, schema, ci),
91        Statement::DropIndex(di) => exec_drop_index_in_txn(wtx, schema, di),
92        Statement::CreateView(cv) => exec_create_view_in_txn(wtx, schema, cv),
93        Statement::DropView(dv) => exec_drop_view_in_txn(wtx, schema, dv),
94        Statement::AlterTable(at) => exec_alter_table_in_txn(wtx, schema, at),
95        Statement::Insert(ins) => exec_insert_in_txn(wtx, schema, ins, params),
96        Statement::Select(sq) => exec_select_query_in_txn(wtx, schema, sq),
97        Statement::Update(upd) => exec_update_in_txn(wtx, schema, upd),
98        Statement::Delete(del) => exec_delete_in_txn(wtx, schema, del),
99        Statement::Truncate(t) => exec_truncate_in_txn(wtx, schema, t),
100        Statement::Explain(inner) => explain(schema, inner),
101        Statement::CreateTrigger(ct) => triggers::exec_create_trigger_in_txn(wtx, schema, ct),
102        Statement::DropTrigger(dt) => triggers::exec_drop_trigger_in_txn(wtx, schema, dt),
103        Statement::CreateMaterializedView(mv) => {
104            matviews::exec_create_matview_in_txn(wtx, schema, mv)
105        }
106        Statement::RefreshMaterializedView(rmv) => {
107            matviews::exec_refresh_matview_in_txn(wtx, schema, rmv)
108        }
109        Statement::DropMaterializedView(dmv) => {
110            matviews::exec_drop_matview_in_txn(wtx, schema, dmv)
111        }
112        Statement::Begin { .. }
113        | Statement::Commit
114        | Statement::Rollback
115        | Statement::Savepoint(_)
116        | Statement::ReleaseSavepoint(_)
117        | Statement::RollbackTo(_)
118        | Statement::SetTimezone(_) => {
119            Err(SqlError::Unsupported("nested transaction control".into()))
120        }
121    }
122}
123
124pub(super) fn scan_table_read(
125    db: &Database,
126    schema: &SchemaManager,
127    name: &str,
128) -> Result<(TableSchema, Vec<Vec<Value>>)> {
129    let table_schema = schema
130        .get(name)
131        .ok_or_else(|| SqlError::TableNotFound(name.to_string()))?;
132    let (rows, _) = collect_rows_read(db, table_schema, &None, None)?;
133    Ok((table_schema.clone(), rows))
134}
135
136pub(super) fn scan_table_read_or_view(
137    db: &Database,
138    schema: &SchemaManager,
139    name: &str,
140) -> Result<(TableSchema, Vec<Vec<Value>>)> {
141    if let Some(ts) = schema.get(name) {
142        let (rows, _) = collect_rows_read(db, ts, &None, None)?;
143        return Ok((ts.clone(), rows));
144    }
145    if let Some(vd) = schema.get_view(name) {
146        let qr = exec_view_read(db, schema, vd)?;
147        let vs = build_view_schema(name, &qr);
148        return Ok((vs, qr.rows));
149    }
150    if let Some(vt) = schema.get_virtual(name) {
151        let qr = vt.scan(db, schema)?;
152        let vs = build_view_schema(name, &qr);
153        return Ok((vs, qr.rows));
154    }
155    Err(SqlError::TableNotFound(name.to_string()))
156}
157
158pub(super) fn scan_table_write(
159    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
160    schema: &SchemaManager,
161    name: &str,
162) -> Result<(TableSchema, Vec<Vec<Value>>)> {
163    let table_schema = schema
164        .get(name)
165        .ok_or_else(|| SqlError::TableNotFound(name.to_string()))?;
166    let (rows, _) = collect_rows_write(wtx, table_schema, &None, None)?;
167    Ok((table_schema.clone(), rows))
168}
169
170pub(super) fn scan_table_write_or_view(
171    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
172    schema: &SchemaManager,
173    name: &str,
174) -> Result<(TableSchema, Vec<Vec<Value>>)> {
175    if let Some(ts) = schema.get(name) {
176        let (rows, _) = collect_rows_write(wtx, ts, &None, None)?;
177        return Ok((ts.clone(), rows));
178    }
179    if let Some(vd) = schema.get_view(name) {
180        let qr = exec_view_write(wtx, schema, vd)?;
181        let vs = build_view_schema(name, &qr);
182        return Ok((vs, qr.rows));
183    }
184    Err(SqlError::TableNotFound(name.to_string()))
185}
186
187pub(super) fn resolve_table_or_cte(
188    name: &str,
189    ctes: &CteContext,
190    scan_table: ScanTableFn<'_>,
191) -> Result<(TableSchema, Vec<Vec<Value>>)> {
192    let lower = name.to_ascii_lowercase();
193    if let Some(cte_result) = ctes.get(&lower) {
194        let schema = build_cte_schema(&lower, cte_result);
195        Ok((schema, cte_result.rows.clone()))
196    } else {
197        scan_table(&lower)
198    }
199}
200
201pub(super) fn exec_select_join_with_ctes(
202    stmt: &SelectStmt,
203    ctes: &CteContext,
204    scan_table: ScanTableFn<'_>,
205) -> Result<ExecutionResult> {
206    let (from_schema, from_rows) = resolve_table_or_cte(&stmt.from, ctes, scan_table)?;
207    let from_alias = table_alias_or_name(&stmt.from, &stmt.from_alias);
208
209    let mut tables: Vec<(String, TableSchema)> = vec![(from_alias.clone(), from_schema)];
210    let mut join_rows: Vec<Vec<Vec<Value>>> = Vec::new();
211
212    for join in &stmt.joins {
213        let jname = &join.table.name;
214        let (js, jrows) = resolve_table_or_cte(jname, ctes, scan_table)?;
215        let jalias = table_alias_or_name(jname, &join.table.alias);
216        tables.push((jalias, js));
217        join_rows.push(jrows);
218    }
219
220    let mut outer_rows = from_rows;
221    let mut cur_tables: Vec<(String, &TableSchema)> = vec![(from_alias.clone(), &tables[0].1)];
222
223    for (ji, join) in stmt.joins.iter().enumerate() {
224        let inner_schema = &tables[ji + 1].1;
225        let inner_alias = &tables[ji + 1].0;
226        let inner_rows = &mut join_rows[ji];
227
228        let mut preview_tables = cur_tables.clone();
229        preview_tables.push((inner_alias.clone(), inner_schema));
230        let combined_cols = build_joined_columns(&preview_tables);
231
232        let outer_col_count = if outer_rows.is_empty() {
233            cur_tables.iter().map(|(_, s)| s.columns.len()).sum()
234        } else {
235            outer_rows[0].len()
236        };
237        let inner_col_count = inner_schema.columns.len();
238
239        let (equi_pairs, is_pure_equi) =
240            compute_equi_join_meta(join, &combined_cols, outer_col_count);
241        outer_rows = exec_join_step(
242            outer_rows,
243            inner_rows,
244            join,
245            &combined_cols,
246            outer_col_count,
247            inner_col_count,
248            None,
249            None,
250            &equi_pairs,
251            is_pure_equi,
252        );
253        cur_tables.push((inner_alias.clone(), inner_schema));
254    }
255
256    let joined_cols = build_joined_columns(&cur_tables);
257    process_select(&joined_cols, outer_rows, stmt, false)
258}