1use std::collections::BTreeMap;
4
5use citadel::Database;
6
7use crate::encoding::{
8 decode_composite_key, decode_key_value, decode_row, encode_composite_key, encode_row,
9};
10use crate::error::{Result, SqlError};
11use crate::eval::{eval_expr, is_truthy};
12use crate::parser::*;
13use crate::planner::{self, ScanPlan};
14use crate::schema::SchemaManager;
15use crate::types::*;
16
17fn encode_index_key(idx: &IndexDef, row: &[Value], pk_values: &[Value]) -> Vec<u8> {
20 let indexed_values: Vec<Value> = idx
21 .columns
22 .iter()
23 .map(|&col_idx| row[col_idx as usize].clone())
24 .collect();
25
26 if idx.unique {
27 let any_null = indexed_values.iter().any(|v| v.is_null());
28 if !any_null {
29 return encode_composite_key(&indexed_values);
30 }
31 }
32
33 let mut all_values = indexed_values;
34 all_values.extend_from_slice(pk_values);
35 encode_composite_key(&all_values)
36}
37
38fn encode_index_value(idx: &IndexDef, row: &[Value], pk_values: &[Value]) -> Vec<u8> {
39 if idx.unique {
40 let indexed_values: Vec<Value> = idx
41 .columns
42 .iter()
43 .map(|&col_idx| row[col_idx as usize].clone())
44 .collect();
45 let any_null = indexed_values.iter().any(|v| v.is_null());
46 if !any_null {
47 return encode_composite_key(pk_values);
48 }
49 }
50 vec![]
51}
52
53fn insert_index_entries(
54 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
55 table_schema: &TableSchema,
56 row: &[Value],
57 pk_values: &[Value],
58) -> Result<()> {
59 for idx in &table_schema.indices {
60 let idx_table = TableSchema::index_table_name(&table_schema.name, &idx.name);
61 let key = encode_index_key(idx, row, pk_values);
62 let value = encode_index_value(idx, row, pk_values);
63
64 let is_new = wtx
65 .table_insert(&idx_table, &key, &value)
66 .map_err(SqlError::Storage)?;
67
68 if idx.unique && !is_new {
69 let indexed_values: Vec<Value> = idx
70 .columns
71 .iter()
72 .map(|&col_idx| row[col_idx as usize].clone())
73 .collect();
74 let any_null = indexed_values.iter().any(|v| v.is_null());
75 if !any_null {
76 return Err(SqlError::UniqueViolation(idx.name.clone()));
77 }
78 }
79 }
80 Ok(())
81}
82
83fn delete_index_entries(
84 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
85 table_schema: &TableSchema,
86 row: &[Value],
87 pk_values: &[Value],
88) -> Result<()> {
89 for idx in &table_schema.indices {
90 let idx_table = TableSchema::index_table_name(&table_schema.name, &idx.name);
91 let key = encode_index_key(idx, row, pk_values);
92 wtx.table_delete(&idx_table, &key)
93 .map_err(SqlError::Storage)?;
94 }
95 Ok(())
96}
97
98fn index_columns_changed(idx: &IndexDef, old_row: &[Value], new_row: &[Value]) -> bool {
99 idx.columns
100 .iter()
101 .any(|&col_idx| old_row[col_idx as usize] != new_row[col_idx as usize])
102}
103
104pub fn execute(
106 db: &Database,
107 schema: &mut SchemaManager,
108 stmt: &Statement,
109) -> Result<ExecutionResult> {
110 match stmt {
111 Statement::CreateTable(ct) => exec_create_table(db, schema, ct),
112 Statement::DropTable(dt) => exec_drop_table(db, schema, dt),
113 Statement::CreateIndex(ci) => exec_create_index(db, schema, ci),
114 Statement::DropIndex(di) => exec_drop_index(db, schema, di),
115 Statement::Insert(ins) => exec_insert(db, schema, ins),
116 Statement::Select(sel) => exec_select(db, schema, sel),
117 Statement::Update(upd) => exec_update(db, schema, upd),
118 Statement::Delete(del) => exec_delete(db, schema, del),
119 Statement::Explain(inner) => explain(schema, inner),
120 Statement::Begin | Statement::Commit | Statement::Rollback => Err(SqlError::Unsupported(
121 "transaction control in auto-commit mode".into(),
122 )),
123 }
124}
125
126pub fn execute_in_txn(
128 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
129 schema: &mut SchemaManager,
130 stmt: &Statement,
131) -> Result<ExecutionResult> {
132 match stmt {
133 Statement::CreateTable(ct) => exec_create_table_in_txn(wtx, schema, ct),
134 Statement::DropTable(dt) => exec_drop_table_in_txn(wtx, schema, dt),
135 Statement::CreateIndex(ci) => exec_create_index_in_txn(wtx, schema, ci),
136 Statement::DropIndex(di) => exec_drop_index_in_txn(wtx, schema, di),
137 Statement::Insert(ins) => exec_insert_in_txn(wtx, schema, ins),
138 Statement::Select(sel) => exec_select_in_txn(wtx, schema, sel),
139 Statement::Update(upd) => exec_update_in_txn(wtx, schema, upd),
140 Statement::Delete(del) => exec_delete_in_txn(wtx, schema, del),
141 Statement::Explain(inner) => explain(schema, inner),
142 Statement::Begin | Statement::Commit | Statement::Rollback => {
143 Err(SqlError::Unsupported("nested transaction control".into()))
144 }
145 }
146}
147
148pub fn explain(schema: &SchemaManager, stmt: &Statement) -> Result<ExecutionResult> {
151 let lines = match stmt {
152 Statement::Select(sel) => explain_select(schema, sel)?,
153 Statement::Insert(ins) => {
154 vec![format!("INSERT INTO {}", ins.table.to_ascii_lowercase())]
155 }
156 Statement::Update(upd) => explain_dml(schema, &upd.table, &upd.where_clause, "UPDATE")?,
157 Statement::Delete(del) => {
158 explain_dml(schema, &del.table, &del.where_clause, "DELETE FROM")?
159 }
160 Statement::Explain(_) => {
161 return Err(SqlError::Unsupported("EXPLAIN EXPLAIN".into()));
162 }
163 _ => {
164 return Err(SqlError::Unsupported(
165 "EXPLAIN for this statement type".into(),
166 ));
167 }
168 };
169
170 let rows = lines
171 .into_iter()
172 .map(|line| vec![Value::Text(line)])
173 .collect();
174 Ok(ExecutionResult::Query(QueryResult {
175 columns: vec!["plan".into()],
176 rows,
177 }))
178}
179
180fn explain_dml(
181 schema: &SchemaManager,
182 table: &str,
183 where_clause: &Option<Expr>,
184 verb: &str,
185) -> Result<Vec<String>> {
186 let lower = table.to_ascii_lowercase();
187 let table_schema = schema
188 .get(&lower)
189 .ok_or_else(|| SqlError::TableNotFound(table.to_string()))?;
190 let plan = planner::plan_select(table_schema, where_clause);
191 let scan_line = format_scan_line(&lower, &None, &plan, table_schema);
192 Ok(vec![format!("{verb} {}", scan_line)])
193}
194
/// Builds the human-readable EXPLAIN plan lines for a `SELECT`.
///
/// A FROM-less select yields only `CONSTANT ROW`. Otherwise the lines come in
/// this fixed order:
/// 1. the FROM table's scan line;
/// 2. with joins, one scan line per joined table plus a join-kind line;
/// 3. optional `FILTER` and `SUBQUERY` markers;
/// 4. optional `GROUP BY`, `DISTINCT`, `SORT`, `OFFSET`, and `LIMIT` markers.
///
/// # Errors
/// `SqlError::TableNotFound` if the FROM table or any joined table is not in
/// `schema`.
fn explain_select(schema: &SchemaManager, stmt: &SelectStmt) -> Result<Vec<String>> {
    let mut lines = Vec::new();

    // No FROM clause: a single constant row; no further markers are emitted.
    if stmt.from.is_empty() {
        lines.push("CONSTANT ROW".into());
        return Ok(lines);
    }

    let lower_from = stmt.from.to_ascii_lowercase();
    let from_schema = schema
        .get(&lower_from)
        .ok_or_else(|| SqlError::TableNotFound(stmt.from.clone()))?;

    if stmt.joins.is_empty() {
        // Single table: the WHERE clause drives the access-path choice.
        let plan = planner::plan_select(from_schema, &stmt.where_clause);
        lines.push(format_scan_line(
            &lower_from,
            &stmt.from_alias,
            &plan,
            from_schema,
        ));
    } else {
        // With joins, every table is planned without a predicate (`&None`),
        // so the WHERE clause does not influence the described access paths.
        let from_plan = planner::plan_select(from_schema, &None);
        lines.push(format_scan_line(
            &lower_from,
            &stmt.from_alias,
            &from_plan,
            from_schema,
        ));

        for join in &stmt.joins {
            let inner_lower = join.table.name.to_ascii_lowercase();
            let inner_schema = schema
                .get(&inner_lower)
                .ok_or_else(|| SqlError::TableNotFound(join.table.name.clone()))?;
            let inner_plan = planner::plan_select(inner_schema, &None);
            lines.push(format_scan_line(
                &inner_lower,
                &join.table.alias,
                &inner_plan,
                inner_schema,
            ));
        }

        // NOTE(review): only the LAST join's type is reported, so a query mixing
        // join kinds shows a single label. Every other join type falls through
        // to "NESTED LOOP".
        let join_type_str = match stmt.joins.last().map(|j| j.join_type) {
            Some(JoinType::Left) => "LEFT JOIN",
            Some(JoinType::Right) => "RIGHT JOIN",
            Some(JoinType::Cross) => "CROSS JOIN",
            _ => "NESTED LOOP",
        };
        lines.push(join_type_str.into());
    }

    // Single-table queries show FILTER only when the planner fell back to a
    // sequential scan. The plan is recomputed with the same inputs as above.
    if stmt.where_clause.is_some() && stmt.joins.is_empty() {
        let plan = planner::plan_select(from_schema, &stmt.where_clause);
        if matches!(plan, ScanPlan::SeqScan) {
            lines.push("FILTER".into());
        }
    }

    // NOTE(review): for joined queries this pushes one SUBQUERY marker when the
    // WHERE clause contains a subquery (`has_subquery` is defined elsewhere).
    // `explain_subqueries` below also counts the subqueries in that same WHERE
    // clause, so such a subquery appears to be reported twice — confirm whether
    // this is intended.
    if let Some(ref w) = stmt.where_clause {
        if !stmt.joins.is_empty() && has_subquery(w) {
            lines.push("SUBQUERY".into());
        }
    }

    // One SUBQUERY marker per subquery found in WHERE, HAVING and the projection.
    explain_subqueries(stmt, &mut lines);

    if !stmt.group_by.is_empty() {
        lines.push("GROUP BY".into());
    }

    if stmt.distinct {
        lines.push("DISTINCT".into());
    }

    if !stmt.order_by.is_empty() {
        lines.push("SORT".into());
    }

    // OFFSET/LIMIT show their value when `eval_const_int` (defined elsewhere)
    // succeeds on the expression; otherwise a bare marker is emitted.
    if let Some(ref offset_expr) = stmt.offset {
        if let Ok(n) = eval_const_int(offset_expr) {
            lines.push(format!("OFFSET {n}"));
        } else {
            lines.push("OFFSET".into());
        }
    }

    if let Some(ref limit_expr) = stmt.limit {
        if let Ok(n) = eval_const_int(limit_expr) {
            lines.push(format!("LIMIT {n}"));
        } else {
            lines.push("LIMIT".into());
        }
    }

    Ok(lines)
}
293
294fn explain_subqueries(stmt: &SelectStmt, lines: &mut Vec<String>) {
295 let mut count = 0;
296 if let Some(ref w) = stmt.where_clause {
297 count += count_subqueries(w);
298 }
299 if let Some(ref h) = stmt.having {
300 count += count_subqueries(h);
301 }
302 for col in &stmt.columns {
303 if let SelectColumn::Expr { expr, .. } = col {
304 count += count_subqueries(expr);
305 }
306 }
307 for _ in 0..count {
308 lines.push("SUBQUERY".into());
309 }
310}
311
312fn count_subqueries(expr: &Expr) -> usize {
313 match expr {
314 Expr::InSubquery { expr: e, .. } => 1 + count_subqueries(e),
315 Expr::ScalarSubquery(_) => 1,
316 Expr::Exists { .. } => 1,
317 Expr::BinaryOp { left, right, .. } => count_subqueries(left) + count_subqueries(right),
318 Expr::UnaryOp { expr: e, .. } => count_subqueries(e),
319 Expr::IsNull(e) | Expr::IsNotNull(e) => count_subqueries(e),
320 Expr::Function { args, .. } => args.iter().map(count_subqueries).sum(),
321 Expr::Between {
322 expr: e, low, high, ..
323 } => count_subqueries(e) + count_subqueries(low) + count_subqueries(high),
324 Expr::Like {
325 expr: e, pattern, ..
326 } => count_subqueries(e) + count_subqueries(pattern),
327 Expr::Case {
328 operand,
329 conditions,
330 else_result,
331 } => {
332 let mut n = 0;
333 if let Some(op) = operand {
334 n += count_subqueries(op);
335 }
336 for (c, r) in conditions {
337 n += count_subqueries(c) + count_subqueries(r);
338 }
339 if let Some(el) = else_result {
340 n += count_subqueries(el);
341 }
342 n
343 }
344 Expr::Coalesce(args) => args.iter().map(count_subqueries).sum(),
345 Expr::Cast { expr: e, .. } => count_subqueries(e),
346 Expr::InList { expr: e, list, .. } => {
347 count_subqueries(e) + list.iter().map(count_subqueries).sum::<usize>()
348 }
349 _ => 0,
350 }
351}
352
353fn format_scan_line(
354 table_name: &str,
355 alias: &Option<String>,
356 plan: &ScanPlan,
357 table_schema: &TableSchema,
358) -> String {
359 let alias_part = match alias {
360 Some(a) if !a.eq_ignore_ascii_case(table_name) => {
361 format!(" AS {}", a.to_ascii_lowercase())
362 }
363 _ => String::new(),
364 };
365
366 let desc = planner::describe_plan(plan, table_schema);
367
368 if desc.is_empty() {
369 format!("SCAN TABLE {table_name}{alias_part}")
370 } else {
371 format!("SEARCH TABLE {table_name}{alias_part} {desc}")
372 }
373}
374
375fn exec_create_table(
378 db: &Database,
379 schema: &mut SchemaManager,
380 stmt: &CreateTableStmt,
381) -> Result<ExecutionResult> {
382 let lower_name = stmt.name.to_ascii_lowercase();
383
384 if schema.contains(&lower_name) {
385 if stmt.if_not_exists {
386 return Ok(ExecutionResult::Ok);
387 }
388 return Err(SqlError::TableAlreadyExists(stmt.name.clone()));
389 }
390
391 if stmt.primary_key.is_empty() {
392 return Err(SqlError::PrimaryKeyRequired);
393 }
394
395 let mut seen = std::collections::HashSet::new();
396 for col in &stmt.columns {
397 let lower = col.name.to_ascii_lowercase();
398 if !seen.insert(lower.clone()) {
399 return Err(SqlError::DuplicateColumn(col.name.clone()));
400 }
401 }
402
403 let columns: Vec<ColumnDef> = stmt
404 .columns
405 .iter()
406 .enumerate()
407 .map(|(i, c)| ColumnDef {
408 name: c.name.to_ascii_lowercase(),
409 data_type: c.data_type,
410 nullable: c.nullable,
411 position: i as u16,
412 })
413 .collect();
414
415 let primary_key_columns: Vec<u16> = stmt
416 .primary_key
417 .iter()
418 .map(|pk_name| {
419 let lower = pk_name.to_ascii_lowercase();
420 columns
421 .iter()
422 .position(|c| c.name == lower)
423 .map(|i| i as u16)
424 .ok_or_else(|| SqlError::ColumnNotFound(pk_name.clone()))
425 })
426 .collect::<Result<_>>()?;
427
428 let table_schema = TableSchema {
429 name: lower_name.clone(),
430 columns,
431 primary_key_columns,
432 indices: vec![],
433 };
434
435 let mut wtx = db.begin_write().map_err(SqlError::Storage)?;
436 SchemaManager::ensure_schema_table(&mut wtx)?;
437 wtx.create_table(lower_name.as_bytes())
438 .map_err(SqlError::Storage)?;
439 SchemaManager::save_schema(&mut wtx, &table_schema)?;
440 wtx.commit().map_err(SqlError::Storage)?;
441
442 schema.register(table_schema);
443 Ok(ExecutionResult::Ok)
444}
445
446fn exec_drop_table(
447 db: &Database,
448 schema: &mut SchemaManager,
449 stmt: &DropTableStmt,
450) -> Result<ExecutionResult> {
451 let lower_name = stmt.name.to_ascii_lowercase();
452
453 if !schema.contains(&lower_name) {
454 if stmt.if_exists {
455 return Ok(ExecutionResult::Ok);
456 }
457 return Err(SqlError::TableNotFound(stmt.name.clone()));
458 }
459
460 let table_schema = schema.get(&lower_name).unwrap();
461 let idx_tables: Vec<Vec<u8>> = table_schema
462 .indices
463 .iter()
464 .map(|idx| TableSchema::index_table_name(&lower_name, &idx.name))
465 .collect();
466
467 let mut wtx = db.begin_write().map_err(SqlError::Storage)?;
468 for idx_table in &idx_tables {
469 wtx.drop_table(idx_table).map_err(SqlError::Storage)?;
470 }
471 wtx.drop_table(lower_name.as_bytes())
472 .map_err(SqlError::Storage)?;
473 SchemaManager::delete_schema(&mut wtx, &lower_name)?;
474 wtx.commit().map_err(SqlError::Storage)?;
475
476 schema.remove(&lower_name);
477 Ok(ExecutionResult::Ok)
478}
479
480fn exec_create_table_in_txn(
481 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
482 schema: &mut SchemaManager,
483 stmt: &CreateTableStmt,
484) -> Result<ExecutionResult> {
485 let lower_name = stmt.name.to_ascii_lowercase();
486
487 if schema.contains(&lower_name) {
488 if stmt.if_not_exists {
489 return Ok(ExecutionResult::Ok);
490 }
491 return Err(SqlError::TableAlreadyExists(stmt.name.clone()));
492 }
493
494 if stmt.primary_key.is_empty() {
495 return Err(SqlError::PrimaryKeyRequired);
496 }
497
498 let mut seen = std::collections::HashSet::new();
499 for col in &stmt.columns {
500 let lower = col.name.to_ascii_lowercase();
501 if !seen.insert(lower.clone()) {
502 return Err(SqlError::DuplicateColumn(col.name.clone()));
503 }
504 }
505
506 let columns: Vec<ColumnDef> = stmt
507 .columns
508 .iter()
509 .enumerate()
510 .map(|(i, c)| ColumnDef {
511 name: c.name.to_ascii_lowercase(),
512 data_type: c.data_type,
513 nullable: c.nullable,
514 position: i as u16,
515 })
516 .collect();
517
518 let primary_key_columns: Vec<u16> = stmt
519 .primary_key
520 .iter()
521 .map(|pk_name| {
522 let lower = pk_name.to_ascii_lowercase();
523 columns
524 .iter()
525 .position(|c| c.name == lower)
526 .map(|i| i as u16)
527 .ok_or_else(|| SqlError::ColumnNotFound(pk_name.clone()))
528 })
529 .collect::<Result<_>>()?;
530
531 let table_schema = TableSchema {
532 name: lower_name.clone(),
533 columns,
534 primary_key_columns,
535 indices: vec![],
536 };
537
538 SchemaManager::ensure_schema_table(wtx)?;
539 wtx.create_table(lower_name.as_bytes())
540 .map_err(SqlError::Storage)?;
541 SchemaManager::save_schema(wtx, &table_schema)?;
542
543 schema.register(table_schema);
544 Ok(ExecutionResult::Ok)
545}
546
547fn exec_drop_table_in_txn(
548 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
549 schema: &mut SchemaManager,
550 stmt: &DropTableStmt,
551) -> Result<ExecutionResult> {
552 let lower_name = stmt.name.to_ascii_lowercase();
553
554 if !schema.contains(&lower_name) {
555 if stmt.if_exists {
556 return Ok(ExecutionResult::Ok);
557 }
558 return Err(SqlError::TableNotFound(stmt.name.clone()));
559 }
560
561 let table_schema = schema.get(&lower_name).unwrap();
562 let idx_tables: Vec<Vec<u8>> = table_schema
563 .indices
564 .iter()
565 .map(|idx| TableSchema::index_table_name(&lower_name, &idx.name))
566 .collect();
567
568 for idx_table in &idx_tables {
569 wtx.drop_table(idx_table).map_err(SqlError::Storage)?;
570 }
571 wtx.drop_table(lower_name.as_bytes())
572 .map_err(SqlError::Storage)?;
573 SchemaManager::delete_schema(wtx, &lower_name)?;
574
575 schema.remove(&lower_name);
576 Ok(ExecutionResult::Ok)
577}
578
/// Executes `CREATE [UNIQUE] INDEX` in its own auto-committed write transaction.
///
/// Steps:
/// 1. resolve the indexed column names (case-insensitively) to positions;
/// 2. create the index table;
/// 3. backfill it from every existing row;
/// 4. persist the updated table schema and commit;
/// 5. register the new schema in memory.
///
/// Any failure returns before `commit`, so the local write transaction is
/// dropped uncommitted (presumably discarding its changes — depends on
/// `WriteTxn`'s drop behaviour).
///
/// NOTE(review): index-name uniqueness is checked only within this table, but
/// `DROP INDEX` resolves names across all tables (`find_index_in_schemas`), so
/// two tables sharing an index name make `DROP INDEX` ambiguous.
///
/// # Errors
/// `TableNotFound`, `IndexAlreadyExists` (without `IF NOT EXISTS`),
/// `ColumnNotFound`, `UniqueViolation` if existing rows already contain a
/// duplicate non-NULL key for a unique index, or `Storage`.
fn exec_create_index(
    db: &Database,
    schema: &mut SchemaManager,
    stmt: &CreateIndexStmt,
) -> Result<ExecutionResult> {
    let lower_table = stmt.table_name.to_ascii_lowercase();
    let lower_idx = stmt.index_name.to_ascii_lowercase();

    let table_schema = schema
        .get(&lower_table)
        .ok_or_else(|| SqlError::TableNotFound(stmt.table_name.clone()))?;

    if table_schema.index_by_name(&lower_idx).is_some() {
        if stmt.if_not_exists {
            return Ok(ExecutionResult::Ok);
        }
        return Err(SqlError::IndexAlreadyExists(stmt.index_name.clone()));
    }

    // Column positions in index order; the first unknown name aborts.
    let col_indices: Vec<u16> = stmt
        .columns
        .iter()
        .map(|col_name| {
            let lower = col_name.to_ascii_lowercase();
            table_schema
                .column_index(&lower)
                .map(|i| i as u16)
                .ok_or_else(|| SqlError::ColumnNotFound(col_name.clone()))
        })
        .collect::<Result<_>>()?;

    let idx_def = IndexDef {
        name: lower_idx.clone(),
        columns: col_indices,
        unique: stmt.unique,
    };

    let idx_table = TableSchema::index_table_name(&lower_table, &lower_idx);

    let mut wtx = db.begin_write().map_err(SqlError::Storage)?;
    SchemaManager::ensure_schema_table(&mut wtx)?;
    wtx.create_table(&idx_table).map_err(SqlError::Storage)?;

    let pk_indices = table_schema.pk_indices();
    // Materialise all rows first: the scan closure cannot write to `wtx` while
    // it is being iterated. A decode failure is recorded and surfaced after the
    // scan (the scan itself keeps going; the last error wins).
    let mut rows: Vec<Vec<Value>> = Vec::new();
    {
        let mut scan_err: Option<SqlError> = None;
        wtx.table_for_each(lower_table.as_bytes(), |key, value| {
            match decode_full_row(table_schema, key, value) {
                Ok(row) => rows.push(row),
                Err(e) => scan_err = Some(e),
            }
            Ok(())
        })
        .map_err(SqlError::Storage)?;
        if let Some(e) = scan_err {
            return Err(e);
        }
    }

    // Backfill. A non-new key under a unique index whose indexed values are all
    // non-NULL means two existing rows share the key. Entries containing a NULL
    // embed the PK in their key and are never treated as duplicates.
    for row in &rows {
        let pk_values: Vec<Value> = pk_indices.iter().map(|&i| row[i].clone()).collect();
        let key = encode_index_key(&idx_def, row, &pk_values);
        let value = encode_index_value(&idx_def, row, &pk_values);
        let is_new = wtx
            .table_insert(&idx_table, &key, &value)
            .map_err(SqlError::Storage)?;
        if idx_def.unique && !is_new {
            let indexed_values: Vec<Value> = idx_def
                .columns
                .iter()
                .map(|&col_idx| row[col_idx as usize].clone())
                .collect();
            let any_null = indexed_values.iter().any(|v| v.is_null());
            if !any_null {
                return Err(SqlError::UniqueViolation(stmt.index_name.clone()));
            }
        }
    }

    let mut updated_schema = table_schema.clone();
    updated_schema.indices.push(idx_def);
    SchemaManager::save_schema(&mut wtx, &updated_schema)?;
    wtx.commit().map_err(SqlError::Storage)?;

    // In-memory schema changes only after the commit succeeded.
    schema.register(updated_schema);
    Ok(ExecutionResult::Ok)
}
668
669fn exec_drop_index(
670 db: &Database,
671 schema: &mut SchemaManager,
672 stmt: &DropIndexStmt,
673) -> Result<ExecutionResult> {
674 let lower_idx = stmt.index_name.to_ascii_lowercase();
675
676 let (table_name, _idx_pos) = match find_index_in_schemas(schema, &lower_idx) {
677 Some(found) => found,
678 None => {
679 if stmt.if_exists {
680 return Ok(ExecutionResult::Ok);
681 }
682 return Err(SqlError::IndexNotFound(stmt.index_name.clone()));
683 }
684 };
685
686 let idx_table = TableSchema::index_table_name(&table_name, &lower_idx);
687
688 let mut wtx = db.begin_write().map_err(SqlError::Storage)?;
689 wtx.drop_table(&idx_table).map_err(SqlError::Storage)?;
690
691 let table_schema = schema.get(&table_name).unwrap();
692 let mut updated_schema = table_schema.clone();
693 updated_schema.indices.retain(|i| i.name != lower_idx);
694 SchemaManager::save_schema(&mut wtx, &updated_schema)?;
695 wtx.commit().map_err(SqlError::Storage)?;
696
697 schema.register(updated_schema);
698 Ok(ExecutionResult::Ok)
699}
700
701fn exec_create_index_in_txn(
702 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
703 schema: &mut SchemaManager,
704 stmt: &CreateIndexStmt,
705) -> Result<ExecutionResult> {
706 let lower_table = stmt.table_name.to_ascii_lowercase();
707 let lower_idx = stmt.index_name.to_ascii_lowercase();
708
709 let table_schema = schema
710 .get(&lower_table)
711 .ok_or_else(|| SqlError::TableNotFound(stmt.table_name.clone()))?;
712
713 if table_schema.index_by_name(&lower_idx).is_some() {
714 if stmt.if_not_exists {
715 return Ok(ExecutionResult::Ok);
716 }
717 return Err(SqlError::IndexAlreadyExists(stmt.index_name.clone()));
718 }
719
720 let col_indices: Vec<u16> = stmt
721 .columns
722 .iter()
723 .map(|col_name| {
724 let lower = col_name.to_ascii_lowercase();
725 table_schema
726 .column_index(&lower)
727 .map(|i| i as u16)
728 .ok_or_else(|| SqlError::ColumnNotFound(col_name.clone()))
729 })
730 .collect::<Result<_>>()?;
731
732 let idx_def = IndexDef {
733 name: lower_idx.clone(),
734 columns: col_indices,
735 unique: stmt.unique,
736 };
737
738 let idx_table = TableSchema::index_table_name(&lower_table, &lower_idx);
739
740 SchemaManager::ensure_schema_table(wtx)?;
741 wtx.create_table(&idx_table).map_err(SqlError::Storage)?;
742
743 let pk_indices = table_schema.pk_indices();
744 let mut rows: Vec<Vec<Value>> = Vec::new();
745 {
746 let mut scan_err: Option<SqlError> = None;
747 wtx.table_for_each(lower_table.as_bytes(), |key, value| {
748 match decode_full_row(table_schema, key, value) {
749 Ok(row) => rows.push(row),
750 Err(e) => scan_err = Some(e),
751 }
752 Ok(())
753 })
754 .map_err(SqlError::Storage)?;
755 if let Some(e) = scan_err {
756 return Err(e);
757 }
758 }
759
760 for row in &rows {
761 let pk_values: Vec<Value> = pk_indices.iter().map(|&i| row[i].clone()).collect();
762 let key = encode_index_key(&idx_def, row, &pk_values);
763 let value = encode_index_value(&idx_def, row, &pk_values);
764 let is_new = wtx
765 .table_insert(&idx_table, &key, &value)
766 .map_err(SqlError::Storage)?;
767 if idx_def.unique && !is_new {
768 let indexed_values: Vec<Value> = idx_def
769 .columns
770 .iter()
771 .map(|&col_idx| row[col_idx as usize].clone())
772 .collect();
773 let any_null = indexed_values.iter().any(|v| v.is_null());
774 if !any_null {
775 return Err(SqlError::UniqueViolation(stmt.index_name.clone()));
776 }
777 }
778 }
779
780 let mut updated_schema = table_schema.clone();
781 updated_schema.indices.push(idx_def);
782 SchemaManager::save_schema(wtx, &updated_schema)?;
783
784 schema.register(updated_schema);
785 Ok(ExecutionResult::Ok)
786}
787
788fn exec_drop_index_in_txn(
789 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
790 schema: &mut SchemaManager,
791 stmt: &DropIndexStmt,
792) -> Result<ExecutionResult> {
793 let lower_idx = stmt.index_name.to_ascii_lowercase();
794
795 let (table_name, _idx_pos) = match find_index_in_schemas(schema, &lower_idx) {
796 Some(found) => found,
797 None => {
798 if stmt.if_exists {
799 return Ok(ExecutionResult::Ok);
800 }
801 return Err(SqlError::IndexNotFound(stmt.index_name.clone()));
802 }
803 };
804
805 let idx_table = TableSchema::index_table_name(&table_name, &lower_idx);
806 wtx.drop_table(&idx_table).map_err(SqlError::Storage)?;
807
808 let table_schema = schema.get(&table_name).unwrap();
809 let mut updated_schema = table_schema.clone();
810 updated_schema.indices.retain(|i| i.name != lower_idx);
811 SchemaManager::save_schema(wtx, &updated_schema)?;
812
813 schema.register(updated_schema);
814 Ok(ExecutionResult::Ok)
815}
816
817fn find_index_in_schemas(schema: &SchemaManager, index_name: &str) -> Option<(String, usize)> {
818 for table_name in schema.table_names() {
819 if let Some(ts) = schema.get(table_name) {
820 if let Some(pos) = ts.indices.iter().position(|i| i.name == index_name) {
821 return Some((table_name.to_string(), pos));
822 }
823 }
824 }
825 None
826}
827
828fn extract_pk_key(
831 idx_key: &[u8],
832 idx_value: &[u8],
833 is_unique: bool,
834 num_index_cols: usize,
835 num_pk_cols: usize,
836) -> Result<Vec<u8>> {
837 if is_unique && !idx_value.is_empty() {
838 Ok(idx_value.to_vec())
839 } else {
840 let total_cols = num_index_cols + num_pk_cols;
841 let all_values = decode_composite_key(idx_key, total_cols)?;
842 let pk_values = &all_values[num_index_cols..];
843 Ok(encode_composite_key(pk_values))
844 }
845}
846
847fn check_range_conditions(
848 idx_key: &[u8],
849 num_prefix_cols: usize,
850 range_conds: &[(BinOp, Value)],
851 num_index_cols: usize,
852) -> Result<RangeCheck> {
853 if range_conds.is_empty() {
854 return Ok(RangeCheck::Match);
855 }
856
857 let num_to_decode = num_prefix_cols + 1;
858 if num_to_decode > num_index_cols {
859 return Ok(RangeCheck::Match);
860 }
861
862 let mut pos = 0;
864 for _ in 0..num_prefix_cols {
865 let (_, n) = decode_key_value(&idx_key[pos..])?;
866 pos += n;
867 }
868 let (range_val, _) = decode_key_value(&idx_key[pos..])?;
869
870 let mut exceeds_upper = false;
871 let mut below_lower = false;
872
873 for (op, val) in range_conds {
874 match op {
875 BinOp::Lt => {
876 if range_val >= *val {
877 exceeds_upper = true;
878 }
879 }
880 BinOp::LtEq => {
881 if range_val > *val {
882 exceeds_upper = true;
883 }
884 }
885 BinOp::Gt => {
886 if range_val <= *val {
887 below_lower = true;
888 }
889 }
890 BinOp::GtEq => {
891 if range_val < *val {
892 below_lower = true;
893 }
894 }
895 _ => {}
896 }
897 }
898
899 if exceeds_upper {
900 Ok(RangeCheck::ExceedsUpper)
901 } else if below_lower {
902 Ok(RangeCheck::BelowLower)
903 } else {
904 Ok(RangeCheck::Match)
905 }
906}
907
/// Outcome of testing one index key against a scan's range predicates
/// (see `check_range_conditions`).
///
/// Derives the usual value-type traits so results can be compared, copied,
/// and printed in diagnostics.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RangeCheck {
    /// The key satisfies every range predicate.
    Match,
    /// The key is below a lower bound; the index scan skips it and continues.
    BelowLower,
    /// The key is past an upper bound; the index scan stops.
    ExceedsUpper,
}
913
914fn collect_rows_read(
916 db: &Database,
917 table_schema: &TableSchema,
918 where_clause: &Option<Expr>,
919) -> Result<Vec<Vec<Value>>> {
920 let plan = planner::plan_select(table_schema, where_clause);
921 let lower_name = &table_schema.name;
922
923 match plan {
924 ScanPlan::SeqScan => {
925 let mut rows = Vec::new();
926 let mut rtx = db.begin_read();
927 let mut scan_err: Option<SqlError> = None;
928 rtx.table_for_each(lower_name.as_bytes(), |key, value| {
929 match decode_full_row(table_schema, key, value) {
930 Ok(row) => rows.push(row),
931 Err(e) => scan_err = Some(e),
932 }
933 Ok(())
934 })
935 .map_err(SqlError::Storage)?;
936 if let Some(e) = scan_err {
937 return Err(e);
938 }
939 Ok(rows)
940 }
941
942 ScanPlan::PkLookup { pk_values } => {
943 let key = encode_composite_key(&pk_values);
944 let mut rtx = db.begin_read();
945 match rtx
946 .table_get(lower_name.as_bytes(), &key)
947 .map_err(SqlError::Storage)?
948 {
949 Some(value) => {
950 let row = decode_full_row(table_schema, &key, &value)?;
951 Ok(vec![row])
952 }
953 None => Ok(vec![]),
954 }
955 }
956
957 ScanPlan::IndexScan {
958 idx_table,
959 prefix,
960 num_prefix_cols,
961 range_conds,
962 is_unique,
963 index_columns,
964 ..
965 } => {
966 let num_pk_cols = table_schema.primary_key_columns.len();
967 let num_index_cols = index_columns.len();
968 let mut pk_keys: Vec<Vec<u8>> = Vec::new();
969
970 {
971 let mut rtx = db.begin_read();
972 let mut scan_err: Option<SqlError> = None;
973 rtx.table_scan_from(&idx_table, &prefix, |key, value| {
974 if !key.starts_with(&prefix) {
975 return Ok(false);
976 }
977 match check_range_conditions(key, num_prefix_cols, &range_conds, num_index_cols)
978 {
979 Ok(RangeCheck::ExceedsUpper) => return Ok(false),
980 Ok(RangeCheck::BelowLower) => return Ok(true),
981 Ok(RangeCheck::Match) => {}
982 Err(e) => {
983 scan_err = Some(e);
984 return Ok(false);
985 }
986 }
987 match extract_pk_key(key, value, is_unique, num_index_cols, num_pk_cols) {
988 Ok(pk) => pk_keys.push(pk),
989 Err(e) => {
990 scan_err = Some(e);
991 return Ok(false);
992 }
993 }
994 Ok(true)
995 })
996 .map_err(SqlError::Storage)?;
997 if let Some(e) = scan_err {
998 return Err(e);
999 }
1000 }
1001
1002 let mut rows = Vec::new();
1003 let mut rtx = db.begin_read();
1004 for pk_key in &pk_keys {
1005 if let Some(value) = rtx
1006 .table_get(lower_name.as_bytes(), pk_key)
1007 .map_err(SqlError::Storage)?
1008 {
1009 rows.push(decode_full_row(table_schema, pk_key, &value)?);
1010 }
1011 }
1012 Ok(rows)
1013 }
1014 }
1015}
1016
/// Fetches the decoded rows of `table_schema` selected by the planner for
/// `where_clause`, reading through the open write transaction `wtx`.
///
/// Depending on the plan, this is a full table scan, a single primary-key
/// lookup, or an index scan followed by primary-key lookups. The WHERE clause
/// is only used to choose and bound the access path; callers presumably still
/// filter the returned rows against it — TODO confirm.
///
/// # Errors
/// `SqlError::Storage` for storage-layer failures; row/key decoding errors.
fn collect_rows_write(
    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
    table_schema: &TableSchema,
    where_clause: &Option<Expr>,
) -> Result<Vec<Vec<Value>>> {
    let plan = planner::plan_select(table_schema, where_clause);
    let lower_name = &table_schema.name;

    match plan {
        ScanPlan::SeqScan => {
            let mut rows = Vec::new();
            // Decode errors are recorded and surfaced after the scan (the scan
            // itself continues; the last error wins).
            let mut scan_err: Option<SqlError> = None;
            wtx.table_for_each(lower_name.as_bytes(), |key, value| {
                match decode_full_row(table_schema, key, value) {
                    Ok(row) => rows.push(row),
                    Err(e) => scan_err = Some(e),
                }
                Ok(())
            })
            .map_err(SqlError::Storage)?;
            if let Some(e) = scan_err {
                return Err(e);
            }
            Ok(rows)
        }

        ScanPlan::PkLookup { pk_values } => {
            let key = encode_composite_key(&pk_values);
            match wtx
                .table_get(lower_name.as_bytes(), &key)
                .map_err(SqlError::Storage)?
            {
                Some(value) => {
                    let row = decode_full_row(table_schema, &key, &value)?;
                    Ok(vec![row])
                }
                None => Ok(vec![]),
            }
        }

        ScanPlan::IndexScan {
            idx_table,
            prefix,
            num_prefix_cols,
            range_conds,
            is_unique,
            index_columns,
            ..
        } => {
            let num_pk_cols = table_schema.primary_key_columns.len();
            let num_index_cols = index_columns.len();
            let mut pk_keys: Vec<Vec<u8>> = Vec::new();

            // Phase 1: collect the PKs referenced by matching index entries.
            // The callback's bool return: `true` continues the scan, `false`
            // stops it. Errors are stashed in `scan_err` and the scan stops.
            {
                let mut scan_err: Option<SqlError> = None;
                wtx.table_scan_from(&idx_table, &prefix, |key, value| {
                    // Keys are visited from `prefix` onward; the first key
                    // outside the prefix ends the scan.
                    if !key.starts_with(&prefix) {
                        return Ok(false);
                    }
                    match check_range_conditions(key, num_prefix_cols, &range_conds, num_index_cols)
                    {
                        Ok(RangeCheck::ExceedsUpper) => return Ok(false),
                        Ok(RangeCheck::BelowLower) => return Ok(true),
                        Ok(RangeCheck::Match) => {}
                        Err(e) => {
                            scan_err = Some(e);
                            return Ok(false);
                        }
                    }
                    match extract_pk_key(key, value, is_unique, num_index_cols, num_pk_cols) {
                        Ok(pk) => pk_keys.push(pk),
                        Err(e) => {
                            scan_err = Some(e);
                            return Ok(false);
                        }
                    }
                    Ok(true)
                })
                .map_err(SqlError::Storage)?;
                if let Some(e) = scan_err {
                    return Err(e);
                }
            }

            // Phase 2: fetch each referenced row; a PK with no row is skipped.
            let mut rows = Vec::new();
            for pk_key in &pk_keys {
                if let Some(value) = wtx
                    .table_get(lower_name.as_bytes(), pk_key)
                    .map_err(SqlError::Storage)?
                {
                    rows.push(decode_full_row(table_schema, pk_key, &value)?);
                }
            }
            Ok(rows)
        }
    }
}
1115
1116fn collect_keyed_rows_read(
1119 db: &Database,
1120 table_schema: &TableSchema,
1121 where_clause: &Option<Expr>,
1122) -> Result<Vec<(Vec<u8>, Vec<Value>)>> {
1123 let plan = planner::plan_select(table_schema, where_clause);
1124 let lower_name = &table_schema.name;
1125
1126 match plan {
1127 ScanPlan::SeqScan => {
1128 let mut rows = Vec::new();
1129 let mut rtx = db.begin_read();
1130 let mut scan_err: Option<SqlError> = None;
1131 rtx.table_for_each(lower_name.as_bytes(), |key, value| {
1132 match decode_full_row(table_schema, key, value) {
1133 Ok(row) => rows.push((key.to_vec(), row)),
1134 Err(e) => scan_err = Some(e),
1135 }
1136 Ok(())
1137 })
1138 .map_err(SqlError::Storage)?;
1139 if let Some(e) = scan_err {
1140 return Err(e);
1141 }
1142 Ok(rows)
1143 }
1144
1145 ScanPlan::PkLookup { pk_values } => {
1146 let key = encode_composite_key(&pk_values);
1147 let mut rtx = db.begin_read();
1148 match rtx
1149 .table_get(lower_name.as_bytes(), &key)
1150 .map_err(SqlError::Storage)?
1151 {
1152 Some(value) => {
1153 let row = decode_full_row(table_schema, &key, &value)?;
1154 Ok(vec![(key, row)])
1155 }
1156 None => Ok(vec![]),
1157 }
1158 }
1159
1160 ScanPlan::IndexScan {
1161 idx_table,
1162 prefix,
1163 num_prefix_cols,
1164 range_conds,
1165 is_unique,
1166 index_columns,
1167 ..
1168 } => {
1169 let num_pk_cols = table_schema.primary_key_columns.len();
1170 let num_index_cols = index_columns.len();
1171 let mut pk_keys: Vec<Vec<u8>> = Vec::new();
1172
1173 {
1174 let mut rtx = db.begin_read();
1175 let mut scan_err: Option<SqlError> = None;
1176 rtx.table_scan_from(&idx_table, &prefix, |key, value| {
1177 if !key.starts_with(&prefix) {
1178 return Ok(false);
1179 }
1180 match check_range_conditions(key, num_prefix_cols, &range_conds, num_index_cols)
1181 {
1182 Ok(RangeCheck::ExceedsUpper) => return Ok(false),
1183 Ok(RangeCheck::BelowLower) => return Ok(true),
1184 Ok(RangeCheck::Match) => {}
1185 Err(e) => {
1186 scan_err = Some(e);
1187 return Ok(false);
1188 }
1189 }
1190 match extract_pk_key(key, value, is_unique, num_index_cols, num_pk_cols) {
1191 Ok(pk) => pk_keys.push(pk),
1192 Err(e) => {
1193 scan_err = Some(e);
1194 return Ok(false);
1195 }
1196 }
1197 Ok(true)
1198 })
1199 .map_err(SqlError::Storage)?;
1200 if let Some(e) = scan_err {
1201 return Err(e);
1202 }
1203 }
1204
1205 let mut rows = Vec::new();
1206 let mut rtx = db.begin_read();
1207 for pk_key in &pk_keys {
1208 if let Some(value) = rtx
1209 .table_get(lower_name.as_bytes(), pk_key)
1210 .map_err(SqlError::Storage)?
1211 {
1212 rows.push((
1213 pk_key.clone(),
1214 decode_full_row(table_schema, pk_key, &value)?,
1215 ));
1216 }
1217 }
1218 Ok(rows)
1219 }
1220 }
1221}
1222
/// Write-transaction counterpart of `collect_keyed_rows_read`: collects
/// `(encoded primary key, decoded row)` pairs from `table_schema` that are
/// candidates for `where_clause`, reading through the caller's write
/// transaction `wtx` instead of opening a read transaction.
///
/// The access path comes from `planner::plan_select`:
/// - `SeqScan`: every row of the table is decoded and returned.
/// - `PkLookup`: at most one row, fetched by its encoded primary key.
/// - `IndexScan`: index entries that start with `prefix` and pass the range
///   conditions are mapped back to primary keys, then each base row is fetched;
///   primary keys whose base row is missing are skipped.
///
/// Rows are NOT filtered by `where_clause` here; callers must re-apply it.
///
/// # Errors
/// Storage failures are wrapped in `SqlError::Storage`. Errors from decoding rows,
/// checking range conditions or extracting primary keys are propagated unchanged.
fn collect_keyed_rows_write(
    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
    table_schema: &TableSchema,
    where_clause: &Option<Expr>,
) -> Result<Vec<(Vec<u8>, Vec<Value>)>> {
    let plan = planner::plan_select(table_schema, where_clause);
    let lower_name = &table_schema.name;

    match plan {
        ScanPlan::SeqScan => {
            let mut rows = Vec::new();
            // The callback's error type is the storage error (the result is mapped
            // with `SqlError::Storage`), so decode failures are stashed here and
            // returned after the scan finishes.
            let mut scan_err: Option<SqlError> = None;
            wtx.table_for_each(lower_name.as_bytes(), |key, value| {
                match decode_full_row(table_schema, key, value) {
                    Ok(row) => rows.push((key.to_vec(), row)),
                    Err(e) => scan_err = Some(e),
                }
                Ok(())
            })
            .map_err(SqlError::Storage)?;
            if let Some(e) = scan_err {
                return Err(e);
            }
            Ok(rows)
        }

        ScanPlan::PkLookup { pk_values } => {
            let key = encode_composite_key(&pk_values);
            match wtx
                .table_get(lower_name.as_bytes(), &key)
                .map_err(SqlError::Storage)?
            {
                Some(value) => {
                    let row = decode_full_row(table_schema, &key, &value)?;
                    Ok(vec![(key, row)])
                }
                None => Ok(vec![]),
            }
        }

        ScanPlan::IndexScan {
            idx_table,
            prefix,
            num_prefix_cols,
            range_conds,
            is_unique,
            index_columns,
            ..
        } => {
            let num_pk_cols = table_schema.primary_key_columns.len();
            let num_index_cols = index_columns.len();
            // Phase 1: gather primary keys from the index table.
            let mut pk_keys: Vec<Vec<u8>> = Vec::new();

            {
                let mut scan_err: Option<SqlError> = None;
                // NOTE(review): the callback appears to return Ok(true) to keep
                // scanning and Ok(false) to stop — confirm against table_scan_from.
                wtx.table_scan_from(&idx_table, &prefix, |key, value| {
                    // Leaving the prefix range ends the scan.
                    if !key.starts_with(&prefix) {
                        return Ok(false);
                    }
                    // Keys below the lower bound are skipped; the first key past
                    // the upper bound ends the scan.
                    match check_range_conditions(key, num_prefix_cols, &range_conds, num_index_cols)
                    {
                        Ok(RangeCheck::ExceedsUpper) => return Ok(false),
                        Ok(RangeCheck::BelowLower) => return Ok(true),
                        Ok(RangeCheck::Match) => {}
                        Err(e) => {
                            scan_err = Some(e);
                            return Ok(false);
                        }
                    }
                    // Depending on `is_unique`, the primary key lives in the index
                    // value or in the tail of the index key (see encode_index_key /
                    // encode_index_value); extract_pk_key presumably handles both.
                    match extract_pk_key(key, value, is_unique, num_index_cols, num_pk_cols) {
                        Ok(pk) => pk_keys.push(pk),
                        Err(e) => {
                            scan_err = Some(e);
                            return Ok(false);
                        }
                    }
                    Ok(true)
                })
                .map_err(SqlError::Storage)?;
                if let Some(e) = scan_err {
                    return Err(e);
                }
            }

            // Phase 2: fetch and decode the base rows; missing rows are skipped.
            let mut rows = Vec::new();
            for pk_key in &pk_keys {
                if let Some(value) = wtx
                    .table_get(lower_name.as_bytes(), pk_key)
                    .map_err(SqlError::Storage)?
                {
                    rows.push((
                        pk_key.clone(),
                        decode_full_row(table_schema, pk_key, &value)?,
                    ));
                }
            }
            Ok(rows)
        }
    }
}
1324
1325fn exec_insert(
1328 db: &Database,
1329 schema: &SchemaManager,
1330 stmt: &InsertStmt,
1331) -> Result<ExecutionResult> {
1332 let materialized;
1333 let stmt = if insert_has_subquery(stmt) {
1334 materialized = materialize_insert(stmt, &mut |sub| exec_subquery_read(db, schema, sub))?;
1335 &materialized
1336 } else {
1337 stmt
1338 };
1339
1340 let lower_name = stmt.table.to_ascii_lowercase();
1341 let table_schema = schema
1342 .get(&lower_name)
1343 .ok_or_else(|| SqlError::TableNotFound(stmt.table.clone()))?;
1344
1345 let insert_columns = if stmt.columns.is_empty() {
1346 table_schema
1347 .columns
1348 .iter()
1349 .map(|c| c.name.clone())
1350 .collect::<Vec<_>>()
1351 } else {
1352 stmt.columns
1353 .iter()
1354 .map(|c| c.to_ascii_lowercase())
1355 .collect()
1356 };
1357
1358 let col_indices: Vec<usize> = insert_columns
1359 .iter()
1360 .map(|name| {
1361 table_schema
1362 .column_index(name)
1363 .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))
1364 })
1365 .collect::<Result<_>>()?;
1366
1367 let mut wtx = db.begin_write().map_err(SqlError::Storage)?;
1368 let mut count: u64 = 0;
1369
1370 for value_row in &stmt.values {
1371 if value_row.len() != insert_columns.len() {
1372 return Err(SqlError::InvalidValue(format!(
1373 "expected {} values, got {}",
1374 insert_columns.len(),
1375 value_row.len()
1376 )));
1377 }
1378
1379 let mut row = vec![Value::Null; table_schema.columns.len()];
1380 for (i, expr) in value_row.iter().enumerate() {
1381 let val = eval_const_expr(expr)?;
1382 let col_idx = col_indices[i];
1383 let col = &table_schema.columns[col_idx];
1384
1385 let coerced = if val.is_null() {
1386 Value::Null
1387 } else {
1388 val.coerce_to(col.data_type)
1389 .ok_or_else(|| SqlError::TypeMismatch {
1390 expected: col.data_type.to_string(),
1391 got: val.data_type().to_string(),
1392 })?
1393 };
1394
1395 row[col_idx] = coerced;
1396 }
1397
1398 for col in &table_schema.columns {
1399 if !col.nullable && row[col.position as usize].is_null() {
1400 return Err(SqlError::NotNullViolation(col.name.clone()));
1401 }
1402 }
1403
1404 let pk_values: Vec<Value> = table_schema
1405 .pk_indices()
1406 .iter()
1407 .map(|&i| row[i].clone())
1408 .collect();
1409 let key = encode_composite_key(&pk_values);
1410
1411 let non_pk = table_schema.non_pk_indices();
1412 let value_values: Vec<Value> = non_pk.iter().map(|&i| row[i].clone()).collect();
1413 let value = encode_row(&value_values);
1414
1415 if key.len() > citadel_core::MAX_KEY_SIZE {
1416 return Err(SqlError::KeyTooLarge {
1417 size: key.len(),
1418 max: citadel_core::MAX_KEY_SIZE,
1419 });
1420 }
1421 if value.len() > citadel_core::MAX_INLINE_VALUE_SIZE {
1422 return Err(SqlError::RowTooLarge {
1423 size: value.len(),
1424 max: citadel_core::MAX_INLINE_VALUE_SIZE,
1425 });
1426 }
1427
1428 let is_new = wtx
1429 .table_insert(lower_name.as_bytes(), &key, &value)
1430 .map_err(SqlError::Storage)?;
1431 if !is_new {
1432 return Err(SqlError::DuplicateKey);
1433 }
1434
1435 insert_index_entries(&mut wtx, table_schema, &row, &pk_values)?;
1436 count += 1;
1437 }
1438
1439 wtx.commit().map_err(SqlError::Storage)?;
1440 Ok(ExecutionResult::RowsAffected(count))
1441}
1442
1443fn has_subquery(expr: &Expr) -> bool {
1444 match expr {
1445 Expr::InSubquery { .. } | Expr::Exists { .. } | Expr::ScalarSubquery(_) => true,
1446 Expr::BinaryOp { left, right, .. } => has_subquery(left) || has_subquery(right),
1447 Expr::UnaryOp { expr, .. } => has_subquery(expr),
1448 Expr::IsNull(e) | Expr::IsNotNull(e) => has_subquery(e),
1449 Expr::InList { expr, list, .. } => has_subquery(expr) || list.iter().any(has_subquery),
1450 Expr::InSet { expr, .. } => has_subquery(expr),
1451 Expr::Between {
1452 expr, low, high, ..
1453 } => has_subquery(expr) || has_subquery(low) || has_subquery(high),
1454 Expr::Like {
1455 expr,
1456 pattern,
1457 escape,
1458 ..
1459 } => {
1460 has_subquery(expr)
1461 || has_subquery(pattern)
1462 || escape.as_ref().is_some_and(|e| has_subquery(e))
1463 }
1464 Expr::Case {
1465 operand,
1466 conditions,
1467 else_result,
1468 } => {
1469 operand.as_ref().is_some_and(|e| has_subquery(e))
1470 || conditions
1471 .iter()
1472 .any(|(c, r)| has_subquery(c) || has_subquery(r))
1473 || else_result.as_ref().is_some_and(|e| has_subquery(e))
1474 }
1475 Expr::Coalesce(args) => args.iter().any(has_subquery),
1476 Expr::Cast { expr, .. } => has_subquery(expr),
1477 Expr::Function { args, .. } => args.iter().any(has_subquery),
1478 _ => false,
1479 }
1480}
1481
1482fn stmt_has_subquery(stmt: &SelectStmt) -> bool {
1483 if let Some(ref w) = stmt.where_clause {
1484 if has_subquery(w) {
1485 return true;
1486 }
1487 }
1488 if let Some(ref h) = stmt.having {
1489 if has_subquery(h) {
1490 return true;
1491 }
1492 }
1493 for col in &stmt.columns {
1494 if let SelectColumn::Expr { expr, .. } = col {
1495 if has_subquery(expr) {
1496 return true;
1497 }
1498 }
1499 }
1500 for ob in &stmt.order_by {
1501 if has_subquery(&ob.expr) {
1502 return true;
1503 }
1504 }
1505 for join in &stmt.joins {
1506 if let Some(ref on_expr) = join.on_clause {
1507 if has_subquery(on_expr) {
1508 return true;
1509 }
1510 }
1511 }
1512 false
1513}
1514
1515fn materialize_expr(
1516 expr: &Expr,
1517 exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
1518) -> Result<Expr> {
1519 match expr {
1520 Expr::InSubquery {
1521 expr: e,
1522 subquery,
1523 negated,
1524 } => {
1525 let inner = materialize_expr(e, exec_sub)?;
1526 let qr = exec_sub(subquery)?;
1527 if !qr.columns.is_empty() && qr.columns.len() != 1 {
1528 return Err(SqlError::SubqueryMultipleColumns);
1529 }
1530 let mut values = std::collections::HashSet::new();
1531 let mut has_null = false;
1532 for row in &qr.rows {
1533 if row[0].is_null() {
1534 has_null = true;
1535 } else {
1536 values.insert(row[0].clone());
1537 }
1538 }
1539 Ok(Expr::InSet {
1540 expr: Box::new(inner),
1541 values,
1542 has_null,
1543 negated: *negated,
1544 })
1545 }
1546 Expr::ScalarSubquery(subquery) => {
1547 let qr = exec_sub(subquery)?;
1548 if qr.rows.len() > 1 {
1549 return Err(SqlError::SubqueryMultipleRows);
1550 }
1551 let val = if qr.rows.is_empty() {
1552 Value::Null
1553 } else {
1554 qr.rows[0][0].clone()
1555 };
1556 Ok(Expr::Literal(val))
1557 }
1558 Expr::Exists { subquery, negated } => {
1559 let qr = exec_sub(subquery)?;
1560 let exists = !qr.rows.is_empty();
1561 let result = if *negated { !exists } else { exists };
1562 Ok(Expr::Literal(Value::Boolean(result)))
1563 }
1564 Expr::InList {
1565 expr: e,
1566 list,
1567 negated,
1568 } => {
1569 let inner = materialize_expr(e, exec_sub)?;
1570 let items = list
1571 .iter()
1572 .map(|item| materialize_expr(item, exec_sub))
1573 .collect::<Result<Vec<_>>>()?;
1574 Ok(Expr::InList {
1575 expr: Box::new(inner),
1576 list: items,
1577 negated: *negated,
1578 })
1579 }
1580 Expr::BinaryOp { left, op, right } => Ok(Expr::BinaryOp {
1581 left: Box::new(materialize_expr(left, exec_sub)?),
1582 op: *op,
1583 right: Box::new(materialize_expr(right, exec_sub)?),
1584 }),
1585 Expr::UnaryOp { op, expr: e } => Ok(Expr::UnaryOp {
1586 op: *op,
1587 expr: Box::new(materialize_expr(e, exec_sub)?),
1588 }),
1589 Expr::IsNull(e) => Ok(Expr::IsNull(Box::new(materialize_expr(e, exec_sub)?))),
1590 Expr::IsNotNull(e) => Ok(Expr::IsNotNull(Box::new(materialize_expr(e, exec_sub)?))),
1591 Expr::InSet {
1592 expr: e,
1593 values,
1594 has_null,
1595 negated,
1596 } => Ok(Expr::InSet {
1597 expr: Box::new(materialize_expr(e, exec_sub)?),
1598 values: values.clone(),
1599 has_null: *has_null,
1600 negated: *negated,
1601 }),
1602 Expr::Between {
1603 expr: e,
1604 low,
1605 high,
1606 negated,
1607 } => Ok(Expr::Between {
1608 expr: Box::new(materialize_expr(e, exec_sub)?),
1609 low: Box::new(materialize_expr(low, exec_sub)?),
1610 high: Box::new(materialize_expr(high, exec_sub)?),
1611 negated: *negated,
1612 }),
1613 Expr::Like {
1614 expr: e,
1615 pattern,
1616 escape,
1617 negated,
1618 } => {
1619 let esc = escape
1620 .as_ref()
1621 .map(|es| materialize_expr(es, exec_sub).map(Box::new))
1622 .transpose()?;
1623 Ok(Expr::Like {
1624 expr: Box::new(materialize_expr(e, exec_sub)?),
1625 pattern: Box::new(materialize_expr(pattern, exec_sub)?),
1626 escape: esc,
1627 negated: *negated,
1628 })
1629 }
1630 Expr::Case {
1631 operand,
1632 conditions,
1633 else_result,
1634 } => {
1635 let op = operand
1636 .as_ref()
1637 .map(|e| materialize_expr(e, exec_sub).map(Box::new))
1638 .transpose()?;
1639 let conds = conditions
1640 .iter()
1641 .map(|(c, r)| {
1642 Ok((
1643 materialize_expr(c, exec_sub)?,
1644 materialize_expr(r, exec_sub)?,
1645 ))
1646 })
1647 .collect::<Result<Vec<_>>>()?;
1648 let else_r = else_result
1649 .as_ref()
1650 .map(|e| materialize_expr(e, exec_sub).map(Box::new))
1651 .transpose()?;
1652 Ok(Expr::Case {
1653 operand: op,
1654 conditions: conds,
1655 else_result: else_r,
1656 })
1657 }
1658 Expr::Coalesce(args) => {
1659 let materialized = args
1660 .iter()
1661 .map(|a| materialize_expr(a, exec_sub))
1662 .collect::<Result<Vec<_>>>()?;
1663 Ok(Expr::Coalesce(materialized))
1664 }
1665 Expr::Cast { expr: e, data_type } => Ok(Expr::Cast {
1666 expr: Box::new(materialize_expr(e, exec_sub)?),
1667 data_type: *data_type,
1668 }),
1669 Expr::Function { name, args } => {
1670 let materialized = args
1671 .iter()
1672 .map(|a| materialize_expr(a, exec_sub))
1673 .collect::<Result<Vec<_>>>()?;
1674 Ok(Expr::Function {
1675 name: name.clone(),
1676 args: materialized,
1677 })
1678 }
1679 other => Ok(other.clone()),
1680 }
1681}
1682
1683fn materialize_stmt(
1684 stmt: &SelectStmt,
1685 exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
1686) -> Result<SelectStmt> {
1687 let where_clause = stmt
1688 .where_clause
1689 .as_ref()
1690 .map(|e| materialize_expr(e, exec_sub))
1691 .transpose()?;
1692 let having = stmt
1693 .having
1694 .as_ref()
1695 .map(|e| materialize_expr(e, exec_sub))
1696 .transpose()?;
1697 let columns = stmt
1698 .columns
1699 .iter()
1700 .map(|c| match c {
1701 SelectColumn::AllColumns => Ok(SelectColumn::AllColumns),
1702 SelectColumn::Expr { expr, alias } => Ok(SelectColumn::Expr {
1703 expr: materialize_expr(expr, exec_sub)?,
1704 alias: alias.clone(),
1705 }),
1706 })
1707 .collect::<Result<Vec<_>>>()?;
1708 let order_by = stmt
1709 .order_by
1710 .iter()
1711 .map(|ob| {
1712 Ok(OrderByItem {
1713 expr: materialize_expr(&ob.expr, exec_sub)?,
1714 descending: ob.descending,
1715 nulls_first: ob.nulls_first,
1716 })
1717 })
1718 .collect::<Result<Vec<_>>>()?;
1719 let joins = stmt
1720 .joins
1721 .iter()
1722 .map(|j| {
1723 let on_clause = j
1724 .on_clause
1725 .as_ref()
1726 .map(|e| materialize_expr(e, exec_sub))
1727 .transpose()?;
1728 Ok(JoinClause {
1729 join_type: j.join_type,
1730 table: j.table.clone(),
1731 on_clause,
1732 })
1733 })
1734 .collect::<Result<Vec<_>>>()?;
1735 let group_by = stmt
1736 .group_by
1737 .iter()
1738 .map(|e| materialize_expr(e, exec_sub))
1739 .collect::<Result<Vec<_>>>()?;
1740 Ok(SelectStmt {
1741 columns,
1742 from: stmt.from.clone(),
1743 from_alias: stmt.from_alias.clone(),
1744 joins,
1745 distinct: stmt.distinct,
1746 where_clause,
1747 order_by,
1748 limit: stmt.limit.clone(),
1749 offset: stmt.offset.clone(),
1750 group_by,
1751 having,
1752 })
1753}
1754
1755fn exec_subquery_read(
1756 db: &Database,
1757 schema: &SchemaManager,
1758 stmt: &SelectStmt,
1759) -> Result<QueryResult> {
1760 match exec_select(db, schema, stmt)? {
1761 ExecutionResult::Query(qr) => Ok(qr),
1762 _ => Ok(QueryResult {
1763 columns: vec![],
1764 rows: vec![],
1765 }),
1766 }
1767}
1768
1769fn exec_subquery_write(
1770 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
1771 schema: &SchemaManager,
1772 stmt: &SelectStmt,
1773) -> Result<QueryResult> {
1774 match exec_select_in_txn(wtx, schema, stmt)? {
1775 ExecutionResult::Query(qr) => Ok(qr),
1776 _ => Ok(QueryResult {
1777 columns: vec![],
1778 rows: vec![],
1779 }),
1780 }
1781}
1782
1783fn update_has_subquery(stmt: &UpdateStmt) -> bool {
1784 stmt.where_clause.as_ref().is_some_and(has_subquery)
1785 || stmt.assignments.iter().any(|(_, e)| has_subquery(e))
1786}
1787
1788fn materialize_update(
1789 stmt: &UpdateStmt,
1790 exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
1791) -> Result<UpdateStmt> {
1792 let where_clause = stmt
1793 .where_clause
1794 .as_ref()
1795 .map(|e| materialize_expr(e, exec_sub))
1796 .transpose()?;
1797 let assignments = stmt
1798 .assignments
1799 .iter()
1800 .map(|(name, expr)| Ok((name.clone(), materialize_expr(expr, exec_sub)?)))
1801 .collect::<Result<Vec<_>>>()?;
1802 Ok(UpdateStmt {
1803 table: stmt.table.clone(),
1804 assignments,
1805 where_clause,
1806 })
1807}
1808
1809fn delete_has_subquery(stmt: &DeleteStmt) -> bool {
1810 stmt.where_clause.as_ref().is_some_and(has_subquery)
1811}
1812
1813fn materialize_delete(
1814 stmt: &DeleteStmt,
1815 exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
1816) -> Result<DeleteStmt> {
1817 let where_clause = stmt
1818 .where_clause
1819 .as_ref()
1820 .map(|e| materialize_expr(e, exec_sub))
1821 .transpose()?;
1822 Ok(DeleteStmt {
1823 table: stmt.table.clone(),
1824 where_clause,
1825 })
1826}
1827
1828fn insert_has_subquery(stmt: &InsertStmt) -> bool {
1829 stmt.values.iter().any(|row| row.iter().any(has_subquery))
1830}
1831
1832fn materialize_insert(
1833 stmt: &InsertStmt,
1834 exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
1835) -> Result<InsertStmt> {
1836 let values = stmt
1837 .values
1838 .iter()
1839 .map(|row| {
1840 row.iter()
1841 .map(|e| materialize_expr(e, exec_sub))
1842 .collect::<Result<Vec<_>>>()
1843 })
1844 .collect::<Result<Vec<_>>>()?;
1845 Ok(InsertStmt {
1846 table: stmt.table.clone(),
1847 columns: stmt.columns.clone(),
1848 values,
1849 })
1850}
1851
1852fn exec_select(
1853 db: &Database,
1854 schema: &SchemaManager,
1855 stmt: &SelectStmt,
1856) -> Result<ExecutionResult> {
1857 let materialized;
1858 let stmt = if stmt_has_subquery(stmt) {
1859 materialized = materialize_stmt(stmt, &mut |sub| exec_subquery_read(db, schema, sub))?;
1860 &materialized
1861 } else {
1862 stmt
1863 };
1864
1865 if stmt.from.is_empty() {
1866 return exec_select_no_from(stmt);
1867 }
1868
1869 let lower_name = stmt.from.to_ascii_lowercase();
1870 let table_schema = schema
1871 .get(&lower_name)
1872 .ok_or_else(|| SqlError::TableNotFound(stmt.from.clone()))?;
1873
1874 if !stmt.joins.is_empty() {
1875 return exec_select_join(db, schema, stmt);
1876 }
1877
1878 let rows = collect_rows_read(db, table_schema, &stmt.where_clause)?;
1879 process_select(&table_schema.columns, rows, stmt)
1880}
1881
1882fn exec_select_no_from(stmt: &SelectStmt) -> Result<ExecutionResult> {
1883 let empty_cols: Vec<ColumnDef> = vec![];
1884 let empty_row: Vec<Value> = vec![];
1885 let (col_names, projected) = project_rows(&empty_cols, &stmt.columns, &[empty_row])?;
1886 Ok(ExecutionResult::Query(QueryResult {
1887 columns: col_names,
1888 rows: projected,
1889 }))
1890}
1891
1892fn process_select(
1894 columns: &[ColumnDef],
1895 mut rows: Vec<Vec<Value>>,
1896 stmt: &SelectStmt,
1897) -> Result<ExecutionResult> {
1898 if let Some(ref where_expr) = stmt.where_clause {
1899 rows.retain(|row| match eval_expr(where_expr, columns, row) {
1900 Ok(val) => is_truthy(&val),
1901 Err(_) => false,
1902 });
1903 }
1904
1905 let has_aggregates = stmt.columns.iter().any(|c| match c {
1906 SelectColumn::Expr { expr, .. } => is_aggregate_expr(expr),
1907 _ => false,
1908 });
1909
1910 if has_aggregates || !stmt.group_by.is_empty() {
1911 return exec_aggregate(columns, &rows, stmt);
1912 }
1913
1914 if stmt.distinct {
1915 let (col_names, mut projected) = project_rows(columns, &stmt.columns, &rows)?;
1916
1917 let mut seen = std::collections::HashSet::new();
1918 projected.retain(|row| seen.insert(row.clone()));
1919
1920 if !stmt.order_by.is_empty() {
1921 let output_cols = build_output_columns(&stmt.columns, columns);
1922 sort_rows(&mut projected, &stmt.order_by, &output_cols)?;
1923 }
1924
1925 if let Some(ref offset_expr) = stmt.offset {
1926 let offset = eval_const_int(offset_expr)? as usize;
1927 if offset < projected.len() {
1928 projected = projected.split_off(offset);
1929 } else {
1930 projected.clear();
1931 }
1932 }
1933
1934 if let Some(ref limit_expr) = stmt.limit {
1935 let limit = eval_const_int(limit_expr)? as usize;
1936 projected.truncate(limit);
1937 }
1938
1939 return Ok(ExecutionResult::Query(QueryResult {
1940 columns: col_names,
1941 rows: projected,
1942 }));
1943 }
1944
1945 if !stmt.order_by.is_empty() {
1946 sort_rows(&mut rows, &stmt.order_by, columns)?;
1947 }
1948
1949 if let Some(ref offset_expr) = stmt.offset {
1950 let offset = eval_const_int(offset_expr)? as usize;
1951 if offset < rows.len() {
1952 rows = rows.split_off(offset);
1953 } else {
1954 rows.clear();
1955 }
1956 }
1957
1958 if let Some(ref limit_expr) = stmt.limit {
1959 let limit = eval_const_int(limit_expr)? as usize;
1960 rows.truncate(limit);
1961 }
1962
1963 let (col_names, projected) = project_rows(columns, &stmt.columns, &rows)?;
1964
1965 Ok(ExecutionResult::Query(QueryResult {
1966 columns: col_names,
1967 rows: projected,
1968 }))
1969}
1970
1971fn resolve_table_name<'a>(schema: &'a SchemaManager, name: &str) -> Result<&'a TableSchema> {
1972 let lower = name.to_ascii_lowercase();
1973 schema
1974 .get(&lower)
1975 .ok_or_else(|| SqlError::TableNotFound(name.to_string()))
1976}
1977
1978fn build_joined_columns(tables: &[(String, &TableSchema)]) -> Vec<ColumnDef> {
1979 let mut result = Vec::new();
1980 let mut pos: u16 = 0;
1981
1982 for (alias, schema) in tables {
1983 for col in &schema.columns {
1984 result.push(ColumnDef {
1985 name: format!("{}.{}", alias.to_ascii_lowercase(), col.name),
1986 data_type: col.data_type,
1987 nullable: col.nullable,
1988 position: pos,
1989 });
1990 pos += 1;
1991 }
1992 }
1993
1994 result
1995}
1996
/// Returns the lowercased name a table is referred to by in a query: its alias
/// when one was given, otherwise the table name itself.
fn table_alias_or_name(name: &str, alias: &Option<String>) -> String {
    // `as_deref` borrows the alias as `&str`, so no temporary `String` is built
    // for `name` on every call.
    alias.as_deref().unwrap_or(name).to_ascii_lowercase()
}
2003
2004fn collect_all_rows_read(db: &Database, table_schema: &TableSchema) -> Result<Vec<Vec<Value>>> {
2005 collect_rows_read(db, table_schema, &None)
2006}
2007
2008fn collect_all_rows_write(
2009 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
2010 table_schema: &TableSchema,
2011) -> Result<Vec<Vec<Value>>> {
2012 collect_rows_write(wtx, table_schema, &None)
2013}
2014
2015fn exec_select_join(
2016 db: &Database,
2017 schema: &SchemaManager,
2018 stmt: &SelectStmt,
2019) -> Result<ExecutionResult> {
2020 let from_schema = resolve_table_name(schema, &stmt.from)?;
2021 let from_alias = table_alias_or_name(&stmt.from, &stmt.from_alias);
2022 let mut outer_rows = collect_all_rows_read(db, from_schema)?;
2023
2024 let mut tables: Vec<(String, &TableSchema)> = vec![(from_alias.clone(), from_schema)];
2025
2026 for join in &stmt.joins {
2027 let inner_schema = resolve_table_name(schema, &join.table.name)?;
2028 let inner_alias = table_alias_or_name(&join.table.name, &join.table.alias);
2029 let inner_rows = collect_all_rows_read(db, inner_schema)?;
2030
2031 let mut preview_tables = tables.clone();
2032 preview_tables.push((inner_alias.clone(), inner_schema));
2033 let combined_cols = build_joined_columns(&preview_tables);
2034
2035 let mut new_rows = Vec::new();
2036
2037 match join.join_type {
2038 JoinType::Inner | JoinType::Cross => {
2039 for outer in &outer_rows {
2040 for inner in &inner_rows {
2041 let combined: Vec<Value> =
2042 outer.iter().chain(inner.iter()).cloned().collect();
2043 if let Some(ref on_expr) = join.on_clause {
2044 match eval_expr(on_expr, &combined_cols, &combined) {
2045 Ok(val) if is_truthy(&val) => new_rows.push(combined),
2046 _ => {}
2047 }
2048 } else {
2049 new_rows.push(combined);
2050 }
2051 }
2052 }
2053 }
2054 JoinType::Left => {
2055 let inner_col_count = inner_schema.columns.len();
2056 for outer in &outer_rows {
2057 let mut matched = false;
2058 for inner in &inner_rows {
2059 let combined: Vec<Value> =
2060 outer.iter().chain(inner.iter()).cloned().collect();
2061 if let Some(ref on_expr) = join.on_clause {
2062 match eval_expr(on_expr, &combined_cols, &combined) {
2063 Ok(val) if is_truthy(&val) => {
2064 new_rows.push(combined);
2065 matched = true;
2066 }
2067 _ => {}
2068 }
2069 } else {
2070 new_rows.push(combined);
2071 matched = true;
2072 }
2073 }
2074 if !matched {
2075 let mut padded = outer.clone();
2076 padded.extend(std::iter::repeat(Value::Null).take(inner_col_count));
2077 new_rows.push(padded);
2078 }
2079 }
2080 }
2081 JoinType::Right => {
2082 let outer_col_count = if outer_rows.is_empty() {
2083 tables.iter().map(|(_, s)| s.columns.len()).sum()
2084 } else {
2085 outer_rows[0].len()
2086 };
2087 let mut inner_matched = vec![false; inner_rows.len()];
2088 for outer in &outer_rows {
2089 for (j, inner) in inner_rows.iter().enumerate() {
2090 let combined: Vec<Value> =
2091 outer.iter().chain(inner.iter()).cloned().collect();
2092 if let Some(ref on_expr) = join.on_clause {
2093 match eval_expr(on_expr, &combined_cols, &combined) {
2094 Ok(val) if is_truthy(&val) => {
2095 new_rows.push(combined);
2096 inner_matched[j] = true;
2097 }
2098 _ => {}
2099 }
2100 } else {
2101 new_rows.push(combined);
2102 inner_matched[j] = true;
2103 }
2104 }
2105 }
2106 for (j, inner) in inner_rows.iter().enumerate() {
2107 if !inner_matched[j] {
2108 let mut padded: Vec<Value> = std::iter::repeat(Value::Null)
2109 .take(outer_col_count)
2110 .collect();
2111 padded.extend(inner.iter().cloned());
2112 new_rows.push(padded);
2113 }
2114 }
2115 }
2116 }
2117
2118 tables.push((inner_alias, inner_schema));
2119 outer_rows = new_rows;
2120 }
2121
2122 let joined_cols = build_joined_columns(&tables);
2123 process_select(&joined_cols, outer_rows, stmt)
2124}
2125
2126fn exec_select_join_in_txn(
2127 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
2128 schema: &SchemaManager,
2129 stmt: &SelectStmt,
2130) -> Result<ExecutionResult> {
2131 let from_schema = resolve_table_name(schema, &stmt.from)?;
2132 let from_alias = table_alias_or_name(&stmt.from, &stmt.from_alias);
2133 let mut outer_rows = collect_all_rows_write(wtx, from_schema)?;
2134
2135 let mut tables: Vec<(String, &TableSchema)> = vec![(from_alias.clone(), from_schema)];
2136
2137 for join in &stmt.joins {
2138 let inner_schema = resolve_table_name(schema, &join.table.name)?;
2139 let inner_alias = table_alias_or_name(&join.table.name, &join.table.alias);
2140 let inner_rows = collect_all_rows_write(wtx, inner_schema)?;
2141
2142 let mut preview_tables = tables.clone();
2143 preview_tables.push((inner_alias.clone(), inner_schema));
2144 let combined_cols = build_joined_columns(&preview_tables);
2145
2146 let mut new_rows = Vec::new();
2147
2148 match join.join_type {
2149 JoinType::Inner | JoinType::Cross => {
2150 for outer in &outer_rows {
2151 for inner in &inner_rows {
2152 let combined: Vec<Value> =
2153 outer.iter().chain(inner.iter()).cloned().collect();
2154 if let Some(ref on_expr) = join.on_clause {
2155 match eval_expr(on_expr, &combined_cols, &combined) {
2156 Ok(val) if is_truthy(&val) => new_rows.push(combined),
2157 _ => {}
2158 }
2159 } else {
2160 new_rows.push(combined);
2161 }
2162 }
2163 }
2164 }
2165 JoinType::Left => {
2166 let inner_col_count = inner_schema.columns.len();
2167 for outer in &outer_rows {
2168 let mut matched = false;
2169 for inner in &inner_rows {
2170 let combined: Vec<Value> =
2171 outer.iter().chain(inner.iter()).cloned().collect();
2172 if let Some(ref on_expr) = join.on_clause {
2173 match eval_expr(on_expr, &combined_cols, &combined) {
2174 Ok(val) if is_truthy(&val) => {
2175 new_rows.push(combined);
2176 matched = true;
2177 }
2178 _ => {}
2179 }
2180 } else {
2181 new_rows.push(combined);
2182 matched = true;
2183 }
2184 }
2185 if !matched {
2186 let mut padded = outer.clone();
2187 padded.extend(std::iter::repeat(Value::Null).take(inner_col_count));
2188 new_rows.push(padded);
2189 }
2190 }
2191 }
2192 JoinType::Right => {
2193 let outer_col_count = if outer_rows.is_empty() {
2194 tables.iter().map(|(_, s)| s.columns.len()).sum()
2195 } else {
2196 outer_rows[0].len()
2197 };
2198 let mut inner_matched = vec![false; inner_rows.len()];
2199 for outer in &outer_rows {
2200 for (j, inner) in inner_rows.iter().enumerate() {
2201 let combined: Vec<Value> =
2202 outer.iter().chain(inner.iter()).cloned().collect();
2203 if let Some(ref on_expr) = join.on_clause {
2204 match eval_expr(on_expr, &combined_cols, &combined) {
2205 Ok(val) if is_truthy(&val) => {
2206 new_rows.push(combined);
2207 inner_matched[j] = true;
2208 }
2209 _ => {}
2210 }
2211 } else {
2212 new_rows.push(combined);
2213 inner_matched[j] = true;
2214 }
2215 }
2216 }
2217 for (j, inner) in inner_rows.iter().enumerate() {
2218 if !inner_matched[j] {
2219 let mut padded: Vec<Value> = std::iter::repeat(Value::Null)
2220 .take(outer_col_count)
2221 .collect();
2222 padded.extend(inner.iter().cloned());
2223 new_rows.push(padded);
2224 }
2225 }
2226 }
2227 }
2228
2229 tables.push((inner_alias, inner_schema));
2230 outer_rows = new_rows;
2231 }
2232
2233 let joined_cols = build_joined_columns(&tables);
2234 process_select(&joined_cols, outer_rows, stmt)
2235}
2236
2237fn exec_update(
2238 db: &Database,
2239 schema: &SchemaManager,
2240 stmt: &UpdateStmt,
2241) -> Result<ExecutionResult> {
2242 let materialized;
2243 let stmt = if update_has_subquery(stmt) {
2244 materialized = materialize_update(stmt, &mut |sub| exec_subquery_read(db, schema, sub))?;
2245 &materialized
2246 } else {
2247 stmt
2248 };
2249
2250 let lower_name = stmt.table.to_ascii_lowercase();
2251 let table_schema = schema
2252 .get(&lower_name)
2253 .ok_or_else(|| SqlError::TableNotFound(stmt.table.clone()))?;
2254
2255 let all_candidates = collect_keyed_rows_read(db, table_schema, &stmt.where_clause)?;
2256 let matching_rows: Vec<(Vec<u8>, Vec<Value>)> = all_candidates
2257 .into_iter()
2258 .filter(|(_, row)| match &stmt.where_clause {
2259 Some(where_expr) => match eval_expr(where_expr, &table_schema.columns, row) {
2260 Ok(val) => is_truthy(&val),
2261 Err(_) => false,
2262 },
2263 None => true,
2264 })
2265 .collect();
2266
2267 if matching_rows.is_empty() {
2268 return Ok(ExecutionResult::RowsAffected(0));
2269 }
2270
2271 struct UpdateChange {
2272 old_key: Vec<u8>,
2273 new_key: Vec<u8>,
2274 new_value: Vec<u8>,
2275 pk_changed: bool,
2276 old_row: Vec<Value>,
2277 new_row: Vec<Value>,
2278 }
2279
2280 let pk_indices = table_schema.pk_indices();
2281 let mut changes: Vec<UpdateChange> = Vec::new();
2282
2283 for (old_key, row) in &matching_rows {
2284 let mut new_row = row.clone();
2285 let mut pk_changed = false;
2286 for (col_name, expr) in &stmt.assignments {
2287 let col_idx = table_schema
2288 .column_index(col_name)
2289 .ok_or_else(|| SqlError::ColumnNotFound(col_name.clone()))?;
2290 let new_val = eval_expr(expr, &table_schema.columns, &new_row)?;
2291 let col = &table_schema.columns[col_idx];
2292
2293 let coerced = if new_val.is_null() {
2294 if !col.nullable {
2295 return Err(SqlError::NotNullViolation(col.name.clone()));
2296 }
2297 Value::Null
2298 } else {
2299 new_val
2300 .coerce_to(col.data_type)
2301 .ok_or_else(|| SqlError::TypeMismatch {
2302 expected: col.data_type.to_string(),
2303 got: new_val.data_type().to_string(),
2304 })?
2305 };
2306
2307 if table_schema.primary_key_columns.contains(&(col_idx as u16)) {
2308 pk_changed = true;
2309 }
2310 new_row[col_idx] = coerced;
2311 }
2312
2313 let pk_values: Vec<Value> = pk_indices.iter().map(|&i| new_row[i].clone()).collect();
2314 let new_key = encode_composite_key(&pk_values);
2315
2316 let non_pk = table_schema.non_pk_indices();
2317 let value_values: Vec<Value> = non_pk.iter().map(|&i| new_row[i].clone()).collect();
2318 let new_value = encode_row(&value_values);
2319
2320 changes.push(UpdateChange {
2321 old_key: old_key.clone(),
2322 new_key,
2323 new_value,
2324 pk_changed,
2325 old_row: row.clone(),
2326 new_row,
2327 });
2328 }
2329
2330 {
2331 use std::collections::HashSet;
2332 let mut new_keys: HashSet<Vec<u8>> = HashSet::new();
2333 for c in &changes {
2334 if c.pk_changed && c.new_key != c.old_key && !new_keys.insert(c.new_key.clone()) {
2335 return Err(SqlError::DuplicateKey);
2336 }
2337 }
2338 }
2339
2340 let mut wtx = db.begin_write().map_err(SqlError::Storage)?;
2341
2342 for c in &changes {
2343 let old_pk: Vec<Value> = pk_indices.iter().map(|&i| c.old_row[i].clone()).collect();
2344
2345 for idx in &table_schema.indices {
2346 if index_columns_changed(idx, &c.old_row, &c.new_row) || c.pk_changed {
2347 let idx_table = TableSchema::index_table_name(&lower_name, &idx.name);
2348 let old_idx_key = encode_index_key(idx, &c.old_row, &old_pk);
2349 wtx.table_delete(&idx_table, &old_idx_key)
2350 .map_err(SqlError::Storage)?;
2351 }
2352 }
2353
2354 if c.pk_changed {
2355 wtx.table_delete(lower_name.as_bytes(), &c.old_key)
2356 .map_err(SqlError::Storage)?;
2357 }
2358 }
2359
2360 for c in &changes {
2361 let new_pk: Vec<Value> = pk_indices.iter().map(|&i| c.new_row[i].clone()).collect();
2362
2363 if c.pk_changed {
2364 let is_new = wtx
2365 .table_insert(lower_name.as_bytes(), &c.new_key, &c.new_value)
2366 .map_err(SqlError::Storage)?;
2367 if !is_new {
2368 return Err(SqlError::DuplicateKey);
2369 }
2370 } else {
2371 wtx.table_insert(lower_name.as_bytes(), &c.new_key, &c.new_value)
2372 .map_err(SqlError::Storage)?;
2373 }
2374
2375 for idx in &table_schema.indices {
2376 if index_columns_changed(idx, &c.old_row, &c.new_row) || c.pk_changed {
2377 let idx_table = TableSchema::index_table_name(&lower_name, &idx.name);
2378 let new_idx_key = encode_index_key(idx, &c.new_row, &new_pk);
2379 let new_idx_val = encode_index_value(idx, &c.new_row, &new_pk);
2380 let is_new = wtx
2381 .table_insert(&idx_table, &new_idx_key, &new_idx_val)
2382 .map_err(SqlError::Storage)?;
2383 if idx.unique && !is_new {
2384 let indexed_values: Vec<Value> = idx
2385 .columns
2386 .iter()
2387 .map(|&col_idx| c.new_row[col_idx as usize].clone())
2388 .collect();
2389 let any_null = indexed_values.iter().any(|v| v.is_null());
2390 if !any_null {
2391 return Err(SqlError::UniqueViolation(idx.name.clone()));
2392 }
2393 }
2394 }
2395 }
2396 }
2397
2398 let count = changes.len() as u64;
2399 wtx.commit().map_err(SqlError::Storage)?;
2400 Ok(ExecutionResult::RowsAffected(count))
2401}
2402
2403fn exec_delete(
2404 db: &Database,
2405 schema: &SchemaManager,
2406 stmt: &DeleteStmt,
2407) -> Result<ExecutionResult> {
2408 let materialized;
2409 let stmt = if delete_has_subquery(stmt) {
2410 materialized = materialize_delete(stmt, &mut |sub| exec_subquery_read(db, schema, sub))?;
2411 &materialized
2412 } else {
2413 stmt
2414 };
2415
2416 let lower_name = stmt.table.to_ascii_lowercase();
2417 let table_schema = schema
2418 .get(&lower_name)
2419 .ok_or_else(|| SqlError::TableNotFound(stmt.table.clone()))?;
2420
2421 let all_candidates = collect_keyed_rows_read(db, table_schema, &stmt.where_clause)?;
2422 let rows_to_delete: Vec<(Vec<u8>, Vec<Value>)> = all_candidates
2423 .into_iter()
2424 .filter(|(_, row)| match &stmt.where_clause {
2425 Some(where_expr) => match eval_expr(where_expr, &table_schema.columns, row) {
2426 Ok(val) => is_truthy(&val),
2427 Err(_) => false,
2428 },
2429 None => true,
2430 })
2431 .collect();
2432
2433 if rows_to_delete.is_empty() {
2434 return Ok(ExecutionResult::RowsAffected(0));
2435 }
2436
2437 let pk_indices = table_schema.pk_indices();
2438 let mut wtx = db.begin_write().map_err(SqlError::Storage)?;
2439 for (key, row) in &rows_to_delete {
2440 let pk_values: Vec<Value> = pk_indices.iter().map(|&i| row[i].clone()).collect();
2441 delete_index_entries(&mut wtx, table_schema, row, &pk_values)?;
2442 wtx.table_delete(lower_name.as_bytes(), key)
2443 .map_err(SqlError::Storage)?;
2444 }
2445 let count = rows_to_delete.len() as u64;
2446 wtx.commit().map_err(SqlError::Storage)?;
2447 Ok(ExecutionResult::RowsAffected(count))
2448}
2449
2450fn exec_insert_in_txn(
2453 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
2454 schema: &SchemaManager,
2455 stmt: &InsertStmt,
2456) -> Result<ExecutionResult> {
2457 let materialized;
2458 let stmt = if insert_has_subquery(stmt) {
2459 materialized = materialize_insert(stmt, &mut |sub| exec_subquery_write(wtx, schema, sub))?;
2460 &materialized
2461 } else {
2462 stmt
2463 };
2464
2465 let lower_name = stmt.table.to_ascii_lowercase();
2466 let table_schema = schema
2467 .get(&lower_name)
2468 .ok_or_else(|| SqlError::TableNotFound(stmt.table.clone()))?;
2469
2470 let insert_columns = if stmt.columns.is_empty() {
2471 table_schema
2472 .columns
2473 .iter()
2474 .map(|c| c.name.clone())
2475 .collect::<Vec<_>>()
2476 } else {
2477 stmt.columns
2478 .iter()
2479 .map(|c| c.to_ascii_lowercase())
2480 .collect()
2481 };
2482
2483 let col_indices: Vec<usize> = insert_columns
2484 .iter()
2485 .map(|name| {
2486 table_schema
2487 .column_index(name)
2488 .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))
2489 })
2490 .collect::<Result<_>>()?;
2491
2492 let mut count: u64 = 0;
2493
2494 for value_row in &stmt.values {
2495 if value_row.len() != insert_columns.len() {
2496 return Err(SqlError::InvalidValue(format!(
2497 "expected {} values, got {}",
2498 insert_columns.len(),
2499 value_row.len()
2500 )));
2501 }
2502
2503 let mut row = vec![Value::Null; table_schema.columns.len()];
2504 for (i, expr) in value_row.iter().enumerate() {
2505 let val = eval_const_expr(expr)?;
2506 let col_idx = col_indices[i];
2507 let col = &table_schema.columns[col_idx];
2508
2509 let coerced = if val.is_null() {
2510 Value::Null
2511 } else {
2512 val.coerce_to(col.data_type)
2513 .ok_or_else(|| SqlError::TypeMismatch {
2514 expected: col.data_type.to_string(),
2515 got: val.data_type().to_string(),
2516 })?
2517 };
2518
2519 row[col_idx] = coerced;
2520 }
2521
2522 for col in &table_schema.columns {
2523 if !col.nullable && row[col.position as usize].is_null() {
2524 return Err(SqlError::NotNullViolation(col.name.clone()));
2525 }
2526 }
2527
2528 let pk_values: Vec<Value> = table_schema
2529 .pk_indices()
2530 .iter()
2531 .map(|&i| row[i].clone())
2532 .collect();
2533 let key = encode_composite_key(&pk_values);
2534
2535 let non_pk = table_schema.non_pk_indices();
2536 let value_values: Vec<Value> = non_pk.iter().map(|&i| row[i].clone()).collect();
2537 let value = encode_row(&value_values);
2538
2539 if key.len() > citadel_core::MAX_KEY_SIZE {
2540 return Err(SqlError::KeyTooLarge {
2541 size: key.len(),
2542 max: citadel_core::MAX_KEY_SIZE,
2543 });
2544 }
2545 if value.len() > citadel_core::MAX_INLINE_VALUE_SIZE {
2546 return Err(SqlError::RowTooLarge {
2547 size: value.len(),
2548 max: citadel_core::MAX_INLINE_VALUE_SIZE,
2549 });
2550 }
2551
2552 let is_new = wtx
2553 .table_insert(lower_name.as_bytes(), &key, &value)
2554 .map_err(SqlError::Storage)?;
2555 if !is_new {
2556 return Err(SqlError::DuplicateKey);
2557 }
2558
2559 insert_index_entries(wtx, table_schema, &row, &pk_values)?;
2560 count += 1;
2561 }
2562
2563 Ok(ExecutionResult::RowsAffected(count))
2564}
2565
2566fn exec_select_in_txn(
2567 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
2568 schema: &SchemaManager,
2569 stmt: &SelectStmt,
2570) -> Result<ExecutionResult> {
2571 let materialized;
2572 let stmt = if stmt_has_subquery(stmt) {
2573 materialized = materialize_stmt(stmt, &mut |sub| exec_subquery_write(wtx, schema, sub))?;
2574 &materialized
2575 } else {
2576 stmt
2577 };
2578
2579 if stmt.from.is_empty() {
2580 return exec_select_no_from(stmt);
2581 }
2582
2583 if !stmt.joins.is_empty() {
2584 return exec_select_join_in_txn(wtx, schema, stmt);
2585 }
2586
2587 let lower_name = stmt.from.to_ascii_lowercase();
2588 let table_schema = schema
2589 .get(&lower_name)
2590 .ok_or_else(|| SqlError::TableNotFound(stmt.from.clone()))?;
2591
2592 let rows = collect_rows_write(wtx, table_schema, &stmt.where_clause)?;
2593 process_select(&table_schema.columns, rows, stmt)
2594}
2595
2596fn exec_update_in_txn(
2597 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
2598 schema: &SchemaManager,
2599 stmt: &UpdateStmt,
2600) -> Result<ExecutionResult> {
2601 let materialized;
2602 let stmt = if update_has_subquery(stmt) {
2603 materialized = materialize_update(stmt, &mut |sub| exec_subquery_write(wtx, schema, sub))?;
2604 &materialized
2605 } else {
2606 stmt
2607 };
2608
2609 let lower_name = stmt.table.to_ascii_lowercase();
2610 let table_schema = schema
2611 .get(&lower_name)
2612 .ok_or_else(|| SqlError::TableNotFound(stmt.table.clone()))?;
2613
2614 let all_candidates = collect_keyed_rows_write(wtx, table_schema, &stmt.where_clause)?;
2615 let matching_rows: Vec<(Vec<u8>, Vec<Value>)> = all_candidates
2616 .into_iter()
2617 .filter(|(_, row)| match &stmt.where_clause {
2618 Some(where_expr) => match eval_expr(where_expr, &table_schema.columns, row) {
2619 Ok(val) => is_truthy(&val),
2620 Err(_) => false,
2621 },
2622 None => true,
2623 })
2624 .collect();
2625
2626 if matching_rows.is_empty() {
2627 return Ok(ExecutionResult::RowsAffected(0));
2628 }
2629
2630 struct UpdateChange {
2631 old_key: Vec<u8>,
2632 new_key: Vec<u8>,
2633 new_value: Vec<u8>,
2634 pk_changed: bool,
2635 old_row: Vec<Value>,
2636 new_row: Vec<Value>,
2637 }
2638
2639 let pk_indices = table_schema.pk_indices();
2640 let mut changes: Vec<UpdateChange> = Vec::new();
2641
2642 for (old_key, row) in &matching_rows {
2643 let mut new_row = row.clone();
2644 let mut pk_changed = false;
2645 for (col_name, expr) in &stmt.assignments {
2646 let col_idx = table_schema
2647 .column_index(col_name)
2648 .ok_or_else(|| SqlError::ColumnNotFound(col_name.clone()))?;
2649 let new_val = eval_expr(expr, &table_schema.columns, &new_row)?;
2650 let col = &table_schema.columns[col_idx];
2651
2652 let coerced = if new_val.is_null() {
2653 if !col.nullable {
2654 return Err(SqlError::NotNullViolation(col.name.clone()));
2655 }
2656 Value::Null
2657 } else {
2658 new_val
2659 .coerce_to(col.data_type)
2660 .ok_or_else(|| SqlError::TypeMismatch {
2661 expected: col.data_type.to_string(),
2662 got: new_val.data_type().to_string(),
2663 })?
2664 };
2665
2666 if table_schema.primary_key_columns.contains(&(col_idx as u16)) {
2667 pk_changed = true;
2668 }
2669 new_row[col_idx] = coerced;
2670 }
2671
2672 let pk_values: Vec<Value> = pk_indices.iter().map(|&i| new_row[i].clone()).collect();
2673 let new_key = encode_composite_key(&pk_values);
2674
2675 let non_pk = table_schema.non_pk_indices();
2676 let value_values: Vec<Value> = non_pk.iter().map(|&i| new_row[i].clone()).collect();
2677 let new_value = encode_row(&value_values);
2678
2679 changes.push(UpdateChange {
2680 old_key: old_key.clone(),
2681 new_key,
2682 new_value,
2683 pk_changed,
2684 old_row: row.clone(),
2685 new_row,
2686 });
2687 }
2688
2689 {
2690 use std::collections::HashSet;
2691 let mut new_keys: HashSet<Vec<u8>> = HashSet::new();
2692 for c in &changes {
2693 if c.pk_changed && c.new_key != c.old_key && !new_keys.insert(c.new_key.clone()) {
2694 return Err(SqlError::DuplicateKey);
2695 }
2696 }
2697 }
2698
2699 for c in &changes {
2700 let old_pk: Vec<Value> = pk_indices.iter().map(|&i| c.old_row[i].clone()).collect();
2701
2702 for idx in &table_schema.indices {
2703 if index_columns_changed(idx, &c.old_row, &c.new_row) || c.pk_changed {
2704 let idx_table = TableSchema::index_table_name(&lower_name, &idx.name);
2705 let old_idx_key = encode_index_key(idx, &c.old_row, &old_pk);
2706 wtx.table_delete(&idx_table, &old_idx_key)
2707 .map_err(SqlError::Storage)?;
2708 }
2709 }
2710
2711 if c.pk_changed {
2712 wtx.table_delete(lower_name.as_bytes(), &c.old_key)
2713 .map_err(SqlError::Storage)?;
2714 }
2715 }
2716
2717 for c in &changes {
2718 let new_pk: Vec<Value> = pk_indices.iter().map(|&i| c.new_row[i].clone()).collect();
2719
2720 if c.pk_changed {
2721 let is_new = wtx
2722 .table_insert(lower_name.as_bytes(), &c.new_key, &c.new_value)
2723 .map_err(SqlError::Storage)?;
2724 if !is_new {
2725 return Err(SqlError::DuplicateKey);
2726 }
2727 } else {
2728 wtx.table_insert(lower_name.as_bytes(), &c.new_key, &c.new_value)
2729 .map_err(SqlError::Storage)?;
2730 }
2731
2732 for idx in &table_schema.indices {
2733 if index_columns_changed(idx, &c.old_row, &c.new_row) || c.pk_changed {
2734 let idx_table = TableSchema::index_table_name(&lower_name, &idx.name);
2735 let new_idx_key = encode_index_key(idx, &c.new_row, &new_pk);
2736 let new_idx_val = encode_index_value(idx, &c.new_row, &new_pk);
2737 let is_new = wtx
2738 .table_insert(&idx_table, &new_idx_key, &new_idx_val)
2739 .map_err(SqlError::Storage)?;
2740 if idx.unique && !is_new {
2741 let indexed_values: Vec<Value> = idx
2742 .columns
2743 .iter()
2744 .map(|&col_idx| c.new_row[col_idx as usize].clone())
2745 .collect();
2746 let any_null = indexed_values.iter().any(|v| v.is_null());
2747 if !any_null {
2748 return Err(SqlError::UniqueViolation(idx.name.clone()));
2749 }
2750 }
2751 }
2752 }
2753 }
2754
2755 let count = changes.len() as u64;
2756 Ok(ExecutionResult::RowsAffected(count))
2757}
2758
2759fn exec_delete_in_txn(
2760 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
2761 schema: &SchemaManager,
2762 stmt: &DeleteStmt,
2763) -> Result<ExecutionResult> {
2764 let materialized;
2765 let stmt = if delete_has_subquery(stmt) {
2766 materialized = materialize_delete(stmt, &mut |sub| exec_subquery_write(wtx, schema, sub))?;
2767 &materialized
2768 } else {
2769 stmt
2770 };
2771
2772 let lower_name = stmt.table.to_ascii_lowercase();
2773 let table_schema = schema
2774 .get(&lower_name)
2775 .ok_or_else(|| SqlError::TableNotFound(stmt.table.clone()))?;
2776
2777 let all_candidates = collect_keyed_rows_write(wtx, table_schema, &stmt.where_clause)?;
2778 let rows_to_delete: Vec<(Vec<u8>, Vec<Value>)> = all_candidates
2779 .into_iter()
2780 .filter(|(_, row)| match &stmt.where_clause {
2781 Some(where_expr) => match eval_expr(where_expr, &table_schema.columns, row) {
2782 Ok(val) => is_truthy(&val),
2783 Err(_) => false,
2784 },
2785 None => true,
2786 })
2787 .collect();
2788
2789 if rows_to_delete.is_empty() {
2790 return Ok(ExecutionResult::RowsAffected(0));
2791 }
2792
2793 let pk_indices = table_schema.pk_indices();
2794 for (key, row) in &rows_to_delete {
2795 let pk_values: Vec<Value> = pk_indices.iter().map(|&i| row[i].clone()).collect();
2796 delete_index_entries(wtx, table_schema, row, &pk_values)?;
2797 wtx.table_delete(lower_name.as_bytes(), key)
2798 .map_err(SqlError::Storage)?;
2799 }
2800 let count = rows_to_delete.len() as u64;
2801 Ok(ExecutionResult::RowsAffected(count))
2802}
2803
2804fn exec_aggregate(
2807 columns: &[ColumnDef],
2808 rows: &[Vec<Value>],
2809 stmt: &SelectStmt,
2810) -> Result<ExecutionResult> {
2811 let groups: BTreeMap<Vec<Value>, Vec<&Vec<Value>>> = if stmt.group_by.is_empty() {
2812 let mut m = BTreeMap::new();
2813 m.insert(vec![], rows.iter().collect());
2814 m
2815 } else {
2816 let mut m: BTreeMap<Vec<Value>, Vec<&Vec<Value>>> = BTreeMap::new();
2817 for row in rows {
2818 let group_key: Vec<Value> = stmt
2819 .group_by
2820 .iter()
2821 .map(|expr| eval_expr(expr, columns, row))
2822 .collect::<Result<_>>()?;
2823 m.entry(group_key).or_default().push(row);
2824 }
2825 m
2826 };
2827
2828 let mut result_rows = Vec::new();
2829 let output_cols = build_output_columns(&stmt.columns, columns);
2830
2831 for group_rows in groups.values() {
2832 let mut result_row = Vec::new();
2833
2834 for sel_col in &stmt.columns {
2835 match sel_col {
2836 SelectColumn::AllColumns => {
2837 return Err(SqlError::Unsupported("SELECT * with GROUP BY".into()));
2838 }
2839 SelectColumn::Expr { expr, .. } => {
2840 let val = eval_aggregate_expr(expr, columns, group_rows)?;
2841 result_row.push(val);
2842 }
2843 }
2844 }
2845
2846 if let Some(ref having) = stmt.having {
2847 let passes = match eval_aggregate_expr(having, columns, group_rows) {
2848 Ok(val) => is_truthy(&val),
2849 Err(SqlError::ColumnNotFound(_)) => {
2850 match eval_expr(having, &output_cols, &result_row) {
2851 Ok(val) => is_truthy(&val),
2852 Err(_) => false,
2853 }
2854 }
2855 Err(e) => return Err(e),
2856 };
2857 if !passes {
2858 continue;
2859 }
2860 }
2861
2862 result_rows.push(result_row);
2863 }
2864
2865 if stmt.distinct {
2866 let mut seen = std::collections::HashSet::new();
2867 result_rows.retain(|row| seen.insert(row.clone()));
2868 }
2869
2870 if !stmt.order_by.is_empty() {
2871 let output_cols = build_output_columns(&stmt.columns, columns);
2872 sort_rows(&mut result_rows, &stmt.order_by, &output_cols)?;
2873 }
2874
2875 if let Some(ref offset_expr) = stmt.offset {
2876 let offset = eval_const_int(offset_expr)? as usize;
2877 if offset < result_rows.len() {
2878 result_rows = result_rows.split_off(offset);
2879 } else {
2880 result_rows.clear();
2881 }
2882 }
2883 if let Some(ref limit_expr) = stmt.limit {
2884 let limit = eval_const_int(limit_expr)? as usize;
2885 result_rows.truncate(limit);
2886 }
2887
2888 let col_names = stmt
2889 .columns
2890 .iter()
2891 .map(|c| match c {
2892 SelectColumn::AllColumns => "*".into(),
2893 SelectColumn::Expr { alias: Some(a), .. } => a.clone(),
2894 SelectColumn::Expr { expr, .. } => expr_display_name(expr),
2895 })
2896 .collect();
2897
2898 Ok(ExecutionResult::Query(QueryResult {
2899 columns: col_names,
2900 rows: result_rows,
2901 }))
2902}
2903
2904fn eval_aggregate_expr(
2905 expr: &Expr,
2906 columns: &[ColumnDef],
2907 group_rows: &[&Vec<Value>],
2908) -> Result<Value> {
2909 match expr {
2910 Expr::CountStar => Ok(Value::Integer(group_rows.len() as i64)),
2911
2912 Expr::Function { name, args } if is_aggregate_function(name, args.len()) => {
2913 let func = name.to_ascii_uppercase();
2914 if args.len() != 1 {
2915 return Err(SqlError::Unsupported(format!(
2916 "{func} with {} args",
2917 args.len()
2918 )));
2919 }
2920 let arg = &args[0];
2921 let values: Vec<Value> = group_rows
2922 .iter()
2923 .map(|row| eval_expr(arg, columns, row))
2924 .collect::<Result<_>>()?;
2925
2926 match func.as_str() {
2927 "COUNT" => {
2928 let count = values.iter().filter(|v| !v.is_null()).count();
2929 Ok(Value::Integer(count as i64))
2930 }
2931 "SUM" => {
2932 let mut int_sum: i64 = 0;
2933 let mut real_sum: f64 = 0.0;
2934 let mut has_real = false;
2935 let mut all_null = true;
2936 for v in &values {
2937 match v {
2938 Value::Integer(i) => {
2939 int_sum += i;
2940 all_null = false;
2941 }
2942 Value::Real(r) => {
2943 real_sum += r;
2944 has_real = true;
2945 all_null = false;
2946 }
2947 Value::Null => {}
2948 _ => {
2949 return Err(SqlError::TypeMismatch {
2950 expected: "numeric".into(),
2951 got: v.data_type().to_string(),
2952 })
2953 }
2954 }
2955 }
2956 if all_null {
2957 return Ok(Value::Null);
2958 }
2959 if has_real {
2960 Ok(Value::Real(real_sum + int_sum as f64))
2961 } else {
2962 Ok(Value::Integer(int_sum))
2963 }
2964 }
2965 "AVG" => {
2966 let mut sum: f64 = 0.0;
2967 let mut count: i64 = 0;
2968 for v in &values {
2969 match v {
2970 Value::Integer(i) => {
2971 sum += *i as f64;
2972 count += 1;
2973 }
2974 Value::Real(r) => {
2975 sum += r;
2976 count += 1;
2977 }
2978 Value::Null => {}
2979 _ => {
2980 return Err(SqlError::TypeMismatch {
2981 expected: "numeric".into(),
2982 got: v.data_type().to_string(),
2983 })
2984 }
2985 }
2986 }
2987 if count == 0 {
2988 Ok(Value::Null)
2989 } else {
2990 Ok(Value::Real(sum / count as f64))
2991 }
2992 }
2993 "MIN" => {
2994 let mut min: Option<&Value> = None;
2995 for v in &values {
2996 if v.is_null() {
2997 continue;
2998 }
2999 min = Some(match min {
3000 None => v,
3001 Some(m) => {
3002 if v < m {
3003 v
3004 } else {
3005 m
3006 }
3007 }
3008 });
3009 }
3010 Ok(min.cloned().unwrap_or(Value::Null))
3011 }
3012 "MAX" => {
3013 let mut max: Option<&Value> = None;
3014 for v in &values {
3015 if v.is_null() {
3016 continue;
3017 }
3018 max = Some(match max {
3019 None => v,
3020 Some(m) => {
3021 if v > m {
3022 v
3023 } else {
3024 m
3025 }
3026 }
3027 });
3028 }
3029 Ok(max.cloned().unwrap_or(Value::Null))
3030 }
3031 _ => Err(SqlError::Unsupported(format!("aggregate function: {func}"))),
3032 }
3033 }
3034
3035 Expr::Column(_) | Expr::QualifiedColumn { .. } => {
3036 if let Some(first) = group_rows.first() {
3037 eval_expr(expr, columns, first)
3038 } else {
3039 Ok(Value::Null)
3040 }
3041 }
3042
3043 Expr::Literal(v) => Ok(v.clone()),
3044
3045 Expr::BinaryOp { left, op, right } => {
3046 let l = eval_aggregate_expr(left, columns, group_rows)?;
3047 let r = eval_aggregate_expr(right, columns, group_rows)?;
3048 crate::eval::eval_expr(
3049 &Expr::BinaryOp {
3050 left: Box::new(Expr::Literal(l)),
3051 op: *op,
3052 right: Box::new(Expr::Literal(r)),
3053 },
3054 columns,
3055 &[],
3056 )
3057 }
3058
3059 Expr::UnaryOp { op, expr: e } => {
3060 let v = eval_aggregate_expr(e, columns, group_rows)?;
3061 crate::eval::eval_expr(
3062 &Expr::UnaryOp {
3063 op: *op,
3064 expr: Box::new(Expr::Literal(v)),
3065 },
3066 columns,
3067 &[],
3068 )
3069 }
3070
3071 Expr::IsNull(e) => {
3072 let v = eval_aggregate_expr(e, columns, group_rows)?;
3073 Ok(Value::Boolean(v.is_null()))
3074 }
3075
3076 Expr::IsNotNull(e) => {
3077 let v = eval_aggregate_expr(e, columns, group_rows)?;
3078 Ok(Value::Boolean(!v.is_null()))
3079 }
3080
3081 Expr::Cast { expr: e, data_type } => {
3082 let v = eval_aggregate_expr(e, columns, group_rows)?;
3083 crate::eval::eval_expr(
3084 &Expr::Cast {
3085 expr: Box::new(Expr::Literal(v)),
3086 data_type: *data_type,
3087 },
3088 columns,
3089 &[],
3090 )
3091 }
3092
3093 Expr::Case {
3094 operand,
3095 conditions,
3096 else_result,
3097 } => {
3098 let op_val = operand
3099 .as_ref()
3100 .map(|e| eval_aggregate_expr(e, columns, group_rows))
3101 .transpose()?;
3102 if let Some(ov) = &op_val {
3103 for (cond, result) in conditions {
3104 let cv = eval_aggregate_expr(cond, columns, group_rows)?;
3105 if !ov.is_null() && !cv.is_null() && *ov == cv {
3106 return eval_aggregate_expr(result, columns, group_rows);
3107 }
3108 }
3109 } else {
3110 for (cond, result) in conditions {
3111 let cv = eval_aggregate_expr(cond, columns, group_rows)?;
3112 if crate::eval::is_truthy(&cv) {
3113 return eval_aggregate_expr(result, columns, group_rows);
3114 }
3115 }
3116 }
3117 match else_result {
3118 Some(e) => eval_aggregate_expr(e, columns, group_rows),
3119 None => Ok(Value::Null),
3120 }
3121 }
3122
3123 Expr::Coalesce(args) => {
3124 for arg in args {
3125 let v = eval_aggregate_expr(arg, columns, group_rows)?;
3126 if !v.is_null() {
3127 return Ok(v);
3128 }
3129 }
3130 Ok(Value::Null)
3131 }
3132
3133 Expr::Between {
3134 expr: e,
3135 low,
3136 high,
3137 negated,
3138 } => {
3139 let v = eval_aggregate_expr(e, columns, group_rows)?;
3140 let lo = eval_aggregate_expr(low, columns, group_rows)?;
3141 let hi = eval_aggregate_expr(high, columns, group_rows)?;
3142 crate::eval::eval_expr(
3143 &Expr::Between {
3144 expr: Box::new(Expr::Literal(v)),
3145 low: Box::new(Expr::Literal(lo)),
3146 high: Box::new(Expr::Literal(hi)),
3147 negated: *negated,
3148 },
3149 columns,
3150 &[],
3151 )
3152 }
3153
3154 Expr::Like {
3155 expr: e,
3156 pattern,
3157 escape,
3158 negated,
3159 } => {
3160 let v = eval_aggregate_expr(e, columns, group_rows)?;
3161 let p = eval_aggregate_expr(pattern, columns, group_rows)?;
3162 let esc = escape
3163 .as_ref()
3164 .map(|es| eval_aggregate_expr(es, columns, group_rows))
3165 .transpose()?;
3166 let esc_box = esc.map(|v| Box::new(Expr::Literal(v)));
3167 crate::eval::eval_expr(
3168 &Expr::Like {
3169 expr: Box::new(Expr::Literal(v)),
3170 pattern: Box::new(Expr::Literal(p)),
3171 escape: esc_box,
3172 negated: *negated,
3173 },
3174 columns,
3175 &[],
3176 )
3177 }
3178
3179 Expr::Function { name, args } => {
3180 let evaluated: Vec<Value> = args
3181 .iter()
3182 .map(|a| eval_aggregate_expr(a, columns, group_rows))
3183 .collect::<Result<_>>()?;
3184 let literal_args: Vec<Expr> = evaluated.into_iter().map(Expr::Literal).collect();
3185 crate::eval::eval_expr(
3186 &Expr::Function {
3187 name: name.clone(),
3188 args: literal_args,
3189 },
3190 columns,
3191 &[],
3192 )
3193 }
3194
3195 _ => Err(SqlError::Unsupported(format!(
3196 "expression in aggregate: {expr:?}"
3197 ))),
3198 }
3199}
3200
/// Reports whether a call to `name` with `arg_count` arguments is an aggregate.
///
/// Matching is ASCII case-insensitive. `COUNT`, `SUM` and `AVG` are aggregates
/// at any arity (the arity is validated later). `MIN`/`MAX` are aggregates only
/// with exactly one argument; other arities are treated as non-aggregate calls.
fn is_aggregate_function(name: &str, arg_count: usize) -> bool {
    let upper = name.to_ascii_uppercase();
    match upper.as_str() {
        "COUNT" | "SUM" | "AVG" => true,
        "MIN" | "MAX" => arg_count == 1,
        _ => false,
    }
}
3206
3207fn is_aggregate_expr(expr: &Expr) -> bool {
3208 match expr {
3209 Expr::CountStar => true,
3210 Expr::Function { name, args } => {
3211 is_aggregate_function(name, args.len()) || args.iter().any(is_aggregate_expr)
3212 }
3213 Expr::BinaryOp { left, right, .. } => is_aggregate_expr(left) || is_aggregate_expr(right),
3214 Expr::UnaryOp { expr, .. }
3215 | Expr::IsNull(expr)
3216 | Expr::IsNotNull(expr)
3217 | Expr::Cast { expr, .. } => is_aggregate_expr(expr),
3218 Expr::Case {
3219 operand,
3220 conditions,
3221 else_result,
3222 } => {
3223 operand.as_ref().is_some_and(|e| is_aggregate_expr(e))
3224 || conditions
3225 .iter()
3226 .any(|(c, r)| is_aggregate_expr(c) || is_aggregate_expr(r))
3227 || else_result.as_ref().is_some_and(|e| is_aggregate_expr(e))
3228 }
3229 Expr::Coalesce(args) => args.iter().any(is_aggregate_expr),
3230 Expr::Between {
3231 expr, low, high, ..
3232 } => is_aggregate_expr(expr) || is_aggregate_expr(low) || is_aggregate_expr(high),
3233 Expr::Like {
3234 expr,
3235 pattern,
3236 escape,
3237 ..
3238 } => {
3239 is_aggregate_expr(expr)
3240 || is_aggregate_expr(pattern)
3241 || escape.as_ref().is_some_and(|e| is_aggregate_expr(e))
3242 }
3243 _ => false,
3244 }
3245}
3246
3247fn decode_full_row(schema: &TableSchema, key: &[u8], value: &[u8]) -> Result<Vec<Value>> {
3251 let pk_values = decode_composite_key(key, schema.primary_key_columns.len())?;
3252 let non_pk_values = decode_row(value)?;
3253
3254 let mut row = vec![Value::Null; schema.columns.len()];
3255
3256 for (i, &col_idx) in schema.primary_key_columns.iter().enumerate() {
3257 row[col_idx as usize] = pk_values[i].clone();
3258 }
3259
3260 let non_pk = schema.non_pk_indices();
3261 for (i, &col_idx) in non_pk.iter().enumerate() {
3262 if i < non_pk_values.len() {
3263 row[col_idx] = non_pk_values[i].clone();
3264 }
3265 }
3266
3267 Ok(row)
3268}
3269
3270fn eval_const_expr(expr: &Expr) -> Result<Value> {
3272 eval_expr(expr, &[], &[])
3273}
3274
3275fn eval_const_int(expr: &Expr) -> Result<i64> {
3276 match eval_const_expr(expr)? {
3277 Value::Integer(i) => Ok(i),
3278 other => Err(SqlError::TypeMismatch {
3279 expected: "INTEGER".into(),
3280 got: other.data_type().to_string(),
3281 }),
3282 }
3283}
3284
3285fn sort_rows(
3286 rows: &mut [Vec<Value>],
3287 order_by: &[OrderByItem],
3288 columns: &[ColumnDef],
3289) -> Result<()> {
3290 rows.sort_by(|a, b| {
3291 for item in order_by {
3292 let a_val = eval_expr(&item.expr, columns, a).unwrap_or(Value::Null);
3293 let b_val = eval_expr(&item.expr, columns, b).unwrap_or(Value::Null);
3294
3295 let nulls_first = item.nulls_first.unwrap_or(!item.descending);
3296
3297 let ord = match (a_val.is_null(), b_val.is_null()) {
3298 (true, true) => std::cmp::Ordering::Equal,
3299 (true, false) => {
3300 if nulls_first {
3301 std::cmp::Ordering::Less
3302 } else {
3303 std::cmp::Ordering::Greater
3304 }
3305 }
3306 (false, true) => {
3307 if nulls_first {
3308 std::cmp::Ordering::Greater
3309 } else {
3310 std::cmp::Ordering::Less
3311 }
3312 }
3313 (false, false) => {
3314 let cmp = a_val.cmp(&b_val);
3315 if item.descending {
3316 cmp.reverse()
3317 } else {
3318 cmp
3319 }
3320 }
3321 };
3322
3323 if ord != std::cmp::Ordering::Equal {
3324 return ord;
3325 }
3326 }
3327 std::cmp::Ordering::Equal
3328 });
3329 Ok(())
3330}
3331
3332fn project_rows(
3333 columns: &[ColumnDef],
3334 select_cols: &[SelectColumn],
3335 rows: &[Vec<Value>],
3336) -> Result<(Vec<String>, Vec<Vec<Value>>)> {
3337 let mut col_names = Vec::new();
3338 type Projector = Box<dyn Fn(&[Value]) -> Result<Value>>;
3339 let mut projectors: Vec<Projector> = Vec::new();
3340
3341 for sel_col in select_cols {
3342 match sel_col {
3343 SelectColumn::AllColumns => {
3344 for col in columns {
3345 let idx = col.position as usize;
3346 col_names.push(col.name.clone());
3347 projectors.push(Box::new(move |row: &[Value]| Ok(row[idx].clone())));
3348 }
3349 }
3350 SelectColumn::Expr { expr, alias } => {
3351 let name = alias.clone().unwrap_or_else(|| expr_display_name(expr));
3352 col_names.push(name);
3353 let expr = expr.clone();
3354 let owned_cols = columns.to_vec();
3355 projectors.push(Box::new(move |row: &[Value]| {
3356 eval_expr(&expr, &owned_cols, row)
3357 }));
3358 }
3359 }
3360 }
3361
3362 let projected = rows
3363 .iter()
3364 .map(|row| {
3365 projectors
3366 .iter()
3367 .map(|p| p(row))
3368 .collect::<Result<Vec<_>>>()
3369 })
3370 .collect::<Result<Vec<_>>>()?;
3371
3372 Ok((col_names, projected))
3373}
3374
3375fn expr_display_name(expr: &Expr) -> String {
3376 match expr {
3377 Expr::Column(name) => name.clone(),
3378 Expr::QualifiedColumn { table, column } => format!("{table}.{column}"),
3379 Expr::Literal(v) => format!("{v}"),
3380 Expr::CountStar => "COUNT(*)".into(),
3381 Expr::Function { name, args } => {
3382 let arg_strs: Vec<String> = args.iter().map(expr_display_name).collect();
3383 format!("{name}({})", arg_strs.join(", "))
3384 }
3385 Expr::BinaryOp { left, op, right } => {
3386 format!(
3387 "{} {} {}",
3388 expr_display_name(left),
3389 op_symbol(op),
3390 expr_display_name(right)
3391 )
3392 }
3393 _ => "?".into(),
3394 }
3395}
3396
3397fn op_symbol(op: &BinOp) -> &'static str {
3398 match op {
3399 BinOp::Add => "+",
3400 BinOp::Sub => "-",
3401 BinOp::Mul => "*",
3402 BinOp::Div => "/",
3403 BinOp::Mod => "%",
3404 BinOp::Eq => "=",
3405 BinOp::NotEq => "<>",
3406 BinOp::Lt => "<",
3407 BinOp::Gt => ">",
3408 BinOp::LtEq => "<=",
3409 BinOp::GtEq => ">=",
3410 BinOp::And => "AND",
3411 BinOp::Or => "OR",
3412 BinOp::Concat => "||",
3413 }
3414}
3415
3416fn build_output_columns(select_cols: &[SelectColumn], columns: &[ColumnDef]) -> Vec<ColumnDef> {
3417 let mut out = Vec::new();
3418 for (i, col) in select_cols.iter().enumerate() {
3419 let (name, data_type) = match col {
3420 SelectColumn::AllColumns => (format!("col{i}"), DataType::Null),
3421 SelectColumn::Expr {
3422 alias: Some(a),
3423 expr,
3424 } => (a.clone(), infer_expr_type(expr, columns)),
3425 SelectColumn::Expr { expr, .. } => {
3426 (expr_display_name(expr), infer_expr_type(expr, columns))
3427 }
3428 };
3429 out.push(ColumnDef {
3430 name,
3431 data_type,
3432 nullable: true,
3433 position: i as u16,
3434 });
3435 }
3436 out
3437}
3438
3439fn infer_expr_type(expr: &Expr, columns: &[ColumnDef]) -> DataType {
3440 match expr {
3441 Expr::Column(name) => {
3442 let lower = name.to_ascii_lowercase();
3443 columns
3444 .iter()
3445 .find(|c| c.name.to_ascii_lowercase() == lower)
3446 .map(|c| c.data_type)
3447 .unwrap_or(DataType::Null)
3448 }
3449 Expr::QualifiedColumn { table, column } => {
3450 let qualified = format!(
3451 "{}.{}",
3452 table.to_ascii_lowercase(),
3453 column.to_ascii_lowercase()
3454 );
3455 columns
3456 .iter()
3457 .find(|c| c.name.to_ascii_lowercase() == qualified)
3458 .map(|c| c.data_type)
3459 .unwrap_or(DataType::Null)
3460 }
3461 Expr::Literal(v) => v.data_type(),
3462 Expr::CountStar => DataType::Integer,
3463 Expr::Function { name, .. } => match name.to_ascii_uppercase().as_str() {
3464 "COUNT" => DataType::Integer,
3465 "AVG" => DataType::Real,
3466 "SUM" | "MIN" | "MAX" => DataType::Null,
3467 _ => DataType::Null,
3468 },
3469 _ => DataType::Null,
3470 }
3471}