1use std::collections::BTreeMap;
4
5use citadel::Database;
6
7use crate::encoding::{decode_composite_key, decode_key_value, decode_row, encode_composite_key, encode_row};
8use crate::error::{Result, SqlError};
9use crate::eval::{eval_expr, is_truthy};
10use crate::parser::*;
11use crate::planner::{self, ScanPlan};
12use crate::schema::SchemaManager;
13use crate::types::*;
14
15fn encode_index_key(
18 idx: &IndexDef,
19 row: &[Value],
20 pk_values: &[Value],
21) -> Vec<u8> {
22 let indexed_values: Vec<Value> = idx.columns.iter()
23 .map(|&col_idx| row[col_idx as usize].clone())
24 .collect();
25
26 if idx.unique {
27 let any_null = indexed_values.iter().any(|v| v.is_null());
28 if !any_null {
29 return encode_composite_key(&indexed_values);
30 }
31 }
32
33 let mut all_values = indexed_values;
34 all_values.extend_from_slice(pk_values);
35 encode_composite_key(&all_values)
36}
37
38fn encode_index_value(
39 idx: &IndexDef,
40 row: &[Value],
41 pk_values: &[Value],
42) -> Vec<u8> {
43 if idx.unique {
44 let indexed_values: Vec<Value> = idx.columns.iter()
45 .map(|&col_idx| row[col_idx as usize].clone())
46 .collect();
47 let any_null = indexed_values.iter().any(|v| v.is_null());
48 if !any_null {
49 return encode_composite_key(pk_values);
50 }
51 }
52 vec![]
53}
54
55fn insert_index_entries(
56 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
57 table_schema: &TableSchema,
58 row: &[Value],
59 pk_values: &[Value],
60) -> Result<()> {
61 for idx in &table_schema.indices {
62 let idx_table = TableSchema::index_table_name(&table_schema.name, &idx.name);
63 let key = encode_index_key(idx, row, pk_values);
64 let value = encode_index_value(idx, row, pk_values);
65
66 let is_new = wtx.table_insert(&idx_table, &key, &value)
67 .map_err(SqlError::Storage)?;
68
69 if idx.unique && !is_new {
70 let indexed_values: Vec<Value> = idx.columns.iter()
71 .map(|&col_idx| row[col_idx as usize].clone())
72 .collect();
73 let any_null = indexed_values.iter().any(|v| v.is_null());
74 if !any_null {
75 return Err(SqlError::UniqueViolation(idx.name.clone()));
76 }
77 }
78 }
79 Ok(())
80}
81
82fn delete_index_entries(
83 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
84 table_schema: &TableSchema,
85 row: &[Value],
86 pk_values: &[Value],
87) -> Result<()> {
88 for idx in &table_schema.indices {
89 let idx_table = TableSchema::index_table_name(&table_schema.name, &idx.name);
90 let key = encode_index_key(idx, row, pk_values);
91 wtx.table_delete(&idx_table, &key).map_err(SqlError::Storage)?;
92 }
93 Ok(())
94}
95
96fn index_columns_changed(idx: &IndexDef, old_row: &[Value], new_row: &[Value]) -> bool {
97 idx.columns.iter().any(|&col_idx| {
98 old_row[col_idx as usize] != new_row[col_idx as usize]
99 })
100}
101
102pub fn execute(
104 db: &Database,
105 schema: &mut SchemaManager,
106 stmt: &Statement,
107) -> Result<ExecutionResult> {
108 match stmt {
109 Statement::CreateTable(ct) => exec_create_table(db, schema, ct),
110 Statement::DropTable(dt) => exec_drop_table(db, schema, dt),
111 Statement::CreateIndex(ci) => exec_create_index(db, schema, ci),
112 Statement::DropIndex(di) => exec_drop_index(db, schema, di),
113 Statement::Insert(ins) => exec_insert(db, schema, ins),
114 Statement::Select(sel) => exec_select(db, schema, sel),
115 Statement::Update(upd) => exec_update(db, schema, upd),
116 Statement::Delete(del) => exec_delete(db, schema, del),
117 Statement::Explain(inner) => explain(schema, inner),
118 Statement::Begin | Statement::Commit | Statement::Rollback => {
119 Err(SqlError::Unsupported("transaction control in auto-commit mode".into()))
120 }
121 }
122}
123
124pub fn execute_in_txn(
126 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
127 schema: &mut SchemaManager,
128 stmt: &Statement,
129) -> Result<ExecutionResult> {
130 match stmt {
131 Statement::CreateTable(ct) => exec_create_table_in_txn(wtx, schema, ct),
132 Statement::DropTable(dt) => exec_drop_table_in_txn(wtx, schema, dt),
133 Statement::CreateIndex(ci) => exec_create_index_in_txn(wtx, schema, ci),
134 Statement::DropIndex(di) => exec_drop_index_in_txn(wtx, schema, di),
135 Statement::Insert(ins) => exec_insert_in_txn(wtx, schema, ins),
136 Statement::Select(sel) => exec_select_in_txn(wtx, schema, sel),
137 Statement::Update(upd) => exec_update_in_txn(wtx, schema, upd),
138 Statement::Delete(del) => exec_delete_in_txn(wtx, schema, del),
139 Statement::Explain(inner) => explain(schema, inner),
140 Statement::Begin | Statement::Commit | Statement::Rollback => {
141 Err(SqlError::Unsupported("nested transaction control".into()))
142 }
143 }
144}
145
146pub fn explain(
149 schema: &SchemaManager,
150 stmt: &Statement,
151) -> Result<ExecutionResult> {
152 let lines = match stmt {
153 Statement::Select(sel) => explain_select(schema, sel)?,
154 Statement::Insert(ins) => {
155 vec![format!("INSERT INTO {}", ins.table.to_ascii_lowercase())]
156 }
157 Statement::Update(upd) => explain_dml(schema, &upd.table, &upd.where_clause, "UPDATE")?,
158 Statement::Delete(del) => explain_dml(schema, &del.table, &del.where_clause, "DELETE FROM")?,
159 Statement::Explain(_) => {
160 return Err(SqlError::Unsupported("EXPLAIN EXPLAIN".into()));
161 }
162 _ => {
163 return Err(SqlError::Unsupported("EXPLAIN for this statement type".into()));
164 }
165 };
166
167 let rows = lines.into_iter()
168 .map(|line| vec![Value::Text(line)])
169 .collect();
170 Ok(ExecutionResult::Query(QueryResult {
171 columns: vec!["plan".into()],
172 rows,
173 }))
174}
175
176fn explain_dml(
177 schema: &SchemaManager,
178 table: &str,
179 where_clause: &Option<Expr>,
180 verb: &str,
181) -> Result<Vec<String>> {
182 let lower = table.to_ascii_lowercase();
183 let table_schema = schema.get(&lower)
184 .ok_or_else(|| SqlError::TableNotFound(table.to_string()))?;
185 let plan = planner::plan_select(table_schema, where_clause);
186 let scan_line = format_scan_line(&lower, &None, &plan, table_schema);
187 Ok(vec![format!("{verb} {}", scan_line)])
188}
189
190fn explain_select(
191 schema: &SchemaManager,
192 stmt: &SelectStmt,
193) -> Result<Vec<String>> {
194 let mut lines = Vec::new();
195
196 if stmt.from.is_empty() {
197 lines.push("CONSTANT ROW".into());
198 return Ok(lines);
199 }
200
201 let lower_from = stmt.from.to_ascii_lowercase();
202 let from_schema = schema.get(&lower_from)
203 .ok_or_else(|| SqlError::TableNotFound(stmt.from.clone()))?;
204
205 if stmt.joins.is_empty() {
206 let plan = planner::plan_select(from_schema, &stmt.where_clause);
207 lines.push(format_scan_line(&lower_from, &stmt.from_alias, &plan, from_schema));
208 } else {
209 let from_plan = planner::plan_select(from_schema, &None);
210 lines.push(format_scan_line(&lower_from, &stmt.from_alias, &from_plan, from_schema));
211
212 for join in &stmt.joins {
213 let inner_lower = join.table.name.to_ascii_lowercase();
214 let inner_schema = schema.get(&inner_lower)
215 .ok_or_else(|| SqlError::TableNotFound(join.table.name.clone()))?;
216 let inner_plan = planner::plan_select(inner_schema, &None);
217 lines.push(format_scan_line(&inner_lower, &join.table.alias, &inner_plan, inner_schema));
218 }
219
220 let join_type_str = match stmt.joins.last().map(|j| j.join_type) {
221 Some(JoinType::Left) => "LEFT JOIN",
222 Some(JoinType::Right) => "RIGHT JOIN",
223 Some(JoinType::Cross) => "CROSS JOIN",
224 _ => "NESTED LOOP",
225 };
226 lines.push(join_type_str.into());
227 }
228
229 if stmt.where_clause.is_some() && stmt.joins.is_empty() {
230 let plan = planner::plan_select(from_schema, &stmt.where_clause);
231 if matches!(plan, ScanPlan::SeqScan) {
232 lines.push("FILTER".into());
233 }
234 }
235
236 if let Some(ref w) = stmt.where_clause {
237 if !stmt.joins.is_empty() && has_subquery(w) {
238 lines.push("SUBQUERY".into());
239 }
240 }
241
242 explain_subqueries(stmt, &mut lines);
243
244 if !stmt.group_by.is_empty() {
245 lines.push("GROUP BY".into());
246 }
247
248 if stmt.distinct {
249 lines.push("DISTINCT".into());
250 }
251
252 if !stmt.order_by.is_empty() {
253 lines.push("SORT".into());
254 }
255
256 if let Some(ref offset_expr) = stmt.offset {
257 if let Ok(n) = eval_const_int(offset_expr) {
258 lines.push(format!("OFFSET {n}"));
259 } else {
260 lines.push("OFFSET".into());
261 }
262 }
263
264 if let Some(ref limit_expr) = stmt.limit {
265 if let Ok(n) = eval_const_int(limit_expr) {
266 lines.push(format!("LIMIT {n}"));
267 } else {
268 lines.push("LIMIT".into());
269 }
270 }
271
272 Ok(lines)
273}
274
275fn explain_subqueries(stmt: &SelectStmt, lines: &mut Vec<String>) {
276 let mut count = 0;
277 if let Some(ref w) = stmt.where_clause { count += count_subqueries(w); }
278 if let Some(ref h) = stmt.having { count += count_subqueries(h); }
279 for col in &stmt.columns {
280 if let SelectColumn::Expr { expr, .. } = col {
281 count += count_subqueries(expr);
282 }
283 }
284 for _ in 0..count {
285 lines.push("SUBQUERY".into());
286 }
287}
288
289fn count_subqueries(expr: &Expr) -> usize {
290 match expr {
291 Expr::InSubquery { expr: e, .. } => 1 + count_subqueries(e),
292 Expr::ScalarSubquery(_) => 1,
293 Expr::Exists { .. } => 1,
294 Expr::BinaryOp { left, right, .. } => count_subqueries(left) + count_subqueries(right),
295 Expr::UnaryOp { expr: e, .. } => count_subqueries(e),
296 Expr::IsNull(e) | Expr::IsNotNull(e) => count_subqueries(e),
297 Expr::Function { args, .. } => args.iter().map(count_subqueries).sum(),
298 Expr::Between { expr: e, low, high, .. } => {
299 count_subqueries(e) + count_subqueries(low) + count_subqueries(high)
300 }
301 Expr::Like { expr: e, pattern, .. } => count_subqueries(e) + count_subqueries(pattern),
302 Expr::Case { operand, conditions, else_result } => {
303 let mut n = 0;
304 if let Some(op) = operand { n += count_subqueries(op); }
305 for (c, r) in conditions { n += count_subqueries(c) + count_subqueries(r); }
306 if let Some(el) = else_result { n += count_subqueries(el); }
307 n
308 }
309 Expr::Coalesce(args) => args.iter().map(count_subqueries).sum(),
310 Expr::Cast { expr: e, .. } => count_subqueries(e),
311 Expr::InList { expr: e, list, .. } => {
312 count_subqueries(e) + list.iter().map(count_subqueries).sum::<usize>()
313 }
314 _ => 0,
315 }
316}
317
318fn format_scan_line(
319 table_name: &str,
320 alias: &Option<String>,
321 plan: &ScanPlan,
322 table_schema: &TableSchema,
323) -> String {
324 let alias_part = match alias {
325 Some(a) if a.to_ascii_lowercase() != table_name.to_ascii_lowercase() => {
326 format!(" AS {}", a.to_ascii_lowercase())
327 }
328 _ => String::new(),
329 };
330
331 let desc = planner::describe_plan(plan, table_schema);
332
333 if desc.is_empty() {
334 format!("SCAN TABLE {table_name}{alias_part}")
335 } else {
336 format!("SEARCH TABLE {table_name}{alias_part} {desc}")
337 }
338}
339
340fn exec_create_table(
343 db: &Database,
344 schema: &mut SchemaManager,
345 stmt: &CreateTableStmt,
346) -> Result<ExecutionResult> {
347 let lower_name = stmt.name.to_ascii_lowercase();
348
349 if schema.contains(&lower_name) {
350 if stmt.if_not_exists {
351 return Ok(ExecutionResult::Ok);
352 }
353 return Err(SqlError::TableAlreadyExists(stmt.name.clone()));
354 }
355
356 if stmt.primary_key.is_empty() {
357 return Err(SqlError::PrimaryKeyRequired);
358 }
359
360 let mut seen = std::collections::HashSet::new();
361 for col in &stmt.columns {
362 let lower = col.name.to_ascii_lowercase();
363 if !seen.insert(lower.clone()) {
364 return Err(SqlError::DuplicateColumn(col.name.clone()));
365 }
366 }
367
368 let columns: Vec<ColumnDef> = stmt.columns.iter().enumerate().map(|(i, c)| {
369 ColumnDef {
370 name: c.name.to_ascii_lowercase(),
371 data_type: c.data_type,
372 nullable: c.nullable,
373 position: i as u16,
374 }
375 }).collect();
376
377 let primary_key_columns: Vec<u16> = stmt.primary_key.iter().map(|pk_name| {
378 let lower = pk_name.to_ascii_lowercase();
379 columns.iter().position(|c| c.name == lower)
380 .map(|i| i as u16)
381 .ok_or_else(|| SqlError::ColumnNotFound(pk_name.clone()))
382 }).collect::<Result<_>>()?;
383
384 let table_schema = TableSchema {
385 name: lower_name.clone(),
386 columns,
387 primary_key_columns,
388 indices: vec![],
389 };
390
391 let mut wtx = db.begin_write().map_err(SqlError::Storage)?;
392 SchemaManager::ensure_schema_table(&mut wtx)?;
393 wtx.create_table(lower_name.as_bytes()).map_err(SqlError::Storage)?;
394 SchemaManager::save_schema(&mut wtx, &table_schema)?;
395 wtx.commit().map_err(SqlError::Storage)?;
396
397 schema.register(table_schema);
398 Ok(ExecutionResult::Ok)
399}
400
401fn exec_drop_table(
402 db: &Database,
403 schema: &mut SchemaManager,
404 stmt: &DropTableStmt,
405) -> Result<ExecutionResult> {
406 let lower_name = stmt.name.to_ascii_lowercase();
407
408 if !schema.contains(&lower_name) {
409 if stmt.if_exists {
410 return Ok(ExecutionResult::Ok);
411 }
412 return Err(SqlError::TableNotFound(stmt.name.clone()));
413 }
414
415 let table_schema = schema.get(&lower_name).unwrap();
416 let idx_tables: Vec<Vec<u8>> = table_schema.indices.iter()
417 .map(|idx| TableSchema::index_table_name(&lower_name, &idx.name))
418 .collect();
419
420 let mut wtx = db.begin_write().map_err(SqlError::Storage)?;
421 for idx_table in &idx_tables {
422 wtx.drop_table(idx_table).map_err(SqlError::Storage)?;
423 }
424 wtx.drop_table(lower_name.as_bytes()).map_err(SqlError::Storage)?;
425 SchemaManager::delete_schema(&mut wtx, &lower_name)?;
426 wtx.commit().map_err(SqlError::Storage)?;
427
428 schema.remove(&lower_name);
429 Ok(ExecutionResult::Ok)
430}
431
432fn exec_create_table_in_txn(
433 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
434 schema: &mut SchemaManager,
435 stmt: &CreateTableStmt,
436) -> Result<ExecutionResult> {
437 let lower_name = stmt.name.to_ascii_lowercase();
438
439 if schema.contains(&lower_name) {
440 if stmt.if_not_exists {
441 return Ok(ExecutionResult::Ok);
442 }
443 return Err(SqlError::TableAlreadyExists(stmt.name.clone()));
444 }
445
446 if stmt.primary_key.is_empty() {
447 return Err(SqlError::PrimaryKeyRequired);
448 }
449
450 let mut seen = std::collections::HashSet::new();
451 for col in &stmt.columns {
452 let lower = col.name.to_ascii_lowercase();
453 if !seen.insert(lower.clone()) {
454 return Err(SqlError::DuplicateColumn(col.name.clone()));
455 }
456 }
457
458 let columns: Vec<ColumnDef> = stmt.columns.iter().enumerate().map(|(i, c)| {
459 ColumnDef {
460 name: c.name.to_ascii_lowercase(),
461 data_type: c.data_type,
462 nullable: c.nullable,
463 position: i as u16,
464 }
465 }).collect();
466
467 let primary_key_columns: Vec<u16> = stmt.primary_key.iter().map(|pk_name| {
468 let lower = pk_name.to_ascii_lowercase();
469 columns.iter().position(|c| c.name == lower)
470 .map(|i| i as u16)
471 .ok_or_else(|| SqlError::ColumnNotFound(pk_name.clone()))
472 }).collect::<Result<_>>()?;
473
474 let table_schema = TableSchema {
475 name: lower_name.clone(),
476 columns,
477 primary_key_columns,
478 indices: vec![],
479 };
480
481 SchemaManager::ensure_schema_table(wtx)?;
482 wtx.create_table(lower_name.as_bytes()).map_err(SqlError::Storage)?;
483 SchemaManager::save_schema(wtx, &table_schema)?;
484
485 schema.register(table_schema);
486 Ok(ExecutionResult::Ok)
487}
488
489fn exec_drop_table_in_txn(
490 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
491 schema: &mut SchemaManager,
492 stmt: &DropTableStmt,
493) -> Result<ExecutionResult> {
494 let lower_name = stmt.name.to_ascii_lowercase();
495
496 if !schema.contains(&lower_name) {
497 if stmt.if_exists {
498 return Ok(ExecutionResult::Ok);
499 }
500 return Err(SqlError::TableNotFound(stmt.name.clone()));
501 }
502
503 let table_schema = schema.get(&lower_name).unwrap();
504 let idx_tables: Vec<Vec<u8>> = table_schema.indices.iter()
505 .map(|idx| TableSchema::index_table_name(&lower_name, &idx.name))
506 .collect();
507
508 for idx_table in &idx_tables {
509 wtx.drop_table(idx_table).map_err(SqlError::Storage)?;
510 }
511 wtx.drop_table(lower_name.as_bytes()).map_err(SqlError::Storage)?;
512 SchemaManager::delete_schema(wtx, &lower_name)?;
513
514 schema.remove(&lower_name);
515 Ok(ExecutionResult::Ok)
516}
517
518fn exec_create_index(
519 db: &Database,
520 schema: &mut SchemaManager,
521 stmt: &CreateIndexStmt,
522) -> Result<ExecutionResult> {
523 let lower_table = stmt.table_name.to_ascii_lowercase();
524 let lower_idx = stmt.index_name.to_ascii_lowercase();
525
526 let table_schema = schema.get(&lower_table)
527 .ok_or_else(|| SqlError::TableNotFound(stmt.table_name.clone()))?;
528
529 if table_schema.index_by_name(&lower_idx).is_some() {
530 if stmt.if_not_exists {
531 return Ok(ExecutionResult::Ok);
532 }
533 return Err(SqlError::IndexAlreadyExists(stmt.index_name.clone()));
534 }
535
536 let col_indices: Vec<u16> = stmt.columns.iter().map(|col_name| {
537 let lower = col_name.to_ascii_lowercase();
538 table_schema.column_index(&lower)
539 .map(|i| i as u16)
540 .ok_or_else(|| SqlError::ColumnNotFound(col_name.clone()))
541 }).collect::<Result<_>>()?;
542
543 let idx_def = IndexDef {
544 name: lower_idx.clone(),
545 columns: col_indices,
546 unique: stmt.unique,
547 };
548
549 let idx_table = TableSchema::index_table_name(&lower_table, &lower_idx);
550
551 let mut wtx = db.begin_write().map_err(SqlError::Storage)?;
552 SchemaManager::ensure_schema_table(&mut wtx)?;
553 wtx.create_table(&idx_table).map_err(SqlError::Storage)?;
554
555 let pk_indices = table_schema.pk_indices();
557 let mut rows: Vec<Vec<Value>> = Vec::new();
558 {
559 let mut scan_err: Option<SqlError> = None;
560 wtx.table_for_each(lower_table.as_bytes(), |key, value| {
561 match decode_full_row(table_schema, key, value) {
562 Ok(row) => rows.push(row),
563 Err(e) => scan_err = Some(e),
564 }
565 Ok(())
566 }).map_err(SqlError::Storage)?;
567 if let Some(e) = scan_err { return Err(e); }
568 }
569
570 for row in &rows {
571 let pk_values: Vec<Value> = pk_indices.iter().map(|&i| row[i].clone()).collect();
572 let key = encode_index_key(&idx_def, row, &pk_values);
573 let value = encode_index_value(&idx_def, row, &pk_values);
574 let is_new = wtx.table_insert(&idx_table, &key, &value)
575 .map_err(SqlError::Storage)?;
576 if idx_def.unique && !is_new {
577 let indexed_values: Vec<Value> = idx_def.columns.iter()
578 .map(|&col_idx| row[col_idx as usize].clone())
579 .collect();
580 let any_null = indexed_values.iter().any(|v| v.is_null());
581 if !any_null {
582 return Err(SqlError::UniqueViolation(stmt.index_name.clone()));
583 }
584 }
585 }
586
587 let mut updated_schema = table_schema.clone();
588 updated_schema.indices.push(idx_def);
589 SchemaManager::save_schema(&mut wtx, &updated_schema)?;
590 wtx.commit().map_err(SqlError::Storage)?;
591
592 schema.register(updated_schema);
593 Ok(ExecutionResult::Ok)
594}
595
596fn exec_drop_index(
597 db: &Database,
598 schema: &mut SchemaManager,
599 stmt: &DropIndexStmt,
600) -> Result<ExecutionResult> {
601 let lower_idx = stmt.index_name.to_ascii_lowercase();
602
603 let (table_name, _idx_pos) = match find_index_in_schemas(schema, &lower_idx) {
604 Some(found) => found,
605 None => {
606 if stmt.if_exists {
607 return Ok(ExecutionResult::Ok);
608 }
609 return Err(SqlError::IndexNotFound(stmt.index_name.clone()));
610 }
611 };
612
613 let idx_table = TableSchema::index_table_name(&table_name, &lower_idx);
614
615 let mut wtx = db.begin_write().map_err(SqlError::Storage)?;
616 wtx.drop_table(&idx_table).map_err(SqlError::Storage)?;
617
618 let table_schema = schema.get(&table_name).unwrap();
619 let mut updated_schema = table_schema.clone();
620 updated_schema.indices.retain(|i| i.name != lower_idx);
621 SchemaManager::save_schema(&mut wtx, &updated_schema)?;
622 wtx.commit().map_err(SqlError::Storage)?;
623
624 schema.register(updated_schema);
625 Ok(ExecutionResult::Ok)
626}
627
628fn exec_create_index_in_txn(
629 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
630 schema: &mut SchemaManager,
631 stmt: &CreateIndexStmt,
632) -> Result<ExecutionResult> {
633 let lower_table = stmt.table_name.to_ascii_lowercase();
634 let lower_idx = stmt.index_name.to_ascii_lowercase();
635
636 let table_schema = schema.get(&lower_table)
637 .ok_or_else(|| SqlError::TableNotFound(stmt.table_name.clone()))?;
638
639 if table_schema.index_by_name(&lower_idx).is_some() {
640 if stmt.if_not_exists {
641 return Ok(ExecutionResult::Ok);
642 }
643 return Err(SqlError::IndexAlreadyExists(stmt.index_name.clone()));
644 }
645
646 let col_indices: Vec<u16> = stmt.columns.iter().map(|col_name| {
647 let lower = col_name.to_ascii_lowercase();
648 table_schema.column_index(&lower)
649 .map(|i| i as u16)
650 .ok_or_else(|| SqlError::ColumnNotFound(col_name.clone()))
651 }).collect::<Result<_>>()?;
652
653 let idx_def = IndexDef {
654 name: lower_idx.clone(),
655 columns: col_indices,
656 unique: stmt.unique,
657 };
658
659 let idx_table = TableSchema::index_table_name(&lower_table, &lower_idx);
660
661 SchemaManager::ensure_schema_table(wtx)?;
662 wtx.create_table(&idx_table).map_err(SqlError::Storage)?;
663
664 let pk_indices = table_schema.pk_indices();
665 let mut rows: Vec<Vec<Value>> = Vec::new();
666 {
667 let mut scan_err: Option<SqlError> = None;
668 wtx.table_for_each(lower_table.as_bytes(), |key, value| {
669 match decode_full_row(table_schema, key, value) {
670 Ok(row) => rows.push(row),
671 Err(e) => scan_err = Some(e),
672 }
673 Ok(())
674 }).map_err(SqlError::Storage)?;
675 if let Some(e) = scan_err { return Err(e); }
676 }
677
678 for row in &rows {
679 let pk_values: Vec<Value> = pk_indices.iter().map(|&i| row[i].clone()).collect();
680 let key = encode_index_key(&idx_def, row, &pk_values);
681 let value = encode_index_value(&idx_def, row, &pk_values);
682 let is_new = wtx.table_insert(&idx_table, &key, &value)
683 .map_err(SqlError::Storage)?;
684 if idx_def.unique && !is_new {
685 let indexed_values: Vec<Value> = idx_def.columns.iter()
686 .map(|&col_idx| row[col_idx as usize].clone())
687 .collect();
688 let any_null = indexed_values.iter().any(|v| v.is_null());
689 if !any_null {
690 return Err(SqlError::UniqueViolation(stmt.index_name.clone()));
691 }
692 }
693 }
694
695 let mut updated_schema = table_schema.clone();
696 updated_schema.indices.push(idx_def);
697 SchemaManager::save_schema(wtx, &updated_schema)?;
698
699 schema.register(updated_schema);
700 Ok(ExecutionResult::Ok)
701}
702
703fn exec_drop_index_in_txn(
704 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
705 schema: &mut SchemaManager,
706 stmt: &DropIndexStmt,
707) -> Result<ExecutionResult> {
708 let lower_idx = stmt.index_name.to_ascii_lowercase();
709
710 let (table_name, _idx_pos) = match find_index_in_schemas(schema, &lower_idx) {
711 Some(found) => found,
712 None => {
713 if stmt.if_exists {
714 return Ok(ExecutionResult::Ok);
715 }
716 return Err(SqlError::IndexNotFound(stmt.index_name.clone()));
717 }
718 };
719
720 let idx_table = TableSchema::index_table_name(&table_name, &lower_idx);
721 wtx.drop_table(&idx_table).map_err(SqlError::Storage)?;
722
723 let table_schema = schema.get(&table_name).unwrap();
724 let mut updated_schema = table_schema.clone();
725 updated_schema.indices.retain(|i| i.name != lower_idx);
726 SchemaManager::save_schema(wtx, &updated_schema)?;
727
728 schema.register(updated_schema);
729 Ok(ExecutionResult::Ok)
730}
731
732fn find_index_in_schemas(schema: &SchemaManager, index_name: &str) -> Option<(String, usize)> {
733 for table_name in schema.table_names() {
734 if let Some(ts) = schema.get(table_name) {
735 if let Some(pos) = ts.indices.iter().position(|i| i.name == index_name) {
736 return Some((table_name.to_string(), pos));
737 }
738 }
739 }
740 None
741}
742
743fn extract_pk_key(
746 idx_key: &[u8],
747 idx_value: &[u8],
748 is_unique: bool,
749 num_index_cols: usize,
750 num_pk_cols: usize,
751) -> Result<Vec<u8>> {
752 if is_unique && !idx_value.is_empty() {
753 Ok(idx_value.to_vec())
754 } else {
755 let total_cols = num_index_cols + num_pk_cols;
756 let all_values = decode_composite_key(idx_key, total_cols)?;
757 let pk_values = &all_values[num_index_cols..];
758 Ok(encode_composite_key(pk_values))
759 }
760}
761
762fn check_range_conditions(
763 idx_key: &[u8],
764 num_prefix_cols: usize,
765 range_conds: &[(BinOp, Value)],
766 num_index_cols: usize,
767) -> Result<RangeCheck> {
768 if range_conds.is_empty() {
769 return Ok(RangeCheck::Match);
770 }
771
772 let num_to_decode = num_prefix_cols + 1;
773 if num_to_decode > num_index_cols {
774 return Ok(RangeCheck::Match);
775 }
776
777 let mut pos = 0;
779 for _ in 0..num_prefix_cols {
780 let (_, n) = decode_key_value(&idx_key[pos..])?;
781 pos += n;
782 }
783 let (range_val, _) = decode_key_value(&idx_key[pos..])?;
784
785 let mut exceeds_upper = false;
786 let mut below_lower = false;
787
788 for (op, val) in range_conds {
789 match op {
790 BinOp::Lt => { if range_val >= *val { exceeds_upper = true; } }
791 BinOp::LtEq => { if range_val > *val { exceeds_upper = true; } }
792 BinOp::Gt => { if range_val <= *val { below_lower = true; } }
793 BinOp::GtEq => { if range_val < *val { below_lower = true; } }
794 _ => {}
795 }
796 }
797
798 if exceeds_upper {
799 Ok(RangeCheck::ExceedsUpper)
800 } else if below_lower {
801 Ok(RangeCheck::BelowLower)
802 } else {
803 Ok(RangeCheck::Match)
804 }
805}
806
/// Outcome of testing an index key against a scan's range conditions.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RangeCheck {
    /// The key satisfies every range condition (or there is nothing to test).
    Match,
    /// The key fails a `>` / `>=` condition.
    BelowLower,
    /// The key fails a `<` / `<=` condition.
    ExceedsUpper,
}
812
813fn collect_rows_read(
815 db: &Database,
816 table_schema: &TableSchema,
817 where_clause: &Option<Expr>,
818) -> Result<Vec<Vec<Value>>> {
819 let plan = planner::plan_select(table_schema, where_clause);
820 let lower_name = &table_schema.name;
821
822 match plan {
823 ScanPlan::SeqScan => {
824 let mut rows = Vec::new();
825 let mut rtx = db.begin_read();
826 let mut scan_err: Option<SqlError> = None;
827 rtx.table_for_each(lower_name.as_bytes(), |key, value| {
828 match decode_full_row(table_schema, key, value) {
829 Ok(row) => rows.push(row),
830 Err(e) => scan_err = Some(e),
831 }
832 Ok(())
833 }).map_err(SqlError::Storage)?;
834 if let Some(e) = scan_err { return Err(e); }
835 Ok(rows)
836 }
837
838 ScanPlan::PkLookup { pk_values } => {
839 let key = encode_composite_key(&pk_values);
840 let mut rtx = db.begin_read();
841 match rtx.table_get(lower_name.as_bytes(), &key).map_err(SqlError::Storage)? {
842 Some(value) => {
843 let row = decode_full_row(table_schema, &key, &value)?;
844 Ok(vec![row])
845 }
846 None => Ok(vec![]),
847 }
848 }
849
850 ScanPlan::IndexScan {
851 idx_table, prefix, num_prefix_cols,
852 range_conds, is_unique, index_columns, ..
853 } => {
854 let num_pk_cols = table_schema.primary_key_columns.len();
855 let num_index_cols = index_columns.len();
856 let mut pk_keys: Vec<Vec<u8>> = Vec::new();
857
858 {
859 let mut rtx = db.begin_read();
860 let mut scan_err: Option<SqlError> = None;
861 rtx.table_scan_from(&idx_table, &prefix, |key, value| {
862 if !key.starts_with(&prefix) {
863 return Ok(false);
864 }
865 match check_range_conditions(key, num_prefix_cols, &range_conds, num_index_cols) {
866 Ok(RangeCheck::ExceedsUpper) => return Ok(false),
867 Ok(RangeCheck::BelowLower) => return Ok(true),
868 Ok(RangeCheck::Match) => {}
869 Err(e) => { scan_err = Some(e); return Ok(false); }
870 }
871 match extract_pk_key(key, value, is_unique, num_index_cols, num_pk_cols) {
872 Ok(pk) => pk_keys.push(pk),
873 Err(e) => { scan_err = Some(e); return Ok(false); }
874 }
875 Ok(true)
876 }).map_err(SqlError::Storage)?;
877 if let Some(e) = scan_err { return Err(e); }
878 }
879
880 let mut rows = Vec::new();
881 let mut rtx = db.begin_read();
882 for pk_key in &pk_keys {
883 if let Some(value) = rtx.table_get(lower_name.as_bytes(), pk_key).map_err(SqlError::Storage)? {
884 rows.push(decode_full_row(table_schema, pk_key, &value)?);
885 }
886 }
887 Ok(rows)
888 }
889 }
890}
891
892fn collect_rows_write(
894 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
895 table_schema: &TableSchema,
896 where_clause: &Option<Expr>,
897) -> Result<Vec<Vec<Value>>> {
898 let plan = planner::plan_select(table_schema, where_clause);
899 let lower_name = &table_schema.name;
900
901 match plan {
902 ScanPlan::SeqScan => {
903 let mut rows = Vec::new();
904 let mut scan_err: Option<SqlError> = None;
905 wtx.table_for_each(lower_name.as_bytes(), |key, value| {
906 match decode_full_row(table_schema, key, value) {
907 Ok(row) => rows.push(row),
908 Err(e) => scan_err = Some(e),
909 }
910 Ok(())
911 }).map_err(SqlError::Storage)?;
912 if let Some(e) = scan_err { return Err(e); }
913 Ok(rows)
914 }
915
916 ScanPlan::PkLookup { pk_values } => {
917 let key = encode_composite_key(&pk_values);
918 match wtx.table_get(lower_name.as_bytes(), &key).map_err(SqlError::Storage)? {
919 Some(value) => {
920 let row = decode_full_row(table_schema, &key, &value)?;
921 Ok(vec![row])
922 }
923 None => Ok(vec![]),
924 }
925 }
926
927 ScanPlan::IndexScan {
928 idx_table, prefix, num_prefix_cols,
929 range_conds, is_unique, index_columns, ..
930 } => {
931 let num_pk_cols = table_schema.primary_key_columns.len();
932 let num_index_cols = index_columns.len();
933 let mut pk_keys: Vec<Vec<u8>> = Vec::new();
934
935 {
936 let mut scan_err: Option<SqlError> = None;
937 wtx.table_scan_from(&idx_table, &prefix, |key, value| {
938 if !key.starts_with(&prefix) {
939 return Ok(false);
940 }
941 match check_range_conditions(key, num_prefix_cols, &range_conds, num_index_cols) {
942 Ok(RangeCheck::ExceedsUpper) => return Ok(false),
943 Ok(RangeCheck::BelowLower) => return Ok(true),
944 Ok(RangeCheck::Match) => {}
945 Err(e) => { scan_err = Some(e); return Ok(false); }
946 }
947 match extract_pk_key(key, value, is_unique, num_index_cols, num_pk_cols) {
948 Ok(pk) => pk_keys.push(pk),
949 Err(e) => { scan_err = Some(e); return Ok(false); }
950 }
951 Ok(true)
952 }).map_err(SqlError::Storage)?;
953 if let Some(e) = scan_err { return Err(e); }
954 }
955
956 let mut rows = Vec::new();
957 for pk_key in &pk_keys {
958 if let Some(value) = wtx.table_get(lower_name.as_bytes(), pk_key).map_err(SqlError::Storage)? {
959 rows.push(decode_full_row(table_schema, pk_key, &value)?);
960 }
961 }
962 Ok(rows)
963 }
964 }
965}
966
967fn collect_keyed_rows_read(
970 db: &Database,
971 table_schema: &TableSchema,
972 where_clause: &Option<Expr>,
973) -> Result<Vec<(Vec<u8>, Vec<Value>)>> {
974 let plan = planner::plan_select(table_schema, where_clause);
975 let lower_name = &table_schema.name;
976
977 match plan {
978 ScanPlan::SeqScan => {
979 let mut rows = Vec::new();
980 let mut rtx = db.begin_read();
981 let mut scan_err: Option<SqlError> = None;
982 rtx.table_for_each(lower_name.as_bytes(), |key, value| {
983 match decode_full_row(table_schema, key, value) {
984 Ok(row) => rows.push((key.to_vec(), row)),
985 Err(e) => scan_err = Some(e),
986 }
987 Ok(())
988 }).map_err(SqlError::Storage)?;
989 if let Some(e) = scan_err { return Err(e); }
990 Ok(rows)
991 }
992
993 ScanPlan::PkLookup { pk_values } => {
994 let key = encode_composite_key(&pk_values);
995 let mut rtx = db.begin_read();
996 match rtx.table_get(lower_name.as_bytes(), &key).map_err(SqlError::Storage)? {
997 Some(value) => {
998 let row = decode_full_row(table_schema, &key, &value)?;
999 Ok(vec![(key, row)])
1000 }
1001 None => Ok(vec![]),
1002 }
1003 }
1004
1005 ScanPlan::IndexScan {
1006 idx_table, prefix, num_prefix_cols,
1007 range_conds, is_unique, index_columns, ..
1008 } => {
1009 let num_pk_cols = table_schema.primary_key_columns.len();
1010 let num_index_cols = index_columns.len();
1011 let mut pk_keys: Vec<Vec<u8>> = Vec::new();
1012
1013 {
1014 let mut rtx = db.begin_read();
1015 let mut scan_err: Option<SqlError> = None;
1016 rtx.table_scan_from(&idx_table, &prefix, |key, value| {
1017 if !key.starts_with(&prefix) {
1018 return Ok(false);
1019 }
1020 match check_range_conditions(key, num_prefix_cols, &range_conds, num_index_cols) {
1021 Ok(RangeCheck::ExceedsUpper) => return Ok(false),
1022 Ok(RangeCheck::BelowLower) => return Ok(true),
1023 Ok(RangeCheck::Match) => {}
1024 Err(e) => { scan_err = Some(e); return Ok(false); }
1025 }
1026 match extract_pk_key(key, value, is_unique, num_index_cols, num_pk_cols) {
1027 Ok(pk) => pk_keys.push(pk),
1028 Err(e) => { scan_err = Some(e); return Ok(false); }
1029 }
1030 Ok(true)
1031 }).map_err(SqlError::Storage)?;
1032 if let Some(e) = scan_err { return Err(e); }
1033 }
1034
1035 let mut rows = Vec::new();
1036 let mut rtx = db.begin_read();
1037 for pk_key in &pk_keys {
1038 if let Some(value) = rtx.table_get(lower_name.as_bytes(), pk_key).map_err(SqlError::Storage)? {
1039 rows.push((pk_key.clone(), decode_full_row(table_schema, pk_key, &value)?));
1040 }
1041 }
1042 Ok(rows)
1043 }
1044 }
1045}
1046
1047fn collect_keyed_rows_write(
1049 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
1050 table_schema: &TableSchema,
1051 where_clause: &Option<Expr>,
1052) -> Result<Vec<(Vec<u8>, Vec<Value>)>> {
1053 let plan = planner::plan_select(table_schema, where_clause);
1054 let lower_name = &table_schema.name;
1055
1056 match plan {
1057 ScanPlan::SeqScan => {
1058 let mut rows = Vec::new();
1059 let mut scan_err: Option<SqlError> = None;
1060 wtx.table_for_each(lower_name.as_bytes(), |key, value| {
1061 match decode_full_row(table_schema, key, value) {
1062 Ok(row) => rows.push((key.to_vec(), row)),
1063 Err(e) => scan_err = Some(e),
1064 }
1065 Ok(())
1066 }).map_err(SqlError::Storage)?;
1067 if let Some(e) = scan_err { return Err(e); }
1068 Ok(rows)
1069 }
1070
1071 ScanPlan::PkLookup { pk_values } => {
1072 let key = encode_composite_key(&pk_values);
1073 match wtx.table_get(lower_name.as_bytes(), &key).map_err(SqlError::Storage)? {
1074 Some(value) => {
1075 let row = decode_full_row(table_schema, &key, &value)?;
1076 Ok(vec![(key, row)])
1077 }
1078 None => Ok(vec![]),
1079 }
1080 }
1081
1082 ScanPlan::IndexScan {
1083 idx_table, prefix, num_prefix_cols,
1084 range_conds, is_unique, index_columns, ..
1085 } => {
1086 let num_pk_cols = table_schema.primary_key_columns.len();
1087 let num_index_cols = index_columns.len();
1088 let mut pk_keys: Vec<Vec<u8>> = Vec::new();
1089
1090 {
1091 let mut scan_err: Option<SqlError> = None;
1092 wtx.table_scan_from(&idx_table, &prefix, |key, value| {
1093 if !key.starts_with(&prefix) {
1094 return Ok(false);
1095 }
1096 match check_range_conditions(key, num_prefix_cols, &range_conds, num_index_cols) {
1097 Ok(RangeCheck::ExceedsUpper) => return Ok(false),
1098 Ok(RangeCheck::BelowLower) => return Ok(true),
1099 Ok(RangeCheck::Match) => {}
1100 Err(e) => { scan_err = Some(e); return Ok(false); }
1101 }
1102 match extract_pk_key(key, value, is_unique, num_index_cols, num_pk_cols) {
1103 Ok(pk) => pk_keys.push(pk),
1104 Err(e) => { scan_err = Some(e); return Ok(false); }
1105 }
1106 Ok(true)
1107 }).map_err(SqlError::Storage)?;
1108 if let Some(e) = scan_err { return Err(e); }
1109 }
1110
1111 let mut rows = Vec::new();
1112 for pk_key in &pk_keys {
1113 if let Some(value) = wtx.table_get(lower_name.as_bytes(), pk_key).map_err(SqlError::Storage)? {
1114 rows.push((pk_key.clone(), decode_full_row(table_schema, pk_key, &value)?));
1115 }
1116 }
1117 Ok(rows)
1118 }
1119 }
1120}
1121
1122fn exec_insert(
1125 db: &Database,
1126 schema: &SchemaManager,
1127 stmt: &InsertStmt,
1128) -> Result<ExecutionResult> {
1129 let materialized;
1130 let stmt = if insert_has_subquery(stmt) {
1131 materialized = materialize_insert(stmt, &mut |sub| exec_subquery_read(db, schema, sub))?;
1132 &materialized
1133 } else {
1134 stmt
1135 };
1136
1137 let lower_name = stmt.table.to_ascii_lowercase();
1138 let table_schema = schema.get(&lower_name)
1139 .ok_or_else(|| SqlError::TableNotFound(stmt.table.clone()))?;
1140
1141 let insert_columns = if stmt.columns.is_empty() {
1142 table_schema.columns.iter().map(|c| c.name.clone()).collect::<Vec<_>>()
1143 } else {
1144 stmt.columns.iter().map(|c| c.to_ascii_lowercase()).collect()
1145 };
1146
1147 let col_indices: Vec<usize> = insert_columns.iter().map(|name| {
1148 table_schema.column_index(name)
1149 .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))
1150 }).collect::<Result<_>>()?;
1151
1152 let mut wtx = db.begin_write().map_err(SqlError::Storage)?;
1153 let mut count: u64 = 0;
1154
1155 for value_row in &stmt.values {
1156 if value_row.len() != insert_columns.len() {
1157 return Err(SqlError::InvalidValue(format!(
1158 "expected {} values, got {}",
1159 insert_columns.len(),
1160 value_row.len()
1161 )));
1162 }
1163
1164 let mut row = vec![Value::Null; table_schema.columns.len()];
1165 for (i, expr) in value_row.iter().enumerate() {
1166 let val = eval_const_expr(expr)?;
1167 let col_idx = col_indices[i];
1168 let col = &table_schema.columns[col_idx];
1169
1170 let coerced = if val.is_null() {
1171 Value::Null
1172 } else {
1173 val.coerce_to(col.data_type).ok_or_else(|| SqlError::TypeMismatch {
1174 expected: col.data_type.to_string(),
1175 got: val.data_type().to_string(),
1176 })?
1177 };
1178
1179 row[col_idx] = coerced;
1180 }
1181
1182 for col in &table_schema.columns {
1183 if !col.nullable && row[col.position as usize].is_null() {
1184 return Err(SqlError::NotNullViolation(col.name.clone()));
1185 }
1186 }
1187
1188 let pk_values: Vec<Value> = table_schema.pk_indices()
1189 .iter()
1190 .map(|&i| row[i].clone())
1191 .collect();
1192 let key = encode_composite_key(&pk_values);
1193
1194 let non_pk = table_schema.non_pk_indices();
1195 let value_values: Vec<Value> = non_pk.iter().map(|&i| row[i].clone()).collect();
1196 let value = encode_row(&value_values);
1197
1198 if key.len() > citadel_core::MAX_KEY_SIZE {
1199 return Err(SqlError::KeyTooLarge { size: key.len(), max: citadel_core::MAX_KEY_SIZE });
1200 }
1201 if value.len() > citadel_core::MAX_INLINE_VALUE_SIZE {
1202 return Err(SqlError::RowTooLarge { size: value.len(), max: citadel_core::MAX_INLINE_VALUE_SIZE });
1203 }
1204
1205 let is_new = wtx.table_insert(lower_name.as_bytes(), &key, &value)
1206 .map_err(SqlError::Storage)?;
1207 if !is_new {
1208 return Err(SqlError::DuplicateKey);
1209 }
1210
1211 insert_index_entries(&mut wtx, table_schema, &row, &pk_values)?;
1212 count += 1;
1213 }
1214
1215 wtx.commit().map_err(SqlError::Storage)?;
1216 Ok(ExecutionResult::RowsAffected(count))
1217}
1218
1219fn has_subquery(expr: &Expr) -> bool {
1220 match expr {
1221 Expr::InSubquery { .. } | Expr::Exists { .. } | Expr::ScalarSubquery(_) => true,
1222 Expr::BinaryOp { left, right, .. } => has_subquery(left) || has_subquery(right),
1223 Expr::UnaryOp { expr, .. } => has_subquery(expr),
1224 Expr::IsNull(e) | Expr::IsNotNull(e) => has_subquery(e),
1225 Expr::InList { expr, list, .. } => {
1226 has_subquery(expr) || list.iter().any(has_subquery)
1227 }
1228 Expr::InSet { expr, .. } => has_subquery(expr),
1229 Expr::Between { expr, low, high, .. } => {
1230 has_subquery(expr) || has_subquery(low) || has_subquery(high)
1231 }
1232 Expr::Like { expr, pattern, escape, .. } => {
1233 has_subquery(expr) || has_subquery(pattern) || escape.as_ref().map_or(false, |e| has_subquery(e))
1234 }
1235 Expr::Case { operand, conditions, else_result } => {
1236 operand.as_ref().map_or(false, |e| has_subquery(e))
1237 || conditions.iter().any(|(c, r)| has_subquery(c) || has_subquery(r))
1238 || else_result.as_ref().map_or(false, |e| has_subquery(e))
1239 }
1240 Expr::Coalesce(args) => args.iter().any(has_subquery),
1241 Expr::Cast { expr, .. } => has_subquery(expr),
1242 Expr::Function { args, .. } => args.iter().any(has_subquery),
1243 _ => false,
1244 }
1245}
1246
1247fn stmt_has_subquery(stmt: &SelectStmt) -> bool {
1248 if let Some(ref w) = stmt.where_clause { if has_subquery(w) { return true; } }
1249 if let Some(ref h) = stmt.having { if has_subquery(h) { return true; } }
1250 for col in &stmt.columns {
1251 if let SelectColumn::Expr { expr, .. } = col {
1252 if has_subquery(expr) { return true; }
1253 }
1254 }
1255 for ob in &stmt.order_by {
1256 if has_subquery(&ob.expr) { return true; }
1257 }
1258 for join in &stmt.joins {
1259 if let Some(ref on_expr) = join.on_clause {
1260 if has_subquery(on_expr) { return true; }
1261 }
1262 }
1263 false
1264}
1265
1266fn materialize_expr(
1267 expr: &Expr,
1268 exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
1269) -> Result<Expr> {
1270 match expr {
1271 Expr::InSubquery { expr: e, subquery, negated } => {
1272 let inner = materialize_expr(e, exec_sub)?;
1273 let qr = exec_sub(subquery)?;
1274 if !qr.columns.is_empty() && qr.columns.len() != 1 {
1275 return Err(SqlError::SubqueryMultipleColumns);
1276 }
1277 let mut values = std::collections::HashSet::new();
1278 let mut has_null = false;
1279 for row in &qr.rows {
1280 if row[0].is_null() {
1281 has_null = true;
1282 } else {
1283 values.insert(row[0].clone());
1284 }
1285 }
1286 Ok(Expr::InSet {
1287 expr: Box::new(inner),
1288 values,
1289 has_null,
1290 negated: *negated,
1291 })
1292 }
1293 Expr::ScalarSubquery(subquery) => {
1294 let qr = exec_sub(subquery)?;
1295 if qr.rows.len() > 1 {
1296 return Err(SqlError::SubqueryMultipleRows);
1297 }
1298 let val = if qr.rows.is_empty() {
1299 Value::Null
1300 } else {
1301 qr.rows[0][0].clone()
1302 };
1303 Ok(Expr::Literal(val))
1304 }
1305 Expr::Exists { subquery, negated } => {
1306 let qr = exec_sub(subquery)?;
1307 let exists = !qr.rows.is_empty();
1308 let result = if *negated { !exists } else { exists };
1309 Ok(Expr::Literal(Value::Boolean(result)))
1310 }
1311 Expr::InList { expr: e, list, negated } => {
1312 let inner = materialize_expr(e, exec_sub)?;
1313 let items = list.iter()
1314 .map(|item| materialize_expr(item, exec_sub))
1315 .collect::<Result<Vec<_>>>()?;
1316 Ok(Expr::InList { expr: Box::new(inner), list: items, negated: *negated })
1317 }
1318 Expr::BinaryOp { left, op, right } => {
1319 Ok(Expr::BinaryOp {
1320 left: Box::new(materialize_expr(left, exec_sub)?),
1321 op: *op,
1322 right: Box::new(materialize_expr(right, exec_sub)?),
1323 })
1324 }
1325 Expr::UnaryOp { op, expr: e } => {
1326 Ok(Expr::UnaryOp {
1327 op: *op,
1328 expr: Box::new(materialize_expr(e, exec_sub)?),
1329 })
1330 }
1331 Expr::IsNull(e) => Ok(Expr::IsNull(Box::new(materialize_expr(e, exec_sub)?))),
1332 Expr::IsNotNull(e) => Ok(Expr::IsNotNull(Box::new(materialize_expr(e, exec_sub)?))),
1333 Expr::InSet { expr: e, values, has_null, negated } => {
1334 Ok(Expr::InSet {
1335 expr: Box::new(materialize_expr(e, exec_sub)?),
1336 values: values.clone(),
1337 has_null: *has_null,
1338 negated: *negated,
1339 })
1340 }
1341 Expr::Between { expr: e, low, high, negated } => {
1342 Ok(Expr::Between {
1343 expr: Box::new(materialize_expr(e, exec_sub)?),
1344 low: Box::new(materialize_expr(low, exec_sub)?),
1345 high: Box::new(materialize_expr(high, exec_sub)?),
1346 negated: *negated,
1347 })
1348 }
1349 Expr::Like { expr: e, pattern, escape, negated } => {
1350 let esc = escape.as_ref()
1351 .map(|es| materialize_expr(es, exec_sub).map(Box::new))
1352 .transpose()?;
1353 Ok(Expr::Like {
1354 expr: Box::new(materialize_expr(e, exec_sub)?),
1355 pattern: Box::new(materialize_expr(pattern, exec_sub)?),
1356 escape: esc,
1357 negated: *negated,
1358 })
1359 }
1360 Expr::Case { operand, conditions, else_result } => {
1361 let op = operand.as_ref()
1362 .map(|e| materialize_expr(e, exec_sub).map(Box::new))
1363 .transpose()?;
1364 let conds = conditions.iter()
1365 .map(|(c, r)| Ok((materialize_expr(c, exec_sub)?, materialize_expr(r, exec_sub)?)))
1366 .collect::<Result<Vec<_>>>()?;
1367 let else_r = else_result.as_ref()
1368 .map(|e| materialize_expr(e, exec_sub).map(Box::new))
1369 .transpose()?;
1370 Ok(Expr::Case { operand: op, conditions: conds, else_result: else_r })
1371 }
1372 Expr::Coalesce(args) => {
1373 let materialized = args.iter()
1374 .map(|a| materialize_expr(a, exec_sub))
1375 .collect::<Result<Vec<_>>>()?;
1376 Ok(Expr::Coalesce(materialized))
1377 }
1378 Expr::Cast { expr: e, data_type } => {
1379 Ok(Expr::Cast {
1380 expr: Box::new(materialize_expr(e, exec_sub)?),
1381 data_type: *data_type,
1382 })
1383 }
1384 Expr::Function { name, args } => {
1385 let materialized = args.iter()
1386 .map(|a| materialize_expr(a, exec_sub))
1387 .collect::<Result<Vec<_>>>()?;
1388 Ok(Expr::Function { name: name.clone(), args: materialized })
1389 }
1390 other => Ok(other.clone()),
1391 }
1392}
1393
1394fn materialize_stmt(
1395 stmt: &SelectStmt,
1396 exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
1397) -> Result<SelectStmt> {
1398 let where_clause = stmt.where_clause.as_ref()
1399 .map(|e| materialize_expr(e, exec_sub))
1400 .transpose()?;
1401 let having = stmt.having.as_ref()
1402 .map(|e| materialize_expr(e, exec_sub))
1403 .transpose()?;
1404 let columns = stmt.columns.iter().map(|c| match c {
1405 SelectColumn::AllColumns => Ok(SelectColumn::AllColumns),
1406 SelectColumn::Expr { expr, alias } => {
1407 Ok(SelectColumn::Expr {
1408 expr: materialize_expr(expr, exec_sub)?,
1409 alias: alias.clone(),
1410 })
1411 }
1412 }).collect::<Result<Vec<_>>>()?;
1413 let order_by = stmt.order_by.iter().map(|ob| {
1414 Ok(OrderByItem {
1415 expr: materialize_expr(&ob.expr, exec_sub)?,
1416 descending: ob.descending,
1417 nulls_first: ob.nulls_first,
1418 })
1419 }).collect::<Result<Vec<_>>>()?;
1420 let joins = stmt.joins.iter().map(|j| {
1421 let on_clause = j.on_clause.as_ref()
1422 .map(|e| materialize_expr(e, exec_sub))
1423 .transpose()?;
1424 Ok(JoinClause {
1425 join_type: j.join_type,
1426 table: j.table.clone(),
1427 on_clause,
1428 })
1429 }).collect::<Result<Vec<_>>>()?;
1430 let group_by = stmt.group_by.iter()
1431 .map(|e| materialize_expr(e, exec_sub))
1432 .collect::<Result<Vec<_>>>()?;
1433 Ok(SelectStmt {
1434 columns,
1435 from: stmt.from.clone(),
1436 from_alias: stmt.from_alias.clone(),
1437 joins,
1438 distinct: stmt.distinct,
1439 where_clause,
1440 order_by,
1441 limit: stmt.limit.clone(),
1442 offset: stmt.offset.clone(),
1443 group_by,
1444 having,
1445 })
1446}
1447
1448fn exec_subquery_read(
1449 db: &Database,
1450 schema: &SchemaManager,
1451 stmt: &SelectStmt,
1452) -> Result<QueryResult> {
1453 match exec_select(db, schema, stmt)? {
1454 ExecutionResult::Query(qr) => Ok(qr),
1455 _ => Ok(QueryResult { columns: vec![], rows: vec![] }),
1456 }
1457}
1458
1459fn exec_subquery_write(
1460 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
1461 schema: &SchemaManager,
1462 stmt: &SelectStmt,
1463) -> Result<QueryResult> {
1464 match exec_select_in_txn(wtx, schema, stmt)? {
1465 ExecutionResult::Query(qr) => Ok(qr),
1466 _ => Ok(QueryResult { columns: vec![], rows: vec![] }),
1467 }
1468}
1469
1470fn update_has_subquery(stmt: &UpdateStmt) -> bool {
1471 stmt.where_clause.as_ref().map_or(false, has_subquery)
1472 || stmt.assignments.iter().any(|(_, e)| has_subquery(e))
1473}
1474
1475fn materialize_update(
1476 stmt: &UpdateStmt,
1477 exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
1478) -> Result<UpdateStmt> {
1479 let where_clause = stmt.where_clause.as_ref()
1480 .map(|e| materialize_expr(e, exec_sub))
1481 .transpose()?;
1482 let assignments = stmt.assignments.iter()
1483 .map(|(name, expr)| Ok((name.clone(), materialize_expr(expr, exec_sub)?)))
1484 .collect::<Result<Vec<_>>>()?;
1485 Ok(UpdateStmt {
1486 table: stmt.table.clone(),
1487 assignments,
1488 where_clause,
1489 })
1490}
1491
1492fn delete_has_subquery(stmt: &DeleteStmt) -> bool {
1493 stmt.where_clause.as_ref().map_or(false, has_subquery)
1494}
1495
1496fn materialize_delete(
1497 stmt: &DeleteStmt,
1498 exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
1499) -> Result<DeleteStmt> {
1500 let where_clause = stmt.where_clause.as_ref()
1501 .map(|e| materialize_expr(e, exec_sub))
1502 .transpose()?;
1503 Ok(DeleteStmt {
1504 table: stmt.table.clone(),
1505 where_clause,
1506 })
1507}
1508
1509fn insert_has_subquery(stmt: &InsertStmt) -> bool {
1510 stmt.values.iter().any(|row| row.iter().any(has_subquery))
1511}
1512
1513fn materialize_insert(
1514 stmt: &InsertStmt,
1515 exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
1516) -> Result<InsertStmt> {
1517 let values = stmt.values.iter()
1518 .map(|row| row.iter().map(|e| materialize_expr(e, exec_sub)).collect::<Result<Vec<_>>>())
1519 .collect::<Result<Vec<_>>>()?;
1520 Ok(InsertStmt {
1521 table: stmt.table.clone(),
1522 columns: stmt.columns.clone(),
1523 values,
1524 })
1525}
1526
1527fn exec_select(
1528 db: &Database,
1529 schema: &SchemaManager,
1530 stmt: &SelectStmt,
1531) -> Result<ExecutionResult> {
1532 let materialized;
1533 let stmt = if stmt_has_subquery(stmt) {
1534 materialized = materialize_stmt(stmt, &mut |sub| exec_subquery_read(db, schema, sub))?;
1535 &materialized
1536 } else {
1537 stmt
1538 };
1539
1540 if stmt.from.is_empty() {
1541 return exec_select_no_from(stmt);
1542 }
1543
1544 let lower_name = stmt.from.to_ascii_lowercase();
1545 let table_schema = schema.get(&lower_name)
1546 .ok_or_else(|| SqlError::TableNotFound(stmt.from.clone()))?;
1547
1548 if !stmt.joins.is_empty() {
1549 return exec_select_join(db, schema, stmt);
1550 }
1551
1552 let rows = collect_rows_read(db, table_schema, &stmt.where_clause)?;
1553 process_select(&table_schema.columns, rows, stmt)
1554}
1555
1556fn exec_select_no_from(stmt: &SelectStmt) -> Result<ExecutionResult> {
1557 let empty_cols: Vec<ColumnDef> = vec![];
1558 let empty_row: Vec<Value> = vec![];
1559 let (col_names, projected) = project_rows(&empty_cols, &stmt.columns, &[empty_row])?;
1560 Ok(ExecutionResult::Query(QueryResult {
1561 columns: col_names,
1562 rows: projected,
1563 }))
1564}
1565
1566fn process_select(
1568 columns: &[ColumnDef],
1569 mut rows: Vec<Vec<Value>>,
1570 stmt: &SelectStmt,
1571) -> Result<ExecutionResult> {
1572 if let Some(ref where_expr) = stmt.where_clause {
1573 rows.retain(|row| {
1574 match eval_expr(where_expr, columns, row) {
1575 Ok(val) => is_truthy(&val),
1576 Err(_) => false,
1577 }
1578 });
1579 }
1580
1581 let has_aggregates = stmt.columns.iter().any(|c| match c {
1582 SelectColumn::Expr { expr, .. } => is_aggregate_expr(expr),
1583 _ => false,
1584 });
1585
1586 if has_aggregates || !stmt.group_by.is_empty() {
1587 return exec_aggregate(columns, &rows, stmt);
1588 }
1589
1590 if stmt.distinct {
1591 let (col_names, mut projected) = project_rows(columns, &stmt.columns, &rows)?;
1592
1593 let mut seen = std::collections::HashSet::new();
1594 projected.retain(|row| seen.insert(row.clone()));
1595
1596 if !stmt.order_by.is_empty() {
1597 let output_cols = build_output_columns(&stmt.columns, columns);
1598 sort_rows(&mut projected, &stmt.order_by, &output_cols)?;
1599 }
1600
1601 if let Some(ref offset_expr) = stmt.offset {
1602 let offset = eval_const_int(offset_expr)? as usize;
1603 if offset < projected.len() {
1604 projected = projected.split_off(offset);
1605 } else {
1606 projected.clear();
1607 }
1608 }
1609
1610 if let Some(ref limit_expr) = stmt.limit {
1611 let limit = eval_const_int(limit_expr)? as usize;
1612 projected.truncate(limit);
1613 }
1614
1615 return Ok(ExecutionResult::Query(QueryResult {
1616 columns: col_names,
1617 rows: projected,
1618 }));
1619 }
1620
1621 if !stmt.order_by.is_empty() {
1622 sort_rows(&mut rows, &stmt.order_by, columns)?;
1623 }
1624
1625 if let Some(ref offset_expr) = stmt.offset {
1626 let offset = eval_const_int(offset_expr)? as usize;
1627 if offset < rows.len() {
1628 rows = rows.split_off(offset);
1629 } else {
1630 rows.clear();
1631 }
1632 }
1633
1634 if let Some(ref limit_expr) = stmt.limit {
1635 let limit = eval_const_int(limit_expr)? as usize;
1636 rows.truncate(limit);
1637 }
1638
1639 let (col_names, projected) = project_rows(columns, &stmt.columns, &rows)?;
1640
1641 Ok(ExecutionResult::Query(QueryResult {
1642 columns: col_names,
1643 rows: projected,
1644 }))
1645}
1646
1647
1648fn resolve_table_name<'a>(
1649 schema: &'a SchemaManager,
1650 name: &str,
1651) -> Result<&'a TableSchema> {
1652 let lower = name.to_ascii_lowercase();
1653 schema.get(&lower)
1654 .ok_or_else(|| SqlError::TableNotFound(name.to_string()))
1655}
1656
1657fn build_joined_columns(
1658 tables: &[(String, &TableSchema)],
1659) -> Vec<ColumnDef> {
1660 let mut result = Vec::new();
1661 let mut pos: u16 = 0;
1662
1663 for (alias, schema) in tables {
1664 for col in &schema.columns {
1665 result.push(ColumnDef {
1666 name: format!("{}.{}", alias.to_ascii_lowercase(), col.name),
1667 data_type: col.data_type,
1668 nullable: col.nullable,
1669 position: pos,
1670 });
1671 pos += 1;
1672 }
1673 }
1674
1675 result
1676}
1677
/// Returns the ASCII-lowercased alias when one is given, otherwise the
/// ASCII-lowercased table name.
fn table_alias_or_name(name: &str, alias: &Option<String>) -> String {
    // `as_deref` borrows the alias as `&str`, so no temporary `String` is built
    // from `name` when an alias is present.
    alias.as_deref().unwrap_or(name).to_ascii_lowercase()
}
1681
1682fn collect_all_rows_read(
1683 db: &Database,
1684 table_schema: &TableSchema,
1685) -> Result<Vec<Vec<Value>>> {
1686 collect_rows_read(db, table_schema, &None)
1687}
1688
1689fn collect_all_rows_write(
1690 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
1691 table_schema: &TableSchema,
1692) -> Result<Vec<Vec<Value>>> {
1693 collect_rows_write(wtx, table_schema, &None)
1694}
1695
1696fn exec_select_join(
1697 db: &Database,
1698 schema: &SchemaManager,
1699 stmt: &SelectStmt,
1700) -> Result<ExecutionResult> {
1701 let from_schema = resolve_table_name(schema, &stmt.from)?;
1702 let from_alias = table_alias_or_name(&stmt.from, &stmt.from_alias);
1703 let mut outer_rows = collect_all_rows_read(db, from_schema)?;
1704
1705 let mut tables: Vec<(String, &TableSchema)> = vec![(from_alias.clone(), from_schema)];
1706
1707 for join in &stmt.joins {
1708 let inner_schema = resolve_table_name(schema, &join.table.name)?;
1709 let inner_alias = table_alias_or_name(&join.table.name, &join.table.alias);
1710 let inner_rows = collect_all_rows_read(db, inner_schema)?;
1711
1712 let mut preview_tables = tables.clone();
1713 preview_tables.push((inner_alias.clone(), inner_schema));
1714 let combined_cols = build_joined_columns(&preview_tables);
1715
1716 let mut new_rows = Vec::new();
1717
1718 match join.join_type {
1719 JoinType::Inner | JoinType::Cross => {
1720 for outer in &outer_rows {
1721 for inner in &inner_rows {
1722 let combined: Vec<Value> = outer.iter()
1723 .chain(inner.iter())
1724 .cloned()
1725 .collect();
1726 if let Some(ref on_expr) = join.on_clause {
1727 match eval_expr(on_expr, &combined_cols, &combined) {
1728 Ok(val) if is_truthy(&val) => new_rows.push(combined),
1729 _ => {}
1730 }
1731 } else {
1732 new_rows.push(combined);
1733 }
1734 }
1735 }
1736 }
1737 JoinType::Left => {
1738 let inner_col_count = inner_schema.columns.len();
1739 for outer in &outer_rows {
1740 let mut matched = false;
1741 for inner in &inner_rows {
1742 let combined: Vec<Value> = outer.iter()
1743 .chain(inner.iter())
1744 .cloned()
1745 .collect();
1746 if let Some(ref on_expr) = join.on_clause {
1747 match eval_expr(on_expr, &combined_cols, &combined) {
1748 Ok(val) if is_truthy(&val) => {
1749 new_rows.push(combined);
1750 matched = true;
1751 }
1752 _ => {}
1753 }
1754 } else {
1755 new_rows.push(combined);
1756 matched = true;
1757 }
1758 }
1759 if !matched {
1760 let mut padded = outer.clone();
1761 padded.extend(std::iter::repeat(Value::Null).take(inner_col_count));
1762 new_rows.push(padded);
1763 }
1764 }
1765 }
1766 JoinType::Right => {
1767 let outer_col_count = if outer_rows.is_empty() {
1768 tables.iter().map(|(_, s)| s.columns.len()).sum()
1769 } else {
1770 outer_rows[0].len()
1771 };
1772 let mut inner_matched = vec![false; inner_rows.len()];
1773 for outer in &outer_rows {
1774 for (j, inner) in inner_rows.iter().enumerate() {
1775 let combined: Vec<Value> = outer.iter()
1776 .chain(inner.iter())
1777 .cloned()
1778 .collect();
1779 if let Some(ref on_expr) = join.on_clause {
1780 match eval_expr(on_expr, &combined_cols, &combined) {
1781 Ok(val) if is_truthy(&val) => {
1782 new_rows.push(combined);
1783 inner_matched[j] = true;
1784 }
1785 _ => {}
1786 }
1787 } else {
1788 new_rows.push(combined);
1789 inner_matched[j] = true;
1790 }
1791 }
1792 }
1793 for (j, inner) in inner_rows.iter().enumerate() {
1794 if !inner_matched[j] {
1795 let mut padded: Vec<Value> = std::iter::repeat(Value::Null)
1796 .take(outer_col_count)
1797 .collect();
1798 padded.extend(inner.iter().cloned());
1799 new_rows.push(padded);
1800 }
1801 }
1802 }
1803 }
1804
1805 tables.push((inner_alias, inner_schema));
1806 outer_rows = new_rows;
1807 }
1808
1809 let joined_cols = build_joined_columns(&tables);
1810 process_select(&joined_cols, outer_rows, stmt)
1811}
1812
1813fn exec_select_join_in_txn(
1814 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
1815 schema: &SchemaManager,
1816 stmt: &SelectStmt,
1817) -> Result<ExecutionResult> {
1818 let from_schema = resolve_table_name(schema, &stmt.from)?;
1819 let from_alias = table_alias_or_name(&stmt.from, &stmt.from_alias);
1820 let mut outer_rows = collect_all_rows_write(wtx, from_schema)?;
1821
1822 let mut tables: Vec<(String, &TableSchema)> = vec![(from_alias.clone(), from_schema)];
1823
1824 for join in &stmt.joins {
1825 let inner_schema = resolve_table_name(schema, &join.table.name)?;
1826 let inner_alias = table_alias_or_name(&join.table.name, &join.table.alias);
1827 let inner_rows = collect_all_rows_write(wtx, inner_schema)?;
1828
1829 let mut preview_tables = tables.clone();
1830 preview_tables.push((inner_alias.clone(), inner_schema));
1831 let combined_cols = build_joined_columns(&preview_tables);
1832
1833 let mut new_rows = Vec::new();
1834
1835 match join.join_type {
1836 JoinType::Inner | JoinType::Cross => {
1837 for outer in &outer_rows {
1838 for inner in &inner_rows {
1839 let combined: Vec<Value> = outer.iter()
1840 .chain(inner.iter())
1841 .cloned()
1842 .collect();
1843 if let Some(ref on_expr) = join.on_clause {
1844 match eval_expr(on_expr, &combined_cols, &combined) {
1845 Ok(val) if is_truthy(&val) => new_rows.push(combined),
1846 _ => {}
1847 }
1848 } else {
1849 new_rows.push(combined);
1850 }
1851 }
1852 }
1853 }
1854 JoinType::Left => {
1855 let inner_col_count = inner_schema.columns.len();
1856 for outer in &outer_rows {
1857 let mut matched = false;
1858 for inner in &inner_rows {
1859 let combined: Vec<Value> = outer.iter()
1860 .chain(inner.iter())
1861 .cloned()
1862 .collect();
1863 if let Some(ref on_expr) = join.on_clause {
1864 match eval_expr(on_expr, &combined_cols, &combined) {
1865 Ok(val) if is_truthy(&val) => {
1866 new_rows.push(combined);
1867 matched = true;
1868 }
1869 _ => {}
1870 }
1871 } else {
1872 new_rows.push(combined);
1873 matched = true;
1874 }
1875 }
1876 if !matched {
1877 let mut padded = outer.clone();
1878 padded.extend(std::iter::repeat(Value::Null).take(inner_col_count));
1879 new_rows.push(padded);
1880 }
1881 }
1882 }
1883 JoinType::Right => {
1884 let outer_col_count = if outer_rows.is_empty() {
1885 tables.iter().map(|(_, s)| s.columns.len()).sum()
1886 } else {
1887 outer_rows[0].len()
1888 };
1889 let mut inner_matched = vec![false; inner_rows.len()];
1890 for outer in &outer_rows {
1891 for (j, inner) in inner_rows.iter().enumerate() {
1892 let combined: Vec<Value> = outer.iter()
1893 .chain(inner.iter())
1894 .cloned()
1895 .collect();
1896 if let Some(ref on_expr) = join.on_clause {
1897 match eval_expr(on_expr, &combined_cols, &combined) {
1898 Ok(val) if is_truthy(&val) => {
1899 new_rows.push(combined);
1900 inner_matched[j] = true;
1901 }
1902 _ => {}
1903 }
1904 } else {
1905 new_rows.push(combined);
1906 inner_matched[j] = true;
1907 }
1908 }
1909 }
1910 for (j, inner) in inner_rows.iter().enumerate() {
1911 if !inner_matched[j] {
1912 let mut padded: Vec<Value> = std::iter::repeat(Value::Null)
1913 .take(outer_col_count)
1914 .collect();
1915 padded.extend(inner.iter().cloned());
1916 new_rows.push(padded);
1917 }
1918 }
1919 }
1920 }
1921
1922 tables.push((inner_alias, inner_schema));
1923 outer_rows = new_rows;
1924 }
1925
1926 let joined_cols = build_joined_columns(&tables);
1927 process_select(&joined_cols, outer_rows, stmt)
1928}
1929
1930fn exec_update(
1931 db: &Database,
1932 schema: &SchemaManager,
1933 stmt: &UpdateStmt,
1934) -> Result<ExecutionResult> {
1935 let materialized;
1936 let stmt = if update_has_subquery(stmt) {
1937 materialized = materialize_update(stmt, &mut |sub| exec_subquery_read(db, schema, sub))?;
1938 &materialized
1939 } else {
1940 stmt
1941 };
1942
1943 let lower_name = stmt.table.to_ascii_lowercase();
1944 let table_schema = schema.get(&lower_name)
1945 .ok_or_else(|| SqlError::TableNotFound(stmt.table.clone()))?;
1946
1947 let all_candidates = collect_keyed_rows_read(db, table_schema, &stmt.where_clause)?;
1948 let matching_rows: Vec<(Vec<u8>, Vec<Value>)> = all_candidates.into_iter()
1949 .filter(|(_, row)| {
1950 match &stmt.where_clause {
1951 Some(where_expr) => {
1952 match eval_expr(where_expr, &table_schema.columns, row) {
1953 Ok(val) => is_truthy(&val),
1954 Err(_) => false,
1955 }
1956 }
1957 None => true,
1958 }
1959 })
1960 .collect();
1961
1962 if matching_rows.is_empty() {
1963 return Ok(ExecutionResult::RowsAffected(0));
1964 }
1965
1966 struct UpdateChange {
1967 old_key: Vec<u8>,
1968 new_key: Vec<u8>,
1969 new_value: Vec<u8>,
1970 pk_changed: bool,
1971 old_row: Vec<Value>,
1972 new_row: Vec<Value>,
1973 }
1974
1975 let pk_indices = table_schema.pk_indices();
1976 let mut changes: Vec<UpdateChange> = Vec::new();
1977
1978 for (old_key, row) in &matching_rows {
1979 let mut new_row = row.clone();
1980 let mut pk_changed = false;
1981 for (col_name, expr) in &stmt.assignments {
1982 let col_idx = table_schema.column_index(col_name)
1983 .ok_or_else(|| SqlError::ColumnNotFound(col_name.clone()))?;
1984 let new_val = eval_expr(expr, &table_schema.columns, &new_row)?;
1985 let col = &table_schema.columns[col_idx];
1986
1987 let coerced = if new_val.is_null() {
1988 if !col.nullable {
1989 return Err(SqlError::NotNullViolation(col.name.clone()));
1990 }
1991 Value::Null
1992 } else {
1993 new_val.coerce_to(col.data_type).ok_or_else(|| SqlError::TypeMismatch {
1994 expected: col.data_type.to_string(),
1995 got: new_val.data_type().to_string(),
1996 })?
1997 };
1998
1999 if table_schema.primary_key_columns.contains(&(col_idx as u16)) {
2000 pk_changed = true;
2001 }
2002 new_row[col_idx] = coerced;
2003 }
2004
2005 let pk_values: Vec<Value> = pk_indices.iter().map(|&i| new_row[i].clone()).collect();
2006 let new_key = encode_composite_key(&pk_values);
2007
2008 let non_pk = table_schema.non_pk_indices();
2009 let value_values: Vec<Value> = non_pk.iter().map(|&i| new_row[i].clone()).collect();
2010 let new_value = encode_row(&value_values);
2011
2012 changes.push(UpdateChange {
2013 old_key: old_key.clone(), new_key, new_value, pk_changed,
2014 old_row: row.clone(), new_row,
2015 });
2016 }
2017
2018 {
2019 use std::collections::HashSet;
2020 let mut new_keys: HashSet<Vec<u8>> = HashSet::new();
2021 for c in &changes {
2022 if c.pk_changed && c.new_key != c.old_key {
2023 if !new_keys.insert(c.new_key.clone()) {
2024 return Err(SqlError::DuplicateKey);
2025 }
2026 }
2027 }
2028 }
2029
2030 let mut wtx = db.begin_write().map_err(SqlError::Storage)?;
2031
2032 for c in &changes {
2033 let old_pk: Vec<Value> = pk_indices.iter().map(|&i| c.old_row[i].clone()).collect();
2034
2035 for idx in &table_schema.indices {
2036 if index_columns_changed(idx, &c.old_row, &c.new_row) || c.pk_changed {
2037 let idx_table = TableSchema::index_table_name(&lower_name, &idx.name);
2038 let old_idx_key = encode_index_key(idx, &c.old_row, &old_pk);
2039 wtx.table_delete(&idx_table, &old_idx_key).map_err(SqlError::Storage)?;
2040 }
2041 }
2042
2043 if c.pk_changed {
2044 wtx.table_delete(lower_name.as_bytes(), &c.old_key).map_err(SqlError::Storage)?;
2045 }
2046 }
2047
2048 for c in &changes {
2049 let new_pk: Vec<Value> = pk_indices.iter().map(|&i| c.new_row[i].clone()).collect();
2050
2051 if c.pk_changed {
2052 let is_new = wtx.table_insert(lower_name.as_bytes(), &c.new_key, &c.new_value)
2053 .map_err(SqlError::Storage)?;
2054 if !is_new {
2055 return Err(SqlError::DuplicateKey);
2056 }
2057 } else {
2058 wtx.table_insert(lower_name.as_bytes(), &c.new_key, &c.new_value)
2059 .map_err(SqlError::Storage)?;
2060 }
2061
2062 for idx in &table_schema.indices {
2063 if index_columns_changed(idx, &c.old_row, &c.new_row) || c.pk_changed {
2064 let idx_table = TableSchema::index_table_name(&lower_name, &idx.name);
2065 let new_idx_key = encode_index_key(idx, &c.new_row, &new_pk);
2066 let new_idx_val = encode_index_value(idx, &c.new_row, &new_pk);
2067 let is_new = wtx.table_insert(&idx_table, &new_idx_key, &new_idx_val)
2068 .map_err(SqlError::Storage)?;
2069 if idx.unique && !is_new {
2070 let indexed_values: Vec<Value> = idx.columns.iter()
2071 .map(|&col_idx| c.new_row[col_idx as usize].clone())
2072 .collect();
2073 let any_null = indexed_values.iter().any(|v| v.is_null());
2074 if !any_null {
2075 return Err(SqlError::UniqueViolation(idx.name.clone()));
2076 }
2077 }
2078 }
2079 }
2080 }
2081
2082 let count = changes.len() as u64;
2083 wtx.commit().map_err(SqlError::Storage)?;
2084 Ok(ExecutionResult::RowsAffected(count))
2085}
2086
2087fn exec_delete(
2088 db: &Database,
2089 schema: &SchemaManager,
2090 stmt: &DeleteStmt,
2091) -> Result<ExecutionResult> {
2092 let materialized;
2093 let stmt = if delete_has_subquery(stmt) {
2094 materialized = materialize_delete(stmt, &mut |sub| exec_subquery_read(db, schema, sub))?;
2095 &materialized
2096 } else {
2097 stmt
2098 };
2099
2100 let lower_name = stmt.table.to_ascii_lowercase();
2101 let table_schema = schema.get(&lower_name)
2102 .ok_or_else(|| SqlError::TableNotFound(stmt.table.clone()))?;
2103
2104 let all_candidates = collect_keyed_rows_read(db, table_schema, &stmt.where_clause)?;
2105 let rows_to_delete: Vec<(Vec<u8>, Vec<Value>)> = all_candidates.into_iter()
2106 .filter(|(_, row)| {
2107 match &stmt.where_clause {
2108 Some(where_expr) => {
2109 match eval_expr(where_expr, &table_schema.columns, row) {
2110 Ok(val) => is_truthy(&val),
2111 Err(_) => false,
2112 }
2113 }
2114 None => true,
2115 }
2116 })
2117 .collect();
2118
2119 if rows_to_delete.is_empty() {
2120 return Ok(ExecutionResult::RowsAffected(0));
2121 }
2122
2123 let pk_indices = table_schema.pk_indices();
2124 let mut wtx = db.begin_write().map_err(SqlError::Storage)?;
2125 for (key, row) in &rows_to_delete {
2126 let pk_values: Vec<Value> = pk_indices.iter().map(|&i| row[i].clone()).collect();
2127 delete_index_entries(&mut wtx, table_schema, row, &pk_values)?;
2128 wtx.table_delete(lower_name.as_bytes(), key).map_err(SqlError::Storage)?;
2129 }
2130 let count = rows_to_delete.len() as u64;
2131 wtx.commit().map_err(SqlError::Storage)?;
2132 Ok(ExecutionResult::RowsAffected(count))
2133}
2134
2135fn exec_insert_in_txn(
2138 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
2139 schema: &SchemaManager,
2140 stmt: &InsertStmt,
2141) -> Result<ExecutionResult> {
2142 let materialized;
2143 let stmt = if insert_has_subquery(stmt) {
2144 materialized = materialize_insert(stmt, &mut |sub| exec_subquery_write(wtx, schema, sub))?;
2145 &materialized
2146 } else {
2147 stmt
2148 };
2149
2150 let lower_name = stmt.table.to_ascii_lowercase();
2151 let table_schema = schema.get(&lower_name)
2152 .ok_or_else(|| SqlError::TableNotFound(stmt.table.clone()))?;
2153
2154 let insert_columns = if stmt.columns.is_empty() {
2155 table_schema.columns.iter().map(|c| c.name.clone()).collect::<Vec<_>>()
2156 } else {
2157 stmt.columns.iter().map(|c| c.to_ascii_lowercase()).collect()
2158 };
2159
2160 let col_indices: Vec<usize> = insert_columns.iter().map(|name| {
2161 table_schema.column_index(name)
2162 .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))
2163 }).collect::<Result<_>>()?;
2164
2165 let mut count: u64 = 0;
2166
2167 for value_row in &stmt.values {
2168 if value_row.len() != insert_columns.len() {
2169 return Err(SqlError::InvalidValue(format!(
2170 "expected {} values, got {}",
2171 insert_columns.len(),
2172 value_row.len()
2173 )));
2174 }
2175
2176 let mut row = vec![Value::Null; table_schema.columns.len()];
2177 for (i, expr) in value_row.iter().enumerate() {
2178 let val = eval_const_expr(expr)?;
2179 let col_idx = col_indices[i];
2180 let col = &table_schema.columns[col_idx];
2181
2182 let coerced = if val.is_null() {
2183 Value::Null
2184 } else {
2185 val.coerce_to(col.data_type).ok_or_else(|| SqlError::TypeMismatch {
2186 expected: col.data_type.to_string(),
2187 got: val.data_type().to_string(),
2188 })?
2189 };
2190
2191 row[col_idx] = coerced;
2192 }
2193
2194 for col in &table_schema.columns {
2195 if !col.nullable && row[col.position as usize].is_null() {
2196 return Err(SqlError::NotNullViolation(col.name.clone()));
2197 }
2198 }
2199
2200 let pk_values: Vec<Value> = table_schema.pk_indices()
2201 .iter()
2202 .map(|&i| row[i].clone())
2203 .collect();
2204 let key = encode_composite_key(&pk_values);
2205
2206 let non_pk = table_schema.non_pk_indices();
2207 let value_values: Vec<Value> = non_pk.iter().map(|&i| row[i].clone()).collect();
2208 let value = encode_row(&value_values);
2209
2210 if key.len() > citadel_core::MAX_KEY_SIZE {
2211 return Err(SqlError::KeyTooLarge { size: key.len(), max: citadel_core::MAX_KEY_SIZE });
2212 }
2213 if value.len() > citadel_core::MAX_INLINE_VALUE_SIZE {
2214 return Err(SqlError::RowTooLarge { size: value.len(), max: citadel_core::MAX_INLINE_VALUE_SIZE });
2215 }
2216
2217 let is_new = wtx.table_insert(lower_name.as_bytes(), &key, &value)
2218 .map_err(SqlError::Storage)?;
2219 if !is_new {
2220 return Err(SqlError::DuplicateKey);
2221 }
2222
2223 insert_index_entries(wtx, table_schema, &row, &pk_values)?;
2224 count += 1;
2225 }
2226
2227 Ok(ExecutionResult::RowsAffected(count))
2228}
2229
2230fn exec_select_in_txn(
2231 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
2232 schema: &SchemaManager,
2233 stmt: &SelectStmt,
2234) -> Result<ExecutionResult> {
2235 let materialized;
2236 let stmt = if stmt_has_subquery(stmt) {
2237 materialized = materialize_stmt(stmt, &mut |sub| exec_subquery_write(wtx, schema, sub))?;
2238 &materialized
2239 } else {
2240 stmt
2241 };
2242
2243 if stmt.from.is_empty() {
2244 return exec_select_no_from(stmt);
2245 }
2246
2247 if !stmt.joins.is_empty() {
2248 return exec_select_join_in_txn(wtx, schema, stmt);
2249 }
2250
2251 let lower_name = stmt.from.to_ascii_lowercase();
2252 let table_schema = schema.get(&lower_name)
2253 .ok_or_else(|| SqlError::TableNotFound(stmt.from.clone()))?;
2254
2255 let rows = collect_rows_write(wtx, table_schema, &stmt.where_clause)?;
2256 process_select(&table_schema.columns, rows, stmt)
2257}
2258
2259fn exec_update_in_txn(
2260 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
2261 schema: &SchemaManager,
2262 stmt: &UpdateStmt,
2263) -> Result<ExecutionResult> {
2264 let materialized;
2265 let stmt = if update_has_subquery(stmt) {
2266 materialized = materialize_update(stmt, &mut |sub| exec_subquery_write(wtx, schema, sub))?;
2267 &materialized
2268 } else {
2269 stmt
2270 };
2271
2272 let lower_name = stmt.table.to_ascii_lowercase();
2273 let table_schema = schema.get(&lower_name)
2274 .ok_or_else(|| SqlError::TableNotFound(stmt.table.clone()))?;
2275
2276 let all_candidates = collect_keyed_rows_write(wtx, table_schema, &stmt.where_clause)?;
2277 let matching_rows: Vec<(Vec<u8>, Vec<Value>)> = all_candidates.into_iter()
2278 .filter(|(_, row)| {
2279 match &stmt.where_clause {
2280 Some(where_expr) => {
2281 match eval_expr(where_expr, &table_schema.columns, row) {
2282 Ok(val) => is_truthy(&val),
2283 Err(_) => false,
2284 }
2285 }
2286 None => true,
2287 }
2288 })
2289 .collect();
2290
2291 if matching_rows.is_empty() {
2292 return Ok(ExecutionResult::RowsAffected(0));
2293 }
2294
2295 struct UpdateChange {
2296 old_key: Vec<u8>,
2297 new_key: Vec<u8>,
2298 new_value: Vec<u8>,
2299 pk_changed: bool,
2300 old_row: Vec<Value>,
2301 new_row: Vec<Value>,
2302 }
2303
2304 let pk_indices = table_schema.pk_indices();
2305 let mut changes: Vec<UpdateChange> = Vec::new();
2306
2307 for (old_key, row) in &matching_rows {
2308 let mut new_row = row.clone();
2309 let mut pk_changed = false;
2310 for (col_name, expr) in &stmt.assignments {
2311 let col_idx = table_schema.column_index(col_name)
2312 .ok_or_else(|| SqlError::ColumnNotFound(col_name.clone()))?;
2313 let new_val = eval_expr(expr, &table_schema.columns, &new_row)?;
2314 let col = &table_schema.columns[col_idx];
2315
2316 let coerced = if new_val.is_null() {
2317 if !col.nullable {
2318 return Err(SqlError::NotNullViolation(col.name.clone()));
2319 }
2320 Value::Null
2321 } else {
2322 new_val.coerce_to(col.data_type).ok_or_else(|| SqlError::TypeMismatch {
2323 expected: col.data_type.to_string(),
2324 got: new_val.data_type().to_string(),
2325 })?
2326 };
2327
2328 if table_schema.primary_key_columns.contains(&(col_idx as u16)) {
2329 pk_changed = true;
2330 }
2331 new_row[col_idx] = coerced;
2332 }
2333
2334 let pk_values: Vec<Value> = pk_indices.iter().map(|&i| new_row[i].clone()).collect();
2335 let new_key = encode_composite_key(&pk_values);
2336
2337 let non_pk = table_schema.non_pk_indices();
2338 let value_values: Vec<Value> = non_pk.iter().map(|&i| new_row[i].clone()).collect();
2339 let new_value = encode_row(&value_values);
2340
2341 changes.push(UpdateChange {
2342 old_key: old_key.clone(), new_key, new_value, pk_changed,
2343 old_row: row.clone(), new_row,
2344 });
2345 }
2346
2347 {
2348 use std::collections::HashSet;
2349 let mut new_keys: HashSet<Vec<u8>> = HashSet::new();
2350 for c in &changes {
2351 if c.pk_changed && c.new_key != c.old_key {
2352 if !new_keys.insert(c.new_key.clone()) {
2353 return Err(SqlError::DuplicateKey);
2354 }
2355 }
2356 }
2357 }
2358
2359 for c in &changes {
2360 let old_pk: Vec<Value> = pk_indices.iter().map(|&i| c.old_row[i].clone()).collect();
2361
2362 for idx in &table_schema.indices {
2363 if index_columns_changed(idx, &c.old_row, &c.new_row) || c.pk_changed {
2364 let idx_table = TableSchema::index_table_name(&lower_name, &idx.name);
2365 let old_idx_key = encode_index_key(idx, &c.old_row, &old_pk);
2366 wtx.table_delete(&idx_table, &old_idx_key).map_err(SqlError::Storage)?;
2367 }
2368 }
2369
2370 if c.pk_changed {
2371 wtx.table_delete(lower_name.as_bytes(), &c.old_key).map_err(SqlError::Storage)?;
2372 }
2373 }
2374
2375 for c in &changes {
2376 let new_pk: Vec<Value> = pk_indices.iter().map(|&i| c.new_row[i].clone()).collect();
2377
2378 if c.pk_changed {
2379 let is_new = wtx.table_insert(lower_name.as_bytes(), &c.new_key, &c.new_value)
2380 .map_err(SqlError::Storage)?;
2381 if !is_new {
2382 return Err(SqlError::DuplicateKey);
2383 }
2384 } else {
2385 wtx.table_insert(lower_name.as_bytes(), &c.new_key, &c.new_value)
2386 .map_err(SqlError::Storage)?;
2387 }
2388
2389 for idx in &table_schema.indices {
2390 if index_columns_changed(idx, &c.old_row, &c.new_row) || c.pk_changed {
2391 let idx_table = TableSchema::index_table_name(&lower_name, &idx.name);
2392 let new_idx_key = encode_index_key(idx, &c.new_row, &new_pk);
2393 let new_idx_val = encode_index_value(idx, &c.new_row, &new_pk);
2394 let is_new = wtx.table_insert(&idx_table, &new_idx_key, &new_idx_val)
2395 .map_err(SqlError::Storage)?;
2396 if idx.unique && !is_new {
2397 let indexed_values: Vec<Value> = idx.columns.iter()
2398 .map(|&col_idx| c.new_row[col_idx as usize].clone())
2399 .collect();
2400 let any_null = indexed_values.iter().any(|v| v.is_null());
2401 if !any_null {
2402 return Err(SqlError::UniqueViolation(idx.name.clone()));
2403 }
2404 }
2405 }
2406 }
2407 }
2408
2409 let count = changes.len() as u64;
2410 Ok(ExecutionResult::RowsAffected(count))
2411}
2412
2413fn exec_delete_in_txn(
2414 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
2415 schema: &SchemaManager,
2416 stmt: &DeleteStmt,
2417) -> Result<ExecutionResult> {
2418 let materialized;
2419 let stmt = if delete_has_subquery(stmt) {
2420 materialized = materialize_delete(stmt, &mut |sub| exec_subquery_write(wtx, schema, sub))?;
2421 &materialized
2422 } else {
2423 stmt
2424 };
2425
2426 let lower_name = stmt.table.to_ascii_lowercase();
2427 let table_schema = schema.get(&lower_name)
2428 .ok_or_else(|| SqlError::TableNotFound(stmt.table.clone()))?;
2429
2430 let all_candidates = collect_keyed_rows_write(wtx, table_schema, &stmt.where_clause)?;
2431 let rows_to_delete: Vec<(Vec<u8>, Vec<Value>)> = all_candidates.into_iter()
2432 .filter(|(_, row)| {
2433 match &stmt.where_clause {
2434 Some(where_expr) => {
2435 match eval_expr(where_expr, &table_schema.columns, row) {
2436 Ok(val) => is_truthy(&val),
2437 Err(_) => false,
2438 }
2439 }
2440 None => true,
2441 }
2442 })
2443 .collect();
2444
2445 if rows_to_delete.is_empty() {
2446 return Ok(ExecutionResult::RowsAffected(0));
2447 }
2448
2449 let pk_indices = table_schema.pk_indices();
2450 for (key, row) in &rows_to_delete {
2451 let pk_values: Vec<Value> = pk_indices.iter().map(|&i| row[i].clone()).collect();
2452 delete_index_entries(wtx, table_schema, row, &pk_values)?;
2453 wtx.table_delete(lower_name.as_bytes(), key).map_err(SqlError::Storage)?;
2454 }
2455 let count = rows_to_delete.len() as u64;
2456 Ok(ExecutionResult::RowsAffected(count))
2457}
2458
2459fn exec_aggregate(
2462 columns: &[ColumnDef],
2463 rows: &[Vec<Value>],
2464 stmt: &SelectStmt,
2465) -> Result<ExecutionResult> {
2466 let groups: BTreeMap<Vec<Value>, Vec<&Vec<Value>>> = if stmt.group_by.is_empty() {
2467 let mut m = BTreeMap::new();
2468 m.insert(vec![], rows.iter().collect());
2469 m
2470 } else {
2471 let mut m: BTreeMap<Vec<Value>, Vec<&Vec<Value>>> = BTreeMap::new();
2472 for row in rows {
2473 let group_key: Vec<Value> = stmt.group_by.iter()
2474 .map(|expr| eval_expr(expr, columns, row))
2475 .collect::<Result<_>>()?;
2476 m.entry(group_key).or_default().push(row);
2477 }
2478 m
2479 };
2480
2481 let mut result_rows = Vec::new();
2482 let output_cols = build_output_columns(&stmt.columns, columns);
2483
2484 for (_group_key, group_rows) in &groups {
2485 let mut result_row = Vec::new();
2486
2487 for sel_col in &stmt.columns {
2488 match sel_col {
2489 SelectColumn::AllColumns => {
2490 return Err(SqlError::Unsupported(
2491 "SELECT * with GROUP BY".into()
2492 ));
2493 }
2494 SelectColumn::Expr { expr, .. } => {
2495 let val = eval_aggregate_expr(
2496 expr, columns, group_rows,
2497 )?;
2498 result_row.push(val);
2499 }
2500 }
2501 }
2502
2503 if let Some(ref having) = stmt.having {
2504 let passes = match eval_aggregate_expr(having, columns, group_rows) {
2505 Ok(val) => is_truthy(&val),
2506 Err(SqlError::ColumnNotFound(_)) => {
2507 match eval_expr(having, &output_cols, &result_row) {
2508 Ok(val) => is_truthy(&val),
2509 Err(_) => false,
2510 }
2511 }
2512 Err(e) => return Err(e),
2513 };
2514 if !passes {
2515 continue;
2516 }
2517 }
2518
2519 result_rows.push(result_row);
2520 }
2521
2522 if stmt.distinct {
2523 let mut seen = std::collections::HashSet::new();
2524 result_rows.retain(|row| seen.insert(row.clone()));
2525 }
2526
2527 if !stmt.order_by.is_empty() {
2528 let output_cols = build_output_columns(&stmt.columns, columns);
2529 sort_rows(&mut result_rows, &stmt.order_by, &output_cols)?;
2530 }
2531
2532 if let Some(ref offset_expr) = stmt.offset {
2533 let offset = eval_const_int(offset_expr)? as usize;
2534 if offset < result_rows.len() {
2535 result_rows = result_rows.split_off(offset);
2536 } else {
2537 result_rows.clear();
2538 }
2539 }
2540 if let Some(ref limit_expr) = stmt.limit {
2541 let limit = eval_const_int(limit_expr)? as usize;
2542 result_rows.truncate(limit);
2543 }
2544
2545 let col_names = stmt.columns.iter().map(|c| match c {
2546 SelectColumn::AllColumns => "*".into(),
2547 SelectColumn::Expr { alias: Some(a), .. } => a.clone(),
2548 SelectColumn::Expr { expr, .. } => expr_display_name(expr),
2549 }).collect();
2550
2551 Ok(ExecutionResult::Query(QueryResult {
2552 columns: col_names,
2553 rows: result_rows,
2554 }))
2555}
2556
2557fn eval_aggregate_expr(
2558 expr: &Expr,
2559 columns: &[ColumnDef],
2560 group_rows: &[&Vec<Value>],
2561) -> Result<Value> {
2562 match expr {
2563 Expr::CountStar => Ok(Value::Integer(group_rows.len() as i64)),
2564
2565 Expr::Function { name, args } if is_aggregate_function(name, args.len()) => {
2566 let func = name.to_ascii_uppercase();
2567 if args.len() != 1 {
2568 return Err(SqlError::Unsupported(format!(
2569 "{func} with {} args", args.len()
2570 )));
2571 }
2572 let arg = &args[0];
2573 let values: Vec<Value> = group_rows.iter()
2574 .map(|row| eval_expr(arg, columns, row))
2575 .collect::<Result<_>>()?;
2576
2577 match func.as_str() {
2578 "COUNT" => {
2579 let count = values.iter().filter(|v| !v.is_null()).count();
2580 Ok(Value::Integer(count as i64))
2581 }
2582 "SUM" => {
2583 let mut int_sum: i64 = 0;
2584 let mut real_sum: f64 = 0.0;
2585 let mut has_real = false;
2586 let mut all_null = true;
2587 for v in &values {
2588 match v {
2589 Value::Integer(i) => { int_sum += i; all_null = false; }
2590 Value::Real(r) => { real_sum += r; has_real = true; all_null = false; }
2591 Value::Null => {}
2592 _ => return Err(SqlError::TypeMismatch {
2593 expected: "numeric".into(),
2594 got: v.data_type().to_string(),
2595 }),
2596 }
2597 }
2598 if all_null { return Ok(Value::Null); }
2599 if has_real {
2600 Ok(Value::Real(real_sum + int_sum as f64))
2601 } else {
2602 Ok(Value::Integer(int_sum))
2603 }
2604 }
2605 "AVG" => {
2606 let mut sum: f64 = 0.0;
2607 let mut count: i64 = 0;
2608 for v in &values {
2609 match v {
2610 Value::Integer(i) => { sum += *i as f64; count += 1; }
2611 Value::Real(r) => { sum += r; count += 1; }
2612 Value::Null => {}
2613 _ => return Err(SqlError::TypeMismatch {
2614 expected: "numeric".into(),
2615 got: v.data_type().to_string(),
2616 }),
2617 }
2618 }
2619 if count == 0 { Ok(Value::Null) } else { Ok(Value::Real(sum / count as f64)) }
2620 }
2621 "MIN" => {
2622 let mut min: Option<&Value> = None;
2623 for v in &values {
2624 if v.is_null() { continue; }
2625 min = Some(match min {
2626 None => v,
2627 Some(m) => if v < m { v } else { m },
2628 });
2629 }
2630 Ok(min.cloned().unwrap_or(Value::Null))
2631 }
2632 "MAX" => {
2633 let mut max: Option<&Value> = None;
2634 for v in &values {
2635 if v.is_null() { continue; }
2636 max = Some(match max {
2637 None => v,
2638 Some(m) => if v > m { v } else { m },
2639 });
2640 }
2641 Ok(max.cloned().unwrap_or(Value::Null))
2642 }
2643 _ => Err(SqlError::Unsupported(format!("aggregate function: {func}"))),
2644 }
2645 }
2646
2647 Expr::Column(_) | Expr::QualifiedColumn { .. } => {
2648 if let Some(first) = group_rows.first() {
2649 eval_expr(expr, columns, first)
2650 } else {
2651 Ok(Value::Null)
2652 }
2653 }
2654
2655 Expr::Literal(v) => Ok(v.clone()),
2656
2657 Expr::BinaryOp { left, op, right } => {
2658 let l = eval_aggregate_expr(left, columns, group_rows)?;
2659 let r = eval_aggregate_expr(right, columns, group_rows)?;
2660 crate::eval::eval_expr(
2661 &Expr::BinaryOp {
2662 left: Box::new(Expr::Literal(l)),
2663 op: *op,
2664 right: Box::new(Expr::Literal(r)),
2665 },
2666 columns,
2667 &[],
2668 )
2669 }
2670
2671 Expr::UnaryOp { op, expr: e } => {
2672 let v = eval_aggregate_expr(e, columns, group_rows)?;
2673 crate::eval::eval_expr(
2674 &Expr::UnaryOp { op: *op, expr: Box::new(Expr::Literal(v)) },
2675 columns, &[],
2676 )
2677 }
2678
2679 Expr::IsNull(e) => {
2680 let v = eval_aggregate_expr(e, columns, group_rows)?;
2681 Ok(Value::Boolean(v.is_null()))
2682 }
2683
2684 Expr::IsNotNull(e) => {
2685 let v = eval_aggregate_expr(e, columns, group_rows)?;
2686 Ok(Value::Boolean(!v.is_null()))
2687 }
2688
2689 Expr::Cast { expr: e, data_type } => {
2690 let v = eval_aggregate_expr(e, columns, group_rows)?;
2691 crate::eval::eval_expr(
2692 &Expr::Cast { expr: Box::new(Expr::Literal(v)), data_type: *data_type },
2693 columns, &[],
2694 )
2695 }
2696
2697 Expr::Case { operand, conditions, else_result } => {
2698 let op_val = operand.as_ref()
2699 .map(|e| eval_aggregate_expr(e, columns, group_rows))
2700 .transpose()?;
2701 if let Some(ov) = &op_val {
2702 for (cond, result) in conditions {
2703 let cv = eval_aggregate_expr(cond, columns, group_rows)?;
2704 if !ov.is_null() && !cv.is_null() && *ov == cv {
2705 return eval_aggregate_expr(result, columns, group_rows);
2706 }
2707 }
2708 } else {
2709 for (cond, result) in conditions {
2710 let cv = eval_aggregate_expr(cond, columns, group_rows)?;
2711 if crate::eval::is_truthy(&cv) {
2712 return eval_aggregate_expr(result, columns, group_rows);
2713 }
2714 }
2715 }
2716 match else_result {
2717 Some(e) => eval_aggregate_expr(e, columns, group_rows),
2718 None => Ok(Value::Null),
2719 }
2720 }
2721
2722 Expr::Coalesce(args) => {
2723 for arg in args {
2724 let v = eval_aggregate_expr(arg, columns, group_rows)?;
2725 if !v.is_null() { return Ok(v); }
2726 }
2727 Ok(Value::Null)
2728 }
2729
2730 Expr::Between { expr: e, low, high, negated } => {
2731 let v = eval_aggregate_expr(e, columns, group_rows)?;
2732 let lo = eval_aggregate_expr(low, columns, group_rows)?;
2733 let hi = eval_aggregate_expr(high, columns, group_rows)?;
2734 crate::eval::eval_expr(
2735 &Expr::Between {
2736 expr: Box::new(Expr::Literal(v)),
2737 low: Box::new(Expr::Literal(lo)),
2738 high: Box::new(Expr::Literal(hi)),
2739 negated: *negated,
2740 },
2741 columns, &[],
2742 )
2743 }
2744
2745 Expr::Like { expr: e, pattern, escape, negated } => {
2746 let v = eval_aggregate_expr(e, columns, group_rows)?;
2747 let p = eval_aggregate_expr(pattern, columns, group_rows)?;
2748 let esc = escape.as_ref()
2749 .map(|es| eval_aggregate_expr(es, columns, group_rows))
2750 .transpose()?;
2751 let esc_box = esc.map(|v| Box::new(Expr::Literal(v)));
2752 crate::eval::eval_expr(
2753 &Expr::Like {
2754 expr: Box::new(Expr::Literal(v)),
2755 pattern: Box::new(Expr::Literal(p)),
2756 escape: esc_box,
2757 negated: *negated,
2758 },
2759 columns, &[],
2760 )
2761 }
2762
2763 Expr::Function { name, args } => {
2764 let evaluated: Vec<Value> = args.iter()
2765 .map(|a| eval_aggregate_expr(a, columns, group_rows))
2766 .collect::<Result<_>>()?;
2767 let literal_args: Vec<Expr> = evaluated.into_iter().map(Expr::Literal).collect();
2768 crate::eval::eval_expr(
2769 &Expr::Function { name: name.clone(), args: literal_args },
2770 columns, &[],
2771 )
2772 }
2773
2774 _ => Err(SqlError::Unsupported(format!("expression in aggregate: {expr:?}"))),
2775 }
2776}
2777
/// Reports whether a function call with this name and argument count is an
/// aggregate.
///
/// Names are matched ASCII case-insensitively. `COUNT`, `SUM` and `AVG` are
/// aggregates regardless of `arg_count`; `MIN` and `MAX` only when called
/// with exactly one argument.
fn is_aggregate_function(name: &str, arg_count: usize) -> bool {
    let is_named = |candidate: &str| name.eq_ignore_ascii_case(candidate);

    if is_named("COUNT") || is_named("SUM") || is_named("AVG") {
        return true;
    }
    arg_count == 1 && (is_named("MIN") || is_named("MAX"))
}
2783
2784fn is_aggregate_expr(expr: &Expr) -> bool {
2785 match expr {
2786 Expr::CountStar => true,
2787 Expr::Function { name, args } => {
2788 is_aggregate_function(name, args.len())
2789 || args.iter().any(is_aggregate_expr)
2790 }
2791 Expr::BinaryOp { left, right, .. } => {
2792 is_aggregate_expr(left) || is_aggregate_expr(right)
2793 }
2794 Expr::UnaryOp { expr, .. } | Expr::IsNull(expr) | Expr::IsNotNull(expr)
2795 | Expr::Cast { expr, .. } => is_aggregate_expr(expr),
2796 Expr::Case { operand, conditions, else_result } => {
2797 operand.as_ref().map_or(false, |e| is_aggregate_expr(e))
2798 || conditions.iter().any(|(c, r)| is_aggregate_expr(c) || is_aggregate_expr(r))
2799 || else_result.as_ref().map_or(false, |e| is_aggregate_expr(e))
2800 }
2801 Expr::Coalesce(args) => args.iter().any(is_aggregate_expr),
2802 Expr::Between { expr, low, high, .. } => {
2803 is_aggregate_expr(expr) || is_aggregate_expr(low) || is_aggregate_expr(high)
2804 }
2805 Expr::Like { expr, pattern, escape, .. } => {
2806 is_aggregate_expr(expr) || is_aggregate_expr(pattern)
2807 || escape.as_ref().map_or(false, |e| is_aggregate_expr(e))
2808 }
2809 _ => false,
2810 }
2811}
2812
2813fn decode_full_row(
2817 schema: &TableSchema,
2818 key: &[u8],
2819 value: &[u8],
2820) -> Result<Vec<Value>> {
2821 let pk_values = decode_composite_key(key, schema.primary_key_columns.len())?;
2822 let non_pk_values = decode_row(value)?;
2823
2824 let mut row = vec![Value::Null; schema.columns.len()];
2825
2826 for (i, &col_idx) in schema.primary_key_columns.iter().enumerate() {
2827 row[col_idx as usize] = pk_values[i].clone();
2828 }
2829
2830 let non_pk = schema.non_pk_indices();
2831 for (i, &col_idx) in non_pk.iter().enumerate() {
2832 if i < non_pk_values.len() {
2833 row[col_idx] = non_pk_values[i].clone();
2834 }
2835 }
2836
2837 Ok(row)
2838}
2839
2840fn eval_const_expr(expr: &Expr) -> Result<Value> {
2842 eval_expr(expr, &[], &[])
2843}
2844
2845fn eval_const_int(expr: &Expr) -> Result<i64> {
2846 match eval_const_expr(expr)? {
2847 Value::Integer(i) => Ok(i),
2848 other => Err(SqlError::TypeMismatch {
2849 expected: "INTEGER".into(),
2850 got: other.data_type().to_string(),
2851 }),
2852 }
2853}
2854
2855fn sort_rows(
2856 rows: &mut [Vec<Value>],
2857 order_by: &[OrderByItem],
2858 columns: &[ColumnDef],
2859) -> Result<()> {
2860 rows.sort_by(|a, b| {
2861 for item in order_by {
2862 let a_val = eval_expr(&item.expr, columns, a).unwrap_or(Value::Null);
2863 let b_val = eval_expr(&item.expr, columns, b).unwrap_or(Value::Null);
2864
2865 let nulls_first = item.nulls_first.unwrap_or(!item.descending);
2866
2867 let ord = match (a_val.is_null(), b_val.is_null()) {
2868 (true, true) => std::cmp::Ordering::Equal,
2869 (true, false) => {
2870 if nulls_first { std::cmp::Ordering::Less } else { std::cmp::Ordering::Greater }
2871 }
2872 (false, true) => {
2873 if nulls_first { std::cmp::Ordering::Greater } else { std::cmp::Ordering::Less }
2874 }
2875 (false, false) => {
2876 let cmp = a_val.cmp(&b_val);
2877 if item.descending { cmp.reverse() } else { cmp }
2878 }
2879 };
2880
2881 if ord != std::cmp::Ordering::Equal {
2882 return ord;
2883 }
2884 }
2885 std::cmp::Ordering::Equal
2886 });
2887 Ok(())
2888}
2889
2890fn project_rows(
2891 columns: &[ColumnDef],
2892 select_cols: &[SelectColumn],
2893 rows: &[Vec<Value>],
2894) -> Result<(Vec<String>, Vec<Vec<Value>>)> {
2895 let mut col_names = Vec::new();
2896 let mut projectors: Vec<Box<dyn Fn(&[Value]) -> Result<Value>>> = Vec::new();
2897
2898 for sel_col in select_cols {
2899 match sel_col {
2900 SelectColumn::AllColumns => {
2901 for col in columns {
2902 let idx = col.position as usize;
2903 col_names.push(col.name.clone());
2904 projectors.push(Box::new(move |row: &[Value]| Ok(row[idx].clone())));
2905 }
2906 }
2907 SelectColumn::Expr { expr, alias } => {
2908 let name = alias.clone().unwrap_or_else(|| expr_display_name(expr));
2909 col_names.push(name);
2910 let expr = expr.clone();
2911 let owned_cols = columns.to_vec();
2912 projectors.push(Box::new(move |row: &[Value]| {
2913 eval_expr(&expr, &owned_cols, row)
2914 }));
2915 }
2916 }
2917 }
2918
2919 let projected = rows.iter().map(|row| {
2920 projectors.iter().map(|p| p(row)).collect::<Result<Vec<_>>>()
2921 }).collect::<Result<Vec<_>>>()?;
2922
2923 Ok((col_names, projected))
2924}
2925
2926fn expr_display_name(expr: &Expr) -> String {
2927 match expr {
2928 Expr::Column(name) => name.clone(),
2929 Expr::QualifiedColumn { table, column } => format!("{table}.{column}"),
2930 Expr::Literal(v) => format!("{v}"),
2931 Expr::CountStar => "COUNT(*)".into(),
2932 Expr::Function { name, args } => {
2933 let arg_strs: Vec<String> = args.iter().map(expr_display_name).collect();
2934 format!("{name}({})", arg_strs.join(", "))
2935 }
2936 Expr::BinaryOp { left, op, right } => {
2937 format!("{} {} {}", expr_display_name(left), op_symbol(op), expr_display_name(right))
2938 }
2939 _ => "?".into(),
2940 }
2941}
2942
2943fn op_symbol(op: &BinOp) -> &'static str {
2944 match op {
2945 BinOp::Add => "+", BinOp::Sub => "-", BinOp::Mul => "*",
2946 BinOp::Div => "/", BinOp::Mod => "%",
2947 BinOp::Eq => "=", BinOp::NotEq => "<>",
2948 BinOp::Lt => "<", BinOp::Gt => ">",
2949 BinOp::LtEq => "<=", BinOp::GtEq => ">=",
2950 BinOp::And => "AND", BinOp::Or => "OR", BinOp::Concat => "||",
2951 }
2952}
2953
2954fn build_output_columns(
2955 select_cols: &[SelectColumn],
2956 columns: &[ColumnDef],
2957) -> Vec<ColumnDef> {
2958 let mut out = Vec::new();
2959 for (i, col) in select_cols.iter().enumerate() {
2960 let (name, data_type) = match col {
2961 SelectColumn::AllColumns => (format!("col{i}"), DataType::Null),
2962 SelectColumn::Expr { alias: Some(a), expr } => {
2963 (a.clone(), infer_expr_type(expr, columns))
2964 }
2965 SelectColumn::Expr { expr, .. } => {
2966 (expr_display_name(expr), infer_expr_type(expr, columns))
2967 }
2968 };
2969 out.push(ColumnDef {
2970 name,
2971 data_type,
2972 nullable: true,
2973 position: i as u16,
2974 });
2975 }
2976 out
2977}
2978
2979fn infer_expr_type(expr: &Expr, columns: &[ColumnDef]) -> DataType {
2980 match expr {
2981 Expr::Column(name) => {
2982 let lower = name.to_ascii_lowercase();
2983 columns.iter()
2984 .find(|c| c.name.to_ascii_lowercase() == lower)
2985 .map(|c| c.data_type)
2986 .unwrap_or(DataType::Null)
2987 }
2988 Expr::QualifiedColumn { table, column } => {
2989 let qualified = format!("{}.{}", table.to_ascii_lowercase(), column.to_ascii_lowercase());
2990 columns.iter()
2991 .find(|c| c.name.to_ascii_lowercase() == qualified)
2992 .map(|c| c.data_type)
2993 .unwrap_or(DataType::Null)
2994 }
2995 Expr::Literal(v) => v.data_type(),
2996 Expr::CountStar => DataType::Integer,
2997 Expr::Function { name, .. } => {
2998 match name.to_ascii_uppercase().as_str() {
2999 "COUNT" => DataType::Integer,
3000 "AVG" => DataType::Real,
3001 "SUM" | "MIN" | "MAX" => DataType::Null,
3002 _ => DataType::Null,
3003 }
3004 }
3005 _ => DataType::Null,
3006 }
3007}