Skip to main content

citadel_sql/executor/
mod.rs

1//! SQL executor: DDL and DML operations.
2
3mod aggregate;
4mod correlated;
5mod cte;
6mod ddl;
7mod dml;
8mod explain;
9mod helpers;
10mod join;
11mod scan;
12mod select;
13mod view;
14mod window;
15mod write;
16use cte::*;
17use ddl::*;
18use dml::*;
19pub use dml::{exec_insert_in_txn, InsertBufs};
20use explain::*;
21use join::*;
22use scan::*;
23use select::*;
24use view::*;
25use window::*;
26use write::*;
27pub use write::{compile_update, exec_update_compiled, CompiledUpdate, UpdateBufs};
28
29use std::collections::HashMap;
30
31use citadel::Database;
32
33use crate::error::{Result, SqlError};
34use crate::parser::*;
35use crate::schema::SchemaManager;
36use crate::types::*;
37
38type CteContext = HashMap<String, QueryResult>;
39type ScanTableFn<'a> = &'a mut dyn FnMut(&str) -> Result<(TableSchema, Vec<Vec<Value>>)>;
40
41pub fn execute(
42    db: &Database,
43    schema: &mut SchemaManager,
44    stmt: &Statement,
45    params: &[Value],
46) -> Result<ExecutionResult> {
47    match stmt {
48        Statement::CreateTable(ct) => exec_create_table(db, schema, ct),
49        Statement::DropTable(dt) => exec_drop_table(db, schema, dt),
50        Statement::CreateIndex(ci) => exec_create_index(db, schema, ci),
51        Statement::DropIndex(di) => exec_drop_index(db, schema, di),
52        Statement::CreateView(cv) => exec_create_view(db, schema, cv),
53        Statement::DropView(dv) => exec_drop_view(db, schema, dv),
54        Statement::AlterTable(at) => exec_alter_table(db, schema, at),
55        Statement::Insert(ins) => exec_insert(db, schema, ins, params),
56        Statement::Select(sq) => exec_select_query(db, schema, sq),
57        Statement::Update(upd) => exec_update(db, schema, upd),
58        Statement::Delete(del) => exec_delete(db, schema, del),
59        Statement::Explain(inner) => explain(schema, inner),
60        Statement::Begin
61        | Statement::Commit
62        | Statement::Rollback
63        | Statement::Savepoint(_)
64        | Statement::ReleaseSavepoint(_)
65        | Statement::RollbackTo(_) => Err(SqlError::Unsupported(
66            "transaction control in auto-commit mode".into(),
67        )),
68    }
69}
70
71/// Execute a parsed SQL statement within an existing write transaction.
72pub fn execute_in_txn(
73    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
74    schema: &mut SchemaManager,
75    stmt: &Statement,
76    params: &[Value],
77) -> Result<ExecutionResult> {
78    match stmt {
79        Statement::CreateTable(ct) => exec_create_table_in_txn(wtx, schema, ct),
80        Statement::DropTable(dt) => exec_drop_table_in_txn(wtx, schema, dt),
81        Statement::CreateIndex(ci) => exec_create_index_in_txn(wtx, schema, ci),
82        Statement::DropIndex(di) => exec_drop_index_in_txn(wtx, schema, di),
83        Statement::CreateView(cv) => exec_create_view_in_txn(wtx, schema, cv),
84        Statement::DropView(dv) => exec_drop_view_in_txn(wtx, schema, dv),
85        Statement::AlterTable(at) => exec_alter_table_in_txn(wtx, schema, at),
86        Statement::Insert(ins) => {
87            let mut bufs = InsertBufs::new();
88            exec_insert_in_txn(wtx, schema, ins, params, &mut bufs)
89        }
90        Statement::Select(sq) => exec_select_query_in_txn(wtx, schema, sq),
91        Statement::Update(upd) => exec_update_in_txn(wtx, schema, upd),
92        Statement::Delete(del) => exec_delete_in_txn(wtx, schema, del),
93        Statement::Explain(inner) => explain(schema, inner),
94        Statement::Begin
95        | Statement::Commit
96        | Statement::Rollback
97        | Statement::Savepoint(_)
98        | Statement::ReleaseSavepoint(_)
99        | Statement::RollbackTo(_) => {
100            Err(SqlError::Unsupported("nested transaction control".into()))
101        }
102    }
103}
104
105// ── Table scanning ──────────────────────────────────────────────────
106
107pub(super) fn scan_table_read(
108    db: &Database,
109    schema: &SchemaManager,
110    name: &str,
111) -> Result<(TableSchema, Vec<Vec<Value>>)> {
112    let table_schema = schema
113        .get(name)
114        .ok_or_else(|| SqlError::TableNotFound(name.to_string()))?;
115    let (rows, _) = collect_rows_read(db, table_schema, &None, None)?;
116    Ok((table_schema.clone(), rows))
117}
118
119pub(super) fn scan_table_read_or_view(
120    db: &Database,
121    schema: &SchemaManager,
122    name: &str,
123) -> Result<(TableSchema, Vec<Vec<Value>>)> {
124    if let Some(ts) = schema.get(name) {
125        let (rows, _) = collect_rows_read(db, ts, &None, None)?;
126        return Ok((ts.clone(), rows));
127    }
128    if let Some(vd) = schema.get_view(name) {
129        let qr = exec_view_read(db, schema, vd)?;
130        let vs = build_view_schema(name, &qr);
131        return Ok((vs, qr.rows));
132    }
133    Err(SqlError::TableNotFound(name.to_string()))
134}
135
136pub(super) fn scan_table_write(
137    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
138    schema: &SchemaManager,
139    name: &str,
140) -> Result<(TableSchema, Vec<Vec<Value>>)> {
141    let table_schema = schema
142        .get(name)
143        .ok_or_else(|| SqlError::TableNotFound(name.to_string()))?;
144    let (rows, _) = collect_rows_write(wtx, table_schema, &None, None)?;
145    Ok((table_schema.clone(), rows))
146}
147
148pub(super) fn scan_table_write_or_view(
149    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
150    schema: &SchemaManager,
151    name: &str,
152) -> Result<(TableSchema, Vec<Vec<Value>>)> {
153    if let Some(ts) = schema.get(name) {
154        let (rows, _) = collect_rows_write(wtx, ts, &None, None)?;
155        return Ok((ts.clone(), rows));
156    }
157    if let Some(vd) = schema.get_view(name) {
158        let qr = exec_view_write(wtx, schema, vd)?;
159        let vs = build_view_schema(name, &qr);
160        return Ok((vs, qr.rows));
161    }
162    Err(SqlError::TableNotFound(name.to_string()))
163}
164
165pub(super) fn resolve_table_or_cte(
166    name: &str,
167    ctes: &CteContext,
168    scan_table: ScanTableFn<'_>,
169) -> Result<(TableSchema, Vec<Vec<Value>>)> {
170    let lower = name.to_ascii_lowercase();
171    if let Some(cte_result) = ctes.get(&lower) {
172        let schema = build_cte_schema(&lower, cte_result);
173        Ok((schema, cte_result.rows.clone()))
174    } else {
175        scan_table(&lower)
176    }
177}
178
179pub(super) fn exec_select_join_with_ctes(
180    stmt: &SelectStmt,
181    ctes: &CteContext,
182    scan_table: ScanTableFn<'_>,
183) -> Result<ExecutionResult> {
184    let (from_schema, from_rows) = resolve_table_or_cte(&stmt.from, ctes, scan_table)?;
185    let from_alias = table_alias_or_name(&stmt.from, &stmt.from_alias);
186
187    let mut tables: Vec<(String, TableSchema)> = vec![(from_alias.clone(), from_schema)];
188    let mut join_rows: Vec<Vec<Vec<Value>>> = Vec::new();
189
190    for join in &stmt.joins {
191        let jname = &join.table.name;
192        let (js, jrows) = resolve_table_or_cte(jname, ctes, scan_table)?;
193        let jalias = table_alias_or_name(jname, &join.table.alias);
194        tables.push((jalias, js));
195        join_rows.push(jrows);
196    }
197
198    let mut outer_rows = from_rows;
199    let mut cur_tables: Vec<(String, &TableSchema)> = vec![(from_alias.clone(), &tables[0].1)];
200
201    for (ji, join) in stmt.joins.iter().enumerate() {
202        let inner_schema = &tables[ji + 1].1;
203        let inner_alias = &tables[ji + 1].0;
204        let inner_rows = &join_rows[ji];
205
206        let mut preview_tables = cur_tables.clone();
207        preview_tables.push((inner_alias.clone(), inner_schema));
208        let combined_cols = build_joined_columns(&preview_tables);
209
210        let outer_col_count = if outer_rows.is_empty() {
211            cur_tables.iter().map(|(_, s)| s.columns.len()).sum()
212        } else {
213            outer_rows[0].len()
214        };
215        let inner_col_count = inner_schema.columns.len();
216
217        outer_rows = exec_join_step(
218            outer_rows,
219            inner_rows,
220            join,
221            &combined_cols,
222            outer_col_count,
223            inner_col_count,
224            None,
225            None,
226        );
227        cur_tables.push((inner_alias.clone(), inner_schema));
228    }
229
230    let joined_cols = build_joined_columns(&cur_tables);
231    process_select(&joined_cols, outer_rows, stmt, false)
232}
233
234// ── SELECT execution ────────────────────────────────────────────────