1use std::collections::HashMap;
4use std::fs::File;
5use std::io::{BufRead, BufReader, Write};
6use std::path::{Path, PathBuf};
7
8use vibesql::{
9 AnalyzedQuery, Analyzer, Catalog, ColumnSchema, ExprKind, JoinCondition, JoinType,
10 MemoryCatalog, Parser, Query, QueryBody, Select, SelectItem, SqlType, StatementKind, TableRef,
11 TableSchemaBuilder,
12};
13
14use crate::execution::{ExecutionRow, TableInfo};
15use crate::result::{QueryResult, Row};
16
/// A SQL-queryable database whose tables are CSV files in one directory.
pub struct CsvDatabase {
    /// Directory scanned for `*.csv` files at start-up and written to when a
    /// table is saved.
    data_dir: PathBuf,
    /// Catalog holding the schema of every loaded or created table.
    catalog: MemoryCatalog,
    /// In-memory rows of each table, keyed by table name.
    tables: HashMap<String, Vec<Row>>,
}
26
27impl CsvDatabase {
28 pub fn new(data_dir: impl AsRef<Path>) -> std::io::Result<Self> {
30 let data_dir = data_dir.as_ref().to_path_buf();
31 let mut db = Self {
32 data_dir,
33 catalog: MemoryCatalog::new(),
34 tables: HashMap::new(),
35 };
36 db.catalog.register_builtins();
37 db.discover_tables()?;
38 db.register_information_schema();
39 Ok(db)
40 }
41
    /// Reports whether rows for a table named exactly `name` are held in
    /// memory.
    pub fn has_table(&self, name: &str) -> bool {
        self.tables.contains_key(name)
    }
46
47 fn register_information_schema(&mut self) {
49 let schema = self.catalog.add_schema("information_schema");
51
52 let tables_schema = TableSchemaBuilder::new("tables")
54 .column(ColumnSchema::new("table_catalog", SqlType::Varchar))
55 .column(ColumnSchema::new("table_schema", SqlType::Varchar))
56 .column(ColumnSchema::new("table_name", SqlType::Varchar))
57 .column(ColumnSchema::new("table_type", SqlType::Varchar))
58 .build();
59 schema.tables.insert("tables".to_string(), tables_schema);
60
61 let columns_schema = TableSchemaBuilder::new("columns")
63 .column(ColumnSchema::new("table_catalog", SqlType::Varchar))
64 .column(ColumnSchema::new("table_schema", SqlType::Varchar))
65 .column(ColumnSchema::new("table_name", SqlType::Varchar))
66 .column(ColumnSchema::new("column_name", SqlType::Varchar))
67 .column(ColumnSchema::new("ordinal_position", SqlType::Varchar))
68 .column(ColumnSchema::new("data_type", SqlType::Varchar))
69 .column(ColumnSchema::new("is_nullable", SqlType::Varchar))
70 .build();
71 schema.tables.insert("columns".to_string(), columns_schema);
72 }
73
74 fn get_information_schema_tables(&self) -> Vec<Row> {
76 let mut rows = Vec::new();
77 for table_name in self.tables.keys() {
78 if table_name.starts_with("information_schema.") {
79 continue;
80 }
81 rows.push(vec![
82 "csvdb".to_string(),
83 "default".to_string(),
84 table_name.clone(),
85 "BASE TABLE".to_string(),
86 ]);
87 }
88 rows.sort_by(|a, b| a[2].cmp(&b[2]));
89 rows
90 }
91
92 fn get_information_schema_columns(&self) -> Vec<Row> {
94 let mut rows = Vec::new();
95 for table_name in self.tables.keys() {
96 if table_name.starts_with("information_schema.") {
97 continue;
98 }
99 if let Ok(Some(schema)) = self.catalog.resolve_table(&[table_name.clone()]) {
100 for (i, col) in schema.columns.iter().enumerate() {
101 rows.push(vec![
102 "csvdb".to_string(),
103 "default".to_string(),
104 table_name.clone(),
105 col.name.clone(),
106 (i + 1).to_string(),
107 format!("{:?}", col.data_type),
108 if col.nullable { "YES" } else { "NO" }.to_string(),
109 ]);
110 }
111 }
112 }
113 rows.sort_by(|a, b| (&a[2], &a[4]).cmp(&(&b[2], &b[4])));
114 rows
115 }
116
117 fn discover_tables(&mut self) -> std::io::Result<()> {
119 if !self.data_dir.exists() {
120 std::fs::create_dir_all(&self.data_dir)?;
121 }
122
123 for entry in std::fs::read_dir(&self.data_dir)? {
124 let entry = entry?;
125 let path = entry.path();
126 if path.extension().map(|e| e == "csv").unwrap_or(false) {
127 self.load_table(&path)?;
128 }
129 }
130 Ok(())
131 }
132
133 fn load_table(&mut self, path: &Path) -> std::io::Result<()> {
135 let table_name = path
136 .file_stem()
137 .and_then(|s| s.to_str())
138 .unwrap_or("unknown")
139 .to_string();
140
141 let file = File::open(path)?;
142 let reader = BufReader::new(file);
143 let mut lines = reader.lines();
144
145 let header = match lines.next() {
146 Some(Ok(h)) => h,
147 _ => return Ok(()),
148 };
149
150 let columns: Vec<&str> = header.split(',').map(|s| s.trim()).collect();
151
152 let mut builder = TableSchemaBuilder::new(&table_name);
153 for col_name in &columns {
154 builder = builder.column(ColumnSchema::new(*col_name, SqlType::Varchar));
155 }
156 let schema = builder.build();
157 self.catalog.add_table(schema);
158
159 let mut rows = Vec::new();
160 for line in lines {
161 let line = line?;
162 let values: Vec<String> = line.split(',').map(|s| s.trim().to_string()).collect();
163 if values.len() == columns.len() {
164 rows.push(values);
165 }
166 }
167
168 self.tables.insert(table_name, rows);
169 Ok(())
170 }
171
172 pub fn execute(&mut self, sql: &str) -> Result<QueryResult, String> {
174 let mut parser = Parser::new(sql);
175 let statements = parser.parse().map_err(|e| format!("Parse error: {}", e))?;
176
177 if statements.is_empty() {
178 return Err("No statements to execute".to_string());
179 }
180
181 let stmt = &statements[0];
182
183 match &stmt.kind {
184 StatementKind::Query(query) => self.execute_query(query),
185 StatementKind::Insert(insert) => {
186 let table_name = insert
187 .table
188 .parts
189 .last()
190 .map(|i| i.value.clone())
191 .unwrap_or_default();
192
193 if let vibesql::InsertSource::Values(rows) = &insert.source {
194 for row in rows {
195 let values: Vec<String> =
196 row.iter().map(|expr| self.eval_literal(expr)).collect();
197
198 if let Some(table_data) = self.tables.get_mut(&table_name) {
199 table_data.push(values.clone());
200 }
201 }
202 self.save_table(&table_name)?;
203 Ok(QueryResult::new(
204 vec!["result".to_string()],
205 vec![vec!["Inserted".to_string()]],
206 ))
207 } else {
208 Err("Only INSERT VALUES is supported".to_string())
209 }
210 }
211 StatementKind::CreateTable(create) => {
212 let table_name = create
213 .name
214 .parts
215 .last()
216 .map(|i| i.value.clone())
217 .unwrap_or_default();
218
219 let mut builder = TableSchemaBuilder::new(&table_name);
220 for col in &create.columns {
221 builder = builder.column(ColumnSchema::new(&col.name.value, SqlType::Varchar));
222 }
223 let schema = builder.build();
224 self.catalog.add_table(schema);
225
226 self.tables.insert(table_name.clone(), Vec::new());
227 self.save_table(&table_name)?;
228
229 Ok(QueryResult::new(
230 vec!["result".to_string()],
231 vec![vec![format!("Created table {}", table_name)]],
232 ))
233 }
234 _ => Err("Unsupported statement type".to_string()),
235 }
236 }
237
238 fn execute_query(&self, query: &Query) -> Result<QueryResult, String> {
240 let mut analyzer = Analyzer::with_catalog(self.catalog.clone());
241 let analyzed = analyzer
242 .analyze_query_result(query)
243 .map_err(|e| format!("Analysis error: {}", e))?;
244
245 match &query.body {
246 QueryBody::Select(select) => self.execute_select(select, query, &analyzed),
247 _ => Err("Only simple SELECT is supported".to_string()),
248 }
249 }
250
251 fn execute_select(
253 &self,
254 select: &Select,
255 query: &Query,
256 analyzed: &AnalyzedQuery,
257 ) -> Result<QueryResult, String> {
258 let from = select.from.as_ref().ok_or("SELECT requires FROM clause")?;
259
260 if from.tables.is_empty() {
261 return Err("No tables in FROM clause".to_string());
262 }
263
264 let mut exec_rows = self.process_from_clause(&from.tables[0])?;
265
266 if let Some(where_expr) = &select.where_clause {
267 exec_rows.retain(|row| self.eval_where(where_expr, row));
268 }
269
270 let result_rows = if analyzed.has_aggregation {
271 self.execute_aggregation(select, &exec_rows, analyzed)?
272 } else {
273 self.project_columns(select, &exec_rows, analyzed)?
274 };
275
276 let limited_rows = if let Some(limit) = &query.limit {
277 if let Some(count) = &limit.count {
278 let n = self.eval_int(count) as usize;
279 result_rows.into_iter().take(n).collect()
280 } else {
281 result_rows
282 }
283 } else {
284 result_rows
285 };
286
287 Ok(QueryResult::new(
288 analyzed.columns.iter().map(|c| c.name.clone()).collect(),
289 limited_rows,
290 ))
291 }
292
293 fn execute_aggregation(
295 &self,
296 select: &Select,
297 exec_rows: &[ExecutionRow],
298 analyzed: &AnalyzedQuery,
299 ) -> Result<Vec<Row>, String> {
300 let groups = self.group_rows(select, exec_rows);
301
302 let mut result = Vec::new();
303
304 for (_group_key, group_rows) in groups {
305 let mut row = Vec::new();
306
307 for (i, item) in select.projection.iter().enumerate() {
308 match item {
309 SelectItem::Expr { expr, .. } => {
310 let val = self.eval_aggregate_expr(expr, &group_rows);
311 row.push(val);
312 }
313 _ => {
314 if let Some(col) = analyzed.columns.get(i) {
315 if let Some(first_row) = group_rows.first() {
316 let val = first_row.get(&col.name).cloned().unwrap_or_default();
317 row.push(val);
318 } else {
319 row.push(String::new());
320 }
321 }
322 }
323 }
324 }
325
326 result.push(row);
327 }
328
329 if result.is_empty() && select.group_by.is_none() {
330 let mut row = Vec::new();
331 for item in &select.projection {
332 match item {
333 SelectItem::Expr { expr, .. } => {
334 let val = self.eval_aggregate_expr(expr, &[]);
335 row.push(val);
336 }
337 _ => row.push(String::new()),
338 }
339 }
340 result.push(row);
341 }
342
343 Ok(result)
344 }
345
346 fn group_rows(
348 &self,
349 select: &Select,
350 exec_rows: &[ExecutionRow],
351 ) -> Vec<(String, Vec<ExecutionRow>)> {
352 let group_by_exprs: Vec<&vibesql::Expr> = if let Some(group_by) = &select.group_by {
353 group_by
354 .items
355 .iter()
356 .filter_map(|item| {
357 if let vibesql::GroupByItem::Expr(expr) = item {
358 Some(expr.as_ref())
359 } else {
360 None
361 }
362 })
363 .collect()
364 } else {
365 Vec::new()
366 };
367
368 if group_by_exprs.is_empty() {
369 return vec![("".to_string(), exec_rows.to_vec())];
370 }
371
372 let mut groups: HashMap<String, Vec<ExecutionRow>> = HashMap::new();
373
374 for row in exec_rows {
375 let key: Vec<String> = group_by_exprs
376 .iter()
377 .map(|expr| self.eval_expr_row(expr, row))
378 .collect();
379 let key_str = key.join("|");
380
381 groups
382 .entry(key_str)
383 .or_insert_with(Vec::new)
384 .push(row.clone());
385 }
386
387 groups.into_iter().collect()
388 }
389
390 fn eval_aggregate_expr(&self, expr: &vibesql::Expr, rows: &[ExecutionRow]) -> String {
392 match &expr.kind {
393 ExprKind::Aggregate(agg) => {
394 let func_name = agg
395 .function
396 .name
397 .parts
398 .last()
399 .map(|i| i.value.to_uppercase())
400 .unwrap_or_default();
401
402 let arg_values: Vec<String> = if agg.function.args.is_empty() {
403 vec![]
404 } else {
405 rows.iter()
406 .filter_map(|row| {
407 if let Some(vibesql::FunctionArg::Unnamed(arg_expr)) =
408 agg.function.args.first()
409 {
410 Some(self.eval_expr_row(arg_expr, row))
411 } else {
412 None
413 }
414 })
415 .collect()
416 };
417
418 self.compute_aggregate(&func_name, &arg_values, rows.len())
419 }
420 ExprKind::Function(func) => {
421 let func_name = func
422 .name
423 .parts
424 .last()
425 .map(|i| i.value.to_uppercase())
426 .unwrap_or_default();
427
428 if matches!(func_name.as_str(), "COUNT" | "SUM" | "AVG" | "MIN" | "MAX") {
429 let arg_values: Vec<String> = rows
430 .iter()
431 .filter_map(|row| {
432 if let Some(vibesql::FunctionArg::Unnamed(arg_expr)) = func.args.first()
433 {
434 Some(self.eval_expr_row(arg_expr, row))
435 } else {
436 None
437 }
438 })
439 .collect();
440
441 self.compute_aggregate(&func_name, &arg_values, rows.len())
442 } else {
443 rows.first()
444 .map(|row| self.eval_expr_row(expr, row))
445 .unwrap_or_default()
446 }
447 }
448 ExprKind::Identifier(_) | ExprKind::CompoundIdentifier(_) => rows
449 .first()
450 .map(|row| self.eval_expr_row(expr, row))
451 .unwrap_or_default(),
452 ExprKind::BinaryOp { op, left, right } => {
453 let left_val = self.eval_aggregate_expr(left, rows);
454 let right_val = self.eval_aggregate_expr(right, rows);
455
456 if let (Ok(l), Ok(r)) = (left_val.parse::<f64>(), right_val.parse::<f64>()) {
458 let result = match op {
459 vibesql::BinaryOp::Plus => l + r,
460 vibesql::BinaryOp::Minus => l - r,
461 vibesql::BinaryOp::Multiply => l * r,
462 vibesql::BinaryOp::Divide => {
463 if r != 0.0 {
464 l / r
465 } else {
466 return "NULL".to_string();
467 }
468 }
469 vibesql::BinaryOp::Modulo => {
470 if r != 0.0 {
471 l % r
472 } else {
473 return "NULL".to_string();
474 }
475 }
476 _ => return format!("{} {} {}", left_val, op, right_val),
477 };
478 if result.fract() == 0.0 {
479 (result as i64).to_string()
480 } else {
481 format!("{:.2}", result)
482 }
483 } else {
484 format!("{}{}{}", left_val, right_val, "")
486 }
487 }
488 ExprKind::Integer(n) => n.to_string(),
489 ExprKind::Float(f) => f.to_string(),
490 _ => rows
491 .first()
492 .map(|row| self.eval_expr_row(expr, row))
493 .unwrap_or_default(),
494 }
495 }
496
497 fn compute_aggregate(&self, func_name: &str, values: &[String], row_count: usize) -> String {
499 match func_name {
500 "COUNT" => {
501 if values.is_empty() {
502 row_count.to_string()
503 } else {
504 values.iter().filter(|v| !v.is_empty()).count().to_string()
505 }
506 }
507 "SUM" => {
508 let sum: f64 = values.iter().filter_map(|v| v.parse::<f64>().ok()).sum();
509 if sum.fract() == 0.0 {
510 (sum as i64).to_string()
511 } else {
512 format!("{:.2}", sum)
513 }
514 }
515 "AVG" => {
516 let nums: Vec<f64> = values
517 .iter()
518 .filter_map(|v| v.parse::<f64>().ok())
519 .collect();
520 if nums.is_empty() {
521 "NULL".to_string()
522 } else {
523 let avg = nums.iter().sum::<f64>() / nums.len() as f64;
524 format!("{:.2}", avg)
525 }
526 }
527 "MIN" => {
528 let nums: Vec<f64> = values
529 .iter()
530 .filter_map(|v| v.parse::<f64>().ok())
531 .collect();
532 if !nums.is_empty() {
533 let min = nums.iter().cloned().fold(f64::INFINITY, f64::min);
534 if min.fract() == 0.0 {
535 (min as i64).to_string()
536 } else {
537 format!("{:.2}", min)
538 }
539 } else {
540 values
541 .iter()
542 .filter(|v| !v.is_empty())
543 .min()
544 .cloned()
545 .unwrap_or_else(|| "NULL".to_string())
546 }
547 }
548 "MAX" => {
549 let nums: Vec<f64> = values
550 .iter()
551 .filter_map(|v| v.parse::<f64>().ok())
552 .collect();
553 if !nums.is_empty() {
554 let max = nums.iter().cloned().fold(f64::NEG_INFINITY, f64::max);
555 if max.fract() == 0.0 {
556 (max as i64).to_string()
557 } else {
558 format!("{:.2}", max)
559 }
560 } else {
561 values
562 .iter()
563 .filter(|v| !v.is_empty())
564 .max()
565 .cloned()
566 .unwrap_or_else(|| "NULL".to_string())
567 }
568 }
569 _ => "NULL".to_string(),
570 }
571 }
572
573 fn process_from_clause(&self, table_ref: &TableRef) -> Result<Vec<ExecutionRow>, String> {
575 match table_ref {
576 TableRef::Table { name, alias, .. } => {
577 let table_name = name
578 .parts
579 .iter()
580 .map(|i| i.value.clone())
581 .collect::<Vec<_>>()
582 .join(".");
583
584 let table_alias =
585 alias
586 .as_ref()
587 .map(|a| a.name.value.clone())
588 .unwrap_or_else(|| {
589 name.parts
590 .last()
591 .map(|i| i.value.clone())
592 .unwrap_or_default()
593 });
594
595 let table_info = self.get_table_info(&table_name, &table_alias)?;
596
597 let mut exec_rows = Vec::new();
598 for row in &table_info.rows {
599 let mut exec_row = ExecutionRow::new();
600 exec_row.add_table(&table_info.alias, &table_info.columns, row);
601 exec_rows.push(exec_row);
602 }
603 Ok(exec_rows)
604 }
605 TableRef::Join {
606 left,
607 right,
608 join_type,
609 condition,
610 } => {
611 let left_rows = self.process_from_clause(left)?;
612 let right_rows = self.process_from_clause(right)?;
613
614 self.execute_join(&left_rows, &right_rows, join_type, condition)
615 }
616 _ => Err("Unsupported table reference type".to_string()),
617 }
618 }
619
620 fn get_table_info(&self, table_name: &str, alias: &str) -> Result<TableInfo, String> {
622 let normalized_name = table_name.to_lowercase();
623
624 if normalized_name == "information_schema.tables" {
625 let columns = vec![
626 "table_catalog".to_string(),
627 "table_schema".to_string(),
628 "table_name".to_string(),
629 "table_type".to_string(),
630 ];
631 return Ok(TableInfo {
632 alias: alias.to_string(),
633 columns,
634 rows: self.get_information_schema_tables(),
635 });
636 } else if normalized_name == "information_schema.columns" {
637 let columns = vec![
638 "table_catalog".to_string(),
639 "table_schema".to_string(),
640 "table_name".to_string(),
641 "column_name".to_string(),
642 "ordinal_position".to_string(),
643 "data_type".to_string(),
644 "is_nullable".to_string(),
645 ];
646 return Ok(TableInfo {
647 alias: alias.to_string(),
648 columns,
649 rows: self.get_information_schema_columns(),
650 });
651 }
652
653 let schema = self
654 .catalog
655 .resolve_table(&[table_name.to_string()])
656 .map_err(|e| e.to_string())?
657 .ok_or_else(|| format!("Table '{}' not found", table_name))?;
658
659 let rows = self
660 .tables
661 .get(table_name)
662 .cloned()
663 .ok_or_else(|| format!("Data for '{}' not found", table_name))?;
664
665 let columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
666
667 Ok(TableInfo {
668 alias: alias.to_string(),
669 columns,
670 rows,
671 })
672 }
673
674 fn execute_join(
676 &self,
677 left_rows: &[ExecutionRow],
678 right_rows: &[ExecutionRow],
679 join_type: &JoinType,
680 condition: &Option<JoinCondition>,
681 ) -> Result<Vec<ExecutionRow>, String> {
682 let mut result = Vec::new();
683
684 match join_type {
685 JoinType::Inner | JoinType::Natural => {
686 for left in left_rows {
687 for right in right_rows {
688 let combined = self.combine_rows(left, right);
689 if self.check_join_condition(&combined, condition) {
690 result.push(combined);
691 }
692 }
693 }
694 }
695 JoinType::Left | JoinType::LeftSemi | JoinType::LeftAnti => {
696 for left in left_rows {
697 let mut found_match = false;
698 for right in right_rows {
699 let combined = self.combine_rows(left, right);
700 if self.check_join_condition(&combined, condition) {
701 result.push(combined);
702 found_match = true;
703 }
704 }
705 if !found_match {
706 let combined = self.combine_rows_with_nulls(left, right_rows.first());
707 result.push(combined);
708 }
709 }
710 }
711 JoinType::Right | JoinType::RightSemi | JoinType::RightAnti => {
712 for right in right_rows {
713 let mut found_match = false;
714 for left in left_rows {
715 let combined = self.combine_rows(left, right);
716 if self.check_join_condition(&combined, condition) {
717 result.push(combined);
718 found_match = true;
719 }
720 }
721 if !found_match {
722 let combined = self.combine_rows_with_nulls_left(left_rows.first(), right);
723 result.push(combined);
724 }
725 }
726 }
727 JoinType::Full => {
728 let mut right_matched: Vec<bool> = vec![false; right_rows.len()];
729
730 for left in left_rows {
731 let mut found_match = false;
732 for (i, right) in right_rows.iter().enumerate() {
733 let combined = self.combine_rows(left, right);
734 if self.check_join_condition(&combined, condition) {
735 result.push(combined);
736 found_match = true;
737 right_matched[i] = true;
738 }
739 }
740 if !found_match {
741 let combined = self.combine_rows_with_nulls(left, right_rows.first());
742 result.push(combined);
743 }
744 }
745
746 for (i, right) in right_rows.iter().enumerate() {
747 if !right_matched[i] {
748 let combined = self.combine_rows_with_nulls_left(left_rows.first(), right);
749 result.push(combined);
750 }
751 }
752 }
753 JoinType::Cross => {
754 for left in left_rows {
755 for right in right_rows {
756 result.push(self.combine_rows(left, right));
757 }
758 }
759 }
760 }
761
762 Ok(result)
763 }
764
765 fn combine_rows(&self, left: &ExecutionRow, right: &ExecutionRow) -> ExecutionRow {
767 let mut combined = ExecutionRow::new();
768 combined.values = left.values.clone();
769 combined.col_map = left.col_map.clone();
770
771 let offset = combined.values.len();
772 for (key, &idx) in &right.col_map {
773 combined.col_map.insert(key.clone(), idx + offset);
774 }
775 combined.values.extend(right.values.iter().cloned());
776
777 combined
778 }
779
780 fn combine_rows_with_nulls(
782 &self,
783 left: &ExecutionRow,
784 right_template: Option<&ExecutionRow>,
785 ) -> ExecutionRow {
786 let mut combined = ExecutionRow::new();
787 combined.values = left.values.clone();
788 combined.col_map = left.col_map.clone();
789
790 if let Some(right) = right_template {
791 let offset = combined.values.len();
792 for (key, &idx) in &right.col_map {
793 combined.col_map.insert(key.clone(), idx + offset);
794 }
795 combined
796 .values
797 .extend(std::iter::repeat(String::new()).take(right.values.len()));
798 }
799
800 combined
801 }
802
803 fn combine_rows_with_nulls_left(
805 &self,
806 left_template: Option<&ExecutionRow>,
807 right: &ExecutionRow,
808 ) -> ExecutionRow {
809 let mut combined = ExecutionRow::new();
810
811 if let Some(left) = left_template {
812 combined.col_map = left.col_map.clone();
813 combined
814 .values
815 .extend(std::iter::repeat(String::new()).take(left.values.len()));
816 }
817
818 let offset = combined.values.len();
819 for (key, &idx) in &right.col_map {
820 combined.col_map.insert(key.clone(), idx + offset);
821 }
822 combined.values.extend(right.values.iter().cloned());
823
824 combined
825 }
826
827 fn check_join_condition(&self, row: &ExecutionRow, condition: &Option<JoinCondition>) -> bool {
829 match condition {
830 Some(JoinCondition::On(expr)) => self.eval_where(expr, row),
831 Some(JoinCondition::Using(_)) => true,
832 None => true,
833 }
834 }
835
836 fn project_columns(
838 &self,
839 select: &Select,
840 exec_rows: &[ExecutionRow],
841 analyzed: &AnalyzedQuery,
842 ) -> Result<Vec<Row>, String> {
843 let mut result = Vec::new();
844
845 for exec_row in exec_rows {
846 let mut row = Vec::new();
847
848 for (i, item) in select.projection.iter().enumerate() {
849 match item {
850 SelectItem::Wildcard => {
851 row.extend(exec_row.values.iter().cloned());
852 }
853 SelectItem::QualifiedWildcard { qualifier } => {
854 let table_prefix = qualifier
855 .parts
856 .last()
857 .map(|i| i.value.to_lowercase())
858 .unwrap_or_default();
859
860 for (key, &idx) in &exec_row.col_map {
861 if key.starts_with(&format!("{}.", table_prefix)) {
862 if let Some(val) = exec_row.values.get(idx) {
863 row.push(val.clone());
864 }
865 }
866 }
867 }
868 SelectItem::Expr { expr, .. } => {
869 let val = self.eval_expr_row(expr, exec_row);
870 row.push(val);
871 }
872 _ => {
873 if let Some(col) = analyzed.columns.get(i) {
874 let val = exec_row.get(&col.name).cloned().unwrap_or_default();
875 row.push(val);
876 }
877 }
878 }
879 }
880
881 result.push(row);
882 }
883
884 Ok(result)
885 }
886
887 fn eval_where(&self, expr: &vibesql::Expr, row: &ExecutionRow) -> bool {
889 match &expr.kind {
890 ExprKind::BinaryOp { op, left, right } => match op {
891 vibesql::BinaryOp::And => self.eval_where(left, row) && self.eval_where(right, row),
892 vibesql::BinaryOp::Or => self.eval_where(left, row) || self.eval_where(right, row),
893 _ => {
894 let left_val = self.eval_expr_row(left, row);
895 let right_val = self.eval_expr_row(right, row);
896
897 match op {
898 vibesql::BinaryOp::Eq => left_val == right_val,
899 vibesql::BinaryOp::NotEq => left_val != right_val,
900 vibesql::BinaryOp::Lt => {
901 self.compare_values(&left_val, &right_val) == std::cmp::Ordering::Less
902 }
903 vibesql::BinaryOp::LtEq => {
904 self.compare_values(&left_val, &right_val)
905 != std::cmp::Ordering::Greater
906 }
907 vibesql::BinaryOp::Gt => {
908 self.compare_values(&left_val, &right_val)
909 == std::cmp::Ordering::Greater
910 }
911 vibesql::BinaryOp::GtEq => {
912 self.compare_values(&left_val, &right_val) != std::cmp::Ordering::Less
913 }
914 _ => false,
915 }
916 }
917 },
918 ExprKind::Boolean(b) => *b,
919 _ => true,
920 }
921 }
922
923 fn eval_expr_row(&self, expr: &vibesql::Expr, row: &ExecutionRow) -> String {
925 match &expr.kind {
926 ExprKind::Identifier(ident) => row.get(&ident.value).cloned().unwrap_or_default(),
927 ExprKind::CompoundIdentifier(parts) => {
928 if parts.len() >= 2 {
929 let key = format!(
930 "{}.{}",
931 parts[parts.len() - 2].value,
932 parts[parts.len() - 1].value
933 );
934 row.get(&key).cloned().unwrap_or_default()
935 } else if parts.len() == 1 {
936 row.get(&parts[0].value).cloned().unwrap_or_default()
937 } else {
938 String::new()
939 }
940 }
941 ExprKind::String(s) => s.clone(),
942 ExprKind::Integer(n) => n.to_string(),
943 ExprKind::Float(f) => f.to_string(),
944 ExprKind::Boolean(b) => b.to_string(),
945 ExprKind::Null => String::new(),
946 _ => String::new(),
947 }
948 }
949
950 fn compare_values(&self, a: &str, b: &str) -> std::cmp::Ordering {
952 if let (Ok(a_num), Ok(b_num)) = (a.parse::<f64>(), b.parse::<f64>()) {
953 a_num
954 .partial_cmp(&b_num)
955 .unwrap_or(std::cmp::Ordering::Equal)
956 } else {
957 a.cmp(b)
958 }
959 }
960
961 fn eval_literal(&self, expr: &vibesql::Expr) -> String {
963 match &expr.kind {
964 ExprKind::String(s) => s.clone(),
965 ExprKind::Integer(n) => n.to_string(),
966 ExprKind::Float(f) => f.to_string(),
967 ExprKind::Boolean(b) => b.to_string(),
968 ExprKind::Null => String::new(),
969 _ => String::new(),
970 }
971 }
972
973 fn eval_int(&self, expr: &vibesql::Expr) -> i64 {
975 match &expr.kind {
976 ExprKind::Integer(n) => *n,
977 _ => 0,
978 }
979 }
980
981 fn save_table(&self, table_name: &str) -> Result<(), String> {
983 let schema = self
984 .catalog
985 .resolve_table(&[table_name.to_string()])
986 .map_err(|e| e.to_string())?
987 .ok_or_else(|| format!("Schema for '{}' not found", table_name))?;
988
989 let path = self.data_dir.join(format!("{}.csv", table_name));
990 let mut file = File::create(&path).map_err(|e| e.to_string())?;
991
992 let header: Vec<&str> = schema.columns.iter().map(|c| c.name.as_str()).collect();
993 writeln!(file, "{}", header.join(",")).map_err(|e| e.to_string())?;
994
995 if let Some(rows) = self.tables.get(table_name) {
996 for row in rows {
997 writeln!(file, "{}", row.join(",")).map_err(|e| e.to_string())?;
998 }
999 }
1000
1001 Ok(())
1002 }
1003}