Skip to main content

citadel_sql/
executor.rs

1//! SQL executor: DDL and DML operations.
2
3use std::collections::BTreeMap;
4
5use citadel::Database;
6
7use crate::encoding::{
8    decode_composite_key, decode_key_value, decode_row, encode_composite_key, encode_row,
9};
10use crate::error::{Result, SqlError};
11use crate::eval::{eval_expr, is_truthy};
12use crate::parser::*;
13use crate::planner::{self, ScanPlan};
14use crate::schema::SchemaManager;
15use crate::types::*;
16
17// ── Index helpers ────────────────────────────────────────────────────
18
19fn encode_index_key(idx: &IndexDef, row: &[Value], pk_values: &[Value]) -> Vec<u8> {
20    let indexed_values: Vec<Value> = idx
21        .columns
22        .iter()
23        .map(|&col_idx| row[col_idx as usize].clone())
24        .collect();
25
26    if idx.unique {
27        let any_null = indexed_values.iter().any(|v| v.is_null());
28        if !any_null {
29            return encode_composite_key(&indexed_values);
30        }
31    }
32
33    let mut all_values = indexed_values;
34    all_values.extend_from_slice(pk_values);
35    encode_composite_key(&all_values)
36}
37
38fn encode_index_value(idx: &IndexDef, row: &[Value], pk_values: &[Value]) -> Vec<u8> {
39    if idx.unique {
40        let indexed_values: Vec<Value> = idx
41            .columns
42            .iter()
43            .map(|&col_idx| row[col_idx as usize].clone())
44            .collect();
45        let any_null = indexed_values.iter().any(|v| v.is_null());
46        if !any_null {
47            return encode_composite_key(pk_values);
48        }
49    }
50    vec![]
51}
52
53fn insert_index_entries(
54    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
55    table_schema: &TableSchema,
56    row: &[Value],
57    pk_values: &[Value],
58) -> Result<()> {
59    for idx in &table_schema.indices {
60        let idx_table = TableSchema::index_table_name(&table_schema.name, &idx.name);
61        let key = encode_index_key(idx, row, pk_values);
62        let value = encode_index_value(idx, row, pk_values);
63
64        let is_new = wtx
65            .table_insert(&idx_table, &key, &value)
66            .map_err(SqlError::Storage)?;
67
68        if idx.unique && !is_new {
69            let indexed_values: Vec<Value> = idx
70                .columns
71                .iter()
72                .map(|&col_idx| row[col_idx as usize].clone())
73                .collect();
74            let any_null = indexed_values.iter().any(|v| v.is_null());
75            if !any_null {
76                return Err(SqlError::UniqueViolation(idx.name.clone()));
77            }
78        }
79    }
80    Ok(())
81}
82
83fn delete_index_entries(
84    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
85    table_schema: &TableSchema,
86    row: &[Value],
87    pk_values: &[Value],
88) -> Result<()> {
89    for idx in &table_schema.indices {
90        let idx_table = TableSchema::index_table_name(&table_schema.name, &idx.name);
91        let key = encode_index_key(idx, row, pk_values);
92        wtx.table_delete(&idx_table, &key)
93            .map_err(SqlError::Storage)?;
94    }
95    Ok(())
96}
97
98fn index_columns_changed(idx: &IndexDef, old_row: &[Value], new_row: &[Value]) -> bool {
99    idx.columns
100        .iter()
101        .any(|&col_idx| old_row[col_idx as usize] != new_row[col_idx as usize])
102}
103
104/// Execute a parsed SQL statement in auto-commit mode.
105pub fn execute(
106    db: &Database,
107    schema: &mut SchemaManager,
108    stmt: &Statement,
109) -> Result<ExecutionResult> {
110    match stmt {
111        Statement::CreateTable(ct) => exec_create_table(db, schema, ct),
112        Statement::DropTable(dt) => exec_drop_table(db, schema, dt),
113        Statement::CreateIndex(ci) => exec_create_index(db, schema, ci),
114        Statement::DropIndex(di) => exec_drop_index(db, schema, di),
115        Statement::Insert(ins) => exec_insert(db, schema, ins),
116        Statement::Select(sel) => exec_select(db, schema, sel),
117        Statement::Update(upd) => exec_update(db, schema, upd),
118        Statement::Delete(del) => exec_delete(db, schema, del),
119        Statement::Explain(inner) => explain(schema, inner),
120        Statement::Begin | Statement::Commit | Statement::Rollback => Err(SqlError::Unsupported(
121            "transaction control in auto-commit mode".into(),
122        )),
123    }
124}
125
126/// Execute a parsed SQL statement within an existing write transaction.
127pub fn execute_in_txn(
128    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
129    schema: &mut SchemaManager,
130    stmt: &Statement,
131) -> Result<ExecutionResult> {
132    match stmt {
133        Statement::CreateTable(ct) => exec_create_table_in_txn(wtx, schema, ct),
134        Statement::DropTable(dt) => exec_drop_table_in_txn(wtx, schema, dt),
135        Statement::CreateIndex(ci) => exec_create_index_in_txn(wtx, schema, ci),
136        Statement::DropIndex(di) => exec_drop_index_in_txn(wtx, schema, di),
137        Statement::Insert(ins) => exec_insert_in_txn(wtx, schema, ins),
138        Statement::Select(sel) => exec_select_in_txn(wtx, schema, sel),
139        Statement::Update(upd) => exec_update_in_txn(wtx, schema, upd),
140        Statement::Delete(del) => exec_delete_in_txn(wtx, schema, del),
141        Statement::Explain(inner) => explain(schema, inner),
142        Statement::Begin | Statement::Commit | Statement::Rollback => {
143            Err(SqlError::Unsupported("nested transaction control".into()))
144        }
145    }
146}
147
148// ── EXPLAIN ──────────────────────────────────────────────────────────
149
150pub fn explain(schema: &SchemaManager, stmt: &Statement) -> Result<ExecutionResult> {
151    let lines = match stmt {
152        Statement::Select(sel) => explain_select(schema, sel)?,
153        Statement::Insert(ins) => {
154            vec![format!("INSERT INTO {}", ins.table.to_ascii_lowercase())]
155        }
156        Statement::Update(upd) => explain_dml(schema, &upd.table, &upd.where_clause, "UPDATE")?,
157        Statement::Delete(del) => {
158            explain_dml(schema, &del.table, &del.where_clause, "DELETE FROM")?
159        }
160        Statement::Explain(_) => {
161            return Err(SqlError::Unsupported("EXPLAIN EXPLAIN".into()));
162        }
163        _ => {
164            return Err(SqlError::Unsupported(
165                "EXPLAIN for this statement type".into(),
166            ));
167        }
168    };
169
170    let rows = lines
171        .into_iter()
172        .map(|line| vec![Value::Text(line)])
173        .collect();
174    Ok(ExecutionResult::Query(QueryResult {
175        columns: vec!["plan".into()],
176        rows,
177    }))
178}
179
180fn explain_dml(
181    schema: &SchemaManager,
182    table: &str,
183    where_clause: &Option<Expr>,
184    verb: &str,
185) -> Result<Vec<String>> {
186    let lower = table.to_ascii_lowercase();
187    let table_schema = schema
188        .get(&lower)
189        .ok_or_else(|| SqlError::TableNotFound(table.to_string()))?;
190    let plan = planner::plan_select(table_schema, where_clause);
191    let scan_line = format_scan_line(&lower, &None, &plan, table_schema);
192    Ok(vec![format!("{verb} {}", scan_line)])
193}
194
195fn explain_select(schema: &SchemaManager, stmt: &SelectStmt) -> Result<Vec<String>> {
196    let mut lines = Vec::new();
197
198    if stmt.from.is_empty() {
199        lines.push("CONSTANT ROW".into());
200        return Ok(lines);
201    }
202
203    let lower_from = stmt.from.to_ascii_lowercase();
204    let from_schema = schema
205        .get(&lower_from)
206        .ok_or_else(|| SqlError::TableNotFound(stmt.from.clone()))?;
207
208    if stmt.joins.is_empty() {
209        let plan = planner::plan_select(from_schema, &stmt.where_clause);
210        lines.push(format_scan_line(
211            &lower_from,
212            &stmt.from_alias,
213            &plan,
214            from_schema,
215        ));
216    } else {
217        let from_plan = planner::plan_select(from_schema, &None);
218        lines.push(format_scan_line(
219            &lower_from,
220            &stmt.from_alias,
221            &from_plan,
222            from_schema,
223        ));
224
225        for join in &stmt.joins {
226            let inner_lower = join.table.name.to_ascii_lowercase();
227            let inner_schema = schema
228                .get(&inner_lower)
229                .ok_or_else(|| SqlError::TableNotFound(join.table.name.clone()))?;
230            let inner_plan = planner::plan_select(inner_schema, &None);
231            lines.push(format_scan_line(
232                &inner_lower,
233                &join.table.alias,
234                &inner_plan,
235                inner_schema,
236            ));
237        }
238
239        let join_type_str = match stmt.joins.last().map(|j| j.join_type) {
240            Some(JoinType::Left) => "LEFT JOIN",
241            Some(JoinType::Right) => "RIGHT JOIN",
242            Some(JoinType::Cross) => "CROSS JOIN",
243            _ => "NESTED LOOP",
244        };
245        lines.push(join_type_str.into());
246    }
247
248    if stmt.where_clause.is_some() && stmt.joins.is_empty() {
249        let plan = planner::plan_select(from_schema, &stmt.where_clause);
250        if matches!(plan, ScanPlan::SeqScan) {
251            lines.push("FILTER".into());
252        }
253    }
254
255    if let Some(ref w) = stmt.where_clause {
256        if !stmt.joins.is_empty() && has_subquery(w) {
257            lines.push("SUBQUERY".into());
258        }
259    }
260
261    explain_subqueries(stmt, &mut lines);
262
263    if !stmt.group_by.is_empty() {
264        lines.push("GROUP BY".into());
265    }
266
267    if stmt.distinct {
268        lines.push("DISTINCT".into());
269    }
270
271    if !stmt.order_by.is_empty() {
272        lines.push("SORT".into());
273    }
274
275    if let Some(ref offset_expr) = stmt.offset {
276        if let Ok(n) = eval_const_int(offset_expr) {
277            lines.push(format!("OFFSET {n}"));
278        } else {
279            lines.push("OFFSET".into());
280        }
281    }
282
283    if let Some(ref limit_expr) = stmt.limit {
284        if let Ok(n) = eval_const_int(limit_expr) {
285            lines.push(format!("LIMIT {n}"));
286        } else {
287            lines.push("LIMIT".into());
288        }
289    }
290
291    Ok(lines)
292}
293
294fn explain_subqueries(stmt: &SelectStmt, lines: &mut Vec<String>) {
295    let mut count = 0;
296    if let Some(ref w) = stmt.where_clause {
297        count += count_subqueries(w);
298    }
299    if let Some(ref h) = stmt.having {
300        count += count_subqueries(h);
301    }
302    for col in &stmt.columns {
303        if let SelectColumn::Expr { expr, .. } = col {
304            count += count_subqueries(expr);
305        }
306    }
307    for _ in 0..count {
308        lines.push("SUBQUERY".into());
309    }
310}
311
312fn count_subqueries(expr: &Expr) -> usize {
313    match expr {
314        Expr::InSubquery { expr: e, .. } => 1 + count_subqueries(e),
315        Expr::ScalarSubquery(_) => 1,
316        Expr::Exists { .. } => 1,
317        Expr::BinaryOp { left, right, .. } => count_subqueries(left) + count_subqueries(right),
318        Expr::UnaryOp { expr: e, .. } => count_subqueries(e),
319        Expr::IsNull(e) | Expr::IsNotNull(e) => count_subqueries(e),
320        Expr::Function { args, .. } => args.iter().map(count_subqueries).sum(),
321        Expr::Between {
322            expr: e, low, high, ..
323        } => count_subqueries(e) + count_subqueries(low) + count_subqueries(high),
324        Expr::Like {
325            expr: e, pattern, ..
326        } => count_subqueries(e) + count_subqueries(pattern),
327        Expr::Case {
328            operand,
329            conditions,
330            else_result,
331        } => {
332            let mut n = 0;
333            if let Some(op) = operand {
334                n += count_subqueries(op);
335            }
336            for (c, r) in conditions {
337                n += count_subqueries(c) + count_subqueries(r);
338            }
339            if let Some(el) = else_result {
340                n += count_subqueries(el);
341            }
342            n
343        }
344        Expr::Coalesce(args) => args.iter().map(count_subqueries).sum(),
345        Expr::Cast { expr: e, .. } => count_subqueries(e),
346        Expr::InList { expr: e, list, .. } => {
347            count_subqueries(e) + list.iter().map(count_subqueries).sum::<usize>()
348        }
349        _ => 0,
350    }
351}
352
353fn format_scan_line(
354    table_name: &str,
355    alias: &Option<String>,
356    plan: &ScanPlan,
357    table_schema: &TableSchema,
358) -> String {
359    let alias_part = match alias {
360        Some(a) if !a.eq_ignore_ascii_case(table_name) => {
361            format!(" AS {}", a.to_ascii_lowercase())
362        }
363        _ => String::new(),
364    };
365
366    let desc = planner::describe_plan(plan, table_schema);
367
368    if desc.is_empty() {
369        format!("SCAN TABLE {table_name}{alias_part}")
370    } else {
371        format!("SEARCH TABLE {table_name}{alias_part} {desc}")
372    }
373}
374
375// ── DDL ─────────────────────────────────────────────────────────────
376
377fn exec_create_table(
378    db: &Database,
379    schema: &mut SchemaManager,
380    stmt: &CreateTableStmt,
381) -> Result<ExecutionResult> {
382    let lower_name = stmt.name.to_ascii_lowercase();
383
384    if schema.contains(&lower_name) {
385        if stmt.if_not_exists {
386            return Ok(ExecutionResult::Ok);
387        }
388        return Err(SqlError::TableAlreadyExists(stmt.name.clone()));
389    }
390
391    if stmt.primary_key.is_empty() {
392        return Err(SqlError::PrimaryKeyRequired);
393    }
394
395    let mut seen = std::collections::HashSet::new();
396    for col in &stmt.columns {
397        let lower = col.name.to_ascii_lowercase();
398        if !seen.insert(lower.clone()) {
399            return Err(SqlError::DuplicateColumn(col.name.clone()));
400        }
401    }
402
403    let columns: Vec<ColumnDef> = stmt
404        .columns
405        .iter()
406        .enumerate()
407        .map(|(i, c)| ColumnDef {
408            name: c.name.to_ascii_lowercase(),
409            data_type: c.data_type,
410            nullable: c.nullable,
411            position: i as u16,
412        })
413        .collect();
414
415    let primary_key_columns: Vec<u16> = stmt
416        .primary_key
417        .iter()
418        .map(|pk_name| {
419            let lower = pk_name.to_ascii_lowercase();
420            columns
421                .iter()
422                .position(|c| c.name == lower)
423                .map(|i| i as u16)
424                .ok_or_else(|| SqlError::ColumnNotFound(pk_name.clone()))
425        })
426        .collect::<Result<_>>()?;
427
428    let table_schema = TableSchema {
429        name: lower_name.clone(),
430        columns,
431        primary_key_columns,
432        indices: vec![],
433    };
434
435    let mut wtx = db.begin_write().map_err(SqlError::Storage)?;
436    SchemaManager::ensure_schema_table(&mut wtx)?;
437    wtx.create_table(lower_name.as_bytes())
438        .map_err(SqlError::Storage)?;
439    SchemaManager::save_schema(&mut wtx, &table_schema)?;
440    wtx.commit().map_err(SqlError::Storage)?;
441
442    schema.register(table_schema);
443    Ok(ExecutionResult::Ok)
444}
445
446fn exec_drop_table(
447    db: &Database,
448    schema: &mut SchemaManager,
449    stmt: &DropTableStmt,
450) -> Result<ExecutionResult> {
451    let lower_name = stmt.name.to_ascii_lowercase();
452
453    if !schema.contains(&lower_name) {
454        if stmt.if_exists {
455            return Ok(ExecutionResult::Ok);
456        }
457        return Err(SqlError::TableNotFound(stmt.name.clone()));
458    }
459
460    let table_schema = schema.get(&lower_name).unwrap();
461    let idx_tables: Vec<Vec<u8>> = table_schema
462        .indices
463        .iter()
464        .map(|idx| TableSchema::index_table_name(&lower_name, &idx.name))
465        .collect();
466
467    let mut wtx = db.begin_write().map_err(SqlError::Storage)?;
468    for idx_table in &idx_tables {
469        wtx.drop_table(idx_table).map_err(SqlError::Storage)?;
470    }
471    wtx.drop_table(lower_name.as_bytes())
472        .map_err(SqlError::Storage)?;
473    SchemaManager::delete_schema(&mut wtx, &lower_name)?;
474    wtx.commit().map_err(SqlError::Storage)?;
475
476    schema.remove(&lower_name);
477    Ok(ExecutionResult::Ok)
478}
479
480fn exec_create_table_in_txn(
481    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
482    schema: &mut SchemaManager,
483    stmt: &CreateTableStmt,
484) -> Result<ExecutionResult> {
485    let lower_name = stmt.name.to_ascii_lowercase();
486
487    if schema.contains(&lower_name) {
488        if stmt.if_not_exists {
489            return Ok(ExecutionResult::Ok);
490        }
491        return Err(SqlError::TableAlreadyExists(stmt.name.clone()));
492    }
493
494    if stmt.primary_key.is_empty() {
495        return Err(SqlError::PrimaryKeyRequired);
496    }
497
498    let mut seen = std::collections::HashSet::new();
499    for col in &stmt.columns {
500        let lower = col.name.to_ascii_lowercase();
501        if !seen.insert(lower.clone()) {
502            return Err(SqlError::DuplicateColumn(col.name.clone()));
503        }
504    }
505
506    let columns: Vec<ColumnDef> = stmt
507        .columns
508        .iter()
509        .enumerate()
510        .map(|(i, c)| ColumnDef {
511            name: c.name.to_ascii_lowercase(),
512            data_type: c.data_type,
513            nullable: c.nullable,
514            position: i as u16,
515        })
516        .collect();
517
518    let primary_key_columns: Vec<u16> = stmt
519        .primary_key
520        .iter()
521        .map(|pk_name| {
522            let lower = pk_name.to_ascii_lowercase();
523            columns
524                .iter()
525                .position(|c| c.name == lower)
526                .map(|i| i as u16)
527                .ok_or_else(|| SqlError::ColumnNotFound(pk_name.clone()))
528        })
529        .collect::<Result<_>>()?;
530
531    let table_schema = TableSchema {
532        name: lower_name.clone(),
533        columns,
534        primary_key_columns,
535        indices: vec![],
536    };
537
538    SchemaManager::ensure_schema_table(wtx)?;
539    wtx.create_table(lower_name.as_bytes())
540        .map_err(SqlError::Storage)?;
541    SchemaManager::save_schema(wtx, &table_schema)?;
542
543    schema.register(table_schema);
544    Ok(ExecutionResult::Ok)
545}
546
547fn exec_drop_table_in_txn(
548    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
549    schema: &mut SchemaManager,
550    stmt: &DropTableStmt,
551) -> Result<ExecutionResult> {
552    let lower_name = stmt.name.to_ascii_lowercase();
553
554    if !schema.contains(&lower_name) {
555        if stmt.if_exists {
556            return Ok(ExecutionResult::Ok);
557        }
558        return Err(SqlError::TableNotFound(stmt.name.clone()));
559    }
560
561    let table_schema = schema.get(&lower_name).unwrap();
562    let idx_tables: Vec<Vec<u8>> = table_schema
563        .indices
564        .iter()
565        .map(|idx| TableSchema::index_table_name(&lower_name, &idx.name))
566        .collect();
567
568    for idx_table in &idx_tables {
569        wtx.drop_table(idx_table).map_err(SqlError::Storage)?;
570    }
571    wtx.drop_table(lower_name.as_bytes())
572        .map_err(SqlError::Storage)?;
573    SchemaManager::delete_schema(wtx, &lower_name)?;
574
575    schema.remove(&lower_name);
576    Ok(ExecutionResult::Ok)
577}
578
579fn exec_create_index(
580    db: &Database,
581    schema: &mut SchemaManager,
582    stmt: &CreateIndexStmt,
583) -> Result<ExecutionResult> {
584    let lower_table = stmt.table_name.to_ascii_lowercase();
585    let lower_idx = stmt.index_name.to_ascii_lowercase();
586
587    let table_schema = schema
588        .get(&lower_table)
589        .ok_or_else(|| SqlError::TableNotFound(stmt.table_name.clone()))?;
590
591    if table_schema.index_by_name(&lower_idx).is_some() {
592        if stmt.if_not_exists {
593            return Ok(ExecutionResult::Ok);
594        }
595        return Err(SqlError::IndexAlreadyExists(stmt.index_name.clone()));
596    }
597
598    let col_indices: Vec<u16> = stmt
599        .columns
600        .iter()
601        .map(|col_name| {
602            let lower = col_name.to_ascii_lowercase();
603            table_schema
604                .column_index(&lower)
605                .map(|i| i as u16)
606                .ok_or_else(|| SqlError::ColumnNotFound(col_name.clone()))
607        })
608        .collect::<Result<_>>()?;
609
610    let idx_def = IndexDef {
611        name: lower_idx.clone(),
612        columns: col_indices,
613        unique: stmt.unique,
614    };
615
616    let idx_table = TableSchema::index_table_name(&lower_table, &lower_idx);
617
618    let mut wtx = db.begin_write().map_err(SqlError::Storage)?;
619    SchemaManager::ensure_schema_table(&mut wtx)?;
620    wtx.create_table(&idx_table).map_err(SqlError::Storage)?;
621
622    // Populate index from existing rows
623    let pk_indices = table_schema.pk_indices();
624    let mut rows: Vec<Vec<Value>> = Vec::new();
625    {
626        let mut scan_err: Option<SqlError> = None;
627        wtx.table_for_each(lower_table.as_bytes(), |key, value| {
628            match decode_full_row(table_schema, key, value) {
629                Ok(row) => rows.push(row),
630                Err(e) => scan_err = Some(e),
631            }
632            Ok(())
633        })
634        .map_err(SqlError::Storage)?;
635        if let Some(e) = scan_err {
636            return Err(e);
637        }
638    }
639
640    for row in &rows {
641        let pk_values: Vec<Value> = pk_indices.iter().map(|&i| row[i].clone()).collect();
642        let key = encode_index_key(&idx_def, row, &pk_values);
643        let value = encode_index_value(&idx_def, row, &pk_values);
644        let is_new = wtx
645            .table_insert(&idx_table, &key, &value)
646            .map_err(SqlError::Storage)?;
647        if idx_def.unique && !is_new {
648            let indexed_values: Vec<Value> = idx_def
649                .columns
650                .iter()
651                .map(|&col_idx| row[col_idx as usize].clone())
652                .collect();
653            let any_null = indexed_values.iter().any(|v| v.is_null());
654            if !any_null {
655                return Err(SqlError::UniqueViolation(stmt.index_name.clone()));
656            }
657        }
658    }
659
660    let mut updated_schema = table_schema.clone();
661    updated_schema.indices.push(idx_def);
662    SchemaManager::save_schema(&mut wtx, &updated_schema)?;
663    wtx.commit().map_err(SqlError::Storage)?;
664
665    schema.register(updated_schema);
666    Ok(ExecutionResult::Ok)
667}
668
669fn exec_drop_index(
670    db: &Database,
671    schema: &mut SchemaManager,
672    stmt: &DropIndexStmt,
673) -> Result<ExecutionResult> {
674    let lower_idx = stmt.index_name.to_ascii_lowercase();
675
676    let (table_name, _idx_pos) = match find_index_in_schemas(schema, &lower_idx) {
677        Some(found) => found,
678        None => {
679            if stmt.if_exists {
680                return Ok(ExecutionResult::Ok);
681            }
682            return Err(SqlError::IndexNotFound(stmt.index_name.clone()));
683        }
684    };
685
686    let idx_table = TableSchema::index_table_name(&table_name, &lower_idx);
687
688    let mut wtx = db.begin_write().map_err(SqlError::Storage)?;
689    wtx.drop_table(&idx_table).map_err(SqlError::Storage)?;
690
691    let table_schema = schema.get(&table_name).unwrap();
692    let mut updated_schema = table_schema.clone();
693    updated_schema.indices.retain(|i| i.name != lower_idx);
694    SchemaManager::save_schema(&mut wtx, &updated_schema)?;
695    wtx.commit().map_err(SqlError::Storage)?;
696
697    schema.register(updated_schema);
698    Ok(ExecutionResult::Ok)
699}
700
701fn exec_create_index_in_txn(
702    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
703    schema: &mut SchemaManager,
704    stmt: &CreateIndexStmt,
705) -> Result<ExecutionResult> {
706    let lower_table = stmt.table_name.to_ascii_lowercase();
707    let lower_idx = stmt.index_name.to_ascii_lowercase();
708
709    let table_schema = schema
710        .get(&lower_table)
711        .ok_or_else(|| SqlError::TableNotFound(stmt.table_name.clone()))?;
712
713    if table_schema.index_by_name(&lower_idx).is_some() {
714        if stmt.if_not_exists {
715            return Ok(ExecutionResult::Ok);
716        }
717        return Err(SqlError::IndexAlreadyExists(stmt.index_name.clone()));
718    }
719
720    let col_indices: Vec<u16> = stmt
721        .columns
722        .iter()
723        .map(|col_name| {
724            let lower = col_name.to_ascii_lowercase();
725            table_schema
726                .column_index(&lower)
727                .map(|i| i as u16)
728                .ok_or_else(|| SqlError::ColumnNotFound(col_name.clone()))
729        })
730        .collect::<Result<_>>()?;
731
732    let idx_def = IndexDef {
733        name: lower_idx.clone(),
734        columns: col_indices,
735        unique: stmt.unique,
736    };
737
738    let idx_table = TableSchema::index_table_name(&lower_table, &lower_idx);
739
740    SchemaManager::ensure_schema_table(wtx)?;
741    wtx.create_table(&idx_table).map_err(SqlError::Storage)?;
742
743    let pk_indices = table_schema.pk_indices();
744    let mut rows: Vec<Vec<Value>> = Vec::new();
745    {
746        let mut scan_err: Option<SqlError> = None;
747        wtx.table_for_each(lower_table.as_bytes(), |key, value| {
748            match decode_full_row(table_schema, key, value) {
749                Ok(row) => rows.push(row),
750                Err(e) => scan_err = Some(e),
751            }
752            Ok(())
753        })
754        .map_err(SqlError::Storage)?;
755        if let Some(e) = scan_err {
756            return Err(e);
757        }
758    }
759
760    for row in &rows {
761        let pk_values: Vec<Value> = pk_indices.iter().map(|&i| row[i].clone()).collect();
762        let key = encode_index_key(&idx_def, row, &pk_values);
763        let value = encode_index_value(&idx_def, row, &pk_values);
764        let is_new = wtx
765            .table_insert(&idx_table, &key, &value)
766            .map_err(SqlError::Storage)?;
767        if idx_def.unique && !is_new {
768            let indexed_values: Vec<Value> = idx_def
769                .columns
770                .iter()
771                .map(|&col_idx| row[col_idx as usize].clone())
772                .collect();
773            let any_null = indexed_values.iter().any(|v| v.is_null());
774            if !any_null {
775                return Err(SqlError::UniqueViolation(stmt.index_name.clone()));
776            }
777        }
778    }
779
780    let mut updated_schema = table_schema.clone();
781    updated_schema.indices.push(idx_def);
782    SchemaManager::save_schema(wtx, &updated_schema)?;
783
784    schema.register(updated_schema);
785    Ok(ExecutionResult::Ok)
786}
787
788fn exec_drop_index_in_txn(
789    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
790    schema: &mut SchemaManager,
791    stmt: &DropIndexStmt,
792) -> Result<ExecutionResult> {
793    let lower_idx = stmt.index_name.to_ascii_lowercase();
794
795    let (table_name, _idx_pos) = match find_index_in_schemas(schema, &lower_idx) {
796        Some(found) => found,
797        None => {
798            if stmt.if_exists {
799                return Ok(ExecutionResult::Ok);
800            }
801            return Err(SqlError::IndexNotFound(stmt.index_name.clone()));
802        }
803    };
804
805    let idx_table = TableSchema::index_table_name(&table_name, &lower_idx);
806    wtx.drop_table(&idx_table).map_err(SqlError::Storage)?;
807
808    let table_schema = schema.get(&table_name).unwrap();
809    let mut updated_schema = table_schema.clone();
810    updated_schema.indices.retain(|i| i.name != lower_idx);
811    SchemaManager::save_schema(wtx, &updated_schema)?;
812
813    schema.register(updated_schema);
814    Ok(ExecutionResult::Ok)
815}
816
817fn find_index_in_schemas(schema: &SchemaManager, index_name: &str) -> Option<(String, usize)> {
818    for table_name in schema.table_names() {
819        if let Some(ts) = schema.get(table_name) {
820            if let Some(pos) = ts.indices.iter().position(|i| i.name == index_name) {
821                return Some((table_name.to_string(), pos));
822            }
823        }
824    }
825    None
826}
827
828// ── Index scan helpers ───────────────────────────────────────────────
829
830fn extract_pk_key(
831    idx_key: &[u8],
832    idx_value: &[u8],
833    is_unique: bool,
834    num_index_cols: usize,
835    num_pk_cols: usize,
836) -> Result<Vec<u8>> {
837    if is_unique && !idx_value.is_empty() {
838        Ok(idx_value.to_vec())
839    } else {
840        let total_cols = num_index_cols + num_pk_cols;
841        let all_values = decode_composite_key(idx_key, total_cols)?;
842        let pk_values = &all_values[num_index_cols..];
843        Ok(encode_composite_key(pk_values))
844    }
845}
846
847fn check_range_conditions(
848    idx_key: &[u8],
849    num_prefix_cols: usize,
850    range_conds: &[(BinOp, Value)],
851    num_index_cols: usize,
852) -> Result<RangeCheck> {
853    if range_conds.is_empty() {
854        return Ok(RangeCheck::Match);
855    }
856
857    let num_to_decode = num_prefix_cols + 1;
858    if num_to_decode > num_index_cols {
859        return Ok(RangeCheck::Match);
860    }
861
862    // Decode just enough columns to check the range column
863    let mut pos = 0;
864    for _ in 0..num_prefix_cols {
865        let (_, n) = decode_key_value(&idx_key[pos..])?;
866        pos += n;
867    }
868    let (range_val, _) = decode_key_value(&idx_key[pos..])?;
869
870    let mut exceeds_upper = false;
871    let mut below_lower = false;
872
873    for (op, val) in range_conds {
874        match op {
875            BinOp::Lt => {
876                if range_val >= *val {
877                    exceeds_upper = true;
878                }
879            }
880            BinOp::LtEq => {
881                if range_val > *val {
882                    exceeds_upper = true;
883                }
884            }
885            BinOp::Gt => {
886                if range_val <= *val {
887                    below_lower = true;
888                }
889            }
890            BinOp::GtEq => {
891                if range_val < *val {
892                    below_lower = true;
893                }
894            }
895            _ => {}
896        }
897    }
898
899    if exceeds_upper {
900        Ok(RangeCheck::ExceedsUpper)
901    } else if below_lower {
902        Ok(RangeCheck::BelowLower)
903    } else {
904        Ok(RangeCheck::Match)
905    }
906}
907
/// Outcome of comparing an index key's range column with the scan's bounds
/// (see `check_range_conditions`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RangeCheck {
    /// The key satisfies every range bound.
    Match,
    /// The key is below a lower bound (`>` / `>=`).
    BelowLower,
    /// The key is above an upper bound (`<` / `<=`).
    ExceedsUpper,
}
913
914/// Collect rows via ReadTxn using the scan plan.
915fn collect_rows_read(
916    db: &Database,
917    table_schema: &TableSchema,
918    where_clause: &Option<Expr>,
919) -> Result<Vec<Vec<Value>>> {
920    let plan = planner::plan_select(table_schema, where_clause);
921    let lower_name = &table_schema.name;
922
923    match plan {
924        ScanPlan::SeqScan => {
925            let mut rows = Vec::new();
926            let mut rtx = db.begin_read();
927            let mut scan_err: Option<SqlError> = None;
928            rtx.table_for_each(lower_name.as_bytes(), |key, value| {
929                match decode_full_row(table_schema, key, value) {
930                    Ok(row) => rows.push(row),
931                    Err(e) => scan_err = Some(e),
932                }
933                Ok(())
934            })
935            .map_err(SqlError::Storage)?;
936            if let Some(e) = scan_err {
937                return Err(e);
938            }
939            Ok(rows)
940        }
941
942        ScanPlan::PkLookup { pk_values } => {
943            let key = encode_composite_key(&pk_values);
944            let mut rtx = db.begin_read();
945            match rtx
946                .table_get(lower_name.as_bytes(), &key)
947                .map_err(SqlError::Storage)?
948            {
949                Some(value) => {
950                    let row = decode_full_row(table_schema, &key, &value)?;
951                    Ok(vec![row])
952                }
953                None => Ok(vec![]),
954            }
955        }
956
957        ScanPlan::IndexScan {
958            idx_table,
959            prefix,
960            num_prefix_cols,
961            range_conds,
962            is_unique,
963            index_columns,
964            ..
965        } => {
966            let num_pk_cols = table_schema.primary_key_columns.len();
967            let num_index_cols = index_columns.len();
968            let mut pk_keys: Vec<Vec<u8>> = Vec::new();
969
970            {
971                let mut rtx = db.begin_read();
972                let mut scan_err: Option<SqlError> = None;
973                rtx.table_scan_from(&idx_table, &prefix, |key, value| {
974                    if !key.starts_with(&prefix) {
975                        return Ok(false);
976                    }
977                    match check_range_conditions(key, num_prefix_cols, &range_conds, num_index_cols)
978                    {
979                        Ok(RangeCheck::ExceedsUpper) => return Ok(false),
980                        Ok(RangeCheck::BelowLower) => return Ok(true),
981                        Ok(RangeCheck::Match) => {}
982                        Err(e) => {
983                            scan_err = Some(e);
984                            return Ok(false);
985                        }
986                    }
987                    match extract_pk_key(key, value, is_unique, num_index_cols, num_pk_cols) {
988                        Ok(pk) => pk_keys.push(pk),
989                        Err(e) => {
990                            scan_err = Some(e);
991                            return Ok(false);
992                        }
993                    }
994                    Ok(true)
995                })
996                .map_err(SqlError::Storage)?;
997                if let Some(e) = scan_err {
998                    return Err(e);
999                }
1000            }
1001
1002            let mut rows = Vec::new();
1003            let mut rtx = db.begin_read();
1004            for pk_key in &pk_keys {
1005                if let Some(value) = rtx
1006                    .table_get(lower_name.as_bytes(), pk_key)
1007                    .map_err(SqlError::Storage)?
1008                {
1009                    rows.push(decode_full_row(table_schema, pk_key, &value)?);
1010                }
1011            }
1012            Ok(rows)
1013        }
1014    }
1015}
1016
1017/// Collect rows via WriteTxn using the scan plan.
1018fn collect_rows_write(
1019    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
1020    table_schema: &TableSchema,
1021    where_clause: &Option<Expr>,
1022) -> Result<Vec<Vec<Value>>> {
1023    let plan = planner::plan_select(table_schema, where_clause);
1024    let lower_name = &table_schema.name;
1025
1026    match plan {
1027        ScanPlan::SeqScan => {
1028            let mut rows = Vec::new();
1029            let mut scan_err: Option<SqlError> = None;
1030            wtx.table_for_each(lower_name.as_bytes(), |key, value| {
1031                match decode_full_row(table_schema, key, value) {
1032                    Ok(row) => rows.push(row),
1033                    Err(e) => scan_err = Some(e),
1034                }
1035                Ok(())
1036            })
1037            .map_err(SqlError::Storage)?;
1038            if let Some(e) = scan_err {
1039                return Err(e);
1040            }
1041            Ok(rows)
1042        }
1043
1044        ScanPlan::PkLookup { pk_values } => {
1045            let key = encode_composite_key(&pk_values);
1046            match wtx
1047                .table_get(lower_name.as_bytes(), &key)
1048                .map_err(SqlError::Storage)?
1049            {
1050                Some(value) => {
1051                    let row = decode_full_row(table_schema, &key, &value)?;
1052                    Ok(vec![row])
1053                }
1054                None => Ok(vec![]),
1055            }
1056        }
1057
1058        ScanPlan::IndexScan {
1059            idx_table,
1060            prefix,
1061            num_prefix_cols,
1062            range_conds,
1063            is_unique,
1064            index_columns,
1065            ..
1066        } => {
1067            let num_pk_cols = table_schema.primary_key_columns.len();
1068            let num_index_cols = index_columns.len();
1069            let mut pk_keys: Vec<Vec<u8>> = Vec::new();
1070
1071            {
1072                let mut scan_err: Option<SqlError> = None;
1073                wtx.table_scan_from(&idx_table, &prefix, |key, value| {
1074                    if !key.starts_with(&prefix) {
1075                        return Ok(false);
1076                    }
1077                    match check_range_conditions(key, num_prefix_cols, &range_conds, num_index_cols)
1078                    {
1079                        Ok(RangeCheck::ExceedsUpper) => return Ok(false),
1080                        Ok(RangeCheck::BelowLower) => return Ok(true),
1081                        Ok(RangeCheck::Match) => {}
1082                        Err(e) => {
1083                            scan_err = Some(e);
1084                            return Ok(false);
1085                        }
1086                    }
1087                    match extract_pk_key(key, value, is_unique, num_index_cols, num_pk_cols) {
1088                        Ok(pk) => pk_keys.push(pk),
1089                        Err(e) => {
1090                            scan_err = Some(e);
1091                            return Ok(false);
1092                        }
1093                    }
1094                    Ok(true)
1095                })
1096                .map_err(SqlError::Storage)?;
1097                if let Some(e) = scan_err {
1098                    return Err(e);
1099                }
1100            }
1101
1102            let mut rows = Vec::new();
1103            for pk_key in &pk_keys {
1104                if let Some(value) = wtx
1105                    .table_get(lower_name.as_bytes(), pk_key)
1106                    .map_err(SqlError::Storage)?
1107                {
1108                    rows.push(decode_full_row(table_schema, pk_key, &value)?);
1109                }
1110            }
1111            Ok(rows)
1112        }
1113    }
1114}
1115
1116/// Collect (encoded_key, full_row) pairs via ReadTxn using the scan plan.
1117/// Used by DELETE and UPDATE which need the encoded PK key.
1118fn collect_keyed_rows_read(
1119    db: &Database,
1120    table_schema: &TableSchema,
1121    where_clause: &Option<Expr>,
1122) -> Result<Vec<(Vec<u8>, Vec<Value>)>> {
1123    let plan = planner::plan_select(table_schema, where_clause);
1124    let lower_name = &table_schema.name;
1125
1126    match plan {
1127        ScanPlan::SeqScan => {
1128            let mut rows = Vec::new();
1129            let mut rtx = db.begin_read();
1130            let mut scan_err: Option<SqlError> = None;
1131            rtx.table_for_each(lower_name.as_bytes(), |key, value| {
1132                match decode_full_row(table_schema, key, value) {
1133                    Ok(row) => rows.push((key.to_vec(), row)),
1134                    Err(e) => scan_err = Some(e),
1135                }
1136                Ok(())
1137            })
1138            .map_err(SqlError::Storage)?;
1139            if let Some(e) = scan_err {
1140                return Err(e);
1141            }
1142            Ok(rows)
1143        }
1144
1145        ScanPlan::PkLookup { pk_values } => {
1146            let key = encode_composite_key(&pk_values);
1147            let mut rtx = db.begin_read();
1148            match rtx
1149                .table_get(lower_name.as_bytes(), &key)
1150                .map_err(SqlError::Storage)?
1151            {
1152                Some(value) => {
1153                    let row = decode_full_row(table_schema, &key, &value)?;
1154                    Ok(vec![(key, row)])
1155                }
1156                None => Ok(vec![]),
1157            }
1158        }
1159
1160        ScanPlan::IndexScan {
1161            idx_table,
1162            prefix,
1163            num_prefix_cols,
1164            range_conds,
1165            is_unique,
1166            index_columns,
1167            ..
1168        } => {
1169            let num_pk_cols = table_schema.primary_key_columns.len();
1170            let num_index_cols = index_columns.len();
1171            let mut pk_keys: Vec<Vec<u8>> = Vec::new();
1172
1173            {
1174                let mut rtx = db.begin_read();
1175                let mut scan_err: Option<SqlError> = None;
1176                rtx.table_scan_from(&idx_table, &prefix, |key, value| {
1177                    if !key.starts_with(&prefix) {
1178                        return Ok(false);
1179                    }
1180                    match check_range_conditions(key, num_prefix_cols, &range_conds, num_index_cols)
1181                    {
1182                        Ok(RangeCheck::ExceedsUpper) => return Ok(false),
1183                        Ok(RangeCheck::BelowLower) => return Ok(true),
1184                        Ok(RangeCheck::Match) => {}
1185                        Err(e) => {
1186                            scan_err = Some(e);
1187                            return Ok(false);
1188                        }
1189                    }
1190                    match extract_pk_key(key, value, is_unique, num_index_cols, num_pk_cols) {
1191                        Ok(pk) => pk_keys.push(pk),
1192                        Err(e) => {
1193                            scan_err = Some(e);
1194                            return Ok(false);
1195                        }
1196                    }
1197                    Ok(true)
1198                })
1199                .map_err(SqlError::Storage)?;
1200                if let Some(e) = scan_err {
1201                    return Err(e);
1202                }
1203            }
1204
1205            let mut rows = Vec::new();
1206            let mut rtx = db.begin_read();
1207            for pk_key in &pk_keys {
1208                if let Some(value) = rtx
1209                    .table_get(lower_name.as_bytes(), pk_key)
1210                    .map_err(SqlError::Storage)?
1211                {
1212                    rows.push((
1213                        pk_key.clone(),
1214                        decode_full_row(table_schema, pk_key, &value)?,
1215                    ));
1216                }
1217            }
1218            Ok(rows)
1219        }
1220    }
1221}
1222
1223/// Collect (encoded_key, full_row) pairs via WriteTxn using the scan plan.
1224fn collect_keyed_rows_write(
1225    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
1226    table_schema: &TableSchema,
1227    where_clause: &Option<Expr>,
1228) -> Result<Vec<(Vec<u8>, Vec<Value>)>> {
1229    let plan = planner::plan_select(table_schema, where_clause);
1230    let lower_name = &table_schema.name;
1231
1232    match plan {
1233        ScanPlan::SeqScan => {
1234            let mut rows = Vec::new();
1235            let mut scan_err: Option<SqlError> = None;
1236            wtx.table_for_each(lower_name.as_bytes(), |key, value| {
1237                match decode_full_row(table_schema, key, value) {
1238                    Ok(row) => rows.push((key.to_vec(), row)),
1239                    Err(e) => scan_err = Some(e),
1240                }
1241                Ok(())
1242            })
1243            .map_err(SqlError::Storage)?;
1244            if let Some(e) = scan_err {
1245                return Err(e);
1246            }
1247            Ok(rows)
1248        }
1249
1250        ScanPlan::PkLookup { pk_values } => {
1251            let key = encode_composite_key(&pk_values);
1252            match wtx
1253                .table_get(lower_name.as_bytes(), &key)
1254                .map_err(SqlError::Storage)?
1255            {
1256                Some(value) => {
1257                    let row = decode_full_row(table_schema, &key, &value)?;
1258                    Ok(vec![(key, row)])
1259                }
1260                None => Ok(vec![]),
1261            }
1262        }
1263
1264        ScanPlan::IndexScan {
1265            idx_table,
1266            prefix,
1267            num_prefix_cols,
1268            range_conds,
1269            is_unique,
1270            index_columns,
1271            ..
1272        } => {
1273            let num_pk_cols = table_schema.primary_key_columns.len();
1274            let num_index_cols = index_columns.len();
1275            let mut pk_keys: Vec<Vec<u8>> = Vec::new();
1276
1277            {
1278                let mut scan_err: Option<SqlError> = None;
1279                wtx.table_scan_from(&idx_table, &prefix, |key, value| {
1280                    if !key.starts_with(&prefix) {
1281                        return Ok(false);
1282                    }
1283                    match check_range_conditions(key, num_prefix_cols, &range_conds, num_index_cols)
1284                    {
1285                        Ok(RangeCheck::ExceedsUpper) => return Ok(false),
1286                        Ok(RangeCheck::BelowLower) => return Ok(true),
1287                        Ok(RangeCheck::Match) => {}
1288                        Err(e) => {
1289                            scan_err = Some(e);
1290                            return Ok(false);
1291                        }
1292                    }
1293                    match extract_pk_key(key, value, is_unique, num_index_cols, num_pk_cols) {
1294                        Ok(pk) => pk_keys.push(pk),
1295                        Err(e) => {
1296                            scan_err = Some(e);
1297                            return Ok(false);
1298                        }
1299                    }
1300                    Ok(true)
1301                })
1302                .map_err(SqlError::Storage)?;
1303                if let Some(e) = scan_err {
1304                    return Err(e);
1305                }
1306            }
1307
1308            let mut rows = Vec::new();
1309            for pk_key in &pk_keys {
1310                if let Some(value) = wtx
1311                    .table_get(lower_name.as_bytes(), pk_key)
1312                    .map_err(SqlError::Storage)?
1313                {
1314                    rows.push((
1315                        pk_key.clone(),
1316                        decode_full_row(table_schema, pk_key, &value)?,
1317                    ));
1318                }
1319            }
1320            Ok(rows)
1321        }
1322    }
1323}
1324
1325// ── DML ─────────────────────────────────────────────────────────────
1326
1327fn exec_insert(
1328    db: &Database,
1329    schema: &SchemaManager,
1330    stmt: &InsertStmt,
1331) -> Result<ExecutionResult> {
1332    let materialized;
1333    let stmt = if insert_has_subquery(stmt) {
1334        materialized = materialize_insert(stmt, &mut |sub| exec_subquery_read(db, schema, sub))?;
1335        &materialized
1336    } else {
1337        stmt
1338    };
1339
1340    let lower_name = stmt.table.to_ascii_lowercase();
1341    let table_schema = schema
1342        .get(&lower_name)
1343        .ok_or_else(|| SqlError::TableNotFound(stmt.table.clone()))?;
1344
1345    let insert_columns = if stmt.columns.is_empty() {
1346        table_schema
1347            .columns
1348            .iter()
1349            .map(|c| c.name.clone())
1350            .collect::<Vec<_>>()
1351    } else {
1352        stmt.columns
1353            .iter()
1354            .map(|c| c.to_ascii_lowercase())
1355            .collect()
1356    };
1357
1358    let col_indices: Vec<usize> = insert_columns
1359        .iter()
1360        .map(|name| {
1361            table_schema
1362                .column_index(name)
1363                .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))
1364        })
1365        .collect::<Result<_>>()?;
1366
1367    let mut wtx = db.begin_write().map_err(SqlError::Storage)?;
1368    let mut count: u64 = 0;
1369
1370    for value_row in &stmt.values {
1371        if value_row.len() != insert_columns.len() {
1372            return Err(SqlError::InvalidValue(format!(
1373                "expected {} values, got {}",
1374                insert_columns.len(),
1375                value_row.len()
1376            )));
1377        }
1378
1379        let mut row = vec![Value::Null; table_schema.columns.len()];
1380        for (i, expr) in value_row.iter().enumerate() {
1381            let val = eval_const_expr(expr)?;
1382            let col_idx = col_indices[i];
1383            let col = &table_schema.columns[col_idx];
1384
1385            let coerced = if val.is_null() {
1386                Value::Null
1387            } else {
1388                val.coerce_to(col.data_type)
1389                    .ok_or_else(|| SqlError::TypeMismatch {
1390                        expected: col.data_type.to_string(),
1391                        got: val.data_type().to_string(),
1392                    })?
1393            };
1394
1395            row[col_idx] = coerced;
1396        }
1397
1398        for col in &table_schema.columns {
1399            if !col.nullable && row[col.position as usize].is_null() {
1400                return Err(SqlError::NotNullViolation(col.name.clone()));
1401            }
1402        }
1403
1404        let pk_values: Vec<Value> = table_schema
1405            .pk_indices()
1406            .iter()
1407            .map(|&i| row[i].clone())
1408            .collect();
1409        let key = encode_composite_key(&pk_values);
1410
1411        let non_pk = table_schema.non_pk_indices();
1412        let value_values: Vec<Value> = non_pk.iter().map(|&i| row[i].clone()).collect();
1413        let value = encode_row(&value_values);
1414
1415        if key.len() > citadel_core::MAX_KEY_SIZE {
1416            return Err(SqlError::KeyTooLarge {
1417                size: key.len(),
1418                max: citadel_core::MAX_KEY_SIZE,
1419            });
1420        }
1421        if value.len() > citadel_core::MAX_INLINE_VALUE_SIZE {
1422            return Err(SqlError::RowTooLarge {
1423                size: value.len(),
1424                max: citadel_core::MAX_INLINE_VALUE_SIZE,
1425            });
1426        }
1427
1428        let is_new = wtx
1429            .table_insert(lower_name.as_bytes(), &key, &value)
1430            .map_err(SqlError::Storage)?;
1431        if !is_new {
1432            return Err(SqlError::DuplicateKey);
1433        }
1434
1435        insert_index_entries(&mut wtx, table_schema, &row, &pk_values)?;
1436        count += 1;
1437    }
1438
1439    wtx.commit().map_err(SqlError::Storage)?;
1440    Ok(ExecutionResult::RowsAffected(count))
1441}
1442
1443fn has_subquery(expr: &Expr) -> bool {
1444    match expr {
1445        Expr::InSubquery { .. } | Expr::Exists { .. } | Expr::ScalarSubquery(_) => true,
1446        Expr::BinaryOp { left, right, .. } => has_subquery(left) || has_subquery(right),
1447        Expr::UnaryOp { expr, .. } => has_subquery(expr),
1448        Expr::IsNull(e) | Expr::IsNotNull(e) => has_subquery(e),
1449        Expr::InList { expr, list, .. } => has_subquery(expr) || list.iter().any(has_subquery),
1450        Expr::InSet { expr, .. } => has_subquery(expr),
1451        Expr::Between {
1452            expr, low, high, ..
1453        } => has_subquery(expr) || has_subquery(low) || has_subquery(high),
1454        Expr::Like {
1455            expr,
1456            pattern,
1457            escape,
1458            ..
1459        } => {
1460            has_subquery(expr)
1461                || has_subquery(pattern)
1462                || escape.as_ref().is_some_and(|e| has_subquery(e))
1463        }
1464        Expr::Case {
1465            operand,
1466            conditions,
1467            else_result,
1468        } => {
1469            operand.as_ref().is_some_and(|e| has_subquery(e))
1470                || conditions
1471                    .iter()
1472                    .any(|(c, r)| has_subquery(c) || has_subquery(r))
1473                || else_result.as_ref().is_some_and(|e| has_subquery(e))
1474        }
1475        Expr::Coalesce(args) => args.iter().any(has_subquery),
1476        Expr::Cast { expr, .. } => has_subquery(expr),
1477        Expr::Function { args, .. } => args.iter().any(has_subquery),
1478        _ => false,
1479    }
1480}
1481
1482fn stmt_has_subquery(stmt: &SelectStmt) -> bool {
1483    if let Some(ref w) = stmt.where_clause {
1484        if has_subquery(w) {
1485            return true;
1486        }
1487    }
1488    if let Some(ref h) = stmt.having {
1489        if has_subquery(h) {
1490            return true;
1491        }
1492    }
1493    for col in &stmt.columns {
1494        if let SelectColumn::Expr { expr, .. } = col {
1495            if has_subquery(expr) {
1496                return true;
1497            }
1498        }
1499    }
1500    for ob in &stmt.order_by {
1501        if has_subquery(&ob.expr) {
1502            return true;
1503        }
1504    }
1505    for join in &stmt.joins {
1506        if let Some(ref on_expr) = join.on_clause {
1507            if has_subquery(on_expr) {
1508                return true;
1509            }
1510        }
1511    }
1512    false
1513}
1514
1515fn materialize_expr(
1516    expr: &Expr,
1517    exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
1518) -> Result<Expr> {
1519    match expr {
1520        Expr::InSubquery {
1521            expr: e,
1522            subquery,
1523            negated,
1524        } => {
1525            let inner = materialize_expr(e, exec_sub)?;
1526            let qr = exec_sub(subquery)?;
1527            if !qr.columns.is_empty() && qr.columns.len() != 1 {
1528                return Err(SqlError::SubqueryMultipleColumns);
1529            }
1530            let mut values = std::collections::HashSet::new();
1531            let mut has_null = false;
1532            for row in &qr.rows {
1533                if row[0].is_null() {
1534                    has_null = true;
1535                } else {
1536                    values.insert(row[0].clone());
1537                }
1538            }
1539            Ok(Expr::InSet {
1540                expr: Box::new(inner),
1541                values,
1542                has_null,
1543                negated: *negated,
1544            })
1545        }
1546        Expr::ScalarSubquery(subquery) => {
1547            let qr = exec_sub(subquery)?;
1548            if qr.rows.len() > 1 {
1549                return Err(SqlError::SubqueryMultipleRows);
1550            }
1551            let val = if qr.rows.is_empty() {
1552                Value::Null
1553            } else {
1554                qr.rows[0][0].clone()
1555            };
1556            Ok(Expr::Literal(val))
1557        }
1558        Expr::Exists { subquery, negated } => {
1559            let qr = exec_sub(subquery)?;
1560            let exists = !qr.rows.is_empty();
1561            let result = if *negated { !exists } else { exists };
1562            Ok(Expr::Literal(Value::Boolean(result)))
1563        }
1564        Expr::InList {
1565            expr: e,
1566            list,
1567            negated,
1568        } => {
1569            let inner = materialize_expr(e, exec_sub)?;
1570            let items = list
1571                .iter()
1572                .map(|item| materialize_expr(item, exec_sub))
1573                .collect::<Result<Vec<_>>>()?;
1574            Ok(Expr::InList {
1575                expr: Box::new(inner),
1576                list: items,
1577                negated: *negated,
1578            })
1579        }
1580        Expr::BinaryOp { left, op, right } => Ok(Expr::BinaryOp {
1581            left: Box::new(materialize_expr(left, exec_sub)?),
1582            op: *op,
1583            right: Box::new(materialize_expr(right, exec_sub)?),
1584        }),
1585        Expr::UnaryOp { op, expr: e } => Ok(Expr::UnaryOp {
1586            op: *op,
1587            expr: Box::new(materialize_expr(e, exec_sub)?),
1588        }),
1589        Expr::IsNull(e) => Ok(Expr::IsNull(Box::new(materialize_expr(e, exec_sub)?))),
1590        Expr::IsNotNull(e) => Ok(Expr::IsNotNull(Box::new(materialize_expr(e, exec_sub)?))),
1591        Expr::InSet {
1592            expr: e,
1593            values,
1594            has_null,
1595            negated,
1596        } => Ok(Expr::InSet {
1597            expr: Box::new(materialize_expr(e, exec_sub)?),
1598            values: values.clone(),
1599            has_null: *has_null,
1600            negated: *negated,
1601        }),
1602        Expr::Between {
1603            expr: e,
1604            low,
1605            high,
1606            negated,
1607        } => Ok(Expr::Between {
1608            expr: Box::new(materialize_expr(e, exec_sub)?),
1609            low: Box::new(materialize_expr(low, exec_sub)?),
1610            high: Box::new(materialize_expr(high, exec_sub)?),
1611            negated: *negated,
1612        }),
1613        Expr::Like {
1614            expr: e,
1615            pattern,
1616            escape,
1617            negated,
1618        } => {
1619            let esc = escape
1620                .as_ref()
1621                .map(|es| materialize_expr(es, exec_sub).map(Box::new))
1622                .transpose()?;
1623            Ok(Expr::Like {
1624                expr: Box::new(materialize_expr(e, exec_sub)?),
1625                pattern: Box::new(materialize_expr(pattern, exec_sub)?),
1626                escape: esc,
1627                negated: *negated,
1628            })
1629        }
1630        Expr::Case {
1631            operand,
1632            conditions,
1633            else_result,
1634        } => {
1635            let op = operand
1636                .as_ref()
1637                .map(|e| materialize_expr(e, exec_sub).map(Box::new))
1638                .transpose()?;
1639            let conds = conditions
1640                .iter()
1641                .map(|(c, r)| {
1642                    Ok((
1643                        materialize_expr(c, exec_sub)?,
1644                        materialize_expr(r, exec_sub)?,
1645                    ))
1646                })
1647                .collect::<Result<Vec<_>>>()?;
1648            let else_r = else_result
1649                .as_ref()
1650                .map(|e| materialize_expr(e, exec_sub).map(Box::new))
1651                .transpose()?;
1652            Ok(Expr::Case {
1653                operand: op,
1654                conditions: conds,
1655                else_result: else_r,
1656            })
1657        }
1658        Expr::Coalesce(args) => {
1659            let materialized = args
1660                .iter()
1661                .map(|a| materialize_expr(a, exec_sub))
1662                .collect::<Result<Vec<_>>>()?;
1663            Ok(Expr::Coalesce(materialized))
1664        }
1665        Expr::Cast { expr: e, data_type } => Ok(Expr::Cast {
1666            expr: Box::new(materialize_expr(e, exec_sub)?),
1667            data_type: *data_type,
1668        }),
1669        Expr::Function { name, args } => {
1670            let materialized = args
1671                .iter()
1672                .map(|a| materialize_expr(a, exec_sub))
1673                .collect::<Result<Vec<_>>>()?;
1674            Ok(Expr::Function {
1675                name: name.clone(),
1676                args: materialized,
1677            })
1678        }
1679        other => Ok(other.clone()),
1680    }
1681}
1682
1683fn materialize_stmt(
1684    stmt: &SelectStmt,
1685    exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
1686) -> Result<SelectStmt> {
1687    let where_clause = stmt
1688        .where_clause
1689        .as_ref()
1690        .map(|e| materialize_expr(e, exec_sub))
1691        .transpose()?;
1692    let having = stmt
1693        .having
1694        .as_ref()
1695        .map(|e| materialize_expr(e, exec_sub))
1696        .transpose()?;
1697    let columns = stmt
1698        .columns
1699        .iter()
1700        .map(|c| match c {
1701            SelectColumn::AllColumns => Ok(SelectColumn::AllColumns),
1702            SelectColumn::Expr { expr, alias } => Ok(SelectColumn::Expr {
1703                expr: materialize_expr(expr, exec_sub)?,
1704                alias: alias.clone(),
1705            }),
1706        })
1707        .collect::<Result<Vec<_>>>()?;
1708    let order_by = stmt
1709        .order_by
1710        .iter()
1711        .map(|ob| {
1712            Ok(OrderByItem {
1713                expr: materialize_expr(&ob.expr, exec_sub)?,
1714                descending: ob.descending,
1715                nulls_first: ob.nulls_first,
1716            })
1717        })
1718        .collect::<Result<Vec<_>>>()?;
1719    let joins = stmt
1720        .joins
1721        .iter()
1722        .map(|j| {
1723            let on_clause = j
1724                .on_clause
1725                .as_ref()
1726                .map(|e| materialize_expr(e, exec_sub))
1727                .transpose()?;
1728            Ok(JoinClause {
1729                join_type: j.join_type,
1730                table: j.table.clone(),
1731                on_clause,
1732            })
1733        })
1734        .collect::<Result<Vec<_>>>()?;
1735    let group_by = stmt
1736        .group_by
1737        .iter()
1738        .map(|e| materialize_expr(e, exec_sub))
1739        .collect::<Result<Vec<_>>>()?;
1740    Ok(SelectStmt {
1741        columns,
1742        from: stmt.from.clone(),
1743        from_alias: stmt.from_alias.clone(),
1744        joins,
1745        distinct: stmt.distinct,
1746        where_clause,
1747        order_by,
1748        limit: stmt.limit.clone(),
1749        offset: stmt.offset.clone(),
1750        group_by,
1751        having,
1752    })
1753}
1754
1755fn exec_subquery_read(
1756    db: &Database,
1757    schema: &SchemaManager,
1758    stmt: &SelectStmt,
1759) -> Result<QueryResult> {
1760    match exec_select(db, schema, stmt)? {
1761        ExecutionResult::Query(qr) => Ok(qr),
1762        _ => Ok(QueryResult {
1763            columns: vec![],
1764            rows: vec![],
1765        }),
1766    }
1767}
1768
1769fn exec_subquery_write(
1770    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
1771    schema: &SchemaManager,
1772    stmt: &SelectStmt,
1773) -> Result<QueryResult> {
1774    match exec_select_in_txn(wtx, schema, stmt)? {
1775        ExecutionResult::Query(qr) => Ok(qr),
1776        _ => Ok(QueryResult {
1777            columns: vec![],
1778            rows: vec![],
1779        }),
1780    }
1781}
1782
1783fn update_has_subquery(stmt: &UpdateStmt) -> bool {
1784    stmt.where_clause.as_ref().is_some_and(has_subquery)
1785        || stmt.assignments.iter().any(|(_, e)| has_subquery(e))
1786}
1787
1788fn materialize_update(
1789    stmt: &UpdateStmt,
1790    exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
1791) -> Result<UpdateStmt> {
1792    let where_clause = stmt
1793        .where_clause
1794        .as_ref()
1795        .map(|e| materialize_expr(e, exec_sub))
1796        .transpose()?;
1797    let assignments = stmt
1798        .assignments
1799        .iter()
1800        .map(|(name, expr)| Ok((name.clone(), materialize_expr(expr, exec_sub)?)))
1801        .collect::<Result<Vec<_>>>()?;
1802    Ok(UpdateStmt {
1803        table: stmt.table.clone(),
1804        assignments,
1805        where_clause,
1806    })
1807}
1808
1809fn delete_has_subquery(stmt: &DeleteStmt) -> bool {
1810    stmt.where_clause.as_ref().is_some_and(has_subquery)
1811}
1812
1813fn materialize_delete(
1814    stmt: &DeleteStmt,
1815    exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
1816) -> Result<DeleteStmt> {
1817    let where_clause = stmt
1818        .where_clause
1819        .as_ref()
1820        .map(|e| materialize_expr(e, exec_sub))
1821        .transpose()?;
1822    Ok(DeleteStmt {
1823        table: stmt.table.clone(),
1824        where_clause,
1825    })
1826}
1827
1828fn insert_has_subquery(stmt: &InsertStmt) -> bool {
1829    stmt.values.iter().any(|row| row.iter().any(has_subquery))
1830}
1831
1832fn materialize_insert(
1833    stmt: &InsertStmt,
1834    exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
1835) -> Result<InsertStmt> {
1836    let values = stmt
1837        .values
1838        .iter()
1839        .map(|row| {
1840            row.iter()
1841                .map(|e| materialize_expr(e, exec_sub))
1842                .collect::<Result<Vec<_>>>()
1843        })
1844        .collect::<Result<Vec<_>>>()?;
1845    Ok(InsertStmt {
1846        table: stmt.table.clone(),
1847        columns: stmt.columns.clone(),
1848        values,
1849    })
1850}
1851
1852fn exec_select(
1853    db: &Database,
1854    schema: &SchemaManager,
1855    stmt: &SelectStmt,
1856) -> Result<ExecutionResult> {
1857    let materialized;
1858    let stmt = if stmt_has_subquery(stmt) {
1859        materialized = materialize_stmt(stmt, &mut |sub| exec_subquery_read(db, schema, sub))?;
1860        &materialized
1861    } else {
1862        stmt
1863    };
1864
1865    if stmt.from.is_empty() {
1866        return exec_select_no_from(stmt);
1867    }
1868
1869    let lower_name = stmt.from.to_ascii_lowercase();
1870    let table_schema = schema
1871        .get(&lower_name)
1872        .ok_or_else(|| SqlError::TableNotFound(stmt.from.clone()))?;
1873
1874    if !stmt.joins.is_empty() {
1875        return exec_select_join(db, schema, stmt);
1876    }
1877
1878    let rows = collect_rows_read(db, table_schema, &stmt.where_clause)?;
1879    process_select(&table_schema.columns, rows, stmt)
1880}
1881
1882fn exec_select_no_from(stmt: &SelectStmt) -> Result<ExecutionResult> {
1883    let empty_cols: Vec<ColumnDef> = vec![];
1884    let empty_row: Vec<Value> = vec![];
1885    let (col_names, projected) = project_rows(&empty_cols, &stmt.columns, &[empty_row])?;
1886    Ok(ExecutionResult::Query(QueryResult {
1887        columns: col_names,
1888        rows: projected,
1889    }))
1890}
1891
1892/// Shared SELECT processing: WHERE, aggregation, ORDER BY, LIMIT/OFFSET, projection.
1893fn process_select(
1894    columns: &[ColumnDef],
1895    mut rows: Vec<Vec<Value>>,
1896    stmt: &SelectStmt,
1897) -> Result<ExecutionResult> {
1898    if let Some(ref where_expr) = stmt.where_clause {
1899        rows.retain(|row| match eval_expr(where_expr, columns, row) {
1900            Ok(val) => is_truthy(&val),
1901            Err(_) => false,
1902        });
1903    }
1904
1905    let has_aggregates = stmt.columns.iter().any(|c| match c {
1906        SelectColumn::Expr { expr, .. } => is_aggregate_expr(expr),
1907        _ => false,
1908    });
1909
1910    if has_aggregates || !stmt.group_by.is_empty() {
1911        return exec_aggregate(columns, &rows, stmt);
1912    }
1913
1914    if stmt.distinct {
1915        let (col_names, mut projected) = project_rows(columns, &stmt.columns, &rows)?;
1916
1917        let mut seen = std::collections::HashSet::new();
1918        projected.retain(|row| seen.insert(row.clone()));
1919
1920        if !stmt.order_by.is_empty() {
1921            let output_cols = build_output_columns(&stmt.columns, columns);
1922            sort_rows(&mut projected, &stmt.order_by, &output_cols)?;
1923        }
1924
1925        if let Some(ref offset_expr) = stmt.offset {
1926            let offset = eval_const_int(offset_expr)?.max(0) as usize;
1927            if offset < projected.len() {
1928                projected = projected.split_off(offset);
1929            } else {
1930                projected.clear();
1931            }
1932        }
1933
1934        if let Some(ref limit_expr) = stmt.limit {
1935            let limit = eval_const_int(limit_expr)?.max(0) as usize;
1936            projected.truncate(limit);
1937        }
1938
1939        return Ok(ExecutionResult::Query(QueryResult {
1940            columns: col_names,
1941            rows: projected,
1942        }));
1943    }
1944
1945    if !stmt.order_by.is_empty() {
1946        sort_rows(&mut rows, &stmt.order_by, columns)?;
1947    }
1948
1949    if let Some(ref offset_expr) = stmt.offset {
1950        let offset = eval_const_int(offset_expr)?.max(0) as usize;
1951        if offset < rows.len() {
1952            rows = rows.split_off(offset);
1953        } else {
1954            rows.clear();
1955        }
1956    }
1957
1958    if let Some(ref limit_expr) = stmt.limit {
1959        let limit = eval_const_int(limit_expr)?.max(0) as usize;
1960        rows.truncate(limit);
1961    }
1962
1963    let (col_names, projected) = project_rows(columns, &stmt.columns, &rows)?;
1964
1965    Ok(ExecutionResult::Query(QueryResult {
1966        columns: col_names,
1967        rows: projected,
1968    }))
1969}
1970
1971fn resolve_table_name<'a>(schema: &'a SchemaManager, name: &str) -> Result<&'a TableSchema> {
1972    let lower = name.to_ascii_lowercase();
1973    schema
1974        .get(&lower)
1975        .ok_or_else(|| SqlError::TableNotFound(name.to_string()))
1976}
1977
1978fn build_joined_columns(tables: &[(String, &TableSchema)]) -> Vec<ColumnDef> {
1979    let mut result = Vec::new();
1980    let mut pos: u16 = 0;
1981
1982    for (alias, schema) in tables {
1983        for col in &schema.columns {
1984            result.push(ColumnDef {
1985                name: format!("{}.{}", alias.to_ascii_lowercase(), col.name),
1986                data_type: col.data_type,
1987                nullable: col.nullable,
1988                position: pos,
1989            });
1990            pos += 1;
1991        }
1992    }
1993
1994    result
1995}
1996
/// Returns the lowercase qualifier for a table's columns in a join.
///
/// This is the alias when one was given, otherwise the table name itself.
fn table_alias_or_name(name: &str, alias: &Option<String>) -> String {
    // `as_deref` borrows the alias as `&str`, so no temporary `String` copy of `name`
    // is allocated just to serve as the fallback.
    alias.as_deref().unwrap_or(name).to_ascii_lowercase()
}
2003
2004fn collect_all_rows_read(db: &Database, table_schema: &TableSchema) -> Result<Vec<Vec<Value>>> {
2005    collect_rows_read(db, table_schema, &None)
2006}
2007
2008fn collect_all_rows_write(
2009    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
2010    table_schema: &TableSchema,
2011) -> Result<Vec<Vec<Value>>> {
2012    collect_rows_write(wtx, table_schema, &None)
2013}
2014
2015fn exec_select_join(
2016    db: &Database,
2017    schema: &SchemaManager,
2018    stmt: &SelectStmt,
2019) -> Result<ExecutionResult> {
2020    let from_schema = resolve_table_name(schema, &stmt.from)?;
2021    let from_alias = table_alias_or_name(&stmt.from, &stmt.from_alias);
2022    let mut outer_rows = collect_all_rows_read(db, from_schema)?;
2023
2024    let mut tables: Vec<(String, &TableSchema)> = vec![(from_alias.clone(), from_schema)];
2025
2026    for join in &stmt.joins {
2027        let inner_schema = resolve_table_name(schema, &join.table.name)?;
2028        let inner_alias = table_alias_or_name(&join.table.name, &join.table.alias);
2029        let inner_rows = collect_all_rows_read(db, inner_schema)?;
2030
2031        let mut preview_tables = tables.clone();
2032        preview_tables.push((inner_alias.clone(), inner_schema));
2033        let combined_cols = build_joined_columns(&preview_tables);
2034
2035        let mut new_rows = Vec::new();
2036
2037        match join.join_type {
2038            JoinType::Inner | JoinType::Cross => {
2039                for outer in &outer_rows {
2040                    for inner in &inner_rows {
2041                        let combined: Vec<Value> =
2042                            outer.iter().chain(inner.iter()).cloned().collect();
2043                        if let Some(ref on_expr) = join.on_clause {
2044                            match eval_expr(on_expr, &combined_cols, &combined) {
2045                                Ok(val) if is_truthy(&val) => new_rows.push(combined),
2046                                _ => {}
2047                            }
2048                        } else {
2049                            new_rows.push(combined);
2050                        }
2051                    }
2052                }
2053            }
2054            JoinType::Left => {
2055                let inner_col_count = inner_schema.columns.len();
2056                for outer in &outer_rows {
2057                    let mut matched = false;
2058                    for inner in &inner_rows {
2059                        let combined: Vec<Value> =
2060                            outer.iter().chain(inner.iter()).cloned().collect();
2061                        if let Some(ref on_expr) = join.on_clause {
2062                            match eval_expr(on_expr, &combined_cols, &combined) {
2063                                Ok(val) if is_truthy(&val) => {
2064                                    new_rows.push(combined);
2065                                    matched = true;
2066                                }
2067                                _ => {}
2068                            }
2069                        } else {
2070                            new_rows.push(combined);
2071                            matched = true;
2072                        }
2073                    }
2074                    if !matched {
2075                        let mut padded = outer.clone();
2076                        padded.extend(std::iter::repeat(Value::Null).take(inner_col_count));
2077                        new_rows.push(padded);
2078                    }
2079                }
2080            }
2081            JoinType::Right => {
2082                let outer_col_count = if outer_rows.is_empty() {
2083                    tables.iter().map(|(_, s)| s.columns.len()).sum()
2084                } else {
2085                    outer_rows[0].len()
2086                };
2087                let mut inner_matched = vec![false; inner_rows.len()];
2088                for outer in &outer_rows {
2089                    for (j, inner) in inner_rows.iter().enumerate() {
2090                        let combined: Vec<Value> =
2091                            outer.iter().chain(inner.iter()).cloned().collect();
2092                        if let Some(ref on_expr) = join.on_clause {
2093                            match eval_expr(on_expr, &combined_cols, &combined) {
2094                                Ok(val) if is_truthy(&val) => {
2095                                    new_rows.push(combined);
2096                                    inner_matched[j] = true;
2097                                }
2098                                _ => {}
2099                            }
2100                        } else {
2101                            new_rows.push(combined);
2102                            inner_matched[j] = true;
2103                        }
2104                    }
2105                }
2106                for (j, inner) in inner_rows.iter().enumerate() {
2107                    if !inner_matched[j] {
2108                        let mut padded: Vec<Value> = std::iter::repeat(Value::Null)
2109                            .take(outer_col_count)
2110                            .collect();
2111                        padded.extend(inner.iter().cloned());
2112                        new_rows.push(padded);
2113                    }
2114                }
2115            }
2116        }
2117
2118        tables.push((inner_alias, inner_schema));
2119        outer_rows = new_rows;
2120    }
2121
2122    let joined_cols = build_joined_columns(&tables);
2123    process_select(&joined_cols, outer_rows, stmt)
2124}
2125
2126fn exec_select_join_in_txn(
2127    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
2128    schema: &SchemaManager,
2129    stmt: &SelectStmt,
2130) -> Result<ExecutionResult> {
2131    let from_schema = resolve_table_name(schema, &stmt.from)?;
2132    let from_alias = table_alias_or_name(&stmt.from, &stmt.from_alias);
2133    let mut outer_rows = collect_all_rows_write(wtx, from_schema)?;
2134
2135    let mut tables: Vec<(String, &TableSchema)> = vec![(from_alias.clone(), from_schema)];
2136
2137    for join in &stmt.joins {
2138        let inner_schema = resolve_table_name(schema, &join.table.name)?;
2139        let inner_alias = table_alias_or_name(&join.table.name, &join.table.alias);
2140        let inner_rows = collect_all_rows_write(wtx, inner_schema)?;
2141
2142        let mut preview_tables = tables.clone();
2143        preview_tables.push((inner_alias.clone(), inner_schema));
2144        let combined_cols = build_joined_columns(&preview_tables);
2145
2146        let mut new_rows = Vec::new();
2147
2148        match join.join_type {
2149            JoinType::Inner | JoinType::Cross => {
2150                for outer in &outer_rows {
2151                    for inner in &inner_rows {
2152                        let combined: Vec<Value> =
2153                            outer.iter().chain(inner.iter()).cloned().collect();
2154                        if let Some(ref on_expr) = join.on_clause {
2155                            match eval_expr(on_expr, &combined_cols, &combined) {
2156                                Ok(val) if is_truthy(&val) => new_rows.push(combined),
2157                                _ => {}
2158                            }
2159                        } else {
2160                            new_rows.push(combined);
2161                        }
2162                    }
2163                }
2164            }
2165            JoinType::Left => {
2166                let inner_col_count = inner_schema.columns.len();
2167                for outer in &outer_rows {
2168                    let mut matched = false;
2169                    for inner in &inner_rows {
2170                        let combined: Vec<Value> =
2171                            outer.iter().chain(inner.iter()).cloned().collect();
2172                        if let Some(ref on_expr) = join.on_clause {
2173                            match eval_expr(on_expr, &combined_cols, &combined) {
2174                                Ok(val) if is_truthy(&val) => {
2175                                    new_rows.push(combined);
2176                                    matched = true;
2177                                }
2178                                _ => {}
2179                            }
2180                        } else {
2181                            new_rows.push(combined);
2182                            matched = true;
2183                        }
2184                    }
2185                    if !matched {
2186                        let mut padded = outer.clone();
2187                        padded.extend(std::iter::repeat(Value::Null).take(inner_col_count));
2188                        new_rows.push(padded);
2189                    }
2190                }
2191            }
2192            JoinType::Right => {
2193                let outer_col_count = if outer_rows.is_empty() {
2194                    tables.iter().map(|(_, s)| s.columns.len()).sum()
2195                } else {
2196                    outer_rows[0].len()
2197                };
2198                let mut inner_matched = vec![false; inner_rows.len()];
2199                for outer in &outer_rows {
2200                    for (j, inner) in inner_rows.iter().enumerate() {
2201                        let combined: Vec<Value> =
2202                            outer.iter().chain(inner.iter()).cloned().collect();
2203                        if let Some(ref on_expr) = join.on_clause {
2204                            match eval_expr(on_expr, &combined_cols, &combined) {
2205                                Ok(val) if is_truthy(&val) => {
2206                                    new_rows.push(combined);
2207                                    inner_matched[j] = true;
2208                                }
2209                                _ => {}
2210                            }
2211                        } else {
2212                            new_rows.push(combined);
2213                            inner_matched[j] = true;
2214                        }
2215                    }
2216                }
2217                for (j, inner) in inner_rows.iter().enumerate() {
2218                    if !inner_matched[j] {
2219                        let mut padded: Vec<Value> = std::iter::repeat(Value::Null)
2220                            .take(outer_col_count)
2221                            .collect();
2222                        padded.extend(inner.iter().cloned());
2223                        new_rows.push(padded);
2224                    }
2225                }
2226            }
2227        }
2228
2229        tables.push((inner_alias, inner_schema));
2230        outer_rows = new_rows;
2231    }
2232
2233    let joined_cols = build_joined_columns(&tables);
2234    process_select(&joined_cols, outer_rows, stmt)
2235}
2236
2237fn exec_update(
2238    db: &Database,
2239    schema: &SchemaManager,
2240    stmt: &UpdateStmt,
2241) -> Result<ExecutionResult> {
2242    let materialized;
2243    let stmt = if update_has_subquery(stmt) {
2244        materialized = materialize_update(stmt, &mut |sub| exec_subquery_read(db, schema, sub))?;
2245        &materialized
2246    } else {
2247        stmt
2248    };
2249
2250    let lower_name = stmt.table.to_ascii_lowercase();
2251    let table_schema = schema
2252        .get(&lower_name)
2253        .ok_or_else(|| SqlError::TableNotFound(stmt.table.clone()))?;
2254
2255    let all_candidates = collect_keyed_rows_read(db, table_schema, &stmt.where_clause)?;
2256    let matching_rows: Vec<(Vec<u8>, Vec<Value>)> = all_candidates
2257        .into_iter()
2258        .filter(|(_, row)| match &stmt.where_clause {
2259            Some(where_expr) => match eval_expr(where_expr, &table_schema.columns, row) {
2260                Ok(val) => is_truthy(&val),
2261                Err(_) => false,
2262            },
2263            None => true,
2264        })
2265        .collect();
2266
2267    if matching_rows.is_empty() {
2268        return Ok(ExecutionResult::RowsAffected(0));
2269    }
2270
2271    struct UpdateChange {
2272        old_key: Vec<u8>,
2273        new_key: Vec<u8>,
2274        new_value: Vec<u8>,
2275        pk_changed: bool,
2276        old_row: Vec<Value>,
2277        new_row: Vec<Value>,
2278    }
2279
2280    let pk_indices = table_schema.pk_indices();
2281    let mut changes: Vec<UpdateChange> = Vec::new();
2282
2283    for (old_key, row) in &matching_rows {
2284        let mut new_row = row.clone();
2285        let mut pk_changed = false;
2286
2287        // Evaluate all SET expressions against the original row (SQL standard).
2288        let mut evaluated: Vec<(usize, Value)> = Vec::with_capacity(stmt.assignments.len());
2289        for (col_name, expr) in &stmt.assignments {
2290            let col_idx = table_schema
2291                .column_index(col_name)
2292                .ok_or_else(|| SqlError::ColumnNotFound(col_name.clone()))?;
2293            let new_val = eval_expr(expr, &table_schema.columns, row)?;
2294            let col = &table_schema.columns[col_idx];
2295
2296            let coerced = if new_val.is_null() {
2297                if !col.nullable {
2298                    return Err(SqlError::NotNullViolation(col.name.clone()));
2299                }
2300                Value::Null
2301            } else {
2302                new_val
2303                    .coerce_to(col.data_type)
2304                    .ok_or_else(|| SqlError::TypeMismatch {
2305                        expected: col.data_type.to_string(),
2306                        got: new_val.data_type().to_string(),
2307                    })?
2308            };
2309
2310            evaluated.push((col_idx, coerced));
2311        }
2312
2313        for (col_idx, coerced) in evaluated {
2314            if table_schema.primary_key_columns.contains(&(col_idx as u16)) {
2315                pk_changed = true;
2316            }
2317            new_row[col_idx] = coerced;
2318        }
2319
2320        let pk_values: Vec<Value> = pk_indices.iter().map(|&i| new_row[i].clone()).collect();
2321        let new_key = encode_composite_key(&pk_values);
2322
2323        let non_pk = table_schema.non_pk_indices();
2324        let value_values: Vec<Value> = non_pk.iter().map(|&i| new_row[i].clone()).collect();
2325        let new_value = encode_row(&value_values);
2326
2327        changes.push(UpdateChange {
2328            old_key: old_key.clone(),
2329            new_key,
2330            new_value,
2331            pk_changed,
2332            old_row: row.clone(),
2333            new_row,
2334        });
2335    }
2336
2337    {
2338        use std::collections::HashSet;
2339        let mut new_keys: HashSet<Vec<u8>> = HashSet::new();
2340        for c in &changes {
2341            if c.pk_changed && c.new_key != c.old_key && !new_keys.insert(c.new_key.clone()) {
2342                return Err(SqlError::DuplicateKey);
2343            }
2344        }
2345    }
2346
2347    let mut wtx = db.begin_write().map_err(SqlError::Storage)?;
2348
2349    for c in &changes {
2350        let old_pk: Vec<Value> = pk_indices.iter().map(|&i| c.old_row[i].clone()).collect();
2351
2352        for idx in &table_schema.indices {
2353            if index_columns_changed(idx, &c.old_row, &c.new_row) || c.pk_changed {
2354                let idx_table = TableSchema::index_table_name(&lower_name, &idx.name);
2355                let old_idx_key = encode_index_key(idx, &c.old_row, &old_pk);
2356                wtx.table_delete(&idx_table, &old_idx_key)
2357                    .map_err(SqlError::Storage)?;
2358            }
2359        }
2360
2361        if c.pk_changed {
2362            wtx.table_delete(lower_name.as_bytes(), &c.old_key)
2363                .map_err(SqlError::Storage)?;
2364        }
2365    }
2366
2367    for c in &changes {
2368        let new_pk: Vec<Value> = pk_indices.iter().map(|&i| c.new_row[i].clone()).collect();
2369
2370        if c.pk_changed {
2371            let is_new = wtx
2372                .table_insert(lower_name.as_bytes(), &c.new_key, &c.new_value)
2373                .map_err(SqlError::Storage)?;
2374            if !is_new {
2375                return Err(SqlError::DuplicateKey);
2376            }
2377        } else {
2378            wtx.table_insert(lower_name.as_bytes(), &c.new_key, &c.new_value)
2379                .map_err(SqlError::Storage)?;
2380        }
2381
2382        for idx in &table_schema.indices {
2383            if index_columns_changed(idx, &c.old_row, &c.new_row) || c.pk_changed {
2384                let idx_table = TableSchema::index_table_name(&lower_name, &idx.name);
2385                let new_idx_key = encode_index_key(idx, &c.new_row, &new_pk);
2386                let new_idx_val = encode_index_value(idx, &c.new_row, &new_pk);
2387                let is_new = wtx
2388                    .table_insert(&idx_table, &new_idx_key, &new_idx_val)
2389                    .map_err(SqlError::Storage)?;
2390                if idx.unique && !is_new {
2391                    let indexed_values: Vec<Value> = idx
2392                        .columns
2393                        .iter()
2394                        .map(|&col_idx| c.new_row[col_idx as usize].clone())
2395                        .collect();
2396                    let any_null = indexed_values.iter().any(|v| v.is_null());
2397                    if !any_null {
2398                        return Err(SqlError::UniqueViolation(idx.name.clone()));
2399                    }
2400                }
2401            }
2402        }
2403    }
2404
2405    let count = changes.len() as u64;
2406    wtx.commit().map_err(SqlError::Storage)?;
2407    Ok(ExecutionResult::RowsAffected(count))
2408}
2409
2410fn exec_delete(
2411    db: &Database,
2412    schema: &SchemaManager,
2413    stmt: &DeleteStmt,
2414) -> Result<ExecutionResult> {
2415    let materialized;
2416    let stmt = if delete_has_subquery(stmt) {
2417        materialized = materialize_delete(stmt, &mut |sub| exec_subquery_read(db, schema, sub))?;
2418        &materialized
2419    } else {
2420        stmt
2421    };
2422
2423    let lower_name = stmt.table.to_ascii_lowercase();
2424    let table_schema = schema
2425        .get(&lower_name)
2426        .ok_or_else(|| SqlError::TableNotFound(stmt.table.clone()))?;
2427
2428    let all_candidates = collect_keyed_rows_read(db, table_schema, &stmt.where_clause)?;
2429    let rows_to_delete: Vec<(Vec<u8>, Vec<Value>)> = all_candidates
2430        .into_iter()
2431        .filter(|(_, row)| match &stmt.where_clause {
2432            Some(where_expr) => match eval_expr(where_expr, &table_schema.columns, row) {
2433                Ok(val) => is_truthy(&val),
2434                Err(_) => false,
2435            },
2436            None => true,
2437        })
2438        .collect();
2439
2440    if rows_to_delete.is_empty() {
2441        return Ok(ExecutionResult::RowsAffected(0));
2442    }
2443
2444    let pk_indices = table_schema.pk_indices();
2445    let mut wtx = db.begin_write().map_err(SqlError::Storage)?;
2446    for (key, row) in &rows_to_delete {
2447        let pk_values: Vec<Value> = pk_indices.iter().map(|&i| row[i].clone()).collect();
2448        delete_index_entries(&mut wtx, table_schema, row, &pk_values)?;
2449        wtx.table_delete(lower_name.as_bytes(), key)
2450            .map_err(SqlError::Storage)?;
2451    }
2452    let count = rows_to_delete.len() as u64;
2453    wtx.commit().map_err(SqlError::Storage)?;
2454    Ok(ExecutionResult::RowsAffected(count))
2455}
2456
2457// ── DML (in-transaction) ────────────────────────────────────────────
2458
2459fn exec_insert_in_txn(
2460    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
2461    schema: &SchemaManager,
2462    stmt: &InsertStmt,
2463) -> Result<ExecutionResult> {
2464    let materialized;
2465    let stmt = if insert_has_subquery(stmt) {
2466        materialized = materialize_insert(stmt, &mut |sub| exec_subquery_write(wtx, schema, sub))?;
2467        &materialized
2468    } else {
2469        stmt
2470    };
2471
2472    let lower_name = stmt.table.to_ascii_lowercase();
2473    let table_schema = schema
2474        .get(&lower_name)
2475        .ok_or_else(|| SqlError::TableNotFound(stmt.table.clone()))?;
2476
2477    let insert_columns = if stmt.columns.is_empty() {
2478        table_schema
2479            .columns
2480            .iter()
2481            .map(|c| c.name.clone())
2482            .collect::<Vec<_>>()
2483    } else {
2484        stmt.columns
2485            .iter()
2486            .map(|c| c.to_ascii_lowercase())
2487            .collect()
2488    };
2489
2490    let col_indices: Vec<usize> = insert_columns
2491        .iter()
2492        .map(|name| {
2493            table_schema
2494                .column_index(name)
2495                .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))
2496        })
2497        .collect::<Result<_>>()?;
2498
2499    let mut count: u64 = 0;
2500
2501    for value_row in &stmt.values {
2502        if value_row.len() != insert_columns.len() {
2503            return Err(SqlError::InvalidValue(format!(
2504                "expected {} values, got {}",
2505                insert_columns.len(),
2506                value_row.len()
2507            )));
2508        }
2509
2510        let mut row = vec![Value::Null; table_schema.columns.len()];
2511        for (i, expr) in value_row.iter().enumerate() {
2512            let val = eval_const_expr(expr)?;
2513            let col_idx = col_indices[i];
2514            let col = &table_schema.columns[col_idx];
2515
2516            let coerced = if val.is_null() {
2517                Value::Null
2518            } else {
2519                val.coerce_to(col.data_type)
2520                    .ok_or_else(|| SqlError::TypeMismatch {
2521                        expected: col.data_type.to_string(),
2522                        got: val.data_type().to_string(),
2523                    })?
2524            };
2525
2526            row[col_idx] = coerced;
2527        }
2528
2529        for col in &table_schema.columns {
2530            if !col.nullable && row[col.position as usize].is_null() {
2531                return Err(SqlError::NotNullViolation(col.name.clone()));
2532            }
2533        }
2534
2535        let pk_values: Vec<Value> = table_schema
2536            .pk_indices()
2537            .iter()
2538            .map(|&i| row[i].clone())
2539            .collect();
2540        let key = encode_composite_key(&pk_values);
2541
2542        let non_pk = table_schema.non_pk_indices();
2543        let value_values: Vec<Value> = non_pk.iter().map(|&i| row[i].clone()).collect();
2544        let value = encode_row(&value_values);
2545
2546        if key.len() > citadel_core::MAX_KEY_SIZE {
2547            return Err(SqlError::KeyTooLarge {
2548                size: key.len(),
2549                max: citadel_core::MAX_KEY_SIZE,
2550            });
2551        }
2552        if value.len() > citadel_core::MAX_INLINE_VALUE_SIZE {
2553            return Err(SqlError::RowTooLarge {
2554                size: value.len(),
2555                max: citadel_core::MAX_INLINE_VALUE_SIZE,
2556            });
2557        }
2558
2559        let is_new = wtx
2560            .table_insert(lower_name.as_bytes(), &key, &value)
2561            .map_err(SqlError::Storage)?;
2562        if !is_new {
2563            return Err(SqlError::DuplicateKey);
2564        }
2565
2566        insert_index_entries(wtx, table_schema, &row, &pk_values)?;
2567        count += 1;
2568    }
2569
2570    Ok(ExecutionResult::RowsAffected(count))
2571}
2572
2573fn exec_select_in_txn(
2574    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
2575    schema: &SchemaManager,
2576    stmt: &SelectStmt,
2577) -> Result<ExecutionResult> {
2578    let materialized;
2579    let stmt = if stmt_has_subquery(stmt) {
2580        materialized = materialize_stmt(stmt, &mut |sub| exec_subquery_write(wtx, schema, sub))?;
2581        &materialized
2582    } else {
2583        stmt
2584    };
2585
2586    if stmt.from.is_empty() {
2587        return exec_select_no_from(stmt);
2588    }
2589
2590    if !stmt.joins.is_empty() {
2591        return exec_select_join_in_txn(wtx, schema, stmt);
2592    }
2593
2594    let lower_name = stmt.from.to_ascii_lowercase();
2595    let table_schema = schema
2596        .get(&lower_name)
2597        .ok_or_else(|| SqlError::TableNotFound(stmt.from.clone()))?;
2598
2599    let rows = collect_rows_write(wtx, table_schema, &stmt.where_clause)?;
2600    process_select(&table_schema.columns, rows, stmt)
2601}
2602
2603fn exec_update_in_txn(
2604    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
2605    schema: &SchemaManager,
2606    stmt: &UpdateStmt,
2607) -> Result<ExecutionResult> {
2608    let materialized;
2609    let stmt = if update_has_subquery(stmt) {
2610        materialized = materialize_update(stmt, &mut |sub| exec_subquery_write(wtx, schema, sub))?;
2611        &materialized
2612    } else {
2613        stmt
2614    };
2615
2616    let lower_name = stmt.table.to_ascii_lowercase();
2617    let table_schema = schema
2618        .get(&lower_name)
2619        .ok_or_else(|| SqlError::TableNotFound(stmt.table.clone()))?;
2620
2621    let all_candidates = collect_keyed_rows_write(wtx, table_schema, &stmt.where_clause)?;
2622    let matching_rows: Vec<(Vec<u8>, Vec<Value>)> = all_candidates
2623        .into_iter()
2624        .filter(|(_, row)| match &stmt.where_clause {
2625            Some(where_expr) => match eval_expr(where_expr, &table_schema.columns, row) {
2626                Ok(val) => is_truthy(&val),
2627                Err(_) => false,
2628            },
2629            None => true,
2630        })
2631        .collect();
2632
2633    if matching_rows.is_empty() {
2634        return Ok(ExecutionResult::RowsAffected(0));
2635    }
2636
2637    struct UpdateChange {
2638        old_key: Vec<u8>,
2639        new_key: Vec<u8>,
2640        new_value: Vec<u8>,
2641        pk_changed: bool,
2642        old_row: Vec<Value>,
2643        new_row: Vec<Value>,
2644    }
2645
2646    let pk_indices = table_schema.pk_indices();
2647    let mut changes: Vec<UpdateChange> = Vec::new();
2648
2649    for (old_key, row) in &matching_rows {
2650        let mut new_row = row.clone();
2651        let mut pk_changed = false;
2652
2653        // Evaluate all SET expressions against the original row (SQL standard).
2654        let mut evaluated: Vec<(usize, Value)> = Vec::with_capacity(stmt.assignments.len());
2655        for (col_name, expr) in &stmt.assignments {
2656            let col_idx = table_schema
2657                .column_index(col_name)
2658                .ok_or_else(|| SqlError::ColumnNotFound(col_name.clone()))?;
2659            let new_val = eval_expr(expr, &table_schema.columns, row)?;
2660            let col = &table_schema.columns[col_idx];
2661
2662            let coerced = if new_val.is_null() {
2663                if !col.nullable {
2664                    return Err(SqlError::NotNullViolation(col.name.clone()));
2665                }
2666                Value::Null
2667            } else {
2668                new_val
2669                    .coerce_to(col.data_type)
2670                    .ok_or_else(|| SqlError::TypeMismatch {
2671                        expected: col.data_type.to_string(),
2672                        got: new_val.data_type().to_string(),
2673                    })?
2674            };
2675
2676            evaluated.push((col_idx, coerced));
2677        }
2678
2679        for (col_idx, coerced) in evaluated {
2680            if table_schema.primary_key_columns.contains(&(col_idx as u16)) {
2681                pk_changed = true;
2682            }
2683            new_row[col_idx] = coerced;
2684        }
2685
2686        let pk_values: Vec<Value> = pk_indices.iter().map(|&i| new_row[i].clone()).collect();
2687        let new_key = encode_composite_key(&pk_values);
2688
2689        let non_pk = table_schema.non_pk_indices();
2690        let value_values: Vec<Value> = non_pk.iter().map(|&i| new_row[i].clone()).collect();
2691        let new_value = encode_row(&value_values);
2692
2693        changes.push(UpdateChange {
2694            old_key: old_key.clone(),
2695            new_key,
2696            new_value,
2697            pk_changed,
2698            old_row: row.clone(),
2699            new_row,
2700        });
2701    }
2702
2703    {
2704        use std::collections::HashSet;
2705        let mut new_keys: HashSet<Vec<u8>> = HashSet::new();
2706        for c in &changes {
2707            if c.pk_changed && c.new_key != c.old_key && !new_keys.insert(c.new_key.clone()) {
2708                return Err(SqlError::DuplicateKey);
2709            }
2710        }
2711    }
2712
2713    for c in &changes {
2714        let old_pk: Vec<Value> = pk_indices.iter().map(|&i| c.old_row[i].clone()).collect();
2715
2716        for idx in &table_schema.indices {
2717            if index_columns_changed(idx, &c.old_row, &c.new_row) || c.pk_changed {
2718                let idx_table = TableSchema::index_table_name(&lower_name, &idx.name);
2719                let old_idx_key = encode_index_key(idx, &c.old_row, &old_pk);
2720                wtx.table_delete(&idx_table, &old_idx_key)
2721                    .map_err(SqlError::Storage)?;
2722            }
2723        }
2724
2725        if c.pk_changed {
2726            wtx.table_delete(lower_name.as_bytes(), &c.old_key)
2727                .map_err(SqlError::Storage)?;
2728        }
2729    }
2730
2731    for c in &changes {
2732        let new_pk: Vec<Value> = pk_indices.iter().map(|&i| c.new_row[i].clone()).collect();
2733
2734        if c.pk_changed {
2735            let is_new = wtx
2736                .table_insert(lower_name.as_bytes(), &c.new_key, &c.new_value)
2737                .map_err(SqlError::Storage)?;
2738            if !is_new {
2739                return Err(SqlError::DuplicateKey);
2740            }
2741        } else {
2742            wtx.table_insert(lower_name.as_bytes(), &c.new_key, &c.new_value)
2743                .map_err(SqlError::Storage)?;
2744        }
2745
2746        for idx in &table_schema.indices {
2747            if index_columns_changed(idx, &c.old_row, &c.new_row) || c.pk_changed {
2748                let idx_table = TableSchema::index_table_name(&lower_name, &idx.name);
2749                let new_idx_key = encode_index_key(idx, &c.new_row, &new_pk);
2750                let new_idx_val = encode_index_value(idx, &c.new_row, &new_pk);
2751                let is_new = wtx
2752                    .table_insert(&idx_table, &new_idx_key, &new_idx_val)
2753                    .map_err(SqlError::Storage)?;
2754                if idx.unique && !is_new {
2755                    let indexed_values: Vec<Value> = idx
2756                        .columns
2757                        .iter()
2758                        .map(|&col_idx| c.new_row[col_idx as usize].clone())
2759                        .collect();
2760                    let any_null = indexed_values.iter().any(|v| v.is_null());
2761                    if !any_null {
2762                        return Err(SqlError::UniqueViolation(idx.name.clone()));
2763                    }
2764                }
2765            }
2766        }
2767    }
2768
2769    let count = changes.len() as u64;
2770    Ok(ExecutionResult::RowsAffected(count))
2771}
2772
2773fn exec_delete_in_txn(
2774    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
2775    schema: &SchemaManager,
2776    stmt: &DeleteStmt,
2777) -> Result<ExecutionResult> {
2778    let materialized;
2779    let stmt = if delete_has_subquery(stmt) {
2780        materialized = materialize_delete(stmt, &mut |sub| exec_subquery_write(wtx, schema, sub))?;
2781        &materialized
2782    } else {
2783        stmt
2784    };
2785
2786    let lower_name = stmt.table.to_ascii_lowercase();
2787    let table_schema = schema
2788        .get(&lower_name)
2789        .ok_or_else(|| SqlError::TableNotFound(stmt.table.clone()))?;
2790
2791    let all_candidates = collect_keyed_rows_write(wtx, table_schema, &stmt.where_clause)?;
2792    let rows_to_delete: Vec<(Vec<u8>, Vec<Value>)> = all_candidates
2793        .into_iter()
2794        .filter(|(_, row)| match &stmt.where_clause {
2795            Some(where_expr) => match eval_expr(where_expr, &table_schema.columns, row) {
2796                Ok(val) => is_truthy(&val),
2797                Err(_) => false,
2798            },
2799            None => true,
2800        })
2801        .collect();
2802
2803    if rows_to_delete.is_empty() {
2804        return Ok(ExecutionResult::RowsAffected(0));
2805    }
2806
2807    let pk_indices = table_schema.pk_indices();
2808    for (key, row) in &rows_to_delete {
2809        let pk_values: Vec<Value> = pk_indices.iter().map(|&i| row[i].clone()).collect();
2810        delete_index_entries(wtx, table_schema, row, &pk_values)?;
2811        wtx.table_delete(lower_name.as_bytes(), key)
2812            .map_err(SqlError::Storage)?;
2813    }
2814    let count = rows_to_delete.len() as u64;
2815    Ok(ExecutionResult::RowsAffected(count))
2816}
2817
2818// ── Aggregation ─────────────────────────────────────────────────────
2819
2820fn exec_aggregate(
2821    columns: &[ColumnDef],
2822    rows: &[Vec<Value>],
2823    stmt: &SelectStmt,
2824) -> Result<ExecutionResult> {
2825    let groups: BTreeMap<Vec<Value>, Vec<&Vec<Value>>> = if stmt.group_by.is_empty() {
2826        let mut m = BTreeMap::new();
2827        m.insert(vec![], rows.iter().collect());
2828        m
2829    } else {
2830        let mut m: BTreeMap<Vec<Value>, Vec<&Vec<Value>>> = BTreeMap::new();
2831        for row in rows {
2832            let group_key: Vec<Value> = stmt
2833                .group_by
2834                .iter()
2835                .map(|expr| eval_expr(expr, columns, row))
2836                .collect::<Result<_>>()?;
2837            m.entry(group_key).or_default().push(row);
2838        }
2839        m
2840    };
2841
2842    let mut result_rows = Vec::new();
2843    let output_cols = build_output_columns(&stmt.columns, columns);
2844
2845    for group_rows in groups.values() {
2846        let mut result_row = Vec::new();
2847
2848        for sel_col in &stmt.columns {
2849            match sel_col {
2850                SelectColumn::AllColumns => {
2851                    return Err(SqlError::Unsupported("SELECT * with GROUP BY".into()));
2852                }
2853                SelectColumn::Expr { expr, .. } => {
2854                    let val = eval_aggregate_expr(expr, columns, group_rows)?;
2855                    result_row.push(val);
2856                }
2857            }
2858        }
2859
2860        if let Some(ref having) = stmt.having {
2861            let passes = match eval_aggregate_expr(having, columns, group_rows) {
2862                Ok(val) => is_truthy(&val),
2863                Err(SqlError::ColumnNotFound(_)) => {
2864                    match eval_expr(having, &output_cols, &result_row) {
2865                        Ok(val) => is_truthy(&val),
2866                        Err(_) => false,
2867                    }
2868                }
2869                Err(e) => return Err(e),
2870            };
2871            if !passes {
2872                continue;
2873            }
2874        }
2875
2876        result_rows.push(result_row);
2877    }
2878
2879    if stmt.distinct {
2880        let mut seen = std::collections::HashSet::new();
2881        result_rows.retain(|row| seen.insert(row.clone()));
2882    }
2883
2884    if !stmt.order_by.is_empty() {
2885        let output_cols = build_output_columns(&stmt.columns, columns);
2886        sort_rows(&mut result_rows, &stmt.order_by, &output_cols)?;
2887    }
2888
2889    if let Some(ref offset_expr) = stmt.offset {
2890        let offset = eval_const_int(offset_expr)?.max(0) as usize;
2891        if offset < result_rows.len() {
2892            result_rows = result_rows.split_off(offset);
2893        } else {
2894            result_rows.clear();
2895        }
2896    }
2897    if let Some(ref limit_expr) = stmt.limit {
2898        let limit = eval_const_int(limit_expr)?.max(0) as usize;
2899        result_rows.truncate(limit);
2900    }
2901
2902    let col_names = stmt
2903        .columns
2904        .iter()
2905        .map(|c| match c {
2906            SelectColumn::AllColumns => "*".into(),
2907            SelectColumn::Expr { alias: Some(a), .. } => a.clone(),
2908            SelectColumn::Expr { expr, .. } => expr_display_name(expr),
2909        })
2910        .collect();
2911
2912    Ok(ExecutionResult::Query(QueryResult {
2913        columns: col_names,
2914        rows: result_rows,
2915    }))
2916}
2917
2918fn eval_aggregate_expr(
2919    expr: &Expr,
2920    columns: &[ColumnDef],
2921    group_rows: &[&Vec<Value>],
2922) -> Result<Value> {
2923    match expr {
2924        Expr::CountStar => Ok(Value::Integer(group_rows.len() as i64)),
2925
2926        Expr::Function { name, args } if is_aggregate_function(name, args.len()) => {
2927            let func = name.to_ascii_uppercase();
2928            if args.len() != 1 {
2929                return Err(SqlError::Unsupported(format!(
2930                    "{func} with {} args",
2931                    args.len()
2932                )));
2933            }
2934            let arg = &args[0];
2935            let values: Vec<Value> = group_rows
2936                .iter()
2937                .map(|row| eval_expr(arg, columns, row))
2938                .collect::<Result<_>>()?;
2939
2940            match func.as_str() {
2941                "COUNT" => {
2942                    let count = values.iter().filter(|v| !v.is_null()).count();
2943                    Ok(Value::Integer(count as i64))
2944                }
2945                "SUM" => {
2946                    let mut int_sum: i64 = 0;
2947                    let mut real_sum: f64 = 0.0;
2948                    let mut has_real = false;
2949                    let mut all_null = true;
2950                    for v in &values {
2951                        match v {
2952                            Value::Integer(i) => {
2953                                int_sum += i;
2954                                all_null = false;
2955                            }
2956                            Value::Real(r) => {
2957                                real_sum += r;
2958                                has_real = true;
2959                                all_null = false;
2960                            }
2961                            Value::Null => {}
2962                            _ => {
2963                                return Err(SqlError::TypeMismatch {
2964                                    expected: "numeric".into(),
2965                                    got: v.data_type().to_string(),
2966                                })
2967                            }
2968                        }
2969                    }
2970                    if all_null {
2971                        return Ok(Value::Null);
2972                    }
2973                    if has_real {
2974                        Ok(Value::Real(real_sum + int_sum as f64))
2975                    } else {
2976                        Ok(Value::Integer(int_sum))
2977                    }
2978                }
2979                "AVG" => {
2980                    let mut sum: f64 = 0.0;
2981                    let mut count: i64 = 0;
2982                    for v in &values {
2983                        match v {
2984                            Value::Integer(i) => {
2985                                sum += *i as f64;
2986                                count += 1;
2987                            }
2988                            Value::Real(r) => {
2989                                sum += r;
2990                                count += 1;
2991                            }
2992                            Value::Null => {}
2993                            _ => {
2994                                return Err(SqlError::TypeMismatch {
2995                                    expected: "numeric".into(),
2996                                    got: v.data_type().to_string(),
2997                                })
2998                            }
2999                        }
3000                    }
3001                    if count == 0 {
3002                        Ok(Value::Null)
3003                    } else {
3004                        Ok(Value::Real(sum / count as f64))
3005                    }
3006                }
3007                "MIN" => {
3008                    let mut min: Option<&Value> = None;
3009                    for v in &values {
3010                        if v.is_null() {
3011                            continue;
3012                        }
3013                        min = Some(match min {
3014                            None => v,
3015                            Some(m) => {
3016                                if v < m {
3017                                    v
3018                                } else {
3019                                    m
3020                                }
3021                            }
3022                        });
3023                    }
3024                    Ok(min.cloned().unwrap_or(Value::Null))
3025                }
3026                "MAX" => {
3027                    let mut max: Option<&Value> = None;
3028                    for v in &values {
3029                        if v.is_null() {
3030                            continue;
3031                        }
3032                        max = Some(match max {
3033                            None => v,
3034                            Some(m) => {
3035                                if v > m {
3036                                    v
3037                                } else {
3038                                    m
3039                                }
3040                            }
3041                        });
3042                    }
3043                    Ok(max.cloned().unwrap_or(Value::Null))
3044                }
3045                _ => Err(SqlError::Unsupported(format!("aggregate function: {func}"))),
3046            }
3047        }
3048
3049        Expr::Column(_) | Expr::QualifiedColumn { .. } => {
3050            if let Some(first) = group_rows.first() {
3051                eval_expr(expr, columns, first)
3052            } else {
3053                Ok(Value::Null)
3054            }
3055        }
3056
3057        Expr::Literal(v) => Ok(v.clone()),
3058
3059        Expr::BinaryOp { left, op, right } => {
3060            let l = eval_aggregate_expr(left, columns, group_rows)?;
3061            let r = eval_aggregate_expr(right, columns, group_rows)?;
3062            crate::eval::eval_expr(
3063                &Expr::BinaryOp {
3064                    left: Box::new(Expr::Literal(l)),
3065                    op: *op,
3066                    right: Box::new(Expr::Literal(r)),
3067                },
3068                columns,
3069                &[],
3070            )
3071        }
3072
3073        Expr::UnaryOp { op, expr: e } => {
3074            let v = eval_aggregate_expr(e, columns, group_rows)?;
3075            crate::eval::eval_expr(
3076                &Expr::UnaryOp {
3077                    op: *op,
3078                    expr: Box::new(Expr::Literal(v)),
3079                },
3080                columns,
3081                &[],
3082            )
3083        }
3084
3085        Expr::IsNull(e) => {
3086            let v = eval_aggregate_expr(e, columns, group_rows)?;
3087            Ok(Value::Boolean(v.is_null()))
3088        }
3089
3090        Expr::IsNotNull(e) => {
3091            let v = eval_aggregate_expr(e, columns, group_rows)?;
3092            Ok(Value::Boolean(!v.is_null()))
3093        }
3094
3095        Expr::Cast { expr: e, data_type } => {
3096            let v = eval_aggregate_expr(e, columns, group_rows)?;
3097            crate::eval::eval_expr(
3098                &Expr::Cast {
3099                    expr: Box::new(Expr::Literal(v)),
3100                    data_type: *data_type,
3101                },
3102                columns,
3103                &[],
3104            )
3105        }
3106
3107        Expr::Case {
3108            operand,
3109            conditions,
3110            else_result,
3111        } => {
3112            let op_val = operand
3113                .as_ref()
3114                .map(|e| eval_aggregate_expr(e, columns, group_rows))
3115                .transpose()?;
3116            if let Some(ov) = &op_val {
3117                for (cond, result) in conditions {
3118                    let cv = eval_aggregate_expr(cond, columns, group_rows)?;
3119                    if !ov.is_null() && !cv.is_null() && *ov == cv {
3120                        return eval_aggregate_expr(result, columns, group_rows);
3121                    }
3122                }
3123            } else {
3124                for (cond, result) in conditions {
3125                    let cv = eval_aggregate_expr(cond, columns, group_rows)?;
3126                    if crate::eval::is_truthy(&cv) {
3127                        return eval_aggregate_expr(result, columns, group_rows);
3128                    }
3129                }
3130            }
3131            match else_result {
3132                Some(e) => eval_aggregate_expr(e, columns, group_rows),
3133                None => Ok(Value::Null),
3134            }
3135        }
3136
3137        Expr::Coalesce(args) => {
3138            for arg in args {
3139                let v = eval_aggregate_expr(arg, columns, group_rows)?;
3140                if !v.is_null() {
3141                    return Ok(v);
3142                }
3143            }
3144            Ok(Value::Null)
3145        }
3146
3147        Expr::Between {
3148            expr: e,
3149            low,
3150            high,
3151            negated,
3152        } => {
3153            let v = eval_aggregate_expr(e, columns, group_rows)?;
3154            let lo = eval_aggregate_expr(low, columns, group_rows)?;
3155            let hi = eval_aggregate_expr(high, columns, group_rows)?;
3156            crate::eval::eval_expr(
3157                &Expr::Between {
3158                    expr: Box::new(Expr::Literal(v)),
3159                    low: Box::new(Expr::Literal(lo)),
3160                    high: Box::new(Expr::Literal(hi)),
3161                    negated: *negated,
3162                },
3163                columns,
3164                &[],
3165            )
3166        }
3167
3168        Expr::Like {
3169            expr: e,
3170            pattern,
3171            escape,
3172            negated,
3173        } => {
3174            let v = eval_aggregate_expr(e, columns, group_rows)?;
3175            let p = eval_aggregate_expr(pattern, columns, group_rows)?;
3176            let esc = escape
3177                .as_ref()
3178                .map(|es| eval_aggregate_expr(es, columns, group_rows))
3179                .transpose()?;
3180            let esc_box = esc.map(|v| Box::new(Expr::Literal(v)));
3181            crate::eval::eval_expr(
3182                &Expr::Like {
3183                    expr: Box::new(Expr::Literal(v)),
3184                    pattern: Box::new(Expr::Literal(p)),
3185                    escape: esc_box,
3186                    negated: *negated,
3187                },
3188                columns,
3189                &[],
3190            )
3191        }
3192
3193        Expr::Function { name, args } => {
3194            let evaluated: Vec<Value> = args
3195                .iter()
3196                .map(|a| eval_aggregate_expr(a, columns, group_rows))
3197                .collect::<Result<_>>()?;
3198            let literal_args: Vec<Expr> = evaluated.into_iter().map(Expr::Literal).collect();
3199            crate::eval::eval_expr(
3200                &Expr::Function {
3201                    name: name.clone(),
3202                    args: literal_args,
3203                },
3204                columns,
3205                &[],
3206            )
3207        }
3208
3209        _ => Err(SqlError::Unsupported(format!(
3210            "expression in aggregate: {expr:?}"
3211        ))),
3212    }
3213}
3214
/// Return `true` when a call to `name` with `arg_count` arguments is an aggregate.
///
/// The name is matched case-insensitively. `COUNT`, `SUM` and `AVG` are
/// aggregates at any arity. `MIN` and `MAX` are aggregates only with exactly
/// one argument; other arities are treated as ordinary functions.
fn is_aggregate_function(name: &str, arg_count: usize) -> bool {
    const ALWAYS: [&str; 3] = ["COUNT", "SUM", "AVG"];
    const SINGLE_ARG_ONLY: [&str; 2] = ["MIN", "MAX"];

    let named = |set: &[&str]| set.iter().any(|f| name.eq_ignore_ascii_case(f));
    if named(&ALWAYS) {
        return true;
    }
    arg_count == 1 && named(&SINGLE_ARG_ONLY)
}
3220
3221fn is_aggregate_expr(expr: &Expr) -> bool {
3222    match expr {
3223        Expr::CountStar => true,
3224        Expr::Function { name, args } => {
3225            is_aggregate_function(name, args.len()) || args.iter().any(is_aggregate_expr)
3226        }
3227        Expr::BinaryOp { left, right, .. } => is_aggregate_expr(left) || is_aggregate_expr(right),
3228        Expr::UnaryOp { expr, .. }
3229        | Expr::IsNull(expr)
3230        | Expr::IsNotNull(expr)
3231        | Expr::Cast { expr, .. } => is_aggregate_expr(expr),
3232        Expr::Case {
3233            operand,
3234            conditions,
3235            else_result,
3236        } => {
3237            operand.as_ref().is_some_and(|e| is_aggregate_expr(e))
3238                || conditions
3239                    .iter()
3240                    .any(|(c, r)| is_aggregate_expr(c) || is_aggregate_expr(r))
3241                || else_result.as_ref().is_some_and(|e| is_aggregate_expr(e))
3242        }
3243        Expr::Coalesce(args) => args.iter().any(is_aggregate_expr),
3244        Expr::Between {
3245            expr, low, high, ..
3246        } => is_aggregate_expr(expr) || is_aggregate_expr(low) || is_aggregate_expr(high),
3247        Expr::Like {
3248            expr,
3249            pattern,
3250            escape,
3251            ..
3252        } => {
3253            is_aggregate_expr(expr)
3254                || is_aggregate_expr(pattern)
3255                || escape.as_ref().is_some_and(|e| is_aggregate_expr(e))
3256        }
3257        _ => false,
3258    }
3259}
3260
3261// ── Helpers ─────────────────────────────────────────────────────────
3262
3263/// Decode a full row from B+ tree key + value.
3264fn decode_full_row(schema: &TableSchema, key: &[u8], value: &[u8]) -> Result<Vec<Value>> {
3265    let pk_values = decode_composite_key(key, schema.primary_key_columns.len())?;
3266    let non_pk_values = decode_row(value)?;
3267
3268    let mut row = vec![Value::Null; schema.columns.len()];
3269
3270    for (i, &col_idx) in schema.primary_key_columns.iter().enumerate() {
3271        row[col_idx as usize] = pk_values[i].clone();
3272    }
3273
3274    let non_pk = schema.non_pk_indices();
3275    for (i, &col_idx) in non_pk.iter().enumerate() {
3276        if i < non_pk_values.len() {
3277            row[col_idx] = non_pk_values[i].clone();
3278        }
3279    }
3280
3281    Ok(row)
3282}
3283
3284/// Evaluate a constant expression (no column references).
3285fn eval_const_expr(expr: &Expr) -> Result<Value> {
3286    eval_expr(expr, &[], &[])
3287}
3288
3289fn eval_const_int(expr: &Expr) -> Result<i64> {
3290    match eval_const_expr(expr)? {
3291        Value::Integer(i) => Ok(i),
3292        other => Err(SqlError::TypeMismatch {
3293            expected: "INTEGER".into(),
3294            got: other.data_type().to_string(),
3295        }),
3296    }
3297}
3298
3299fn sort_rows(
3300    rows: &mut [Vec<Value>],
3301    order_by: &[OrderByItem],
3302    columns: &[ColumnDef],
3303) -> Result<()> {
3304    rows.sort_by(|a, b| {
3305        for item in order_by {
3306            let a_val = eval_expr(&item.expr, columns, a).unwrap_or(Value::Null);
3307            let b_val = eval_expr(&item.expr, columns, b).unwrap_or(Value::Null);
3308
3309            let nulls_first = item.nulls_first.unwrap_or(!item.descending);
3310
3311            let ord = match (a_val.is_null(), b_val.is_null()) {
3312                (true, true) => std::cmp::Ordering::Equal,
3313                (true, false) => {
3314                    if nulls_first {
3315                        std::cmp::Ordering::Less
3316                    } else {
3317                        std::cmp::Ordering::Greater
3318                    }
3319                }
3320                (false, true) => {
3321                    if nulls_first {
3322                        std::cmp::Ordering::Greater
3323                    } else {
3324                        std::cmp::Ordering::Less
3325                    }
3326                }
3327                (false, false) => {
3328                    let cmp = a_val.cmp(&b_val);
3329                    if item.descending {
3330                        cmp.reverse()
3331                    } else {
3332                        cmp
3333                    }
3334                }
3335            };
3336
3337            if ord != std::cmp::Ordering::Equal {
3338                return ord;
3339            }
3340        }
3341        std::cmp::Ordering::Equal
3342    });
3343    Ok(())
3344}
3345
3346fn project_rows(
3347    columns: &[ColumnDef],
3348    select_cols: &[SelectColumn],
3349    rows: &[Vec<Value>],
3350) -> Result<(Vec<String>, Vec<Vec<Value>>)> {
3351    let mut col_names = Vec::new();
3352    type Projector = Box<dyn Fn(&[Value]) -> Result<Value>>;
3353    let mut projectors: Vec<Projector> = Vec::new();
3354
3355    for sel_col in select_cols {
3356        match sel_col {
3357            SelectColumn::AllColumns => {
3358                for col in columns {
3359                    let idx = col.position as usize;
3360                    col_names.push(col.name.clone());
3361                    projectors.push(Box::new(move |row: &[Value]| Ok(row[idx].clone())));
3362                }
3363            }
3364            SelectColumn::Expr { expr, alias } => {
3365                let name = alias.clone().unwrap_or_else(|| expr_display_name(expr));
3366                col_names.push(name);
3367                let expr = expr.clone();
3368                let owned_cols = columns.to_vec();
3369                projectors.push(Box::new(move |row: &[Value]| {
3370                    eval_expr(&expr, &owned_cols, row)
3371                }));
3372            }
3373        }
3374    }
3375
3376    let projected = rows
3377        .iter()
3378        .map(|row| {
3379            projectors
3380                .iter()
3381                .map(|p| p(row))
3382                .collect::<Result<Vec<_>>>()
3383        })
3384        .collect::<Result<Vec<_>>>()?;
3385
3386    Ok((col_names, projected))
3387}
3388
3389fn expr_display_name(expr: &Expr) -> String {
3390    match expr {
3391        Expr::Column(name) => name.clone(),
3392        Expr::QualifiedColumn { table, column } => format!("{table}.{column}"),
3393        Expr::Literal(v) => format!("{v}"),
3394        Expr::CountStar => "COUNT(*)".into(),
3395        Expr::Function { name, args } => {
3396            let arg_strs: Vec<String> = args.iter().map(expr_display_name).collect();
3397            format!("{name}({})", arg_strs.join(", "))
3398        }
3399        Expr::BinaryOp { left, op, right } => {
3400            format!(
3401                "{} {} {}",
3402                expr_display_name(left),
3403                op_symbol(op),
3404                expr_display_name(right)
3405            )
3406        }
3407        _ => "?".into(),
3408    }
3409}
3410
3411fn op_symbol(op: &BinOp) -> &'static str {
3412    match op {
3413        BinOp::Add => "+",
3414        BinOp::Sub => "-",
3415        BinOp::Mul => "*",
3416        BinOp::Div => "/",
3417        BinOp::Mod => "%",
3418        BinOp::Eq => "=",
3419        BinOp::NotEq => "<>",
3420        BinOp::Lt => "<",
3421        BinOp::Gt => ">",
3422        BinOp::LtEq => "<=",
3423        BinOp::GtEq => ">=",
3424        BinOp::And => "AND",
3425        BinOp::Or => "OR",
3426        BinOp::Concat => "||",
3427    }
3428}
3429
3430fn build_output_columns(select_cols: &[SelectColumn], columns: &[ColumnDef]) -> Vec<ColumnDef> {
3431    let mut out = Vec::new();
3432    for (i, col) in select_cols.iter().enumerate() {
3433        let (name, data_type) = match col {
3434            SelectColumn::AllColumns => (format!("col{i}"), DataType::Null),
3435            SelectColumn::Expr {
3436                alias: Some(a),
3437                expr,
3438            } => (a.clone(), infer_expr_type(expr, columns)),
3439            SelectColumn::Expr { expr, .. } => {
3440                (expr_display_name(expr), infer_expr_type(expr, columns))
3441            }
3442        };
3443        out.push(ColumnDef {
3444            name,
3445            data_type,
3446            nullable: true,
3447            position: i as u16,
3448        });
3449    }
3450    out
3451}
3452
3453fn infer_expr_type(expr: &Expr, columns: &[ColumnDef]) -> DataType {
3454    match expr {
3455        Expr::Column(name) => {
3456            let lower = name.to_ascii_lowercase();
3457            columns
3458                .iter()
3459                .find(|c| c.name.to_ascii_lowercase() == lower)
3460                .map(|c| c.data_type)
3461                .unwrap_or(DataType::Null)
3462        }
3463        Expr::QualifiedColumn { table, column } => {
3464            let qualified = format!(
3465                "{}.{}",
3466                table.to_ascii_lowercase(),
3467                column.to_ascii_lowercase()
3468            );
3469            columns
3470                .iter()
3471                .find(|c| c.name.to_ascii_lowercase() == qualified)
3472                .map(|c| c.data_type)
3473                .unwrap_or(DataType::Null)
3474        }
3475        Expr::Literal(v) => v.data_type(),
3476        Expr::CountStar => DataType::Integer,
3477        Expr::Function { name, .. } => match name.to_ascii_uppercase().as_str() {
3478            "COUNT" => DataType::Integer,
3479            "AVG" => DataType::Real,
3480            "SUM" | "MIN" | "MAX" => DataType::Null,
3481            _ => DataType::Null,
3482        },
3483        _ => DataType::Null,
3484    }
3485}