1mod aggregate;
4pub(crate) mod compile;
5mod correlated;
6mod cte;
7mod ddl;
8mod dml;
9mod explain;
10pub(crate) mod helpers;
11mod join;
12mod scan;
13mod select;
14mod view;
15mod window;
16pub(crate) mod write;
17pub use compile::{compile, CompiledPlan};
18use cte::*;
19use ddl::*;
20pub use dml::exec_insert_in_txn;
21use dml::*;
22use explain::*;
23use join::*;
24use scan::*;
25use select::*;
26use view::*;
27use window::*;
28use write::*;
29
30use citadel::Database;
31use rustc_hash::FxHashMap;
32
33use crate::error::{Result, SqlError};
34use crate::parser::*;
35use crate::schema::SchemaManager;
36use crate::types::*;
37
/// Materialised results of common table expressions, keyed by name.
/// `resolve_table_or_cte` looks names up in ASCII-lowercased form.
type CteContext = FxHashMap<String, QueryResult>;
/// Mutable callback that, given a table name, produces that table's schema
/// together with its rows (or an error).
type ScanTableFn<'a> = &'a mut dyn FnMut(&str) -> Result<(TableSchema, Vec<Vec<Value>>)>;
40
41pub fn execute(
42 db: &Database,
43 schema: &mut SchemaManager,
44 stmt: &Statement,
45 params: &[Value],
46) -> Result<ExecutionResult> {
47 match stmt {
48 Statement::CreateTable(ct) => exec_create_table(db, schema, ct),
49 Statement::DropTable(dt) => exec_drop_table(db, schema, dt),
50 Statement::CreateIndex(ci) => exec_create_index(db, schema, ci),
51 Statement::DropIndex(di) => exec_drop_index(db, schema, di),
52 Statement::CreateView(cv) => exec_create_view(db, schema, cv),
53 Statement::DropView(dv) => exec_drop_view(db, schema, dv),
54 Statement::AlterTable(at) => exec_alter_table(db, schema, at),
55 Statement::Insert(ins) => exec_insert(db, schema, ins, params),
56 Statement::Select(sq) => exec_select_query(db, schema, sq),
57 Statement::Update(upd) => exec_update(db, schema, upd),
58 Statement::Delete(del) => exec_delete(db, schema, del),
59 Statement::Truncate(t) => exec_truncate(db, schema, t),
60 Statement::Explain(inner) => explain(schema, inner),
61 Statement::Begin
62 | Statement::Commit
63 | Statement::Rollback
64 | Statement::Savepoint(_)
65 | Statement::ReleaseSavepoint(_)
66 | Statement::RollbackTo(_)
67 | Statement::SetTimezone(_) => Err(SqlError::Unsupported(
68 "transaction / session control handled by Connection".into(),
69 )),
70 }
71}
72
73pub fn execute_in_txn(
75 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
76 schema: &mut SchemaManager,
77 stmt: &Statement,
78 params: &[Value],
79) -> Result<ExecutionResult> {
80 match stmt {
81 Statement::CreateTable(ct) => exec_create_table_in_txn(wtx, schema, ct),
82 Statement::DropTable(dt) => exec_drop_table_in_txn(wtx, schema, dt),
83 Statement::CreateIndex(ci) => exec_create_index_in_txn(wtx, schema, ci),
84 Statement::DropIndex(di) => exec_drop_index_in_txn(wtx, schema, di),
85 Statement::CreateView(cv) => exec_create_view_in_txn(wtx, schema, cv),
86 Statement::DropView(dv) => exec_drop_view_in_txn(wtx, schema, dv),
87 Statement::AlterTable(at) => exec_alter_table_in_txn(wtx, schema, at),
88 Statement::Insert(ins) => exec_insert_in_txn(wtx, schema, ins, params),
89 Statement::Select(sq) => exec_select_query_in_txn(wtx, schema, sq),
90 Statement::Update(upd) => exec_update_in_txn(wtx, schema, upd),
91 Statement::Delete(del) => exec_delete_in_txn(wtx, schema, del),
92 Statement::Truncate(t) => exec_truncate_in_txn(wtx, schema, t),
93 Statement::Explain(inner) => explain(schema, inner),
94 Statement::Begin
95 | Statement::Commit
96 | Statement::Rollback
97 | Statement::Savepoint(_)
98 | Statement::ReleaseSavepoint(_)
99 | Statement::RollbackTo(_)
100 | Statement::SetTimezone(_) => {
101 Err(SqlError::Unsupported("nested transaction control".into()))
102 }
103 }
104}
105
106pub(super) fn scan_table_read(
107 db: &Database,
108 schema: &SchemaManager,
109 name: &str,
110) -> Result<(TableSchema, Vec<Vec<Value>>)> {
111 let table_schema = schema
112 .get(name)
113 .ok_or_else(|| SqlError::TableNotFound(name.to_string()))?;
114 let (rows, _) = collect_rows_read(db, table_schema, &None, None)?;
115 Ok((table_schema.clone(), rows))
116}
117
118pub(super) fn scan_table_read_or_view(
119 db: &Database,
120 schema: &SchemaManager,
121 name: &str,
122) -> Result<(TableSchema, Vec<Vec<Value>>)> {
123 if let Some(ts) = schema.get(name) {
124 let (rows, _) = collect_rows_read(db, ts, &None, None)?;
125 return Ok((ts.clone(), rows));
126 }
127 if let Some(vd) = schema.get_view(name) {
128 let qr = exec_view_read(db, schema, vd)?;
129 let vs = build_view_schema(name, &qr);
130 return Ok((vs, qr.rows));
131 }
132 if let Some(vt) = schema.get_virtual(name) {
133 let qr = vt.scan(db, schema)?;
134 let vs = build_view_schema(name, &qr);
135 return Ok((vs, qr.rows));
136 }
137 Err(SqlError::TableNotFound(name.to_string()))
138}
139
140pub(super) fn scan_table_write(
141 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
142 schema: &SchemaManager,
143 name: &str,
144) -> Result<(TableSchema, Vec<Vec<Value>>)> {
145 let table_schema = schema
146 .get(name)
147 .ok_or_else(|| SqlError::TableNotFound(name.to_string()))?;
148 let (rows, _) = collect_rows_write(wtx, table_schema, &None, None)?;
149 Ok((table_schema.clone(), rows))
150}
151
152pub(super) fn scan_table_write_or_view(
153 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
154 schema: &SchemaManager,
155 name: &str,
156) -> Result<(TableSchema, Vec<Vec<Value>>)> {
157 if let Some(ts) = schema.get(name) {
158 let (rows, _) = collect_rows_write(wtx, ts, &None, None)?;
159 return Ok((ts.clone(), rows));
160 }
161 if let Some(vd) = schema.get_view(name) {
162 let qr = exec_view_write(wtx, schema, vd)?;
163 let vs = build_view_schema(name, &qr);
164 return Ok((vs, qr.rows));
165 }
166 Err(SqlError::TableNotFound(name.to_string()))
167}
168
169pub(super) fn resolve_table_or_cte(
170 name: &str,
171 ctes: &CteContext,
172 scan_table: ScanTableFn<'_>,
173) -> Result<(TableSchema, Vec<Vec<Value>>)> {
174 let lower = name.to_ascii_lowercase();
175 if let Some(cte_result) = ctes.get(&lower) {
176 let schema = build_cte_schema(&lower, cte_result);
177 Ok((schema, cte_result.rows.clone()))
178 } else {
179 scan_table(&lower)
180 }
181}
182
183pub(super) fn exec_select_join_with_ctes(
184 stmt: &SelectStmt,
185 ctes: &CteContext,
186 scan_table: ScanTableFn<'_>,
187) -> Result<ExecutionResult> {
188 let (from_schema, from_rows) = resolve_table_or_cte(&stmt.from, ctes, scan_table)?;
189 let from_alias = table_alias_or_name(&stmt.from, &stmt.from_alias);
190
191 let mut tables: Vec<(String, TableSchema)> = vec![(from_alias.clone(), from_schema)];
192 let mut join_rows: Vec<Vec<Vec<Value>>> = Vec::new();
193
194 for join in &stmt.joins {
195 let jname = &join.table.name;
196 let (js, jrows) = resolve_table_or_cte(jname, ctes, scan_table)?;
197 let jalias = table_alias_or_name(jname, &join.table.alias);
198 tables.push((jalias, js));
199 join_rows.push(jrows);
200 }
201
202 let mut outer_rows = from_rows;
203 let mut cur_tables: Vec<(String, &TableSchema)> = vec![(from_alias.clone(), &tables[0].1)];
204
205 for (ji, join) in stmt.joins.iter().enumerate() {
206 let inner_schema = &tables[ji + 1].1;
207 let inner_alias = &tables[ji + 1].0;
208 let inner_rows = &mut join_rows[ji];
209
210 let mut preview_tables = cur_tables.clone();
211 preview_tables.push((inner_alias.clone(), inner_schema));
212 let combined_cols = build_joined_columns(&preview_tables);
213
214 let outer_col_count = if outer_rows.is_empty() {
215 cur_tables.iter().map(|(_, s)| s.columns.len()).sum()
216 } else {
217 outer_rows[0].len()
218 };
219 let inner_col_count = inner_schema.columns.len();
220
221 let (equi_pairs, is_pure_equi) =
222 compute_equi_join_meta(join, &combined_cols, outer_col_count);
223 outer_rows = exec_join_step(
224 outer_rows,
225 inner_rows,
226 join,
227 &combined_cols,
228 outer_col_count,
229 inner_col_count,
230 None,
231 None,
232 &equi_pairs,
233 is_pure_equi,
234 );
235 cur_tables.push((inner_alias.clone(), inner_schema));
236 }
237
238 let joined_cols = build_joined_columns(&cur_tables);
239 process_select(&joined_cols, outer_rows, stmt, false)
240}