Skip to main content

citadel_sql/executor/
mod.rs

1//! SQL executor: DDL and DML operations.
2
3mod aggregate;
4pub(crate) mod compile;
5mod correlated;
6mod cte;
7mod ddl;
8mod dml;
9mod explain;
10pub(crate) mod helpers;
11mod join;
12mod scan;
13mod select;
14mod view;
15mod window;
16pub(crate) mod write;
17pub use compile::{compile, CompiledPlan};
18use cte::*;
19use ddl::*;
20pub use dml::exec_insert_in_txn;
21use dml::*;
22use explain::*;
23use join::*;
24use scan::*;
25use select::*;
26use view::*;
27use window::*;
28use write::*;
29
30use citadel::Database;
31use rustc_hash::FxHashMap;
32
33use crate::error::{Result, SqlError};
34use crate::parser::*;
35use crate::schema::SchemaManager;
36use crate::types::*;
37
38type CteContext = FxHashMap<String, QueryResult>;
39type ScanTableFn<'a> = &'a mut dyn FnMut(&str) -> Result<(TableSchema, Vec<Vec<Value>>)>;
40
41pub fn execute(
42    db: &Database,
43    schema: &mut SchemaManager,
44    stmt: &Statement,
45    params: &[Value],
46) -> Result<ExecutionResult> {
47    match stmt {
48        Statement::CreateTable(ct) => exec_create_table(db, schema, ct),
49        Statement::DropTable(dt) => exec_drop_table(db, schema, dt),
50        Statement::CreateIndex(ci) => exec_create_index(db, schema, ci),
51        Statement::DropIndex(di) => exec_drop_index(db, schema, di),
52        Statement::CreateView(cv) => exec_create_view(db, schema, cv),
53        Statement::DropView(dv) => exec_drop_view(db, schema, dv),
54        Statement::AlterTable(at) => exec_alter_table(db, schema, at),
55        Statement::Insert(ins) => exec_insert(db, schema, ins, params),
56        Statement::Select(sq) => exec_select_query(db, schema, sq),
57        Statement::Update(upd) => exec_update(db, schema, upd),
58        Statement::Delete(del) => exec_delete(db, schema, del),
59        Statement::Truncate(t) => exec_truncate(db, schema, t),
60        Statement::Explain(inner) => explain(schema, inner),
61        Statement::Begin
62        | Statement::Commit
63        | Statement::Rollback
64        | Statement::Savepoint(_)
65        | Statement::ReleaseSavepoint(_)
66        | Statement::RollbackTo(_)
67        | Statement::SetTimezone(_) => Err(SqlError::Unsupported(
68            "transaction / session control handled by Connection".into(),
69        )),
70    }
71}
72
73/// Execute a parsed SQL statement within an existing write transaction.
74pub fn execute_in_txn(
75    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
76    schema: &mut SchemaManager,
77    stmt: &Statement,
78    params: &[Value],
79) -> Result<ExecutionResult> {
80    match stmt {
81        Statement::CreateTable(ct) => exec_create_table_in_txn(wtx, schema, ct),
82        Statement::DropTable(dt) => exec_drop_table_in_txn(wtx, schema, dt),
83        Statement::CreateIndex(ci) => exec_create_index_in_txn(wtx, schema, ci),
84        Statement::DropIndex(di) => exec_drop_index_in_txn(wtx, schema, di),
85        Statement::CreateView(cv) => exec_create_view_in_txn(wtx, schema, cv),
86        Statement::DropView(dv) => exec_drop_view_in_txn(wtx, schema, dv),
87        Statement::AlterTable(at) => exec_alter_table_in_txn(wtx, schema, at),
88        Statement::Insert(ins) => exec_insert_in_txn(wtx, schema, ins, params),
89        Statement::Select(sq) => exec_select_query_in_txn(wtx, schema, sq),
90        Statement::Update(upd) => exec_update_in_txn(wtx, schema, upd),
91        Statement::Delete(del) => exec_delete_in_txn(wtx, schema, del),
92        Statement::Truncate(t) => exec_truncate_in_txn(wtx, schema, t),
93        Statement::Explain(inner) => explain(schema, inner),
94        Statement::Begin
95        | Statement::Commit
96        | Statement::Rollback
97        | Statement::Savepoint(_)
98        | Statement::ReleaseSavepoint(_)
99        | Statement::RollbackTo(_)
100        | Statement::SetTimezone(_) => {
101            Err(SqlError::Unsupported("nested transaction control".into()))
102        }
103    }
104}
105
106pub(super) fn scan_table_read(
107    db: &Database,
108    schema: &SchemaManager,
109    name: &str,
110) -> Result<(TableSchema, Vec<Vec<Value>>)> {
111    let table_schema = schema
112        .get(name)
113        .ok_or_else(|| SqlError::TableNotFound(name.to_string()))?;
114    let (rows, _) = collect_rows_read(db, table_schema, &None, None)?;
115    Ok((table_schema.clone(), rows))
116}
117
118pub(super) fn scan_table_read_or_view(
119    db: &Database,
120    schema: &SchemaManager,
121    name: &str,
122) -> Result<(TableSchema, Vec<Vec<Value>>)> {
123    if let Some(ts) = schema.get(name) {
124        let (rows, _) = collect_rows_read(db, ts, &None, None)?;
125        return Ok((ts.clone(), rows));
126    }
127    if let Some(vd) = schema.get_view(name) {
128        let qr = exec_view_read(db, schema, vd)?;
129        let vs = build_view_schema(name, &qr);
130        return Ok((vs, qr.rows));
131    }
132    Err(SqlError::TableNotFound(name.to_string()))
133}
134
135pub(super) fn scan_table_write(
136    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
137    schema: &SchemaManager,
138    name: &str,
139) -> Result<(TableSchema, Vec<Vec<Value>>)> {
140    let table_schema = schema
141        .get(name)
142        .ok_or_else(|| SqlError::TableNotFound(name.to_string()))?;
143    let (rows, _) = collect_rows_write(wtx, table_schema, &None, None)?;
144    Ok((table_schema.clone(), rows))
145}
146
147pub(super) fn scan_table_write_or_view(
148    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
149    schema: &SchemaManager,
150    name: &str,
151) -> Result<(TableSchema, Vec<Vec<Value>>)> {
152    if let Some(ts) = schema.get(name) {
153        let (rows, _) = collect_rows_write(wtx, ts, &None, None)?;
154        return Ok((ts.clone(), rows));
155    }
156    if let Some(vd) = schema.get_view(name) {
157        let qr = exec_view_write(wtx, schema, vd)?;
158        let vs = build_view_schema(name, &qr);
159        return Ok((vs, qr.rows));
160    }
161    Err(SqlError::TableNotFound(name.to_string()))
162}
163
164pub(super) fn resolve_table_or_cte(
165    name: &str,
166    ctes: &CteContext,
167    scan_table: ScanTableFn<'_>,
168) -> Result<(TableSchema, Vec<Vec<Value>>)> {
169    let lower = name.to_ascii_lowercase();
170    if let Some(cte_result) = ctes.get(&lower) {
171        let schema = build_cte_schema(&lower, cte_result);
172        Ok((schema, cte_result.rows.clone()))
173    } else {
174        scan_table(&lower)
175    }
176}
177
178pub(super) fn exec_select_join_with_ctes(
179    stmt: &SelectStmt,
180    ctes: &CteContext,
181    scan_table: ScanTableFn<'_>,
182) -> Result<ExecutionResult> {
183    let (from_schema, from_rows) = resolve_table_or_cte(&stmt.from, ctes, scan_table)?;
184    let from_alias = table_alias_or_name(&stmt.from, &stmt.from_alias);
185
186    let mut tables: Vec<(String, TableSchema)> = vec![(from_alias.clone(), from_schema)];
187    let mut join_rows: Vec<Vec<Vec<Value>>> = Vec::new();
188
189    for join in &stmt.joins {
190        let jname = &join.table.name;
191        let (js, jrows) = resolve_table_or_cte(jname, ctes, scan_table)?;
192        let jalias = table_alias_or_name(jname, &join.table.alias);
193        tables.push((jalias, js));
194        join_rows.push(jrows);
195    }
196
197    let mut outer_rows = from_rows;
198    let mut cur_tables: Vec<(String, &TableSchema)> = vec![(from_alias.clone(), &tables[0].1)];
199
200    for (ji, join) in stmt.joins.iter().enumerate() {
201        let inner_schema = &tables[ji + 1].1;
202        let inner_alias = &tables[ji + 1].0;
203        let inner_rows = &join_rows[ji];
204
205        let mut preview_tables = cur_tables.clone();
206        preview_tables.push((inner_alias.clone(), inner_schema));
207        let combined_cols = build_joined_columns(&preview_tables);
208
209        let outer_col_count = if outer_rows.is_empty() {
210            cur_tables.iter().map(|(_, s)| s.columns.len()).sum()
211        } else {
212            outer_rows[0].len()
213        };
214        let inner_col_count = inner_schema.columns.len();
215
216        outer_rows = exec_join_step(
217            outer_rows,
218            inner_rows,
219            join,
220            &combined_cols,
221            outer_col_count,
222            inner_col_count,
223            None,
224            None,
225        );
226        cur_tables.push((inner_alias.clone(), inner_schema));
227    }
228
229    let joined_cols = build_joined_columns(&cur_tables);
230    process_select(&joined_cols, outer_rows, stmt, false)
231}