csv_database/
database.rs

1//! CSV database implementation.
2
3use std::collections::HashMap;
4use std::fs::File;
5use std::io::{BufRead, BufReader, Write};
6use std::path::{Path, PathBuf};
7
8use vibesql::{
9    AnalyzedQuery, Analyzer, Catalog, ColumnSchema, ExprKind, JoinCondition, JoinType,
10    MemoryCatalog, Parser, Query, QueryBody, Select, SelectItem, SqlType, StatementKind, TableRef,
11    TableSchemaBuilder,
12};
13
14use crate::execution::{ExecutionRow, TableInfo};
15use crate::result::{QueryResult, Row};
16
/// A simple CSV database.
///
/// Table schemas are kept in a [`MemoryCatalog`], while the rows of every
/// loaded table are held in memory and keyed by table name.
pub struct CsvDatabase {
    /// Directory containing CSV files; it is scanned for `.csv` files when the
    /// database is created.
    data_dir: PathBuf,
    /// Catalog with table schemas, used when analyzing queries.
    catalog: MemoryCatalog,
    /// Cached table data: table name -> rows.
    tables: HashMap<String, Vec<Row>>,
}
26
27impl CsvDatabase {
28    /// Create a new CSV database from a directory.
29    pub fn new(data_dir: impl AsRef<Path>) -> std::io::Result<Self> {
30        let data_dir = data_dir.as_ref().to_path_buf();
31        let mut db = Self {
32            data_dir,
33            catalog: MemoryCatalog::new(),
34            tables: HashMap::new(),
35        };
36        db.catalog.register_builtins();
37        db.discover_tables()?;
38        db.register_information_schema();
39        Ok(db)
40    }
41
    /// Check if a table exists.
    ///
    /// Returns `true` when `name` is a key of the in-memory table data map.
    /// The lookup is an exact match on the stored key.
    pub fn has_table(&self, name: &str) -> bool {
        self.tables.contains_key(name)
    }
46
47    /// Register information_schema virtual tables.
48    fn register_information_schema(&mut self) {
49        // Create the information_schema schema
50        let schema = self.catalog.add_schema("information_schema");
51
52        // Register TABLES table
53        let tables_schema = TableSchemaBuilder::new("tables")
54            .column(ColumnSchema::new("table_catalog", SqlType::Varchar))
55            .column(ColumnSchema::new("table_schema", SqlType::Varchar))
56            .column(ColumnSchema::new("table_name", SqlType::Varchar))
57            .column(ColumnSchema::new("table_type", SqlType::Varchar))
58            .build();
59        schema.tables.insert("tables".to_string(), tables_schema);
60
61        // Register COLUMNS table
62        let columns_schema = TableSchemaBuilder::new("columns")
63            .column(ColumnSchema::new("table_catalog", SqlType::Varchar))
64            .column(ColumnSchema::new("table_schema", SqlType::Varchar))
65            .column(ColumnSchema::new("table_name", SqlType::Varchar))
66            .column(ColumnSchema::new("column_name", SqlType::Varchar))
67            .column(ColumnSchema::new("ordinal_position", SqlType::Varchar))
68            .column(ColumnSchema::new("data_type", SqlType::Varchar))
69            .column(ColumnSchema::new("is_nullable", SqlType::Varchar))
70            .build();
71        schema.tables.insert("columns".to_string(), columns_schema);
72    }
73
74    /// Generate information_schema.tables data dynamically.
75    fn get_information_schema_tables(&self) -> Vec<Row> {
76        let mut rows = Vec::new();
77        for table_name in self.tables.keys() {
78            if table_name.starts_with("information_schema.") {
79                continue;
80            }
81            rows.push(vec![
82                "csvdb".to_string(),
83                "default".to_string(),
84                table_name.clone(),
85                "BASE TABLE".to_string(),
86            ]);
87        }
88        rows.sort_by(|a, b| a[2].cmp(&b[2]));
89        rows
90    }
91
92    /// Generate information_schema.columns data dynamically.
93    fn get_information_schema_columns(&self) -> Vec<Row> {
94        let mut rows = Vec::new();
95        for table_name in self.tables.keys() {
96            if table_name.starts_with("information_schema.") {
97                continue;
98            }
99            if let Ok(Some(schema)) = self.catalog.resolve_table(&[table_name.clone()]) {
100                for (i, col) in schema.columns.iter().enumerate() {
101                    rows.push(vec![
102                        "csvdb".to_string(),
103                        "default".to_string(),
104                        table_name.clone(),
105                        col.name.clone(),
106                        (i + 1).to_string(),
107                        format!("{:?}", col.data_type),
108                        if col.nullable { "YES" } else { "NO" }.to_string(),
109                    ]);
110                }
111            }
112        }
113        rows.sort_by(|a, b| (&a[2], &a[4]).cmp(&(&b[2], &b[4])));
114        rows
115    }
116
117    /// Discover CSV files and register them as tables.
118    fn discover_tables(&mut self) -> std::io::Result<()> {
119        if !self.data_dir.exists() {
120            std::fs::create_dir_all(&self.data_dir)?;
121        }
122
123        for entry in std::fs::read_dir(&self.data_dir)? {
124            let entry = entry?;
125            let path = entry.path();
126            if path.extension().map(|e| e == "csv").unwrap_or(false) {
127                self.load_table(&path)?;
128            }
129        }
130        Ok(())
131    }
132
133    /// Load a CSV file as a table.
134    fn load_table(&mut self, path: &Path) -> std::io::Result<()> {
135        let table_name = path
136            .file_stem()
137            .and_then(|s| s.to_str())
138            .unwrap_or("unknown")
139            .to_string();
140
141        let file = File::open(path)?;
142        let reader = BufReader::new(file);
143        let mut lines = reader.lines();
144
145        let header = match lines.next() {
146            Some(Ok(h)) => h,
147            _ => return Ok(()),
148        };
149
150        let columns: Vec<&str> = header.split(',').map(|s| s.trim()).collect();
151
152        let mut builder = TableSchemaBuilder::new(&table_name);
153        for col_name in &columns {
154            builder = builder.column(ColumnSchema::new(*col_name, SqlType::Varchar));
155        }
156        let schema = builder.build();
157        self.catalog.add_table(schema);
158
159        let mut rows = Vec::new();
160        for line in lines {
161            let line = line?;
162            let values: Vec<String> = line.split(',').map(|s| s.trim().to_string()).collect();
163            if values.len() == columns.len() {
164                rows.push(values);
165            }
166        }
167
168        self.tables.insert(table_name, rows);
169        Ok(())
170    }
171
172    /// Execute a SQL query and return results.
173    pub fn execute(&mut self, sql: &str) -> Result<QueryResult, String> {
174        let mut parser = Parser::new(sql);
175        let statements = parser.parse().map_err(|e| format!("Parse error: {}", e))?;
176
177        if statements.is_empty() {
178            return Err("No statements to execute".to_string());
179        }
180
181        let stmt = &statements[0];
182
183        match &stmt.kind {
184            StatementKind::Query(query) => self.execute_query(query),
185            StatementKind::Insert(insert) => {
186                let table_name = insert
187                    .table
188                    .parts
189                    .last()
190                    .map(|i| i.value.clone())
191                    .unwrap_or_default();
192
193                if let vibesql::InsertSource::Values(rows) = &insert.source {
194                    for row in rows {
195                        let values: Vec<String> =
196                            row.iter().map(|expr| self.eval_literal(expr)).collect();
197
198                        if let Some(table_data) = self.tables.get_mut(&table_name) {
199                            table_data.push(values.clone());
200                        }
201                    }
202                    self.save_table(&table_name)?;
203                    Ok(QueryResult::new(
204                        vec!["result".to_string()],
205                        vec![vec!["Inserted".to_string()]],
206                    ))
207                } else {
208                    Err("Only INSERT VALUES is supported".to_string())
209                }
210            }
211            StatementKind::CreateTable(create) => {
212                let table_name = create
213                    .name
214                    .parts
215                    .last()
216                    .map(|i| i.value.clone())
217                    .unwrap_or_default();
218
219                let mut builder = TableSchemaBuilder::new(&table_name);
220                for col in &create.columns {
221                    builder = builder.column(ColumnSchema::new(&col.name.value, SqlType::Varchar));
222                }
223                let schema = builder.build();
224                self.catalog.add_table(schema);
225
226                self.tables.insert(table_name.clone(), Vec::new());
227                self.save_table(&table_name)?;
228
229                Ok(QueryResult::new(
230                    vec!["result".to_string()],
231                    vec![vec![format!("Created table {}", table_name)]],
232                ))
233            }
234            _ => Err("Unsupported statement type".to_string()),
235        }
236    }
237
238    /// Execute a SELECT query.
239    fn execute_query(&self, query: &Query) -> Result<QueryResult, String> {
240        let mut analyzer = Analyzer::with_catalog(self.catalog.clone());
241        let analyzed = analyzer
242            .analyze_query_result(query)
243            .map_err(|e| format!("Analysis error: {}", e))?;
244
245        match &query.body {
246            QueryBody::Select(select) => self.execute_select(select, query, &analyzed),
247            _ => Err("Only simple SELECT is supported".to_string()),
248        }
249    }
250
251    /// Execute a SELECT statement.
252    fn execute_select(
253        &self,
254        select: &Select,
255        query: &Query,
256        analyzed: &AnalyzedQuery,
257    ) -> Result<QueryResult, String> {
258        let from = select.from.as_ref().ok_or("SELECT requires FROM clause")?;
259
260        if from.tables.is_empty() {
261            return Err("No tables in FROM clause".to_string());
262        }
263
264        let mut exec_rows = self.process_from_clause(&from.tables[0])?;
265
266        if let Some(where_expr) = &select.where_clause {
267            exec_rows.retain(|row| self.eval_where(where_expr, row));
268        }
269
270        let result_rows = if analyzed.has_aggregation {
271            self.execute_aggregation(select, &exec_rows, analyzed)?
272        } else {
273            self.project_columns(select, &exec_rows, analyzed)?
274        };
275
276        let limited_rows = if let Some(limit) = &query.limit {
277            if let Some(count) = &limit.count {
278                let n = self.eval_int(count) as usize;
279                result_rows.into_iter().take(n).collect()
280            } else {
281                result_rows
282            }
283        } else {
284            result_rows
285        };
286
287        Ok(QueryResult::new(
288            analyzed.columns.iter().map(|c| c.name.clone()).collect(),
289            limited_rows,
290        ))
291    }
292
293    /// Execute aggregation query.
294    fn execute_aggregation(
295        &self,
296        select: &Select,
297        exec_rows: &[ExecutionRow],
298        analyzed: &AnalyzedQuery,
299    ) -> Result<Vec<Row>, String> {
300        let groups = self.group_rows(select, exec_rows);
301
302        let mut result = Vec::new();
303
304        for (_group_key, group_rows) in groups {
305            let mut row = Vec::new();
306
307            for (i, item) in select.projection.iter().enumerate() {
308                match item {
309                    SelectItem::Expr { expr, .. } => {
310                        let val = self.eval_aggregate_expr(expr, &group_rows);
311                        row.push(val);
312                    }
313                    _ => {
314                        if let Some(col) = analyzed.columns.get(i) {
315                            if let Some(first_row) = group_rows.first() {
316                                let val = first_row.get(&col.name).cloned().unwrap_or_default();
317                                row.push(val);
318                            } else {
319                                row.push(String::new());
320                            }
321                        }
322                    }
323                }
324            }
325
326            result.push(row);
327        }
328
329        if result.is_empty() && select.group_by.is_none() {
330            let mut row = Vec::new();
331            for item in &select.projection {
332                match item {
333                    SelectItem::Expr { expr, .. } => {
334                        let val = self.eval_aggregate_expr(expr, &[]);
335                        row.push(val);
336                    }
337                    _ => row.push(String::new()),
338                }
339            }
340            result.push(row);
341        }
342
343        Ok(result)
344    }
345
346    /// Group rows by GROUP BY columns.
347    fn group_rows(
348        &self,
349        select: &Select,
350        exec_rows: &[ExecutionRow],
351    ) -> Vec<(String, Vec<ExecutionRow>)> {
352        let group_by_exprs: Vec<&vibesql::Expr> = if let Some(group_by) = &select.group_by {
353            group_by
354                .items
355                .iter()
356                .filter_map(|item| {
357                    if let vibesql::GroupByItem::Expr(expr) = item {
358                        Some(expr.as_ref())
359                    } else {
360                        None
361                    }
362                })
363                .collect()
364        } else {
365            Vec::new()
366        };
367
368        if group_by_exprs.is_empty() {
369            return vec![("".to_string(), exec_rows.to_vec())];
370        }
371
372        let mut groups: HashMap<String, Vec<ExecutionRow>> = HashMap::new();
373
374        for row in exec_rows {
375            let key: Vec<String> = group_by_exprs
376                .iter()
377                .map(|expr| self.eval_expr_row(expr, row))
378                .collect();
379            let key_str = key.join("|");
380
381            groups
382                .entry(key_str)
383                .or_insert_with(Vec::new)
384                .push(row.clone());
385        }
386
387        groups.into_iter().collect()
388    }
389
390    /// Evaluate an expression that may contain aggregates.
391    fn eval_aggregate_expr(&self, expr: &vibesql::Expr, rows: &[ExecutionRow]) -> String {
392        match &expr.kind {
393            ExprKind::Aggregate(agg) => {
394                let func_name = agg
395                    .function
396                    .name
397                    .parts
398                    .last()
399                    .map(|i| i.value.to_uppercase())
400                    .unwrap_or_default();
401
402                let arg_values: Vec<String> = if agg.function.args.is_empty() {
403                    vec![]
404                } else {
405                    rows.iter()
406                        .filter_map(|row| {
407                            if let Some(vibesql::FunctionArg::Unnamed(arg_expr)) =
408                                agg.function.args.first()
409                            {
410                                Some(self.eval_expr_row(arg_expr, row))
411                            } else {
412                                None
413                            }
414                        })
415                        .collect()
416                };
417
418                self.compute_aggregate(&func_name, &arg_values, rows.len())
419            }
420            ExprKind::Function(func) => {
421                let func_name = func
422                    .name
423                    .parts
424                    .last()
425                    .map(|i| i.value.to_uppercase())
426                    .unwrap_or_default();
427
428                if matches!(func_name.as_str(), "COUNT" | "SUM" | "AVG" | "MIN" | "MAX") {
429                    let arg_values: Vec<String> = rows
430                        .iter()
431                        .filter_map(|row| {
432                            if let Some(vibesql::FunctionArg::Unnamed(arg_expr)) = func.args.first()
433                            {
434                                Some(self.eval_expr_row(arg_expr, row))
435                            } else {
436                                None
437                            }
438                        })
439                        .collect();
440
441                    self.compute_aggregate(&func_name, &arg_values, rows.len())
442                } else {
443                    rows.first()
444                        .map(|row| self.eval_expr_row(expr, row))
445                        .unwrap_or_default()
446                }
447            }
448            ExprKind::Identifier(_) | ExprKind::CompoundIdentifier(_) => rows
449                .first()
450                .map(|row| self.eval_expr_row(expr, row))
451                .unwrap_or_default(),
452            ExprKind::BinaryOp { op, left, right } => {
453                let left_val = self.eval_aggregate_expr(left, rows);
454                let right_val = self.eval_aggregate_expr(right, rows);
455
456                // Try numeric arithmetic
457                if let (Ok(l), Ok(r)) = (left_val.parse::<f64>(), right_val.parse::<f64>()) {
458                    let result = match op {
459                        vibesql::BinaryOp::Plus => l + r,
460                        vibesql::BinaryOp::Minus => l - r,
461                        vibesql::BinaryOp::Multiply => l * r,
462                        vibesql::BinaryOp::Divide => {
463                            if r != 0.0 {
464                                l / r
465                            } else {
466                                return "NULL".to_string();
467                            }
468                        }
469                        vibesql::BinaryOp::Modulo => {
470                            if r != 0.0 {
471                                l % r
472                            } else {
473                                return "NULL".to_string();
474                            }
475                        }
476                        _ => return format!("{} {} {}", left_val, op, right_val),
477                    };
478                    if result.fract() == 0.0 {
479                        (result as i64).to_string()
480                    } else {
481                        format!("{:.2}", result)
482                    }
483                } else {
484                    // Non-numeric, return concatenated for debugging
485                    format!("{}{}{}", left_val, right_val, "")
486                }
487            }
488            ExprKind::Integer(n) => n.to_string(),
489            ExprKind::Float(f) => f.to_string(),
490            _ => rows
491                .first()
492                .map(|row| self.eval_expr_row(expr, row))
493                .unwrap_or_default(),
494        }
495    }
496
497    /// Compute an aggregate function.
498    fn compute_aggregate(&self, func_name: &str, values: &[String], row_count: usize) -> String {
499        match func_name {
500            "COUNT" => {
501                if values.is_empty() {
502                    row_count.to_string()
503                } else {
504                    values.iter().filter(|v| !v.is_empty()).count().to_string()
505                }
506            }
507            "SUM" => {
508                let sum: f64 = values.iter().filter_map(|v| v.parse::<f64>().ok()).sum();
509                if sum.fract() == 0.0 {
510                    (sum as i64).to_string()
511                } else {
512                    format!("{:.2}", sum)
513                }
514            }
515            "AVG" => {
516                let nums: Vec<f64> = values
517                    .iter()
518                    .filter_map(|v| v.parse::<f64>().ok())
519                    .collect();
520                if nums.is_empty() {
521                    "NULL".to_string()
522                } else {
523                    let avg = nums.iter().sum::<f64>() / nums.len() as f64;
524                    format!("{:.2}", avg)
525                }
526            }
527            "MIN" => {
528                let nums: Vec<f64> = values
529                    .iter()
530                    .filter_map(|v| v.parse::<f64>().ok())
531                    .collect();
532                if !nums.is_empty() {
533                    let min = nums.iter().cloned().fold(f64::INFINITY, f64::min);
534                    if min.fract() == 0.0 {
535                        (min as i64).to_string()
536                    } else {
537                        format!("{:.2}", min)
538                    }
539                } else {
540                    values
541                        .iter()
542                        .filter(|v| !v.is_empty())
543                        .min()
544                        .cloned()
545                        .unwrap_or_else(|| "NULL".to_string())
546                }
547            }
548            "MAX" => {
549                let nums: Vec<f64> = values
550                    .iter()
551                    .filter_map(|v| v.parse::<f64>().ok())
552                    .collect();
553                if !nums.is_empty() {
554                    let max = nums.iter().cloned().fold(f64::NEG_INFINITY, f64::max);
555                    if max.fract() == 0.0 {
556                        (max as i64).to_string()
557                    } else {
558                        format!("{:.2}", max)
559                    }
560                } else {
561                    values
562                        .iter()
563                        .filter(|v| !v.is_empty())
564                        .max()
565                        .cloned()
566                        .unwrap_or_else(|| "NULL".to_string())
567                }
568            }
569            _ => "NULL".to_string(),
570        }
571    }
572
573    /// Process FROM clause and return execution rows.
574    fn process_from_clause(&self, table_ref: &TableRef) -> Result<Vec<ExecutionRow>, String> {
575        match table_ref {
576            TableRef::Table { name, alias, .. } => {
577                let table_name = name
578                    .parts
579                    .iter()
580                    .map(|i| i.value.clone())
581                    .collect::<Vec<_>>()
582                    .join(".");
583
584                let table_alias =
585                    alias
586                        .as_ref()
587                        .map(|a| a.name.value.clone())
588                        .unwrap_or_else(|| {
589                            name.parts
590                                .last()
591                                .map(|i| i.value.clone())
592                                .unwrap_or_default()
593                        });
594
595                let table_info = self.get_table_info(&table_name, &table_alias)?;
596
597                let mut exec_rows = Vec::new();
598                for row in &table_info.rows {
599                    let mut exec_row = ExecutionRow::new();
600                    exec_row.add_table(&table_info.alias, &table_info.columns, row);
601                    exec_rows.push(exec_row);
602                }
603                Ok(exec_rows)
604            }
605            TableRef::Join {
606                left,
607                right,
608                join_type,
609                condition,
610            } => {
611                let left_rows = self.process_from_clause(left)?;
612                let right_rows = self.process_from_clause(right)?;
613
614                self.execute_join(&left_rows, &right_rows, join_type, condition)
615            }
616            _ => Err("Unsupported table reference type".to_string()),
617        }
618    }
619
620    /// Get table info for execution.
621    fn get_table_info(&self, table_name: &str, alias: &str) -> Result<TableInfo, String> {
622        let normalized_name = table_name.to_lowercase();
623
624        if normalized_name == "information_schema.tables" {
625            let columns = vec![
626                "table_catalog".to_string(),
627                "table_schema".to_string(),
628                "table_name".to_string(),
629                "table_type".to_string(),
630            ];
631            return Ok(TableInfo {
632                alias: alias.to_string(),
633                columns,
634                rows: self.get_information_schema_tables(),
635            });
636        } else if normalized_name == "information_schema.columns" {
637            let columns = vec![
638                "table_catalog".to_string(),
639                "table_schema".to_string(),
640                "table_name".to_string(),
641                "column_name".to_string(),
642                "ordinal_position".to_string(),
643                "data_type".to_string(),
644                "is_nullable".to_string(),
645            ];
646            return Ok(TableInfo {
647                alias: alias.to_string(),
648                columns,
649                rows: self.get_information_schema_columns(),
650            });
651        }
652
653        let schema = self
654            .catalog
655            .resolve_table(&[table_name.to_string()])
656            .map_err(|e| e.to_string())?
657            .ok_or_else(|| format!("Table '{}' not found", table_name))?;
658
659        let rows = self
660            .tables
661            .get(table_name)
662            .cloned()
663            .ok_or_else(|| format!("Data for '{}' not found", table_name))?;
664
665        let columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
666
667        Ok(TableInfo {
668            alias: alias.to_string(),
669            columns,
670            rows,
671        })
672    }
673
674    /// Execute a JOIN operation.
675    fn execute_join(
676        &self,
677        left_rows: &[ExecutionRow],
678        right_rows: &[ExecutionRow],
679        join_type: &JoinType,
680        condition: &Option<JoinCondition>,
681    ) -> Result<Vec<ExecutionRow>, String> {
682        let mut result = Vec::new();
683
684        match join_type {
685            JoinType::Inner | JoinType::Natural => {
686                for left in left_rows {
687                    for right in right_rows {
688                        let combined = self.combine_rows(left, right);
689                        if self.check_join_condition(&combined, condition) {
690                            result.push(combined);
691                        }
692                    }
693                }
694            }
695            JoinType::Left | JoinType::LeftSemi | JoinType::LeftAnti => {
696                for left in left_rows {
697                    let mut found_match = false;
698                    for right in right_rows {
699                        let combined = self.combine_rows(left, right);
700                        if self.check_join_condition(&combined, condition) {
701                            result.push(combined);
702                            found_match = true;
703                        }
704                    }
705                    if !found_match {
706                        let combined = self.combine_rows_with_nulls(left, right_rows.first());
707                        result.push(combined);
708                    }
709                }
710            }
711            JoinType::Right | JoinType::RightSemi | JoinType::RightAnti => {
712                for right in right_rows {
713                    let mut found_match = false;
714                    for left in left_rows {
715                        let combined = self.combine_rows(left, right);
716                        if self.check_join_condition(&combined, condition) {
717                            result.push(combined);
718                            found_match = true;
719                        }
720                    }
721                    if !found_match {
722                        let combined = self.combine_rows_with_nulls_left(left_rows.first(), right);
723                        result.push(combined);
724                    }
725                }
726            }
727            JoinType::Full => {
728                let mut right_matched: Vec<bool> = vec![false; right_rows.len()];
729
730                for left in left_rows {
731                    let mut found_match = false;
732                    for (i, right) in right_rows.iter().enumerate() {
733                        let combined = self.combine_rows(left, right);
734                        if self.check_join_condition(&combined, condition) {
735                            result.push(combined);
736                            found_match = true;
737                            right_matched[i] = true;
738                        }
739                    }
740                    if !found_match {
741                        let combined = self.combine_rows_with_nulls(left, right_rows.first());
742                        result.push(combined);
743                    }
744                }
745
746                for (i, right) in right_rows.iter().enumerate() {
747                    if !right_matched[i] {
748                        let combined = self.combine_rows_with_nulls_left(left_rows.first(), right);
749                        result.push(combined);
750                    }
751                }
752            }
753            JoinType::Cross => {
754                for left in left_rows {
755                    for right in right_rows {
756                        result.push(self.combine_rows(left, right));
757                    }
758                }
759            }
760        }
761
762        Ok(result)
763    }
764
765    /// Combine two execution rows.
766    fn combine_rows(&self, left: &ExecutionRow, right: &ExecutionRow) -> ExecutionRow {
767        let mut combined = ExecutionRow::new();
768        combined.values = left.values.clone();
769        combined.col_map = left.col_map.clone();
770
771        let offset = combined.values.len();
772        for (key, &idx) in &right.col_map {
773            combined.col_map.insert(key.clone(), idx + offset);
774        }
775        combined.values.extend(right.values.iter().cloned());
776
777        combined
778    }
779
780    /// Combine left row with NULLs for right side.
781    fn combine_rows_with_nulls(
782        &self,
783        left: &ExecutionRow,
784        right_template: Option<&ExecutionRow>,
785    ) -> ExecutionRow {
786        let mut combined = ExecutionRow::new();
787        combined.values = left.values.clone();
788        combined.col_map = left.col_map.clone();
789
790        if let Some(right) = right_template {
791            let offset = combined.values.len();
792            for (key, &idx) in &right.col_map {
793                combined.col_map.insert(key.clone(), idx + offset);
794            }
795            combined
796                .values
797                .extend(std::iter::repeat(String::new()).take(right.values.len()));
798        }
799
800        combined
801    }
802
803    /// Combine NULLs for left side with right row.
804    fn combine_rows_with_nulls_left(
805        &self,
806        left_template: Option<&ExecutionRow>,
807        right: &ExecutionRow,
808    ) -> ExecutionRow {
809        let mut combined = ExecutionRow::new();
810
811        if let Some(left) = left_template {
812            combined.col_map = left.col_map.clone();
813            combined
814                .values
815                .extend(std::iter::repeat(String::new()).take(left.values.len()));
816        }
817
818        let offset = combined.values.len();
819        for (key, &idx) in &right.col_map {
820            combined.col_map.insert(key.clone(), idx + offset);
821        }
822        combined.values.extend(right.values.iter().cloned());
823
824        combined
825    }
826
827    /// Check if a join condition is satisfied.
828    fn check_join_condition(&self, row: &ExecutionRow, condition: &Option<JoinCondition>) -> bool {
829        match condition {
830            Some(JoinCondition::On(expr)) => self.eval_where(expr, row),
831            Some(JoinCondition::Using(_)) => true,
832            None => true,
833        }
834    }
835
836    /// Project columns from execution rows.
837    fn project_columns(
838        &self,
839        select: &Select,
840        exec_rows: &[ExecutionRow],
841        analyzed: &AnalyzedQuery,
842    ) -> Result<Vec<Row>, String> {
843        let mut result = Vec::new();
844
845        for exec_row in exec_rows {
846            let mut row = Vec::new();
847
848            for (i, item) in select.projection.iter().enumerate() {
849                match item {
850                    SelectItem::Wildcard => {
851                        row.extend(exec_row.values.iter().cloned());
852                    }
853                    SelectItem::QualifiedWildcard { qualifier } => {
854                        let table_prefix = qualifier
855                            .parts
856                            .last()
857                            .map(|i| i.value.to_lowercase())
858                            .unwrap_or_default();
859
860                        for (key, &idx) in &exec_row.col_map {
861                            if key.starts_with(&format!("{}.", table_prefix)) {
862                                if let Some(val) = exec_row.values.get(idx) {
863                                    row.push(val.clone());
864                                }
865                            }
866                        }
867                    }
868                    SelectItem::Expr { expr, .. } => {
869                        let val = self.eval_expr_row(expr, exec_row);
870                        row.push(val);
871                    }
872                    _ => {
873                        if let Some(col) = analyzed.columns.get(i) {
874                            let val = exec_row.get(&col.name).cloned().unwrap_or_default();
875                            row.push(val);
876                        }
877                    }
878                }
879            }
880
881            result.push(row);
882        }
883
884        Ok(result)
885    }
886
887    /// Evaluate a WHERE clause expression.
888    fn eval_where(&self, expr: &vibesql::Expr, row: &ExecutionRow) -> bool {
889        match &expr.kind {
890            ExprKind::BinaryOp { op, left, right } => match op {
891                vibesql::BinaryOp::And => self.eval_where(left, row) && self.eval_where(right, row),
892                vibesql::BinaryOp::Or => self.eval_where(left, row) || self.eval_where(right, row),
893                _ => {
894                    let left_val = self.eval_expr_row(left, row);
895                    let right_val = self.eval_expr_row(right, row);
896
897                    match op {
898                        vibesql::BinaryOp::Eq => left_val == right_val,
899                        vibesql::BinaryOp::NotEq => left_val != right_val,
900                        vibesql::BinaryOp::Lt => {
901                            self.compare_values(&left_val, &right_val) == std::cmp::Ordering::Less
902                        }
903                        vibesql::BinaryOp::LtEq => {
904                            self.compare_values(&left_val, &right_val)
905                                != std::cmp::Ordering::Greater
906                        }
907                        vibesql::BinaryOp::Gt => {
908                            self.compare_values(&left_val, &right_val)
909                                == std::cmp::Ordering::Greater
910                        }
911                        vibesql::BinaryOp::GtEq => {
912                            self.compare_values(&left_val, &right_val) != std::cmp::Ordering::Less
913                        }
914                        _ => false,
915                    }
916                }
917            },
918            ExprKind::Boolean(b) => *b,
919            _ => true,
920        }
921    }
922
923    /// Evaluate an expression against an execution row.
924    fn eval_expr_row(&self, expr: &vibesql::Expr, row: &ExecutionRow) -> String {
925        match &expr.kind {
926            ExprKind::Identifier(ident) => row.get(&ident.value).cloned().unwrap_or_default(),
927            ExprKind::CompoundIdentifier(parts) => {
928                if parts.len() >= 2 {
929                    let key = format!(
930                        "{}.{}",
931                        parts[parts.len() - 2].value,
932                        parts[parts.len() - 1].value
933                    );
934                    row.get(&key).cloned().unwrap_or_default()
935                } else if parts.len() == 1 {
936                    row.get(&parts[0].value).cloned().unwrap_or_default()
937                } else {
938                    String::new()
939                }
940            }
941            ExprKind::String(s) => s.clone(),
942            ExprKind::Integer(n) => n.to_string(),
943            ExprKind::Float(f) => f.to_string(),
944            ExprKind::Boolean(b) => b.to_string(),
945            ExprKind::Null => String::new(),
946            _ => String::new(),
947        }
948    }
949
950    /// Compare two string values (try numeric comparison first).
951    fn compare_values(&self, a: &str, b: &str) -> std::cmp::Ordering {
952        if let (Ok(a_num), Ok(b_num)) = (a.parse::<f64>(), b.parse::<f64>()) {
953            a_num
954                .partial_cmp(&b_num)
955                .unwrap_or(std::cmp::Ordering::Equal)
956        } else {
957            a.cmp(b)
958        }
959    }
960
961    /// Evaluate a literal expression.
962    fn eval_literal(&self, expr: &vibesql::Expr) -> String {
963        match &expr.kind {
964            ExprKind::String(s) => s.clone(),
965            ExprKind::Integer(n) => n.to_string(),
966            ExprKind::Float(f) => f.to_string(),
967            ExprKind::Boolean(b) => b.to_string(),
968            ExprKind::Null => String::new(),
969            _ => String::new(),
970        }
971    }
972
973    /// Evaluate an integer expression.
974    fn eval_int(&self, expr: &vibesql::Expr) -> i64 {
975        match &expr.kind {
976            ExprKind::Integer(n) => *n,
977            _ => 0,
978        }
979    }
980
981    /// Save a table to CSV.
982    fn save_table(&self, table_name: &str) -> Result<(), String> {
983        let schema = self
984            .catalog
985            .resolve_table(&[table_name.to_string()])
986            .map_err(|e| e.to_string())?
987            .ok_or_else(|| format!("Schema for '{}' not found", table_name))?;
988
989        let path = self.data_dir.join(format!("{}.csv", table_name));
990        let mut file = File::create(&path).map_err(|e| e.to_string())?;
991
992        let header: Vec<&str> = schema.columns.iter().map(|c| c.name.as_str()).collect();
993        writeln!(file, "{}", header.join(",")).map_err(|e| e.to_string())?;
994
995        if let Some(rows) = self.tables.get(table_name) {
996            for row in rows {
997                writeln!(file, "{}", row.join(",")).map_err(|e| e.to_string())?;
998            }
999        }
1000
1001        Ok(())
1002    }
1003}