Skip to main content

citadel_sql/
executor.rs

1//! SQL executor: DDL and DML operations.
2
3use std::collections::BTreeMap;
4
5use citadel::Database;
6
7use crate::encoding::{
8    decode_composite_key, decode_key_value, decode_row, encode_composite_key, encode_row,
9};
10use crate::error::{Result, SqlError};
11use crate::eval::{eval_expr, is_truthy};
12use crate::parser::*;
13use crate::planner::{self, ScanPlan};
14use crate::schema::SchemaManager;
15use crate::types::*;
16
17// ── Index helpers ────────────────────────────────────────────────────
18
19fn encode_index_key(idx: &IndexDef, row: &[Value], pk_values: &[Value]) -> Vec<u8> {
20    let indexed_values: Vec<Value> = idx
21        .columns
22        .iter()
23        .map(|&col_idx| row[col_idx as usize].clone())
24        .collect();
25
26    if idx.unique {
27        let any_null = indexed_values.iter().any(|v| v.is_null());
28        if !any_null {
29            return encode_composite_key(&indexed_values);
30        }
31    }
32
33    let mut all_values = indexed_values;
34    all_values.extend_from_slice(pk_values);
35    encode_composite_key(&all_values)
36}
37
38fn encode_index_value(idx: &IndexDef, row: &[Value], pk_values: &[Value]) -> Vec<u8> {
39    if idx.unique {
40        let indexed_values: Vec<Value> = idx
41            .columns
42            .iter()
43            .map(|&col_idx| row[col_idx as usize].clone())
44            .collect();
45        let any_null = indexed_values.iter().any(|v| v.is_null());
46        if !any_null {
47            return encode_composite_key(pk_values);
48        }
49    }
50    vec![]
51}
52
53fn insert_index_entries(
54    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
55    table_schema: &TableSchema,
56    row: &[Value],
57    pk_values: &[Value],
58) -> Result<()> {
59    for idx in &table_schema.indices {
60        let idx_table = TableSchema::index_table_name(&table_schema.name, &idx.name);
61        let key = encode_index_key(idx, row, pk_values);
62        let value = encode_index_value(idx, row, pk_values);
63
64        let is_new = wtx
65            .table_insert(&idx_table, &key, &value)
66            .map_err(SqlError::Storage)?;
67
68        if idx.unique && !is_new {
69            let indexed_values: Vec<Value> = idx
70                .columns
71                .iter()
72                .map(|&col_idx| row[col_idx as usize].clone())
73                .collect();
74            let any_null = indexed_values.iter().any(|v| v.is_null());
75            if !any_null {
76                return Err(SqlError::UniqueViolation(idx.name.clone()));
77            }
78        }
79    }
80    Ok(())
81}
82
83fn delete_index_entries(
84    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
85    table_schema: &TableSchema,
86    row: &[Value],
87    pk_values: &[Value],
88) -> Result<()> {
89    for idx in &table_schema.indices {
90        let idx_table = TableSchema::index_table_name(&table_schema.name, &idx.name);
91        let key = encode_index_key(idx, row, pk_values);
92        wtx.table_delete(&idx_table, &key)
93            .map_err(SqlError::Storage)?;
94    }
95    Ok(())
96}
97
98fn index_columns_changed(idx: &IndexDef, old_row: &[Value], new_row: &[Value]) -> bool {
99    idx.columns
100        .iter()
101        .any(|&col_idx| old_row[col_idx as usize] != new_row[col_idx as usize])
102}
103
104/// Execute a parsed SQL statement in auto-commit mode.
105pub fn execute(
106    db: &Database,
107    schema: &mut SchemaManager,
108    stmt: &Statement,
109) -> Result<ExecutionResult> {
110    match stmt {
111        Statement::CreateTable(ct) => exec_create_table(db, schema, ct),
112        Statement::DropTable(dt) => exec_drop_table(db, schema, dt),
113        Statement::CreateIndex(ci) => exec_create_index(db, schema, ci),
114        Statement::DropIndex(di) => exec_drop_index(db, schema, di),
115        Statement::Insert(ins) => exec_insert(db, schema, ins),
116        Statement::Select(sel) => exec_select(db, schema, sel),
117        Statement::Update(upd) => exec_update(db, schema, upd),
118        Statement::Delete(del) => exec_delete(db, schema, del),
119        Statement::Explain(inner) => explain(schema, inner),
120        Statement::Begin | Statement::Commit | Statement::Rollback => Err(SqlError::Unsupported(
121            "transaction control in auto-commit mode".into(),
122        )),
123    }
124}
125
126/// Execute a parsed SQL statement within an existing write transaction.
127pub fn execute_in_txn(
128    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
129    schema: &mut SchemaManager,
130    stmt: &Statement,
131) -> Result<ExecutionResult> {
132    match stmt {
133        Statement::CreateTable(ct) => exec_create_table_in_txn(wtx, schema, ct),
134        Statement::DropTable(dt) => exec_drop_table_in_txn(wtx, schema, dt),
135        Statement::CreateIndex(ci) => exec_create_index_in_txn(wtx, schema, ci),
136        Statement::DropIndex(di) => exec_drop_index_in_txn(wtx, schema, di),
137        Statement::Insert(ins) => exec_insert_in_txn(wtx, schema, ins),
138        Statement::Select(sel) => exec_select_in_txn(wtx, schema, sel),
139        Statement::Update(upd) => exec_update_in_txn(wtx, schema, upd),
140        Statement::Delete(del) => exec_delete_in_txn(wtx, schema, del),
141        Statement::Explain(inner) => explain(schema, inner),
142        Statement::Begin | Statement::Commit | Statement::Rollback => {
143            Err(SqlError::Unsupported("nested transaction control".into()))
144        }
145    }
146}
147
148// ── EXPLAIN ──────────────────────────────────────────────────────────
149
150pub fn explain(schema: &SchemaManager, stmt: &Statement) -> Result<ExecutionResult> {
151    let lines = match stmt {
152        Statement::Select(sel) => explain_select(schema, sel)?,
153        Statement::Insert(ins) => {
154            vec![format!("INSERT INTO {}", ins.table.to_ascii_lowercase())]
155        }
156        Statement::Update(upd) => explain_dml(schema, &upd.table, &upd.where_clause, "UPDATE")?,
157        Statement::Delete(del) => {
158            explain_dml(schema, &del.table, &del.where_clause, "DELETE FROM")?
159        }
160        Statement::Explain(_) => {
161            return Err(SqlError::Unsupported("EXPLAIN EXPLAIN".into()));
162        }
163        _ => {
164            return Err(SqlError::Unsupported(
165                "EXPLAIN for this statement type".into(),
166            ));
167        }
168    };
169
170    let rows = lines
171        .into_iter()
172        .map(|line| vec![Value::Text(line)])
173        .collect();
174    Ok(ExecutionResult::Query(QueryResult {
175        columns: vec!["plan".into()],
176        rows,
177    }))
178}
179
180fn explain_dml(
181    schema: &SchemaManager,
182    table: &str,
183    where_clause: &Option<Expr>,
184    verb: &str,
185) -> Result<Vec<String>> {
186    let lower = table.to_ascii_lowercase();
187    let table_schema = schema
188        .get(&lower)
189        .ok_or_else(|| SqlError::TableNotFound(table.to_string()))?;
190    let plan = planner::plan_select(table_schema, where_clause);
191    let scan_line = format_scan_line(&lower, &None, &plan, table_schema);
192    Ok(vec![format!("{verb} {}", scan_line)])
193}
194
195fn explain_select(schema: &SchemaManager, stmt: &SelectStmt) -> Result<Vec<String>> {
196    let mut lines = Vec::new();
197
198    if stmt.from.is_empty() {
199        lines.push("CONSTANT ROW".into());
200        return Ok(lines);
201    }
202
203    let lower_from = stmt.from.to_ascii_lowercase();
204    let from_schema = schema
205        .get(&lower_from)
206        .ok_or_else(|| SqlError::TableNotFound(stmt.from.clone()))?;
207
208    if stmt.joins.is_empty() {
209        let plan = planner::plan_select(from_schema, &stmt.where_clause);
210        lines.push(format_scan_line(
211            &lower_from,
212            &stmt.from_alias,
213            &plan,
214            from_schema,
215        ));
216    } else {
217        let from_plan = planner::plan_select(from_schema, &None);
218        lines.push(format_scan_line(
219            &lower_from,
220            &stmt.from_alias,
221            &from_plan,
222            from_schema,
223        ));
224
225        for join in &stmt.joins {
226            let inner_lower = join.table.name.to_ascii_lowercase();
227            let inner_schema = schema
228                .get(&inner_lower)
229                .ok_or_else(|| SqlError::TableNotFound(join.table.name.clone()))?;
230            let inner_plan = planner::plan_select(inner_schema, &None);
231            lines.push(format_scan_line(
232                &inner_lower,
233                &join.table.alias,
234                &inner_plan,
235                inner_schema,
236            ));
237        }
238
239        let join_type_str = match stmt.joins.last().map(|j| j.join_type) {
240            Some(JoinType::Left) => "LEFT JOIN",
241            Some(JoinType::Right) => "RIGHT JOIN",
242            Some(JoinType::Cross) => "CROSS JOIN",
243            _ => "NESTED LOOP",
244        };
245        lines.push(join_type_str.into());
246    }
247
248    if stmt.where_clause.is_some() && stmt.joins.is_empty() {
249        let plan = planner::plan_select(from_schema, &stmt.where_clause);
250        if matches!(plan, ScanPlan::SeqScan) {
251            lines.push("FILTER".into());
252        }
253    }
254
255    if let Some(ref w) = stmt.where_clause {
256        if !stmt.joins.is_empty() && has_subquery(w) {
257            lines.push("SUBQUERY".into());
258        }
259    }
260
261    explain_subqueries(stmt, &mut lines);
262
263    if !stmt.group_by.is_empty() {
264        lines.push("GROUP BY".into());
265    }
266
267    if stmt.distinct {
268        lines.push("DISTINCT".into());
269    }
270
271    if !stmt.order_by.is_empty() {
272        lines.push("SORT".into());
273    }
274
275    if let Some(ref offset_expr) = stmt.offset {
276        if let Ok(n) = eval_const_int(offset_expr) {
277            lines.push(format!("OFFSET {n}"));
278        } else {
279            lines.push("OFFSET".into());
280        }
281    }
282
283    if let Some(ref limit_expr) = stmt.limit {
284        if let Ok(n) = eval_const_int(limit_expr) {
285            lines.push(format!("LIMIT {n}"));
286        } else {
287            lines.push("LIMIT".into());
288        }
289    }
290
291    Ok(lines)
292}
293
294fn explain_subqueries(stmt: &SelectStmt, lines: &mut Vec<String>) {
295    let mut count = 0;
296    if let Some(ref w) = stmt.where_clause {
297        count += count_subqueries(w);
298    }
299    if let Some(ref h) = stmt.having {
300        count += count_subqueries(h);
301    }
302    for col in &stmt.columns {
303        if let SelectColumn::Expr { expr, .. } = col {
304            count += count_subqueries(expr);
305        }
306    }
307    for _ in 0..count {
308        lines.push("SUBQUERY".into());
309    }
310}
311
312fn count_subqueries(expr: &Expr) -> usize {
313    match expr {
314        Expr::InSubquery { expr: e, .. } => 1 + count_subqueries(e),
315        Expr::ScalarSubquery(_) => 1,
316        Expr::Exists { .. } => 1,
317        Expr::BinaryOp { left, right, .. } => count_subqueries(left) + count_subqueries(right),
318        Expr::UnaryOp { expr: e, .. } => count_subqueries(e),
319        Expr::IsNull(e) | Expr::IsNotNull(e) => count_subqueries(e),
320        Expr::Function { args, .. } => args.iter().map(count_subqueries).sum(),
321        Expr::Between {
322            expr: e, low, high, ..
323        } => count_subqueries(e) + count_subqueries(low) + count_subqueries(high),
324        Expr::Like {
325            expr: e, pattern, ..
326        } => count_subqueries(e) + count_subqueries(pattern),
327        Expr::Case {
328            operand,
329            conditions,
330            else_result,
331        } => {
332            let mut n = 0;
333            if let Some(op) = operand {
334                n += count_subqueries(op);
335            }
336            for (c, r) in conditions {
337                n += count_subqueries(c) + count_subqueries(r);
338            }
339            if let Some(el) = else_result {
340                n += count_subqueries(el);
341            }
342            n
343        }
344        Expr::Coalesce(args) => args.iter().map(count_subqueries).sum(),
345        Expr::Cast { expr: e, .. } => count_subqueries(e),
346        Expr::InList { expr: e, list, .. } => {
347            count_subqueries(e) + list.iter().map(count_subqueries).sum::<usize>()
348        }
349        _ => 0,
350    }
351}
352
353fn format_scan_line(
354    table_name: &str,
355    alias: &Option<String>,
356    plan: &ScanPlan,
357    table_schema: &TableSchema,
358) -> String {
359    let alias_part = match alias {
360        Some(a) if !a.eq_ignore_ascii_case(table_name) => {
361            format!(" AS {}", a.to_ascii_lowercase())
362        }
363        _ => String::new(),
364    };
365
366    let desc = planner::describe_plan(plan, table_schema);
367
368    if desc.is_empty() {
369        format!("SCAN TABLE {table_name}{alias_part}")
370    } else {
371        format!("SEARCH TABLE {table_name}{alias_part} {desc}")
372    }
373}
374
375// ── DDL ─────────────────────────────────────────────────────────────
376
377fn exec_create_table(
378    db: &Database,
379    schema: &mut SchemaManager,
380    stmt: &CreateTableStmt,
381) -> Result<ExecutionResult> {
382    let lower_name = stmt.name.to_ascii_lowercase();
383
384    if schema.contains(&lower_name) {
385        if stmt.if_not_exists {
386            return Ok(ExecutionResult::Ok);
387        }
388        return Err(SqlError::TableAlreadyExists(stmt.name.clone()));
389    }
390
391    if stmt.primary_key.is_empty() {
392        return Err(SqlError::PrimaryKeyRequired);
393    }
394
395    let mut seen = std::collections::HashSet::new();
396    for col in &stmt.columns {
397        let lower = col.name.to_ascii_lowercase();
398        if !seen.insert(lower.clone()) {
399            return Err(SqlError::DuplicateColumn(col.name.clone()));
400        }
401    }
402
403    let columns: Vec<ColumnDef> = stmt
404        .columns
405        .iter()
406        .enumerate()
407        .map(|(i, c)| ColumnDef {
408            name: c.name.to_ascii_lowercase(),
409            data_type: c.data_type,
410            nullable: c.nullable,
411            position: i as u16,
412        })
413        .collect();
414
415    let primary_key_columns: Vec<u16> = stmt
416        .primary_key
417        .iter()
418        .map(|pk_name| {
419            let lower = pk_name.to_ascii_lowercase();
420            columns
421                .iter()
422                .position(|c| c.name == lower)
423                .map(|i| i as u16)
424                .ok_or_else(|| SqlError::ColumnNotFound(pk_name.clone()))
425        })
426        .collect::<Result<_>>()?;
427
428    let table_schema = TableSchema {
429        name: lower_name.clone(),
430        columns,
431        primary_key_columns,
432        indices: vec![],
433    };
434
435    let mut wtx = db.begin_write().map_err(SqlError::Storage)?;
436    SchemaManager::ensure_schema_table(&mut wtx)?;
437    wtx.create_table(lower_name.as_bytes())
438        .map_err(SqlError::Storage)?;
439    SchemaManager::save_schema(&mut wtx, &table_schema)?;
440    wtx.commit().map_err(SqlError::Storage)?;
441
442    schema.register(table_schema);
443    Ok(ExecutionResult::Ok)
444}
445
446fn exec_drop_table(
447    db: &Database,
448    schema: &mut SchemaManager,
449    stmt: &DropTableStmt,
450) -> Result<ExecutionResult> {
451    let lower_name = stmt.name.to_ascii_lowercase();
452
453    if !schema.contains(&lower_name) {
454        if stmt.if_exists {
455            return Ok(ExecutionResult::Ok);
456        }
457        return Err(SqlError::TableNotFound(stmt.name.clone()));
458    }
459
460    let table_schema = schema.get(&lower_name).unwrap();
461    let idx_tables: Vec<Vec<u8>> = table_schema
462        .indices
463        .iter()
464        .map(|idx| TableSchema::index_table_name(&lower_name, &idx.name))
465        .collect();
466
467    let mut wtx = db.begin_write().map_err(SqlError::Storage)?;
468    for idx_table in &idx_tables {
469        wtx.drop_table(idx_table).map_err(SqlError::Storage)?;
470    }
471    wtx.drop_table(lower_name.as_bytes())
472        .map_err(SqlError::Storage)?;
473    SchemaManager::delete_schema(&mut wtx, &lower_name)?;
474    wtx.commit().map_err(SqlError::Storage)?;
475
476    schema.remove(&lower_name);
477    Ok(ExecutionResult::Ok)
478}
479
480fn exec_create_table_in_txn(
481    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
482    schema: &mut SchemaManager,
483    stmt: &CreateTableStmt,
484) -> Result<ExecutionResult> {
485    let lower_name = stmt.name.to_ascii_lowercase();
486
487    if schema.contains(&lower_name) {
488        if stmt.if_not_exists {
489            return Ok(ExecutionResult::Ok);
490        }
491        return Err(SqlError::TableAlreadyExists(stmt.name.clone()));
492    }
493
494    if stmt.primary_key.is_empty() {
495        return Err(SqlError::PrimaryKeyRequired);
496    }
497
498    let mut seen = std::collections::HashSet::new();
499    for col in &stmt.columns {
500        let lower = col.name.to_ascii_lowercase();
501        if !seen.insert(lower.clone()) {
502            return Err(SqlError::DuplicateColumn(col.name.clone()));
503        }
504    }
505
506    let columns: Vec<ColumnDef> = stmt
507        .columns
508        .iter()
509        .enumerate()
510        .map(|(i, c)| ColumnDef {
511            name: c.name.to_ascii_lowercase(),
512            data_type: c.data_type,
513            nullable: c.nullable,
514            position: i as u16,
515        })
516        .collect();
517
518    let primary_key_columns: Vec<u16> = stmt
519        .primary_key
520        .iter()
521        .map(|pk_name| {
522            let lower = pk_name.to_ascii_lowercase();
523            columns
524                .iter()
525                .position(|c| c.name == lower)
526                .map(|i| i as u16)
527                .ok_or_else(|| SqlError::ColumnNotFound(pk_name.clone()))
528        })
529        .collect::<Result<_>>()?;
530
531    let table_schema = TableSchema {
532        name: lower_name.clone(),
533        columns,
534        primary_key_columns,
535        indices: vec![],
536    };
537
538    SchemaManager::ensure_schema_table(wtx)?;
539    wtx.create_table(lower_name.as_bytes())
540        .map_err(SqlError::Storage)?;
541    SchemaManager::save_schema(wtx, &table_schema)?;
542
543    schema.register(table_schema);
544    Ok(ExecutionResult::Ok)
545}
546
547fn exec_drop_table_in_txn(
548    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
549    schema: &mut SchemaManager,
550    stmt: &DropTableStmt,
551) -> Result<ExecutionResult> {
552    let lower_name = stmt.name.to_ascii_lowercase();
553
554    if !schema.contains(&lower_name) {
555        if stmt.if_exists {
556            return Ok(ExecutionResult::Ok);
557        }
558        return Err(SqlError::TableNotFound(stmt.name.clone()));
559    }
560
561    let table_schema = schema.get(&lower_name).unwrap();
562    let idx_tables: Vec<Vec<u8>> = table_schema
563        .indices
564        .iter()
565        .map(|idx| TableSchema::index_table_name(&lower_name, &idx.name))
566        .collect();
567
568    for idx_table in &idx_tables {
569        wtx.drop_table(idx_table).map_err(SqlError::Storage)?;
570    }
571    wtx.drop_table(lower_name.as_bytes())
572        .map_err(SqlError::Storage)?;
573    SchemaManager::delete_schema(wtx, &lower_name)?;
574
575    schema.remove(&lower_name);
576    Ok(ExecutionResult::Ok)
577}
578
579fn exec_create_index(
580    db: &Database,
581    schema: &mut SchemaManager,
582    stmt: &CreateIndexStmt,
583) -> Result<ExecutionResult> {
584    let lower_table = stmt.table_name.to_ascii_lowercase();
585    let lower_idx = stmt.index_name.to_ascii_lowercase();
586
587    let table_schema = schema
588        .get(&lower_table)
589        .ok_or_else(|| SqlError::TableNotFound(stmt.table_name.clone()))?;
590
591    if table_schema.index_by_name(&lower_idx).is_some() {
592        if stmt.if_not_exists {
593            return Ok(ExecutionResult::Ok);
594        }
595        return Err(SqlError::IndexAlreadyExists(stmt.index_name.clone()));
596    }
597
598    let col_indices: Vec<u16> = stmt
599        .columns
600        .iter()
601        .map(|col_name| {
602            let lower = col_name.to_ascii_lowercase();
603            table_schema
604                .column_index(&lower)
605                .map(|i| i as u16)
606                .ok_or_else(|| SqlError::ColumnNotFound(col_name.clone()))
607        })
608        .collect::<Result<_>>()?;
609
610    let idx_def = IndexDef {
611        name: lower_idx.clone(),
612        columns: col_indices,
613        unique: stmt.unique,
614    };
615
616    let idx_table = TableSchema::index_table_name(&lower_table, &lower_idx);
617
618    let mut wtx = db.begin_write().map_err(SqlError::Storage)?;
619    SchemaManager::ensure_schema_table(&mut wtx)?;
620    wtx.create_table(&idx_table).map_err(SqlError::Storage)?;
621
622    // Populate index from existing rows
623    let pk_indices = table_schema.pk_indices();
624    let mut rows: Vec<Vec<Value>> = Vec::new();
625    {
626        let mut scan_err: Option<SqlError> = None;
627        wtx.table_for_each(lower_table.as_bytes(), |key, value| {
628            match decode_full_row(table_schema, key, value) {
629                Ok(row) => rows.push(row),
630                Err(e) => scan_err = Some(e),
631            }
632            Ok(())
633        })
634        .map_err(SqlError::Storage)?;
635        if let Some(e) = scan_err {
636            return Err(e);
637        }
638    }
639
640    for row in &rows {
641        let pk_values: Vec<Value> = pk_indices.iter().map(|&i| row[i].clone()).collect();
642        let key = encode_index_key(&idx_def, row, &pk_values);
643        let value = encode_index_value(&idx_def, row, &pk_values);
644        let is_new = wtx
645            .table_insert(&idx_table, &key, &value)
646            .map_err(SqlError::Storage)?;
647        if idx_def.unique && !is_new {
648            let indexed_values: Vec<Value> = idx_def
649                .columns
650                .iter()
651                .map(|&col_idx| row[col_idx as usize].clone())
652                .collect();
653            let any_null = indexed_values.iter().any(|v| v.is_null());
654            if !any_null {
655                return Err(SqlError::UniqueViolation(stmt.index_name.clone()));
656            }
657        }
658    }
659
660    let mut updated_schema = table_schema.clone();
661    updated_schema.indices.push(idx_def);
662    SchemaManager::save_schema(&mut wtx, &updated_schema)?;
663    wtx.commit().map_err(SqlError::Storage)?;
664
665    schema.register(updated_schema);
666    Ok(ExecutionResult::Ok)
667}
668
669fn exec_drop_index(
670    db: &Database,
671    schema: &mut SchemaManager,
672    stmt: &DropIndexStmt,
673) -> Result<ExecutionResult> {
674    let lower_idx = stmt.index_name.to_ascii_lowercase();
675
676    let (table_name, _idx_pos) = match find_index_in_schemas(schema, &lower_idx) {
677        Some(found) => found,
678        None => {
679            if stmt.if_exists {
680                return Ok(ExecutionResult::Ok);
681            }
682            return Err(SqlError::IndexNotFound(stmt.index_name.clone()));
683        }
684    };
685
686    let idx_table = TableSchema::index_table_name(&table_name, &lower_idx);
687
688    let mut wtx = db.begin_write().map_err(SqlError::Storage)?;
689    wtx.drop_table(&idx_table).map_err(SqlError::Storage)?;
690
691    let table_schema = schema.get(&table_name).unwrap();
692    let mut updated_schema = table_schema.clone();
693    updated_schema.indices.retain(|i| i.name != lower_idx);
694    SchemaManager::save_schema(&mut wtx, &updated_schema)?;
695    wtx.commit().map_err(SqlError::Storage)?;
696
697    schema.register(updated_schema);
698    Ok(ExecutionResult::Ok)
699}
700
701fn exec_create_index_in_txn(
702    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
703    schema: &mut SchemaManager,
704    stmt: &CreateIndexStmt,
705) -> Result<ExecutionResult> {
706    let lower_table = stmt.table_name.to_ascii_lowercase();
707    let lower_idx = stmt.index_name.to_ascii_lowercase();
708
709    let table_schema = schema
710        .get(&lower_table)
711        .ok_or_else(|| SqlError::TableNotFound(stmt.table_name.clone()))?;
712
713    if table_schema.index_by_name(&lower_idx).is_some() {
714        if stmt.if_not_exists {
715            return Ok(ExecutionResult::Ok);
716        }
717        return Err(SqlError::IndexAlreadyExists(stmt.index_name.clone()));
718    }
719
720    let col_indices: Vec<u16> = stmt
721        .columns
722        .iter()
723        .map(|col_name| {
724            let lower = col_name.to_ascii_lowercase();
725            table_schema
726                .column_index(&lower)
727                .map(|i| i as u16)
728                .ok_or_else(|| SqlError::ColumnNotFound(col_name.clone()))
729        })
730        .collect::<Result<_>>()?;
731
732    let idx_def = IndexDef {
733        name: lower_idx.clone(),
734        columns: col_indices,
735        unique: stmt.unique,
736    };
737
738    let idx_table = TableSchema::index_table_name(&lower_table, &lower_idx);
739
740    SchemaManager::ensure_schema_table(wtx)?;
741    wtx.create_table(&idx_table).map_err(SqlError::Storage)?;
742
743    let pk_indices = table_schema.pk_indices();
744    let mut rows: Vec<Vec<Value>> = Vec::new();
745    {
746        let mut scan_err: Option<SqlError> = None;
747        wtx.table_for_each(lower_table.as_bytes(), |key, value| {
748            match decode_full_row(table_schema, key, value) {
749                Ok(row) => rows.push(row),
750                Err(e) => scan_err = Some(e),
751            }
752            Ok(())
753        })
754        .map_err(SqlError::Storage)?;
755        if let Some(e) = scan_err {
756            return Err(e);
757        }
758    }
759
760    for row in &rows {
761        let pk_values: Vec<Value> = pk_indices.iter().map(|&i| row[i].clone()).collect();
762        let key = encode_index_key(&idx_def, row, &pk_values);
763        let value = encode_index_value(&idx_def, row, &pk_values);
764        let is_new = wtx
765            .table_insert(&idx_table, &key, &value)
766            .map_err(SqlError::Storage)?;
767        if idx_def.unique && !is_new {
768            let indexed_values: Vec<Value> = idx_def
769                .columns
770                .iter()
771                .map(|&col_idx| row[col_idx as usize].clone())
772                .collect();
773            let any_null = indexed_values.iter().any(|v| v.is_null());
774            if !any_null {
775                return Err(SqlError::UniqueViolation(stmt.index_name.clone()));
776            }
777        }
778    }
779
780    let mut updated_schema = table_schema.clone();
781    updated_schema.indices.push(idx_def);
782    SchemaManager::save_schema(wtx, &updated_schema)?;
783
784    schema.register(updated_schema);
785    Ok(ExecutionResult::Ok)
786}
787
788fn exec_drop_index_in_txn(
789    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
790    schema: &mut SchemaManager,
791    stmt: &DropIndexStmt,
792) -> Result<ExecutionResult> {
793    let lower_idx = stmt.index_name.to_ascii_lowercase();
794
795    let (table_name, _idx_pos) = match find_index_in_schemas(schema, &lower_idx) {
796        Some(found) => found,
797        None => {
798            if stmt.if_exists {
799                return Ok(ExecutionResult::Ok);
800            }
801            return Err(SqlError::IndexNotFound(stmt.index_name.clone()));
802        }
803    };
804
805    let idx_table = TableSchema::index_table_name(&table_name, &lower_idx);
806    wtx.drop_table(&idx_table).map_err(SqlError::Storage)?;
807
808    let table_schema = schema.get(&table_name).unwrap();
809    let mut updated_schema = table_schema.clone();
810    updated_schema.indices.retain(|i| i.name != lower_idx);
811    SchemaManager::save_schema(wtx, &updated_schema)?;
812
813    schema.register(updated_schema);
814    Ok(ExecutionResult::Ok)
815}
816
817fn find_index_in_schemas(schema: &SchemaManager, index_name: &str) -> Option<(String, usize)> {
818    for table_name in schema.table_names() {
819        if let Some(ts) = schema.get(table_name) {
820            if let Some(pos) = ts.indices.iter().position(|i| i.name == index_name) {
821                return Some((table_name.to_string(), pos));
822            }
823        }
824    }
825    None
826}
827
828// ── Index scan helpers ───────────────────────────────────────────────
829
830fn extract_pk_key(
831    idx_key: &[u8],
832    idx_value: &[u8],
833    is_unique: bool,
834    num_index_cols: usize,
835    num_pk_cols: usize,
836) -> Result<Vec<u8>> {
837    if is_unique && !idx_value.is_empty() {
838        Ok(idx_value.to_vec())
839    } else {
840        let total_cols = num_index_cols + num_pk_cols;
841        let all_values = decode_composite_key(idx_key, total_cols)?;
842        let pk_values = &all_values[num_index_cols..];
843        Ok(encode_composite_key(pk_values))
844    }
845}
846
847fn check_range_conditions(
848    idx_key: &[u8],
849    num_prefix_cols: usize,
850    range_conds: &[(BinOp, Value)],
851    num_index_cols: usize,
852) -> Result<RangeCheck> {
853    if range_conds.is_empty() {
854        return Ok(RangeCheck::Match);
855    }
856
857    let num_to_decode = num_prefix_cols + 1;
858    if num_to_decode > num_index_cols {
859        return Ok(RangeCheck::Match);
860    }
861
862    // Decode just enough columns to check the range column
863    let mut pos = 0;
864    for _ in 0..num_prefix_cols {
865        let (_, n) = decode_key_value(&idx_key[pos..])?;
866        pos += n;
867    }
868    let (range_val, _) = decode_key_value(&idx_key[pos..])?;
869
870    let mut exceeds_upper = false;
871    let mut below_lower = false;
872
873    for (op, val) in range_conds {
874        match op {
875            BinOp::Lt => {
876                if range_val >= *val {
877                    exceeds_upper = true;
878                }
879            }
880            BinOp::LtEq => {
881                if range_val > *val {
882                    exceeds_upper = true;
883                }
884            }
885            BinOp::Gt => {
886                if range_val <= *val {
887                    below_lower = true;
888                }
889            }
890            BinOp::GtEq => {
891                if range_val < *val {
892                    below_lower = true;
893                }
894            }
895            _ => {}
896        }
897    }
898
899    if exceeds_upper {
900        Ok(RangeCheck::ExceedsUpper)
901    } else if below_lower {
902        Ok(RangeCheck::BelowLower)
903    } else {
904        Ok(RangeCheck::Match)
905    }
906}
907
/// Outcome of testing one index key against a scan's range bounds
/// (produced by `check_range_conditions`).
///
/// During an index scan, `ExceedsUpper` ends the scan, `BelowLower` skips
/// the entry and continues, and `Match` keeps the entry.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RangeCheck {
    /// The key satisfies every range condition.
    Match,
    /// The key violates a lower bound (`>` or `>=`).
    BelowLower,
    /// The key violates an upper bound (`<` or `<=`).
    ExceedsUpper,
}
913
914/// Collect rows via ReadTxn using the scan plan.
915fn collect_rows_read(
916    db: &Database,
917    table_schema: &TableSchema,
918    where_clause: &Option<Expr>,
919) -> Result<Vec<Vec<Value>>> {
920    let plan = planner::plan_select(table_schema, where_clause);
921    let lower_name = &table_schema.name;
922
923    match plan {
924        ScanPlan::SeqScan => {
925            let mut rows = Vec::new();
926            let mut rtx = db.begin_read();
927            let mut scan_err: Option<SqlError> = None;
928            rtx.table_for_each(lower_name.as_bytes(), |key, value| {
929                match decode_full_row(table_schema, key, value) {
930                    Ok(row) => rows.push(row),
931                    Err(e) => scan_err = Some(e),
932                }
933                Ok(())
934            })
935            .map_err(SqlError::Storage)?;
936            if let Some(e) = scan_err {
937                return Err(e);
938            }
939            Ok(rows)
940        }
941
942        ScanPlan::PkLookup { pk_values } => {
943            let key = encode_composite_key(&pk_values);
944            let mut rtx = db.begin_read();
945            match rtx
946                .table_get(lower_name.as_bytes(), &key)
947                .map_err(SqlError::Storage)?
948            {
949                Some(value) => {
950                    let row = decode_full_row(table_schema, &key, &value)?;
951                    Ok(vec![row])
952                }
953                None => Ok(vec![]),
954            }
955        }
956
957        ScanPlan::IndexScan {
958            idx_table,
959            prefix,
960            num_prefix_cols,
961            range_conds,
962            is_unique,
963            index_columns,
964            ..
965        } => {
966            let num_pk_cols = table_schema.primary_key_columns.len();
967            let num_index_cols = index_columns.len();
968            let mut pk_keys: Vec<Vec<u8>> = Vec::new();
969
970            {
971                let mut rtx = db.begin_read();
972                let mut scan_err: Option<SqlError> = None;
973                rtx.table_scan_from(&idx_table, &prefix, |key, value| {
974                    if !key.starts_with(&prefix) {
975                        return Ok(false);
976                    }
977                    match check_range_conditions(key, num_prefix_cols, &range_conds, num_index_cols)
978                    {
979                        Ok(RangeCheck::ExceedsUpper) => return Ok(false),
980                        Ok(RangeCheck::BelowLower) => return Ok(true),
981                        Ok(RangeCheck::Match) => {}
982                        Err(e) => {
983                            scan_err = Some(e);
984                            return Ok(false);
985                        }
986                    }
987                    match extract_pk_key(key, value, is_unique, num_index_cols, num_pk_cols) {
988                        Ok(pk) => pk_keys.push(pk),
989                        Err(e) => {
990                            scan_err = Some(e);
991                            return Ok(false);
992                        }
993                    }
994                    Ok(true)
995                })
996                .map_err(SqlError::Storage)?;
997                if let Some(e) = scan_err {
998                    return Err(e);
999                }
1000            }
1001
1002            let mut rows = Vec::new();
1003            let mut rtx = db.begin_read();
1004            for pk_key in &pk_keys {
1005                if let Some(value) = rtx
1006                    .table_get(lower_name.as_bytes(), pk_key)
1007                    .map_err(SqlError::Storage)?
1008                {
1009                    rows.push(decode_full_row(table_schema, pk_key, &value)?);
1010                }
1011            }
1012            Ok(rows)
1013        }
1014    }
1015}
1016
1017/// Collect rows via WriteTxn using the scan plan.
1018fn collect_rows_write(
1019    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
1020    table_schema: &TableSchema,
1021    where_clause: &Option<Expr>,
1022) -> Result<Vec<Vec<Value>>> {
1023    let plan = planner::plan_select(table_schema, where_clause);
1024    let lower_name = &table_schema.name;
1025
1026    match plan {
1027        ScanPlan::SeqScan => {
1028            let mut rows = Vec::new();
1029            let mut scan_err: Option<SqlError> = None;
1030            wtx.table_for_each(lower_name.as_bytes(), |key, value| {
1031                match decode_full_row(table_schema, key, value) {
1032                    Ok(row) => rows.push(row),
1033                    Err(e) => scan_err = Some(e),
1034                }
1035                Ok(())
1036            })
1037            .map_err(SqlError::Storage)?;
1038            if let Some(e) = scan_err {
1039                return Err(e);
1040            }
1041            Ok(rows)
1042        }
1043
1044        ScanPlan::PkLookup { pk_values } => {
1045            let key = encode_composite_key(&pk_values);
1046            match wtx
1047                .table_get(lower_name.as_bytes(), &key)
1048                .map_err(SqlError::Storage)?
1049            {
1050                Some(value) => {
1051                    let row = decode_full_row(table_schema, &key, &value)?;
1052                    Ok(vec![row])
1053                }
1054                None => Ok(vec![]),
1055            }
1056        }
1057
1058        ScanPlan::IndexScan {
1059            idx_table,
1060            prefix,
1061            num_prefix_cols,
1062            range_conds,
1063            is_unique,
1064            index_columns,
1065            ..
1066        } => {
1067            let num_pk_cols = table_schema.primary_key_columns.len();
1068            let num_index_cols = index_columns.len();
1069            let mut pk_keys: Vec<Vec<u8>> = Vec::new();
1070
1071            {
1072                let mut scan_err: Option<SqlError> = None;
1073                wtx.table_scan_from(&idx_table, &prefix, |key, value| {
1074                    if !key.starts_with(&prefix) {
1075                        return Ok(false);
1076                    }
1077                    match check_range_conditions(key, num_prefix_cols, &range_conds, num_index_cols)
1078                    {
1079                        Ok(RangeCheck::ExceedsUpper) => return Ok(false),
1080                        Ok(RangeCheck::BelowLower) => return Ok(true),
1081                        Ok(RangeCheck::Match) => {}
1082                        Err(e) => {
1083                            scan_err = Some(e);
1084                            return Ok(false);
1085                        }
1086                    }
1087                    match extract_pk_key(key, value, is_unique, num_index_cols, num_pk_cols) {
1088                        Ok(pk) => pk_keys.push(pk),
1089                        Err(e) => {
1090                            scan_err = Some(e);
1091                            return Ok(false);
1092                        }
1093                    }
1094                    Ok(true)
1095                })
1096                .map_err(SqlError::Storage)?;
1097                if let Some(e) = scan_err {
1098                    return Err(e);
1099                }
1100            }
1101
1102            let mut rows = Vec::new();
1103            for pk_key in &pk_keys {
1104                if let Some(value) = wtx
1105                    .table_get(lower_name.as_bytes(), pk_key)
1106                    .map_err(SqlError::Storage)?
1107                {
1108                    rows.push(decode_full_row(table_schema, pk_key, &value)?);
1109                }
1110            }
1111            Ok(rows)
1112        }
1113    }
1114}
1115
1116/// Collect (encoded_key, full_row) pairs via ReadTxn using the scan plan.
1117/// Used by DELETE and UPDATE which need the encoded PK key.
1118fn collect_keyed_rows_read(
1119    db: &Database,
1120    table_schema: &TableSchema,
1121    where_clause: &Option<Expr>,
1122) -> Result<Vec<(Vec<u8>, Vec<Value>)>> {
1123    let plan = planner::plan_select(table_schema, where_clause);
1124    let lower_name = &table_schema.name;
1125
1126    match plan {
1127        ScanPlan::SeqScan => {
1128            let mut rows = Vec::new();
1129            let mut rtx = db.begin_read();
1130            let mut scan_err: Option<SqlError> = None;
1131            rtx.table_for_each(lower_name.as_bytes(), |key, value| {
1132                match decode_full_row(table_schema, key, value) {
1133                    Ok(row) => rows.push((key.to_vec(), row)),
1134                    Err(e) => scan_err = Some(e),
1135                }
1136                Ok(())
1137            })
1138            .map_err(SqlError::Storage)?;
1139            if let Some(e) = scan_err {
1140                return Err(e);
1141            }
1142            Ok(rows)
1143        }
1144
1145        ScanPlan::PkLookup { pk_values } => {
1146            let key = encode_composite_key(&pk_values);
1147            let mut rtx = db.begin_read();
1148            match rtx
1149                .table_get(lower_name.as_bytes(), &key)
1150                .map_err(SqlError::Storage)?
1151            {
1152                Some(value) => {
1153                    let row = decode_full_row(table_schema, &key, &value)?;
1154                    Ok(vec![(key, row)])
1155                }
1156                None => Ok(vec![]),
1157            }
1158        }
1159
1160        ScanPlan::IndexScan {
1161            idx_table,
1162            prefix,
1163            num_prefix_cols,
1164            range_conds,
1165            is_unique,
1166            index_columns,
1167            ..
1168        } => {
1169            let num_pk_cols = table_schema.primary_key_columns.len();
1170            let num_index_cols = index_columns.len();
1171            let mut pk_keys: Vec<Vec<u8>> = Vec::new();
1172
1173            {
1174                let mut rtx = db.begin_read();
1175                let mut scan_err: Option<SqlError> = None;
1176                rtx.table_scan_from(&idx_table, &prefix, |key, value| {
1177                    if !key.starts_with(&prefix) {
1178                        return Ok(false);
1179                    }
1180                    match check_range_conditions(key, num_prefix_cols, &range_conds, num_index_cols)
1181                    {
1182                        Ok(RangeCheck::ExceedsUpper) => return Ok(false),
1183                        Ok(RangeCheck::BelowLower) => return Ok(true),
1184                        Ok(RangeCheck::Match) => {}
1185                        Err(e) => {
1186                            scan_err = Some(e);
1187                            return Ok(false);
1188                        }
1189                    }
1190                    match extract_pk_key(key, value, is_unique, num_index_cols, num_pk_cols) {
1191                        Ok(pk) => pk_keys.push(pk),
1192                        Err(e) => {
1193                            scan_err = Some(e);
1194                            return Ok(false);
1195                        }
1196                    }
1197                    Ok(true)
1198                })
1199                .map_err(SqlError::Storage)?;
1200                if let Some(e) = scan_err {
1201                    return Err(e);
1202                }
1203            }
1204
1205            let mut rows = Vec::new();
1206            let mut rtx = db.begin_read();
1207            for pk_key in &pk_keys {
1208                if let Some(value) = rtx
1209                    .table_get(lower_name.as_bytes(), pk_key)
1210                    .map_err(SqlError::Storage)?
1211                {
1212                    rows.push((
1213                        pk_key.clone(),
1214                        decode_full_row(table_schema, pk_key, &value)?,
1215                    ));
1216                }
1217            }
1218            Ok(rows)
1219        }
1220    }
1221}
1222
1223/// Collect (encoded_key, full_row) pairs via WriteTxn using the scan plan.
1224fn collect_keyed_rows_write(
1225    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
1226    table_schema: &TableSchema,
1227    where_clause: &Option<Expr>,
1228) -> Result<Vec<(Vec<u8>, Vec<Value>)>> {
1229    let plan = planner::plan_select(table_schema, where_clause);
1230    let lower_name = &table_schema.name;
1231
1232    match plan {
1233        ScanPlan::SeqScan => {
1234            let mut rows = Vec::new();
1235            let mut scan_err: Option<SqlError> = None;
1236            wtx.table_for_each(lower_name.as_bytes(), |key, value| {
1237                match decode_full_row(table_schema, key, value) {
1238                    Ok(row) => rows.push((key.to_vec(), row)),
1239                    Err(e) => scan_err = Some(e),
1240                }
1241                Ok(())
1242            })
1243            .map_err(SqlError::Storage)?;
1244            if let Some(e) = scan_err {
1245                return Err(e);
1246            }
1247            Ok(rows)
1248        }
1249
1250        ScanPlan::PkLookup { pk_values } => {
1251            let key = encode_composite_key(&pk_values);
1252            match wtx
1253                .table_get(lower_name.as_bytes(), &key)
1254                .map_err(SqlError::Storage)?
1255            {
1256                Some(value) => {
1257                    let row = decode_full_row(table_schema, &key, &value)?;
1258                    Ok(vec![(key, row)])
1259                }
1260                None => Ok(vec![]),
1261            }
1262        }
1263
1264        ScanPlan::IndexScan {
1265            idx_table,
1266            prefix,
1267            num_prefix_cols,
1268            range_conds,
1269            is_unique,
1270            index_columns,
1271            ..
1272        } => {
1273            let num_pk_cols = table_schema.primary_key_columns.len();
1274            let num_index_cols = index_columns.len();
1275            let mut pk_keys: Vec<Vec<u8>> = Vec::new();
1276
1277            {
1278                let mut scan_err: Option<SqlError> = None;
1279                wtx.table_scan_from(&idx_table, &prefix, |key, value| {
1280                    if !key.starts_with(&prefix) {
1281                        return Ok(false);
1282                    }
1283                    match check_range_conditions(key, num_prefix_cols, &range_conds, num_index_cols)
1284                    {
1285                        Ok(RangeCheck::ExceedsUpper) => return Ok(false),
1286                        Ok(RangeCheck::BelowLower) => return Ok(true),
1287                        Ok(RangeCheck::Match) => {}
1288                        Err(e) => {
1289                            scan_err = Some(e);
1290                            return Ok(false);
1291                        }
1292                    }
1293                    match extract_pk_key(key, value, is_unique, num_index_cols, num_pk_cols) {
1294                        Ok(pk) => pk_keys.push(pk),
1295                        Err(e) => {
1296                            scan_err = Some(e);
1297                            return Ok(false);
1298                        }
1299                    }
1300                    Ok(true)
1301                })
1302                .map_err(SqlError::Storage)?;
1303                if let Some(e) = scan_err {
1304                    return Err(e);
1305                }
1306            }
1307
1308            let mut rows = Vec::new();
1309            for pk_key in &pk_keys {
1310                if let Some(value) = wtx
1311                    .table_get(lower_name.as_bytes(), pk_key)
1312                    .map_err(SqlError::Storage)?
1313                {
1314                    rows.push((
1315                        pk_key.clone(),
1316                        decode_full_row(table_schema, pk_key, &value)?,
1317                    ));
1318                }
1319            }
1320            Ok(rows)
1321        }
1322    }
1323}
1324
1325// ── DML ─────────────────────────────────────────────────────────────
1326
1327fn exec_insert(
1328    db: &Database,
1329    schema: &SchemaManager,
1330    stmt: &InsertStmt,
1331) -> Result<ExecutionResult> {
1332    let materialized;
1333    let stmt = if insert_has_subquery(stmt) {
1334        materialized = materialize_insert(stmt, &mut |sub| exec_subquery_read(db, schema, sub))?;
1335        &materialized
1336    } else {
1337        stmt
1338    };
1339
1340    let lower_name = stmt.table.to_ascii_lowercase();
1341    let table_schema = schema
1342        .get(&lower_name)
1343        .ok_or_else(|| SqlError::TableNotFound(stmt.table.clone()))?;
1344
1345    let insert_columns = if stmt.columns.is_empty() {
1346        table_schema
1347            .columns
1348            .iter()
1349            .map(|c| c.name.clone())
1350            .collect::<Vec<_>>()
1351    } else {
1352        stmt.columns
1353            .iter()
1354            .map(|c| c.to_ascii_lowercase())
1355            .collect()
1356    };
1357
1358    let col_indices: Vec<usize> = insert_columns
1359        .iter()
1360        .map(|name| {
1361            table_schema
1362                .column_index(name)
1363                .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))
1364        })
1365        .collect::<Result<_>>()?;
1366
1367    let mut wtx = db.begin_write().map_err(SqlError::Storage)?;
1368    let mut count: u64 = 0;
1369
1370    for value_row in &stmt.values {
1371        if value_row.len() != insert_columns.len() {
1372            return Err(SqlError::InvalidValue(format!(
1373                "expected {} values, got {}",
1374                insert_columns.len(),
1375                value_row.len()
1376            )));
1377        }
1378
1379        let mut row = vec![Value::Null; table_schema.columns.len()];
1380        for (i, expr) in value_row.iter().enumerate() {
1381            let val = eval_const_expr(expr)?;
1382            let col_idx = col_indices[i];
1383            let col = &table_schema.columns[col_idx];
1384
1385            let coerced = if val.is_null() {
1386                Value::Null
1387            } else {
1388                val.coerce_to(col.data_type)
1389                    .ok_or_else(|| SqlError::TypeMismatch {
1390                        expected: col.data_type.to_string(),
1391                        got: val.data_type().to_string(),
1392                    })?
1393            };
1394
1395            row[col_idx] = coerced;
1396        }
1397
1398        for col in &table_schema.columns {
1399            if !col.nullable && row[col.position as usize].is_null() {
1400                return Err(SqlError::NotNullViolation(col.name.clone()));
1401            }
1402        }
1403
1404        let pk_values: Vec<Value> = table_schema
1405            .pk_indices()
1406            .iter()
1407            .map(|&i| row[i].clone())
1408            .collect();
1409        let key = encode_composite_key(&pk_values);
1410
1411        let non_pk = table_schema.non_pk_indices();
1412        let value_values: Vec<Value> = non_pk.iter().map(|&i| row[i].clone()).collect();
1413        let value = encode_row(&value_values);
1414
1415        if key.len() > citadel_core::MAX_KEY_SIZE {
1416            return Err(SqlError::KeyTooLarge {
1417                size: key.len(),
1418                max: citadel_core::MAX_KEY_SIZE,
1419            });
1420        }
1421        if value.len() > citadel_core::MAX_INLINE_VALUE_SIZE {
1422            return Err(SqlError::RowTooLarge {
1423                size: value.len(),
1424                max: citadel_core::MAX_INLINE_VALUE_SIZE,
1425            });
1426        }
1427
1428        let is_new = wtx
1429            .table_insert(lower_name.as_bytes(), &key, &value)
1430            .map_err(SqlError::Storage)?;
1431        if !is_new {
1432            return Err(SqlError::DuplicateKey);
1433        }
1434
1435        insert_index_entries(&mut wtx, table_schema, &row, &pk_values)?;
1436        count += 1;
1437    }
1438
1439    wtx.commit().map_err(SqlError::Storage)?;
1440    Ok(ExecutionResult::RowsAffected(count))
1441}
1442
1443fn has_subquery(expr: &Expr) -> bool {
1444    match expr {
1445        Expr::InSubquery { .. } | Expr::Exists { .. } | Expr::ScalarSubquery(_) => true,
1446        Expr::BinaryOp { left, right, .. } => has_subquery(left) || has_subquery(right),
1447        Expr::UnaryOp { expr, .. } => has_subquery(expr),
1448        Expr::IsNull(e) | Expr::IsNotNull(e) => has_subquery(e),
1449        Expr::InList { expr, list, .. } => has_subquery(expr) || list.iter().any(has_subquery),
1450        Expr::InSet { expr, .. } => has_subquery(expr),
1451        Expr::Between {
1452            expr, low, high, ..
1453        } => has_subquery(expr) || has_subquery(low) || has_subquery(high),
1454        Expr::Like {
1455            expr,
1456            pattern,
1457            escape,
1458            ..
1459        } => {
1460            has_subquery(expr)
1461                || has_subquery(pattern)
1462                || escape.as_ref().is_some_and(|e| has_subquery(e))
1463        }
1464        Expr::Case {
1465            operand,
1466            conditions,
1467            else_result,
1468        } => {
1469            operand.as_ref().is_some_and(|e| has_subquery(e))
1470                || conditions
1471                    .iter()
1472                    .any(|(c, r)| has_subquery(c) || has_subquery(r))
1473                || else_result.as_ref().is_some_and(|e| has_subquery(e))
1474        }
1475        Expr::Coalesce(args) => args.iter().any(has_subquery),
1476        Expr::Cast { expr, .. } => has_subquery(expr),
1477        Expr::Function { args, .. } => args.iter().any(has_subquery),
1478        _ => false,
1479    }
1480}
1481
1482fn stmt_has_subquery(stmt: &SelectStmt) -> bool {
1483    if let Some(ref w) = stmt.where_clause {
1484        if has_subquery(w) {
1485            return true;
1486        }
1487    }
1488    if let Some(ref h) = stmt.having {
1489        if has_subquery(h) {
1490            return true;
1491        }
1492    }
1493    for col in &stmt.columns {
1494        if let SelectColumn::Expr { expr, .. } = col {
1495            if has_subquery(expr) {
1496                return true;
1497            }
1498        }
1499    }
1500    for ob in &stmt.order_by {
1501        if has_subquery(&ob.expr) {
1502            return true;
1503        }
1504    }
1505    for join in &stmt.joins {
1506        if let Some(ref on_expr) = join.on_clause {
1507            if has_subquery(on_expr) {
1508                return true;
1509            }
1510        }
1511    }
1512    false
1513}
1514
1515fn materialize_expr(
1516    expr: &Expr,
1517    exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
1518) -> Result<Expr> {
1519    match expr {
1520        Expr::InSubquery {
1521            expr: e,
1522            subquery,
1523            negated,
1524        } => {
1525            let inner = materialize_expr(e, exec_sub)?;
1526            let qr = exec_sub(subquery)?;
1527            if !qr.columns.is_empty() && qr.columns.len() != 1 {
1528                return Err(SqlError::SubqueryMultipleColumns);
1529            }
1530            let mut values = std::collections::HashSet::new();
1531            let mut has_null = false;
1532            for row in &qr.rows {
1533                if row[0].is_null() {
1534                    has_null = true;
1535                } else {
1536                    values.insert(row[0].clone());
1537                }
1538            }
1539            Ok(Expr::InSet {
1540                expr: Box::new(inner),
1541                values,
1542                has_null,
1543                negated: *negated,
1544            })
1545        }
1546        Expr::ScalarSubquery(subquery) => {
1547            let qr = exec_sub(subquery)?;
1548            if qr.rows.len() > 1 {
1549                return Err(SqlError::SubqueryMultipleRows);
1550            }
1551            let val = if qr.rows.is_empty() {
1552                Value::Null
1553            } else {
1554                qr.rows[0][0].clone()
1555            };
1556            Ok(Expr::Literal(val))
1557        }
1558        Expr::Exists { subquery, negated } => {
1559            let qr = exec_sub(subquery)?;
1560            let exists = !qr.rows.is_empty();
1561            let result = if *negated { !exists } else { exists };
1562            Ok(Expr::Literal(Value::Boolean(result)))
1563        }
1564        Expr::InList {
1565            expr: e,
1566            list,
1567            negated,
1568        } => {
1569            let inner = materialize_expr(e, exec_sub)?;
1570            let items = list
1571                .iter()
1572                .map(|item| materialize_expr(item, exec_sub))
1573                .collect::<Result<Vec<_>>>()?;
1574            Ok(Expr::InList {
1575                expr: Box::new(inner),
1576                list: items,
1577                negated: *negated,
1578            })
1579        }
1580        Expr::BinaryOp { left, op, right } => Ok(Expr::BinaryOp {
1581            left: Box::new(materialize_expr(left, exec_sub)?),
1582            op: *op,
1583            right: Box::new(materialize_expr(right, exec_sub)?),
1584        }),
1585        Expr::UnaryOp { op, expr: e } => Ok(Expr::UnaryOp {
1586            op: *op,
1587            expr: Box::new(materialize_expr(e, exec_sub)?),
1588        }),
1589        Expr::IsNull(e) => Ok(Expr::IsNull(Box::new(materialize_expr(e, exec_sub)?))),
1590        Expr::IsNotNull(e) => Ok(Expr::IsNotNull(Box::new(materialize_expr(e, exec_sub)?))),
1591        Expr::InSet {
1592            expr: e,
1593            values,
1594            has_null,
1595            negated,
1596        } => Ok(Expr::InSet {
1597            expr: Box::new(materialize_expr(e, exec_sub)?),
1598            values: values.clone(),
1599            has_null: *has_null,
1600            negated: *negated,
1601        }),
1602        Expr::Between {
1603            expr: e,
1604            low,
1605            high,
1606            negated,
1607        } => Ok(Expr::Between {
1608            expr: Box::new(materialize_expr(e, exec_sub)?),
1609            low: Box::new(materialize_expr(low, exec_sub)?),
1610            high: Box::new(materialize_expr(high, exec_sub)?),
1611            negated: *negated,
1612        }),
1613        Expr::Like {
1614            expr: e,
1615            pattern,
1616            escape,
1617            negated,
1618        } => {
1619            let esc = escape
1620                .as_ref()
1621                .map(|es| materialize_expr(es, exec_sub).map(Box::new))
1622                .transpose()?;
1623            Ok(Expr::Like {
1624                expr: Box::new(materialize_expr(e, exec_sub)?),
1625                pattern: Box::new(materialize_expr(pattern, exec_sub)?),
1626                escape: esc,
1627                negated: *negated,
1628            })
1629        }
1630        Expr::Case {
1631            operand,
1632            conditions,
1633            else_result,
1634        } => {
1635            let op = operand
1636                .as_ref()
1637                .map(|e| materialize_expr(e, exec_sub).map(Box::new))
1638                .transpose()?;
1639            let conds = conditions
1640                .iter()
1641                .map(|(c, r)| {
1642                    Ok((
1643                        materialize_expr(c, exec_sub)?,
1644                        materialize_expr(r, exec_sub)?,
1645                    ))
1646                })
1647                .collect::<Result<Vec<_>>>()?;
1648            let else_r = else_result
1649                .as_ref()
1650                .map(|e| materialize_expr(e, exec_sub).map(Box::new))
1651                .transpose()?;
1652            Ok(Expr::Case {
1653                operand: op,
1654                conditions: conds,
1655                else_result: else_r,
1656            })
1657        }
1658        Expr::Coalesce(args) => {
1659            let materialized = args
1660                .iter()
1661                .map(|a| materialize_expr(a, exec_sub))
1662                .collect::<Result<Vec<_>>>()?;
1663            Ok(Expr::Coalesce(materialized))
1664        }
1665        Expr::Cast { expr: e, data_type } => Ok(Expr::Cast {
1666            expr: Box::new(materialize_expr(e, exec_sub)?),
1667            data_type: *data_type,
1668        }),
1669        Expr::Function { name, args } => {
1670            let materialized = args
1671                .iter()
1672                .map(|a| materialize_expr(a, exec_sub))
1673                .collect::<Result<Vec<_>>>()?;
1674            Ok(Expr::Function {
1675                name: name.clone(),
1676                args: materialized,
1677            })
1678        }
1679        other => Ok(other.clone()),
1680    }
1681}
1682
1683fn materialize_stmt(
1684    stmt: &SelectStmt,
1685    exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
1686) -> Result<SelectStmt> {
1687    let where_clause = stmt
1688        .where_clause
1689        .as_ref()
1690        .map(|e| materialize_expr(e, exec_sub))
1691        .transpose()?;
1692    let having = stmt
1693        .having
1694        .as_ref()
1695        .map(|e| materialize_expr(e, exec_sub))
1696        .transpose()?;
1697    let columns = stmt
1698        .columns
1699        .iter()
1700        .map(|c| match c {
1701            SelectColumn::AllColumns => Ok(SelectColumn::AllColumns),
1702            SelectColumn::Expr { expr, alias } => Ok(SelectColumn::Expr {
1703                expr: materialize_expr(expr, exec_sub)?,
1704                alias: alias.clone(),
1705            }),
1706        })
1707        .collect::<Result<Vec<_>>>()?;
1708    let order_by = stmt
1709        .order_by
1710        .iter()
1711        .map(|ob| {
1712            Ok(OrderByItem {
1713                expr: materialize_expr(&ob.expr, exec_sub)?,
1714                descending: ob.descending,
1715                nulls_first: ob.nulls_first,
1716            })
1717        })
1718        .collect::<Result<Vec<_>>>()?;
1719    let joins = stmt
1720        .joins
1721        .iter()
1722        .map(|j| {
1723            let on_clause = j
1724                .on_clause
1725                .as_ref()
1726                .map(|e| materialize_expr(e, exec_sub))
1727                .transpose()?;
1728            Ok(JoinClause {
1729                join_type: j.join_type,
1730                table: j.table.clone(),
1731                on_clause,
1732            })
1733        })
1734        .collect::<Result<Vec<_>>>()?;
1735    let group_by = stmt
1736        .group_by
1737        .iter()
1738        .map(|e| materialize_expr(e, exec_sub))
1739        .collect::<Result<Vec<_>>>()?;
1740    Ok(SelectStmt {
1741        columns,
1742        from: stmt.from.clone(),
1743        from_alias: stmt.from_alias.clone(),
1744        joins,
1745        distinct: stmt.distinct,
1746        where_clause,
1747        order_by,
1748        limit: stmt.limit.clone(),
1749        offset: stmt.offset.clone(),
1750        group_by,
1751        having,
1752    })
1753}
1754
1755fn exec_subquery_read(
1756    db: &Database,
1757    schema: &SchemaManager,
1758    stmt: &SelectStmt,
1759) -> Result<QueryResult> {
1760    match exec_select(db, schema, stmt)? {
1761        ExecutionResult::Query(qr) => Ok(qr),
1762        _ => Ok(QueryResult {
1763            columns: vec![],
1764            rows: vec![],
1765        }),
1766    }
1767}
1768
1769fn exec_subquery_write(
1770    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
1771    schema: &SchemaManager,
1772    stmt: &SelectStmt,
1773) -> Result<QueryResult> {
1774    match exec_select_in_txn(wtx, schema, stmt)? {
1775        ExecutionResult::Query(qr) => Ok(qr),
1776        _ => Ok(QueryResult {
1777            columns: vec![],
1778            rows: vec![],
1779        }),
1780    }
1781}
1782
1783fn update_has_subquery(stmt: &UpdateStmt) -> bool {
1784    stmt.where_clause.as_ref().is_some_and(has_subquery)
1785        || stmt.assignments.iter().any(|(_, e)| has_subquery(e))
1786}
1787
1788fn materialize_update(
1789    stmt: &UpdateStmt,
1790    exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
1791) -> Result<UpdateStmt> {
1792    let where_clause = stmt
1793        .where_clause
1794        .as_ref()
1795        .map(|e| materialize_expr(e, exec_sub))
1796        .transpose()?;
1797    let assignments = stmt
1798        .assignments
1799        .iter()
1800        .map(|(name, expr)| Ok((name.clone(), materialize_expr(expr, exec_sub)?)))
1801        .collect::<Result<Vec<_>>>()?;
1802    Ok(UpdateStmt {
1803        table: stmt.table.clone(),
1804        assignments,
1805        where_clause,
1806    })
1807}
1808
1809fn delete_has_subquery(stmt: &DeleteStmt) -> bool {
1810    stmt.where_clause.as_ref().is_some_and(has_subquery)
1811}
1812
1813fn materialize_delete(
1814    stmt: &DeleteStmt,
1815    exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
1816) -> Result<DeleteStmt> {
1817    let where_clause = stmt
1818        .where_clause
1819        .as_ref()
1820        .map(|e| materialize_expr(e, exec_sub))
1821        .transpose()?;
1822    Ok(DeleteStmt {
1823        table: stmt.table.clone(),
1824        where_clause,
1825    })
1826}
1827
1828fn insert_has_subquery(stmt: &InsertStmt) -> bool {
1829    stmt.values.iter().any(|row| row.iter().any(has_subquery))
1830}
1831
1832fn materialize_insert(
1833    stmt: &InsertStmt,
1834    exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
1835) -> Result<InsertStmt> {
1836    let values = stmt
1837        .values
1838        .iter()
1839        .map(|row| {
1840            row.iter()
1841                .map(|e| materialize_expr(e, exec_sub))
1842                .collect::<Result<Vec<_>>>()
1843        })
1844        .collect::<Result<Vec<_>>>()?;
1845    Ok(InsertStmt {
1846        table: stmt.table.clone(),
1847        columns: stmt.columns.clone(),
1848        values,
1849    })
1850}
1851
1852fn exec_select(
1853    db: &Database,
1854    schema: &SchemaManager,
1855    stmt: &SelectStmt,
1856) -> Result<ExecutionResult> {
1857    let materialized;
1858    let stmt = if stmt_has_subquery(stmt) {
1859        materialized = materialize_stmt(stmt, &mut |sub| exec_subquery_read(db, schema, sub))?;
1860        &materialized
1861    } else {
1862        stmt
1863    };
1864
1865    if stmt.from.is_empty() {
1866        return exec_select_no_from(stmt);
1867    }
1868
1869    let lower_name = stmt.from.to_ascii_lowercase();
1870    let table_schema = schema
1871        .get(&lower_name)
1872        .ok_or_else(|| SqlError::TableNotFound(stmt.from.clone()))?;
1873
1874    if !stmt.joins.is_empty() {
1875        return exec_select_join(db, schema, stmt);
1876    }
1877
1878    let rows = collect_rows_read(db, table_schema, &stmt.where_clause)?;
1879    process_select(&table_schema.columns, rows, stmt)
1880}
1881
1882fn exec_select_no_from(stmt: &SelectStmt) -> Result<ExecutionResult> {
1883    let empty_cols: Vec<ColumnDef> = vec![];
1884    let empty_row: Vec<Value> = vec![];
1885    let (col_names, projected) = project_rows(&empty_cols, &stmt.columns, &[empty_row])?;
1886    Ok(ExecutionResult::Query(QueryResult {
1887        columns: col_names,
1888        rows: projected,
1889    }))
1890}
1891
1892/// Shared SELECT processing: WHERE, aggregation, ORDER BY, LIMIT/OFFSET, projection.
1893fn process_select(
1894    columns: &[ColumnDef],
1895    mut rows: Vec<Vec<Value>>,
1896    stmt: &SelectStmt,
1897) -> Result<ExecutionResult> {
1898    if let Some(ref where_expr) = stmt.where_clause {
1899        rows.retain(|row| match eval_expr(where_expr, columns, row) {
1900            Ok(val) => is_truthy(&val),
1901            Err(_) => false,
1902        });
1903    }
1904
1905    let has_aggregates = stmt.columns.iter().any(|c| match c {
1906        SelectColumn::Expr { expr, .. } => is_aggregate_expr(expr),
1907        _ => false,
1908    });
1909
1910    if has_aggregates || !stmt.group_by.is_empty() {
1911        return exec_aggregate(columns, &rows, stmt);
1912    }
1913
1914    if stmt.distinct {
1915        let (col_names, mut projected) = project_rows(columns, &stmt.columns, &rows)?;
1916
1917        let mut seen = std::collections::HashSet::new();
1918        projected.retain(|row| seen.insert(row.clone()));
1919
1920        if !stmt.order_by.is_empty() {
1921            let output_cols = build_output_columns(&stmt.columns, columns);
1922            sort_rows(&mut projected, &stmt.order_by, &output_cols)?;
1923        }
1924
1925        if let Some(ref offset_expr) = stmt.offset {
1926            let offset = eval_const_int(offset_expr)? as usize;
1927            if offset < projected.len() {
1928                projected = projected.split_off(offset);
1929            } else {
1930                projected.clear();
1931            }
1932        }
1933
1934        if let Some(ref limit_expr) = stmt.limit {
1935            let limit = eval_const_int(limit_expr)? as usize;
1936            projected.truncate(limit);
1937        }
1938
1939        return Ok(ExecutionResult::Query(QueryResult {
1940            columns: col_names,
1941            rows: projected,
1942        }));
1943    }
1944
1945    if !stmt.order_by.is_empty() {
1946        sort_rows(&mut rows, &stmt.order_by, columns)?;
1947    }
1948
1949    if let Some(ref offset_expr) = stmt.offset {
1950        let offset = eval_const_int(offset_expr)? as usize;
1951        if offset < rows.len() {
1952            rows = rows.split_off(offset);
1953        } else {
1954            rows.clear();
1955        }
1956    }
1957
1958    if let Some(ref limit_expr) = stmt.limit {
1959        let limit = eval_const_int(limit_expr)? as usize;
1960        rows.truncate(limit);
1961    }
1962
1963    let (col_names, projected) = project_rows(columns, &stmt.columns, &rows)?;
1964
1965    Ok(ExecutionResult::Query(QueryResult {
1966        columns: col_names,
1967        rows: projected,
1968    }))
1969}
1970
1971fn resolve_table_name<'a>(schema: &'a SchemaManager, name: &str) -> Result<&'a TableSchema> {
1972    let lower = name.to_ascii_lowercase();
1973    schema
1974        .get(&lower)
1975        .ok_or_else(|| SqlError::TableNotFound(name.to_string()))
1976}
1977
1978fn build_joined_columns(tables: &[(String, &TableSchema)]) -> Vec<ColumnDef> {
1979    let mut result = Vec::new();
1980    let mut pos: u16 = 0;
1981
1982    for (alias, schema) in tables {
1983        for col in &schema.columns {
1984            result.push(ColumnDef {
1985                name: format!("{}.{}", alias.to_ascii_lowercase(), col.name),
1986                data_type: col.data_type,
1987                nullable: col.nullable,
1988                position: pos,
1989            });
1990            pos += 1;
1991        }
1992    }
1993
1994    result
1995}
1996
/// Returns the lowercase qualifier under which a table's columns are addressed in a join:
/// the alias if one was given, otherwise the table name itself.
fn table_alias_or_name(name: &str, alias: &Option<String>) -> String {
    // Borrow the alias as `&str` so no temporary `String` is built for `name`;
    // the only allocation is the lowercased result.
    alias.as_deref().unwrap_or(name).to_ascii_lowercase()
}
2003
2004fn collect_all_rows_read(db: &Database, table_schema: &TableSchema) -> Result<Vec<Vec<Value>>> {
2005    collect_rows_read(db, table_schema, &None)
2006}
2007
2008fn collect_all_rows_write(
2009    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
2010    table_schema: &TableSchema,
2011) -> Result<Vec<Vec<Value>>> {
2012    collect_rows_write(wtx, table_schema, &None)
2013}
2014
2015fn exec_select_join(
2016    db: &Database,
2017    schema: &SchemaManager,
2018    stmt: &SelectStmt,
2019) -> Result<ExecutionResult> {
2020    let from_schema = resolve_table_name(schema, &stmt.from)?;
2021    let from_alias = table_alias_or_name(&stmt.from, &stmt.from_alias);
2022    let mut outer_rows = collect_all_rows_read(db, from_schema)?;
2023
2024    let mut tables: Vec<(String, &TableSchema)> = vec![(from_alias.clone(), from_schema)];
2025
2026    for join in &stmt.joins {
2027        let inner_schema = resolve_table_name(schema, &join.table.name)?;
2028        let inner_alias = table_alias_or_name(&join.table.name, &join.table.alias);
2029        let inner_rows = collect_all_rows_read(db, inner_schema)?;
2030
2031        let mut preview_tables = tables.clone();
2032        preview_tables.push((inner_alias.clone(), inner_schema));
2033        let combined_cols = build_joined_columns(&preview_tables);
2034
2035        let mut new_rows = Vec::new();
2036
2037        match join.join_type {
2038            JoinType::Inner | JoinType::Cross => {
2039                for outer in &outer_rows {
2040                    for inner in &inner_rows {
2041                        let combined: Vec<Value> =
2042                            outer.iter().chain(inner.iter()).cloned().collect();
2043                        if let Some(ref on_expr) = join.on_clause {
2044                            match eval_expr(on_expr, &combined_cols, &combined) {
2045                                Ok(val) if is_truthy(&val) => new_rows.push(combined),
2046                                _ => {}
2047                            }
2048                        } else {
2049                            new_rows.push(combined);
2050                        }
2051                    }
2052                }
2053            }
2054            JoinType::Left => {
2055                let inner_col_count = inner_schema.columns.len();
2056                for outer in &outer_rows {
2057                    let mut matched = false;
2058                    for inner in &inner_rows {
2059                        let combined: Vec<Value> =
2060                            outer.iter().chain(inner.iter()).cloned().collect();
2061                        if let Some(ref on_expr) = join.on_clause {
2062                            match eval_expr(on_expr, &combined_cols, &combined) {
2063                                Ok(val) if is_truthy(&val) => {
2064                                    new_rows.push(combined);
2065                                    matched = true;
2066                                }
2067                                _ => {}
2068                            }
2069                        } else {
2070                            new_rows.push(combined);
2071                            matched = true;
2072                        }
2073                    }
2074                    if !matched {
2075                        let mut padded = outer.clone();
2076                        padded.extend(std::iter::repeat(Value::Null).take(inner_col_count));
2077                        new_rows.push(padded);
2078                    }
2079                }
2080            }
2081            JoinType::Right => {
2082                let outer_col_count = if outer_rows.is_empty() {
2083                    tables.iter().map(|(_, s)| s.columns.len()).sum()
2084                } else {
2085                    outer_rows[0].len()
2086                };
2087                let mut inner_matched = vec![false; inner_rows.len()];
2088                for outer in &outer_rows {
2089                    for (j, inner) in inner_rows.iter().enumerate() {
2090                        let combined: Vec<Value> =
2091                            outer.iter().chain(inner.iter()).cloned().collect();
2092                        if let Some(ref on_expr) = join.on_clause {
2093                            match eval_expr(on_expr, &combined_cols, &combined) {
2094                                Ok(val) if is_truthy(&val) => {
2095                                    new_rows.push(combined);
2096                                    inner_matched[j] = true;
2097                                }
2098                                _ => {}
2099                            }
2100                        } else {
2101                            new_rows.push(combined);
2102                            inner_matched[j] = true;
2103                        }
2104                    }
2105                }
2106                for (j, inner) in inner_rows.iter().enumerate() {
2107                    if !inner_matched[j] {
2108                        let mut padded: Vec<Value> = std::iter::repeat(Value::Null)
2109                            .take(outer_col_count)
2110                            .collect();
2111                        padded.extend(inner.iter().cloned());
2112                        new_rows.push(padded);
2113                    }
2114                }
2115            }
2116        }
2117
2118        tables.push((inner_alias, inner_schema));
2119        outer_rows = new_rows;
2120    }
2121
2122    let joined_cols = build_joined_columns(&tables);
2123    process_select(&joined_cols, outer_rows, stmt)
2124}
2125
2126fn exec_select_join_in_txn(
2127    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
2128    schema: &SchemaManager,
2129    stmt: &SelectStmt,
2130) -> Result<ExecutionResult> {
2131    let from_schema = resolve_table_name(schema, &stmt.from)?;
2132    let from_alias = table_alias_or_name(&stmt.from, &stmt.from_alias);
2133    let mut outer_rows = collect_all_rows_write(wtx, from_schema)?;
2134
2135    let mut tables: Vec<(String, &TableSchema)> = vec![(from_alias.clone(), from_schema)];
2136
2137    for join in &stmt.joins {
2138        let inner_schema = resolve_table_name(schema, &join.table.name)?;
2139        let inner_alias = table_alias_or_name(&join.table.name, &join.table.alias);
2140        let inner_rows = collect_all_rows_write(wtx, inner_schema)?;
2141
2142        let mut preview_tables = tables.clone();
2143        preview_tables.push((inner_alias.clone(), inner_schema));
2144        let combined_cols = build_joined_columns(&preview_tables);
2145
2146        let mut new_rows = Vec::new();
2147
2148        match join.join_type {
2149            JoinType::Inner | JoinType::Cross => {
2150                for outer in &outer_rows {
2151                    for inner in &inner_rows {
2152                        let combined: Vec<Value> =
2153                            outer.iter().chain(inner.iter()).cloned().collect();
2154                        if let Some(ref on_expr) = join.on_clause {
2155                            match eval_expr(on_expr, &combined_cols, &combined) {
2156                                Ok(val) if is_truthy(&val) => new_rows.push(combined),
2157                                _ => {}
2158                            }
2159                        } else {
2160                            new_rows.push(combined);
2161                        }
2162                    }
2163                }
2164            }
2165            JoinType::Left => {
2166                let inner_col_count = inner_schema.columns.len();
2167                for outer in &outer_rows {
2168                    let mut matched = false;
2169                    for inner in &inner_rows {
2170                        let combined: Vec<Value> =
2171                            outer.iter().chain(inner.iter()).cloned().collect();
2172                        if let Some(ref on_expr) = join.on_clause {
2173                            match eval_expr(on_expr, &combined_cols, &combined) {
2174                                Ok(val) if is_truthy(&val) => {
2175                                    new_rows.push(combined);
2176                                    matched = true;
2177                                }
2178                                _ => {}
2179                            }
2180                        } else {
2181                            new_rows.push(combined);
2182                            matched = true;
2183                        }
2184                    }
2185                    if !matched {
2186                        let mut padded = outer.clone();
2187                        padded.extend(std::iter::repeat(Value::Null).take(inner_col_count));
2188                        new_rows.push(padded);
2189                    }
2190                }
2191            }
2192            JoinType::Right => {
2193                let outer_col_count = if outer_rows.is_empty() {
2194                    tables.iter().map(|(_, s)| s.columns.len()).sum()
2195                } else {
2196                    outer_rows[0].len()
2197                };
2198                let mut inner_matched = vec![false; inner_rows.len()];
2199                for outer in &outer_rows {
2200                    for (j, inner) in inner_rows.iter().enumerate() {
2201                        let combined: Vec<Value> =
2202                            outer.iter().chain(inner.iter()).cloned().collect();
2203                        if let Some(ref on_expr) = join.on_clause {
2204                            match eval_expr(on_expr, &combined_cols, &combined) {
2205                                Ok(val) if is_truthy(&val) => {
2206                                    new_rows.push(combined);
2207                                    inner_matched[j] = true;
2208                                }
2209                                _ => {}
2210                            }
2211                        } else {
2212                            new_rows.push(combined);
2213                            inner_matched[j] = true;
2214                        }
2215                    }
2216                }
2217                for (j, inner) in inner_rows.iter().enumerate() {
2218                    if !inner_matched[j] {
2219                        let mut padded: Vec<Value> = std::iter::repeat(Value::Null)
2220                            .take(outer_col_count)
2221                            .collect();
2222                        padded.extend(inner.iter().cloned());
2223                        new_rows.push(padded);
2224                    }
2225                }
2226            }
2227        }
2228
2229        tables.push((inner_alias, inner_schema));
2230        outer_rows = new_rows;
2231    }
2232
2233    let joined_cols = build_joined_columns(&tables);
2234    process_select(&joined_cols, outer_rows, stmt)
2235}
2236
2237fn exec_update(
2238    db: &Database,
2239    schema: &SchemaManager,
2240    stmt: &UpdateStmt,
2241) -> Result<ExecutionResult> {
2242    let materialized;
2243    let stmt = if update_has_subquery(stmt) {
2244        materialized = materialize_update(stmt, &mut |sub| exec_subquery_read(db, schema, sub))?;
2245        &materialized
2246    } else {
2247        stmt
2248    };
2249
2250    let lower_name = stmt.table.to_ascii_lowercase();
2251    let table_schema = schema
2252        .get(&lower_name)
2253        .ok_or_else(|| SqlError::TableNotFound(stmt.table.clone()))?;
2254
2255    let all_candidates = collect_keyed_rows_read(db, table_schema, &stmt.where_clause)?;
2256    let matching_rows: Vec<(Vec<u8>, Vec<Value>)> = all_candidates
2257        .into_iter()
2258        .filter(|(_, row)| match &stmt.where_clause {
2259            Some(where_expr) => match eval_expr(where_expr, &table_schema.columns, row) {
2260                Ok(val) => is_truthy(&val),
2261                Err(_) => false,
2262            },
2263            None => true,
2264        })
2265        .collect();
2266
2267    if matching_rows.is_empty() {
2268        return Ok(ExecutionResult::RowsAffected(0));
2269    }
2270
2271    struct UpdateChange {
2272        old_key: Vec<u8>,
2273        new_key: Vec<u8>,
2274        new_value: Vec<u8>,
2275        pk_changed: bool,
2276        old_row: Vec<Value>,
2277        new_row: Vec<Value>,
2278    }
2279
2280    let pk_indices = table_schema.pk_indices();
2281    let mut changes: Vec<UpdateChange> = Vec::new();
2282
2283    for (old_key, row) in &matching_rows {
2284        let mut new_row = row.clone();
2285        let mut pk_changed = false;
2286        for (col_name, expr) in &stmt.assignments {
2287            let col_idx = table_schema
2288                .column_index(col_name)
2289                .ok_or_else(|| SqlError::ColumnNotFound(col_name.clone()))?;
2290            let new_val = eval_expr(expr, &table_schema.columns, &new_row)?;
2291            let col = &table_schema.columns[col_idx];
2292
2293            let coerced = if new_val.is_null() {
2294                if !col.nullable {
2295                    return Err(SqlError::NotNullViolation(col.name.clone()));
2296                }
2297                Value::Null
2298            } else {
2299                new_val
2300                    .coerce_to(col.data_type)
2301                    .ok_or_else(|| SqlError::TypeMismatch {
2302                        expected: col.data_type.to_string(),
2303                        got: new_val.data_type().to_string(),
2304                    })?
2305            };
2306
2307            if table_schema.primary_key_columns.contains(&(col_idx as u16)) {
2308                pk_changed = true;
2309            }
2310            new_row[col_idx] = coerced;
2311        }
2312
2313        let pk_values: Vec<Value> = pk_indices.iter().map(|&i| new_row[i].clone()).collect();
2314        let new_key = encode_composite_key(&pk_values);
2315
2316        let non_pk = table_schema.non_pk_indices();
2317        let value_values: Vec<Value> = non_pk.iter().map(|&i| new_row[i].clone()).collect();
2318        let new_value = encode_row(&value_values);
2319
2320        changes.push(UpdateChange {
2321            old_key: old_key.clone(),
2322            new_key,
2323            new_value,
2324            pk_changed,
2325            old_row: row.clone(),
2326            new_row,
2327        });
2328    }
2329
2330    {
2331        use std::collections::HashSet;
2332        let mut new_keys: HashSet<Vec<u8>> = HashSet::new();
2333        for c in &changes {
2334            if c.pk_changed && c.new_key != c.old_key && !new_keys.insert(c.new_key.clone()) {
2335                return Err(SqlError::DuplicateKey);
2336            }
2337        }
2338    }
2339
2340    let mut wtx = db.begin_write().map_err(SqlError::Storage)?;
2341
2342    for c in &changes {
2343        let old_pk: Vec<Value> = pk_indices.iter().map(|&i| c.old_row[i].clone()).collect();
2344
2345        for idx in &table_schema.indices {
2346            if index_columns_changed(idx, &c.old_row, &c.new_row) || c.pk_changed {
2347                let idx_table = TableSchema::index_table_name(&lower_name, &idx.name);
2348                let old_idx_key = encode_index_key(idx, &c.old_row, &old_pk);
2349                wtx.table_delete(&idx_table, &old_idx_key)
2350                    .map_err(SqlError::Storage)?;
2351            }
2352        }
2353
2354        if c.pk_changed {
2355            wtx.table_delete(lower_name.as_bytes(), &c.old_key)
2356                .map_err(SqlError::Storage)?;
2357        }
2358    }
2359
2360    for c in &changes {
2361        let new_pk: Vec<Value> = pk_indices.iter().map(|&i| c.new_row[i].clone()).collect();
2362
2363        if c.pk_changed {
2364            let is_new = wtx
2365                .table_insert(lower_name.as_bytes(), &c.new_key, &c.new_value)
2366                .map_err(SqlError::Storage)?;
2367            if !is_new {
2368                return Err(SqlError::DuplicateKey);
2369            }
2370        } else {
2371            wtx.table_insert(lower_name.as_bytes(), &c.new_key, &c.new_value)
2372                .map_err(SqlError::Storage)?;
2373        }
2374
2375        for idx in &table_schema.indices {
2376            if index_columns_changed(idx, &c.old_row, &c.new_row) || c.pk_changed {
2377                let idx_table = TableSchema::index_table_name(&lower_name, &idx.name);
2378                let new_idx_key = encode_index_key(idx, &c.new_row, &new_pk);
2379                let new_idx_val = encode_index_value(idx, &c.new_row, &new_pk);
2380                let is_new = wtx
2381                    .table_insert(&idx_table, &new_idx_key, &new_idx_val)
2382                    .map_err(SqlError::Storage)?;
2383                if idx.unique && !is_new {
2384                    let indexed_values: Vec<Value> = idx
2385                        .columns
2386                        .iter()
2387                        .map(|&col_idx| c.new_row[col_idx as usize].clone())
2388                        .collect();
2389                    let any_null = indexed_values.iter().any(|v| v.is_null());
2390                    if !any_null {
2391                        return Err(SqlError::UniqueViolation(idx.name.clone()));
2392                    }
2393                }
2394            }
2395        }
2396    }
2397
2398    let count = changes.len() as u64;
2399    wtx.commit().map_err(SqlError::Storage)?;
2400    Ok(ExecutionResult::RowsAffected(count))
2401}
2402
2403fn exec_delete(
2404    db: &Database,
2405    schema: &SchemaManager,
2406    stmt: &DeleteStmt,
2407) -> Result<ExecutionResult> {
2408    let materialized;
2409    let stmt = if delete_has_subquery(stmt) {
2410        materialized = materialize_delete(stmt, &mut |sub| exec_subquery_read(db, schema, sub))?;
2411        &materialized
2412    } else {
2413        stmt
2414    };
2415
2416    let lower_name = stmt.table.to_ascii_lowercase();
2417    let table_schema = schema
2418        .get(&lower_name)
2419        .ok_or_else(|| SqlError::TableNotFound(stmt.table.clone()))?;
2420
2421    let all_candidates = collect_keyed_rows_read(db, table_schema, &stmt.where_clause)?;
2422    let rows_to_delete: Vec<(Vec<u8>, Vec<Value>)> = all_candidates
2423        .into_iter()
2424        .filter(|(_, row)| match &stmt.where_clause {
2425            Some(where_expr) => match eval_expr(where_expr, &table_schema.columns, row) {
2426                Ok(val) => is_truthy(&val),
2427                Err(_) => false,
2428            },
2429            None => true,
2430        })
2431        .collect();
2432
2433    if rows_to_delete.is_empty() {
2434        return Ok(ExecutionResult::RowsAffected(0));
2435    }
2436
2437    let pk_indices = table_schema.pk_indices();
2438    let mut wtx = db.begin_write().map_err(SqlError::Storage)?;
2439    for (key, row) in &rows_to_delete {
2440        let pk_values: Vec<Value> = pk_indices.iter().map(|&i| row[i].clone()).collect();
2441        delete_index_entries(&mut wtx, table_schema, row, &pk_values)?;
2442        wtx.table_delete(lower_name.as_bytes(), key)
2443            .map_err(SqlError::Storage)?;
2444    }
2445    let count = rows_to_delete.len() as u64;
2446    wtx.commit().map_err(SqlError::Storage)?;
2447    Ok(ExecutionResult::RowsAffected(count))
2448}
2449
2450// ── DML (in-transaction) ────────────────────────────────────────────
2451
2452fn exec_insert_in_txn(
2453    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
2454    schema: &SchemaManager,
2455    stmt: &InsertStmt,
2456) -> Result<ExecutionResult> {
2457    let materialized;
2458    let stmt = if insert_has_subquery(stmt) {
2459        materialized = materialize_insert(stmt, &mut |sub| exec_subquery_write(wtx, schema, sub))?;
2460        &materialized
2461    } else {
2462        stmt
2463    };
2464
2465    let lower_name = stmt.table.to_ascii_lowercase();
2466    let table_schema = schema
2467        .get(&lower_name)
2468        .ok_or_else(|| SqlError::TableNotFound(stmt.table.clone()))?;
2469
2470    let insert_columns = if stmt.columns.is_empty() {
2471        table_schema
2472            .columns
2473            .iter()
2474            .map(|c| c.name.clone())
2475            .collect::<Vec<_>>()
2476    } else {
2477        stmt.columns
2478            .iter()
2479            .map(|c| c.to_ascii_lowercase())
2480            .collect()
2481    };
2482
2483    let col_indices: Vec<usize> = insert_columns
2484        .iter()
2485        .map(|name| {
2486            table_schema
2487                .column_index(name)
2488                .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))
2489        })
2490        .collect::<Result<_>>()?;
2491
2492    let mut count: u64 = 0;
2493
2494    for value_row in &stmt.values {
2495        if value_row.len() != insert_columns.len() {
2496            return Err(SqlError::InvalidValue(format!(
2497                "expected {} values, got {}",
2498                insert_columns.len(),
2499                value_row.len()
2500            )));
2501        }
2502
2503        let mut row = vec![Value::Null; table_schema.columns.len()];
2504        for (i, expr) in value_row.iter().enumerate() {
2505            let val = eval_const_expr(expr)?;
2506            let col_idx = col_indices[i];
2507            let col = &table_schema.columns[col_idx];
2508
2509            let coerced = if val.is_null() {
2510                Value::Null
2511            } else {
2512                val.coerce_to(col.data_type)
2513                    .ok_or_else(|| SqlError::TypeMismatch {
2514                        expected: col.data_type.to_string(),
2515                        got: val.data_type().to_string(),
2516                    })?
2517            };
2518
2519            row[col_idx] = coerced;
2520        }
2521
2522        for col in &table_schema.columns {
2523            if !col.nullable && row[col.position as usize].is_null() {
2524                return Err(SqlError::NotNullViolation(col.name.clone()));
2525            }
2526        }
2527
2528        let pk_values: Vec<Value> = table_schema
2529            .pk_indices()
2530            .iter()
2531            .map(|&i| row[i].clone())
2532            .collect();
2533        let key = encode_composite_key(&pk_values);
2534
2535        let non_pk = table_schema.non_pk_indices();
2536        let value_values: Vec<Value> = non_pk.iter().map(|&i| row[i].clone()).collect();
2537        let value = encode_row(&value_values);
2538
2539        if key.len() > citadel_core::MAX_KEY_SIZE {
2540            return Err(SqlError::KeyTooLarge {
2541                size: key.len(),
2542                max: citadel_core::MAX_KEY_SIZE,
2543            });
2544        }
2545        if value.len() > citadel_core::MAX_INLINE_VALUE_SIZE {
2546            return Err(SqlError::RowTooLarge {
2547                size: value.len(),
2548                max: citadel_core::MAX_INLINE_VALUE_SIZE,
2549            });
2550        }
2551
2552        let is_new = wtx
2553            .table_insert(lower_name.as_bytes(), &key, &value)
2554            .map_err(SqlError::Storage)?;
2555        if !is_new {
2556            return Err(SqlError::DuplicateKey);
2557        }
2558
2559        insert_index_entries(wtx, table_schema, &row, &pk_values)?;
2560        count += 1;
2561    }
2562
2563    Ok(ExecutionResult::RowsAffected(count))
2564}
2565
2566fn exec_select_in_txn(
2567    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
2568    schema: &SchemaManager,
2569    stmt: &SelectStmt,
2570) -> Result<ExecutionResult> {
2571    let materialized;
2572    let stmt = if stmt_has_subquery(stmt) {
2573        materialized = materialize_stmt(stmt, &mut |sub| exec_subquery_write(wtx, schema, sub))?;
2574        &materialized
2575    } else {
2576        stmt
2577    };
2578
2579    if stmt.from.is_empty() {
2580        return exec_select_no_from(stmt);
2581    }
2582
2583    if !stmt.joins.is_empty() {
2584        return exec_select_join_in_txn(wtx, schema, stmt);
2585    }
2586
2587    let lower_name = stmt.from.to_ascii_lowercase();
2588    let table_schema = schema
2589        .get(&lower_name)
2590        .ok_or_else(|| SqlError::TableNotFound(stmt.from.clone()))?;
2591
2592    let rows = collect_rows_write(wtx, table_schema, &stmt.where_clause)?;
2593    process_select(&table_schema.columns, rows, stmt)
2594}
2595
2596fn exec_update_in_txn(
2597    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
2598    schema: &SchemaManager,
2599    stmt: &UpdateStmt,
2600) -> Result<ExecutionResult> {
2601    let materialized;
2602    let stmt = if update_has_subquery(stmt) {
2603        materialized = materialize_update(stmt, &mut |sub| exec_subquery_write(wtx, schema, sub))?;
2604        &materialized
2605    } else {
2606        stmt
2607    };
2608
2609    let lower_name = stmt.table.to_ascii_lowercase();
2610    let table_schema = schema
2611        .get(&lower_name)
2612        .ok_or_else(|| SqlError::TableNotFound(stmt.table.clone()))?;
2613
2614    let all_candidates = collect_keyed_rows_write(wtx, table_schema, &stmt.where_clause)?;
2615    let matching_rows: Vec<(Vec<u8>, Vec<Value>)> = all_candidates
2616        .into_iter()
2617        .filter(|(_, row)| match &stmt.where_clause {
2618            Some(where_expr) => match eval_expr(where_expr, &table_schema.columns, row) {
2619                Ok(val) => is_truthy(&val),
2620                Err(_) => false,
2621            },
2622            None => true,
2623        })
2624        .collect();
2625
2626    if matching_rows.is_empty() {
2627        return Ok(ExecutionResult::RowsAffected(0));
2628    }
2629
2630    struct UpdateChange {
2631        old_key: Vec<u8>,
2632        new_key: Vec<u8>,
2633        new_value: Vec<u8>,
2634        pk_changed: bool,
2635        old_row: Vec<Value>,
2636        new_row: Vec<Value>,
2637    }
2638
2639    let pk_indices = table_schema.pk_indices();
2640    let mut changes: Vec<UpdateChange> = Vec::new();
2641
2642    for (old_key, row) in &matching_rows {
2643        let mut new_row = row.clone();
2644        let mut pk_changed = false;
2645        for (col_name, expr) in &stmt.assignments {
2646            let col_idx = table_schema
2647                .column_index(col_name)
2648                .ok_or_else(|| SqlError::ColumnNotFound(col_name.clone()))?;
2649            let new_val = eval_expr(expr, &table_schema.columns, &new_row)?;
2650            let col = &table_schema.columns[col_idx];
2651
2652            let coerced = if new_val.is_null() {
2653                if !col.nullable {
2654                    return Err(SqlError::NotNullViolation(col.name.clone()));
2655                }
2656                Value::Null
2657            } else {
2658                new_val
2659                    .coerce_to(col.data_type)
2660                    .ok_or_else(|| SqlError::TypeMismatch {
2661                        expected: col.data_type.to_string(),
2662                        got: new_val.data_type().to_string(),
2663                    })?
2664            };
2665
2666            if table_schema.primary_key_columns.contains(&(col_idx as u16)) {
2667                pk_changed = true;
2668            }
2669            new_row[col_idx] = coerced;
2670        }
2671
2672        let pk_values: Vec<Value> = pk_indices.iter().map(|&i| new_row[i].clone()).collect();
2673        let new_key = encode_composite_key(&pk_values);
2674
2675        let non_pk = table_schema.non_pk_indices();
2676        let value_values: Vec<Value> = non_pk.iter().map(|&i| new_row[i].clone()).collect();
2677        let new_value = encode_row(&value_values);
2678
2679        changes.push(UpdateChange {
2680            old_key: old_key.clone(),
2681            new_key,
2682            new_value,
2683            pk_changed,
2684            old_row: row.clone(),
2685            new_row,
2686        });
2687    }
2688
2689    {
2690        use std::collections::HashSet;
2691        let mut new_keys: HashSet<Vec<u8>> = HashSet::new();
2692        for c in &changes {
2693            if c.pk_changed && c.new_key != c.old_key && !new_keys.insert(c.new_key.clone()) {
2694                return Err(SqlError::DuplicateKey);
2695            }
2696        }
2697    }
2698
2699    for c in &changes {
2700        let old_pk: Vec<Value> = pk_indices.iter().map(|&i| c.old_row[i].clone()).collect();
2701
2702        for idx in &table_schema.indices {
2703            if index_columns_changed(idx, &c.old_row, &c.new_row) || c.pk_changed {
2704                let idx_table = TableSchema::index_table_name(&lower_name, &idx.name);
2705                let old_idx_key = encode_index_key(idx, &c.old_row, &old_pk);
2706                wtx.table_delete(&idx_table, &old_idx_key)
2707                    .map_err(SqlError::Storage)?;
2708            }
2709        }
2710
2711        if c.pk_changed {
2712            wtx.table_delete(lower_name.as_bytes(), &c.old_key)
2713                .map_err(SqlError::Storage)?;
2714        }
2715    }
2716
2717    for c in &changes {
2718        let new_pk: Vec<Value> = pk_indices.iter().map(|&i| c.new_row[i].clone()).collect();
2719
2720        if c.pk_changed {
2721            let is_new = wtx
2722                .table_insert(lower_name.as_bytes(), &c.new_key, &c.new_value)
2723                .map_err(SqlError::Storage)?;
2724            if !is_new {
2725                return Err(SqlError::DuplicateKey);
2726            }
2727        } else {
2728            wtx.table_insert(lower_name.as_bytes(), &c.new_key, &c.new_value)
2729                .map_err(SqlError::Storage)?;
2730        }
2731
2732        for idx in &table_schema.indices {
2733            if index_columns_changed(idx, &c.old_row, &c.new_row) || c.pk_changed {
2734                let idx_table = TableSchema::index_table_name(&lower_name, &idx.name);
2735                let new_idx_key = encode_index_key(idx, &c.new_row, &new_pk);
2736                let new_idx_val = encode_index_value(idx, &c.new_row, &new_pk);
2737                let is_new = wtx
2738                    .table_insert(&idx_table, &new_idx_key, &new_idx_val)
2739                    .map_err(SqlError::Storage)?;
2740                if idx.unique && !is_new {
2741                    let indexed_values: Vec<Value> = idx
2742                        .columns
2743                        .iter()
2744                        .map(|&col_idx| c.new_row[col_idx as usize].clone())
2745                        .collect();
2746                    let any_null = indexed_values.iter().any(|v| v.is_null());
2747                    if !any_null {
2748                        return Err(SqlError::UniqueViolation(idx.name.clone()));
2749                    }
2750                }
2751            }
2752        }
2753    }
2754
2755    let count = changes.len() as u64;
2756    Ok(ExecutionResult::RowsAffected(count))
2757}
2758
2759fn exec_delete_in_txn(
2760    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
2761    schema: &SchemaManager,
2762    stmt: &DeleteStmt,
2763) -> Result<ExecutionResult> {
2764    let materialized;
2765    let stmt = if delete_has_subquery(stmt) {
2766        materialized = materialize_delete(stmt, &mut |sub| exec_subquery_write(wtx, schema, sub))?;
2767        &materialized
2768    } else {
2769        stmt
2770    };
2771
2772    let lower_name = stmt.table.to_ascii_lowercase();
2773    let table_schema = schema
2774        .get(&lower_name)
2775        .ok_or_else(|| SqlError::TableNotFound(stmt.table.clone()))?;
2776
2777    let all_candidates = collect_keyed_rows_write(wtx, table_schema, &stmt.where_clause)?;
2778    let rows_to_delete: Vec<(Vec<u8>, Vec<Value>)> = all_candidates
2779        .into_iter()
2780        .filter(|(_, row)| match &stmt.where_clause {
2781            Some(where_expr) => match eval_expr(where_expr, &table_schema.columns, row) {
2782                Ok(val) => is_truthy(&val),
2783                Err(_) => false,
2784            },
2785            None => true,
2786        })
2787        .collect();
2788
2789    if rows_to_delete.is_empty() {
2790        return Ok(ExecutionResult::RowsAffected(0));
2791    }
2792
2793    let pk_indices = table_schema.pk_indices();
2794    for (key, row) in &rows_to_delete {
2795        let pk_values: Vec<Value> = pk_indices.iter().map(|&i| row[i].clone()).collect();
2796        delete_index_entries(wtx, table_schema, row, &pk_values)?;
2797        wtx.table_delete(lower_name.as_bytes(), key)
2798            .map_err(SqlError::Storage)?;
2799    }
2800    let count = rows_to_delete.len() as u64;
2801    Ok(ExecutionResult::RowsAffected(count))
2802}
2803
2804// ── Aggregation ─────────────────────────────────────────────────────
2805
2806fn exec_aggregate(
2807    columns: &[ColumnDef],
2808    rows: &[Vec<Value>],
2809    stmt: &SelectStmt,
2810) -> Result<ExecutionResult> {
2811    let groups: BTreeMap<Vec<Value>, Vec<&Vec<Value>>> = if stmt.group_by.is_empty() {
2812        let mut m = BTreeMap::new();
2813        m.insert(vec![], rows.iter().collect());
2814        m
2815    } else {
2816        let mut m: BTreeMap<Vec<Value>, Vec<&Vec<Value>>> = BTreeMap::new();
2817        for row in rows {
2818            let group_key: Vec<Value> = stmt
2819                .group_by
2820                .iter()
2821                .map(|expr| eval_expr(expr, columns, row))
2822                .collect::<Result<_>>()?;
2823            m.entry(group_key).or_default().push(row);
2824        }
2825        m
2826    };
2827
2828    let mut result_rows = Vec::new();
2829    let output_cols = build_output_columns(&stmt.columns, columns);
2830
2831    for group_rows in groups.values() {
2832        let mut result_row = Vec::new();
2833
2834        for sel_col in &stmt.columns {
2835            match sel_col {
2836                SelectColumn::AllColumns => {
2837                    return Err(SqlError::Unsupported("SELECT * with GROUP BY".into()));
2838                }
2839                SelectColumn::Expr { expr, .. } => {
2840                    let val = eval_aggregate_expr(expr, columns, group_rows)?;
2841                    result_row.push(val);
2842                }
2843            }
2844        }
2845
2846        if let Some(ref having) = stmt.having {
2847            let passes = match eval_aggregate_expr(having, columns, group_rows) {
2848                Ok(val) => is_truthy(&val),
2849                Err(SqlError::ColumnNotFound(_)) => {
2850                    match eval_expr(having, &output_cols, &result_row) {
2851                        Ok(val) => is_truthy(&val),
2852                        Err(_) => false,
2853                    }
2854                }
2855                Err(e) => return Err(e),
2856            };
2857            if !passes {
2858                continue;
2859            }
2860        }
2861
2862        result_rows.push(result_row);
2863    }
2864
2865    if stmt.distinct {
2866        let mut seen = std::collections::HashSet::new();
2867        result_rows.retain(|row| seen.insert(row.clone()));
2868    }
2869
2870    if !stmt.order_by.is_empty() {
2871        let output_cols = build_output_columns(&stmt.columns, columns);
2872        sort_rows(&mut result_rows, &stmt.order_by, &output_cols)?;
2873    }
2874
2875    if let Some(ref offset_expr) = stmt.offset {
2876        let offset = eval_const_int(offset_expr)? as usize;
2877        if offset < result_rows.len() {
2878            result_rows = result_rows.split_off(offset);
2879        } else {
2880            result_rows.clear();
2881        }
2882    }
2883    if let Some(ref limit_expr) = stmt.limit {
2884        let limit = eval_const_int(limit_expr)? as usize;
2885        result_rows.truncate(limit);
2886    }
2887
2888    let col_names = stmt
2889        .columns
2890        .iter()
2891        .map(|c| match c {
2892            SelectColumn::AllColumns => "*".into(),
2893            SelectColumn::Expr { alias: Some(a), .. } => a.clone(),
2894            SelectColumn::Expr { expr, .. } => expr_display_name(expr),
2895        })
2896        .collect();
2897
2898    Ok(ExecutionResult::Query(QueryResult {
2899        columns: col_names,
2900        rows: result_rows,
2901    }))
2902}
2903
2904fn eval_aggregate_expr(
2905    expr: &Expr,
2906    columns: &[ColumnDef],
2907    group_rows: &[&Vec<Value>],
2908) -> Result<Value> {
2909    match expr {
2910        Expr::CountStar => Ok(Value::Integer(group_rows.len() as i64)),
2911
2912        Expr::Function { name, args } if is_aggregate_function(name, args.len()) => {
2913            let func = name.to_ascii_uppercase();
2914            if args.len() != 1 {
2915                return Err(SqlError::Unsupported(format!(
2916                    "{func} with {} args",
2917                    args.len()
2918                )));
2919            }
2920            let arg = &args[0];
2921            let values: Vec<Value> = group_rows
2922                .iter()
2923                .map(|row| eval_expr(arg, columns, row))
2924                .collect::<Result<_>>()?;
2925
2926            match func.as_str() {
2927                "COUNT" => {
2928                    let count = values.iter().filter(|v| !v.is_null()).count();
2929                    Ok(Value::Integer(count as i64))
2930                }
2931                "SUM" => {
2932                    let mut int_sum: i64 = 0;
2933                    let mut real_sum: f64 = 0.0;
2934                    let mut has_real = false;
2935                    let mut all_null = true;
2936                    for v in &values {
2937                        match v {
2938                            Value::Integer(i) => {
2939                                int_sum += i;
2940                                all_null = false;
2941                            }
2942                            Value::Real(r) => {
2943                                real_sum += r;
2944                                has_real = true;
2945                                all_null = false;
2946                            }
2947                            Value::Null => {}
2948                            _ => {
2949                                return Err(SqlError::TypeMismatch {
2950                                    expected: "numeric".into(),
2951                                    got: v.data_type().to_string(),
2952                                })
2953                            }
2954                        }
2955                    }
2956                    if all_null {
2957                        return Ok(Value::Null);
2958                    }
2959                    if has_real {
2960                        Ok(Value::Real(real_sum + int_sum as f64))
2961                    } else {
2962                        Ok(Value::Integer(int_sum))
2963                    }
2964                }
2965                "AVG" => {
2966                    let mut sum: f64 = 0.0;
2967                    let mut count: i64 = 0;
2968                    for v in &values {
2969                        match v {
2970                            Value::Integer(i) => {
2971                                sum += *i as f64;
2972                                count += 1;
2973                            }
2974                            Value::Real(r) => {
2975                                sum += r;
2976                                count += 1;
2977                            }
2978                            Value::Null => {}
2979                            _ => {
2980                                return Err(SqlError::TypeMismatch {
2981                                    expected: "numeric".into(),
2982                                    got: v.data_type().to_string(),
2983                                })
2984                            }
2985                        }
2986                    }
2987                    if count == 0 {
2988                        Ok(Value::Null)
2989                    } else {
2990                        Ok(Value::Real(sum / count as f64))
2991                    }
2992                }
2993                "MIN" => {
2994                    let mut min: Option<&Value> = None;
2995                    for v in &values {
2996                        if v.is_null() {
2997                            continue;
2998                        }
2999                        min = Some(match min {
3000                            None => v,
3001                            Some(m) => {
3002                                if v < m {
3003                                    v
3004                                } else {
3005                                    m
3006                                }
3007                            }
3008                        });
3009                    }
3010                    Ok(min.cloned().unwrap_or(Value::Null))
3011                }
3012                "MAX" => {
3013                    let mut max: Option<&Value> = None;
3014                    for v in &values {
3015                        if v.is_null() {
3016                            continue;
3017                        }
3018                        max = Some(match max {
3019                            None => v,
3020                            Some(m) => {
3021                                if v > m {
3022                                    v
3023                                } else {
3024                                    m
3025                                }
3026                            }
3027                        });
3028                    }
3029                    Ok(max.cloned().unwrap_or(Value::Null))
3030                }
3031                _ => Err(SqlError::Unsupported(format!("aggregate function: {func}"))),
3032            }
3033        }
3034
3035        Expr::Column(_) | Expr::QualifiedColumn { .. } => {
3036            if let Some(first) = group_rows.first() {
3037                eval_expr(expr, columns, first)
3038            } else {
3039                Ok(Value::Null)
3040            }
3041        }
3042
3043        Expr::Literal(v) => Ok(v.clone()),
3044
3045        Expr::BinaryOp { left, op, right } => {
3046            let l = eval_aggregate_expr(left, columns, group_rows)?;
3047            let r = eval_aggregate_expr(right, columns, group_rows)?;
3048            crate::eval::eval_expr(
3049                &Expr::BinaryOp {
3050                    left: Box::new(Expr::Literal(l)),
3051                    op: *op,
3052                    right: Box::new(Expr::Literal(r)),
3053                },
3054                columns,
3055                &[],
3056            )
3057        }
3058
3059        Expr::UnaryOp { op, expr: e } => {
3060            let v = eval_aggregate_expr(e, columns, group_rows)?;
3061            crate::eval::eval_expr(
3062                &Expr::UnaryOp {
3063                    op: *op,
3064                    expr: Box::new(Expr::Literal(v)),
3065                },
3066                columns,
3067                &[],
3068            )
3069        }
3070
3071        Expr::IsNull(e) => {
3072            let v = eval_aggregate_expr(e, columns, group_rows)?;
3073            Ok(Value::Boolean(v.is_null()))
3074        }
3075
3076        Expr::IsNotNull(e) => {
3077            let v = eval_aggregate_expr(e, columns, group_rows)?;
3078            Ok(Value::Boolean(!v.is_null()))
3079        }
3080
3081        Expr::Cast { expr: e, data_type } => {
3082            let v = eval_aggregate_expr(e, columns, group_rows)?;
3083            crate::eval::eval_expr(
3084                &Expr::Cast {
3085                    expr: Box::new(Expr::Literal(v)),
3086                    data_type: *data_type,
3087                },
3088                columns,
3089                &[],
3090            )
3091        }
3092
3093        Expr::Case {
3094            operand,
3095            conditions,
3096            else_result,
3097        } => {
3098            let op_val = operand
3099                .as_ref()
3100                .map(|e| eval_aggregate_expr(e, columns, group_rows))
3101                .transpose()?;
3102            if let Some(ov) = &op_val {
3103                for (cond, result) in conditions {
3104                    let cv = eval_aggregate_expr(cond, columns, group_rows)?;
3105                    if !ov.is_null() && !cv.is_null() && *ov == cv {
3106                        return eval_aggregate_expr(result, columns, group_rows);
3107                    }
3108                }
3109            } else {
3110                for (cond, result) in conditions {
3111                    let cv = eval_aggregate_expr(cond, columns, group_rows)?;
3112                    if crate::eval::is_truthy(&cv) {
3113                        return eval_aggregate_expr(result, columns, group_rows);
3114                    }
3115                }
3116            }
3117            match else_result {
3118                Some(e) => eval_aggregate_expr(e, columns, group_rows),
3119                None => Ok(Value::Null),
3120            }
3121        }
3122
3123        Expr::Coalesce(args) => {
3124            for arg in args {
3125                let v = eval_aggregate_expr(arg, columns, group_rows)?;
3126                if !v.is_null() {
3127                    return Ok(v);
3128                }
3129            }
3130            Ok(Value::Null)
3131        }
3132
3133        Expr::Between {
3134            expr: e,
3135            low,
3136            high,
3137            negated,
3138        } => {
3139            let v = eval_aggregate_expr(e, columns, group_rows)?;
3140            let lo = eval_aggregate_expr(low, columns, group_rows)?;
3141            let hi = eval_aggregate_expr(high, columns, group_rows)?;
3142            crate::eval::eval_expr(
3143                &Expr::Between {
3144                    expr: Box::new(Expr::Literal(v)),
3145                    low: Box::new(Expr::Literal(lo)),
3146                    high: Box::new(Expr::Literal(hi)),
3147                    negated: *negated,
3148                },
3149                columns,
3150                &[],
3151            )
3152        }
3153
3154        Expr::Like {
3155            expr: e,
3156            pattern,
3157            escape,
3158            negated,
3159        } => {
3160            let v = eval_aggregate_expr(e, columns, group_rows)?;
3161            let p = eval_aggregate_expr(pattern, columns, group_rows)?;
3162            let esc = escape
3163                .as_ref()
3164                .map(|es| eval_aggregate_expr(es, columns, group_rows))
3165                .transpose()?;
3166            let esc_box = esc.map(|v| Box::new(Expr::Literal(v)));
3167            crate::eval::eval_expr(
3168                &Expr::Like {
3169                    expr: Box::new(Expr::Literal(v)),
3170                    pattern: Box::new(Expr::Literal(p)),
3171                    escape: esc_box,
3172                    negated: *negated,
3173                },
3174                columns,
3175                &[],
3176            )
3177        }
3178
3179        Expr::Function { name, args } => {
3180            let evaluated: Vec<Value> = args
3181                .iter()
3182                .map(|a| eval_aggregate_expr(a, columns, group_rows))
3183                .collect::<Result<_>>()?;
3184            let literal_args: Vec<Expr> = evaluated.into_iter().map(Expr::Literal).collect();
3185            crate::eval::eval_expr(
3186                &Expr::Function {
3187                    name: name.clone(),
3188                    args: literal_args,
3189                },
3190                columns,
3191                &[],
3192            )
3193        }
3194
3195        _ => Err(SqlError::Unsupported(format!(
3196            "expression in aggregate: {expr:?}"
3197        ))),
3198    }
3199}
3200
/// Returns `true` when `name` (matched case-insensitively) names an aggregate
/// function for the given argument count.
///
/// `COUNT`, `SUM` and `AVG` count regardless of arity. `MIN`/`MAX` count only
/// with exactly one argument; other arities are not treated as aggregates here
/// (presumably they are handled as scalar functions elsewhere).
fn is_aggregate_function(name: &str, arg_count: usize) -> bool {
    const ANY_ARITY: [&str; 3] = ["COUNT", "SUM", "AVG"];
    const UNARY_ONLY: [&str; 2] = ["MIN", "MAX"];

    if ANY_ARITY.iter().any(|f| f.eq_ignore_ascii_case(name)) {
        return true;
    }
    arg_count == 1 && UNARY_ONLY.iter().any(|f| f.eq_ignore_ascii_case(name))
}
3206
3207fn is_aggregate_expr(expr: &Expr) -> bool {
3208    match expr {
3209        Expr::CountStar => true,
3210        Expr::Function { name, args } => {
3211            is_aggregate_function(name, args.len()) || args.iter().any(is_aggregate_expr)
3212        }
3213        Expr::BinaryOp { left, right, .. } => is_aggregate_expr(left) || is_aggregate_expr(right),
3214        Expr::UnaryOp { expr, .. }
3215        | Expr::IsNull(expr)
3216        | Expr::IsNotNull(expr)
3217        | Expr::Cast { expr, .. } => is_aggregate_expr(expr),
3218        Expr::Case {
3219            operand,
3220            conditions,
3221            else_result,
3222        } => {
3223            operand.as_ref().is_some_and(|e| is_aggregate_expr(e))
3224                || conditions
3225                    .iter()
3226                    .any(|(c, r)| is_aggregate_expr(c) || is_aggregate_expr(r))
3227                || else_result.as_ref().is_some_and(|e| is_aggregate_expr(e))
3228        }
3229        Expr::Coalesce(args) => args.iter().any(is_aggregate_expr),
3230        Expr::Between {
3231            expr, low, high, ..
3232        } => is_aggregate_expr(expr) || is_aggregate_expr(low) || is_aggregate_expr(high),
3233        Expr::Like {
3234            expr,
3235            pattern,
3236            escape,
3237            ..
3238        } => {
3239            is_aggregate_expr(expr)
3240                || is_aggregate_expr(pattern)
3241                || escape.as_ref().is_some_and(|e| is_aggregate_expr(e))
3242        }
3243        _ => false,
3244    }
3245}
3246
3247// ── Helpers ─────────────────────────────────────────────────────────
3248
3249/// Decode a full row from B+ tree key + value.
3250fn decode_full_row(schema: &TableSchema, key: &[u8], value: &[u8]) -> Result<Vec<Value>> {
3251    let pk_values = decode_composite_key(key, schema.primary_key_columns.len())?;
3252    let non_pk_values = decode_row(value)?;
3253
3254    let mut row = vec![Value::Null; schema.columns.len()];
3255
3256    for (i, &col_idx) in schema.primary_key_columns.iter().enumerate() {
3257        row[col_idx as usize] = pk_values[i].clone();
3258    }
3259
3260    let non_pk = schema.non_pk_indices();
3261    for (i, &col_idx) in non_pk.iter().enumerate() {
3262        if i < non_pk_values.len() {
3263            row[col_idx] = non_pk_values[i].clone();
3264        }
3265    }
3266
3267    Ok(row)
3268}
3269
3270/// Evaluate a constant expression (no column references).
3271fn eval_const_expr(expr: &Expr) -> Result<Value> {
3272    eval_expr(expr, &[], &[])
3273}
3274
3275fn eval_const_int(expr: &Expr) -> Result<i64> {
3276    match eval_const_expr(expr)? {
3277        Value::Integer(i) => Ok(i),
3278        other => Err(SqlError::TypeMismatch {
3279            expected: "INTEGER".into(),
3280            got: other.data_type().to_string(),
3281        }),
3282    }
3283}
3284
3285fn sort_rows(
3286    rows: &mut [Vec<Value>],
3287    order_by: &[OrderByItem],
3288    columns: &[ColumnDef],
3289) -> Result<()> {
3290    rows.sort_by(|a, b| {
3291        for item in order_by {
3292            let a_val = eval_expr(&item.expr, columns, a).unwrap_or(Value::Null);
3293            let b_val = eval_expr(&item.expr, columns, b).unwrap_or(Value::Null);
3294
3295            let nulls_first = item.nulls_first.unwrap_or(!item.descending);
3296
3297            let ord = match (a_val.is_null(), b_val.is_null()) {
3298                (true, true) => std::cmp::Ordering::Equal,
3299                (true, false) => {
3300                    if nulls_first {
3301                        std::cmp::Ordering::Less
3302                    } else {
3303                        std::cmp::Ordering::Greater
3304                    }
3305                }
3306                (false, true) => {
3307                    if nulls_first {
3308                        std::cmp::Ordering::Greater
3309                    } else {
3310                        std::cmp::Ordering::Less
3311                    }
3312                }
3313                (false, false) => {
3314                    let cmp = a_val.cmp(&b_val);
3315                    if item.descending {
3316                        cmp.reverse()
3317                    } else {
3318                        cmp
3319                    }
3320                }
3321            };
3322
3323            if ord != std::cmp::Ordering::Equal {
3324                return ord;
3325            }
3326        }
3327        std::cmp::Ordering::Equal
3328    });
3329    Ok(())
3330}
3331
3332fn project_rows(
3333    columns: &[ColumnDef],
3334    select_cols: &[SelectColumn],
3335    rows: &[Vec<Value>],
3336) -> Result<(Vec<String>, Vec<Vec<Value>>)> {
3337    let mut col_names = Vec::new();
3338    type Projector = Box<dyn Fn(&[Value]) -> Result<Value>>;
3339    let mut projectors: Vec<Projector> = Vec::new();
3340
3341    for sel_col in select_cols {
3342        match sel_col {
3343            SelectColumn::AllColumns => {
3344                for col in columns {
3345                    let idx = col.position as usize;
3346                    col_names.push(col.name.clone());
3347                    projectors.push(Box::new(move |row: &[Value]| Ok(row[idx].clone())));
3348                }
3349            }
3350            SelectColumn::Expr { expr, alias } => {
3351                let name = alias.clone().unwrap_or_else(|| expr_display_name(expr));
3352                col_names.push(name);
3353                let expr = expr.clone();
3354                let owned_cols = columns.to_vec();
3355                projectors.push(Box::new(move |row: &[Value]| {
3356                    eval_expr(&expr, &owned_cols, row)
3357                }));
3358            }
3359        }
3360    }
3361
3362    let projected = rows
3363        .iter()
3364        .map(|row| {
3365            projectors
3366                .iter()
3367                .map(|p| p(row))
3368                .collect::<Result<Vec<_>>>()
3369        })
3370        .collect::<Result<Vec<_>>>()?;
3371
3372    Ok((col_names, projected))
3373}
3374
3375fn expr_display_name(expr: &Expr) -> String {
3376    match expr {
3377        Expr::Column(name) => name.clone(),
3378        Expr::QualifiedColumn { table, column } => format!("{table}.{column}"),
3379        Expr::Literal(v) => format!("{v}"),
3380        Expr::CountStar => "COUNT(*)".into(),
3381        Expr::Function { name, args } => {
3382            let arg_strs: Vec<String> = args.iter().map(expr_display_name).collect();
3383            format!("{name}({})", arg_strs.join(", "))
3384        }
3385        Expr::BinaryOp { left, op, right } => {
3386            format!(
3387                "{} {} {}",
3388                expr_display_name(left),
3389                op_symbol(op),
3390                expr_display_name(right)
3391            )
3392        }
3393        _ => "?".into(),
3394    }
3395}
3396
3397fn op_symbol(op: &BinOp) -> &'static str {
3398    match op {
3399        BinOp::Add => "+",
3400        BinOp::Sub => "-",
3401        BinOp::Mul => "*",
3402        BinOp::Div => "/",
3403        BinOp::Mod => "%",
3404        BinOp::Eq => "=",
3405        BinOp::NotEq => "<>",
3406        BinOp::Lt => "<",
3407        BinOp::Gt => ">",
3408        BinOp::LtEq => "<=",
3409        BinOp::GtEq => ">=",
3410        BinOp::And => "AND",
3411        BinOp::Or => "OR",
3412        BinOp::Concat => "||",
3413    }
3414}
3415
3416fn build_output_columns(select_cols: &[SelectColumn], columns: &[ColumnDef]) -> Vec<ColumnDef> {
3417    let mut out = Vec::new();
3418    for (i, col) in select_cols.iter().enumerate() {
3419        let (name, data_type) = match col {
3420            SelectColumn::AllColumns => (format!("col{i}"), DataType::Null),
3421            SelectColumn::Expr {
3422                alias: Some(a),
3423                expr,
3424            } => (a.clone(), infer_expr_type(expr, columns)),
3425            SelectColumn::Expr { expr, .. } => {
3426                (expr_display_name(expr), infer_expr_type(expr, columns))
3427            }
3428        };
3429        out.push(ColumnDef {
3430            name,
3431            data_type,
3432            nullable: true,
3433            position: i as u16,
3434        });
3435    }
3436    out
3437}
3438
3439fn infer_expr_type(expr: &Expr, columns: &[ColumnDef]) -> DataType {
3440    match expr {
3441        Expr::Column(name) => {
3442            let lower = name.to_ascii_lowercase();
3443            columns
3444                .iter()
3445                .find(|c| c.name.to_ascii_lowercase() == lower)
3446                .map(|c| c.data_type)
3447                .unwrap_or(DataType::Null)
3448        }
3449        Expr::QualifiedColumn { table, column } => {
3450            let qualified = format!(
3451                "{}.{}",
3452                table.to_ascii_lowercase(),
3453                column.to_ascii_lowercase()
3454            );
3455            columns
3456                .iter()
3457                .find(|c| c.name.to_ascii_lowercase() == qualified)
3458                .map(|c| c.data_type)
3459                .unwrap_or(DataType::Null)
3460        }
3461        Expr::Literal(v) => v.data_type(),
3462        Expr::CountStar => DataType::Integer,
3463        Expr::Function { name, .. } => match name.to_ascii_uppercase().as_str() {
3464            "COUNT" => DataType::Integer,
3465            "AVG" => DataType::Real,
3466            "SUM" | "MIN" | "MAX" => DataType::Null,
3467            _ => DataType::Null,
3468        },
3469        _ => DataType::Null,
3470    }
3471}