Skip to main content

citadel_sql/executor/
mod.rs

1//! SQL executor: DDL and DML operations.
2
3mod aggregate;
4pub(crate) mod ann_persist;
5mod ann_topk;
6pub(crate) mod compile;
7mod correlated;
8mod cte;
9mod ddl;
10mod dml;
11mod explain;
12pub(crate) mod helpers;
13mod join;
14pub(crate) mod matviews;
15mod result_cache;
16mod scan;
17mod select;
18pub(crate) mod triggers;
19mod view;
20mod window;
21pub(crate) mod write;
22pub use ann_persist::AnnSegmentInfo;
23pub use ann_topk::AnnIndexSource;
24pub(crate) use ann_topk::{ann_appends_safe, ann_dml_gen_key};
25pub(crate) use ann_topk::{ann_cache_status, persist_ann_index};
26pub use compile::{compile, CompiledPlan};
27use cte::*;
28use ddl::*;
29pub use dml::exec_insert_in_txn;
30use dml::*;
31use explain::*;
32use join::*;
33use scan::*;
34use select::*;
35use view::*;
36use window::*;
37use write::*;
38
39use citadel::Database;
40use citadel_txn::read_txn::ReadTxn;
41use rustc_hash::FxHashMap;
42
43use crate::error::{Result, SqlError};
44use crate::parser::*;
45use crate::schema::SchemaManager;
46use crate::types::*;
47
/// Materialized CTE results keyed by CTE name; lookups in this module use the
/// ASCII-lowercased name as the key.
48type CteContext = FxHashMap<String, QueryResult>;
/// Borrowed, mutable callback that takes a table name and returns that table's
/// schema together with its rows.
49type ScanTableFn<'a> = &'a mut dyn FnMut(&str) -> Result<(TableSchema, Vec<Vec<Value>>)>;
50
51pub fn execute(
52    db: &Database,
53    schema: &mut SchemaManager,
54    stmt: &Statement,
55    params: &[Value],
56) -> Result<ExecutionResult> {
57    match stmt {
58        Statement::CreateTable(ct) => exec_create_table(db, schema, ct),
59        Statement::DropTable(dt) => exec_drop_table(db, schema, dt),
60        Statement::CreateIndex(ci) => exec_create_index(db, schema, ci),
61        Statement::DropIndex(di) => exec_drop_index(db, schema, di),
62        Statement::CreateView(cv) => exec_create_view(db, schema, cv),
63        Statement::DropView(dv) => exec_drop_view(db, schema, dv),
64        Statement::AlterTable(at) => exec_alter_table(db, schema, at),
65        Statement::Insert(ins) => exec_insert(db, schema, ins, params),
66        Statement::Select(sq) => exec_select_query(db, schema, sq),
67        Statement::Update(upd) => exec_update(db, schema, upd),
68        Statement::Delete(del) => exec_delete(db, schema, del),
69        Statement::Truncate(t) => exec_truncate(db, schema, t),
70        Statement::Explain(inner) => explain(schema, inner),
71        Statement::CreateTrigger(ct) => triggers::exec_create_trigger(db, schema, ct),
72        Statement::DropTrigger(dt) => triggers::exec_drop_trigger(db, schema, dt),
73        Statement::CreateMaterializedView(mv) => matviews::exec_create_matview(db, schema, mv),
74        Statement::RefreshMaterializedView(rmv) => matviews::exec_refresh_matview(db, schema, rmv),
75        Statement::DropMaterializedView(dmv) => matviews::exec_drop_matview(db, schema, dmv),
76        Statement::Begin { .. }
77        | Statement::Commit
78        | Statement::Rollback
79        | Statement::Savepoint(_)
80        | Statement::ReleaseSavepoint(_)
81        | Statement::RollbackTo(_)
82        | Statement::SetTimezone(_) => Err(SqlError::Unsupported(
83            "transaction / session control handled by Connection".into(),
84        )),
85    }
86}
87
88pub fn execute_with_read(
89    rtx: &mut citadel_txn::read_txn::ReadTxn<'_>,
90    schema: &SchemaManager,
91    stmt: &Statement,
92    _params: &[Value],
93) -> Result<ExecutionResult> {
94    match stmt {
95        Statement::Select(sq) => cte::exec_select_query_with_read(rtx, schema, sq),
96        Statement::Explain(inner) => explain(schema, inner),
97        Statement::CreateTable(_)
98        | Statement::DropTable(_)
99        | Statement::CreateIndex(_)
100        | Statement::DropIndex(_)
101        | Statement::CreateView(_)
102        | Statement::DropView(_)
103        | Statement::AlterTable(_)
104        | Statement::Insert(_)
105        | Statement::Update(_)
106        | Statement::Delete(_)
107        | Statement::Truncate(_)
108        | Statement::CreateTrigger(_)
109        | Statement::DropTrigger(_)
110        | Statement::CreateMaterializedView(_)
111        | Statement::RefreshMaterializedView(_)
112        | Statement::DropMaterializedView(_) => Err(SqlError::Unsupported(
113            "cannot execute mutating statement inside a read-only transaction".into(),
114        )),
115        Statement::Begin { .. }
116        | Statement::Commit
117        | Statement::Rollback
118        | Statement::Savepoint(_)
119        | Statement::ReleaseSavepoint(_)
120        | Statement::RollbackTo(_)
121        | Statement::SetTimezone(_) => Err(SqlError::Unsupported(
122            "transaction / session control handled by Connection".into(),
123        )),
124    }
125}
126
127/// Execute a parsed SQL statement within an existing write transaction.
128pub fn execute_in_txn(
129    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
130    schema: &mut SchemaManager,
131    stmt: &Statement,
132    params: &[Value],
133) -> Result<ExecutionResult> {
134    match stmt {
135        Statement::CreateTable(ct) => exec_create_table_in_txn(wtx, schema, ct),
136        Statement::DropTable(dt) => exec_drop_table_in_txn(wtx, schema, dt),
137        Statement::CreateIndex(ci) => exec_create_index_in_txn(wtx, schema, ci),
138        Statement::DropIndex(di) => exec_drop_index_in_txn(wtx, schema, di),
139        Statement::CreateView(cv) => exec_create_view_in_txn(wtx, schema, cv),
140        Statement::DropView(dv) => exec_drop_view_in_txn(wtx, schema, dv),
141        Statement::AlterTable(at) => exec_alter_table_in_txn(wtx, schema, at),
142        Statement::Insert(ins) => exec_insert_in_txn(wtx, schema, ins, params),
143        Statement::Select(sq) => exec_select_query_in_txn(wtx, schema, sq),
144        Statement::Update(upd) => exec_update_in_txn(wtx, schema, upd),
145        Statement::Delete(del) => exec_delete_in_txn(wtx, schema, del),
146        Statement::Truncate(t) => exec_truncate_in_txn(wtx, schema, t),
147        Statement::Explain(inner) => explain(schema, inner),
148        Statement::CreateTrigger(ct) => triggers::exec_create_trigger_in_txn(wtx, schema, ct),
149        Statement::DropTrigger(dt) => triggers::exec_drop_trigger_in_txn(wtx, schema, dt),
150        Statement::CreateMaterializedView(mv) => {
151            matviews::exec_create_matview_in_txn(wtx, schema, mv)
152        }
153        Statement::RefreshMaterializedView(rmv) => {
154            matviews::exec_refresh_matview_in_txn(wtx, schema, rmv)
155        }
156        Statement::DropMaterializedView(dmv) => {
157            matviews::exec_drop_matview_in_txn(wtx, schema, dmv)
158        }
159        Statement::Begin { .. }
160        | Statement::Commit
161        | Statement::Rollback
162        | Statement::Savepoint(_)
163        | Statement::ReleaseSavepoint(_)
164        | Statement::RollbackTo(_)
165        | Statement::SetTimezone(_) => {
166            Err(SqlError::Unsupported("nested transaction control".into()))
167        }
168    }
169}
170
171pub(super) fn scan_table_with_read(
172    rtx: &mut ReadTxn<'_>,
173    schema: &SchemaManager,
174    name: &str,
175) -> Result<(TableSchema, Vec<Vec<Value>>)> {
176    let table_schema = schema
177        .get(name)
178        .ok_or_else(|| SqlError::TableNotFound(name.to_string()))?;
179    let (rows, _) = collect_rows_with_read(rtx, table_schema, &None, None)?;
180    Ok((table_schema.clone(), rows))
181}
182
183pub(super) fn scan_table_with_read_or_view(
184    rtx: &mut ReadTxn<'_>,
185    schema: &SchemaManager,
186    name: &str,
187) -> Result<(TableSchema, Vec<Vec<Value>>)> {
188    if let Some(ts) = schema.get(name) {
189        let (rows, _) = collect_rows_with_read(rtx, ts, &None, None)?;
190        return Ok((ts.clone(), rows));
191    }
192    if let Some(vd) = schema.get_view(name) {
193        let qr = exec_view_with_read(rtx, schema, vd)?;
194        let vs = build_view_schema(name, &qr);
195        return Ok((vs, qr.rows));
196    }
197    if let Some(vt) = schema.get_virtual(name) {
198        let qr = vt.scan(schema)?;
199        let vs = build_view_schema(name, &qr);
200        return Ok((vs, qr.rows));
201    }
202    Err(SqlError::TableNotFound(name.to_string()))
203}
204
205pub(super) fn scan_table_write(
206    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
207    schema: &SchemaManager,
208    name: &str,
209) -> Result<(TableSchema, Vec<Vec<Value>>)> {
210    let table_schema = schema
211        .get(name)
212        .ok_or_else(|| SqlError::TableNotFound(name.to_string()))?;
213    let (rows, _) = collect_rows_write(wtx, table_schema, &None, None)?;
214    Ok((table_schema.clone(), rows))
215}
216
217pub(super) fn scan_table_write_or_view(
218    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
219    schema: &SchemaManager,
220    name: &str,
221) -> Result<(TableSchema, Vec<Vec<Value>>)> {
222    if let Some(ts) = schema.get(name) {
223        let (rows, _) = collect_rows_write(wtx, ts, &None, None)?;
224        return Ok((ts.clone(), rows));
225    }
226    if let Some(vd) = schema.get_view(name) {
227        let qr = exec_view_write(wtx, schema, vd)?;
228        let vs = build_view_schema(name, &qr);
229        return Ok((vs, qr.rows));
230    }
231    Err(SqlError::TableNotFound(name.to_string()))
232}
233
234pub(super) fn resolve_table_or_cte(
235    name: &str,
236    ctes: &CteContext,
237    scan_table: ScanTableFn<'_>,
238) -> Result<(TableSchema, Vec<Vec<Value>>)> {
239    let lower = name.to_ascii_lowercase();
240    if let Some(cte_result) = ctes.get(&lower) {
241        let schema = build_cte_schema(&lower, cte_result);
242        Ok((schema, cte_result.rows.clone()))
243    } else {
244        scan_table(&lower)
245    }
246}
247
248pub(super) fn exec_select_join_with_ctes(
249    stmt: &SelectStmt,
250    ctes: &CteContext,
251    scan_table: ScanTableFn<'_>,
252) -> Result<ExecutionResult> {
253    let (from_schema, from_rows) = resolve_table_or_cte(&stmt.from, ctes, scan_table)?;
254    let from_alias = table_alias_or_name(&stmt.from, &stmt.from_alias);
255
256    let mut tables: Vec<(String, TableSchema)> = vec![(from_alias.clone(), from_schema)];
257    let mut join_rows: Vec<Vec<Vec<Value>>> = Vec::new();
258
259    for join in &stmt.joins {
260        let jname = &join.table.name;
261        let (js, jrows) = resolve_table_or_cte(jname, ctes, scan_table)?;
262        let jalias = table_alias_or_name(jname, &join.table.alias);
263        tables.push((jalias, js));
264        join_rows.push(jrows);
265    }
266
267    let mut outer_rows = from_rows;
268    let mut cur_tables: Vec<(String, &TableSchema)> = vec![(from_alias.clone(), &tables[0].1)];
269
270    for (ji, join) in stmt.joins.iter().enumerate() {
271        let inner_schema = &tables[ji + 1].1;
272        let inner_alias = &tables[ji + 1].0;
273        let inner_rows = &mut join_rows[ji];
274
275        let mut preview_tables = cur_tables.clone();
276        preview_tables.push((inner_alias.clone(), inner_schema));
277        let combined_cols = build_joined_columns(&preview_tables);
278
279        let outer_col_count = if outer_rows.is_empty() {
280            cur_tables.iter().map(|(_, s)| s.columns.len()).sum()
281        } else {
282            outer_rows[0].len()
283        };
284        let inner_col_count = inner_schema.columns.len();
285
286        let (equi_pairs, is_pure_equi) =
287            compute_equi_join_meta(join, &combined_cols, outer_col_count);
288        outer_rows = exec_join_step(
289            outer_rows,
290            inner_rows,
291            join,
292            &combined_cols,
293            outer_col_count,
294            inner_col_count,
295            None,
296            None,
297            &equi_pairs,
298            is_pure_equi,
299        );
300        cur_tables.push((inner_alias.clone(), inner_schema));
301    }
302
303    let joined_cols = build_joined_columns(&cur_tables);
304    process_select(&joined_cols, outer_rows, stmt, false)
305}