1mod aggregate;
4mod correlated;
5mod cte;
6mod ddl;
7mod dml;
8mod explain;
9mod helpers;
10mod join;
11mod scan;
12mod select;
13mod view;
14mod window;
15mod write;
16use cte::*;
17use ddl::*;
18use dml::*;
19pub use dml::{exec_insert_in_txn, InsertBufs};
20use explain::*;
21use join::*;
22use scan::*;
23use select::*;
24use view::*;
25use window::*;
26use write::*;
27pub use write::{compile_update, exec_update_compiled, CompiledUpdate, UpdateBufs};
28
29use std::collections::HashMap;
30
31use citadel::Database;
32
33use crate::error::{Result, SqlError};
34use crate::parser::*;
35use crate::schema::SchemaManager;
36use crate::types::*;
37
38type CteContext = HashMap<String, QueryResult>;
39type ScanTableFn<'a> = &'a mut dyn FnMut(&str) -> Result<(TableSchema, Vec<Vec<Value>>)>;
40
41pub fn execute(
42 db: &Database,
43 schema: &mut SchemaManager,
44 stmt: &Statement,
45 params: &[Value],
46) -> Result<ExecutionResult> {
47 match stmt {
48 Statement::CreateTable(ct) => exec_create_table(db, schema, ct),
49 Statement::DropTable(dt) => exec_drop_table(db, schema, dt),
50 Statement::CreateIndex(ci) => exec_create_index(db, schema, ci),
51 Statement::DropIndex(di) => exec_drop_index(db, schema, di),
52 Statement::CreateView(cv) => exec_create_view(db, schema, cv),
53 Statement::DropView(dv) => exec_drop_view(db, schema, dv),
54 Statement::AlterTable(at) => exec_alter_table(db, schema, at),
55 Statement::Insert(ins) => exec_insert(db, schema, ins, params),
56 Statement::Select(sq) => exec_select_query(db, schema, sq),
57 Statement::Update(upd) => exec_update(db, schema, upd),
58 Statement::Delete(del) => exec_delete(db, schema, del),
59 Statement::Explain(inner) => explain(schema, inner),
60 Statement::Begin | Statement::Commit | Statement::Rollback => Err(SqlError::Unsupported(
61 "transaction control in auto-commit mode".into(),
62 )),
63 }
64}
65
66pub fn execute_in_txn(
68 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
69 schema: &mut SchemaManager,
70 stmt: &Statement,
71 params: &[Value],
72) -> Result<ExecutionResult> {
73 match stmt {
74 Statement::CreateTable(ct) => exec_create_table_in_txn(wtx, schema, ct),
75 Statement::DropTable(dt) => exec_drop_table_in_txn(wtx, schema, dt),
76 Statement::CreateIndex(ci) => exec_create_index_in_txn(wtx, schema, ci),
77 Statement::DropIndex(di) => exec_drop_index_in_txn(wtx, schema, di),
78 Statement::CreateView(cv) => exec_create_view_in_txn(wtx, schema, cv),
79 Statement::DropView(dv) => exec_drop_view_in_txn(wtx, schema, dv),
80 Statement::AlterTable(at) => exec_alter_table_in_txn(wtx, schema, at),
81 Statement::Insert(ins) => {
82 let mut bufs = InsertBufs::new();
83 exec_insert_in_txn(wtx, schema, ins, params, &mut bufs)
84 }
85 Statement::Select(sq) => exec_select_query_in_txn(wtx, schema, sq),
86 Statement::Update(upd) => exec_update_in_txn(wtx, schema, upd),
87 Statement::Delete(del) => exec_delete_in_txn(wtx, schema, del),
88 Statement::Explain(inner) => explain(schema, inner),
89 Statement::Begin | Statement::Commit | Statement::Rollback => {
90 Err(SqlError::Unsupported("nested transaction control".into()))
91 }
92 }
93}
94
95pub(super) fn scan_table_read(
98 db: &Database,
99 schema: &SchemaManager,
100 name: &str,
101) -> Result<(TableSchema, Vec<Vec<Value>>)> {
102 let table_schema = schema
103 .get(name)
104 .ok_or_else(|| SqlError::TableNotFound(name.to_string()))?;
105 let (rows, _) = collect_rows_read(db, table_schema, &None, None)?;
106 Ok((table_schema.clone(), rows))
107}
108
109pub(super) fn scan_table_read_or_view(
110 db: &Database,
111 schema: &SchemaManager,
112 name: &str,
113) -> Result<(TableSchema, Vec<Vec<Value>>)> {
114 if let Some(ts) = schema.get(name) {
115 let (rows, _) = collect_rows_read(db, ts, &None, None)?;
116 return Ok((ts.clone(), rows));
117 }
118 if let Some(vd) = schema.get_view(name) {
119 let qr = exec_view_read(db, schema, vd)?;
120 let vs = build_view_schema(name, &qr);
121 return Ok((vs, qr.rows));
122 }
123 Err(SqlError::TableNotFound(name.to_string()))
124}
125
126pub(super) fn scan_table_write(
127 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
128 schema: &SchemaManager,
129 name: &str,
130) -> Result<(TableSchema, Vec<Vec<Value>>)> {
131 let table_schema = schema
132 .get(name)
133 .ok_or_else(|| SqlError::TableNotFound(name.to_string()))?;
134 let (rows, _) = collect_rows_write(wtx, table_schema, &None, None)?;
135 Ok((table_schema.clone(), rows))
136}
137
138pub(super) fn scan_table_write_or_view(
139 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
140 schema: &SchemaManager,
141 name: &str,
142) -> Result<(TableSchema, Vec<Vec<Value>>)> {
143 if let Some(ts) = schema.get(name) {
144 let (rows, _) = collect_rows_write(wtx, ts, &None, None)?;
145 return Ok((ts.clone(), rows));
146 }
147 if let Some(vd) = schema.get_view(name) {
148 let qr = exec_view_write(wtx, schema, vd)?;
149 let vs = build_view_schema(name, &qr);
150 return Ok((vs, qr.rows));
151 }
152 Err(SqlError::TableNotFound(name.to_string()))
153}
154
155pub(super) fn resolve_table_or_cte(
156 name: &str,
157 ctes: &CteContext,
158 scan_table: ScanTableFn<'_>,
159) -> Result<(TableSchema, Vec<Vec<Value>>)> {
160 let lower = name.to_ascii_lowercase();
161 if let Some(cte_result) = ctes.get(&lower) {
162 let schema = build_cte_schema(&lower, cte_result);
163 Ok((schema, cte_result.rows.clone()))
164 } else {
165 scan_table(&lower)
166 }
167}
168
169pub(super) fn exec_select_join_with_ctes(
170 stmt: &SelectStmt,
171 ctes: &CteContext,
172 scan_table: ScanTableFn<'_>,
173) -> Result<ExecutionResult> {
174 let (from_schema, from_rows) = resolve_table_or_cte(&stmt.from, ctes, scan_table)?;
175 let from_alias = table_alias_or_name(&stmt.from, &stmt.from_alias);
176
177 let mut tables: Vec<(String, TableSchema)> = vec![(from_alias.clone(), from_schema)];
178 let mut join_rows: Vec<Vec<Vec<Value>>> = Vec::new();
179
180 for join in &stmt.joins {
181 let jname = &join.table.name;
182 let (js, jrows) = resolve_table_or_cte(jname, ctes, scan_table)?;
183 let jalias = table_alias_or_name(jname, &join.table.alias);
184 tables.push((jalias, js));
185 join_rows.push(jrows);
186 }
187
188 let mut outer_rows = from_rows;
189 let mut cur_tables: Vec<(String, &TableSchema)> = vec![(from_alias.clone(), &tables[0].1)];
190
191 for (ji, join) in stmt.joins.iter().enumerate() {
192 let inner_schema = &tables[ji + 1].1;
193 let inner_alias = &tables[ji + 1].0;
194 let inner_rows = &join_rows[ji];
195
196 let mut preview_tables = cur_tables.clone();
197 preview_tables.push((inner_alias.clone(), inner_schema));
198 let combined_cols = build_joined_columns(&preview_tables);
199
200 let outer_col_count = if outer_rows.is_empty() {
201 cur_tables.iter().map(|(_, s)| s.columns.len()).sum()
202 } else {
203 outer_rows[0].len()
204 };
205 let inner_col_count = inner_schema.columns.len();
206
207 outer_rows = exec_join_step(
208 outer_rows,
209 inner_rows,
210 join,
211 &combined_cols,
212 outer_col_count,
213 inner_col_count,
214 None,
215 None,
216 );
217 cur_tables.push((inner_alias.clone(), inner_schema));
218 }
219
220 let joined_cols = build_joined_columns(&cur_tables);
221 process_select(&joined_cols, outer_rows, stmt, false)
222}
223
224