1use std::collections::BTreeMap;
4
5use citadel::Database;
6
7use crate::encoding::{
8 decode_composite_key, decode_key_value, decode_row, encode_composite_key, encode_row,
9};
10use crate::error::{Result, SqlError};
11use crate::eval::{eval_expr, is_truthy};
12use crate::parser::*;
13use crate::planner::{self, ScanPlan};
14use crate::schema::SchemaManager;
15use crate::types::*;
16
17fn encode_index_key(idx: &IndexDef, row: &[Value], pk_values: &[Value]) -> Vec<u8> {
20 let indexed_values: Vec<Value> = idx
21 .columns
22 .iter()
23 .map(|&col_idx| row[col_idx as usize].clone())
24 .collect();
25
26 if idx.unique {
27 let any_null = indexed_values.iter().any(|v| v.is_null());
28 if !any_null {
29 return encode_composite_key(&indexed_values);
30 }
31 }
32
33 let mut all_values = indexed_values;
34 all_values.extend_from_slice(pk_values);
35 encode_composite_key(&all_values)
36}
37
38fn encode_index_value(idx: &IndexDef, row: &[Value], pk_values: &[Value]) -> Vec<u8> {
39 if idx.unique {
40 let indexed_values: Vec<Value> = idx
41 .columns
42 .iter()
43 .map(|&col_idx| row[col_idx as usize].clone())
44 .collect();
45 let any_null = indexed_values.iter().any(|v| v.is_null());
46 if !any_null {
47 return encode_composite_key(pk_values);
48 }
49 }
50 vec![]
51}
52
53fn insert_index_entries(
54 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
55 table_schema: &TableSchema,
56 row: &[Value],
57 pk_values: &[Value],
58) -> Result<()> {
59 for idx in &table_schema.indices {
60 let idx_table = TableSchema::index_table_name(&table_schema.name, &idx.name);
61 let key = encode_index_key(idx, row, pk_values);
62 let value = encode_index_value(idx, row, pk_values);
63
64 let is_new = wtx
65 .table_insert(&idx_table, &key, &value)
66 .map_err(SqlError::Storage)?;
67
68 if idx.unique && !is_new {
69 let indexed_values: Vec<Value> = idx
70 .columns
71 .iter()
72 .map(|&col_idx| row[col_idx as usize].clone())
73 .collect();
74 let any_null = indexed_values.iter().any(|v| v.is_null());
75 if !any_null {
76 return Err(SqlError::UniqueViolation(idx.name.clone()));
77 }
78 }
79 }
80 Ok(())
81}
82
83fn delete_index_entries(
84 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
85 table_schema: &TableSchema,
86 row: &[Value],
87 pk_values: &[Value],
88) -> Result<()> {
89 for idx in &table_schema.indices {
90 let idx_table = TableSchema::index_table_name(&table_schema.name, &idx.name);
91 let key = encode_index_key(idx, row, pk_values);
92 wtx.table_delete(&idx_table, &key)
93 .map_err(SqlError::Storage)?;
94 }
95 Ok(())
96}
97
98fn index_columns_changed(idx: &IndexDef, old_row: &[Value], new_row: &[Value]) -> bool {
99 idx.columns
100 .iter()
101 .any(|&col_idx| old_row[col_idx as usize] != new_row[col_idx as usize])
102}
103
104pub fn execute(
106 db: &Database,
107 schema: &mut SchemaManager,
108 stmt: &Statement,
109) -> Result<ExecutionResult> {
110 match stmt {
111 Statement::CreateTable(ct) => exec_create_table(db, schema, ct),
112 Statement::DropTable(dt) => exec_drop_table(db, schema, dt),
113 Statement::CreateIndex(ci) => exec_create_index(db, schema, ci),
114 Statement::DropIndex(di) => exec_drop_index(db, schema, di),
115 Statement::Insert(ins) => exec_insert(db, schema, ins),
116 Statement::Select(sel) => exec_select(db, schema, sel),
117 Statement::Update(upd) => exec_update(db, schema, upd),
118 Statement::Delete(del) => exec_delete(db, schema, del),
119 Statement::Explain(inner) => explain(schema, inner),
120 Statement::Begin | Statement::Commit | Statement::Rollback => Err(SqlError::Unsupported(
121 "transaction control in auto-commit mode".into(),
122 )),
123 }
124}
125
126pub fn execute_in_txn(
128 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
129 schema: &mut SchemaManager,
130 stmt: &Statement,
131) -> Result<ExecutionResult> {
132 match stmt {
133 Statement::CreateTable(ct) => exec_create_table_in_txn(wtx, schema, ct),
134 Statement::DropTable(dt) => exec_drop_table_in_txn(wtx, schema, dt),
135 Statement::CreateIndex(ci) => exec_create_index_in_txn(wtx, schema, ci),
136 Statement::DropIndex(di) => exec_drop_index_in_txn(wtx, schema, di),
137 Statement::Insert(ins) => exec_insert_in_txn(wtx, schema, ins),
138 Statement::Select(sel) => exec_select_in_txn(wtx, schema, sel),
139 Statement::Update(upd) => exec_update_in_txn(wtx, schema, upd),
140 Statement::Delete(del) => exec_delete_in_txn(wtx, schema, del),
141 Statement::Explain(inner) => explain(schema, inner),
142 Statement::Begin | Statement::Commit | Statement::Rollback => {
143 Err(SqlError::Unsupported("nested transaction control".into()))
144 }
145 }
146}
147
148pub fn explain(schema: &SchemaManager, stmt: &Statement) -> Result<ExecutionResult> {
151 let lines = match stmt {
152 Statement::Select(sel) => explain_select(schema, sel)?,
153 Statement::Insert(ins) => {
154 vec![format!("INSERT INTO {}", ins.table.to_ascii_lowercase())]
155 }
156 Statement::Update(upd) => explain_dml(schema, &upd.table, &upd.where_clause, "UPDATE")?,
157 Statement::Delete(del) => {
158 explain_dml(schema, &del.table, &del.where_clause, "DELETE FROM")?
159 }
160 Statement::Explain(_) => {
161 return Err(SqlError::Unsupported("EXPLAIN EXPLAIN".into()));
162 }
163 _ => {
164 return Err(SqlError::Unsupported(
165 "EXPLAIN for this statement type".into(),
166 ));
167 }
168 };
169
170 let rows = lines
171 .into_iter()
172 .map(|line| vec![Value::Text(line)])
173 .collect();
174 Ok(ExecutionResult::Query(QueryResult {
175 columns: vec!["plan".into()],
176 rows,
177 }))
178}
179
180fn explain_dml(
181 schema: &SchemaManager,
182 table: &str,
183 where_clause: &Option<Expr>,
184 verb: &str,
185) -> Result<Vec<String>> {
186 let lower = table.to_ascii_lowercase();
187 let table_schema = schema
188 .get(&lower)
189 .ok_or_else(|| SqlError::TableNotFound(table.to_string()))?;
190 let plan = planner::plan_select(table_schema, where_clause);
191 let scan_line = format_scan_line(&lower, &None, &plan, table_schema);
192 Ok(vec![format!("{verb} {}", scan_line)])
193}
194
195fn explain_select(schema: &SchemaManager, stmt: &SelectStmt) -> Result<Vec<String>> {
196 let mut lines = Vec::new();
197
198 if stmt.from.is_empty() {
199 lines.push("CONSTANT ROW".into());
200 return Ok(lines);
201 }
202
203 let lower_from = stmt.from.to_ascii_lowercase();
204 let from_schema = schema
205 .get(&lower_from)
206 .ok_or_else(|| SqlError::TableNotFound(stmt.from.clone()))?;
207
208 if stmt.joins.is_empty() {
209 let plan = planner::plan_select(from_schema, &stmt.where_clause);
210 lines.push(format_scan_line(
211 &lower_from,
212 &stmt.from_alias,
213 &plan,
214 from_schema,
215 ));
216 } else {
217 let from_plan = planner::plan_select(from_schema, &None);
218 lines.push(format_scan_line(
219 &lower_from,
220 &stmt.from_alias,
221 &from_plan,
222 from_schema,
223 ));
224
225 for join in &stmt.joins {
226 let inner_lower = join.table.name.to_ascii_lowercase();
227 let inner_schema = schema
228 .get(&inner_lower)
229 .ok_or_else(|| SqlError::TableNotFound(join.table.name.clone()))?;
230 let inner_plan = planner::plan_select(inner_schema, &None);
231 lines.push(format_scan_line(
232 &inner_lower,
233 &join.table.alias,
234 &inner_plan,
235 inner_schema,
236 ));
237 }
238
239 let join_type_str = match stmt.joins.last().map(|j| j.join_type) {
240 Some(JoinType::Left) => "LEFT JOIN",
241 Some(JoinType::Right) => "RIGHT JOIN",
242 Some(JoinType::Cross) => "CROSS JOIN",
243 _ => "NESTED LOOP",
244 };
245 lines.push(join_type_str.into());
246 }
247
248 if stmt.where_clause.is_some() && stmt.joins.is_empty() {
249 let plan = planner::plan_select(from_schema, &stmt.where_clause);
250 if matches!(plan, ScanPlan::SeqScan) {
251 lines.push("FILTER".into());
252 }
253 }
254
255 if let Some(ref w) = stmt.where_clause {
256 if !stmt.joins.is_empty() && has_subquery(w) {
257 lines.push("SUBQUERY".into());
258 }
259 }
260
261 explain_subqueries(stmt, &mut lines);
262
263 if !stmt.group_by.is_empty() {
264 lines.push("GROUP BY".into());
265 }
266
267 if stmt.distinct {
268 lines.push("DISTINCT".into());
269 }
270
271 if !stmt.order_by.is_empty() {
272 lines.push("SORT".into());
273 }
274
275 if let Some(ref offset_expr) = stmt.offset {
276 if let Ok(n) = eval_const_int(offset_expr) {
277 lines.push(format!("OFFSET {n}"));
278 } else {
279 lines.push("OFFSET".into());
280 }
281 }
282
283 if let Some(ref limit_expr) = stmt.limit {
284 if let Ok(n) = eval_const_int(limit_expr) {
285 lines.push(format!("LIMIT {n}"));
286 } else {
287 lines.push("LIMIT".into());
288 }
289 }
290
291 Ok(lines)
292}
293
294fn explain_subqueries(stmt: &SelectStmt, lines: &mut Vec<String>) {
295 let mut count = 0;
296 if let Some(ref w) = stmt.where_clause {
297 count += count_subqueries(w);
298 }
299 if let Some(ref h) = stmt.having {
300 count += count_subqueries(h);
301 }
302 for col in &stmt.columns {
303 if let SelectColumn::Expr { expr, .. } = col {
304 count += count_subqueries(expr);
305 }
306 }
307 for _ in 0..count {
308 lines.push("SUBQUERY".into());
309 }
310}
311
312fn count_subqueries(expr: &Expr) -> usize {
313 match expr {
314 Expr::InSubquery { expr: e, .. } => 1 + count_subqueries(e),
315 Expr::ScalarSubquery(_) => 1,
316 Expr::Exists { .. } => 1,
317 Expr::BinaryOp { left, right, .. } => count_subqueries(left) + count_subqueries(right),
318 Expr::UnaryOp { expr: e, .. } => count_subqueries(e),
319 Expr::IsNull(e) | Expr::IsNotNull(e) => count_subqueries(e),
320 Expr::Function { args, .. } => args.iter().map(count_subqueries).sum(),
321 Expr::Between {
322 expr: e, low, high, ..
323 } => count_subqueries(e) + count_subqueries(low) + count_subqueries(high),
324 Expr::Like {
325 expr: e, pattern, ..
326 } => count_subqueries(e) + count_subqueries(pattern),
327 Expr::Case {
328 operand,
329 conditions,
330 else_result,
331 } => {
332 let mut n = 0;
333 if let Some(op) = operand {
334 n += count_subqueries(op);
335 }
336 for (c, r) in conditions {
337 n += count_subqueries(c) + count_subqueries(r);
338 }
339 if let Some(el) = else_result {
340 n += count_subqueries(el);
341 }
342 n
343 }
344 Expr::Coalesce(args) => args.iter().map(count_subqueries).sum(),
345 Expr::Cast { expr: e, .. } => count_subqueries(e),
346 Expr::InList { expr: e, list, .. } => {
347 count_subqueries(e) + list.iter().map(count_subqueries).sum::<usize>()
348 }
349 _ => 0,
350 }
351}
352
353fn format_scan_line(
354 table_name: &str,
355 alias: &Option<String>,
356 plan: &ScanPlan,
357 table_schema: &TableSchema,
358) -> String {
359 let alias_part = match alias {
360 Some(a) if !a.eq_ignore_ascii_case(table_name) => {
361 format!(" AS {}", a.to_ascii_lowercase())
362 }
363 _ => String::new(),
364 };
365
366 let desc = planner::describe_plan(plan, table_schema);
367
368 if desc.is_empty() {
369 format!("SCAN TABLE {table_name}{alias_part}")
370 } else {
371 format!("SEARCH TABLE {table_name}{alias_part} {desc}")
372 }
373}
374
375fn exec_create_table(
378 db: &Database,
379 schema: &mut SchemaManager,
380 stmt: &CreateTableStmt,
381) -> Result<ExecutionResult> {
382 let lower_name = stmt.name.to_ascii_lowercase();
383
384 if schema.contains(&lower_name) {
385 if stmt.if_not_exists {
386 return Ok(ExecutionResult::Ok);
387 }
388 return Err(SqlError::TableAlreadyExists(stmt.name.clone()));
389 }
390
391 if stmt.primary_key.is_empty() {
392 return Err(SqlError::PrimaryKeyRequired);
393 }
394
395 let mut seen = std::collections::HashSet::new();
396 for col in &stmt.columns {
397 let lower = col.name.to_ascii_lowercase();
398 if !seen.insert(lower.clone()) {
399 return Err(SqlError::DuplicateColumn(col.name.clone()));
400 }
401 }
402
403 let columns: Vec<ColumnDef> = stmt
404 .columns
405 .iter()
406 .enumerate()
407 .map(|(i, c)| ColumnDef {
408 name: c.name.to_ascii_lowercase(),
409 data_type: c.data_type,
410 nullable: c.nullable,
411 position: i as u16,
412 })
413 .collect();
414
415 let primary_key_columns: Vec<u16> = stmt
416 .primary_key
417 .iter()
418 .map(|pk_name| {
419 let lower = pk_name.to_ascii_lowercase();
420 columns
421 .iter()
422 .position(|c| c.name == lower)
423 .map(|i| i as u16)
424 .ok_or_else(|| SqlError::ColumnNotFound(pk_name.clone()))
425 })
426 .collect::<Result<_>>()?;
427
428 let table_schema = TableSchema {
429 name: lower_name.clone(),
430 columns,
431 primary_key_columns,
432 indices: vec![],
433 };
434
435 let mut wtx = db.begin_write().map_err(SqlError::Storage)?;
436 SchemaManager::ensure_schema_table(&mut wtx)?;
437 wtx.create_table(lower_name.as_bytes())
438 .map_err(SqlError::Storage)?;
439 SchemaManager::save_schema(&mut wtx, &table_schema)?;
440 wtx.commit().map_err(SqlError::Storage)?;
441
442 schema.register(table_schema);
443 Ok(ExecutionResult::Ok)
444}
445
446fn exec_drop_table(
447 db: &Database,
448 schema: &mut SchemaManager,
449 stmt: &DropTableStmt,
450) -> Result<ExecutionResult> {
451 let lower_name = stmt.name.to_ascii_lowercase();
452
453 if !schema.contains(&lower_name) {
454 if stmt.if_exists {
455 return Ok(ExecutionResult::Ok);
456 }
457 return Err(SqlError::TableNotFound(stmt.name.clone()));
458 }
459
460 let table_schema = schema.get(&lower_name).unwrap();
461 let idx_tables: Vec<Vec<u8>> = table_schema
462 .indices
463 .iter()
464 .map(|idx| TableSchema::index_table_name(&lower_name, &idx.name))
465 .collect();
466
467 let mut wtx = db.begin_write().map_err(SqlError::Storage)?;
468 for idx_table in &idx_tables {
469 wtx.drop_table(idx_table).map_err(SqlError::Storage)?;
470 }
471 wtx.drop_table(lower_name.as_bytes())
472 .map_err(SqlError::Storage)?;
473 SchemaManager::delete_schema(&mut wtx, &lower_name)?;
474 wtx.commit().map_err(SqlError::Storage)?;
475
476 schema.remove(&lower_name);
477 Ok(ExecutionResult::Ok)
478}
479
480fn exec_create_table_in_txn(
481 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
482 schema: &mut SchemaManager,
483 stmt: &CreateTableStmt,
484) -> Result<ExecutionResult> {
485 let lower_name = stmt.name.to_ascii_lowercase();
486
487 if schema.contains(&lower_name) {
488 if stmt.if_not_exists {
489 return Ok(ExecutionResult::Ok);
490 }
491 return Err(SqlError::TableAlreadyExists(stmt.name.clone()));
492 }
493
494 if stmt.primary_key.is_empty() {
495 return Err(SqlError::PrimaryKeyRequired);
496 }
497
498 let mut seen = std::collections::HashSet::new();
499 for col in &stmt.columns {
500 let lower = col.name.to_ascii_lowercase();
501 if !seen.insert(lower.clone()) {
502 return Err(SqlError::DuplicateColumn(col.name.clone()));
503 }
504 }
505
506 let columns: Vec<ColumnDef> = stmt
507 .columns
508 .iter()
509 .enumerate()
510 .map(|(i, c)| ColumnDef {
511 name: c.name.to_ascii_lowercase(),
512 data_type: c.data_type,
513 nullable: c.nullable,
514 position: i as u16,
515 })
516 .collect();
517
518 let primary_key_columns: Vec<u16> = stmt
519 .primary_key
520 .iter()
521 .map(|pk_name| {
522 let lower = pk_name.to_ascii_lowercase();
523 columns
524 .iter()
525 .position(|c| c.name == lower)
526 .map(|i| i as u16)
527 .ok_or_else(|| SqlError::ColumnNotFound(pk_name.clone()))
528 })
529 .collect::<Result<_>>()?;
530
531 let table_schema = TableSchema {
532 name: lower_name.clone(),
533 columns,
534 primary_key_columns,
535 indices: vec![],
536 };
537
538 SchemaManager::ensure_schema_table(wtx)?;
539 wtx.create_table(lower_name.as_bytes())
540 .map_err(SqlError::Storage)?;
541 SchemaManager::save_schema(wtx, &table_schema)?;
542
543 schema.register(table_schema);
544 Ok(ExecutionResult::Ok)
545}
546
547fn exec_drop_table_in_txn(
548 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
549 schema: &mut SchemaManager,
550 stmt: &DropTableStmt,
551) -> Result<ExecutionResult> {
552 let lower_name = stmt.name.to_ascii_lowercase();
553
554 if !schema.contains(&lower_name) {
555 if stmt.if_exists {
556 return Ok(ExecutionResult::Ok);
557 }
558 return Err(SqlError::TableNotFound(stmt.name.clone()));
559 }
560
561 let table_schema = schema.get(&lower_name).unwrap();
562 let idx_tables: Vec<Vec<u8>> = table_schema
563 .indices
564 .iter()
565 .map(|idx| TableSchema::index_table_name(&lower_name, &idx.name))
566 .collect();
567
568 for idx_table in &idx_tables {
569 wtx.drop_table(idx_table).map_err(SqlError::Storage)?;
570 }
571 wtx.drop_table(lower_name.as_bytes())
572 .map_err(SqlError::Storage)?;
573 SchemaManager::delete_schema(wtx, &lower_name)?;
574
575 schema.remove(&lower_name);
576 Ok(ExecutionResult::Ok)
577}
578
579fn exec_create_index(
580 db: &Database,
581 schema: &mut SchemaManager,
582 stmt: &CreateIndexStmt,
583) -> Result<ExecutionResult> {
584 let lower_table = stmt.table_name.to_ascii_lowercase();
585 let lower_idx = stmt.index_name.to_ascii_lowercase();
586
587 let table_schema = schema
588 .get(&lower_table)
589 .ok_or_else(|| SqlError::TableNotFound(stmt.table_name.clone()))?;
590
591 if table_schema.index_by_name(&lower_idx).is_some() {
592 if stmt.if_not_exists {
593 return Ok(ExecutionResult::Ok);
594 }
595 return Err(SqlError::IndexAlreadyExists(stmt.index_name.clone()));
596 }
597
598 let col_indices: Vec<u16> = stmt
599 .columns
600 .iter()
601 .map(|col_name| {
602 let lower = col_name.to_ascii_lowercase();
603 table_schema
604 .column_index(&lower)
605 .map(|i| i as u16)
606 .ok_or_else(|| SqlError::ColumnNotFound(col_name.clone()))
607 })
608 .collect::<Result<_>>()?;
609
610 let idx_def = IndexDef {
611 name: lower_idx.clone(),
612 columns: col_indices,
613 unique: stmt.unique,
614 };
615
616 let idx_table = TableSchema::index_table_name(&lower_table, &lower_idx);
617
618 let mut wtx = db.begin_write().map_err(SqlError::Storage)?;
619 SchemaManager::ensure_schema_table(&mut wtx)?;
620 wtx.create_table(&idx_table).map_err(SqlError::Storage)?;
621
622 let pk_indices = table_schema.pk_indices();
624 let mut rows: Vec<Vec<Value>> = Vec::new();
625 {
626 let mut scan_err: Option<SqlError> = None;
627 wtx.table_for_each(lower_table.as_bytes(), |key, value| {
628 match decode_full_row(table_schema, key, value) {
629 Ok(row) => rows.push(row),
630 Err(e) => scan_err = Some(e),
631 }
632 Ok(())
633 })
634 .map_err(SqlError::Storage)?;
635 if let Some(e) = scan_err {
636 return Err(e);
637 }
638 }
639
640 for row in &rows {
641 let pk_values: Vec<Value> = pk_indices.iter().map(|&i| row[i].clone()).collect();
642 let key = encode_index_key(&idx_def, row, &pk_values);
643 let value = encode_index_value(&idx_def, row, &pk_values);
644 let is_new = wtx
645 .table_insert(&idx_table, &key, &value)
646 .map_err(SqlError::Storage)?;
647 if idx_def.unique && !is_new {
648 let indexed_values: Vec<Value> = idx_def
649 .columns
650 .iter()
651 .map(|&col_idx| row[col_idx as usize].clone())
652 .collect();
653 let any_null = indexed_values.iter().any(|v| v.is_null());
654 if !any_null {
655 return Err(SqlError::UniqueViolation(stmt.index_name.clone()));
656 }
657 }
658 }
659
660 let mut updated_schema = table_schema.clone();
661 updated_schema.indices.push(idx_def);
662 SchemaManager::save_schema(&mut wtx, &updated_schema)?;
663 wtx.commit().map_err(SqlError::Storage)?;
664
665 schema.register(updated_schema);
666 Ok(ExecutionResult::Ok)
667}
668
669fn exec_drop_index(
670 db: &Database,
671 schema: &mut SchemaManager,
672 stmt: &DropIndexStmt,
673) -> Result<ExecutionResult> {
674 let lower_idx = stmt.index_name.to_ascii_lowercase();
675
676 let (table_name, _idx_pos) = match find_index_in_schemas(schema, &lower_idx) {
677 Some(found) => found,
678 None => {
679 if stmt.if_exists {
680 return Ok(ExecutionResult::Ok);
681 }
682 return Err(SqlError::IndexNotFound(stmt.index_name.clone()));
683 }
684 };
685
686 let idx_table = TableSchema::index_table_name(&table_name, &lower_idx);
687
688 let mut wtx = db.begin_write().map_err(SqlError::Storage)?;
689 wtx.drop_table(&idx_table).map_err(SqlError::Storage)?;
690
691 let table_schema = schema.get(&table_name).unwrap();
692 let mut updated_schema = table_schema.clone();
693 updated_schema.indices.retain(|i| i.name != lower_idx);
694 SchemaManager::save_schema(&mut wtx, &updated_schema)?;
695 wtx.commit().map_err(SqlError::Storage)?;
696
697 schema.register(updated_schema);
698 Ok(ExecutionResult::Ok)
699}
700
701fn exec_create_index_in_txn(
702 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
703 schema: &mut SchemaManager,
704 stmt: &CreateIndexStmt,
705) -> Result<ExecutionResult> {
706 let lower_table = stmt.table_name.to_ascii_lowercase();
707 let lower_idx = stmt.index_name.to_ascii_lowercase();
708
709 let table_schema = schema
710 .get(&lower_table)
711 .ok_or_else(|| SqlError::TableNotFound(stmt.table_name.clone()))?;
712
713 if table_schema.index_by_name(&lower_idx).is_some() {
714 if stmt.if_not_exists {
715 return Ok(ExecutionResult::Ok);
716 }
717 return Err(SqlError::IndexAlreadyExists(stmt.index_name.clone()));
718 }
719
720 let col_indices: Vec<u16> = stmt
721 .columns
722 .iter()
723 .map(|col_name| {
724 let lower = col_name.to_ascii_lowercase();
725 table_schema
726 .column_index(&lower)
727 .map(|i| i as u16)
728 .ok_or_else(|| SqlError::ColumnNotFound(col_name.clone()))
729 })
730 .collect::<Result<_>>()?;
731
732 let idx_def = IndexDef {
733 name: lower_idx.clone(),
734 columns: col_indices,
735 unique: stmt.unique,
736 };
737
738 let idx_table = TableSchema::index_table_name(&lower_table, &lower_idx);
739
740 SchemaManager::ensure_schema_table(wtx)?;
741 wtx.create_table(&idx_table).map_err(SqlError::Storage)?;
742
743 let pk_indices = table_schema.pk_indices();
744 let mut rows: Vec<Vec<Value>> = Vec::new();
745 {
746 let mut scan_err: Option<SqlError> = None;
747 wtx.table_for_each(lower_table.as_bytes(), |key, value| {
748 match decode_full_row(table_schema, key, value) {
749 Ok(row) => rows.push(row),
750 Err(e) => scan_err = Some(e),
751 }
752 Ok(())
753 })
754 .map_err(SqlError::Storage)?;
755 if let Some(e) = scan_err {
756 return Err(e);
757 }
758 }
759
760 for row in &rows {
761 let pk_values: Vec<Value> = pk_indices.iter().map(|&i| row[i].clone()).collect();
762 let key = encode_index_key(&idx_def, row, &pk_values);
763 let value = encode_index_value(&idx_def, row, &pk_values);
764 let is_new = wtx
765 .table_insert(&idx_table, &key, &value)
766 .map_err(SqlError::Storage)?;
767 if idx_def.unique && !is_new {
768 let indexed_values: Vec<Value> = idx_def
769 .columns
770 .iter()
771 .map(|&col_idx| row[col_idx as usize].clone())
772 .collect();
773 let any_null = indexed_values.iter().any(|v| v.is_null());
774 if !any_null {
775 return Err(SqlError::UniqueViolation(stmt.index_name.clone()));
776 }
777 }
778 }
779
780 let mut updated_schema = table_schema.clone();
781 updated_schema.indices.push(idx_def);
782 SchemaManager::save_schema(wtx, &updated_schema)?;
783
784 schema.register(updated_schema);
785 Ok(ExecutionResult::Ok)
786}
787
788fn exec_drop_index_in_txn(
789 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
790 schema: &mut SchemaManager,
791 stmt: &DropIndexStmt,
792) -> Result<ExecutionResult> {
793 let lower_idx = stmt.index_name.to_ascii_lowercase();
794
795 let (table_name, _idx_pos) = match find_index_in_schemas(schema, &lower_idx) {
796 Some(found) => found,
797 None => {
798 if stmt.if_exists {
799 return Ok(ExecutionResult::Ok);
800 }
801 return Err(SqlError::IndexNotFound(stmt.index_name.clone()));
802 }
803 };
804
805 let idx_table = TableSchema::index_table_name(&table_name, &lower_idx);
806 wtx.drop_table(&idx_table).map_err(SqlError::Storage)?;
807
808 let table_schema = schema.get(&table_name).unwrap();
809 let mut updated_schema = table_schema.clone();
810 updated_schema.indices.retain(|i| i.name != lower_idx);
811 SchemaManager::save_schema(wtx, &updated_schema)?;
812
813 schema.register(updated_schema);
814 Ok(ExecutionResult::Ok)
815}
816
817fn find_index_in_schemas(schema: &SchemaManager, index_name: &str) -> Option<(String, usize)> {
818 for table_name in schema.table_names() {
819 if let Some(ts) = schema.get(table_name) {
820 if let Some(pos) = ts.indices.iter().position(|i| i.name == index_name) {
821 return Some((table_name.to_string(), pos));
822 }
823 }
824 }
825 None
826}
827
828fn extract_pk_key(
831 idx_key: &[u8],
832 idx_value: &[u8],
833 is_unique: bool,
834 num_index_cols: usize,
835 num_pk_cols: usize,
836) -> Result<Vec<u8>> {
837 if is_unique && !idx_value.is_empty() {
838 Ok(idx_value.to_vec())
839 } else {
840 let total_cols = num_index_cols + num_pk_cols;
841 let all_values = decode_composite_key(idx_key, total_cols)?;
842 let pk_values = &all_values[num_index_cols..];
843 Ok(encode_composite_key(pk_values))
844 }
845}
846
847fn check_range_conditions(
848 idx_key: &[u8],
849 num_prefix_cols: usize,
850 range_conds: &[(BinOp, Value)],
851 num_index_cols: usize,
852) -> Result<RangeCheck> {
853 if range_conds.is_empty() {
854 return Ok(RangeCheck::Match);
855 }
856
857 let num_to_decode = num_prefix_cols + 1;
858 if num_to_decode > num_index_cols {
859 return Ok(RangeCheck::Match);
860 }
861
862 let mut pos = 0;
864 for _ in 0..num_prefix_cols {
865 let (_, n) = decode_key_value(&idx_key[pos..])?;
866 pos += n;
867 }
868 let (range_val, _) = decode_key_value(&idx_key[pos..])?;
869
870 let mut exceeds_upper = false;
871 let mut below_lower = false;
872
873 for (op, val) in range_conds {
874 match op {
875 BinOp::Lt => {
876 if range_val >= *val {
877 exceeds_upper = true;
878 }
879 }
880 BinOp::LtEq => {
881 if range_val > *val {
882 exceeds_upper = true;
883 }
884 }
885 BinOp::Gt => {
886 if range_val <= *val {
887 below_lower = true;
888 }
889 }
890 BinOp::GtEq => {
891 if range_val < *val {
892 below_lower = true;
893 }
894 }
895 _ => {}
896 }
897 }
898
899 if exceeds_upper {
900 Ok(RangeCheck::ExceedsUpper)
901 } else if below_lower {
902 Ok(RangeCheck::BelowLower)
903 } else {
904 Ok(RangeCheck::Match)
905 }
906}
907
/// Outcome of testing an index entry against a scan's range conditions.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RangeCheck {
    /// No range condition is violated; the entry belongs to the result.
    Match,
    /// A `>` / `>=` bound is violated; scans skip the entry and continue.
    BelowLower,
    /// A `<` / `<=` bound is violated; scans stop at this entry.
    ExceedsUpper,
}
913
914fn collect_rows_read(
916 db: &Database,
917 table_schema: &TableSchema,
918 where_clause: &Option<Expr>,
919) -> Result<Vec<Vec<Value>>> {
920 let plan = planner::plan_select(table_schema, where_clause);
921 let lower_name = &table_schema.name;
922
923 match plan {
924 ScanPlan::SeqScan => {
925 let mut rows = Vec::new();
926 let mut rtx = db.begin_read();
927 let mut scan_err: Option<SqlError> = None;
928 rtx.table_for_each(lower_name.as_bytes(), |key, value| {
929 match decode_full_row(table_schema, key, value) {
930 Ok(row) => rows.push(row),
931 Err(e) => scan_err = Some(e),
932 }
933 Ok(())
934 })
935 .map_err(SqlError::Storage)?;
936 if let Some(e) = scan_err {
937 return Err(e);
938 }
939 Ok(rows)
940 }
941
942 ScanPlan::PkLookup { pk_values } => {
943 let key = encode_composite_key(&pk_values);
944 let mut rtx = db.begin_read();
945 match rtx
946 .table_get(lower_name.as_bytes(), &key)
947 .map_err(SqlError::Storage)?
948 {
949 Some(value) => {
950 let row = decode_full_row(table_schema, &key, &value)?;
951 Ok(vec![row])
952 }
953 None => Ok(vec![]),
954 }
955 }
956
957 ScanPlan::IndexScan {
958 idx_table,
959 prefix,
960 num_prefix_cols,
961 range_conds,
962 is_unique,
963 index_columns,
964 ..
965 } => {
966 let num_pk_cols = table_schema.primary_key_columns.len();
967 let num_index_cols = index_columns.len();
968 let mut pk_keys: Vec<Vec<u8>> = Vec::new();
969
970 {
971 let mut rtx = db.begin_read();
972 let mut scan_err: Option<SqlError> = None;
973 rtx.table_scan_from(&idx_table, &prefix, |key, value| {
974 if !key.starts_with(&prefix) {
975 return Ok(false);
976 }
977 match check_range_conditions(key, num_prefix_cols, &range_conds, num_index_cols)
978 {
979 Ok(RangeCheck::ExceedsUpper) => return Ok(false),
980 Ok(RangeCheck::BelowLower) => return Ok(true),
981 Ok(RangeCheck::Match) => {}
982 Err(e) => {
983 scan_err = Some(e);
984 return Ok(false);
985 }
986 }
987 match extract_pk_key(key, value, is_unique, num_index_cols, num_pk_cols) {
988 Ok(pk) => pk_keys.push(pk),
989 Err(e) => {
990 scan_err = Some(e);
991 return Ok(false);
992 }
993 }
994 Ok(true)
995 })
996 .map_err(SqlError::Storage)?;
997 if let Some(e) = scan_err {
998 return Err(e);
999 }
1000 }
1001
1002 let mut rows = Vec::new();
1003 let mut rtx = db.begin_read();
1004 for pk_key in &pk_keys {
1005 if let Some(value) = rtx
1006 .table_get(lower_name.as_bytes(), pk_key)
1007 .map_err(SqlError::Storage)?
1008 {
1009 rows.push(decode_full_row(table_schema, pk_key, &value)?);
1010 }
1011 }
1012 Ok(rows)
1013 }
1014 }
1015}
1016
1017fn collect_rows_write(
1019 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
1020 table_schema: &TableSchema,
1021 where_clause: &Option<Expr>,
1022) -> Result<Vec<Vec<Value>>> {
1023 let plan = planner::plan_select(table_schema, where_clause);
1024 let lower_name = &table_schema.name;
1025
1026 match plan {
1027 ScanPlan::SeqScan => {
1028 let mut rows = Vec::new();
1029 let mut scan_err: Option<SqlError> = None;
1030 wtx.table_for_each(lower_name.as_bytes(), |key, value| {
1031 match decode_full_row(table_schema, key, value) {
1032 Ok(row) => rows.push(row),
1033 Err(e) => scan_err = Some(e),
1034 }
1035 Ok(())
1036 })
1037 .map_err(SqlError::Storage)?;
1038 if let Some(e) = scan_err {
1039 return Err(e);
1040 }
1041 Ok(rows)
1042 }
1043
1044 ScanPlan::PkLookup { pk_values } => {
1045 let key = encode_composite_key(&pk_values);
1046 match wtx
1047 .table_get(lower_name.as_bytes(), &key)
1048 .map_err(SqlError::Storage)?
1049 {
1050 Some(value) => {
1051 let row = decode_full_row(table_schema, &key, &value)?;
1052 Ok(vec![row])
1053 }
1054 None => Ok(vec![]),
1055 }
1056 }
1057
1058 ScanPlan::IndexScan {
1059 idx_table,
1060 prefix,
1061 num_prefix_cols,
1062 range_conds,
1063 is_unique,
1064 index_columns,
1065 ..
1066 } => {
1067 let num_pk_cols = table_schema.primary_key_columns.len();
1068 let num_index_cols = index_columns.len();
1069 let mut pk_keys: Vec<Vec<u8>> = Vec::new();
1070
1071 {
1072 let mut scan_err: Option<SqlError> = None;
1073 wtx.table_scan_from(&idx_table, &prefix, |key, value| {
1074 if !key.starts_with(&prefix) {
1075 return Ok(false);
1076 }
1077 match check_range_conditions(key, num_prefix_cols, &range_conds, num_index_cols)
1078 {
1079 Ok(RangeCheck::ExceedsUpper) => return Ok(false),
1080 Ok(RangeCheck::BelowLower) => return Ok(true),
1081 Ok(RangeCheck::Match) => {}
1082 Err(e) => {
1083 scan_err = Some(e);
1084 return Ok(false);
1085 }
1086 }
1087 match extract_pk_key(key, value, is_unique, num_index_cols, num_pk_cols) {
1088 Ok(pk) => pk_keys.push(pk),
1089 Err(e) => {
1090 scan_err = Some(e);
1091 return Ok(false);
1092 }
1093 }
1094 Ok(true)
1095 })
1096 .map_err(SqlError::Storage)?;
1097 if let Some(e) = scan_err {
1098 return Err(e);
1099 }
1100 }
1101
1102 let mut rows = Vec::new();
1103 for pk_key in &pk_keys {
1104 if let Some(value) = wtx
1105 .table_get(lower_name.as_bytes(), pk_key)
1106 .map_err(SqlError::Storage)?
1107 {
1108 rows.push(decode_full_row(table_schema, pk_key, &value)?);
1109 }
1110 }
1111 Ok(rows)
1112 }
1113 }
1114}
1115
1116fn collect_keyed_rows_read(
1119 db: &Database,
1120 table_schema: &TableSchema,
1121 where_clause: &Option<Expr>,
1122) -> Result<Vec<(Vec<u8>, Vec<Value>)>> {
1123 let plan = planner::plan_select(table_schema, where_clause);
1124 let lower_name = &table_schema.name;
1125
1126 match plan {
1127 ScanPlan::SeqScan => {
1128 let mut rows = Vec::new();
1129 let mut rtx = db.begin_read();
1130 let mut scan_err: Option<SqlError> = None;
1131 rtx.table_for_each(lower_name.as_bytes(), |key, value| {
1132 match decode_full_row(table_schema, key, value) {
1133 Ok(row) => rows.push((key.to_vec(), row)),
1134 Err(e) => scan_err = Some(e),
1135 }
1136 Ok(())
1137 })
1138 .map_err(SqlError::Storage)?;
1139 if let Some(e) = scan_err {
1140 return Err(e);
1141 }
1142 Ok(rows)
1143 }
1144
1145 ScanPlan::PkLookup { pk_values } => {
1146 let key = encode_composite_key(&pk_values);
1147 let mut rtx = db.begin_read();
1148 match rtx
1149 .table_get(lower_name.as_bytes(), &key)
1150 .map_err(SqlError::Storage)?
1151 {
1152 Some(value) => {
1153 let row = decode_full_row(table_schema, &key, &value)?;
1154 Ok(vec![(key, row)])
1155 }
1156 None => Ok(vec![]),
1157 }
1158 }
1159
1160 ScanPlan::IndexScan {
1161 idx_table,
1162 prefix,
1163 num_prefix_cols,
1164 range_conds,
1165 is_unique,
1166 index_columns,
1167 ..
1168 } => {
1169 let num_pk_cols = table_schema.primary_key_columns.len();
1170 let num_index_cols = index_columns.len();
1171 let mut pk_keys: Vec<Vec<u8>> = Vec::new();
1172
1173 {
1174 let mut rtx = db.begin_read();
1175 let mut scan_err: Option<SqlError> = None;
1176 rtx.table_scan_from(&idx_table, &prefix, |key, value| {
1177 if !key.starts_with(&prefix) {
1178 return Ok(false);
1179 }
1180 match check_range_conditions(key, num_prefix_cols, &range_conds, num_index_cols)
1181 {
1182 Ok(RangeCheck::ExceedsUpper) => return Ok(false),
1183 Ok(RangeCheck::BelowLower) => return Ok(true),
1184 Ok(RangeCheck::Match) => {}
1185 Err(e) => {
1186 scan_err = Some(e);
1187 return Ok(false);
1188 }
1189 }
1190 match extract_pk_key(key, value, is_unique, num_index_cols, num_pk_cols) {
1191 Ok(pk) => pk_keys.push(pk),
1192 Err(e) => {
1193 scan_err = Some(e);
1194 return Ok(false);
1195 }
1196 }
1197 Ok(true)
1198 })
1199 .map_err(SqlError::Storage)?;
1200 if let Some(e) = scan_err {
1201 return Err(e);
1202 }
1203 }
1204
1205 let mut rows = Vec::new();
1206 let mut rtx = db.begin_read();
1207 for pk_key in &pk_keys {
1208 if let Some(value) = rtx
1209 .table_get(lower_name.as_bytes(), pk_key)
1210 .map_err(SqlError::Storage)?
1211 {
1212 rows.push((
1213 pk_key.clone(),
1214 decode_full_row(table_schema, pk_key, &value)?,
1215 ));
1216 }
1217 }
1218 Ok(rows)
1219 }
1220 }
1221}
1222
1223fn collect_keyed_rows_write(
1225 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
1226 table_schema: &TableSchema,
1227 where_clause: &Option<Expr>,
1228) -> Result<Vec<(Vec<u8>, Vec<Value>)>> {
1229 let plan = planner::plan_select(table_schema, where_clause);
1230 let lower_name = &table_schema.name;
1231
1232 match plan {
1233 ScanPlan::SeqScan => {
1234 let mut rows = Vec::new();
1235 let mut scan_err: Option<SqlError> = None;
1236 wtx.table_for_each(lower_name.as_bytes(), |key, value| {
1237 match decode_full_row(table_schema, key, value) {
1238 Ok(row) => rows.push((key.to_vec(), row)),
1239 Err(e) => scan_err = Some(e),
1240 }
1241 Ok(())
1242 })
1243 .map_err(SqlError::Storage)?;
1244 if let Some(e) = scan_err {
1245 return Err(e);
1246 }
1247 Ok(rows)
1248 }
1249
1250 ScanPlan::PkLookup { pk_values } => {
1251 let key = encode_composite_key(&pk_values);
1252 match wtx
1253 .table_get(lower_name.as_bytes(), &key)
1254 .map_err(SqlError::Storage)?
1255 {
1256 Some(value) => {
1257 let row = decode_full_row(table_schema, &key, &value)?;
1258 Ok(vec![(key, row)])
1259 }
1260 None => Ok(vec![]),
1261 }
1262 }
1263
1264 ScanPlan::IndexScan {
1265 idx_table,
1266 prefix,
1267 num_prefix_cols,
1268 range_conds,
1269 is_unique,
1270 index_columns,
1271 ..
1272 } => {
1273 let num_pk_cols = table_schema.primary_key_columns.len();
1274 let num_index_cols = index_columns.len();
1275 let mut pk_keys: Vec<Vec<u8>> = Vec::new();
1276
1277 {
1278 let mut scan_err: Option<SqlError> = None;
1279 wtx.table_scan_from(&idx_table, &prefix, |key, value| {
1280 if !key.starts_with(&prefix) {
1281 return Ok(false);
1282 }
1283 match check_range_conditions(key, num_prefix_cols, &range_conds, num_index_cols)
1284 {
1285 Ok(RangeCheck::ExceedsUpper) => return Ok(false),
1286 Ok(RangeCheck::BelowLower) => return Ok(true),
1287 Ok(RangeCheck::Match) => {}
1288 Err(e) => {
1289 scan_err = Some(e);
1290 return Ok(false);
1291 }
1292 }
1293 match extract_pk_key(key, value, is_unique, num_index_cols, num_pk_cols) {
1294 Ok(pk) => pk_keys.push(pk),
1295 Err(e) => {
1296 scan_err = Some(e);
1297 return Ok(false);
1298 }
1299 }
1300 Ok(true)
1301 })
1302 .map_err(SqlError::Storage)?;
1303 if let Some(e) = scan_err {
1304 return Err(e);
1305 }
1306 }
1307
1308 let mut rows = Vec::new();
1309 for pk_key in &pk_keys {
1310 if let Some(value) = wtx
1311 .table_get(lower_name.as_bytes(), pk_key)
1312 .map_err(SqlError::Storage)?
1313 {
1314 rows.push((
1315 pk_key.clone(),
1316 decode_full_row(table_schema, pk_key, &value)?,
1317 ));
1318 }
1319 }
1320 Ok(rows)
1321 }
1322 }
1323}
1324
1325fn exec_insert(
1328 db: &Database,
1329 schema: &SchemaManager,
1330 stmt: &InsertStmt,
1331) -> Result<ExecutionResult> {
1332 let materialized;
1333 let stmt = if insert_has_subquery(stmt) {
1334 materialized = materialize_insert(stmt, &mut |sub| exec_subquery_read(db, schema, sub))?;
1335 &materialized
1336 } else {
1337 stmt
1338 };
1339
1340 let lower_name = stmt.table.to_ascii_lowercase();
1341 let table_schema = schema
1342 .get(&lower_name)
1343 .ok_or_else(|| SqlError::TableNotFound(stmt.table.clone()))?;
1344
1345 let insert_columns = if stmt.columns.is_empty() {
1346 table_schema
1347 .columns
1348 .iter()
1349 .map(|c| c.name.clone())
1350 .collect::<Vec<_>>()
1351 } else {
1352 stmt.columns
1353 .iter()
1354 .map(|c| c.to_ascii_lowercase())
1355 .collect()
1356 };
1357
1358 let col_indices: Vec<usize> = insert_columns
1359 .iter()
1360 .map(|name| {
1361 table_schema
1362 .column_index(name)
1363 .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))
1364 })
1365 .collect::<Result<_>>()?;
1366
1367 let mut wtx = db.begin_write().map_err(SqlError::Storage)?;
1368 let mut count: u64 = 0;
1369
1370 for value_row in &stmt.values {
1371 if value_row.len() != insert_columns.len() {
1372 return Err(SqlError::InvalidValue(format!(
1373 "expected {} values, got {}",
1374 insert_columns.len(),
1375 value_row.len()
1376 )));
1377 }
1378
1379 let mut row = vec![Value::Null; table_schema.columns.len()];
1380 for (i, expr) in value_row.iter().enumerate() {
1381 let val = eval_const_expr(expr)?;
1382 let col_idx = col_indices[i];
1383 let col = &table_schema.columns[col_idx];
1384
1385 let coerced = if val.is_null() {
1386 Value::Null
1387 } else {
1388 val.coerce_to(col.data_type)
1389 .ok_or_else(|| SqlError::TypeMismatch {
1390 expected: col.data_type.to_string(),
1391 got: val.data_type().to_string(),
1392 })?
1393 };
1394
1395 row[col_idx] = coerced;
1396 }
1397
1398 for col in &table_schema.columns {
1399 if !col.nullable && row[col.position as usize].is_null() {
1400 return Err(SqlError::NotNullViolation(col.name.clone()));
1401 }
1402 }
1403
1404 let pk_values: Vec<Value> = table_schema
1405 .pk_indices()
1406 .iter()
1407 .map(|&i| row[i].clone())
1408 .collect();
1409 let key = encode_composite_key(&pk_values);
1410
1411 let non_pk = table_schema.non_pk_indices();
1412 let value_values: Vec<Value> = non_pk.iter().map(|&i| row[i].clone()).collect();
1413 let value = encode_row(&value_values);
1414
1415 if key.len() > citadel_core::MAX_KEY_SIZE {
1416 return Err(SqlError::KeyTooLarge {
1417 size: key.len(),
1418 max: citadel_core::MAX_KEY_SIZE,
1419 });
1420 }
1421 if value.len() > citadel_core::MAX_INLINE_VALUE_SIZE {
1422 return Err(SqlError::RowTooLarge {
1423 size: value.len(),
1424 max: citadel_core::MAX_INLINE_VALUE_SIZE,
1425 });
1426 }
1427
1428 let is_new = wtx
1429 .table_insert(lower_name.as_bytes(), &key, &value)
1430 .map_err(SqlError::Storage)?;
1431 if !is_new {
1432 return Err(SqlError::DuplicateKey);
1433 }
1434
1435 insert_index_entries(&mut wtx, table_schema, &row, &pk_values)?;
1436 count += 1;
1437 }
1438
1439 wtx.commit().map_err(SqlError::Storage)?;
1440 Ok(ExecutionResult::RowsAffected(count))
1441}
1442
1443fn has_subquery(expr: &Expr) -> bool {
1444 match expr {
1445 Expr::InSubquery { .. } | Expr::Exists { .. } | Expr::ScalarSubquery(_) => true,
1446 Expr::BinaryOp { left, right, .. } => has_subquery(left) || has_subquery(right),
1447 Expr::UnaryOp { expr, .. } => has_subquery(expr),
1448 Expr::IsNull(e) | Expr::IsNotNull(e) => has_subquery(e),
1449 Expr::InList { expr, list, .. } => has_subquery(expr) || list.iter().any(has_subquery),
1450 Expr::InSet { expr, .. } => has_subquery(expr),
1451 Expr::Between {
1452 expr, low, high, ..
1453 } => has_subquery(expr) || has_subquery(low) || has_subquery(high),
1454 Expr::Like {
1455 expr,
1456 pattern,
1457 escape,
1458 ..
1459 } => {
1460 has_subquery(expr)
1461 || has_subquery(pattern)
1462 || escape.as_ref().is_some_and(|e| has_subquery(e))
1463 }
1464 Expr::Case {
1465 operand,
1466 conditions,
1467 else_result,
1468 } => {
1469 operand.as_ref().is_some_and(|e| has_subquery(e))
1470 || conditions
1471 .iter()
1472 .any(|(c, r)| has_subquery(c) || has_subquery(r))
1473 || else_result.as_ref().is_some_and(|e| has_subquery(e))
1474 }
1475 Expr::Coalesce(args) => args.iter().any(has_subquery),
1476 Expr::Cast { expr, .. } => has_subquery(expr),
1477 Expr::Function { args, .. } => args.iter().any(has_subquery),
1478 _ => false,
1479 }
1480}
1481
1482fn stmt_has_subquery(stmt: &SelectStmt) -> bool {
1483 if let Some(ref w) = stmt.where_clause {
1484 if has_subquery(w) {
1485 return true;
1486 }
1487 }
1488 if let Some(ref h) = stmt.having {
1489 if has_subquery(h) {
1490 return true;
1491 }
1492 }
1493 for col in &stmt.columns {
1494 if let SelectColumn::Expr { expr, .. } = col {
1495 if has_subquery(expr) {
1496 return true;
1497 }
1498 }
1499 }
1500 for ob in &stmt.order_by {
1501 if has_subquery(&ob.expr) {
1502 return true;
1503 }
1504 }
1505 for join in &stmt.joins {
1506 if let Some(ref on_expr) = join.on_clause {
1507 if has_subquery(on_expr) {
1508 return true;
1509 }
1510 }
1511 }
1512 false
1513}
1514
1515fn materialize_expr(
1516 expr: &Expr,
1517 exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
1518) -> Result<Expr> {
1519 match expr {
1520 Expr::InSubquery {
1521 expr: e,
1522 subquery,
1523 negated,
1524 } => {
1525 let inner = materialize_expr(e, exec_sub)?;
1526 let qr = exec_sub(subquery)?;
1527 if !qr.columns.is_empty() && qr.columns.len() != 1 {
1528 return Err(SqlError::SubqueryMultipleColumns);
1529 }
1530 let mut values = std::collections::HashSet::new();
1531 let mut has_null = false;
1532 for row in &qr.rows {
1533 if row[0].is_null() {
1534 has_null = true;
1535 } else {
1536 values.insert(row[0].clone());
1537 }
1538 }
1539 Ok(Expr::InSet {
1540 expr: Box::new(inner),
1541 values,
1542 has_null,
1543 negated: *negated,
1544 })
1545 }
1546 Expr::ScalarSubquery(subquery) => {
1547 let qr = exec_sub(subquery)?;
1548 if qr.rows.len() > 1 {
1549 return Err(SqlError::SubqueryMultipleRows);
1550 }
1551 let val = if qr.rows.is_empty() {
1552 Value::Null
1553 } else {
1554 qr.rows[0][0].clone()
1555 };
1556 Ok(Expr::Literal(val))
1557 }
1558 Expr::Exists { subquery, negated } => {
1559 let qr = exec_sub(subquery)?;
1560 let exists = !qr.rows.is_empty();
1561 let result = if *negated { !exists } else { exists };
1562 Ok(Expr::Literal(Value::Boolean(result)))
1563 }
1564 Expr::InList {
1565 expr: e,
1566 list,
1567 negated,
1568 } => {
1569 let inner = materialize_expr(e, exec_sub)?;
1570 let items = list
1571 .iter()
1572 .map(|item| materialize_expr(item, exec_sub))
1573 .collect::<Result<Vec<_>>>()?;
1574 Ok(Expr::InList {
1575 expr: Box::new(inner),
1576 list: items,
1577 negated: *negated,
1578 })
1579 }
1580 Expr::BinaryOp { left, op, right } => Ok(Expr::BinaryOp {
1581 left: Box::new(materialize_expr(left, exec_sub)?),
1582 op: *op,
1583 right: Box::new(materialize_expr(right, exec_sub)?),
1584 }),
1585 Expr::UnaryOp { op, expr: e } => Ok(Expr::UnaryOp {
1586 op: *op,
1587 expr: Box::new(materialize_expr(e, exec_sub)?),
1588 }),
1589 Expr::IsNull(e) => Ok(Expr::IsNull(Box::new(materialize_expr(e, exec_sub)?))),
1590 Expr::IsNotNull(e) => Ok(Expr::IsNotNull(Box::new(materialize_expr(e, exec_sub)?))),
1591 Expr::InSet {
1592 expr: e,
1593 values,
1594 has_null,
1595 negated,
1596 } => Ok(Expr::InSet {
1597 expr: Box::new(materialize_expr(e, exec_sub)?),
1598 values: values.clone(),
1599 has_null: *has_null,
1600 negated: *negated,
1601 }),
1602 Expr::Between {
1603 expr: e,
1604 low,
1605 high,
1606 negated,
1607 } => Ok(Expr::Between {
1608 expr: Box::new(materialize_expr(e, exec_sub)?),
1609 low: Box::new(materialize_expr(low, exec_sub)?),
1610 high: Box::new(materialize_expr(high, exec_sub)?),
1611 negated: *negated,
1612 }),
1613 Expr::Like {
1614 expr: e,
1615 pattern,
1616 escape,
1617 negated,
1618 } => {
1619 let esc = escape
1620 .as_ref()
1621 .map(|es| materialize_expr(es, exec_sub).map(Box::new))
1622 .transpose()?;
1623 Ok(Expr::Like {
1624 expr: Box::new(materialize_expr(e, exec_sub)?),
1625 pattern: Box::new(materialize_expr(pattern, exec_sub)?),
1626 escape: esc,
1627 negated: *negated,
1628 })
1629 }
1630 Expr::Case {
1631 operand,
1632 conditions,
1633 else_result,
1634 } => {
1635 let op = operand
1636 .as_ref()
1637 .map(|e| materialize_expr(e, exec_sub).map(Box::new))
1638 .transpose()?;
1639 let conds = conditions
1640 .iter()
1641 .map(|(c, r)| {
1642 Ok((
1643 materialize_expr(c, exec_sub)?,
1644 materialize_expr(r, exec_sub)?,
1645 ))
1646 })
1647 .collect::<Result<Vec<_>>>()?;
1648 let else_r = else_result
1649 .as_ref()
1650 .map(|e| materialize_expr(e, exec_sub).map(Box::new))
1651 .transpose()?;
1652 Ok(Expr::Case {
1653 operand: op,
1654 conditions: conds,
1655 else_result: else_r,
1656 })
1657 }
1658 Expr::Coalesce(args) => {
1659 let materialized = args
1660 .iter()
1661 .map(|a| materialize_expr(a, exec_sub))
1662 .collect::<Result<Vec<_>>>()?;
1663 Ok(Expr::Coalesce(materialized))
1664 }
1665 Expr::Cast { expr: e, data_type } => Ok(Expr::Cast {
1666 expr: Box::new(materialize_expr(e, exec_sub)?),
1667 data_type: *data_type,
1668 }),
1669 Expr::Function { name, args } => {
1670 let materialized = args
1671 .iter()
1672 .map(|a| materialize_expr(a, exec_sub))
1673 .collect::<Result<Vec<_>>>()?;
1674 Ok(Expr::Function {
1675 name: name.clone(),
1676 args: materialized,
1677 })
1678 }
1679 other => Ok(other.clone()),
1680 }
1681}
1682
1683fn materialize_stmt(
1684 stmt: &SelectStmt,
1685 exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
1686) -> Result<SelectStmt> {
1687 let where_clause = stmt
1688 .where_clause
1689 .as_ref()
1690 .map(|e| materialize_expr(e, exec_sub))
1691 .transpose()?;
1692 let having = stmt
1693 .having
1694 .as_ref()
1695 .map(|e| materialize_expr(e, exec_sub))
1696 .transpose()?;
1697 let columns = stmt
1698 .columns
1699 .iter()
1700 .map(|c| match c {
1701 SelectColumn::AllColumns => Ok(SelectColumn::AllColumns),
1702 SelectColumn::Expr { expr, alias } => Ok(SelectColumn::Expr {
1703 expr: materialize_expr(expr, exec_sub)?,
1704 alias: alias.clone(),
1705 }),
1706 })
1707 .collect::<Result<Vec<_>>>()?;
1708 let order_by = stmt
1709 .order_by
1710 .iter()
1711 .map(|ob| {
1712 Ok(OrderByItem {
1713 expr: materialize_expr(&ob.expr, exec_sub)?,
1714 descending: ob.descending,
1715 nulls_first: ob.nulls_first,
1716 })
1717 })
1718 .collect::<Result<Vec<_>>>()?;
1719 let joins = stmt
1720 .joins
1721 .iter()
1722 .map(|j| {
1723 let on_clause = j
1724 .on_clause
1725 .as_ref()
1726 .map(|e| materialize_expr(e, exec_sub))
1727 .transpose()?;
1728 Ok(JoinClause {
1729 join_type: j.join_type,
1730 table: j.table.clone(),
1731 on_clause,
1732 })
1733 })
1734 .collect::<Result<Vec<_>>>()?;
1735 let group_by = stmt
1736 .group_by
1737 .iter()
1738 .map(|e| materialize_expr(e, exec_sub))
1739 .collect::<Result<Vec<_>>>()?;
1740 Ok(SelectStmt {
1741 columns,
1742 from: stmt.from.clone(),
1743 from_alias: stmt.from_alias.clone(),
1744 joins,
1745 distinct: stmt.distinct,
1746 where_clause,
1747 order_by,
1748 limit: stmt.limit.clone(),
1749 offset: stmt.offset.clone(),
1750 group_by,
1751 having,
1752 })
1753}
1754
1755fn exec_subquery_read(
1756 db: &Database,
1757 schema: &SchemaManager,
1758 stmt: &SelectStmt,
1759) -> Result<QueryResult> {
1760 match exec_select(db, schema, stmt)? {
1761 ExecutionResult::Query(qr) => Ok(qr),
1762 _ => Ok(QueryResult {
1763 columns: vec![],
1764 rows: vec![],
1765 }),
1766 }
1767}
1768
1769fn exec_subquery_write(
1770 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
1771 schema: &SchemaManager,
1772 stmt: &SelectStmt,
1773) -> Result<QueryResult> {
1774 match exec_select_in_txn(wtx, schema, stmt)? {
1775 ExecutionResult::Query(qr) => Ok(qr),
1776 _ => Ok(QueryResult {
1777 columns: vec![],
1778 rows: vec![],
1779 }),
1780 }
1781}
1782
1783fn update_has_subquery(stmt: &UpdateStmt) -> bool {
1784 stmt.where_clause.as_ref().is_some_and(has_subquery)
1785 || stmt.assignments.iter().any(|(_, e)| has_subquery(e))
1786}
1787
1788fn materialize_update(
1789 stmt: &UpdateStmt,
1790 exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
1791) -> Result<UpdateStmt> {
1792 let where_clause = stmt
1793 .where_clause
1794 .as_ref()
1795 .map(|e| materialize_expr(e, exec_sub))
1796 .transpose()?;
1797 let assignments = stmt
1798 .assignments
1799 .iter()
1800 .map(|(name, expr)| Ok((name.clone(), materialize_expr(expr, exec_sub)?)))
1801 .collect::<Result<Vec<_>>>()?;
1802 Ok(UpdateStmt {
1803 table: stmt.table.clone(),
1804 assignments,
1805 where_clause,
1806 })
1807}
1808
1809fn delete_has_subquery(stmt: &DeleteStmt) -> bool {
1810 stmt.where_clause.as_ref().is_some_and(has_subquery)
1811}
1812
1813fn materialize_delete(
1814 stmt: &DeleteStmt,
1815 exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
1816) -> Result<DeleteStmt> {
1817 let where_clause = stmt
1818 .where_clause
1819 .as_ref()
1820 .map(|e| materialize_expr(e, exec_sub))
1821 .transpose()?;
1822 Ok(DeleteStmt {
1823 table: stmt.table.clone(),
1824 where_clause,
1825 })
1826}
1827
1828fn insert_has_subquery(stmt: &InsertStmt) -> bool {
1829 stmt.values.iter().any(|row| row.iter().any(has_subquery))
1830}
1831
1832fn materialize_insert(
1833 stmt: &InsertStmt,
1834 exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
1835) -> Result<InsertStmt> {
1836 let values = stmt
1837 .values
1838 .iter()
1839 .map(|row| {
1840 row.iter()
1841 .map(|e| materialize_expr(e, exec_sub))
1842 .collect::<Result<Vec<_>>>()
1843 })
1844 .collect::<Result<Vec<_>>>()?;
1845 Ok(InsertStmt {
1846 table: stmt.table.clone(),
1847 columns: stmt.columns.clone(),
1848 values,
1849 })
1850}
1851
1852fn exec_select(
1853 db: &Database,
1854 schema: &SchemaManager,
1855 stmt: &SelectStmt,
1856) -> Result<ExecutionResult> {
1857 let materialized;
1858 let stmt = if stmt_has_subquery(stmt) {
1859 materialized = materialize_stmt(stmt, &mut |sub| exec_subquery_read(db, schema, sub))?;
1860 &materialized
1861 } else {
1862 stmt
1863 };
1864
1865 if stmt.from.is_empty() {
1866 return exec_select_no_from(stmt);
1867 }
1868
1869 let lower_name = stmt.from.to_ascii_lowercase();
1870 let table_schema = schema
1871 .get(&lower_name)
1872 .ok_or_else(|| SqlError::TableNotFound(stmt.from.clone()))?;
1873
1874 if !stmt.joins.is_empty() {
1875 return exec_select_join(db, schema, stmt);
1876 }
1877
1878 let rows = collect_rows_read(db, table_schema, &stmt.where_clause)?;
1879 process_select(&table_schema.columns, rows, stmt)
1880}
1881
1882fn exec_select_no_from(stmt: &SelectStmt) -> Result<ExecutionResult> {
1883 let empty_cols: Vec<ColumnDef> = vec![];
1884 let empty_row: Vec<Value> = vec![];
1885 let (col_names, projected) = project_rows(&empty_cols, &stmt.columns, &[empty_row])?;
1886 Ok(ExecutionResult::Query(QueryResult {
1887 columns: col_names,
1888 rows: projected,
1889 }))
1890}
1891
1892fn process_select(
1894 columns: &[ColumnDef],
1895 mut rows: Vec<Vec<Value>>,
1896 stmt: &SelectStmt,
1897) -> Result<ExecutionResult> {
1898 if let Some(ref where_expr) = stmt.where_clause {
1899 rows.retain(|row| match eval_expr(where_expr, columns, row) {
1900 Ok(val) => is_truthy(&val),
1901 Err(_) => false,
1902 });
1903 }
1904
1905 let has_aggregates = stmt.columns.iter().any(|c| match c {
1906 SelectColumn::Expr { expr, .. } => is_aggregate_expr(expr),
1907 _ => false,
1908 });
1909
1910 if has_aggregates || !stmt.group_by.is_empty() {
1911 return exec_aggregate(columns, &rows, stmt);
1912 }
1913
1914 if stmt.distinct {
1915 let (col_names, mut projected) = project_rows(columns, &stmt.columns, &rows)?;
1916
1917 let mut seen = std::collections::HashSet::new();
1918 projected.retain(|row| seen.insert(row.clone()));
1919
1920 if !stmt.order_by.is_empty() {
1921 let output_cols = build_output_columns(&stmt.columns, columns);
1922 sort_rows(&mut projected, &stmt.order_by, &output_cols)?;
1923 }
1924
1925 if let Some(ref offset_expr) = stmt.offset {
1926 let offset = eval_const_int(offset_expr)?.max(0) as usize;
1927 if offset < projected.len() {
1928 projected = projected.split_off(offset);
1929 } else {
1930 projected.clear();
1931 }
1932 }
1933
1934 if let Some(ref limit_expr) = stmt.limit {
1935 let limit = eval_const_int(limit_expr)?.max(0) as usize;
1936 projected.truncate(limit);
1937 }
1938
1939 return Ok(ExecutionResult::Query(QueryResult {
1940 columns: col_names,
1941 rows: projected,
1942 }));
1943 }
1944
1945 if !stmt.order_by.is_empty() {
1946 sort_rows(&mut rows, &stmt.order_by, columns)?;
1947 }
1948
1949 if let Some(ref offset_expr) = stmt.offset {
1950 let offset = eval_const_int(offset_expr)?.max(0) as usize;
1951 if offset < rows.len() {
1952 rows = rows.split_off(offset);
1953 } else {
1954 rows.clear();
1955 }
1956 }
1957
1958 if let Some(ref limit_expr) = stmt.limit {
1959 let limit = eval_const_int(limit_expr)?.max(0) as usize;
1960 rows.truncate(limit);
1961 }
1962
1963 let (col_names, projected) = project_rows(columns, &stmt.columns, &rows)?;
1964
1965 Ok(ExecutionResult::Query(QueryResult {
1966 columns: col_names,
1967 rows: projected,
1968 }))
1969}
1970
1971fn resolve_table_name<'a>(schema: &'a SchemaManager, name: &str) -> Result<&'a TableSchema> {
1972 let lower = name.to_ascii_lowercase();
1973 schema
1974 .get(&lower)
1975 .ok_or_else(|| SqlError::TableNotFound(name.to_string()))
1976}
1977
1978fn build_joined_columns(tables: &[(String, &TableSchema)]) -> Vec<ColumnDef> {
1979 let mut result = Vec::new();
1980 let mut pos: u16 = 0;
1981
1982 for (alias, schema) in tables {
1983 for col in &schema.columns {
1984 result.push(ColumnDef {
1985 name: format!("{}.{}", alias.to_ascii_lowercase(), col.name),
1986 data_type: col.data_type,
1987 nullable: col.nullable,
1988 position: pos,
1989 });
1990 pos += 1;
1991 }
1992 }
1993
1994 result
1995}
1996
/// Returns the name a table is referred to by in a query: the alias when one
/// is given, otherwise the table name, lowercased (ASCII) in both cases.
fn table_alias_or_name(name: &str, alias: &Option<String>) -> String {
    // Borrow the alias as `&str` so the fallback needs no temporary `String`.
    alias.as_deref().unwrap_or(name).to_ascii_lowercase()
}
2003
2004fn collect_all_rows_read(db: &Database, table_schema: &TableSchema) -> Result<Vec<Vec<Value>>> {
2005 collect_rows_read(db, table_schema, &None)
2006}
2007
2008fn collect_all_rows_write(
2009 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
2010 table_schema: &TableSchema,
2011) -> Result<Vec<Vec<Value>>> {
2012 collect_rows_write(wtx, table_schema, &None)
2013}
2014
2015fn exec_select_join(
2016 db: &Database,
2017 schema: &SchemaManager,
2018 stmt: &SelectStmt,
2019) -> Result<ExecutionResult> {
2020 let from_schema = resolve_table_name(schema, &stmt.from)?;
2021 let from_alias = table_alias_or_name(&stmt.from, &stmt.from_alias);
2022 let mut outer_rows = collect_all_rows_read(db, from_schema)?;
2023
2024 let mut tables: Vec<(String, &TableSchema)> = vec![(from_alias.clone(), from_schema)];
2025
2026 for join in &stmt.joins {
2027 let inner_schema = resolve_table_name(schema, &join.table.name)?;
2028 let inner_alias = table_alias_or_name(&join.table.name, &join.table.alias);
2029 let inner_rows = collect_all_rows_read(db, inner_schema)?;
2030
2031 let mut preview_tables = tables.clone();
2032 preview_tables.push((inner_alias.clone(), inner_schema));
2033 let combined_cols = build_joined_columns(&preview_tables);
2034
2035 let mut new_rows = Vec::new();
2036
2037 match join.join_type {
2038 JoinType::Inner | JoinType::Cross => {
2039 for outer in &outer_rows {
2040 for inner in &inner_rows {
2041 let combined: Vec<Value> =
2042 outer.iter().chain(inner.iter()).cloned().collect();
2043 if let Some(ref on_expr) = join.on_clause {
2044 match eval_expr(on_expr, &combined_cols, &combined) {
2045 Ok(val) if is_truthy(&val) => new_rows.push(combined),
2046 _ => {}
2047 }
2048 } else {
2049 new_rows.push(combined);
2050 }
2051 }
2052 }
2053 }
2054 JoinType::Left => {
2055 let inner_col_count = inner_schema.columns.len();
2056 for outer in &outer_rows {
2057 let mut matched = false;
2058 for inner in &inner_rows {
2059 let combined: Vec<Value> =
2060 outer.iter().chain(inner.iter()).cloned().collect();
2061 if let Some(ref on_expr) = join.on_clause {
2062 match eval_expr(on_expr, &combined_cols, &combined) {
2063 Ok(val) if is_truthy(&val) => {
2064 new_rows.push(combined);
2065 matched = true;
2066 }
2067 _ => {}
2068 }
2069 } else {
2070 new_rows.push(combined);
2071 matched = true;
2072 }
2073 }
2074 if !matched {
2075 let mut padded = outer.clone();
2076 padded.extend(std::iter::repeat(Value::Null).take(inner_col_count));
2077 new_rows.push(padded);
2078 }
2079 }
2080 }
2081 JoinType::Right => {
2082 let outer_col_count = if outer_rows.is_empty() {
2083 tables.iter().map(|(_, s)| s.columns.len()).sum()
2084 } else {
2085 outer_rows[0].len()
2086 };
2087 let mut inner_matched = vec![false; inner_rows.len()];
2088 for outer in &outer_rows {
2089 for (j, inner) in inner_rows.iter().enumerate() {
2090 let combined: Vec<Value> =
2091 outer.iter().chain(inner.iter()).cloned().collect();
2092 if let Some(ref on_expr) = join.on_clause {
2093 match eval_expr(on_expr, &combined_cols, &combined) {
2094 Ok(val) if is_truthy(&val) => {
2095 new_rows.push(combined);
2096 inner_matched[j] = true;
2097 }
2098 _ => {}
2099 }
2100 } else {
2101 new_rows.push(combined);
2102 inner_matched[j] = true;
2103 }
2104 }
2105 }
2106 for (j, inner) in inner_rows.iter().enumerate() {
2107 if !inner_matched[j] {
2108 let mut padded: Vec<Value> = std::iter::repeat(Value::Null)
2109 .take(outer_col_count)
2110 .collect();
2111 padded.extend(inner.iter().cloned());
2112 new_rows.push(padded);
2113 }
2114 }
2115 }
2116 }
2117
2118 tables.push((inner_alias, inner_schema));
2119 outer_rows = new_rows;
2120 }
2121
2122 let joined_cols = build_joined_columns(&tables);
2123 process_select(&joined_cols, outer_rows, stmt)
2124}
2125
2126fn exec_select_join_in_txn(
2127 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
2128 schema: &SchemaManager,
2129 stmt: &SelectStmt,
2130) -> Result<ExecutionResult> {
2131 let from_schema = resolve_table_name(schema, &stmt.from)?;
2132 let from_alias = table_alias_or_name(&stmt.from, &stmt.from_alias);
2133 let mut outer_rows = collect_all_rows_write(wtx, from_schema)?;
2134
2135 let mut tables: Vec<(String, &TableSchema)> = vec![(from_alias.clone(), from_schema)];
2136
2137 for join in &stmt.joins {
2138 let inner_schema = resolve_table_name(schema, &join.table.name)?;
2139 let inner_alias = table_alias_or_name(&join.table.name, &join.table.alias);
2140 let inner_rows = collect_all_rows_write(wtx, inner_schema)?;
2141
2142 let mut preview_tables = tables.clone();
2143 preview_tables.push((inner_alias.clone(), inner_schema));
2144 let combined_cols = build_joined_columns(&preview_tables);
2145
2146 let mut new_rows = Vec::new();
2147
2148 match join.join_type {
2149 JoinType::Inner | JoinType::Cross => {
2150 for outer in &outer_rows {
2151 for inner in &inner_rows {
2152 let combined: Vec<Value> =
2153 outer.iter().chain(inner.iter()).cloned().collect();
2154 if let Some(ref on_expr) = join.on_clause {
2155 match eval_expr(on_expr, &combined_cols, &combined) {
2156 Ok(val) if is_truthy(&val) => new_rows.push(combined),
2157 _ => {}
2158 }
2159 } else {
2160 new_rows.push(combined);
2161 }
2162 }
2163 }
2164 }
2165 JoinType::Left => {
2166 let inner_col_count = inner_schema.columns.len();
2167 for outer in &outer_rows {
2168 let mut matched = false;
2169 for inner in &inner_rows {
2170 let combined: Vec<Value> =
2171 outer.iter().chain(inner.iter()).cloned().collect();
2172 if let Some(ref on_expr) = join.on_clause {
2173 match eval_expr(on_expr, &combined_cols, &combined) {
2174 Ok(val) if is_truthy(&val) => {
2175 new_rows.push(combined);
2176 matched = true;
2177 }
2178 _ => {}
2179 }
2180 } else {
2181 new_rows.push(combined);
2182 matched = true;
2183 }
2184 }
2185 if !matched {
2186 let mut padded = outer.clone();
2187 padded.extend(std::iter::repeat(Value::Null).take(inner_col_count));
2188 new_rows.push(padded);
2189 }
2190 }
2191 }
2192 JoinType::Right => {
2193 let outer_col_count = if outer_rows.is_empty() {
2194 tables.iter().map(|(_, s)| s.columns.len()).sum()
2195 } else {
2196 outer_rows[0].len()
2197 };
2198 let mut inner_matched = vec![false; inner_rows.len()];
2199 for outer in &outer_rows {
2200 for (j, inner) in inner_rows.iter().enumerate() {
2201 let combined: Vec<Value> =
2202 outer.iter().chain(inner.iter()).cloned().collect();
2203 if let Some(ref on_expr) = join.on_clause {
2204 match eval_expr(on_expr, &combined_cols, &combined) {
2205 Ok(val) if is_truthy(&val) => {
2206 new_rows.push(combined);
2207 inner_matched[j] = true;
2208 }
2209 _ => {}
2210 }
2211 } else {
2212 new_rows.push(combined);
2213 inner_matched[j] = true;
2214 }
2215 }
2216 }
2217 for (j, inner) in inner_rows.iter().enumerate() {
2218 if !inner_matched[j] {
2219 let mut padded: Vec<Value> = std::iter::repeat(Value::Null)
2220 .take(outer_col_count)
2221 .collect();
2222 padded.extend(inner.iter().cloned());
2223 new_rows.push(padded);
2224 }
2225 }
2226 }
2227 }
2228
2229 tables.push((inner_alias, inner_schema));
2230 outer_rows = new_rows;
2231 }
2232
2233 let joined_cols = build_joined_columns(&tables);
2234 process_select(&joined_cols, outer_rows, stmt)
2235}
2236
2237fn exec_update(
2238 db: &Database,
2239 schema: &SchemaManager,
2240 stmt: &UpdateStmt,
2241) -> Result<ExecutionResult> {
2242 let materialized;
2243 let stmt = if update_has_subquery(stmt) {
2244 materialized = materialize_update(stmt, &mut |sub| exec_subquery_read(db, schema, sub))?;
2245 &materialized
2246 } else {
2247 stmt
2248 };
2249
2250 let lower_name = stmt.table.to_ascii_lowercase();
2251 let table_schema = schema
2252 .get(&lower_name)
2253 .ok_or_else(|| SqlError::TableNotFound(stmt.table.clone()))?;
2254
2255 let all_candidates = collect_keyed_rows_read(db, table_schema, &stmt.where_clause)?;
2256 let matching_rows: Vec<(Vec<u8>, Vec<Value>)> = all_candidates
2257 .into_iter()
2258 .filter(|(_, row)| match &stmt.where_clause {
2259 Some(where_expr) => match eval_expr(where_expr, &table_schema.columns, row) {
2260 Ok(val) => is_truthy(&val),
2261 Err(_) => false,
2262 },
2263 None => true,
2264 })
2265 .collect();
2266
2267 if matching_rows.is_empty() {
2268 return Ok(ExecutionResult::RowsAffected(0));
2269 }
2270
2271 struct UpdateChange {
2272 old_key: Vec<u8>,
2273 new_key: Vec<u8>,
2274 new_value: Vec<u8>,
2275 pk_changed: bool,
2276 old_row: Vec<Value>,
2277 new_row: Vec<Value>,
2278 }
2279
2280 let pk_indices = table_schema.pk_indices();
2281 let mut changes: Vec<UpdateChange> = Vec::new();
2282
2283 for (old_key, row) in &matching_rows {
2284 let mut new_row = row.clone();
2285 let mut pk_changed = false;
2286
2287 let mut evaluated: Vec<(usize, Value)> = Vec::with_capacity(stmt.assignments.len());
2289 for (col_name, expr) in &stmt.assignments {
2290 let col_idx = table_schema
2291 .column_index(col_name)
2292 .ok_or_else(|| SqlError::ColumnNotFound(col_name.clone()))?;
2293 let new_val = eval_expr(expr, &table_schema.columns, row)?;
2294 let col = &table_schema.columns[col_idx];
2295
2296 let coerced = if new_val.is_null() {
2297 if !col.nullable {
2298 return Err(SqlError::NotNullViolation(col.name.clone()));
2299 }
2300 Value::Null
2301 } else {
2302 new_val
2303 .coerce_to(col.data_type)
2304 .ok_or_else(|| SqlError::TypeMismatch {
2305 expected: col.data_type.to_string(),
2306 got: new_val.data_type().to_string(),
2307 })?
2308 };
2309
2310 evaluated.push((col_idx, coerced));
2311 }
2312
2313 for (col_idx, coerced) in evaluated {
2314 if table_schema.primary_key_columns.contains(&(col_idx as u16)) {
2315 pk_changed = true;
2316 }
2317 new_row[col_idx] = coerced;
2318 }
2319
2320 let pk_values: Vec<Value> = pk_indices.iter().map(|&i| new_row[i].clone()).collect();
2321 let new_key = encode_composite_key(&pk_values);
2322
2323 let non_pk = table_schema.non_pk_indices();
2324 let value_values: Vec<Value> = non_pk.iter().map(|&i| new_row[i].clone()).collect();
2325 let new_value = encode_row(&value_values);
2326
2327 changes.push(UpdateChange {
2328 old_key: old_key.clone(),
2329 new_key,
2330 new_value,
2331 pk_changed,
2332 old_row: row.clone(),
2333 new_row,
2334 });
2335 }
2336
2337 {
2338 use std::collections::HashSet;
2339 let mut new_keys: HashSet<Vec<u8>> = HashSet::new();
2340 for c in &changes {
2341 if c.pk_changed && c.new_key != c.old_key && !new_keys.insert(c.new_key.clone()) {
2342 return Err(SqlError::DuplicateKey);
2343 }
2344 }
2345 }
2346
2347 let mut wtx = db.begin_write().map_err(SqlError::Storage)?;
2348
2349 for c in &changes {
2350 let old_pk: Vec<Value> = pk_indices.iter().map(|&i| c.old_row[i].clone()).collect();
2351
2352 for idx in &table_schema.indices {
2353 if index_columns_changed(idx, &c.old_row, &c.new_row) || c.pk_changed {
2354 let idx_table = TableSchema::index_table_name(&lower_name, &idx.name);
2355 let old_idx_key = encode_index_key(idx, &c.old_row, &old_pk);
2356 wtx.table_delete(&idx_table, &old_idx_key)
2357 .map_err(SqlError::Storage)?;
2358 }
2359 }
2360
2361 if c.pk_changed {
2362 wtx.table_delete(lower_name.as_bytes(), &c.old_key)
2363 .map_err(SqlError::Storage)?;
2364 }
2365 }
2366
2367 for c in &changes {
2368 let new_pk: Vec<Value> = pk_indices.iter().map(|&i| c.new_row[i].clone()).collect();
2369
2370 if c.pk_changed {
2371 let is_new = wtx
2372 .table_insert(lower_name.as_bytes(), &c.new_key, &c.new_value)
2373 .map_err(SqlError::Storage)?;
2374 if !is_new {
2375 return Err(SqlError::DuplicateKey);
2376 }
2377 } else {
2378 wtx.table_insert(lower_name.as_bytes(), &c.new_key, &c.new_value)
2379 .map_err(SqlError::Storage)?;
2380 }
2381
2382 for idx in &table_schema.indices {
2383 if index_columns_changed(idx, &c.old_row, &c.new_row) || c.pk_changed {
2384 let idx_table = TableSchema::index_table_name(&lower_name, &idx.name);
2385 let new_idx_key = encode_index_key(idx, &c.new_row, &new_pk);
2386 let new_idx_val = encode_index_value(idx, &c.new_row, &new_pk);
2387 let is_new = wtx
2388 .table_insert(&idx_table, &new_idx_key, &new_idx_val)
2389 .map_err(SqlError::Storage)?;
2390 if idx.unique && !is_new {
2391 let indexed_values: Vec<Value> = idx
2392 .columns
2393 .iter()
2394 .map(|&col_idx| c.new_row[col_idx as usize].clone())
2395 .collect();
2396 let any_null = indexed_values.iter().any(|v| v.is_null());
2397 if !any_null {
2398 return Err(SqlError::UniqueViolation(idx.name.clone()));
2399 }
2400 }
2401 }
2402 }
2403 }
2404
2405 let count = changes.len() as u64;
2406 wtx.commit().map_err(SqlError::Storage)?;
2407 Ok(ExecutionResult::RowsAffected(count))
2408}
2409
2410fn exec_delete(
2411 db: &Database,
2412 schema: &SchemaManager,
2413 stmt: &DeleteStmt,
2414) -> Result<ExecutionResult> {
2415 let materialized;
2416 let stmt = if delete_has_subquery(stmt) {
2417 materialized = materialize_delete(stmt, &mut |sub| exec_subquery_read(db, schema, sub))?;
2418 &materialized
2419 } else {
2420 stmt
2421 };
2422
2423 let lower_name = stmt.table.to_ascii_lowercase();
2424 let table_schema = schema
2425 .get(&lower_name)
2426 .ok_or_else(|| SqlError::TableNotFound(stmt.table.clone()))?;
2427
2428 let all_candidates = collect_keyed_rows_read(db, table_schema, &stmt.where_clause)?;
2429 let rows_to_delete: Vec<(Vec<u8>, Vec<Value>)> = all_candidates
2430 .into_iter()
2431 .filter(|(_, row)| match &stmt.where_clause {
2432 Some(where_expr) => match eval_expr(where_expr, &table_schema.columns, row) {
2433 Ok(val) => is_truthy(&val),
2434 Err(_) => false,
2435 },
2436 None => true,
2437 })
2438 .collect();
2439
2440 if rows_to_delete.is_empty() {
2441 return Ok(ExecutionResult::RowsAffected(0));
2442 }
2443
2444 let pk_indices = table_schema.pk_indices();
2445 let mut wtx = db.begin_write().map_err(SqlError::Storage)?;
2446 for (key, row) in &rows_to_delete {
2447 let pk_values: Vec<Value> = pk_indices.iter().map(|&i| row[i].clone()).collect();
2448 delete_index_entries(&mut wtx, table_schema, row, &pk_values)?;
2449 wtx.table_delete(lower_name.as_bytes(), key)
2450 .map_err(SqlError::Storage)?;
2451 }
2452 let count = rows_to_delete.len() as u64;
2453 wtx.commit().map_err(SqlError::Storage)?;
2454 Ok(ExecutionResult::RowsAffected(count))
2455}
2456
2457fn exec_insert_in_txn(
2460 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
2461 schema: &SchemaManager,
2462 stmt: &InsertStmt,
2463) -> Result<ExecutionResult> {
2464 let materialized;
2465 let stmt = if insert_has_subquery(stmt) {
2466 materialized = materialize_insert(stmt, &mut |sub| exec_subquery_write(wtx, schema, sub))?;
2467 &materialized
2468 } else {
2469 stmt
2470 };
2471
2472 let lower_name = stmt.table.to_ascii_lowercase();
2473 let table_schema = schema
2474 .get(&lower_name)
2475 .ok_or_else(|| SqlError::TableNotFound(stmt.table.clone()))?;
2476
2477 let insert_columns = if stmt.columns.is_empty() {
2478 table_schema
2479 .columns
2480 .iter()
2481 .map(|c| c.name.clone())
2482 .collect::<Vec<_>>()
2483 } else {
2484 stmt.columns
2485 .iter()
2486 .map(|c| c.to_ascii_lowercase())
2487 .collect()
2488 };
2489
2490 let col_indices: Vec<usize> = insert_columns
2491 .iter()
2492 .map(|name| {
2493 table_schema
2494 .column_index(name)
2495 .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))
2496 })
2497 .collect::<Result<_>>()?;
2498
2499 let mut count: u64 = 0;
2500
2501 for value_row in &stmt.values {
2502 if value_row.len() != insert_columns.len() {
2503 return Err(SqlError::InvalidValue(format!(
2504 "expected {} values, got {}",
2505 insert_columns.len(),
2506 value_row.len()
2507 )));
2508 }
2509
2510 let mut row = vec![Value::Null; table_schema.columns.len()];
2511 for (i, expr) in value_row.iter().enumerate() {
2512 let val = eval_const_expr(expr)?;
2513 let col_idx = col_indices[i];
2514 let col = &table_schema.columns[col_idx];
2515
2516 let coerced = if val.is_null() {
2517 Value::Null
2518 } else {
2519 val.coerce_to(col.data_type)
2520 .ok_or_else(|| SqlError::TypeMismatch {
2521 expected: col.data_type.to_string(),
2522 got: val.data_type().to_string(),
2523 })?
2524 };
2525
2526 row[col_idx] = coerced;
2527 }
2528
2529 for col in &table_schema.columns {
2530 if !col.nullable && row[col.position as usize].is_null() {
2531 return Err(SqlError::NotNullViolation(col.name.clone()));
2532 }
2533 }
2534
2535 let pk_values: Vec<Value> = table_schema
2536 .pk_indices()
2537 .iter()
2538 .map(|&i| row[i].clone())
2539 .collect();
2540 let key = encode_composite_key(&pk_values);
2541
2542 let non_pk = table_schema.non_pk_indices();
2543 let value_values: Vec<Value> = non_pk.iter().map(|&i| row[i].clone()).collect();
2544 let value = encode_row(&value_values);
2545
2546 if key.len() > citadel_core::MAX_KEY_SIZE {
2547 return Err(SqlError::KeyTooLarge {
2548 size: key.len(),
2549 max: citadel_core::MAX_KEY_SIZE,
2550 });
2551 }
2552 if value.len() > citadel_core::MAX_INLINE_VALUE_SIZE {
2553 return Err(SqlError::RowTooLarge {
2554 size: value.len(),
2555 max: citadel_core::MAX_INLINE_VALUE_SIZE,
2556 });
2557 }
2558
2559 let is_new = wtx
2560 .table_insert(lower_name.as_bytes(), &key, &value)
2561 .map_err(SqlError::Storage)?;
2562 if !is_new {
2563 return Err(SqlError::DuplicateKey);
2564 }
2565
2566 insert_index_entries(wtx, table_schema, &row, &pk_values)?;
2567 count += 1;
2568 }
2569
2570 Ok(ExecutionResult::RowsAffected(count))
2571}
2572
2573fn exec_select_in_txn(
2574 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
2575 schema: &SchemaManager,
2576 stmt: &SelectStmt,
2577) -> Result<ExecutionResult> {
2578 let materialized;
2579 let stmt = if stmt_has_subquery(stmt) {
2580 materialized = materialize_stmt(stmt, &mut |sub| exec_subquery_write(wtx, schema, sub))?;
2581 &materialized
2582 } else {
2583 stmt
2584 };
2585
2586 if stmt.from.is_empty() {
2587 return exec_select_no_from(stmt);
2588 }
2589
2590 if !stmt.joins.is_empty() {
2591 return exec_select_join_in_txn(wtx, schema, stmt);
2592 }
2593
2594 let lower_name = stmt.from.to_ascii_lowercase();
2595 let table_schema = schema
2596 .get(&lower_name)
2597 .ok_or_else(|| SqlError::TableNotFound(stmt.from.clone()))?;
2598
2599 let rows = collect_rows_write(wtx, table_schema, &stmt.where_clause)?;
2600 process_select(&table_schema.columns, rows, stmt)
2601}
2602
2603fn exec_update_in_txn(
2604 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
2605 schema: &SchemaManager,
2606 stmt: &UpdateStmt,
2607) -> Result<ExecutionResult> {
2608 let materialized;
2609 let stmt = if update_has_subquery(stmt) {
2610 materialized = materialize_update(stmt, &mut |sub| exec_subquery_write(wtx, schema, sub))?;
2611 &materialized
2612 } else {
2613 stmt
2614 };
2615
2616 let lower_name = stmt.table.to_ascii_lowercase();
2617 let table_schema = schema
2618 .get(&lower_name)
2619 .ok_or_else(|| SqlError::TableNotFound(stmt.table.clone()))?;
2620
2621 let all_candidates = collect_keyed_rows_write(wtx, table_schema, &stmt.where_clause)?;
2622 let matching_rows: Vec<(Vec<u8>, Vec<Value>)> = all_candidates
2623 .into_iter()
2624 .filter(|(_, row)| match &stmt.where_clause {
2625 Some(where_expr) => match eval_expr(where_expr, &table_schema.columns, row) {
2626 Ok(val) => is_truthy(&val),
2627 Err(_) => false,
2628 },
2629 None => true,
2630 })
2631 .collect();
2632
2633 if matching_rows.is_empty() {
2634 return Ok(ExecutionResult::RowsAffected(0));
2635 }
2636
2637 struct UpdateChange {
2638 old_key: Vec<u8>,
2639 new_key: Vec<u8>,
2640 new_value: Vec<u8>,
2641 pk_changed: bool,
2642 old_row: Vec<Value>,
2643 new_row: Vec<Value>,
2644 }
2645
2646 let pk_indices = table_schema.pk_indices();
2647 let mut changes: Vec<UpdateChange> = Vec::new();
2648
2649 for (old_key, row) in &matching_rows {
2650 let mut new_row = row.clone();
2651 let mut pk_changed = false;
2652
2653 let mut evaluated: Vec<(usize, Value)> = Vec::with_capacity(stmt.assignments.len());
2655 for (col_name, expr) in &stmt.assignments {
2656 let col_idx = table_schema
2657 .column_index(col_name)
2658 .ok_or_else(|| SqlError::ColumnNotFound(col_name.clone()))?;
2659 let new_val = eval_expr(expr, &table_schema.columns, row)?;
2660 let col = &table_schema.columns[col_idx];
2661
2662 let coerced = if new_val.is_null() {
2663 if !col.nullable {
2664 return Err(SqlError::NotNullViolation(col.name.clone()));
2665 }
2666 Value::Null
2667 } else {
2668 new_val
2669 .coerce_to(col.data_type)
2670 .ok_or_else(|| SqlError::TypeMismatch {
2671 expected: col.data_type.to_string(),
2672 got: new_val.data_type().to_string(),
2673 })?
2674 };
2675
2676 evaluated.push((col_idx, coerced));
2677 }
2678
2679 for (col_idx, coerced) in evaluated {
2680 if table_schema.primary_key_columns.contains(&(col_idx as u16)) {
2681 pk_changed = true;
2682 }
2683 new_row[col_idx] = coerced;
2684 }
2685
2686 let pk_values: Vec<Value> = pk_indices.iter().map(|&i| new_row[i].clone()).collect();
2687 let new_key = encode_composite_key(&pk_values);
2688
2689 let non_pk = table_schema.non_pk_indices();
2690 let value_values: Vec<Value> = non_pk.iter().map(|&i| new_row[i].clone()).collect();
2691 let new_value = encode_row(&value_values);
2692
2693 changes.push(UpdateChange {
2694 old_key: old_key.clone(),
2695 new_key,
2696 new_value,
2697 pk_changed,
2698 old_row: row.clone(),
2699 new_row,
2700 });
2701 }
2702
2703 {
2704 use std::collections::HashSet;
2705 let mut new_keys: HashSet<Vec<u8>> = HashSet::new();
2706 for c in &changes {
2707 if c.pk_changed && c.new_key != c.old_key && !new_keys.insert(c.new_key.clone()) {
2708 return Err(SqlError::DuplicateKey);
2709 }
2710 }
2711 }
2712
2713 for c in &changes {
2714 let old_pk: Vec<Value> = pk_indices.iter().map(|&i| c.old_row[i].clone()).collect();
2715
2716 for idx in &table_schema.indices {
2717 if index_columns_changed(idx, &c.old_row, &c.new_row) || c.pk_changed {
2718 let idx_table = TableSchema::index_table_name(&lower_name, &idx.name);
2719 let old_idx_key = encode_index_key(idx, &c.old_row, &old_pk);
2720 wtx.table_delete(&idx_table, &old_idx_key)
2721 .map_err(SqlError::Storage)?;
2722 }
2723 }
2724
2725 if c.pk_changed {
2726 wtx.table_delete(lower_name.as_bytes(), &c.old_key)
2727 .map_err(SqlError::Storage)?;
2728 }
2729 }
2730
2731 for c in &changes {
2732 let new_pk: Vec<Value> = pk_indices.iter().map(|&i| c.new_row[i].clone()).collect();
2733
2734 if c.pk_changed {
2735 let is_new = wtx
2736 .table_insert(lower_name.as_bytes(), &c.new_key, &c.new_value)
2737 .map_err(SqlError::Storage)?;
2738 if !is_new {
2739 return Err(SqlError::DuplicateKey);
2740 }
2741 } else {
2742 wtx.table_insert(lower_name.as_bytes(), &c.new_key, &c.new_value)
2743 .map_err(SqlError::Storage)?;
2744 }
2745
2746 for idx in &table_schema.indices {
2747 if index_columns_changed(idx, &c.old_row, &c.new_row) || c.pk_changed {
2748 let idx_table = TableSchema::index_table_name(&lower_name, &idx.name);
2749 let new_idx_key = encode_index_key(idx, &c.new_row, &new_pk);
2750 let new_idx_val = encode_index_value(idx, &c.new_row, &new_pk);
2751 let is_new = wtx
2752 .table_insert(&idx_table, &new_idx_key, &new_idx_val)
2753 .map_err(SqlError::Storage)?;
2754 if idx.unique && !is_new {
2755 let indexed_values: Vec<Value> = idx
2756 .columns
2757 .iter()
2758 .map(|&col_idx| c.new_row[col_idx as usize].clone())
2759 .collect();
2760 let any_null = indexed_values.iter().any(|v| v.is_null());
2761 if !any_null {
2762 return Err(SqlError::UniqueViolation(idx.name.clone()));
2763 }
2764 }
2765 }
2766 }
2767 }
2768
2769 let count = changes.len() as u64;
2770 Ok(ExecutionResult::RowsAffected(count))
2771}
2772
2773fn exec_delete_in_txn(
2774 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
2775 schema: &SchemaManager,
2776 stmt: &DeleteStmt,
2777) -> Result<ExecutionResult> {
2778 let materialized;
2779 let stmt = if delete_has_subquery(stmt) {
2780 materialized = materialize_delete(stmt, &mut |sub| exec_subquery_write(wtx, schema, sub))?;
2781 &materialized
2782 } else {
2783 stmt
2784 };
2785
2786 let lower_name = stmt.table.to_ascii_lowercase();
2787 let table_schema = schema
2788 .get(&lower_name)
2789 .ok_or_else(|| SqlError::TableNotFound(stmt.table.clone()))?;
2790
2791 let all_candidates = collect_keyed_rows_write(wtx, table_schema, &stmt.where_clause)?;
2792 let rows_to_delete: Vec<(Vec<u8>, Vec<Value>)> = all_candidates
2793 .into_iter()
2794 .filter(|(_, row)| match &stmt.where_clause {
2795 Some(where_expr) => match eval_expr(where_expr, &table_schema.columns, row) {
2796 Ok(val) => is_truthy(&val),
2797 Err(_) => false,
2798 },
2799 None => true,
2800 })
2801 .collect();
2802
2803 if rows_to_delete.is_empty() {
2804 return Ok(ExecutionResult::RowsAffected(0));
2805 }
2806
2807 let pk_indices = table_schema.pk_indices();
2808 for (key, row) in &rows_to_delete {
2809 let pk_values: Vec<Value> = pk_indices.iter().map(|&i| row[i].clone()).collect();
2810 delete_index_entries(wtx, table_schema, row, &pk_values)?;
2811 wtx.table_delete(lower_name.as_bytes(), key)
2812 .map_err(SqlError::Storage)?;
2813 }
2814 let count = rows_to_delete.len() as u64;
2815 Ok(ExecutionResult::RowsAffected(count))
2816}
2817
2818fn exec_aggregate(
2821 columns: &[ColumnDef],
2822 rows: &[Vec<Value>],
2823 stmt: &SelectStmt,
2824) -> Result<ExecutionResult> {
2825 let groups: BTreeMap<Vec<Value>, Vec<&Vec<Value>>> = if stmt.group_by.is_empty() {
2826 let mut m = BTreeMap::new();
2827 m.insert(vec![], rows.iter().collect());
2828 m
2829 } else {
2830 let mut m: BTreeMap<Vec<Value>, Vec<&Vec<Value>>> = BTreeMap::new();
2831 for row in rows {
2832 let group_key: Vec<Value> = stmt
2833 .group_by
2834 .iter()
2835 .map(|expr| eval_expr(expr, columns, row))
2836 .collect::<Result<_>>()?;
2837 m.entry(group_key).or_default().push(row);
2838 }
2839 m
2840 };
2841
2842 let mut result_rows = Vec::new();
2843 let output_cols = build_output_columns(&stmt.columns, columns);
2844
2845 for group_rows in groups.values() {
2846 let mut result_row = Vec::new();
2847
2848 for sel_col in &stmt.columns {
2849 match sel_col {
2850 SelectColumn::AllColumns => {
2851 return Err(SqlError::Unsupported("SELECT * with GROUP BY".into()));
2852 }
2853 SelectColumn::Expr { expr, .. } => {
2854 let val = eval_aggregate_expr(expr, columns, group_rows)?;
2855 result_row.push(val);
2856 }
2857 }
2858 }
2859
2860 if let Some(ref having) = stmt.having {
2861 let passes = match eval_aggregate_expr(having, columns, group_rows) {
2862 Ok(val) => is_truthy(&val),
2863 Err(SqlError::ColumnNotFound(_)) => {
2864 match eval_expr(having, &output_cols, &result_row) {
2865 Ok(val) => is_truthy(&val),
2866 Err(_) => false,
2867 }
2868 }
2869 Err(e) => return Err(e),
2870 };
2871 if !passes {
2872 continue;
2873 }
2874 }
2875
2876 result_rows.push(result_row);
2877 }
2878
2879 if stmt.distinct {
2880 let mut seen = std::collections::HashSet::new();
2881 result_rows.retain(|row| seen.insert(row.clone()));
2882 }
2883
2884 if !stmt.order_by.is_empty() {
2885 let output_cols = build_output_columns(&stmt.columns, columns);
2886 sort_rows(&mut result_rows, &stmt.order_by, &output_cols)?;
2887 }
2888
2889 if let Some(ref offset_expr) = stmt.offset {
2890 let offset = eval_const_int(offset_expr)?.max(0) as usize;
2891 if offset < result_rows.len() {
2892 result_rows = result_rows.split_off(offset);
2893 } else {
2894 result_rows.clear();
2895 }
2896 }
2897 if let Some(ref limit_expr) = stmt.limit {
2898 let limit = eval_const_int(limit_expr)?.max(0) as usize;
2899 result_rows.truncate(limit);
2900 }
2901
2902 let col_names = stmt
2903 .columns
2904 .iter()
2905 .map(|c| match c {
2906 SelectColumn::AllColumns => "*".into(),
2907 SelectColumn::Expr { alias: Some(a), .. } => a.clone(),
2908 SelectColumn::Expr { expr, .. } => expr_display_name(expr),
2909 })
2910 .collect();
2911
2912 Ok(ExecutionResult::Query(QueryResult {
2913 columns: col_names,
2914 rows: result_rows,
2915 }))
2916}
2917
2918fn eval_aggregate_expr(
2919 expr: &Expr,
2920 columns: &[ColumnDef],
2921 group_rows: &[&Vec<Value>],
2922) -> Result<Value> {
2923 match expr {
2924 Expr::CountStar => Ok(Value::Integer(group_rows.len() as i64)),
2925
2926 Expr::Function { name, args } if is_aggregate_function(name, args.len()) => {
2927 let func = name.to_ascii_uppercase();
2928 if args.len() != 1 {
2929 return Err(SqlError::Unsupported(format!(
2930 "{func} with {} args",
2931 args.len()
2932 )));
2933 }
2934 let arg = &args[0];
2935 let values: Vec<Value> = group_rows
2936 .iter()
2937 .map(|row| eval_expr(arg, columns, row))
2938 .collect::<Result<_>>()?;
2939
2940 match func.as_str() {
2941 "COUNT" => {
2942 let count = values.iter().filter(|v| !v.is_null()).count();
2943 Ok(Value::Integer(count as i64))
2944 }
2945 "SUM" => {
2946 let mut int_sum: i64 = 0;
2947 let mut real_sum: f64 = 0.0;
2948 let mut has_real = false;
2949 let mut all_null = true;
2950 for v in &values {
2951 match v {
2952 Value::Integer(i) => {
2953 int_sum += i;
2954 all_null = false;
2955 }
2956 Value::Real(r) => {
2957 real_sum += r;
2958 has_real = true;
2959 all_null = false;
2960 }
2961 Value::Null => {}
2962 _ => {
2963 return Err(SqlError::TypeMismatch {
2964 expected: "numeric".into(),
2965 got: v.data_type().to_string(),
2966 })
2967 }
2968 }
2969 }
2970 if all_null {
2971 return Ok(Value::Null);
2972 }
2973 if has_real {
2974 Ok(Value::Real(real_sum + int_sum as f64))
2975 } else {
2976 Ok(Value::Integer(int_sum))
2977 }
2978 }
2979 "AVG" => {
2980 let mut sum: f64 = 0.0;
2981 let mut count: i64 = 0;
2982 for v in &values {
2983 match v {
2984 Value::Integer(i) => {
2985 sum += *i as f64;
2986 count += 1;
2987 }
2988 Value::Real(r) => {
2989 sum += r;
2990 count += 1;
2991 }
2992 Value::Null => {}
2993 _ => {
2994 return Err(SqlError::TypeMismatch {
2995 expected: "numeric".into(),
2996 got: v.data_type().to_string(),
2997 })
2998 }
2999 }
3000 }
3001 if count == 0 {
3002 Ok(Value::Null)
3003 } else {
3004 Ok(Value::Real(sum / count as f64))
3005 }
3006 }
3007 "MIN" => {
3008 let mut min: Option<&Value> = None;
3009 for v in &values {
3010 if v.is_null() {
3011 continue;
3012 }
3013 min = Some(match min {
3014 None => v,
3015 Some(m) => {
3016 if v < m {
3017 v
3018 } else {
3019 m
3020 }
3021 }
3022 });
3023 }
3024 Ok(min.cloned().unwrap_or(Value::Null))
3025 }
3026 "MAX" => {
3027 let mut max: Option<&Value> = None;
3028 for v in &values {
3029 if v.is_null() {
3030 continue;
3031 }
3032 max = Some(match max {
3033 None => v,
3034 Some(m) => {
3035 if v > m {
3036 v
3037 } else {
3038 m
3039 }
3040 }
3041 });
3042 }
3043 Ok(max.cloned().unwrap_or(Value::Null))
3044 }
3045 _ => Err(SqlError::Unsupported(format!("aggregate function: {func}"))),
3046 }
3047 }
3048
3049 Expr::Column(_) | Expr::QualifiedColumn { .. } => {
3050 if let Some(first) = group_rows.first() {
3051 eval_expr(expr, columns, first)
3052 } else {
3053 Ok(Value::Null)
3054 }
3055 }
3056
3057 Expr::Literal(v) => Ok(v.clone()),
3058
3059 Expr::BinaryOp { left, op, right } => {
3060 let l = eval_aggregate_expr(left, columns, group_rows)?;
3061 let r = eval_aggregate_expr(right, columns, group_rows)?;
3062 crate::eval::eval_expr(
3063 &Expr::BinaryOp {
3064 left: Box::new(Expr::Literal(l)),
3065 op: *op,
3066 right: Box::new(Expr::Literal(r)),
3067 },
3068 columns,
3069 &[],
3070 )
3071 }
3072
3073 Expr::UnaryOp { op, expr: e } => {
3074 let v = eval_aggregate_expr(e, columns, group_rows)?;
3075 crate::eval::eval_expr(
3076 &Expr::UnaryOp {
3077 op: *op,
3078 expr: Box::new(Expr::Literal(v)),
3079 },
3080 columns,
3081 &[],
3082 )
3083 }
3084
3085 Expr::IsNull(e) => {
3086 let v = eval_aggregate_expr(e, columns, group_rows)?;
3087 Ok(Value::Boolean(v.is_null()))
3088 }
3089
3090 Expr::IsNotNull(e) => {
3091 let v = eval_aggregate_expr(e, columns, group_rows)?;
3092 Ok(Value::Boolean(!v.is_null()))
3093 }
3094
3095 Expr::Cast { expr: e, data_type } => {
3096 let v = eval_aggregate_expr(e, columns, group_rows)?;
3097 crate::eval::eval_expr(
3098 &Expr::Cast {
3099 expr: Box::new(Expr::Literal(v)),
3100 data_type: *data_type,
3101 },
3102 columns,
3103 &[],
3104 )
3105 }
3106
3107 Expr::Case {
3108 operand,
3109 conditions,
3110 else_result,
3111 } => {
3112 let op_val = operand
3113 .as_ref()
3114 .map(|e| eval_aggregate_expr(e, columns, group_rows))
3115 .transpose()?;
3116 if let Some(ov) = &op_val {
3117 for (cond, result) in conditions {
3118 let cv = eval_aggregate_expr(cond, columns, group_rows)?;
3119 if !ov.is_null() && !cv.is_null() && *ov == cv {
3120 return eval_aggregate_expr(result, columns, group_rows);
3121 }
3122 }
3123 } else {
3124 for (cond, result) in conditions {
3125 let cv = eval_aggregate_expr(cond, columns, group_rows)?;
3126 if crate::eval::is_truthy(&cv) {
3127 return eval_aggregate_expr(result, columns, group_rows);
3128 }
3129 }
3130 }
3131 match else_result {
3132 Some(e) => eval_aggregate_expr(e, columns, group_rows),
3133 None => Ok(Value::Null),
3134 }
3135 }
3136
3137 Expr::Coalesce(args) => {
3138 for arg in args {
3139 let v = eval_aggregate_expr(arg, columns, group_rows)?;
3140 if !v.is_null() {
3141 return Ok(v);
3142 }
3143 }
3144 Ok(Value::Null)
3145 }
3146
3147 Expr::Between {
3148 expr: e,
3149 low,
3150 high,
3151 negated,
3152 } => {
3153 let v = eval_aggregate_expr(e, columns, group_rows)?;
3154 let lo = eval_aggregate_expr(low, columns, group_rows)?;
3155 let hi = eval_aggregate_expr(high, columns, group_rows)?;
3156 crate::eval::eval_expr(
3157 &Expr::Between {
3158 expr: Box::new(Expr::Literal(v)),
3159 low: Box::new(Expr::Literal(lo)),
3160 high: Box::new(Expr::Literal(hi)),
3161 negated: *negated,
3162 },
3163 columns,
3164 &[],
3165 )
3166 }
3167
3168 Expr::Like {
3169 expr: e,
3170 pattern,
3171 escape,
3172 negated,
3173 } => {
3174 let v = eval_aggregate_expr(e, columns, group_rows)?;
3175 let p = eval_aggregate_expr(pattern, columns, group_rows)?;
3176 let esc = escape
3177 .as_ref()
3178 .map(|es| eval_aggregate_expr(es, columns, group_rows))
3179 .transpose()?;
3180 let esc_box = esc.map(|v| Box::new(Expr::Literal(v)));
3181 crate::eval::eval_expr(
3182 &Expr::Like {
3183 expr: Box::new(Expr::Literal(v)),
3184 pattern: Box::new(Expr::Literal(p)),
3185 escape: esc_box,
3186 negated: *negated,
3187 },
3188 columns,
3189 &[],
3190 )
3191 }
3192
3193 Expr::Function { name, args } => {
3194 let evaluated: Vec<Value> = args
3195 .iter()
3196 .map(|a| eval_aggregate_expr(a, columns, group_rows))
3197 .collect::<Result<_>>()?;
3198 let literal_args: Vec<Expr> = evaluated.into_iter().map(Expr::Literal).collect();
3199 crate::eval::eval_expr(
3200 &Expr::Function {
3201 name: name.clone(),
3202 args: literal_args,
3203 },
3204 columns,
3205 &[],
3206 )
3207 }
3208
3209 _ => Err(SqlError::Unsupported(format!(
3210 "expression in aggregate: {expr:?}"
3211 ))),
3212 }
3213}
3214
/// Reports whether a call to `name` with `arg_count` arguments is treated as
/// an aggregate.
///
/// `COUNT`, `SUM` and `AVG` are aggregates for any argument count. `MIN` and
/// `MAX` are aggregates only with exactly one argument. The name is matched
/// ASCII case-insensitively.
fn is_aggregate_function(name: &str, arg_count: usize) -> bool {
    const ANY_ARITY: [&str; 3] = ["COUNT", "SUM", "AVG"];
    const SINGLE_ARG_ONLY: [&str; 2] = ["MIN", "MAX"];

    let named_in = |candidates: &[&str]| {
        candidates
            .iter()
            .any(|candidate| name.eq_ignore_ascii_case(candidate))
    };

    named_in(&ANY_ARITY) || (arg_count == 1 && named_in(&SINGLE_ARG_ONLY))
}
3220
3221fn is_aggregate_expr(expr: &Expr) -> bool {
3222 match expr {
3223 Expr::CountStar => true,
3224 Expr::Function { name, args } => {
3225 is_aggregate_function(name, args.len()) || args.iter().any(is_aggregate_expr)
3226 }
3227 Expr::BinaryOp { left, right, .. } => is_aggregate_expr(left) || is_aggregate_expr(right),
3228 Expr::UnaryOp { expr, .. }
3229 | Expr::IsNull(expr)
3230 | Expr::IsNotNull(expr)
3231 | Expr::Cast { expr, .. } => is_aggregate_expr(expr),
3232 Expr::Case {
3233 operand,
3234 conditions,
3235 else_result,
3236 } => {
3237 operand.as_ref().is_some_and(|e| is_aggregate_expr(e))
3238 || conditions
3239 .iter()
3240 .any(|(c, r)| is_aggregate_expr(c) || is_aggregate_expr(r))
3241 || else_result.as_ref().is_some_and(|e| is_aggregate_expr(e))
3242 }
3243 Expr::Coalesce(args) => args.iter().any(is_aggregate_expr),
3244 Expr::Between {
3245 expr, low, high, ..
3246 } => is_aggregate_expr(expr) || is_aggregate_expr(low) || is_aggregate_expr(high),
3247 Expr::Like {
3248 expr,
3249 pattern,
3250 escape,
3251 ..
3252 } => {
3253 is_aggregate_expr(expr)
3254 || is_aggregate_expr(pattern)
3255 || escape.as_ref().is_some_and(|e| is_aggregate_expr(e))
3256 }
3257 _ => false,
3258 }
3259}
3260
3261fn decode_full_row(schema: &TableSchema, key: &[u8], value: &[u8]) -> Result<Vec<Value>> {
3265 let pk_values = decode_composite_key(key, schema.primary_key_columns.len())?;
3266 let non_pk_values = decode_row(value)?;
3267
3268 let mut row = vec![Value::Null; schema.columns.len()];
3269
3270 for (i, &col_idx) in schema.primary_key_columns.iter().enumerate() {
3271 row[col_idx as usize] = pk_values[i].clone();
3272 }
3273
3274 let non_pk = schema.non_pk_indices();
3275 for (i, &col_idx) in non_pk.iter().enumerate() {
3276 if i < non_pk_values.len() {
3277 row[col_idx] = non_pk_values[i].clone();
3278 }
3279 }
3280
3281 Ok(row)
3282}
3283
3284fn eval_const_expr(expr: &Expr) -> Result<Value> {
3286 eval_expr(expr, &[], &[])
3287}
3288
3289fn eval_const_int(expr: &Expr) -> Result<i64> {
3290 match eval_const_expr(expr)? {
3291 Value::Integer(i) => Ok(i),
3292 other => Err(SqlError::TypeMismatch {
3293 expected: "INTEGER".into(),
3294 got: other.data_type().to_string(),
3295 }),
3296 }
3297}
3298
3299fn sort_rows(
3300 rows: &mut [Vec<Value>],
3301 order_by: &[OrderByItem],
3302 columns: &[ColumnDef],
3303) -> Result<()> {
3304 rows.sort_by(|a, b| {
3305 for item in order_by {
3306 let a_val = eval_expr(&item.expr, columns, a).unwrap_or(Value::Null);
3307 let b_val = eval_expr(&item.expr, columns, b).unwrap_or(Value::Null);
3308
3309 let nulls_first = item.nulls_first.unwrap_or(!item.descending);
3310
3311 let ord = match (a_val.is_null(), b_val.is_null()) {
3312 (true, true) => std::cmp::Ordering::Equal,
3313 (true, false) => {
3314 if nulls_first {
3315 std::cmp::Ordering::Less
3316 } else {
3317 std::cmp::Ordering::Greater
3318 }
3319 }
3320 (false, true) => {
3321 if nulls_first {
3322 std::cmp::Ordering::Greater
3323 } else {
3324 std::cmp::Ordering::Less
3325 }
3326 }
3327 (false, false) => {
3328 let cmp = a_val.cmp(&b_val);
3329 if item.descending {
3330 cmp.reverse()
3331 } else {
3332 cmp
3333 }
3334 }
3335 };
3336
3337 if ord != std::cmp::Ordering::Equal {
3338 return ord;
3339 }
3340 }
3341 std::cmp::Ordering::Equal
3342 });
3343 Ok(())
3344}
3345
3346fn project_rows(
3347 columns: &[ColumnDef],
3348 select_cols: &[SelectColumn],
3349 rows: &[Vec<Value>],
3350) -> Result<(Vec<String>, Vec<Vec<Value>>)> {
3351 let mut col_names = Vec::new();
3352 type Projector = Box<dyn Fn(&[Value]) -> Result<Value>>;
3353 let mut projectors: Vec<Projector> = Vec::new();
3354
3355 for sel_col in select_cols {
3356 match sel_col {
3357 SelectColumn::AllColumns => {
3358 for col in columns {
3359 let idx = col.position as usize;
3360 col_names.push(col.name.clone());
3361 projectors.push(Box::new(move |row: &[Value]| Ok(row[idx].clone())));
3362 }
3363 }
3364 SelectColumn::Expr { expr, alias } => {
3365 let name = alias.clone().unwrap_or_else(|| expr_display_name(expr));
3366 col_names.push(name);
3367 let expr = expr.clone();
3368 let owned_cols = columns.to_vec();
3369 projectors.push(Box::new(move |row: &[Value]| {
3370 eval_expr(&expr, &owned_cols, row)
3371 }));
3372 }
3373 }
3374 }
3375
3376 let projected = rows
3377 .iter()
3378 .map(|row| {
3379 projectors
3380 .iter()
3381 .map(|p| p(row))
3382 .collect::<Result<Vec<_>>>()
3383 })
3384 .collect::<Result<Vec<_>>>()?;
3385
3386 Ok((col_names, projected))
3387}
3388
3389fn expr_display_name(expr: &Expr) -> String {
3390 match expr {
3391 Expr::Column(name) => name.clone(),
3392 Expr::QualifiedColumn { table, column } => format!("{table}.{column}"),
3393 Expr::Literal(v) => format!("{v}"),
3394 Expr::CountStar => "COUNT(*)".into(),
3395 Expr::Function { name, args } => {
3396 let arg_strs: Vec<String> = args.iter().map(expr_display_name).collect();
3397 format!("{name}({})", arg_strs.join(", "))
3398 }
3399 Expr::BinaryOp { left, op, right } => {
3400 format!(
3401 "{} {} {}",
3402 expr_display_name(left),
3403 op_symbol(op),
3404 expr_display_name(right)
3405 )
3406 }
3407 _ => "?".into(),
3408 }
3409}
3410
3411fn op_symbol(op: &BinOp) -> &'static str {
3412 match op {
3413 BinOp::Add => "+",
3414 BinOp::Sub => "-",
3415 BinOp::Mul => "*",
3416 BinOp::Div => "/",
3417 BinOp::Mod => "%",
3418 BinOp::Eq => "=",
3419 BinOp::NotEq => "<>",
3420 BinOp::Lt => "<",
3421 BinOp::Gt => ">",
3422 BinOp::LtEq => "<=",
3423 BinOp::GtEq => ">=",
3424 BinOp::And => "AND",
3425 BinOp::Or => "OR",
3426 BinOp::Concat => "||",
3427 }
3428}
3429
3430fn build_output_columns(select_cols: &[SelectColumn], columns: &[ColumnDef]) -> Vec<ColumnDef> {
3431 let mut out = Vec::new();
3432 for (i, col) in select_cols.iter().enumerate() {
3433 let (name, data_type) = match col {
3434 SelectColumn::AllColumns => (format!("col{i}"), DataType::Null),
3435 SelectColumn::Expr {
3436 alias: Some(a),
3437 expr,
3438 } => (a.clone(), infer_expr_type(expr, columns)),
3439 SelectColumn::Expr { expr, .. } => {
3440 (expr_display_name(expr), infer_expr_type(expr, columns))
3441 }
3442 };
3443 out.push(ColumnDef {
3444 name,
3445 data_type,
3446 nullable: true,
3447 position: i as u16,
3448 });
3449 }
3450 out
3451}
3452
3453fn infer_expr_type(expr: &Expr, columns: &[ColumnDef]) -> DataType {
3454 match expr {
3455 Expr::Column(name) => {
3456 let lower = name.to_ascii_lowercase();
3457 columns
3458 .iter()
3459 .find(|c| c.name.to_ascii_lowercase() == lower)
3460 .map(|c| c.data_type)
3461 .unwrap_or(DataType::Null)
3462 }
3463 Expr::QualifiedColumn { table, column } => {
3464 let qualified = format!(
3465 "{}.{}",
3466 table.to_ascii_lowercase(),
3467 column.to_ascii_lowercase()
3468 );
3469 columns
3470 .iter()
3471 .find(|c| c.name.to_ascii_lowercase() == qualified)
3472 .map(|c| c.data_type)
3473 .unwrap_or(DataType::Null)
3474 }
3475 Expr::Literal(v) => v.data_type(),
3476 Expr::CountStar => DataType::Integer,
3477 Expr::Function { name, .. } => match name.to_ascii_uppercase().as_str() {
3478 "COUNT" => DataType::Integer,
3479 "AVG" => DataType::Real,
3480 "SUM" | "MIN" | "MAX" => DataType::Null,
3481 _ => DataType::Null,
3482 },
3483 _ => DataType::Null,
3484 }
3485}