Skip to main content

citadel_sql/executor/
mod.rs

1//! SQL executor: DDL and DML operations.
2
3mod aggregate;
4pub(crate) mod ann_persist;
5mod ann_topk;
6pub(crate) mod compile;
7mod correlated;
8mod cte;
9mod ddl;
10mod dml;
11mod explain;
12pub(crate) mod helpers;
13mod join;
14pub(crate) mod matviews;
15mod scan;
16mod select;
17pub(crate) mod triggers;
18mod view;
19mod window;
20pub(crate) mod write;
21pub use ann_persist::AnnSegmentInfo;
22pub(crate) use ann_topk::ann_dml_gen_key;
23pub use ann_topk::AnnIndexSource;
24pub(crate) use ann_topk::{ann_cache_status, persist_ann_index};
25pub use compile::{compile, CompiledPlan};
26use cte::*;
27use ddl::*;
28pub use dml::exec_insert_in_txn;
29use dml::*;
30use explain::*;
31use join::*;
32use scan::*;
33use select::*;
34use view::*;
35use window::*;
36use write::*;
37
38use citadel::Database;
39use citadel_txn::read_txn::ReadTxn;
40use rustc_hash::FxHashMap;
41
42use crate::error::{Result, SqlError};
43use crate::parser::*;
44use crate::schema::SchemaManager;
45use crate::types::*;
46
47type CteContext = FxHashMap<String, QueryResult>;
48type ScanTableFn<'a> = &'a mut dyn FnMut(&str) -> Result<(TableSchema, Vec<Vec<Value>>)>;
49
50pub fn execute(
51    db: &Database,
52    schema: &mut SchemaManager,
53    stmt: &Statement,
54    params: &[Value],
55) -> Result<ExecutionResult> {
56    match stmt {
57        Statement::CreateTable(ct) => exec_create_table(db, schema, ct),
58        Statement::DropTable(dt) => exec_drop_table(db, schema, dt),
59        Statement::CreateIndex(ci) => exec_create_index(db, schema, ci),
60        Statement::DropIndex(di) => exec_drop_index(db, schema, di),
61        Statement::CreateView(cv) => exec_create_view(db, schema, cv),
62        Statement::DropView(dv) => exec_drop_view(db, schema, dv),
63        Statement::AlterTable(at) => exec_alter_table(db, schema, at),
64        Statement::Insert(ins) => exec_insert(db, schema, ins, params),
65        Statement::Select(sq) => exec_select_query(db, schema, sq),
66        Statement::Update(upd) => exec_update(db, schema, upd),
67        Statement::Delete(del) => exec_delete(db, schema, del),
68        Statement::Truncate(t) => exec_truncate(db, schema, t),
69        Statement::Explain(inner) => explain(schema, inner),
70        Statement::CreateTrigger(ct) => triggers::exec_create_trigger(db, schema, ct),
71        Statement::DropTrigger(dt) => triggers::exec_drop_trigger(db, schema, dt),
72        Statement::CreateMaterializedView(mv) => matviews::exec_create_matview(db, schema, mv),
73        Statement::RefreshMaterializedView(rmv) => matviews::exec_refresh_matview(db, schema, rmv),
74        Statement::DropMaterializedView(dmv) => matviews::exec_drop_matview(db, schema, dmv),
75        Statement::Begin { .. }
76        | Statement::Commit
77        | Statement::Rollback
78        | Statement::Savepoint(_)
79        | Statement::ReleaseSavepoint(_)
80        | Statement::RollbackTo(_)
81        | Statement::SetTimezone(_) => Err(SqlError::Unsupported(
82            "transaction / session control handled by Connection".into(),
83        )),
84    }
85}
86
87pub fn execute_with_read(
88    rtx: &mut citadel_txn::read_txn::ReadTxn<'_>,
89    schema: &SchemaManager,
90    stmt: &Statement,
91    _params: &[Value],
92) -> Result<ExecutionResult> {
93    match stmt {
94        Statement::Select(sq) => cte::exec_select_query_with_read(rtx, schema, sq),
95        Statement::Explain(inner) => explain(schema, inner),
96        Statement::CreateTable(_)
97        | Statement::DropTable(_)
98        | Statement::CreateIndex(_)
99        | Statement::DropIndex(_)
100        | Statement::CreateView(_)
101        | Statement::DropView(_)
102        | Statement::AlterTable(_)
103        | Statement::Insert(_)
104        | Statement::Update(_)
105        | Statement::Delete(_)
106        | Statement::Truncate(_)
107        | Statement::CreateTrigger(_)
108        | Statement::DropTrigger(_)
109        | Statement::CreateMaterializedView(_)
110        | Statement::RefreshMaterializedView(_)
111        | Statement::DropMaterializedView(_) => Err(SqlError::Unsupported(
112            "cannot execute mutating statement inside a read-only transaction".into(),
113        )),
114        Statement::Begin { .. }
115        | Statement::Commit
116        | Statement::Rollback
117        | Statement::Savepoint(_)
118        | Statement::ReleaseSavepoint(_)
119        | Statement::RollbackTo(_)
120        | Statement::SetTimezone(_) => Err(SqlError::Unsupported(
121            "transaction / session control handled by Connection".into(),
122        )),
123    }
124}
125
126/// Execute a parsed SQL statement within an existing write transaction.
127pub fn execute_in_txn(
128    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
129    schema: &mut SchemaManager,
130    stmt: &Statement,
131    params: &[Value],
132) -> Result<ExecutionResult> {
133    match stmt {
134        Statement::CreateTable(ct) => exec_create_table_in_txn(wtx, schema, ct),
135        Statement::DropTable(dt) => exec_drop_table_in_txn(wtx, schema, dt),
136        Statement::CreateIndex(ci) => exec_create_index_in_txn(wtx, schema, ci),
137        Statement::DropIndex(di) => exec_drop_index_in_txn(wtx, schema, di),
138        Statement::CreateView(cv) => exec_create_view_in_txn(wtx, schema, cv),
139        Statement::DropView(dv) => exec_drop_view_in_txn(wtx, schema, dv),
140        Statement::AlterTable(at) => exec_alter_table_in_txn(wtx, schema, at),
141        Statement::Insert(ins) => exec_insert_in_txn(wtx, schema, ins, params),
142        Statement::Select(sq) => exec_select_query_in_txn(wtx, schema, sq),
143        Statement::Update(upd) => exec_update_in_txn(wtx, schema, upd),
144        Statement::Delete(del) => exec_delete_in_txn(wtx, schema, del),
145        Statement::Truncate(t) => exec_truncate_in_txn(wtx, schema, t),
146        Statement::Explain(inner) => explain(schema, inner),
147        Statement::CreateTrigger(ct) => triggers::exec_create_trigger_in_txn(wtx, schema, ct),
148        Statement::DropTrigger(dt) => triggers::exec_drop_trigger_in_txn(wtx, schema, dt),
149        Statement::CreateMaterializedView(mv) => {
150            matviews::exec_create_matview_in_txn(wtx, schema, mv)
151        }
152        Statement::RefreshMaterializedView(rmv) => {
153            matviews::exec_refresh_matview_in_txn(wtx, schema, rmv)
154        }
155        Statement::DropMaterializedView(dmv) => {
156            matviews::exec_drop_matview_in_txn(wtx, schema, dmv)
157        }
158        Statement::Begin { .. }
159        | Statement::Commit
160        | Statement::Rollback
161        | Statement::Savepoint(_)
162        | Statement::ReleaseSavepoint(_)
163        | Statement::RollbackTo(_)
164        | Statement::SetTimezone(_) => {
165            Err(SqlError::Unsupported("nested transaction control".into()))
166        }
167    }
168}
169
170pub(super) fn scan_table_with_read(
171    rtx: &mut ReadTxn<'_>,
172    schema: &SchemaManager,
173    name: &str,
174) -> Result<(TableSchema, Vec<Vec<Value>>)> {
175    let table_schema = schema
176        .get(name)
177        .ok_or_else(|| SqlError::TableNotFound(name.to_string()))?;
178    let (rows, _) = collect_rows_with_read(rtx, table_schema, &None, None)?;
179    Ok((table_schema.clone(), rows))
180}
181
182pub(super) fn scan_table_with_read_or_view(
183    rtx: &mut ReadTxn<'_>,
184    schema: &SchemaManager,
185    name: &str,
186) -> Result<(TableSchema, Vec<Vec<Value>>)> {
187    if let Some(ts) = schema.get(name) {
188        let (rows, _) = collect_rows_with_read(rtx, ts, &None, None)?;
189        return Ok((ts.clone(), rows));
190    }
191    if let Some(vd) = schema.get_view(name) {
192        let qr = exec_view_with_read(rtx, schema, vd)?;
193        let vs = build_view_schema(name, &qr);
194        return Ok((vs, qr.rows));
195    }
196    if let Some(vt) = schema.get_virtual(name) {
197        let qr = vt.scan(schema)?;
198        let vs = build_view_schema(name, &qr);
199        return Ok((vs, qr.rows));
200    }
201    Err(SqlError::TableNotFound(name.to_string()))
202}
203
204pub(super) fn scan_table_write(
205    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
206    schema: &SchemaManager,
207    name: &str,
208) -> Result<(TableSchema, Vec<Vec<Value>>)> {
209    let table_schema = schema
210        .get(name)
211        .ok_or_else(|| SqlError::TableNotFound(name.to_string()))?;
212    let (rows, _) = collect_rows_write(wtx, table_schema, &None, None)?;
213    Ok((table_schema.clone(), rows))
214}
215
216pub(super) fn scan_table_write_or_view(
217    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
218    schema: &SchemaManager,
219    name: &str,
220) -> Result<(TableSchema, Vec<Vec<Value>>)> {
221    if let Some(ts) = schema.get(name) {
222        let (rows, _) = collect_rows_write(wtx, ts, &None, None)?;
223        return Ok((ts.clone(), rows));
224    }
225    if let Some(vd) = schema.get_view(name) {
226        let qr = exec_view_write(wtx, schema, vd)?;
227        let vs = build_view_schema(name, &qr);
228        return Ok((vs, qr.rows));
229    }
230    Err(SqlError::TableNotFound(name.to_string()))
231}
232
233pub(super) fn resolve_table_or_cte(
234    name: &str,
235    ctes: &CteContext,
236    scan_table: ScanTableFn<'_>,
237) -> Result<(TableSchema, Vec<Vec<Value>>)> {
238    let lower = name.to_ascii_lowercase();
239    if let Some(cte_result) = ctes.get(&lower) {
240        let schema = build_cte_schema(&lower, cte_result);
241        Ok((schema, cte_result.rows.clone()))
242    } else {
243        scan_table(&lower)
244    }
245}
246
247pub(super) fn exec_select_join_with_ctes(
248    stmt: &SelectStmt,
249    ctes: &CteContext,
250    scan_table: ScanTableFn<'_>,
251) -> Result<ExecutionResult> {
252    let (from_schema, from_rows) = resolve_table_or_cte(&stmt.from, ctes, scan_table)?;
253    let from_alias = table_alias_or_name(&stmt.from, &stmt.from_alias);
254
255    let mut tables: Vec<(String, TableSchema)> = vec![(from_alias.clone(), from_schema)];
256    let mut join_rows: Vec<Vec<Vec<Value>>> = Vec::new();
257
258    for join in &stmt.joins {
259        let jname = &join.table.name;
260        let (js, jrows) = resolve_table_or_cte(jname, ctes, scan_table)?;
261        let jalias = table_alias_or_name(jname, &join.table.alias);
262        tables.push((jalias, js));
263        join_rows.push(jrows);
264    }
265
266    let mut outer_rows = from_rows;
267    let mut cur_tables: Vec<(String, &TableSchema)> = vec![(from_alias.clone(), &tables[0].1)];
268
269    for (ji, join) in stmt.joins.iter().enumerate() {
270        let inner_schema = &tables[ji + 1].1;
271        let inner_alias = &tables[ji + 1].0;
272        let inner_rows = &mut join_rows[ji];
273
274        let mut preview_tables = cur_tables.clone();
275        preview_tables.push((inner_alias.clone(), inner_schema));
276        let combined_cols = build_joined_columns(&preview_tables);
277
278        let outer_col_count = if outer_rows.is_empty() {
279            cur_tables.iter().map(|(_, s)| s.columns.len()).sum()
280        } else {
281            outer_rows[0].len()
282        };
283        let inner_col_count = inner_schema.columns.len();
284
285        let (equi_pairs, is_pure_equi) =
286            compute_equi_join_meta(join, &combined_cols, outer_col_count);
287        outer_rows = exec_join_step(
288            outer_rows,
289            inner_rows,
290            join,
291            &combined_cols,
292            outer_col_count,
293            inner_col_count,
294            None,
295            None,
296            &equi_pairs,
297            is_pure_equi,
298        );
299        cur_tables.push((inner_alias.clone(), inner_schema));
300    }
301
302    let joined_cols = build_joined_columns(&cur_tables);
303    process_select(&joined_cols, outer_rows, stmt, false)
304}