Skip to main content

sql_cli/data/
csv_datasource.rs

1use crate::api_client::{QueryInfo, QueryResponse};
2use crate::csv_fixes::{build_column_lookup, find_column_case_insensitive, parse_column_name};
3use crate::data::datatable::{DataColumn, DataRow, DataTable, DataValue};
4use crate::recursive_parser::{OrderByItem, Parser, SortDirection};
5use crate::sql::parser::ast::SqlExpression;
6use crate::where_ast::evaluate_where_expr_with_options;
7use crate::where_parser::WhereParser;
8use anyhow::Result;
9use csv;
10use serde_json::{json, Value};
11use std::cmp::Ordering;
12use std::collections::HashMap;
13use std::fs::File;
14use std::path::Path;
15use tracing::debug;
16
17#[derive(Clone, Debug)]
18pub struct CsvDataSource {
19    data: Vec<Value>,
20    headers: Vec<String>,
21    table_name: String,
22    column_lookup: HashMap<String, String>,
23}
24
25impl CsvDataSource {
26    pub fn load_from_file<P: AsRef<Path>>(path: P, table_name: &str) -> Result<Self> {
27        use std::io::{BufRead, BufReader as IOBufReader};
28
29        let file = File::open(&path)?;
30        let mut reader = csv::Reader::from_reader(file);
31
32        // Get headers
33        let headers: Vec<String> = reader
34            .headers()?
35            .iter()
36            .map(std::string::ToString::to_string)
37            .collect();
38
39        // Re-open file to read raw lines for null detection
40        let file2 = File::open(&path)?;
41        let mut line_reader = IOBufReader::new(file2);
42        let mut raw_line = String::new();
43        // Skip header line
44        line_reader.read_line(&mut raw_line)?;
45
46        // Read all records into JSON values
47        let mut data = Vec::new();
48        for result in reader.records() {
49            let record = result?;
50
51            // Read the corresponding raw line
52            raw_line.clear();
53            line_reader.read_line(&mut raw_line)?;
54
55            let mut row = serde_json::Map::new();
56
57            for (i, field) in record.iter().enumerate() {
58                if let Some(header) = headers.get(i) {
59                    // Parse the raw line to check if this field was truly empty (,,)
60                    // vs quoted empty ("") - this is a simple heuristic
61                    let value = if field.is_empty() {
62                        // Check if this was preceded/followed by consecutive commas in raw line
63                        // This is a simplified check - for a robust solution we'd need
64                        // a full CSV parser that preserves quote information
65                        if Self::is_null_field(&raw_line, i) {
66                            Value::Null
67                        } else {
68                            // Quoted empty string
69                            Value::String(String::new())
70                        }
71                    } else if let Ok(n) = field.parse::<f64>() {
72                        json!(n)
73                    } else {
74                        Value::String(field.to_string())
75                    };
76                    row.insert(header.clone(), value);
77                }
78            }
79
80            data.push(Value::Object(row));
81        }
82
83        let column_lookup = build_column_lookup(&headers);
84
85        Ok(CsvDataSource {
86            data,
87            headers,
88            table_name: table_name.to_string(),
89            column_lookup,
90        })
91    }
92
93    /// Helper to detect if a field in the raw CSV line is a null (unquoted empty)
94    /// This is a heuristic - consecutive commas indicate null
95    fn is_null_field(raw_line: &str, field_index: usize) -> bool {
96        // Count commas to find the field position
97        let mut comma_count = 0;
98        let mut in_quotes = false;
99        let mut field_start = 0;
100        let mut prev_char = ' ';
101
102        for (i, ch) in raw_line.chars().enumerate() {
103            if ch == '"' && prev_char != '\\' {
104                in_quotes = !in_quotes;
105            }
106
107            if ch == ',' && !in_quotes {
108                if comma_count == field_index {
109                    let field_end = i;
110                    let field_content = &raw_line[field_start..field_end].trim();
111                    // If empty, check if it was quoted (quoted empty = empty string, unquoted empty = NULL)
112                    if field_content.is_empty() {
113                        return true; // Unquoted empty field -> NULL
114                    }
115                    // If it starts and ends with quotes but is empty inside, it's an empty string, not NULL
116                    if field_content.starts_with('"')
117                        && field_content.ends_with('"')
118                        && field_content.len() == 2
119                    {
120                        return false; // Quoted empty field -> empty string
121                    }
122                    return false; // Non-empty field -> not NULL
123                }
124                comma_count += 1;
125                field_start = i + 1;
126            }
127            prev_char = ch;
128        }
129
130        // Check last field
131        if comma_count == field_index {
132            let field_content = raw_line[field_start..]
133                .trim()
134                .trim_end_matches('\n')
135                .trim_end_matches('\r');
136            // If empty, check if it was quoted
137            if field_content.is_empty() {
138                return true; // Unquoted empty field -> NULL
139            }
140            // If it starts and ends with quotes but is empty inside, it's an empty string, not NULL
141            if field_content.starts_with('"')
142                && field_content.ends_with('"')
143                && field_content.len() == 2
144            {
145                return false; // Quoted empty field -> empty string
146            }
147            return false; // Non-empty field -> not NULL
148        }
149
150        false // Field not found -> not NULL (shouldn't happen)
151    }
152
153    pub fn load_from_json_file<P: AsRef<Path>>(path: P, table_name: &str) -> Result<Self> {
154        use std::io::Read;
155        let mut file = File::open(&path)?;
156        let mut json_str = String::new();
157        file.read_to_string(&mut json_str)?;
158
159        // Accept either a JSON array of objects or JSONL (one object per line);
160        // parse_json_records auto-detects.
161        let json_data: Vec<Value> = crate::data::stream_loader::parse_json_records(&json_str)?;
162
163        if json_data.is_empty() {
164            return Err(anyhow::anyhow!("JSON file contains no data"));
165        }
166
167        // Headers are the union of object keys across the first 100 records so
168        // heterogeneous JSONL doesn't drop fields the first record happens to omit.
169        let headers = crate::data::stream_loader::collect_column_names(&json_data, 100);
170        if headers.is_empty() {
171            return Err(anyhow::anyhow!("JSON records must be objects"));
172        }
173
174        // Validate all records have the same structure
175        for (i, record) in json_data.iter().enumerate() {
176            if !record.is_object() {
177                return Err(anyhow::anyhow!("Record {} is not an object", i));
178            }
179        }
180
181        let column_lookup = build_column_lookup(&headers);
182
183        Ok(CsvDataSource {
184            data: json_data,
185            headers,
186            table_name: table_name.to_string(),
187            column_lookup,
188        })
189    }
190
191    pub fn query(&self, sql: &str) -> Result<Vec<Value>> {
192        self.query_with_options(sql, false)
193    }
194
195    pub fn query_with_options(&self, sql: &str, case_insensitive: bool) -> Result<Vec<Value>> {
196        // Parse SQL using the recursive parser - NO FALLBACK, recursive parser only
197        let mut parser = Parser::new(sql);
198        let stmt = parser
199            .parse()
200            .map_err(|e| anyhow::anyhow!("Recursive parser error: {}", e))?;
201
202        let mut results = self.data.clone();
203
204        // Handle WHERE clause using the existing WhereParser
205        let sql_lower = sql.to_lowercase();
206        if let Some(where_pos) = sql_lower.find(" where ") {
207            // Extract WHERE clause, but stop at ORDER BY, LIMIT, or OFFSET if present
208            let where_start = where_pos + 7;
209            let mut where_end = sql.len();
210
211            // Find the earliest of ORDER BY, LIMIT, or OFFSET to terminate WHERE clause
212            if let Some(order_pos) = sql_lower.find(" order by ") {
213                where_end = where_end.min(order_pos);
214            }
215            if let Some(limit_pos) = sql_lower.find(" limit ") {
216                where_end = where_end.min(limit_pos);
217            }
218            if let Some(offset_pos) = sql_lower.find(" offset ") {
219                where_end = where_end.min(offset_pos);
220            }
221
222            let where_clause = sql[where_start..where_end].trim();
223            results = self.filter_results_with_options(results, where_clause, case_insensitive)?;
224        }
225
226        // Handle specific column selection
227        if !stmt.columns.contains(&"*".to_string()) {
228            let columns: Vec<&str> = stmt
229                .columns
230                .iter()
231                .map(std::string::String::as_str)
232                .collect();
233            results = self.select_columns(results, &columns)?;
234        }
235
236        // Handle ORDER BY clause
237        if let Some(order_by_columns) = &stmt.order_by {
238            results = self.sort_results(results, order_by_columns)?;
239        }
240
241        // Handle OFFSET and LIMIT clauses
242        if let Some(offset) = stmt.offset {
243            results = results.into_iter().skip(offset).collect();
244        }
245
246        if let Some(limit) = stmt.limit {
247            results = results.into_iter().take(limit).collect();
248        }
249
250        Ok(results)
251    }
252
253    fn filter_results(&self, data: Vec<Value>, where_clause: &str) -> Result<Vec<Value>> {
254        self.filter_results_with_options(data, where_clause, false)
255    }
256
257    fn filter_results_with_options(
258        &self,
259        data: Vec<Value>,
260        where_clause: &str,
261        case_insensitive: bool,
262    ) -> Result<Vec<Value>> {
263        // Parse WHERE clause into AST with column context
264        let columns = self.headers.clone();
265        let expr = WhereParser::parse_with_options(where_clause, columns, case_insensitive)?;
266
267        let mut filtered = Vec::new();
268        for row in data {
269            if evaluate_where_expr_with_options(&expr, &row, case_insensitive)? {
270                filtered.push(row);
271            }
272        }
273
274        Ok(filtered)
275    }
276
277    fn select_columns(&self, data: Vec<Value>, columns: &[&str]) -> Result<Vec<Value>> {
278        let mut results = Vec::new();
279
280        for row in data {
281            if let Some(obj) = row.as_object() {
282                let mut new_row = serde_json::Map::new();
283
284                for &col in columns {
285                    let col_parsed = parse_column_name(col);
286
287                    if let Some((actual_key, value)) =
288                        find_column_case_insensitive(obj, col_parsed, &self.column_lookup)
289                    {
290                        new_row.insert(actual_key.clone(), value.clone());
291                    }
292                }
293
294                results.push(Value::Object(new_row));
295            }
296        }
297
298        Ok(results)
299    }
300
301    fn sort_results(
302        &self,
303        mut data: Vec<Value>,
304        order_by_columns: &[OrderByItem],
305    ) -> Result<Vec<Value>> {
306        if order_by_columns.is_empty() {
307            return Ok(data);
308        }
309
310        // Sort by multiple columns with proper type-aware comparison
311        data.sort_by(|a, b| {
312            for order_col in order_by_columns {
313                // Extract column name from expression (currently only supports simple columns)
314                let column_name = match &order_col.expr {
315                    SqlExpression::Column(col_ref) => col_ref.name.clone(),
316                    _ => continue, // Skip non-column expressions for now (TODO: evaluate expressions)
317                };
318
319                let col_parsed = parse_column_name(&column_name);
320
321                let val_a = if let Some(obj_a) = a.as_object() {
322                    find_column_case_insensitive(obj_a, col_parsed, &self.column_lookup)
323                        .map(|(_, v)| v)
324                } else {
325                    None
326                };
327
328                let val_b = if let Some(obj_b) = b.as_object() {
329                    find_column_case_insensitive(obj_b, col_parsed, &self.column_lookup)
330                        .map(|(_, v)| v)
331                } else {
332                    None
333                };
334
335                let cmp = match (val_a, val_b) {
336                    (Some(Value::Number(a)), Some(Value::Number(b))) => {
337                        // Numeric comparison - handles integers and floats properly
338                        let a_f64 = a.as_f64().unwrap_or(0.0);
339                        let b_f64 = b.as_f64().unwrap_or(0.0);
340                        a_f64.partial_cmp(&b_f64).unwrap_or(Ordering::Equal)
341                    }
342                    (Some(Value::String(a)), Some(Value::String(b))) => {
343                        // String comparison
344                        a.cmp(b)
345                    }
346                    (Some(Value::Bool(a)), Some(Value::Bool(b))) => {
347                        // Boolean comparison (false < true)
348                        a.cmp(b)
349                    }
350                    (Some(Value::Null), Some(Value::Null)) => Ordering::Equal,
351                    (Some(Value::Null), Some(_)) => {
352                        // NULL comes first
353                        Ordering::Less
354                    }
355                    (Some(_), Some(Value::Null)) => {
356                        // NULL comes first
357                        Ordering::Greater
358                    }
359                    (None, None) => Ordering::Equal,
360                    (None, Some(_)) => {
361                        // Missing values come first
362                        Ordering::Less
363                    }
364                    (Some(_), None) => {
365                        // Missing values come first
366                        Ordering::Greater
367                    }
368                    // Mixed type comparisons - convert to strings
369                    (Some(a), Some(b)) => {
370                        let a_str = match a {
371                            Value::String(s) => s.clone(),
372                            Value::Number(n) => n.to_string(),
373                            Value::Bool(b) => b.to_string(),
374                            Value::Null => String::new(),
375                            _ => a.to_string(),
376                        };
377                        let b_str = match b {
378                            Value::String(s) => s.clone(),
379                            Value::Number(n) => n.to_string(),
380                            Value::Bool(b) => b.to_string(),
381                            Value::Null => String::new(),
382                            _ => b.to_string(),
383                        };
384                        a_str.cmp(&b_str)
385                    }
386                };
387
388                // Apply sort direction (ASC or DESC)
389                let final_cmp = match order_col.direction {
390                    SortDirection::Asc => cmp,
391                    SortDirection::Desc => cmp.reverse(),
392                };
393
394                // If this column comparison is not equal, return the result
395                if final_cmp != Ordering::Equal {
396                    return final_cmp;
397                }
398
399                // Otherwise, continue to the next column for tie-breaking
400            }
401
402            // All columns are equal
403            Ordering::Equal
404        });
405
406        Ok(data)
407    }
408
409    #[must_use]
410    pub fn get_headers(&self) -> &[String] {
411        &self.headers
412    }
413
414    #[must_use]
415    pub fn get_table_name(&self) -> &str {
416        &self.table_name
417    }
418
419    #[must_use]
420    pub fn get_row_count(&self) -> usize {
421        self.data.len()
422    }
423
424    /// V49: Convert `CsvDataSource` directly to `DataTable`
425    /// This avoids the JSON intermediate format
426    pub fn to_datatable(&self) -> DataTable {
427        debug!(
428            "V49: Converting CsvDataSource to DataTable for table '{}'",
429            self.table_name
430        );
431
432        let mut table = DataTable::new(&self.table_name);
433
434        // Create columns from headers
435        for header in &self.headers {
436            table.add_column(DataColumn::new(header.clone()));
437        }
438
439        // Convert each JSON row to DataRow
440        for (row_idx, json_row) in self.data.iter().enumerate() {
441            if let Some(obj) = json_row.as_object() {
442                let mut values = Vec::new();
443
444                // Get values in the same order as headers
445                for header in &self.headers {
446                    let value = obj
447                        .get(header)
448                        .map_or(DataValue::Null, json_value_to_data_value);
449                    values.push(value);
450                }
451
452                if let Err(e) = table.add_row(DataRow::new(values)) {
453                    debug!("V49: Failed to add row {}: {}", row_idx, e);
454                }
455            }
456        }
457
458        // Infer column types from the data
459        table.infer_column_types();
460
461        // Add metadata
462        table
463            .metadata
464            .insert("source".to_string(), "csv".to_string());
465        table
466            .metadata
467            .insert("original_count".to_string(), self.data.len().to_string());
468
469        debug!(
470            "V49: Created DataTable with {} rows and {} columns directly from CSV",
471            table.row_count(),
472            table.column_count()
473        );
474
475        table
476    }
477}
478
479/// V49: Helper function to convert JSON value to `DataValue`
480fn json_value_to_data_value(json: &Value) -> DataValue {
481    match json {
482        Value::Null => DataValue::Null,
483        Value::Bool(b) => DataValue::Boolean(*b),
484        Value::Number(n) => {
485            if let Some(i) = n.as_i64() {
486                DataValue::Integer(i)
487            } else if let Some(f) = n.as_f64() {
488                DataValue::Float(f)
489            } else {
490                DataValue::String(n.to_string())
491            }
492        }
493        Value::String(s) => {
494            // Try to detect if it's a date/time
495            if s.contains('-') && s.len() >= 8 && s.len() <= 30 {
496                // Simple heuristic for dates
497                DataValue::DateTime(s.clone())
498            } else {
499                DataValue::String(s.clone())
500            }
501        }
502        Value::Array(_) | Value::Object(_) => {
503            // Store complex types as JSON string
504            DataValue::String(json.to_string())
505        }
506    }
507}
508
509// Integration with ApiClient
510#[derive(Clone)]
511pub struct CsvApiClient {
512    datasource: Option<CsvDataSource>,
513    case_insensitive: bool,
514}
515
516impl Default for CsvApiClient {
517    fn default() -> Self {
518        Self::new()
519    }
520}
521
522impl CsvApiClient {
523    #[must_use]
524    pub fn new() -> Self {
525        Self {
526            datasource: None,
527            case_insensitive: false,
528        }
529    }
530
531    pub fn set_case_insensitive(&mut self, case_insensitive: bool) {
532        self.case_insensitive = case_insensitive;
533    }
534
535    pub fn load_csv<P: AsRef<Path>>(&mut self, path: P, table_name: &str) -> Result<()> {
536        self.datasource = Some(CsvDataSource::load_from_file(path, table_name)?);
537        Ok(())
538    }
539
540    pub fn load_json<P: AsRef<Path>>(&mut self, path: P, table_name: &str) -> Result<()> {
541        self.datasource = Some(CsvDataSource::load_from_json_file(path, table_name)?);
542        Ok(())
543    }
544
545    pub fn query_csv(&self, sql: &str) -> Result<QueryResponse> {
546        if let Some(ref ds) = self.datasource {
547            let data = ds.query_with_options(sql, self.case_insensitive)?;
548            let count = data.len();
549
550            Ok(QueryResponse {
551                data,
552                count,
553                query: QueryInfo {
554                    select: vec!["*".to_string()],
555                    where_clause: None,
556                    order_by: None,
557                },
558                source: Some("file".to_string()),
559                table: Some(ds.table_name.clone()),
560                cached: Some(false),
561            })
562        } else {
563            Err(anyhow::anyhow!("No CSV file loaded"))
564        }
565    }
566
567    #[must_use]
568    pub fn get_schema(&self) -> Option<HashMap<String, Vec<String>>> {
569        self.datasource.as_ref().map(|ds| {
570            let mut schema = HashMap::new();
571            schema.insert(ds.get_table_name().to_string(), ds.get_headers().to_vec());
572            schema
573        })
574    }
575
576    pub fn load_from_json(&mut self, data: Vec<Value>, table_name: &str) -> Result<()> {
577        // Extract headers from the first row
578        let headers: Vec<String> = if let Some(first_row) = data.first() {
579            if let Some(obj) = first_row.as_object() {
580                obj.keys().map(std::string::ToString::to_string).collect()
581            } else {
582                return Err(anyhow::anyhow!("Invalid JSON data format"));
583            }
584        } else {
585            return Err(anyhow::anyhow!("Empty data set"));
586        };
587
588        let column_lookup = build_column_lookup(&headers);
589
590        self.datasource = Some(CsvDataSource {
591            data: data.clone(),
592            headers,
593            table_name: table_name.to_string(),
594            column_lookup,
595        });
596
597        Ok(())
598    }
599
600    /// V49: Get `DataTable` directly from the datasource
601    /// This avoids JSON intermediate conversion
602    #[must_use]
603    pub fn get_datatable(&self) -> Option<DataTable> {
604        self.datasource.as_ref().map(|ds| {
605            debug!("V49: CsvApiClient returning DataTable directly");
606            ds.to_datatable()
607        })
608    }
609
610    /// V49: Check if we have a datasource loaded
611    #[must_use]
612    pub fn has_data(&self) -> bool {
613        self.datasource.is_some()
614    }
615}