sql_cli/data/
csv_datasource.rs

1use crate::api_client::{QueryInfo, QueryResponse};
2use crate::csv_fixes::{build_column_lookup, find_column_case_insensitive, parse_column_name};
3use crate::data::datatable::{DataColumn, DataRow, DataTable, DataValue};
4use crate::recursive_parser::{OrderByItem, Parser, SortDirection};
5use crate::sql::parser::ast::SqlExpression;
6use crate::where_ast::evaluate_where_expr_with_options;
7use crate::where_parser::WhereParser;
8use anyhow::Result;
9use csv;
10use serde_json::{json, Value};
11use std::cmp::Ordering;
12use std::collections::HashMap;
13use std::fs::File;
14use std::io::BufReader;
15use std::path::Path;
16use tracing::debug;
17
18#[derive(Clone, Debug)]
19pub struct CsvDataSource {
20    data: Vec<Value>,
21    headers: Vec<String>,
22    table_name: String,
23    column_lookup: HashMap<String, String>,
24}
25
26impl CsvDataSource {
27    pub fn load_from_file<P: AsRef<Path>>(path: P, table_name: &str) -> Result<Self> {
28        use std::io::{BufRead, BufReader as IOBufReader};
29
30        let file = File::open(&path)?;
31        let mut reader = csv::Reader::from_reader(file);
32
33        // Get headers
34        let headers: Vec<String> = reader
35            .headers()?
36            .iter()
37            .map(std::string::ToString::to_string)
38            .collect();
39
40        // Re-open file to read raw lines for null detection
41        let file2 = File::open(&path)?;
42        let mut line_reader = IOBufReader::new(file2);
43        let mut raw_line = String::new();
44        // Skip header line
45        line_reader.read_line(&mut raw_line)?;
46
47        // Read all records into JSON values
48        let mut data = Vec::new();
49        for result in reader.records() {
50            let record = result?;
51
52            // Read the corresponding raw line
53            raw_line.clear();
54            line_reader.read_line(&mut raw_line)?;
55
56            let mut row = serde_json::Map::new();
57
58            for (i, field) in record.iter().enumerate() {
59                if let Some(header) = headers.get(i) {
60                    // Parse the raw line to check if this field was truly empty (,,)
61                    // vs quoted empty ("") - this is a simple heuristic
62                    let value = if field.is_empty() {
63                        // Check if this was preceded/followed by consecutive commas in raw line
64                        // This is a simplified check - for a robust solution we'd need
65                        // a full CSV parser that preserves quote information
66                        if Self::is_null_field(&raw_line, i) {
67                            Value::Null
68                        } else {
69                            // Quoted empty string
70                            Value::String(String::new())
71                        }
72                    } else if let Ok(n) = field.parse::<f64>() {
73                        json!(n)
74                    } else {
75                        Value::String(field.to_string())
76                    };
77                    row.insert(header.clone(), value);
78                }
79            }
80
81            data.push(Value::Object(row));
82        }
83
84        let column_lookup = build_column_lookup(&headers);
85
86        Ok(CsvDataSource {
87            data,
88            headers,
89            table_name: table_name.to_string(),
90            column_lookup,
91        })
92    }
93
94    /// Helper to detect if a field in the raw CSV line is a null (unquoted empty)
95    /// This is a heuristic - consecutive commas indicate null
96    fn is_null_field(raw_line: &str, field_index: usize) -> bool {
97        // Count commas to find the field position
98        let mut comma_count = 0;
99        let mut in_quotes = false;
100        let mut field_start = 0;
101        let mut prev_char = ' ';
102
103        for (i, ch) in raw_line.chars().enumerate() {
104            if ch == '"' && prev_char != '\\' {
105                in_quotes = !in_quotes;
106            }
107
108            if ch == ',' && !in_quotes {
109                if comma_count == field_index {
110                    let field_end = i;
111                    let field_content = &raw_line[field_start..field_end].trim();
112                    // If empty, check if it was quoted (quoted empty = empty string, unquoted empty = NULL)
113                    if field_content.is_empty() {
114                        return true; // Unquoted empty field -> NULL
115                    }
116                    // If it starts and ends with quotes but is empty inside, it's an empty string, not NULL
117                    if field_content.starts_with('"')
118                        && field_content.ends_with('"')
119                        && field_content.len() == 2
120                    {
121                        return false; // Quoted empty field -> empty string
122                    }
123                    return false; // Non-empty field -> not NULL
124                }
125                comma_count += 1;
126                field_start = i + 1;
127            }
128            prev_char = ch;
129        }
130
131        // Check last field
132        if comma_count == field_index {
133            let field_content = raw_line[field_start..]
134                .trim()
135                .trim_end_matches('\n')
136                .trim_end_matches('\r');
137            // If empty, check if it was quoted
138            if field_content.is_empty() {
139                return true; // Unquoted empty field -> NULL
140            }
141            // If it starts and ends with quotes but is empty inside, it's an empty string, not NULL
142            if field_content.starts_with('"')
143                && field_content.ends_with('"')
144                && field_content.len() == 2
145            {
146                return false; // Quoted empty field -> empty string
147            }
148            return false; // Non-empty field -> not NULL
149        }
150
151        false // Field not found -> not NULL (shouldn't happen)
152    }
153
154    pub fn load_from_json_file<P: AsRef<Path>>(path: P, table_name: &str) -> Result<Self> {
155        let file = File::open(&path)?;
156        let reader = BufReader::new(file);
157
158        // Parse JSON array
159        let json_data: Vec<Value> = serde_json::from_reader(reader)?;
160
161        if json_data.is_empty() {
162            return Err(anyhow::anyhow!("JSON file contains no data"));
163        }
164
165        // Extract headers from the first record
166        let headers = if let Some(first_record) = json_data.first() {
167            if let Some(obj) = first_record.as_object() {
168                obj.keys().cloned().collect()
169            } else {
170                return Err(anyhow::anyhow!("JSON records must be objects"));
171            }
172        } else {
173            Vec::new()
174        };
175
176        // Validate all records have the same structure
177        for (i, record) in json_data.iter().enumerate() {
178            if !record.is_object() {
179                return Err(anyhow::anyhow!("Record {} is not an object", i));
180            }
181        }
182
183        let column_lookup = build_column_lookup(&headers);
184
185        Ok(CsvDataSource {
186            data: json_data,
187            headers,
188            table_name: table_name.to_string(),
189            column_lookup,
190        })
191    }
192
193    pub fn query(&self, sql: &str) -> Result<Vec<Value>> {
194        self.query_with_options(sql, false)
195    }
196
197    pub fn query_with_options(&self, sql: &str, case_insensitive: bool) -> Result<Vec<Value>> {
198        // Parse SQL using the recursive parser - NO FALLBACK, recursive parser only
199        let mut parser = Parser::new(sql);
200        let stmt = parser
201            .parse()
202            .map_err(|e| anyhow::anyhow!("Recursive parser error: {}", e))?;
203
204        let mut results = self.data.clone();
205
206        // Handle WHERE clause using the existing WhereParser
207        let sql_lower = sql.to_lowercase();
208        if let Some(where_pos) = sql_lower.find(" where ") {
209            // Extract WHERE clause, but stop at ORDER BY, LIMIT, or OFFSET if present
210            let where_start = where_pos + 7;
211            let mut where_end = sql.len();
212
213            // Find the earliest of ORDER BY, LIMIT, or OFFSET to terminate WHERE clause
214            if let Some(order_pos) = sql_lower.find(" order by ") {
215                where_end = where_end.min(order_pos);
216            }
217            if let Some(limit_pos) = sql_lower.find(" limit ") {
218                where_end = where_end.min(limit_pos);
219            }
220            if let Some(offset_pos) = sql_lower.find(" offset ") {
221                where_end = where_end.min(offset_pos);
222            }
223
224            let where_clause = sql[where_start..where_end].trim();
225            results = self.filter_results_with_options(results, where_clause, case_insensitive)?;
226        }
227
228        // Handle specific column selection
229        if !stmt.columns.contains(&"*".to_string()) {
230            let columns: Vec<&str> = stmt
231                .columns
232                .iter()
233                .map(std::string::String::as_str)
234                .collect();
235            results = self.select_columns(results, &columns)?;
236        }
237
238        // Handle ORDER BY clause
239        if let Some(order_by_columns) = &stmt.order_by {
240            results = self.sort_results(results, order_by_columns)?;
241        }
242
243        // Handle OFFSET and LIMIT clauses
244        if let Some(offset) = stmt.offset {
245            results = results.into_iter().skip(offset).collect();
246        }
247
248        if let Some(limit) = stmt.limit {
249            results = results.into_iter().take(limit).collect();
250        }
251
252        Ok(results)
253    }
254
255    fn filter_results(&self, data: Vec<Value>, where_clause: &str) -> Result<Vec<Value>> {
256        self.filter_results_with_options(data, where_clause, false)
257    }
258
259    fn filter_results_with_options(
260        &self,
261        data: Vec<Value>,
262        where_clause: &str,
263        case_insensitive: bool,
264    ) -> Result<Vec<Value>> {
265        // Parse WHERE clause into AST with column context
266        let columns = self.headers.clone();
267        let expr = WhereParser::parse_with_options(where_clause, columns, case_insensitive)?;
268
269        let mut filtered = Vec::new();
270        for row in data {
271            if evaluate_where_expr_with_options(&expr, &row, case_insensitive)? {
272                filtered.push(row);
273            }
274        }
275
276        Ok(filtered)
277    }
278
279    fn select_columns(&self, data: Vec<Value>, columns: &[&str]) -> Result<Vec<Value>> {
280        let mut results = Vec::new();
281
282        for row in data {
283            if let Some(obj) = row.as_object() {
284                let mut new_row = serde_json::Map::new();
285
286                for &col in columns {
287                    let col_parsed = parse_column_name(col);
288
289                    if let Some((actual_key, value)) =
290                        find_column_case_insensitive(obj, col_parsed, &self.column_lookup)
291                    {
292                        new_row.insert(actual_key.clone(), value.clone());
293                    }
294                }
295
296                results.push(Value::Object(new_row));
297            }
298        }
299
300        Ok(results)
301    }
302
303    fn sort_results(
304        &self,
305        mut data: Vec<Value>,
306        order_by_columns: &[OrderByItem],
307    ) -> Result<Vec<Value>> {
308        if order_by_columns.is_empty() {
309            return Ok(data);
310        }
311
312        // Sort by multiple columns with proper type-aware comparison
313        data.sort_by(|a, b| {
314            for order_col in order_by_columns {
315                // Extract column name from expression (currently only supports simple columns)
316                let column_name = match &order_col.expr {
317                    SqlExpression::Column(col_ref) => col_ref.name.clone(),
318                    _ => continue, // Skip non-column expressions for now (TODO: evaluate expressions)
319                };
320
321                let col_parsed = parse_column_name(&column_name);
322
323                let val_a = if let Some(obj_a) = a.as_object() {
324                    find_column_case_insensitive(obj_a, col_parsed, &self.column_lookup)
325                        .map(|(_, v)| v)
326                } else {
327                    None
328                };
329
330                let val_b = if let Some(obj_b) = b.as_object() {
331                    find_column_case_insensitive(obj_b, col_parsed, &self.column_lookup)
332                        .map(|(_, v)| v)
333                } else {
334                    None
335                };
336
337                let cmp = match (val_a, val_b) {
338                    (Some(Value::Number(a)), Some(Value::Number(b))) => {
339                        // Numeric comparison - handles integers and floats properly
340                        let a_f64 = a.as_f64().unwrap_or(0.0);
341                        let b_f64 = b.as_f64().unwrap_or(0.0);
342                        a_f64.partial_cmp(&b_f64).unwrap_or(Ordering::Equal)
343                    }
344                    (Some(Value::String(a)), Some(Value::String(b))) => {
345                        // String comparison
346                        a.cmp(b)
347                    }
348                    (Some(Value::Bool(a)), Some(Value::Bool(b))) => {
349                        // Boolean comparison (false < true)
350                        a.cmp(b)
351                    }
352                    (Some(Value::Null), Some(Value::Null)) => Ordering::Equal,
353                    (Some(Value::Null), Some(_)) => {
354                        // NULL comes first
355                        Ordering::Less
356                    }
357                    (Some(_), Some(Value::Null)) => {
358                        // NULL comes first
359                        Ordering::Greater
360                    }
361                    (None, None) => Ordering::Equal,
362                    (None, Some(_)) => {
363                        // Missing values come first
364                        Ordering::Less
365                    }
366                    (Some(_), None) => {
367                        // Missing values come first
368                        Ordering::Greater
369                    }
370                    // Mixed type comparisons - convert to strings
371                    (Some(a), Some(b)) => {
372                        let a_str = match a {
373                            Value::String(s) => s.clone(),
374                            Value::Number(n) => n.to_string(),
375                            Value::Bool(b) => b.to_string(),
376                            Value::Null => String::new(),
377                            _ => a.to_string(),
378                        };
379                        let b_str = match b {
380                            Value::String(s) => s.clone(),
381                            Value::Number(n) => n.to_string(),
382                            Value::Bool(b) => b.to_string(),
383                            Value::Null => String::new(),
384                            _ => b.to_string(),
385                        };
386                        a_str.cmp(&b_str)
387                    }
388                };
389
390                // Apply sort direction (ASC or DESC)
391                let final_cmp = match order_col.direction {
392                    SortDirection::Asc => cmp,
393                    SortDirection::Desc => cmp.reverse(),
394                };
395
396                // If this column comparison is not equal, return the result
397                if final_cmp != Ordering::Equal {
398                    return final_cmp;
399                }
400
401                // Otherwise, continue to the next column for tie-breaking
402            }
403
404            // All columns are equal
405            Ordering::Equal
406        });
407
408        Ok(data)
409    }
410
411    #[must_use]
412    pub fn get_headers(&self) -> &[String] {
413        &self.headers
414    }
415
416    #[must_use]
417    pub fn get_table_name(&self) -> &str {
418        &self.table_name
419    }
420
421    #[must_use]
422    pub fn get_row_count(&self) -> usize {
423        self.data.len()
424    }
425
426    /// V49: Convert `CsvDataSource` directly to `DataTable`
427    /// This avoids the JSON intermediate format
428    pub fn to_datatable(&self) -> DataTable {
429        debug!(
430            "V49: Converting CsvDataSource to DataTable for table '{}'",
431            self.table_name
432        );
433
434        let mut table = DataTable::new(&self.table_name);
435
436        // Create columns from headers
437        for header in &self.headers {
438            table.add_column(DataColumn::new(header.clone()));
439        }
440
441        // Convert each JSON row to DataRow
442        for (row_idx, json_row) in self.data.iter().enumerate() {
443            if let Some(obj) = json_row.as_object() {
444                let mut values = Vec::new();
445
446                // Get values in the same order as headers
447                for header in &self.headers {
448                    let value = obj
449                        .get(header)
450                        .map_or(DataValue::Null, json_value_to_data_value);
451                    values.push(value);
452                }
453
454                if let Err(e) = table.add_row(DataRow::new(values)) {
455                    debug!("V49: Failed to add row {}: {}", row_idx, e);
456                }
457            }
458        }
459
460        // Infer column types from the data
461        table.infer_column_types();
462
463        // Add metadata
464        table
465            .metadata
466            .insert("source".to_string(), "csv".to_string());
467        table
468            .metadata
469            .insert("original_count".to_string(), self.data.len().to_string());
470
471        debug!(
472            "V49: Created DataTable with {} rows and {} columns directly from CSV",
473            table.row_count(),
474            table.column_count()
475        );
476
477        table
478    }
479}
480
481/// V49: Helper function to convert JSON value to `DataValue`
482fn json_value_to_data_value(json: &Value) -> DataValue {
483    match json {
484        Value::Null => DataValue::Null,
485        Value::Bool(b) => DataValue::Boolean(*b),
486        Value::Number(n) => {
487            if let Some(i) = n.as_i64() {
488                DataValue::Integer(i)
489            } else if let Some(f) = n.as_f64() {
490                DataValue::Float(f)
491            } else {
492                DataValue::String(n.to_string())
493            }
494        }
495        Value::String(s) => {
496            // Try to detect if it's a date/time
497            if s.contains('-') && s.len() >= 8 && s.len() <= 30 {
498                // Simple heuristic for dates
499                DataValue::DateTime(s.clone())
500            } else {
501                DataValue::String(s.clone())
502            }
503        }
504        Value::Array(_) | Value::Object(_) => {
505            // Store complex types as JSON string
506            DataValue::String(json.to_string())
507        }
508    }
509}
510
511// Integration with ApiClient
512#[derive(Clone)]
513pub struct CsvApiClient {
514    datasource: Option<CsvDataSource>,
515    case_insensitive: bool,
516}
517
518impl Default for CsvApiClient {
519    fn default() -> Self {
520        Self::new()
521    }
522}
523
524impl CsvApiClient {
525    #[must_use]
526    pub fn new() -> Self {
527        Self {
528            datasource: None,
529            case_insensitive: false,
530        }
531    }
532
533    pub fn set_case_insensitive(&mut self, case_insensitive: bool) {
534        self.case_insensitive = case_insensitive;
535    }
536
537    pub fn load_csv<P: AsRef<Path>>(&mut self, path: P, table_name: &str) -> Result<()> {
538        self.datasource = Some(CsvDataSource::load_from_file(path, table_name)?);
539        Ok(())
540    }
541
542    pub fn load_json<P: AsRef<Path>>(&mut self, path: P, table_name: &str) -> Result<()> {
543        self.datasource = Some(CsvDataSource::load_from_json_file(path, table_name)?);
544        Ok(())
545    }
546
547    pub fn query_csv(&self, sql: &str) -> Result<QueryResponse> {
548        if let Some(ref ds) = self.datasource {
549            let data = ds.query_with_options(sql, self.case_insensitive)?;
550            let count = data.len();
551
552            Ok(QueryResponse {
553                data,
554                count,
555                query: QueryInfo {
556                    select: vec!["*".to_string()],
557                    where_clause: None,
558                    order_by: None,
559                },
560                source: Some("file".to_string()),
561                table: Some(ds.table_name.clone()),
562                cached: Some(false),
563            })
564        } else {
565            Err(anyhow::anyhow!("No CSV file loaded"))
566        }
567    }
568
569    #[must_use]
570    pub fn get_schema(&self) -> Option<HashMap<String, Vec<String>>> {
571        self.datasource.as_ref().map(|ds| {
572            let mut schema = HashMap::new();
573            schema.insert(ds.get_table_name().to_string(), ds.get_headers().to_vec());
574            schema
575        })
576    }
577
578    pub fn load_from_json(&mut self, data: Vec<Value>, table_name: &str) -> Result<()> {
579        // Extract headers from the first row
580        let headers: Vec<String> = if let Some(first_row) = data.first() {
581            if let Some(obj) = first_row.as_object() {
582                obj.keys().map(std::string::ToString::to_string).collect()
583            } else {
584                return Err(anyhow::anyhow!("Invalid JSON data format"));
585            }
586        } else {
587            return Err(anyhow::anyhow!("Empty data set"));
588        };
589
590        let column_lookup = build_column_lookup(&headers);
591
592        self.datasource = Some(CsvDataSource {
593            data: data.clone(),
594            headers,
595            table_name: table_name.to_string(),
596            column_lookup,
597        });
598
599        Ok(())
600    }
601
602    /// V49: Get `DataTable` directly from the datasource
603    /// This avoids JSON intermediate conversion
604    #[must_use]
605    pub fn get_datatable(&self) -> Option<DataTable> {
606        self.datasource.as_ref().map(|ds| {
607            debug!("V49: CsvApiClient returning DataTable directly");
608            ds.to_datatable()
609        })
610    }
611
612    /// V49: Check if we have a datasource loaded
613    #[must_use]
614    pub fn has_data(&self) -> bool {
615        self.datasource.is_some()
616    }
617}