Skip to main content

citadeldb_sql/
executor.rs

1//! SQL executor: DDL and DML operations.
2
3use std::collections::BTreeMap;
4
5use citadel::Database;
6
7use crate::encoding::{decode_composite_key, decode_key_value, decode_row, encode_composite_key, encode_row};
8use crate::error::{Result, SqlError};
9use crate::eval::{eval_expr, is_truthy};
10use crate::parser::*;
11use crate::planner::{self, ScanPlan};
12use crate::schema::SchemaManager;
13use crate::types::*;
14
15// ── Index helpers ────────────────────────────────────────────────────
16
17fn encode_index_key(
18    idx: &IndexDef,
19    row: &[Value],
20    pk_values: &[Value],
21) -> Vec<u8> {
22    let indexed_values: Vec<Value> = idx.columns.iter()
23        .map(|&col_idx| row[col_idx as usize].clone())
24        .collect();
25
26    if idx.unique {
27        let any_null = indexed_values.iter().any(|v| v.is_null());
28        if !any_null {
29            return encode_composite_key(&indexed_values);
30        }
31    }
32
33    let mut all_values = indexed_values;
34    all_values.extend_from_slice(pk_values);
35    encode_composite_key(&all_values)
36}
37
38fn encode_index_value(
39    idx: &IndexDef,
40    row: &[Value],
41    pk_values: &[Value],
42) -> Vec<u8> {
43    if idx.unique {
44        let indexed_values: Vec<Value> = idx.columns.iter()
45            .map(|&col_idx| row[col_idx as usize].clone())
46            .collect();
47        let any_null = indexed_values.iter().any(|v| v.is_null());
48        if !any_null {
49            return encode_composite_key(pk_values);
50        }
51    }
52    vec![]
53}
54
55fn insert_index_entries(
56    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
57    table_schema: &TableSchema,
58    row: &[Value],
59    pk_values: &[Value],
60) -> Result<()> {
61    for idx in &table_schema.indices {
62        let idx_table = TableSchema::index_table_name(&table_schema.name, &idx.name);
63        let key = encode_index_key(idx, row, pk_values);
64        let value = encode_index_value(idx, row, pk_values);
65
66        let is_new = wtx.table_insert(&idx_table, &key, &value)
67            .map_err(SqlError::Storage)?;
68
69        if idx.unique && !is_new {
70            let indexed_values: Vec<Value> = idx.columns.iter()
71                .map(|&col_idx| row[col_idx as usize].clone())
72                .collect();
73            let any_null = indexed_values.iter().any(|v| v.is_null());
74            if !any_null {
75                return Err(SqlError::UniqueViolation(idx.name.clone()));
76            }
77        }
78    }
79    Ok(())
80}
81
82fn delete_index_entries(
83    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
84    table_schema: &TableSchema,
85    row: &[Value],
86    pk_values: &[Value],
87) -> Result<()> {
88    for idx in &table_schema.indices {
89        let idx_table = TableSchema::index_table_name(&table_schema.name, &idx.name);
90        let key = encode_index_key(idx, row, pk_values);
91        wtx.table_delete(&idx_table, &key).map_err(SqlError::Storage)?;
92    }
93    Ok(())
94}
95
96fn index_columns_changed(idx: &IndexDef, old_row: &[Value], new_row: &[Value]) -> bool {
97    idx.columns.iter().any(|&col_idx| {
98        old_row[col_idx as usize] != new_row[col_idx as usize]
99    })
100}
101
102/// Execute a parsed SQL statement in auto-commit mode.
103pub fn execute(
104    db: &Database,
105    schema: &mut SchemaManager,
106    stmt: &Statement,
107) -> Result<ExecutionResult> {
108    match stmt {
109        Statement::CreateTable(ct) => exec_create_table(db, schema, ct),
110        Statement::DropTable(dt) => exec_drop_table(db, schema, dt),
111        Statement::CreateIndex(ci) => exec_create_index(db, schema, ci),
112        Statement::DropIndex(di) => exec_drop_index(db, schema, di),
113        Statement::Insert(ins) => exec_insert(db, schema, ins),
114        Statement::Select(sel) => exec_select(db, schema, sel),
115        Statement::Update(upd) => exec_update(db, schema, upd),
116        Statement::Delete(del) => exec_delete(db, schema, del),
117        Statement::Explain(inner) => explain(schema, inner),
118        Statement::Begin | Statement::Commit | Statement::Rollback => {
119            Err(SqlError::Unsupported("transaction control in auto-commit mode".into()))
120        }
121    }
122}
123
124/// Execute a parsed SQL statement within an existing write transaction.
125pub fn execute_in_txn(
126    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
127    schema: &mut SchemaManager,
128    stmt: &Statement,
129) -> Result<ExecutionResult> {
130    match stmt {
131        Statement::CreateTable(ct) => exec_create_table_in_txn(wtx, schema, ct),
132        Statement::DropTable(dt) => exec_drop_table_in_txn(wtx, schema, dt),
133        Statement::CreateIndex(ci) => exec_create_index_in_txn(wtx, schema, ci),
134        Statement::DropIndex(di) => exec_drop_index_in_txn(wtx, schema, di),
135        Statement::Insert(ins) => exec_insert_in_txn(wtx, schema, ins),
136        Statement::Select(sel) => exec_select_in_txn(wtx, schema, sel),
137        Statement::Update(upd) => exec_update_in_txn(wtx, schema, upd),
138        Statement::Delete(del) => exec_delete_in_txn(wtx, schema, del),
139        Statement::Explain(inner) => explain(schema, inner),
140        Statement::Begin | Statement::Commit | Statement::Rollback => {
141            Err(SqlError::Unsupported("nested transaction control".into()))
142        }
143    }
144}
145
146// ── EXPLAIN ──────────────────────────────────────────────────────────
147
148pub fn explain(
149    schema: &SchemaManager,
150    stmt: &Statement,
151) -> Result<ExecutionResult> {
152    let lines = match stmt {
153        Statement::Select(sel) => explain_select(schema, sel)?,
154        Statement::Insert(ins) => {
155            vec![format!("INSERT INTO {}", ins.table.to_ascii_lowercase())]
156        }
157        Statement::Update(upd) => explain_dml(schema, &upd.table, &upd.where_clause, "UPDATE")?,
158        Statement::Delete(del) => explain_dml(schema, &del.table, &del.where_clause, "DELETE FROM")?,
159        Statement::Explain(_) => {
160            return Err(SqlError::Unsupported("EXPLAIN EXPLAIN".into()));
161        }
162        _ => {
163            return Err(SqlError::Unsupported("EXPLAIN for this statement type".into()));
164        }
165    };
166
167    let rows = lines.into_iter()
168        .map(|line| vec![Value::Text(line)])
169        .collect();
170    Ok(ExecutionResult::Query(QueryResult {
171        columns: vec!["plan".into()],
172        rows,
173    }))
174}
175
176fn explain_dml(
177    schema: &SchemaManager,
178    table: &str,
179    where_clause: &Option<Expr>,
180    verb: &str,
181) -> Result<Vec<String>> {
182    let lower = table.to_ascii_lowercase();
183    let table_schema = schema.get(&lower)
184        .ok_or_else(|| SqlError::TableNotFound(table.to_string()))?;
185    let plan = planner::plan_select(table_schema, where_clause);
186    let scan_line = format_scan_line(&lower, &None, &plan, table_schema);
187    Ok(vec![format!("{verb} {}", scan_line)])
188}
189
190fn explain_select(
191    schema: &SchemaManager,
192    stmt: &SelectStmt,
193) -> Result<Vec<String>> {
194    let mut lines = Vec::new();
195
196    if stmt.from.is_empty() {
197        lines.push("CONSTANT ROW".into());
198        return Ok(lines);
199    }
200
201    let lower_from = stmt.from.to_ascii_lowercase();
202    let from_schema = schema.get(&lower_from)
203        .ok_or_else(|| SqlError::TableNotFound(stmt.from.clone()))?;
204
205    if stmt.joins.is_empty() {
206        let plan = planner::plan_select(from_schema, &stmt.where_clause);
207        lines.push(format_scan_line(&lower_from, &stmt.from_alias, &plan, from_schema));
208    } else {
209        let from_plan = planner::plan_select(from_schema, &None);
210        lines.push(format_scan_line(&lower_from, &stmt.from_alias, &from_plan, from_schema));
211
212        for join in &stmt.joins {
213            let inner_lower = join.table.name.to_ascii_lowercase();
214            let inner_schema = schema.get(&inner_lower)
215                .ok_or_else(|| SqlError::TableNotFound(join.table.name.clone()))?;
216            let inner_plan = planner::plan_select(inner_schema, &None);
217            lines.push(format_scan_line(&inner_lower, &join.table.alias, &inner_plan, inner_schema));
218        }
219
220        let join_type_str = match stmt.joins.last().map(|j| j.join_type) {
221            Some(JoinType::Left) => "LEFT JOIN",
222            Some(JoinType::Right) => "RIGHT JOIN",
223            Some(JoinType::Cross) => "CROSS JOIN",
224            _ => "NESTED LOOP",
225        };
226        lines.push(join_type_str.into());
227    }
228
229    if stmt.where_clause.is_some() && stmt.joins.is_empty() {
230        let plan = planner::plan_select(from_schema, &stmt.where_clause);
231        if matches!(plan, ScanPlan::SeqScan) {
232            lines.push("FILTER".into());
233        }
234    }
235
236    if let Some(ref w) = stmt.where_clause {
237        if !stmt.joins.is_empty() && has_subquery(w) {
238            lines.push("SUBQUERY".into());
239        }
240    }
241
242    explain_subqueries(stmt, &mut lines);
243
244    if !stmt.group_by.is_empty() {
245        lines.push("GROUP BY".into());
246    }
247
248    if stmt.distinct {
249        lines.push("DISTINCT".into());
250    }
251
252    if !stmt.order_by.is_empty() {
253        lines.push("SORT".into());
254    }
255
256    if let Some(ref offset_expr) = stmt.offset {
257        if let Ok(n) = eval_const_int(offset_expr) {
258            lines.push(format!("OFFSET {n}"));
259        } else {
260            lines.push("OFFSET".into());
261        }
262    }
263
264    if let Some(ref limit_expr) = stmt.limit {
265        if let Ok(n) = eval_const_int(limit_expr) {
266            lines.push(format!("LIMIT {n}"));
267        } else {
268            lines.push("LIMIT".into());
269        }
270    }
271
272    Ok(lines)
273}
274
275fn explain_subqueries(stmt: &SelectStmt, lines: &mut Vec<String>) {
276    let mut count = 0;
277    if let Some(ref w) = stmt.where_clause { count += count_subqueries(w); }
278    if let Some(ref h) = stmt.having { count += count_subqueries(h); }
279    for col in &stmt.columns {
280        if let SelectColumn::Expr { expr, .. } = col {
281            count += count_subqueries(expr);
282        }
283    }
284    for _ in 0..count {
285        lines.push("SUBQUERY".into());
286    }
287}
288
289fn count_subqueries(expr: &Expr) -> usize {
290    match expr {
291        Expr::InSubquery { expr: e, .. } => 1 + count_subqueries(e),
292        Expr::ScalarSubquery(_) => 1,
293        Expr::Exists { .. } => 1,
294        Expr::BinaryOp { left, right, .. } => count_subqueries(left) + count_subqueries(right),
295        Expr::UnaryOp { expr: e, .. } => count_subqueries(e),
296        Expr::IsNull(e) | Expr::IsNotNull(e) => count_subqueries(e),
297        Expr::Function { args, .. } => args.iter().map(count_subqueries).sum(),
298        Expr::Between { expr: e, low, high, .. } => {
299            count_subqueries(e) + count_subqueries(low) + count_subqueries(high)
300        }
301        Expr::Like { expr: e, pattern, .. } => count_subqueries(e) + count_subqueries(pattern),
302        Expr::Case { operand, conditions, else_result } => {
303            let mut n = 0;
304            if let Some(op) = operand { n += count_subqueries(op); }
305            for (c, r) in conditions { n += count_subqueries(c) + count_subqueries(r); }
306            if let Some(el) = else_result { n += count_subqueries(el); }
307            n
308        }
309        Expr::Coalesce(args) => args.iter().map(count_subqueries).sum(),
310        Expr::Cast { expr: e, .. } => count_subqueries(e),
311        Expr::InList { expr: e, list, .. } => {
312            count_subqueries(e) + list.iter().map(count_subqueries).sum::<usize>()
313        }
314        _ => 0,
315    }
316}
317
318fn format_scan_line(
319    table_name: &str,
320    alias: &Option<String>,
321    plan: &ScanPlan,
322    table_schema: &TableSchema,
323) -> String {
324    let alias_part = match alias {
325        Some(a) if a.to_ascii_lowercase() != table_name.to_ascii_lowercase() => {
326            format!(" AS {}", a.to_ascii_lowercase())
327        }
328        _ => String::new(),
329    };
330
331    let desc = planner::describe_plan(plan, table_schema);
332
333    if desc.is_empty() {
334        format!("SCAN TABLE {table_name}{alias_part}")
335    } else {
336        format!("SEARCH TABLE {table_name}{alias_part} {desc}")
337    }
338}
339
340// ── DDL ─────────────────────────────────────────────────────────────
341
342fn exec_create_table(
343    db: &Database,
344    schema: &mut SchemaManager,
345    stmt: &CreateTableStmt,
346) -> Result<ExecutionResult> {
347    let lower_name = stmt.name.to_ascii_lowercase();
348
349    if schema.contains(&lower_name) {
350        if stmt.if_not_exists {
351            return Ok(ExecutionResult::Ok);
352        }
353        return Err(SqlError::TableAlreadyExists(stmt.name.clone()));
354    }
355
356    if stmt.primary_key.is_empty() {
357        return Err(SqlError::PrimaryKeyRequired);
358    }
359
360    let mut seen = std::collections::HashSet::new();
361    for col in &stmt.columns {
362        let lower = col.name.to_ascii_lowercase();
363        if !seen.insert(lower.clone()) {
364            return Err(SqlError::DuplicateColumn(col.name.clone()));
365        }
366    }
367
368    let columns: Vec<ColumnDef> = stmt.columns.iter().enumerate().map(|(i, c)| {
369        ColumnDef {
370            name: c.name.to_ascii_lowercase(),
371            data_type: c.data_type,
372            nullable: c.nullable,
373            position: i as u16,
374        }
375    }).collect();
376
377    let primary_key_columns: Vec<u16> = stmt.primary_key.iter().map(|pk_name| {
378        let lower = pk_name.to_ascii_lowercase();
379        columns.iter().position(|c| c.name == lower)
380            .map(|i| i as u16)
381            .ok_or_else(|| SqlError::ColumnNotFound(pk_name.clone()))
382    }).collect::<Result<_>>()?;
383
384    let table_schema = TableSchema {
385        name: lower_name.clone(),
386        columns,
387        primary_key_columns,
388        indices: vec![],
389    };
390
391    let mut wtx = db.begin_write().map_err(SqlError::Storage)?;
392    SchemaManager::ensure_schema_table(&mut wtx)?;
393    wtx.create_table(lower_name.as_bytes()).map_err(SqlError::Storage)?;
394    SchemaManager::save_schema(&mut wtx, &table_schema)?;
395    wtx.commit().map_err(SqlError::Storage)?;
396
397    schema.register(table_schema);
398    Ok(ExecutionResult::Ok)
399}
400
401fn exec_drop_table(
402    db: &Database,
403    schema: &mut SchemaManager,
404    stmt: &DropTableStmt,
405) -> Result<ExecutionResult> {
406    let lower_name = stmt.name.to_ascii_lowercase();
407
408    if !schema.contains(&lower_name) {
409        if stmt.if_exists {
410            return Ok(ExecutionResult::Ok);
411        }
412        return Err(SqlError::TableNotFound(stmt.name.clone()));
413    }
414
415    let table_schema = schema.get(&lower_name).unwrap();
416    let idx_tables: Vec<Vec<u8>> = table_schema.indices.iter()
417        .map(|idx| TableSchema::index_table_name(&lower_name, &idx.name))
418        .collect();
419
420    let mut wtx = db.begin_write().map_err(SqlError::Storage)?;
421    for idx_table in &idx_tables {
422        wtx.drop_table(idx_table).map_err(SqlError::Storage)?;
423    }
424    wtx.drop_table(lower_name.as_bytes()).map_err(SqlError::Storage)?;
425    SchemaManager::delete_schema(&mut wtx, &lower_name)?;
426    wtx.commit().map_err(SqlError::Storage)?;
427
428    schema.remove(&lower_name);
429    Ok(ExecutionResult::Ok)
430}
431
432fn exec_create_table_in_txn(
433    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
434    schema: &mut SchemaManager,
435    stmt: &CreateTableStmt,
436) -> Result<ExecutionResult> {
437    let lower_name = stmt.name.to_ascii_lowercase();
438
439    if schema.contains(&lower_name) {
440        if stmt.if_not_exists {
441            return Ok(ExecutionResult::Ok);
442        }
443        return Err(SqlError::TableAlreadyExists(stmt.name.clone()));
444    }
445
446    if stmt.primary_key.is_empty() {
447        return Err(SqlError::PrimaryKeyRequired);
448    }
449
450    let mut seen = std::collections::HashSet::new();
451    for col in &stmt.columns {
452        let lower = col.name.to_ascii_lowercase();
453        if !seen.insert(lower.clone()) {
454            return Err(SqlError::DuplicateColumn(col.name.clone()));
455        }
456    }
457
458    let columns: Vec<ColumnDef> = stmt.columns.iter().enumerate().map(|(i, c)| {
459        ColumnDef {
460            name: c.name.to_ascii_lowercase(),
461            data_type: c.data_type,
462            nullable: c.nullable,
463            position: i as u16,
464        }
465    }).collect();
466
467    let primary_key_columns: Vec<u16> = stmt.primary_key.iter().map(|pk_name| {
468        let lower = pk_name.to_ascii_lowercase();
469        columns.iter().position(|c| c.name == lower)
470            .map(|i| i as u16)
471            .ok_or_else(|| SqlError::ColumnNotFound(pk_name.clone()))
472    }).collect::<Result<_>>()?;
473
474    let table_schema = TableSchema {
475        name: lower_name.clone(),
476        columns,
477        primary_key_columns,
478        indices: vec![],
479    };
480
481    SchemaManager::ensure_schema_table(wtx)?;
482    wtx.create_table(lower_name.as_bytes()).map_err(SqlError::Storage)?;
483    SchemaManager::save_schema(wtx, &table_schema)?;
484
485    schema.register(table_schema);
486    Ok(ExecutionResult::Ok)
487}
488
489fn exec_drop_table_in_txn(
490    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
491    schema: &mut SchemaManager,
492    stmt: &DropTableStmt,
493) -> Result<ExecutionResult> {
494    let lower_name = stmt.name.to_ascii_lowercase();
495
496    if !schema.contains(&lower_name) {
497        if stmt.if_exists {
498            return Ok(ExecutionResult::Ok);
499        }
500        return Err(SqlError::TableNotFound(stmt.name.clone()));
501    }
502
503    let table_schema = schema.get(&lower_name).unwrap();
504    let idx_tables: Vec<Vec<u8>> = table_schema.indices.iter()
505        .map(|idx| TableSchema::index_table_name(&lower_name, &idx.name))
506        .collect();
507
508    for idx_table in &idx_tables {
509        wtx.drop_table(idx_table).map_err(SqlError::Storage)?;
510    }
511    wtx.drop_table(lower_name.as_bytes()).map_err(SqlError::Storage)?;
512    SchemaManager::delete_schema(wtx, &lower_name)?;
513
514    schema.remove(&lower_name);
515    Ok(ExecutionResult::Ok)
516}
517
518fn exec_create_index(
519    db: &Database,
520    schema: &mut SchemaManager,
521    stmt: &CreateIndexStmt,
522) -> Result<ExecutionResult> {
523    let lower_table = stmt.table_name.to_ascii_lowercase();
524    let lower_idx = stmt.index_name.to_ascii_lowercase();
525
526    let table_schema = schema.get(&lower_table)
527        .ok_or_else(|| SqlError::TableNotFound(stmt.table_name.clone()))?;
528
529    if table_schema.index_by_name(&lower_idx).is_some() {
530        if stmt.if_not_exists {
531            return Ok(ExecutionResult::Ok);
532        }
533        return Err(SqlError::IndexAlreadyExists(stmt.index_name.clone()));
534    }
535
536    let col_indices: Vec<u16> = stmt.columns.iter().map(|col_name| {
537        let lower = col_name.to_ascii_lowercase();
538        table_schema.column_index(&lower)
539            .map(|i| i as u16)
540            .ok_or_else(|| SqlError::ColumnNotFound(col_name.clone()))
541    }).collect::<Result<_>>()?;
542
543    let idx_def = IndexDef {
544        name: lower_idx.clone(),
545        columns: col_indices,
546        unique: stmt.unique,
547    };
548
549    let idx_table = TableSchema::index_table_name(&lower_table, &lower_idx);
550
551    let mut wtx = db.begin_write().map_err(SqlError::Storage)?;
552    SchemaManager::ensure_schema_table(&mut wtx)?;
553    wtx.create_table(&idx_table).map_err(SqlError::Storage)?;
554
555    // Populate index from existing rows
556    let pk_indices = table_schema.pk_indices();
557    let mut rows: Vec<Vec<Value>> = Vec::new();
558    {
559        let mut scan_err: Option<SqlError> = None;
560        wtx.table_for_each(lower_table.as_bytes(), |key, value| {
561            match decode_full_row(table_schema, key, value) {
562                Ok(row) => rows.push(row),
563                Err(e) => scan_err = Some(e),
564            }
565            Ok(())
566        }).map_err(SqlError::Storage)?;
567        if let Some(e) = scan_err { return Err(e); }
568    }
569
570    for row in &rows {
571        let pk_values: Vec<Value> = pk_indices.iter().map(|&i| row[i].clone()).collect();
572        let key = encode_index_key(&idx_def, row, &pk_values);
573        let value = encode_index_value(&idx_def, row, &pk_values);
574        let is_new = wtx.table_insert(&idx_table, &key, &value)
575            .map_err(SqlError::Storage)?;
576        if idx_def.unique && !is_new {
577            let indexed_values: Vec<Value> = idx_def.columns.iter()
578                .map(|&col_idx| row[col_idx as usize].clone())
579                .collect();
580            let any_null = indexed_values.iter().any(|v| v.is_null());
581            if !any_null {
582                return Err(SqlError::UniqueViolation(stmt.index_name.clone()));
583            }
584        }
585    }
586
587    let mut updated_schema = table_schema.clone();
588    updated_schema.indices.push(idx_def);
589    SchemaManager::save_schema(&mut wtx, &updated_schema)?;
590    wtx.commit().map_err(SqlError::Storage)?;
591
592    schema.register(updated_schema);
593    Ok(ExecutionResult::Ok)
594}
595
596fn exec_drop_index(
597    db: &Database,
598    schema: &mut SchemaManager,
599    stmt: &DropIndexStmt,
600) -> Result<ExecutionResult> {
601    let lower_idx = stmt.index_name.to_ascii_lowercase();
602
603    let (table_name, _idx_pos) = match find_index_in_schemas(schema, &lower_idx) {
604        Some(found) => found,
605        None => {
606            if stmt.if_exists {
607                return Ok(ExecutionResult::Ok);
608            }
609            return Err(SqlError::IndexNotFound(stmt.index_name.clone()));
610        }
611    };
612
613    let idx_table = TableSchema::index_table_name(&table_name, &lower_idx);
614
615    let mut wtx = db.begin_write().map_err(SqlError::Storage)?;
616    wtx.drop_table(&idx_table).map_err(SqlError::Storage)?;
617
618    let table_schema = schema.get(&table_name).unwrap();
619    let mut updated_schema = table_schema.clone();
620    updated_schema.indices.retain(|i| i.name != lower_idx);
621    SchemaManager::save_schema(&mut wtx, &updated_schema)?;
622    wtx.commit().map_err(SqlError::Storage)?;
623
624    schema.register(updated_schema);
625    Ok(ExecutionResult::Ok)
626}
627
628fn exec_create_index_in_txn(
629    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
630    schema: &mut SchemaManager,
631    stmt: &CreateIndexStmt,
632) -> Result<ExecutionResult> {
633    let lower_table = stmt.table_name.to_ascii_lowercase();
634    let lower_idx = stmt.index_name.to_ascii_lowercase();
635
636    let table_schema = schema.get(&lower_table)
637        .ok_or_else(|| SqlError::TableNotFound(stmt.table_name.clone()))?;
638
639    if table_schema.index_by_name(&lower_idx).is_some() {
640        if stmt.if_not_exists {
641            return Ok(ExecutionResult::Ok);
642        }
643        return Err(SqlError::IndexAlreadyExists(stmt.index_name.clone()));
644    }
645
646    let col_indices: Vec<u16> = stmt.columns.iter().map(|col_name| {
647        let lower = col_name.to_ascii_lowercase();
648        table_schema.column_index(&lower)
649            .map(|i| i as u16)
650            .ok_or_else(|| SqlError::ColumnNotFound(col_name.clone()))
651    }).collect::<Result<_>>()?;
652
653    let idx_def = IndexDef {
654        name: lower_idx.clone(),
655        columns: col_indices,
656        unique: stmt.unique,
657    };
658
659    let idx_table = TableSchema::index_table_name(&lower_table, &lower_idx);
660
661    SchemaManager::ensure_schema_table(wtx)?;
662    wtx.create_table(&idx_table).map_err(SqlError::Storage)?;
663
664    let pk_indices = table_schema.pk_indices();
665    let mut rows: Vec<Vec<Value>> = Vec::new();
666    {
667        let mut scan_err: Option<SqlError> = None;
668        wtx.table_for_each(lower_table.as_bytes(), |key, value| {
669            match decode_full_row(table_schema, key, value) {
670                Ok(row) => rows.push(row),
671                Err(e) => scan_err = Some(e),
672            }
673            Ok(())
674        }).map_err(SqlError::Storage)?;
675        if let Some(e) = scan_err { return Err(e); }
676    }
677
678    for row in &rows {
679        let pk_values: Vec<Value> = pk_indices.iter().map(|&i| row[i].clone()).collect();
680        let key = encode_index_key(&idx_def, row, &pk_values);
681        let value = encode_index_value(&idx_def, row, &pk_values);
682        let is_new = wtx.table_insert(&idx_table, &key, &value)
683            .map_err(SqlError::Storage)?;
684        if idx_def.unique && !is_new {
685            let indexed_values: Vec<Value> = idx_def.columns.iter()
686                .map(|&col_idx| row[col_idx as usize].clone())
687                .collect();
688            let any_null = indexed_values.iter().any(|v| v.is_null());
689            if !any_null {
690                return Err(SqlError::UniqueViolation(stmt.index_name.clone()));
691            }
692        }
693    }
694
695    let mut updated_schema = table_schema.clone();
696    updated_schema.indices.push(idx_def);
697    SchemaManager::save_schema(wtx, &updated_schema)?;
698
699    schema.register(updated_schema);
700    Ok(ExecutionResult::Ok)
701}
702
703fn exec_drop_index_in_txn(
704    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
705    schema: &mut SchemaManager,
706    stmt: &DropIndexStmt,
707) -> Result<ExecutionResult> {
708    let lower_idx = stmt.index_name.to_ascii_lowercase();
709
710    let (table_name, _idx_pos) = match find_index_in_schemas(schema, &lower_idx) {
711        Some(found) => found,
712        None => {
713            if stmt.if_exists {
714                return Ok(ExecutionResult::Ok);
715            }
716            return Err(SqlError::IndexNotFound(stmt.index_name.clone()));
717        }
718    };
719
720    let idx_table = TableSchema::index_table_name(&table_name, &lower_idx);
721    wtx.drop_table(&idx_table).map_err(SqlError::Storage)?;
722
723    let table_schema = schema.get(&table_name).unwrap();
724    let mut updated_schema = table_schema.clone();
725    updated_schema.indices.retain(|i| i.name != lower_idx);
726    SchemaManager::save_schema(wtx, &updated_schema)?;
727
728    schema.register(updated_schema);
729    Ok(ExecutionResult::Ok)
730}
731
732fn find_index_in_schemas(schema: &SchemaManager, index_name: &str) -> Option<(String, usize)> {
733    for table_name in schema.table_names() {
734        if let Some(ts) = schema.get(table_name) {
735            if let Some(pos) = ts.indices.iter().position(|i| i.name == index_name) {
736                return Some((table_name.to_string(), pos));
737            }
738        }
739    }
740    None
741}
742
743// ── Index scan helpers ───────────────────────────────────────────────
744
745fn extract_pk_key(
746    idx_key: &[u8],
747    idx_value: &[u8],
748    is_unique: bool,
749    num_index_cols: usize,
750    num_pk_cols: usize,
751) -> Result<Vec<u8>> {
752    if is_unique && !idx_value.is_empty() {
753        Ok(idx_value.to_vec())
754    } else {
755        let total_cols = num_index_cols + num_pk_cols;
756        let all_values = decode_composite_key(idx_key, total_cols)?;
757        let pk_values = &all_values[num_index_cols..];
758        Ok(encode_composite_key(pk_values))
759    }
760}
761
762fn check_range_conditions(
763    idx_key: &[u8],
764    num_prefix_cols: usize,
765    range_conds: &[(BinOp, Value)],
766    num_index_cols: usize,
767) -> Result<RangeCheck> {
768    if range_conds.is_empty() {
769        return Ok(RangeCheck::Match);
770    }
771
772    let num_to_decode = num_prefix_cols + 1;
773    if num_to_decode > num_index_cols {
774        return Ok(RangeCheck::Match);
775    }
776
777    // Decode just enough columns to check the range column
778    let mut pos = 0;
779    for _ in 0..num_prefix_cols {
780        let (_, n) = decode_key_value(&idx_key[pos..])?;
781        pos += n;
782    }
783    let (range_val, _) = decode_key_value(&idx_key[pos..])?;
784
785    let mut exceeds_upper = false;
786    let mut below_lower = false;
787
788    for (op, val) in range_conds {
789        match op {
790            BinOp::Lt => { if range_val >= *val { exceeds_upper = true; } }
791            BinOp::LtEq => { if range_val > *val { exceeds_upper = true; } }
792            BinOp::Gt => { if range_val <= *val { below_lower = true; } }
793            BinOp::GtEq => { if range_val < *val { below_lower = true; } }
794            _ => {}
795        }
796    }
797
798    if exceeds_upper {
799        Ok(RangeCheck::ExceedsUpper)
800    } else if below_lower {
801        Ok(RangeCheck::BelowLower)
802    } else {
803        Ok(RangeCheck::Match)
804    }
805}
806
/// Outcome of testing an index key's range column against the range conditions.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RangeCheck {
    /// The value satisfies every range condition.
    Match,
    /// The value violates a lower bound (`>` or `>=`) and no upper bound.
    BelowLower,
    /// The value violates an upper bound (`<` or `<=`).
    ExceedsUpper,
}
812
813/// Collect rows via ReadTxn using the scan plan.
814fn collect_rows_read(
815    db: &Database,
816    table_schema: &TableSchema,
817    where_clause: &Option<Expr>,
818) -> Result<Vec<Vec<Value>>> {
819    let plan = planner::plan_select(table_schema, where_clause);
820    let lower_name = &table_schema.name;
821
822    match plan {
823        ScanPlan::SeqScan => {
824            let mut rows = Vec::new();
825            let mut rtx = db.begin_read();
826            let mut scan_err: Option<SqlError> = None;
827            rtx.table_for_each(lower_name.as_bytes(), |key, value| {
828                match decode_full_row(table_schema, key, value) {
829                    Ok(row) => rows.push(row),
830                    Err(e) => scan_err = Some(e),
831                }
832                Ok(())
833            }).map_err(SqlError::Storage)?;
834            if let Some(e) = scan_err { return Err(e); }
835            Ok(rows)
836        }
837
838        ScanPlan::PkLookup { pk_values } => {
839            let key = encode_composite_key(&pk_values);
840            let mut rtx = db.begin_read();
841            match rtx.table_get(lower_name.as_bytes(), &key).map_err(SqlError::Storage)? {
842                Some(value) => {
843                    let row = decode_full_row(table_schema, &key, &value)?;
844                    Ok(vec![row])
845                }
846                None => Ok(vec![]),
847            }
848        }
849
850        ScanPlan::IndexScan {
851            idx_table, prefix, num_prefix_cols,
852            range_conds, is_unique, index_columns, ..
853        } => {
854            let num_pk_cols = table_schema.primary_key_columns.len();
855            let num_index_cols = index_columns.len();
856            let mut pk_keys: Vec<Vec<u8>> = Vec::new();
857
858            {
859                let mut rtx = db.begin_read();
860                let mut scan_err: Option<SqlError> = None;
861                rtx.table_scan_from(&idx_table, &prefix, |key, value| {
862                    if !key.starts_with(&prefix) {
863                        return Ok(false);
864                    }
865                    match check_range_conditions(key, num_prefix_cols, &range_conds, num_index_cols) {
866                        Ok(RangeCheck::ExceedsUpper) => return Ok(false),
867                        Ok(RangeCheck::BelowLower) => return Ok(true),
868                        Ok(RangeCheck::Match) => {}
869                        Err(e) => { scan_err = Some(e); return Ok(false); }
870                    }
871                    match extract_pk_key(key, value, is_unique, num_index_cols, num_pk_cols) {
872                        Ok(pk) => pk_keys.push(pk),
873                        Err(e) => { scan_err = Some(e); return Ok(false); }
874                    }
875                    Ok(true)
876                }).map_err(SqlError::Storage)?;
877                if let Some(e) = scan_err { return Err(e); }
878            }
879
880            let mut rows = Vec::new();
881            let mut rtx = db.begin_read();
882            for pk_key in &pk_keys {
883                if let Some(value) = rtx.table_get(lower_name.as_bytes(), pk_key).map_err(SqlError::Storage)? {
884                    rows.push(decode_full_row(table_schema, pk_key, &value)?);
885                }
886            }
887            Ok(rows)
888        }
889    }
890}
891
892/// Collect rows via WriteTxn using the scan plan.
893fn collect_rows_write(
894    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
895    table_schema: &TableSchema,
896    where_clause: &Option<Expr>,
897) -> Result<Vec<Vec<Value>>> {
898    let plan = planner::plan_select(table_schema, where_clause);
899    let lower_name = &table_schema.name;
900
901    match plan {
902        ScanPlan::SeqScan => {
903            let mut rows = Vec::new();
904            let mut scan_err: Option<SqlError> = None;
905            wtx.table_for_each(lower_name.as_bytes(), |key, value| {
906                match decode_full_row(table_schema, key, value) {
907                    Ok(row) => rows.push(row),
908                    Err(e) => scan_err = Some(e),
909                }
910                Ok(())
911            }).map_err(SqlError::Storage)?;
912            if let Some(e) = scan_err { return Err(e); }
913            Ok(rows)
914        }
915
916        ScanPlan::PkLookup { pk_values } => {
917            let key = encode_composite_key(&pk_values);
918            match wtx.table_get(lower_name.as_bytes(), &key).map_err(SqlError::Storage)? {
919                Some(value) => {
920                    let row = decode_full_row(table_schema, &key, &value)?;
921                    Ok(vec![row])
922                }
923                None => Ok(vec![]),
924            }
925        }
926
927        ScanPlan::IndexScan {
928            idx_table, prefix, num_prefix_cols,
929            range_conds, is_unique, index_columns, ..
930        } => {
931            let num_pk_cols = table_schema.primary_key_columns.len();
932            let num_index_cols = index_columns.len();
933            let mut pk_keys: Vec<Vec<u8>> = Vec::new();
934
935            {
936                let mut scan_err: Option<SqlError> = None;
937                wtx.table_scan_from(&idx_table, &prefix, |key, value| {
938                    if !key.starts_with(&prefix) {
939                        return Ok(false);
940                    }
941                    match check_range_conditions(key, num_prefix_cols, &range_conds, num_index_cols) {
942                        Ok(RangeCheck::ExceedsUpper) => return Ok(false),
943                        Ok(RangeCheck::BelowLower) => return Ok(true),
944                        Ok(RangeCheck::Match) => {}
945                        Err(e) => { scan_err = Some(e); return Ok(false); }
946                    }
947                    match extract_pk_key(key, value, is_unique, num_index_cols, num_pk_cols) {
948                        Ok(pk) => pk_keys.push(pk),
949                        Err(e) => { scan_err = Some(e); return Ok(false); }
950                    }
951                    Ok(true)
952                }).map_err(SqlError::Storage)?;
953                if let Some(e) = scan_err { return Err(e); }
954            }
955
956            let mut rows = Vec::new();
957            for pk_key in &pk_keys {
958                if let Some(value) = wtx.table_get(lower_name.as_bytes(), pk_key).map_err(SqlError::Storage)? {
959                    rows.push(decode_full_row(table_schema, pk_key, &value)?);
960                }
961            }
962            Ok(rows)
963        }
964    }
965}
966
967/// Collect (encoded_key, full_row) pairs via ReadTxn using the scan plan.
968/// Used by DELETE and UPDATE which need the encoded PK key.
969fn collect_keyed_rows_read(
970    db: &Database,
971    table_schema: &TableSchema,
972    where_clause: &Option<Expr>,
973) -> Result<Vec<(Vec<u8>, Vec<Value>)>> {
974    let plan = planner::plan_select(table_schema, where_clause);
975    let lower_name = &table_schema.name;
976
977    match plan {
978        ScanPlan::SeqScan => {
979            let mut rows = Vec::new();
980            let mut rtx = db.begin_read();
981            let mut scan_err: Option<SqlError> = None;
982            rtx.table_for_each(lower_name.as_bytes(), |key, value| {
983                match decode_full_row(table_schema, key, value) {
984                    Ok(row) => rows.push((key.to_vec(), row)),
985                    Err(e) => scan_err = Some(e),
986                }
987                Ok(())
988            }).map_err(SqlError::Storage)?;
989            if let Some(e) = scan_err { return Err(e); }
990            Ok(rows)
991        }
992
993        ScanPlan::PkLookup { pk_values } => {
994            let key = encode_composite_key(&pk_values);
995            let mut rtx = db.begin_read();
996            match rtx.table_get(lower_name.as_bytes(), &key).map_err(SqlError::Storage)? {
997                Some(value) => {
998                    let row = decode_full_row(table_schema, &key, &value)?;
999                    Ok(vec![(key, row)])
1000                }
1001                None => Ok(vec![]),
1002            }
1003        }
1004
1005        ScanPlan::IndexScan {
1006            idx_table, prefix, num_prefix_cols,
1007            range_conds, is_unique, index_columns, ..
1008        } => {
1009            let num_pk_cols = table_schema.primary_key_columns.len();
1010            let num_index_cols = index_columns.len();
1011            let mut pk_keys: Vec<Vec<u8>> = Vec::new();
1012
1013            {
1014                let mut rtx = db.begin_read();
1015                let mut scan_err: Option<SqlError> = None;
1016                rtx.table_scan_from(&idx_table, &prefix, |key, value| {
1017                    if !key.starts_with(&prefix) {
1018                        return Ok(false);
1019                    }
1020                    match check_range_conditions(key, num_prefix_cols, &range_conds, num_index_cols) {
1021                        Ok(RangeCheck::ExceedsUpper) => return Ok(false),
1022                        Ok(RangeCheck::BelowLower) => return Ok(true),
1023                        Ok(RangeCheck::Match) => {}
1024                        Err(e) => { scan_err = Some(e); return Ok(false); }
1025                    }
1026                    match extract_pk_key(key, value, is_unique, num_index_cols, num_pk_cols) {
1027                        Ok(pk) => pk_keys.push(pk),
1028                        Err(e) => { scan_err = Some(e); return Ok(false); }
1029                    }
1030                    Ok(true)
1031                }).map_err(SqlError::Storage)?;
1032                if let Some(e) = scan_err { return Err(e); }
1033            }
1034
1035            let mut rows = Vec::new();
1036            let mut rtx = db.begin_read();
1037            for pk_key in &pk_keys {
1038                if let Some(value) = rtx.table_get(lower_name.as_bytes(), pk_key).map_err(SqlError::Storage)? {
1039                    rows.push((pk_key.clone(), decode_full_row(table_schema, pk_key, &value)?));
1040                }
1041            }
1042            Ok(rows)
1043        }
1044    }
1045}
1046
1047/// Collect (encoded_key, full_row) pairs via WriteTxn using the scan plan.
1048fn collect_keyed_rows_write(
1049    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
1050    table_schema: &TableSchema,
1051    where_clause: &Option<Expr>,
1052) -> Result<Vec<(Vec<u8>, Vec<Value>)>> {
1053    let plan = planner::plan_select(table_schema, where_clause);
1054    let lower_name = &table_schema.name;
1055
1056    match plan {
1057        ScanPlan::SeqScan => {
1058            let mut rows = Vec::new();
1059            let mut scan_err: Option<SqlError> = None;
1060            wtx.table_for_each(lower_name.as_bytes(), |key, value| {
1061                match decode_full_row(table_schema, key, value) {
1062                    Ok(row) => rows.push((key.to_vec(), row)),
1063                    Err(e) => scan_err = Some(e),
1064                }
1065                Ok(())
1066            }).map_err(SqlError::Storage)?;
1067            if let Some(e) = scan_err { return Err(e); }
1068            Ok(rows)
1069        }
1070
1071        ScanPlan::PkLookup { pk_values } => {
1072            let key = encode_composite_key(&pk_values);
1073            match wtx.table_get(lower_name.as_bytes(), &key).map_err(SqlError::Storage)? {
1074                Some(value) => {
1075                    let row = decode_full_row(table_schema, &key, &value)?;
1076                    Ok(vec![(key, row)])
1077                }
1078                None => Ok(vec![]),
1079            }
1080        }
1081
1082        ScanPlan::IndexScan {
1083            idx_table, prefix, num_prefix_cols,
1084            range_conds, is_unique, index_columns, ..
1085        } => {
1086            let num_pk_cols = table_schema.primary_key_columns.len();
1087            let num_index_cols = index_columns.len();
1088            let mut pk_keys: Vec<Vec<u8>> = Vec::new();
1089
1090            {
1091                let mut scan_err: Option<SqlError> = None;
1092                wtx.table_scan_from(&idx_table, &prefix, |key, value| {
1093                    if !key.starts_with(&prefix) {
1094                        return Ok(false);
1095                    }
1096                    match check_range_conditions(key, num_prefix_cols, &range_conds, num_index_cols) {
1097                        Ok(RangeCheck::ExceedsUpper) => return Ok(false),
1098                        Ok(RangeCheck::BelowLower) => return Ok(true),
1099                        Ok(RangeCheck::Match) => {}
1100                        Err(e) => { scan_err = Some(e); return Ok(false); }
1101                    }
1102                    match extract_pk_key(key, value, is_unique, num_index_cols, num_pk_cols) {
1103                        Ok(pk) => pk_keys.push(pk),
1104                        Err(e) => { scan_err = Some(e); return Ok(false); }
1105                    }
1106                    Ok(true)
1107                }).map_err(SqlError::Storage)?;
1108                if let Some(e) = scan_err { return Err(e); }
1109            }
1110
1111            let mut rows = Vec::new();
1112            for pk_key in &pk_keys {
1113                if let Some(value) = wtx.table_get(lower_name.as_bytes(), pk_key).map_err(SqlError::Storage)? {
1114                    rows.push((pk_key.clone(), decode_full_row(table_schema, pk_key, &value)?));
1115                }
1116            }
1117            Ok(rows)
1118        }
1119    }
1120}
1121
1122// ── DML ─────────────────────────────────────────────────────────────
1123
1124fn exec_insert(
1125    db: &Database,
1126    schema: &SchemaManager,
1127    stmt: &InsertStmt,
1128) -> Result<ExecutionResult> {
1129    let materialized;
1130    let stmt = if insert_has_subquery(stmt) {
1131        materialized = materialize_insert(stmt, &mut |sub| exec_subquery_read(db, schema, sub))?;
1132        &materialized
1133    } else {
1134        stmt
1135    };
1136
1137    let lower_name = stmt.table.to_ascii_lowercase();
1138    let table_schema = schema.get(&lower_name)
1139        .ok_or_else(|| SqlError::TableNotFound(stmt.table.clone()))?;
1140
1141    let insert_columns = if stmt.columns.is_empty() {
1142        table_schema.columns.iter().map(|c| c.name.clone()).collect::<Vec<_>>()
1143    } else {
1144        stmt.columns.iter().map(|c| c.to_ascii_lowercase()).collect()
1145    };
1146
1147    let col_indices: Vec<usize> = insert_columns.iter().map(|name| {
1148        table_schema.column_index(name)
1149            .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))
1150    }).collect::<Result<_>>()?;
1151
1152    let mut wtx = db.begin_write().map_err(SqlError::Storage)?;
1153    let mut count: u64 = 0;
1154
1155    for value_row in &stmt.values {
1156        if value_row.len() != insert_columns.len() {
1157            return Err(SqlError::InvalidValue(format!(
1158                "expected {} values, got {}",
1159                insert_columns.len(),
1160                value_row.len()
1161            )));
1162        }
1163
1164        let mut row = vec![Value::Null; table_schema.columns.len()];
1165        for (i, expr) in value_row.iter().enumerate() {
1166            let val = eval_const_expr(expr)?;
1167            let col_idx = col_indices[i];
1168            let col = &table_schema.columns[col_idx];
1169
1170            let coerced = if val.is_null() {
1171                Value::Null
1172            } else {
1173                val.coerce_to(col.data_type).ok_or_else(|| SqlError::TypeMismatch {
1174                    expected: col.data_type.to_string(),
1175                    got: val.data_type().to_string(),
1176                })?
1177            };
1178
1179            row[col_idx] = coerced;
1180        }
1181
1182        for col in &table_schema.columns {
1183            if !col.nullable && row[col.position as usize].is_null() {
1184                return Err(SqlError::NotNullViolation(col.name.clone()));
1185            }
1186        }
1187
1188        let pk_values: Vec<Value> = table_schema.pk_indices()
1189            .iter()
1190            .map(|&i| row[i].clone())
1191            .collect();
1192        let key = encode_composite_key(&pk_values);
1193
1194        let non_pk = table_schema.non_pk_indices();
1195        let value_values: Vec<Value> = non_pk.iter().map(|&i| row[i].clone()).collect();
1196        let value = encode_row(&value_values);
1197
1198        if key.len() > citadel_core::MAX_KEY_SIZE {
1199            return Err(SqlError::KeyTooLarge { size: key.len(), max: citadel_core::MAX_KEY_SIZE });
1200        }
1201        if value.len() > citadel_core::MAX_INLINE_VALUE_SIZE {
1202            return Err(SqlError::RowTooLarge { size: value.len(), max: citadel_core::MAX_INLINE_VALUE_SIZE });
1203        }
1204
1205        let is_new = wtx.table_insert(lower_name.as_bytes(), &key, &value)
1206            .map_err(SqlError::Storage)?;
1207        if !is_new {
1208            return Err(SqlError::DuplicateKey);
1209        }
1210
1211        insert_index_entries(&mut wtx, table_schema, &row, &pk_values)?;
1212        count += 1;
1213    }
1214
1215    wtx.commit().map_err(SqlError::Storage)?;
1216    Ok(ExecutionResult::RowsAffected(count))
1217}
1218
1219fn has_subquery(expr: &Expr) -> bool {
1220    match expr {
1221        Expr::InSubquery { .. } | Expr::Exists { .. } | Expr::ScalarSubquery(_) => true,
1222        Expr::BinaryOp { left, right, .. } => has_subquery(left) || has_subquery(right),
1223        Expr::UnaryOp { expr, .. } => has_subquery(expr),
1224        Expr::IsNull(e) | Expr::IsNotNull(e) => has_subquery(e),
1225        Expr::InList { expr, list, .. } => {
1226            has_subquery(expr) || list.iter().any(has_subquery)
1227        }
1228        Expr::InSet { expr, .. } => has_subquery(expr),
1229        Expr::Between { expr, low, high, .. } => {
1230            has_subquery(expr) || has_subquery(low) || has_subquery(high)
1231        }
1232        Expr::Like { expr, pattern, escape, .. } => {
1233            has_subquery(expr) || has_subquery(pattern) || escape.as_ref().map_or(false, |e| has_subquery(e))
1234        }
1235        Expr::Case { operand, conditions, else_result } => {
1236            operand.as_ref().map_or(false, |e| has_subquery(e))
1237                || conditions.iter().any(|(c, r)| has_subquery(c) || has_subquery(r))
1238                || else_result.as_ref().map_or(false, |e| has_subquery(e))
1239        }
1240        Expr::Coalesce(args) => args.iter().any(has_subquery),
1241        Expr::Cast { expr, .. } => has_subquery(expr),
1242        Expr::Function { args, .. } => args.iter().any(has_subquery),
1243        _ => false,
1244    }
1245}
1246
1247fn stmt_has_subquery(stmt: &SelectStmt) -> bool {
1248    if let Some(ref w) = stmt.where_clause { if has_subquery(w) { return true; } }
1249    if let Some(ref h) = stmt.having { if has_subquery(h) { return true; } }
1250    for col in &stmt.columns {
1251        if let SelectColumn::Expr { expr, .. } = col {
1252            if has_subquery(expr) { return true; }
1253        }
1254    }
1255    for ob in &stmt.order_by {
1256        if has_subquery(&ob.expr) { return true; }
1257    }
1258    for join in &stmt.joins {
1259        if let Some(ref on_expr) = join.on_clause {
1260            if has_subquery(on_expr) { return true; }
1261        }
1262    }
1263    false
1264}
1265
1266fn materialize_expr(
1267    expr: &Expr,
1268    exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
1269) -> Result<Expr> {
1270    match expr {
1271        Expr::InSubquery { expr: e, subquery, negated } => {
1272            let inner = materialize_expr(e, exec_sub)?;
1273            let qr = exec_sub(subquery)?;
1274            if !qr.columns.is_empty() && qr.columns.len() != 1 {
1275                return Err(SqlError::SubqueryMultipleColumns);
1276            }
1277            let mut values = std::collections::HashSet::new();
1278            let mut has_null = false;
1279            for row in &qr.rows {
1280                if row[0].is_null() {
1281                    has_null = true;
1282                } else {
1283                    values.insert(row[0].clone());
1284                }
1285            }
1286            Ok(Expr::InSet {
1287                expr: Box::new(inner),
1288                values,
1289                has_null,
1290                negated: *negated,
1291            })
1292        }
1293        Expr::ScalarSubquery(subquery) => {
1294            let qr = exec_sub(subquery)?;
1295            if qr.rows.len() > 1 {
1296                return Err(SqlError::SubqueryMultipleRows);
1297            }
1298            let val = if qr.rows.is_empty() {
1299                Value::Null
1300            } else {
1301                qr.rows[0][0].clone()
1302            };
1303            Ok(Expr::Literal(val))
1304        }
1305        Expr::Exists { subquery, negated } => {
1306            let qr = exec_sub(subquery)?;
1307            let exists = !qr.rows.is_empty();
1308            let result = if *negated { !exists } else { exists };
1309            Ok(Expr::Literal(Value::Boolean(result)))
1310        }
1311        Expr::InList { expr: e, list, negated } => {
1312            let inner = materialize_expr(e, exec_sub)?;
1313            let items = list.iter()
1314                .map(|item| materialize_expr(item, exec_sub))
1315                .collect::<Result<Vec<_>>>()?;
1316            Ok(Expr::InList { expr: Box::new(inner), list: items, negated: *negated })
1317        }
1318        Expr::BinaryOp { left, op, right } => {
1319            Ok(Expr::BinaryOp {
1320                left: Box::new(materialize_expr(left, exec_sub)?),
1321                op: *op,
1322                right: Box::new(materialize_expr(right, exec_sub)?),
1323            })
1324        }
1325        Expr::UnaryOp { op, expr: e } => {
1326            Ok(Expr::UnaryOp {
1327                op: *op,
1328                expr: Box::new(materialize_expr(e, exec_sub)?),
1329            })
1330        }
1331        Expr::IsNull(e) => Ok(Expr::IsNull(Box::new(materialize_expr(e, exec_sub)?))),
1332        Expr::IsNotNull(e) => Ok(Expr::IsNotNull(Box::new(materialize_expr(e, exec_sub)?))),
1333        Expr::InSet { expr: e, values, has_null, negated } => {
1334            Ok(Expr::InSet {
1335                expr: Box::new(materialize_expr(e, exec_sub)?),
1336                values: values.clone(),
1337                has_null: *has_null,
1338                negated: *negated,
1339            })
1340        }
1341        Expr::Between { expr: e, low, high, negated } => {
1342            Ok(Expr::Between {
1343                expr: Box::new(materialize_expr(e, exec_sub)?),
1344                low: Box::new(materialize_expr(low, exec_sub)?),
1345                high: Box::new(materialize_expr(high, exec_sub)?),
1346                negated: *negated,
1347            })
1348        }
1349        Expr::Like { expr: e, pattern, escape, negated } => {
1350            let esc = escape.as_ref()
1351                .map(|es| materialize_expr(es, exec_sub).map(Box::new))
1352                .transpose()?;
1353            Ok(Expr::Like {
1354                expr: Box::new(materialize_expr(e, exec_sub)?),
1355                pattern: Box::new(materialize_expr(pattern, exec_sub)?),
1356                escape: esc,
1357                negated: *negated,
1358            })
1359        }
1360        Expr::Case { operand, conditions, else_result } => {
1361            let op = operand.as_ref()
1362                .map(|e| materialize_expr(e, exec_sub).map(Box::new))
1363                .transpose()?;
1364            let conds = conditions.iter()
1365                .map(|(c, r)| Ok((materialize_expr(c, exec_sub)?, materialize_expr(r, exec_sub)?)))
1366                .collect::<Result<Vec<_>>>()?;
1367            let else_r = else_result.as_ref()
1368                .map(|e| materialize_expr(e, exec_sub).map(Box::new))
1369                .transpose()?;
1370            Ok(Expr::Case { operand: op, conditions: conds, else_result: else_r })
1371        }
1372        Expr::Coalesce(args) => {
1373            let materialized = args.iter()
1374                .map(|a| materialize_expr(a, exec_sub))
1375                .collect::<Result<Vec<_>>>()?;
1376            Ok(Expr::Coalesce(materialized))
1377        }
1378        Expr::Cast { expr: e, data_type } => {
1379            Ok(Expr::Cast {
1380                expr: Box::new(materialize_expr(e, exec_sub)?),
1381                data_type: *data_type,
1382            })
1383        }
1384        Expr::Function { name, args } => {
1385            let materialized = args.iter()
1386                .map(|a| materialize_expr(a, exec_sub))
1387                .collect::<Result<Vec<_>>>()?;
1388            Ok(Expr::Function { name: name.clone(), args: materialized })
1389        }
1390        other => Ok(other.clone()),
1391    }
1392}
1393
1394fn materialize_stmt(
1395    stmt: &SelectStmt,
1396    exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
1397) -> Result<SelectStmt> {
1398    let where_clause = stmt.where_clause.as_ref()
1399        .map(|e| materialize_expr(e, exec_sub))
1400        .transpose()?;
1401    let having = stmt.having.as_ref()
1402        .map(|e| materialize_expr(e, exec_sub))
1403        .transpose()?;
1404    let columns = stmt.columns.iter().map(|c| match c {
1405        SelectColumn::AllColumns => Ok(SelectColumn::AllColumns),
1406        SelectColumn::Expr { expr, alias } => {
1407            Ok(SelectColumn::Expr {
1408                expr: materialize_expr(expr, exec_sub)?,
1409                alias: alias.clone(),
1410            })
1411        }
1412    }).collect::<Result<Vec<_>>>()?;
1413    let order_by = stmt.order_by.iter().map(|ob| {
1414        Ok(OrderByItem {
1415            expr: materialize_expr(&ob.expr, exec_sub)?,
1416            descending: ob.descending,
1417            nulls_first: ob.nulls_first,
1418        })
1419    }).collect::<Result<Vec<_>>>()?;
1420    let joins = stmt.joins.iter().map(|j| {
1421        let on_clause = j.on_clause.as_ref()
1422            .map(|e| materialize_expr(e, exec_sub))
1423            .transpose()?;
1424        Ok(JoinClause {
1425            join_type: j.join_type,
1426            table: j.table.clone(),
1427            on_clause,
1428        })
1429    }).collect::<Result<Vec<_>>>()?;
1430    let group_by = stmt.group_by.iter()
1431        .map(|e| materialize_expr(e, exec_sub))
1432        .collect::<Result<Vec<_>>>()?;
1433    Ok(SelectStmt {
1434        columns,
1435        from: stmt.from.clone(),
1436        from_alias: stmt.from_alias.clone(),
1437        joins,
1438        distinct: stmt.distinct,
1439        where_clause,
1440        order_by,
1441        limit: stmt.limit.clone(),
1442        offset: stmt.offset.clone(),
1443        group_by,
1444        having,
1445    })
1446}
1447
1448fn exec_subquery_read(
1449    db: &Database,
1450    schema: &SchemaManager,
1451    stmt: &SelectStmt,
1452) -> Result<QueryResult> {
1453    match exec_select(db, schema, stmt)? {
1454        ExecutionResult::Query(qr) => Ok(qr),
1455        _ => Ok(QueryResult { columns: vec![], rows: vec![] }),
1456    }
1457}
1458
1459fn exec_subquery_write(
1460    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
1461    schema: &SchemaManager,
1462    stmt: &SelectStmt,
1463) -> Result<QueryResult> {
1464    match exec_select_in_txn(wtx, schema, stmt)? {
1465        ExecutionResult::Query(qr) => Ok(qr),
1466        _ => Ok(QueryResult { columns: vec![], rows: vec![] }),
1467    }
1468}
1469
1470fn update_has_subquery(stmt: &UpdateStmt) -> bool {
1471    stmt.where_clause.as_ref().map_or(false, has_subquery)
1472        || stmt.assignments.iter().any(|(_, e)| has_subquery(e))
1473}
1474
1475fn materialize_update(
1476    stmt: &UpdateStmt,
1477    exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
1478) -> Result<UpdateStmt> {
1479    let where_clause = stmt.where_clause.as_ref()
1480        .map(|e| materialize_expr(e, exec_sub))
1481        .transpose()?;
1482    let assignments = stmt.assignments.iter()
1483        .map(|(name, expr)| Ok((name.clone(), materialize_expr(expr, exec_sub)?)))
1484        .collect::<Result<Vec<_>>>()?;
1485    Ok(UpdateStmt {
1486        table: stmt.table.clone(),
1487        assignments,
1488        where_clause,
1489    })
1490}
1491
1492fn delete_has_subquery(stmt: &DeleteStmt) -> bool {
1493    stmt.where_clause.as_ref().map_or(false, has_subquery)
1494}
1495
1496fn materialize_delete(
1497    stmt: &DeleteStmt,
1498    exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
1499) -> Result<DeleteStmt> {
1500    let where_clause = stmt.where_clause.as_ref()
1501        .map(|e| materialize_expr(e, exec_sub))
1502        .transpose()?;
1503    Ok(DeleteStmt {
1504        table: stmt.table.clone(),
1505        where_clause,
1506    })
1507}
1508
1509fn insert_has_subquery(stmt: &InsertStmt) -> bool {
1510    stmt.values.iter().any(|row| row.iter().any(has_subquery))
1511}
1512
1513fn materialize_insert(
1514    stmt: &InsertStmt,
1515    exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
1516) -> Result<InsertStmt> {
1517    let values = stmt.values.iter()
1518        .map(|row| row.iter().map(|e| materialize_expr(e, exec_sub)).collect::<Result<Vec<_>>>())
1519        .collect::<Result<Vec<_>>>()?;
1520    Ok(InsertStmt {
1521        table: stmt.table.clone(),
1522        columns: stmt.columns.clone(),
1523        values,
1524    })
1525}
1526
1527fn exec_select(
1528    db: &Database,
1529    schema: &SchemaManager,
1530    stmt: &SelectStmt,
1531) -> Result<ExecutionResult> {
1532    let materialized;
1533    let stmt = if stmt_has_subquery(stmt) {
1534        materialized = materialize_stmt(stmt, &mut |sub| exec_subquery_read(db, schema, sub))?;
1535        &materialized
1536    } else {
1537        stmt
1538    };
1539
1540    if stmt.from.is_empty() {
1541        return exec_select_no_from(stmt);
1542    }
1543
1544    let lower_name = stmt.from.to_ascii_lowercase();
1545    let table_schema = schema.get(&lower_name)
1546        .ok_or_else(|| SqlError::TableNotFound(stmt.from.clone()))?;
1547
1548    if !stmt.joins.is_empty() {
1549        return exec_select_join(db, schema, stmt);
1550    }
1551
1552    let rows = collect_rows_read(db, table_schema, &stmt.where_clause)?;
1553    process_select(&table_schema.columns, rows, stmt)
1554}
1555
1556fn exec_select_no_from(stmt: &SelectStmt) -> Result<ExecutionResult> {
1557    let empty_cols: Vec<ColumnDef> = vec![];
1558    let empty_row: Vec<Value> = vec![];
1559    let (col_names, projected) = project_rows(&empty_cols, &stmt.columns, &[empty_row])?;
1560    Ok(ExecutionResult::Query(QueryResult {
1561        columns: col_names,
1562        rows: projected,
1563    }))
1564}
1565
1566/// Shared SELECT processing: WHERE, aggregation, ORDER BY, LIMIT/OFFSET, projection.
1567fn process_select(
1568    columns: &[ColumnDef],
1569    mut rows: Vec<Vec<Value>>,
1570    stmt: &SelectStmt,
1571) -> Result<ExecutionResult> {
1572    if let Some(ref where_expr) = stmt.where_clause {
1573        rows.retain(|row| {
1574            match eval_expr(where_expr, columns, row) {
1575                Ok(val) => is_truthy(&val),
1576                Err(_) => false,
1577            }
1578        });
1579    }
1580
1581    let has_aggregates = stmt.columns.iter().any(|c| match c {
1582        SelectColumn::Expr { expr, .. } => is_aggregate_expr(expr),
1583        _ => false,
1584    });
1585
1586    if has_aggregates || !stmt.group_by.is_empty() {
1587        return exec_aggregate(columns, &rows, stmt);
1588    }
1589
1590    if stmt.distinct {
1591        let (col_names, mut projected) = project_rows(columns, &stmt.columns, &rows)?;
1592
1593        let mut seen = std::collections::HashSet::new();
1594        projected.retain(|row| seen.insert(row.clone()));
1595
1596        if !stmt.order_by.is_empty() {
1597            let output_cols = build_output_columns(&stmt.columns, columns);
1598            sort_rows(&mut projected, &stmt.order_by, &output_cols)?;
1599        }
1600
1601        if let Some(ref offset_expr) = stmt.offset {
1602            let offset = eval_const_int(offset_expr)? as usize;
1603            if offset < projected.len() {
1604                projected = projected.split_off(offset);
1605            } else {
1606                projected.clear();
1607            }
1608        }
1609
1610        if let Some(ref limit_expr) = stmt.limit {
1611            let limit = eval_const_int(limit_expr)? as usize;
1612            projected.truncate(limit);
1613        }
1614
1615        return Ok(ExecutionResult::Query(QueryResult {
1616            columns: col_names,
1617            rows: projected,
1618        }));
1619    }
1620
1621    if !stmt.order_by.is_empty() {
1622        sort_rows(&mut rows, &stmt.order_by, columns)?;
1623    }
1624
1625    if let Some(ref offset_expr) = stmt.offset {
1626        let offset = eval_const_int(offset_expr)? as usize;
1627        if offset < rows.len() {
1628            rows = rows.split_off(offset);
1629        } else {
1630            rows.clear();
1631        }
1632    }
1633
1634    if let Some(ref limit_expr) = stmt.limit {
1635        let limit = eval_const_int(limit_expr)? as usize;
1636        rows.truncate(limit);
1637    }
1638
1639    let (col_names, projected) = project_rows(columns, &stmt.columns, &rows)?;
1640
1641    Ok(ExecutionResult::Query(QueryResult {
1642        columns: col_names,
1643        rows: projected,
1644    }))
1645}
1646
1647
1648fn resolve_table_name<'a>(
1649    schema: &'a SchemaManager,
1650    name: &str,
1651) -> Result<&'a TableSchema> {
1652    let lower = name.to_ascii_lowercase();
1653    schema.get(&lower)
1654        .ok_or_else(|| SqlError::TableNotFound(name.to_string()))
1655}
1656
1657fn build_joined_columns(
1658    tables: &[(String, &TableSchema)],
1659) -> Vec<ColumnDef> {
1660    let mut result = Vec::new();
1661    let mut pos: u16 = 0;
1662
1663    for (alias, schema) in tables {
1664        for col in &schema.columns {
1665            result.push(ColumnDef {
1666                name: format!("{}.{}", alias.to_ascii_lowercase(), col.name),
1667                data_type: col.data_type,
1668                nullable: col.nullable,
1669                position: pos,
1670            });
1671            pos += 1;
1672        }
1673    }
1674
1675    result
1676}
1677
/// Returns the identifier a table is referred to by within a query: its
/// alias when one was given, otherwise the table name itself. The result is
/// ASCII-lowercased.
fn table_alias_or_name(name: &str, alias: &Option<String>) -> String {
    // Borrow whichever string applies; the only allocation is the lowercased result.
    alias.as_deref().unwrap_or(name).to_ascii_lowercase()
}
1681
1682fn collect_all_rows_read(
1683    db: &Database,
1684    table_schema: &TableSchema,
1685) -> Result<Vec<Vec<Value>>> {
1686    collect_rows_read(db, table_schema, &None)
1687}
1688
1689fn collect_all_rows_write(
1690    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
1691    table_schema: &TableSchema,
1692) -> Result<Vec<Vec<Value>>> {
1693    collect_rows_write(wtx, table_schema, &None)
1694}
1695
1696fn exec_select_join(
1697    db: &Database,
1698    schema: &SchemaManager,
1699    stmt: &SelectStmt,
1700) -> Result<ExecutionResult> {
1701    let from_schema = resolve_table_name(schema, &stmt.from)?;
1702    let from_alias = table_alias_or_name(&stmt.from, &stmt.from_alias);
1703    let mut outer_rows = collect_all_rows_read(db, from_schema)?;
1704
1705    let mut tables: Vec<(String, &TableSchema)> = vec![(from_alias.clone(), from_schema)];
1706
1707    for join in &stmt.joins {
1708        let inner_schema = resolve_table_name(schema, &join.table.name)?;
1709        let inner_alias = table_alias_or_name(&join.table.name, &join.table.alias);
1710        let inner_rows = collect_all_rows_read(db, inner_schema)?;
1711
1712        let mut preview_tables = tables.clone();
1713        preview_tables.push((inner_alias.clone(), inner_schema));
1714        let combined_cols = build_joined_columns(&preview_tables);
1715
1716        let mut new_rows = Vec::new();
1717
1718        match join.join_type {
1719            JoinType::Inner | JoinType::Cross => {
1720                for outer in &outer_rows {
1721                    for inner in &inner_rows {
1722                        let combined: Vec<Value> = outer.iter()
1723                            .chain(inner.iter())
1724                            .cloned()
1725                            .collect();
1726                        if let Some(ref on_expr) = join.on_clause {
1727                            match eval_expr(on_expr, &combined_cols, &combined) {
1728                                Ok(val) if is_truthy(&val) => new_rows.push(combined),
1729                                _ => {}
1730                            }
1731                        } else {
1732                            new_rows.push(combined);
1733                        }
1734                    }
1735                }
1736            }
1737            JoinType::Left => {
1738                let inner_col_count = inner_schema.columns.len();
1739                for outer in &outer_rows {
1740                    let mut matched = false;
1741                    for inner in &inner_rows {
1742                        let combined: Vec<Value> = outer.iter()
1743                            .chain(inner.iter())
1744                            .cloned()
1745                            .collect();
1746                        if let Some(ref on_expr) = join.on_clause {
1747                            match eval_expr(on_expr, &combined_cols, &combined) {
1748                                Ok(val) if is_truthy(&val) => {
1749                                    new_rows.push(combined);
1750                                    matched = true;
1751                                }
1752                                _ => {}
1753                            }
1754                        } else {
1755                            new_rows.push(combined);
1756                            matched = true;
1757                        }
1758                    }
1759                    if !matched {
1760                        let mut padded = outer.clone();
1761                        padded.extend(std::iter::repeat(Value::Null).take(inner_col_count));
1762                        new_rows.push(padded);
1763                    }
1764                }
1765            }
1766            JoinType::Right => {
1767                let outer_col_count = if outer_rows.is_empty() {
1768                    tables.iter().map(|(_, s)| s.columns.len()).sum()
1769                } else {
1770                    outer_rows[0].len()
1771                };
1772                let mut inner_matched = vec![false; inner_rows.len()];
1773                for outer in &outer_rows {
1774                    for (j, inner) in inner_rows.iter().enumerate() {
1775                        let combined: Vec<Value> = outer.iter()
1776                            .chain(inner.iter())
1777                            .cloned()
1778                            .collect();
1779                        if let Some(ref on_expr) = join.on_clause {
1780                            match eval_expr(on_expr, &combined_cols, &combined) {
1781                                Ok(val) if is_truthy(&val) => {
1782                                    new_rows.push(combined);
1783                                    inner_matched[j] = true;
1784                                }
1785                                _ => {}
1786                            }
1787                        } else {
1788                            new_rows.push(combined);
1789                            inner_matched[j] = true;
1790                        }
1791                    }
1792                }
1793                for (j, inner) in inner_rows.iter().enumerate() {
1794                    if !inner_matched[j] {
1795                        let mut padded: Vec<Value> = std::iter::repeat(Value::Null)
1796                            .take(outer_col_count)
1797                            .collect();
1798                        padded.extend(inner.iter().cloned());
1799                        new_rows.push(padded);
1800                    }
1801                }
1802            }
1803        }
1804
1805        tables.push((inner_alias, inner_schema));
1806        outer_rows = new_rows;
1807    }
1808
1809    let joined_cols = build_joined_columns(&tables);
1810    process_select(&joined_cols, outer_rows, stmt)
1811}
1812
1813fn exec_select_join_in_txn(
1814    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
1815    schema: &SchemaManager,
1816    stmt: &SelectStmt,
1817) -> Result<ExecutionResult> {
1818    let from_schema = resolve_table_name(schema, &stmt.from)?;
1819    let from_alias = table_alias_or_name(&stmt.from, &stmt.from_alias);
1820    let mut outer_rows = collect_all_rows_write(wtx, from_schema)?;
1821
1822    let mut tables: Vec<(String, &TableSchema)> = vec![(from_alias.clone(), from_schema)];
1823
1824    for join in &stmt.joins {
1825        let inner_schema = resolve_table_name(schema, &join.table.name)?;
1826        let inner_alias = table_alias_or_name(&join.table.name, &join.table.alias);
1827        let inner_rows = collect_all_rows_write(wtx, inner_schema)?;
1828
1829        let mut preview_tables = tables.clone();
1830        preview_tables.push((inner_alias.clone(), inner_schema));
1831        let combined_cols = build_joined_columns(&preview_tables);
1832
1833        let mut new_rows = Vec::new();
1834
1835        match join.join_type {
1836            JoinType::Inner | JoinType::Cross => {
1837                for outer in &outer_rows {
1838                    for inner in &inner_rows {
1839                        let combined: Vec<Value> = outer.iter()
1840                            .chain(inner.iter())
1841                            .cloned()
1842                            .collect();
1843                        if let Some(ref on_expr) = join.on_clause {
1844                            match eval_expr(on_expr, &combined_cols, &combined) {
1845                                Ok(val) if is_truthy(&val) => new_rows.push(combined),
1846                                _ => {}
1847                            }
1848                        } else {
1849                            new_rows.push(combined);
1850                        }
1851                    }
1852                }
1853            }
1854            JoinType::Left => {
1855                let inner_col_count = inner_schema.columns.len();
1856                for outer in &outer_rows {
1857                    let mut matched = false;
1858                    for inner in &inner_rows {
1859                        let combined: Vec<Value> = outer.iter()
1860                            .chain(inner.iter())
1861                            .cloned()
1862                            .collect();
1863                        if let Some(ref on_expr) = join.on_clause {
1864                            match eval_expr(on_expr, &combined_cols, &combined) {
1865                                Ok(val) if is_truthy(&val) => {
1866                                    new_rows.push(combined);
1867                                    matched = true;
1868                                }
1869                                _ => {}
1870                            }
1871                        } else {
1872                            new_rows.push(combined);
1873                            matched = true;
1874                        }
1875                    }
1876                    if !matched {
1877                        let mut padded = outer.clone();
1878                        padded.extend(std::iter::repeat(Value::Null).take(inner_col_count));
1879                        new_rows.push(padded);
1880                    }
1881                }
1882            }
1883            JoinType::Right => {
1884                let outer_col_count = if outer_rows.is_empty() {
1885                    tables.iter().map(|(_, s)| s.columns.len()).sum()
1886                } else {
1887                    outer_rows[0].len()
1888                };
1889                let mut inner_matched = vec![false; inner_rows.len()];
1890                for outer in &outer_rows {
1891                    for (j, inner) in inner_rows.iter().enumerate() {
1892                        let combined: Vec<Value> = outer.iter()
1893                            .chain(inner.iter())
1894                            .cloned()
1895                            .collect();
1896                        if let Some(ref on_expr) = join.on_clause {
1897                            match eval_expr(on_expr, &combined_cols, &combined) {
1898                                Ok(val) if is_truthy(&val) => {
1899                                    new_rows.push(combined);
1900                                    inner_matched[j] = true;
1901                                }
1902                                _ => {}
1903                            }
1904                        } else {
1905                            new_rows.push(combined);
1906                            inner_matched[j] = true;
1907                        }
1908                    }
1909                }
1910                for (j, inner) in inner_rows.iter().enumerate() {
1911                    if !inner_matched[j] {
1912                        let mut padded: Vec<Value> = std::iter::repeat(Value::Null)
1913                            .take(outer_col_count)
1914                            .collect();
1915                        padded.extend(inner.iter().cloned());
1916                        new_rows.push(padded);
1917                    }
1918                }
1919            }
1920        }
1921
1922        tables.push((inner_alias, inner_schema));
1923        outer_rows = new_rows;
1924    }
1925
1926    let joined_cols = build_joined_columns(&tables);
1927    process_select(&joined_cols, outer_rows, stmt)
1928}
1929
1930fn exec_update(
1931    db: &Database,
1932    schema: &SchemaManager,
1933    stmt: &UpdateStmt,
1934) -> Result<ExecutionResult> {
1935    let materialized;
1936    let stmt = if update_has_subquery(stmt) {
1937        materialized = materialize_update(stmt, &mut |sub| exec_subquery_read(db, schema, sub))?;
1938        &materialized
1939    } else {
1940        stmt
1941    };
1942
1943    let lower_name = stmt.table.to_ascii_lowercase();
1944    let table_schema = schema.get(&lower_name)
1945        .ok_or_else(|| SqlError::TableNotFound(stmt.table.clone()))?;
1946
1947    let all_candidates = collect_keyed_rows_read(db, table_schema, &stmt.where_clause)?;
1948    let matching_rows: Vec<(Vec<u8>, Vec<Value>)> = all_candidates.into_iter()
1949        .filter(|(_, row)| {
1950            match &stmt.where_clause {
1951                Some(where_expr) => {
1952                    match eval_expr(where_expr, &table_schema.columns, row) {
1953                        Ok(val) => is_truthy(&val),
1954                        Err(_) => false,
1955                    }
1956                }
1957                None => true,
1958            }
1959        })
1960        .collect();
1961
1962    if matching_rows.is_empty() {
1963        return Ok(ExecutionResult::RowsAffected(0));
1964    }
1965
1966    struct UpdateChange {
1967        old_key: Vec<u8>,
1968        new_key: Vec<u8>,
1969        new_value: Vec<u8>,
1970        pk_changed: bool,
1971        old_row: Vec<Value>,
1972        new_row: Vec<Value>,
1973    }
1974
1975    let pk_indices = table_schema.pk_indices();
1976    let mut changes: Vec<UpdateChange> = Vec::new();
1977
1978    for (old_key, row) in &matching_rows {
1979        let mut new_row = row.clone();
1980        let mut pk_changed = false;
1981        for (col_name, expr) in &stmt.assignments {
1982            let col_idx = table_schema.column_index(col_name)
1983                .ok_or_else(|| SqlError::ColumnNotFound(col_name.clone()))?;
1984            let new_val = eval_expr(expr, &table_schema.columns, &new_row)?;
1985            let col = &table_schema.columns[col_idx];
1986
1987            let coerced = if new_val.is_null() {
1988                if !col.nullable {
1989                    return Err(SqlError::NotNullViolation(col.name.clone()));
1990                }
1991                Value::Null
1992            } else {
1993                new_val.coerce_to(col.data_type).ok_or_else(|| SqlError::TypeMismatch {
1994                    expected: col.data_type.to_string(),
1995                    got: new_val.data_type().to_string(),
1996                })?
1997            };
1998
1999            if table_schema.primary_key_columns.contains(&(col_idx as u16)) {
2000                pk_changed = true;
2001            }
2002            new_row[col_idx] = coerced;
2003        }
2004
2005        let pk_values: Vec<Value> = pk_indices.iter().map(|&i| new_row[i].clone()).collect();
2006        let new_key = encode_composite_key(&pk_values);
2007
2008        let non_pk = table_schema.non_pk_indices();
2009        let value_values: Vec<Value> = non_pk.iter().map(|&i| new_row[i].clone()).collect();
2010        let new_value = encode_row(&value_values);
2011
2012        changes.push(UpdateChange {
2013            old_key: old_key.clone(), new_key, new_value, pk_changed,
2014            old_row: row.clone(), new_row,
2015        });
2016    }
2017
2018    {
2019        use std::collections::HashSet;
2020        let mut new_keys: HashSet<Vec<u8>> = HashSet::new();
2021        for c in &changes {
2022            if c.pk_changed && c.new_key != c.old_key {
2023                if !new_keys.insert(c.new_key.clone()) {
2024                    return Err(SqlError::DuplicateKey);
2025                }
2026            }
2027        }
2028    }
2029
2030    let mut wtx = db.begin_write().map_err(SqlError::Storage)?;
2031
2032    for c in &changes {
2033        let old_pk: Vec<Value> = pk_indices.iter().map(|&i| c.old_row[i].clone()).collect();
2034
2035        for idx in &table_schema.indices {
2036            if index_columns_changed(idx, &c.old_row, &c.new_row) || c.pk_changed {
2037                let idx_table = TableSchema::index_table_name(&lower_name, &idx.name);
2038                let old_idx_key = encode_index_key(idx, &c.old_row, &old_pk);
2039                wtx.table_delete(&idx_table, &old_idx_key).map_err(SqlError::Storage)?;
2040            }
2041        }
2042
2043        if c.pk_changed {
2044            wtx.table_delete(lower_name.as_bytes(), &c.old_key).map_err(SqlError::Storage)?;
2045        }
2046    }
2047
2048    for c in &changes {
2049        let new_pk: Vec<Value> = pk_indices.iter().map(|&i| c.new_row[i].clone()).collect();
2050
2051        if c.pk_changed {
2052            let is_new = wtx.table_insert(lower_name.as_bytes(), &c.new_key, &c.new_value)
2053                .map_err(SqlError::Storage)?;
2054            if !is_new {
2055                return Err(SqlError::DuplicateKey);
2056            }
2057        } else {
2058            wtx.table_insert(lower_name.as_bytes(), &c.new_key, &c.new_value)
2059                .map_err(SqlError::Storage)?;
2060        }
2061
2062        for idx in &table_schema.indices {
2063            if index_columns_changed(idx, &c.old_row, &c.new_row) || c.pk_changed {
2064                let idx_table = TableSchema::index_table_name(&lower_name, &idx.name);
2065                let new_idx_key = encode_index_key(idx, &c.new_row, &new_pk);
2066                let new_idx_val = encode_index_value(idx, &c.new_row, &new_pk);
2067                let is_new = wtx.table_insert(&idx_table, &new_idx_key, &new_idx_val)
2068                    .map_err(SqlError::Storage)?;
2069                if idx.unique && !is_new {
2070                    let indexed_values: Vec<Value> = idx.columns.iter()
2071                        .map(|&col_idx| c.new_row[col_idx as usize].clone())
2072                        .collect();
2073                    let any_null = indexed_values.iter().any(|v| v.is_null());
2074                    if !any_null {
2075                        return Err(SqlError::UniqueViolation(idx.name.clone()));
2076                    }
2077                }
2078            }
2079        }
2080    }
2081
2082    let count = changes.len() as u64;
2083    wtx.commit().map_err(SqlError::Storage)?;
2084    Ok(ExecutionResult::RowsAffected(count))
2085}
2086
2087fn exec_delete(
2088    db: &Database,
2089    schema: &SchemaManager,
2090    stmt: &DeleteStmt,
2091) -> Result<ExecutionResult> {
2092    let materialized;
2093    let stmt = if delete_has_subquery(stmt) {
2094        materialized = materialize_delete(stmt, &mut |sub| exec_subquery_read(db, schema, sub))?;
2095        &materialized
2096    } else {
2097        stmt
2098    };
2099
2100    let lower_name = stmt.table.to_ascii_lowercase();
2101    let table_schema = schema.get(&lower_name)
2102        .ok_or_else(|| SqlError::TableNotFound(stmt.table.clone()))?;
2103
2104    let all_candidates = collect_keyed_rows_read(db, table_schema, &stmt.where_clause)?;
2105    let rows_to_delete: Vec<(Vec<u8>, Vec<Value>)> = all_candidates.into_iter()
2106        .filter(|(_, row)| {
2107            match &stmt.where_clause {
2108                Some(where_expr) => {
2109                    match eval_expr(where_expr, &table_schema.columns, row) {
2110                        Ok(val) => is_truthy(&val),
2111                        Err(_) => false,
2112                    }
2113                }
2114                None => true,
2115            }
2116        })
2117        .collect();
2118
2119    if rows_to_delete.is_empty() {
2120        return Ok(ExecutionResult::RowsAffected(0));
2121    }
2122
2123    let pk_indices = table_schema.pk_indices();
2124    let mut wtx = db.begin_write().map_err(SqlError::Storage)?;
2125    for (key, row) in &rows_to_delete {
2126        let pk_values: Vec<Value> = pk_indices.iter().map(|&i| row[i].clone()).collect();
2127        delete_index_entries(&mut wtx, table_schema, row, &pk_values)?;
2128        wtx.table_delete(lower_name.as_bytes(), key).map_err(SqlError::Storage)?;
2129    }
2130    let count = rows_to_delete.len() as u64;
2131    wtx.commit().map_err(SqlError::Storage)?;
2132    Ok(ExecutionResult::RowsAffected(count))
2133}
2134
2135// ── DML (in-transaction) ────────────────────────────────────────────
2136
2137fn exec_insert_in_txn(
2138    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
2139    schema: &SchemaManager,
2140    stmt: &InsertStmt,
2141) -> Result<ExecutionResult> {
2142    let materialized;
2143    let stmt = if insert_has_subquery(stmt) {
2144        materialized = materialize_insert(stmt, &mut |sub| exec_subquery_write(wtx, schema, sub))?;
2145        &materialized
2146    } else {
2147        stmt
2148    };
2149
2150    let lower_name = stmt.table.to_ascii_lowercase();
2151    let table_schema = schema.get(&lower_name)
2152        .ok_or_else(|| SqlError::TableNotFound(stmt.table.clone()))?;
2153
2154    let insert_columns = if stmt.columns.is_empty() {
2155        table_schema.columns.iter().map(|c| c.name.clone()).collect::<Vec<_>>()
2156    } else {
2157        stmt.columns.iter().map(|c| c.to_ascii_lowercase()).collect()
2158    };
2159
2160    let col_indices: Vec<usize> = insert_columns.iter().map(|name| {
2161        table_schema.column_index(name)
2162            .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))
2163    }).collect::<Result<_>>()?;
2164
2165    let mut count: u64 = 0;
2166
2167    for value_row in &stmt.values {
2168        if value_row.len() != insert_columns.len() {
2169            return Err(SqlError::InvalidValue(format!(
2170                "expected {} values, got {}",
2171                insert_columns.len(),
2172                value_row.len()
2173            )));
2174        }
2175
2176        let mut row = vec![Value::Null; table_schema.columns.len()];
2177        for (i, expr) in value_row.iter().enumerate() {
2178            let val = eval_const_expr(expr)?;
2179            let col_idx = col_indices[i];
2180            let col = &table_schema.columns[col_idx];
2181
2182            let coerced = if val.is_null() {
2183                Value::Null
2184            } else {
2185                val.coerce_to(col.data_type).ok_or_else(|| SqlError::TypeMismatch {
2186                    expected: col.data_type.to_string(),
2187                    got: val.data_type().to_string(),
2188                })?
2189            };
2190
2191            row[col_idx] = coerced;
2192        }
2193
2194        for col in &table_schema.columns {
2195            if !col.nullable && row[col.position as usize].is_null() {
2196                return Err(SqlError::NotNullViolation(col.name.clone()));
2197            }
2198        }
2199
2200        let pk_values: Vec<Value> = table_schema.pk_indices()
2201            .iter()
2202            .map(|&i| row[i].clone())
2203            .collect();
2204        let key = encode_composite_key(&pk_values);
2205
2206        let non_pk = table_schema.non_pk_indices();
2207        let value_values: Vec<Value> = non_pk.iter().map(|&i| row[i].clone()).collect();
2208        let value = encode_row(&value_values);
2209
2210        if key.len() > citadel_core::MAX_KEY_SIZE {
2211            return Err(SqlError::KeyTooLarge { size: key.len(), max: citadel_core::MAX_KEY_SIZE });
2212        }
2213        if value.len() > citadel_core::MAX_INLINE_VALUE_SIZE {
2214            return Err(SqlError::RowTooLarge { size: value.len(), max: citadel_core::MAX_INLINE_VALUE_SIZE });
2215        }
2216
2217        let is_new = wtx.table_insert(lower_name.as_bytes(), &key, &value)
2218            .map_err(SqlError::Storage)?;
2219        if !is_new {
2220            return Err(SqlError::DuplicateKey);
2221        }
2222
2223        insert_index_entries(wtx, table_schema, &row, &pk_values)?;
2224        count += 1;
2225    }
2226
2227    Ok(ExecutionResult::RowsAffected(count))
2228}
2229
2230fn exec_select_in_txn(
2231    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
2232    schema: &SchemaManager,
2233    stmt: &SelectStmt,
2234) -> Result<ExecutionResult> {
2235    let materialized;
2236    let stmt = if stmt_has_subquery(stmt) {
2237        materialized = materialize_stmt(stmt, &mut |sub| exec_subquery_write(wtx, schema, sub))?;
2238        &materialized
2239    } else {
2240        stmt
2241    };
2242
2243    if stmt.from.is_empty() {
2244        return exec_select_no_from(stmt);
2245    }
2246
2247    if !stmt.joins.is_empty() {
2248        return exec_select_join_in_txn(wtx, schema, stmt);
2249    }
2250
2251    let lower_name = stmt.from.to_ascii_lowercase();
2252    let table_schema = schema.get(&lower_name)
2253        .ok_or_else(|| SqlError::TableNotFound(stmt.from.clone()))?;
2254
2255    let rows = collect_rows_write(wtx, table_schema, &stmt.where_clause)?;
2256    process_select(&table_schema.columns, rows, stmt)
2257}
2258
2259fn exec_update_in_txn(
2260    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
2261    schema: &SchemaManager,
2262    stmt: &UpdateStmt,
2263) -> Result<ExecutionResult> {
2264    let materialized;
2265    let stmt = if update_has_subquery(stmt) {
2266        materialized = materialize_update(stmt, &mut |sub| exec_subquery_write(wtx, schema, sub))?;
2267        &materialized
2268    } else {
2269        stmt
2270    };
2271
2272    let lower_name = stmt.table.to_ascii_lowercase();
2273    let table_schema = schema.get(&lower_name)
2274        .ok_or_else(|| SqlError::TableNotFound(stmt.table.clone()))?;
2275
2276    let all_candidates = collect_keyed_rows_write(wtx, table_schema, &stmt.where_clause)?;
2277    let matching_rows: Vec<(Vec<u8>, Vec<Value>)> = all_candidates.into_iter()
2278        .filter(|(_, row)| {
2279            match &stmt.where_clause {
2280                Some(where_expr) => {
2281                    match eval_expr(where_expr, &table_schema.columns, row) {
2282                        Ok(val) => is_truthy(&val),
2283                        Err(_) => false,
2284                    }
2285                }
2286                None => true,
2287            }
2288        })
2289        .collect();
2290
2291    if matching_rows.is_empty() {
2292        return Ok(ExecutionResult::RowsAffected(0));
2293    }
2294
2295    struct UpdateChange {
2296        old_key: Vec<u8>,
2297        new_key: Vec<u8>,
2298        new_value: Vec<u8>,
2299        pk_changed: bool,
2300        old_row: Vec<Value>,
2301        new_row: Vec<Value>,
2302    }
2303
2304    let pk_indices = table_schema.pk_indices();
2305    let mut changes: Vec<UpdateChange> = Vec::new();
2306
2307    for (old_key, row) in &matching_rows {
2308        let mut new_row = row.clone();
2309        let mut pk_changed = false;
2310        for (col_name, expr) in &stmt.assignments {
2311            let col_idx = table_schema.column_index(col_name)
2312                .ok_or_else(|| SqlError::ColumnNotFound(col_name.clone()))?;
2313            let new_val = eval_expr(expr, &table_schema.columns, &new_row)?;
2314            let col = &table_schema.columns[col_idx];
2315
2316            let coerced = if new_val.is_null() {
2317                if !col.nullable {
2318                    return Err(SqlError::NotNullViolation(col.name.clone()));
2319                }
2320                Value::Null
2321            } else {
2322                new_val.coerce_to(col.data_type).ok_or_else(|| SqlError::TypeMismatch {
2323                    expected: col.data_type.to_string(),
2324                    got: new_val.data_type().to_string(),
2325                })?
2326            };
2327
2328            if table_schema.primary_key_columns.contains(&(col_idx as u16)) {
2329                pk_changed = true;
2330            }
2331            new_row[col_idx] = coerced;
2332        }
2333
2334        let pk_values: Vec<Value> = pk_indices.iter().map(|&i| new_row[i].clone()).collect();
2335        let new_key = encode_composite_key(&pk_values);
2336
2337        let non_pk = table_schema.non_pk_indices();
2338        let value_values: Vec<Value> = non_pk.iter().map(|&i| new_row[i].clone()).collect();
2339        let new_value = encode_row(&value_values);
2340
2341        changes.push(UpdateChange {
2342            old_key: old_key.clone(), new_key, new_value, pk_changed,
2343            old_row: row.clone(), new_row,
2344        });
2345    }
2346
2347    {
2348        use std::collections::HashSet;
2349        let mut new_keys: HashSet<Vec<u8>> = HashSet::new();
2350        for c in &changes {
2351            if c.pk_changed && c.new_key != c.old_key {
2352                if !new_keys.insert(c.new_key.clone()) {
2353                    return Err(SqlError::DuplicateKey);
2354                }
2355            }
2356        }
2357    }
2358
2359    for c in &changes {
2360        let old_pk: Vec<Value> = pk_indices.iter().map(|&i| c.old_row[i].clone()).collect();
2361
2362        for idx in &table_schema.indices {
2363            if index_columns_changed(idx, &c.old_row, &c.new_row) || c.pk_changed {
2364                let idx_table = TableSchema::index_table_name(&lower_name, &idx.name);
2365                let old_idx_key = encode_index_key(idx, &c.old_row, &old_pk);
2366                wtx.table_delete(&idx_table, &old_idx_key).map_err(SqlError::Storage)?;
2367            }
2368        }
2369
2370        if c.pk_changed {
2371            wtx.table_delete(lower_name.as_bytes(), &c.old_key).map_err(SqlError::Storage)?;
2372        }
2373    }
2374
2375    for c in &changes {
2376        let new_pk: Vec<Value> = pk_indices.iter().map(|&i| c.new_row[i].clone()).collect();
2377
2378        if c.pk_changed {
2379            let is_new = wtx.table_insert(lower_name.as_bytes(), &c.new_key, &c.new_value)
2380                .map_err(SqlError::Storage)?;
2381            if !is_new {
2382                return Err(SqlError::DuplicateKey);
2383            }
2384        } else {
2385            wtx.table_insert(lower_name.as_bytes(), &c.new_key, &c.new_value)
2386                .map_err(SqlError::Storage)?;
2387        }
2388
2389        for idx in &table_schema.indices {
2390            if index_columns_changed(idx, &c.old_row, &c.new_row) || c.pk_changed {
2391                let idx_table = TableSchema::index_table_name(&lower_name, &idx.name);
2392                let new_idx_key = encode_index_key(idx, &c.new_row, &new_pk);
2393                let new_idx_val = encode_index_value(idx, &c.new_row, &new_pk);
2394                let is_new = wtx.table_insert(&idx_table, &new_idx_key, &new_idx_val)
2395                    .map_err(SqlError::Storage)?;
2396                if idx.unique && !is_new {
2397                    let indexed_values: Vec<Value> = idx.columns.iter()
2398                        .map(|&col_idx| c.new_row[col_idx as usize].clone())
2399                        .collect();
2400                    let any_null = indexed_values.iter().any(|v| v.is_null());
2401                    if !any_null {
2402                        return Err(SqlError::UniqueViolation(idx.name.clone()));
2403                    }
2404                }
2405            }
2406        }
2407    }
2408
2409    let count = changes.len() as u64;
2410    Ok(ExecutionResult::RowsAffected(count))
2411}
2412
2413fn exec_delete_in_txn(
2414    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
2415    schema: &SchemaManager,
2416    stmt: &DeleteStmt,
2417) -> Result<ExecutionResult> {
2418    let materialized;
2419    let stmt = if delete_has_subquery(stmt) {
2420        materialized = materialize_delete(stmt, &mut |sub| exec_subquery_write(wtx, schema, sub))?;
2421        &materialized
2422    } else {
2423        stmt
2424    };
2425
2426    let lower_name = stmt.table.to_ascii_lowercase();
2427    let table_schema = schema.get(&lower_name)
2428        .ok_or_else(|| SqlError::TableNotFound(stmt.table.clone()))?;
2429
2430    let all_candidates = collect_keyed_rows_write(wtx, table_schema, &stmt.where_clause)?;
2431    let rows_to_delete: Vec<(Vec<u8>, Vec<Value>)> = all_candidates.into_iter()
2432        .filter(|(_, row)| {
2433            match &stmt.where_clause {
2434                Some(where_expr) => {
2435                    match eval_expr(where_expr, &table_schema.columns, row) {
2436                        Ok(val) => is_truthy(&val),
2437                        Err(_) => false,
2438                    }
2439                }
2440                None => true,
2441            }
2442        })
2443        .collect();
2444
2445    if rows_to_delete.is_empty() {
2446        return Ok(ExecutionResult::RowsAffected(0));
2447    }
2448
2449    let pk_indices = table_schema.pk_indices();
2450    for (key, row) in &rows_to_delete {
2451        let pk_values: Vec<Value> = pk_indices.iter().map(|&i| row[i].clone()).collect();
2452        delete_index_entries(wtx, table_schema, row, &pk_values)?;
2453        wtx.table_delete(lower_name.as_bytes(), key).map_err(SqlError::Storage)?;
2454    }
2455    let count = rows_to_delete.len() as u64;
2456    Ok(ExecutionResult::RowsAffected(count))
2457}
2458
2459// ── Aggregation ─────────────────────────────────────────────────────
2460
2461fn exec_aggregate(
2462    columns: &[ColumnDef],
2463    rows: &[Vec<Value>],
2464    stmt: &SelectStmt,
2465) -> Result<ExecutionResult> {
2466    let groups: BTreeMap<Vec<Value>, Vec<&Vec<Value>>> = if stmt.group_by.is_empty() {
2467        let mut m = BTreeMap::new();
2468        m.insert(vec![], rows.iter().collect());
2469        m
2470    } else {
2471        let mut m: BTreeMap<Vec<Value>, Vec<&Vec<Value>>> = BTreeMap::new();
2472        for row in rows {
2473            let group_key: Vec<Value> = stmt.group_by.iter()
2474                .map(|expr| eval_expr(expr, columns, row))
2475                .collect::<Result<_>>()?;
2476            m.entry(group_key).or_default().push(row);
2477        }
2478        m
2479    };
2480
2481    let mut result_rows = Vec::new();
2482    let output_cols = build_output_columns(&stmt.columns, columns);
2483
2484    for (_group_key, group_rows) in &groups {
2485        let mut result_row = Vec::new();
2486
2487        for sel_col in &stmt.columns {
2488            match sel_col {
2489                SelectColumn::AllColumns => {
2490                    return Err(SqlError::Unsupported(
2491                        "SELECT * with GROUP BY".into()
2492                    ));
2493                }
2494                SelectColumn::Expr { expr, .. } => {
2495                    let val = eval_aggregate_expr(
2496                        expr, columns, group_rows,
2497                    )?;
2498                    result_row.push(val);
2499                }
2500            }
2501        }
2502
2503        if let Some(ref having) = stmt.having {
2504            let passes = match eval_aggregate_expr(having, columns, group_rows) {
2505                Ok(val) => is_truthy(&val),
2506                Err(SqlError::ColumnNotFound(_)) => {
2507                    match eval_expr(having, &output_cols, &result_row) {
2508                        Ok(val) => is_truthy(&val),
2509                        Err(_) => false,
2510                    }
2511                }
2512                Err(e) => return Err(e),
2513            };
2514            if !passes {
2515                continue;
2516            }
2517        }
2518
2519        result_rows.push(result_row);
2520    }
2521
2522    if stmt.distinct {
2523        let mut seen = std::collections::HashSet::new();
2524        result_rows.retain(|row| seen.insert(row.clone()));
2525    }
2526
2527    if !stmt.order_by.is_empty() {
2528        let output_cols = build_output_columns(&stmt.columns, columns);
2529        sort_rows(&mut result_rows, &stmt.order_by, &output_cols)?;
2530    }
2531
2532    if let Some(ref offset_expr) = stmt.offset {
2533        let offset = eval_const_int(offset_expr)? as usize;
2534        if offset < result_rows.len() {
2535            result_rows = result_rows.split_off(offset);
2536        } else {
2537            result_rows.clear();
2538        }
2539    }
2540    if let Some(ref limit_expr) = stmt.limit {
2541        let limit = eval_const_int(limit_expr)? as usize;
2542        result_rows.truncate(limit);
2543    }
2544
2545    let col_names = stmt.columns.iter().map(|c| match c {
2546        SelectColumn::AllColumns => "*".into(),
2547        SelectColumn::Expr { alias: Some(a), .. } => a.clone(),
2548        SelectColumn::Expr { expr, .. } => expr_display_name(expr),
2549    }).collect();
2550
2551    Ok(ExecutionResult::Query(QueryResult {
2552        columns: col_names,
2553        rows: result_rows,
2554    }))
2555}
2556
2557fn eval_aggregate_expr(
2558    expr: &Expr,
2559    columns: &[ColumnDef],
2560    group_rows: &[&Vec<Value>],
2561) -> Result<Value> {
2562    match expr {
2563        Expr::CountStar => Ok(Value::Integer(group_rows.len() as i64)),
2564
2565        Expr::Function { name, args } if is_aggregate_function(name, args.len()) => {
2566            let func = name.to_ascii_uppercase();
2567            if args.len() != 1 {
2568                return Err(SqlError::Unsupported(format!(
2569                    "{func} with {} args", args.len()
2570                )));
2571            }
2572            let arg = &args[0];
2573            let values: Vec<Value> = group_rows.iter()
2574                .map(|row| eval_expr(arg, columns, row))
2575                .collect::<Result<_>>()?;
2576
2577            match func.as_str() {
2578                "COUNT" => {
2579                    let count = values.iter().filter(|v| !v.is_null()).count();
2580                    Ok(Value::Integer(count as i64))
2581                }
2582                "SUM" => {
2583                    let mut int_sum: i64 = 0;
2584                    let mut real_sum: f64 = 0.0;
2585                    let mut has_real = false;
2586                    let mut all_null = true;
2587                    for v in &values {
2588                        match v {
2589                            Value::Integer(i) => { int_sum += i; all_null = false; }
2590                            Value::Real(r) => { real_sum += r; has_real = true; all_null = false; }
2591                            Value::Null => {}
2592                            _ => return Err(SqlError::TypeMismatch {
2593                                expected: "numeric".into(),
2594                                got: v.data_type().to_string(),
2595                            }),
2596                        }
2597                    }
2598                    if all_null { return Ok(Value::Null); }
2599                    if has_real {
2600                        Ok(Value::Real(real_sum + int_sum as f64))
2601                    } else {
2602                        Ok(Value::Integer(int_sum))
2603                    }
2604                }
2605                "AVG" => {
2606                    let mut sum: f64 = 0.0;
2607                    let mut count: i64 = 0;
2608                    for v in &values {
2609                        match v {
2610                            Value::Integer(i) => { sum += *i as f64; count += 1; }
2611                            Value::Real(r) => { sum += r; count += 1; }
2612                            Value::Null => {}
2613                            _ => return Err(SqlError::TypeMismatch {
2614                                expected: "numeric".into(),
2615                                got: v.data_type().to_string(),
2616                            }),
2617                        }
2618                    }
2619                    if count == 0 { Ok(Value::Null) } else { Ok(Value::Real(sum / count as f64)) }
2620                }
2621                "MIN" => {
2622                    let mut min: Option<&Value> = None;
2623                    for v in &values {
2624                        if v.is_null() { continue; }
2625                        min = Some(match min {
2626                            None => v,
2627                            Some(m) => if v < m { v } else { m },
2628                        });
2629                    }
2630                    Ok(min.cloned().unwrap_or(Value::Null))
2631                }
2632                "MAX" => {
2633                    let mut max: Option<&Value> = None;
2634                    for v in &values {
2635                        if v.is_null() { continue; }
2636                        max = Some(match max {
2637                            None => v,
2638                            Some(m) => if v > m { v } else { m },
2639                        });
2640                    }
2641                    Ok(max.cloned().unwrap_or(Value::Null))
2642                }
2643                _ => Err(SqlError::Unsupported(format!("aggregate function: {func}"))),
2644            }
2645        }
2646
2647        Expr::Column(_) | Expr::QualifiedColumn { .. } => {
2648            if let Some(first) = group_rows.first() {
2649                eval_expr(expr, columns, first)
2650            } else {
2651                Ok(Value::Null)
2652            }
2653        }
2654
2655        Expr::Literal(v) => Ok(v.clone()),
2656
2657        Expr::BinaryOp { left, op, right } => {
2658            let l = eval_aggregate_expr(left, columns, group_rows)?;
2659            let r = eval_aggregate_expr(right, columns, group_rows)?;
2660            crate::eval::eval_expr(
2661                &Expr::BinaryOp {
2662                    left: Box::new(Expr::Literal(l)),
2663                    op: *op,
2664                    right: Box::new(Expr::Literal(r)),
2665                },
2666                columns,
2667                &[],
2668            )
2669        }
2670
2671        Expr::UnaryOp { op, expr: e } => {
2672            let v = eval_aggregate_expr(e, columns, group_rows)?;
2673            crate::eval::eval_expr(
2674                &Expr::UnaryOp { op: *op, expr: Box::new(Expr::Literal(v)) },
2675                columns, &[],
2676            )
2677        }
2678
2679        Expr::IsNull(e) => {
2680            let v = eval_aggregate_expr(e, columns, group_rows)?;
2681            Ok(Value::Boolean(v.is_null()))
2682        }
2683
2684        Expr::IsNotNull(e) => {
2685            let v = eval_aggregate_expr(e, columns, group_rows)?;
2686            Ok(Value::Boolean(!v.is_null()))
2687        }
2688
2689        Expr::Cast { expr: e, data_type } => {
2690            let v = eval_aggregate_expr(e, columns, group_rows)?;
2691            crate::eval::eval_expr(
2692                &Expr::Cast { expr: Box::new(Expr::Literal(v)), data_type: *data_type },
2693                columns, &[],
2694            )
2695        }
2696
2697        Expr::Case { operand, conditions, else_result } => {
2698            let op_val = operand.as_ref()
2699                .map(|e| eval_aggregate_expr(e, columns, group_rows))
2700                .transpose()?;
2701            if let Some(ov) = &op_val {
2702                for (cond, result) in conditions {
2703                    let cv = eval_aggregate_expr(cond, columns, group_rows)?;
2704                    if !ov.is_null() && !cv.is_null() && *ov == cv {
2705                        return eval_aggregate_expr(result, columns, group_rows);
2706                    }
2707                }
2708            } else {
2709                for (cond, result) in conditions {
2710                    let cv = eval_aggregate_expr(cond, columns, group_rows)?;
2711                    if crate::eval::is_truthy(&cv) {
2712                        return eval_aggregate_expr(result, columns, group_rows);
2713                    }
2714                }
2715            }
2716            match else_result {
2717                Some(e) => eval_aggregate_expr(e, columns, group_rows),
2718                None => Ok(Value::Null),
2719            }
2720        }
2721
2722        Expr::Coalesce(args) => {
2723            for arg in args {
2724                let v = eval_aggregate_expr(arg, columns, group_rows)?;
2725                if !v.is_null() { return Ok(v); }
2726            }
2727            Ok(Value::Null)
2728        }
2729
2730        Expr::Between { expr: e, low, high, negated } => {
2731            let v = eval_aggregate_expr(e, columns, group_rows)?;
2732            let lo = eval_aggregate_expr(low, columns, group_rows)?;
2733            let hi = eval_aggregate_expr(high, columns, group_rows)?;
2734            crate::eval::eval_expr(
2735                &Expr::Between {
2736                    expr: Box::new(Expr::Literal(v)),
2737                    low: Box::new(Expr::Literal(lo)),
2738                    high: Box::new(Expr::Literal(hi)),
2739                    negated: *negated,
2740                },
2741                columns, &[],
2742            )
2743        }
2744
2745        Expr::Like { expr: e, pattern, escape, negated } => {
2746            let v = eval_aggregate_expr(e, columns, group_rows)?;
2747            let p = eval_aggregate_expr(pattern, columns, group_rows)?;
2748            let esc = escape.as_ref()
2749                .map(|es| eval_aggregate_expr(es, columns, group_rows))
2750                .transpose()?;
2751            let esc_box = esc.map(|v| Box::new(Expr::Literal(v)));
2752            crate::eval::eval_expr(
2753                &Expr::Like {
2754                    expr: Box::new(Expr::Literal(v)),
2755                    pattern: Box::new(Expr::Literal(p)),
2756                    escape: esc_box,
2757                    negated: *negated,
2758                },
2759                columns, &[],
2760            )
2761        }
2762
2763        Expr::Function { name, args } => {
2764            let evaluated: Vec<Value> = args.iter()
2765                .map(|a| eval_aggregate_expr(a, columns, group_rows))
2766                .collect::<Result<_>>()?;
2767            let literal_args: Vec<Expr> = evaluated.into_iter().map(Expr::Literal).collect();
2768            crate::eval::eval_expr(
2769                &Expr::Function { name: name.clone(), args: literal_args },
2770                columns, &[],
2771            )
2772        }
2773
2774        _ => Err(SqlError::Unsupported(format!("expression in aggregate: {expr:?}"))),
2775    }
2776}
2777
/// Reports whether a call to `name` with `arg_count` arguments is an
/// aggregate call.
///
/// `COUNT`, `SUM` and `AVG` always qualify (matched case-insensitively);
/// `MIN` and `MAX` qualify only with exactly one argument.
fn is_aggregate_function(name: &str, arg_count: usize) -> bool {
    let named = |candidate: &str| name.eq_ignore_ascii_case(candidate);

    if named("COUNT") || named("SUM") || named("AVG") {
        return true;
    }
    arg_count == 1 && (named("MIN") || named("MAX"))
}
2783
2784fn is_aggregate_expr(expr: &Expr) -> bool {
2785    match expr {
2786        Expr::CountStar => true,
2787        Expr::Function { name, args } => {
2788            is_aggregate_function(name, args.len())
2789                || args.iter().any(is_aggregate_expr)
2790        }
2791        Expr::BinaryOp { left, right, .. } => {
2792            is_aggregate_expr(left) || is_aggregate_expr(right)
2793        }
2794        Expr::UnaryOp { expr, .. } | Expr::IsNull(expr) | Expr::IsNotNull(expr)
2795        | Expr::Cast { expr, .. } => is_aggregate_expr(expr),
2796        Expr::Case { operand, conditions, else_result } => {
2797            operand.as_ref().map_or(false, |e| is_aggregate_expr(e))
2798                || conditions.iter().any(|(c, r)| is_aggregate_expr(c) || is_aggregate_expr(r))
2799                || else_result.as_ref().map_or(false, |e| is_aggregate_expr(e))
2800        }
2801        Expr::Coalesce(args) => args.iter().any(is_aggregate_expr),
2802        Expr::Between { expr, low, high, .. } => {
2803            is_aggregate_expr(expr) || is_aggregate_expr(low) || is_aggregate_expr(high)
2804        }
2805        Expr::Like { expr, pattern, escape, .. } => {
2806            is_aggregate_expr(expr) || is_aggregate_expr(pattern)
2807                || escape.as_ref().map_or(false, |e| is_aggregate_expr(e))
2808        }
2809        _ => false,
2810    }
2811}
2812
2813// ── Helpers ─────────────────────────────────────────────────────────
2814
2815/// Decode a full row from B+ tree key + value.
2816fn decode_full_row(
2817    schema: &TableSchema,
2818    key: &[u8],
2819    value: &[u8],
2820) -> Result<Vec<Value>> {
2821    let pk_values = decode_composite_key(key, schema.primary_key_columns.len())?;
2822    let non_pk_values = decode_row(value)?;
2823
2824    let mut row = vec![Value::Null; schema.columns.len()];
2825
2826    for (i, &col_idx) in schema.primary_key_columns.iter().enumerate() {
2827        row[col_idx as usize] = pk_values[i].clone();
2828    }
2829
2830    let non_pk = schema.non_pk_indices();
2831    for (i, &col_idx) in non_pk.iter().enumerate() {
2832        if i < non_pk_values.len() {
2833            row[col_idx] = non_pk_values[i].clone();
2834        }
2835    }
2836
2837    Ok(row)
2838}
2839
2840/// Evaluate a constant expression (no column references).
2841fn eval_const_expr(expr: &Expr) -> Result<Value> {
2842    eval_expr(expr, &[], &[])
2843}
2844
2845fn eval_const_int(expr: &Expr) -> Result<i64> {
2846    match eval_const_expr(expr)? {
2847        Value::Integer(i) => Ok(i),
2848        other => Err(SqlError::TypeMismatch {
2849            expected: "INTEGER".into(),
2850            got: other.data_type().to_string(),
2851        }),
2852    }
2853}
2854
2855fn sort_rows(
2856    rows: &mut [Vec<Value>],
2857    order_by: &[OrderByItem],
2858    columns: &[ColumnDef],
2859) -> Result<()> {
2860    rows.sort_by(|a, b| {
2861        for item in order_by {
2862            let a_val = eval_expr(&item.expr, columns, a).unwrap_or(Value::Null);
2863            let b_val = eval_expr(&item.expr, columns, b).unwrap_or(Value::Null);
2864
2865            let nulls_first = item.nulls_first.unwrap_or(!item.descending);
2866
2867            let ord = match (a_val.is_null(), b_val.is_null()) {
2868                (true, true) => std::cmp::Ordering::Equal,
2869                (true, false) => {
2870                    if nulls_first { std::cmp::Ordering::Less } else { std::cmp::Ordering::Greater }
2871                }
2872                (false, true) => {
2873                    if nulls_first { std::cmp::Ordering::Greater } else { std::cmp::Ordering::Less }
2874                }
2875                (false, false) => {
2876                    let cmp = a_val.cmp(&b_val);
2877                    if item.descending { cmp.reverse() } else { cmp }
2878                }
2879            };
2880
2881            if ord != std::cmp::Ordering::Equal {
2882                return ord;
2883            }
2884        }
2885        std::cmp::Ordering::Equal
2886    });
2887    Ok(())
2888}
2889
2890fn project_rows(
2891    columns: &[ColumnDef],
2892    select_cols: &[SelectColumn],
2893    rows: &[Vec<Value>],
2894) -> Result<(Vec<String>, Vec<Vec<Value>>)> {
2895    let mut col_names = Vec::new();
2896    let mut projectors: Vec<Box<dyn Fn(&[Value]) -> Result<Value>>> = Vec::new();
2897
2898    for sel_col in select_cols {
2899        match sel_col {
2900            SelectColumn::AllColumns => {
2901                for col in columns {
2902                    let idx = col.position as usize;
2903                    col_names.push(col.name.clone());
2904                    projectors.push(Box::new(move |row: &[Value]| Ok(row[idx].clone())));
2905                }
2906            }
2907            SelectColumn::Expr { expr, alias } => {
2908                let name = alias.clone().unwrap_or_else(|| expr_display_name(expr));
2909                col_names.push(name);
2910                let expr = expr.clone();
2911                let owned_cols = columns.to_vec();
2912                projectors.push(Box::new(move |row: &[Value]| {
2913                    eval_expr(&expr, &owned_cols, row)
2914                }));
2915            }
2916        }
2917    }
2918
2919    let projected = rows.iter().map(|row| {
2920        projectors.iter().map(|p| p(row)).collect::<Result<Vec<_>>>()
2921    }).collect::<Result<Vec<_>>>()?;
2922
2923    Ok((col_names, projected))
2924}
2925
2926fn expr_display_name(expr: &Expr) -> String {
2927    match expr {
2928        Expr::Column(name) => name.clone(),
2929        Expr::QualifiedColumn { table, column } => format!("{table}.{column}"),
2930        Expr::Literal(v) => format!("{v}"),
2931        Expr::CountStar => "COUNT(*)".into(),
2932        Expr::Function { name, args } => {
2933            let arg_strs: Vec<String> = args.iter().map(expr_display_name).collect();
2934            format!("{name}({})", arg_strs.join(", "))
2935        }
2936        Expr::BinaryOp { left, op, right } => {
2937            format!("{} {} {}", expr_display_name(left), op_symbol(op), expr_display_name(right))
2938        }
2939        _ => "?".into(),
2940    }
2941}
2942
2943fn op_symbol(op: &BinOp) -> &'static str {
2944    match op {
2945        BinOp::Add => "+", BinOp::Sub => "-", BinOp::Mul => "*",
2946        BinOp::Div => "/", BinOp::Mod => "%",
2947        BinOp::Eq => "=", BinOp::NotEq => "<>",
2948        BinOp::Lt => "<", BinOp::Gt => ">",
2949        BinOp::LtEq => "<=", BinOp::GtEq => ">=",
2950        BinOp::And => "AND", BinOp::Or => "OR", BinOp::Concat => "||",
2951    }
2952}
2953
2954fn build_output_columns(
2955    select_cols: &[SelectColumn],
2956    columns: &[ColumnDef],
2957) -> Vec<ColumnDef> {
2958    let mut out = Vec::new();
2959    for (i, col) in select_cols.iter().enumerate() {
2960        let (name, data_type) = match col {
2961            SelectColumn::AllColumns => (format!("col{i}"), DataType::Null),
2962            SelectColumn::Expr { alias: Some(a), expr } => {
2963                (a.clone(), infer_expr_type(expr, columns))
2964            }
2965            SelectColumn::Expr { expr, .. } => {
2966                (expr_display_name(expr), infer_expr_type(expr, columns))
2967            }
2968        };
2969        out.push(ColumnDef {
2970            name,
2971            data_type,
2972            nullable: true,
2973            position: i as u16,
2974        });
2975    }
2976    out
2977}
2978
2979fn infer_expr_type(expr: &Expr, columns: &[ColumnDef]) -> DataType {
2980    match expr {
2981        Expr::Column(name) => {
2982            let lower = name.to_ascii_lowercase();
2983            columns.iter()
2984                .find(|c| c.name.to_ascii_lowercase() == lower)
2985                .map(|c| c.data_type)
2986                .unwrap_or(DataType::Null)
2987        }
2988        Expr::QualifiedColumn { table, column } => {
2989            let qualified = format!("{}.{}", table.to_ascii_lowercase(), column.to_ascii_lowercase());
2990            columns.iter()
2991                .find(|c| c.name.to_ascii_lowercase() == qualified)
2992                .map(|c| c.data_type)
2993                .unwrap_or(DataType::Null)
2994        }
2995        Expr::Literal(v) => v.data_type(),
2996        Expr::CountStar => DataType::Integer,
2997        Expr::Function { name, .. } => {
2998            match name.to_ascii_uppercase().as_str() {
2999                "COUNT" => DataType::Integer,
3000                "AVG" => DataType::Real,
3001                "SUM" | "MIN" | "MAX" => DataType::Null,
3002                _ => DataType::Null,
3003            }
3004        }
3005        _ => DataType::Null,
3006    }
3007}