sql_cli/data/
csv_datasource.rs

1use crate::api_client::{QueryInfo, QueryResponse};
2use crate::csv_fixes::{build_column_lookup, find_column_case_insensitive, parse_column_name};
3use crate::data::datatable::{DataColumn, DataRow, DataTable, DataValue};
4use crate::recursive_parser::{OrderByColumn, Parser, SortDirection};
5use crate::where_ast::evaluate_where_expr_with_options;
6use crate::where_parser::WhereParser;
7use anyhow::Result;
8use csv;
9use serde_json::{json, Value};
10use std::cmp::Ordering;
11use std::collections::HashMap;
12use std::fs::File;
13use std::io::BufReader;
14use std::path::Path;
15use tracing::debug;
16
/// In-memory tabular data source whose rows are held as JSON values.
#[derive(Clone, Debug)]
pub struct CsvDataSource {
    /// One JSON value per row; the file loaders store JSON objects keyed by column name.
    data: Vec<Value>,
    /// Column names, taken from the CSV header record or from the keys of the first JSON record.
    headers: Vec<String>,
    /// Table name supplied by the caller when the data was loaded.
    table_name: String,
    /// Result of `build_column_lookup(&headers)`; handed to `find_column_case_insensitive`.
    column_lookup: HashMap<String, String>,
}
24
25impl CsvDataSource {
26    pub fn load_from_file<P: AsRef<Path>>(path: P, table_name: &str) -> Result<Self> {
27        use std::io::{BufRead, BufReader as IOBufReader};
28
29        let file = File::open(&path)?;
30        let mut reader = csv::Reader::from_reader(file);
31
32        // Get headers
33        let headers: Vec<String> = reader
34            .headers()?
35            .iter()
36            .map(std::string::ToString::to_string)
37            .collect();
38
39        // Re-open file to read raw lines for null detection
40        let file2 = File::open(&path)?;
41        let mut line_reader = IOBufReader::new(file2);
42        let mut raw_line = String::new();
43        // Skip header line
44        line_reader.read_line(&mut raw_line)?;
45
46        // Read all records into JSON values
47        let mut data = Vec::new();
48        for result in reader.records() {
49            let record = result?;
50
51            // Read the corresponding raw line
52            raw_line.clear();
53            line_reader.read_line(&mut raw_line)?;
54
55            let mut row = serde_json::Map::new();
56
57            for (i, field) in record.iter().enumerate() {
58                if let Some(header) = headers.get(i) {
59                    // Parse the raw line to check if this field was truly empty (,,)
60                    // vs quoted empty ("") - this is a simple heuristic
61                    let value = if field.is_empty() {
62                        // Check if this was preceded/followed by consecutive commas in raw line
63                        // This is a simplified check - for a robust solution we'd need
64                        // a full CSV parser that preserves quote information
65                        if Self::is_null_field(&raw_line, i) {
66                            Value::Null
67                        } else {
68                            // Quoted empty string
69                            Value::String(String::new())
70                        }
71                    } else if let Ok(n) = field.parse::<f64>() {
72                        json!(n)
73                    } else {
74                        Value::String(field.to_string())
75                    };
76                    row.insert(header.clone(), value);
77                }
78            }
79
80            data.push(Value::Object(row));
81        }
82
83        let column_lookup = build_column_lookup(&headers);
84
85        Ok(CsvDataSource {
86            data,
87            headers,
88            table_name: table_name.to_string(),
89            column_lookup,
90        })
91    }
92
93    /// Helper to detect if a field in the raw CSV line is a null (unquoted empty)
94    /// This is a heuristic - consecutive commas indicate null
95    fn is_null_field(raw_line: &str, field_index: usize) -> bool {
96        // Count commas to find the field position
97        let mut comma_count = 0;
98        let mut in_quotes = false;
99        let mut field_start = 0;
100        let mut prev_char = ' ';
101
102        for (i, ch) in raw_line.chars().enumerate() {
103            if ch == '"' && prev_char != '\\' {
104                in_quotes = !in_quotes;
105            }
106
107            if ch == ',' && !in_quotes {
108                if comma_count == field_index {
109                    let field_end = i;
110                    let field_content = &raw_line[field_start..field_end].trim();
111                    // If empty, check if it was quoted (quoted empty = empty string, unquoted empty = NULL)
112                    if field_content.is_empty() {
113                        return true; // Unquoted empty field -> NULL
114                    }
115                    // If it starts and ends with quotes but is empty inside, it's an empty string, not NULL
116                    if field_content.starts_with('"')
117                        && field_content.ends_with('"')
118                        && field_content.len() == 2
119                    {
120                        return false; // Quoted empty field -> empty string
121                    }
122                    return false; // Non-empty field -> not NULL
123                }
124                comma_count += 1;
125                field_start = i + 1;
126            }
127            prev_char = ch;
128        }
129
130        // Check last field
131        if comma_count == field_index {
132            let field_content = raw_line[field_start..]
133                .trim()
134                .trim_end_matches('\n')
135                .trim_end_matches('\r');
136            // If empty, check if it was quoted
137            if field_content.is_empty() {
138                return true; // Unquoted empty field -> NULL
139            }
140            // If it starts and ends with quotes but is empty inside, it's an empty string, not NULL
141            if field_content.starts_with('"')
142                && field_content.ends_with('"')
143                && field_content.len() == 2
144            {
145                return false; // Quoted empty field -> empty string
146            }
147            return false; // Non-empty field -> not NULL
148        }
149
150        false // Field not found -> not NULL (shouldn't happen)
151    }
152
153    pub fn load_from_json_file<P: AsRef<Path>>(path: P, table_name: &str) -> Result<Self> {
154        let file = File::open(&path)?;
155        let reader = BufReader::new(file);
156
157        // Parse JSON array
158        let json_data: Vec<Value> = serde_json::from_reader(reader)?;
159
160        if json_data.is_empty() {
161            return Err(anyhow::anyhow!("JSON file contains no data"));
162        }
163
164        // Extract headers from the first record
165        let headers = if let Some(first_record) = json_data.first() {
166            if let Some(obj) = first_record.as_object() {
167                obj.keys().cloned().collect()
168            } else {
169                return Err(anyhow::anyhow!("JSON records must be objects"));
170            }
171        } else {
172            Vec::new()
173        };
174
175        // Validate all records have the same structure
176        for (i, record) in json_data.iter().enumerate() {
177            if !record.is_object() {
178                return Err(anyhow::anyhow!("Record {} is not an object", i));
179            }
180        }
181
182        let column_lookup = build_column_lookup(&headers);
183
184        Ok(CsvDataSource {
185            data: json_data,
186            headers,
187            table_name: table_name.to_string(),
188            column_lookup,
189        })
190    }
191
    /// Runs `sql` against the loaded rows by delegating to
    /// `query_with_options` with `case_insensitive` set to `false`.
    pub fn query(&self, sql: &str) -> Result<Vec<Value>> {
        self.query_with_options(sql, false)
    }
195
196    pub fn query_with_options(&self, sql: &str, case_insensitive: bool) -> Result<Vec<Value>> {
197        // Parse SQL using the recursive parser - NO FALLBACK, recursive parser only
198        let mut parser = Parser::new(sql);
199        let stmt = parser
200            .parse()
201            .map_err(|e| anyhow::anyhow!("Recursive parser error: {}", e))?;
202
203        let mut results = self.data.clone();
204
205        // Handle WHERE clause using the existing WhereParser
206        let sql_lower = sql.to_lowercase();
207        if let Some(where_pos) = sql_lower.find(" where ") {
208            // Extract WHERE clause, but stop at ORDER BY, LIMIT, or OFFSET if present
209            let where_start = where_pos + 7;
210            let mut where_end = sql.len();
211
212            // Find the earliest of ORDER BY, LIMIT, or OFFSET to terminate WHERE clause
213            if let Some(order_pos) = sql_lower.find(" order by ") {
214                where_end = where_end.min(order_pos);
215            }
216            if let Some(limit_pos) = sql_lower.find(" limit ") {
217                where_end = where_end.min(limit_pos);
218            }
219            if let Some(offset_pos) = sql_lower.find(" offset ") {
220                where_end = where_end.min(offset_pos);
221            }
222
223            let where_clause = sql[where_start..where_end].trim();
224            results = self.filter_results_with_options(results, where_clause, case_insensitive)?;
225        }
226
227        // Handle specific column selection
228        if !stmt.columns.contains(&"*".to_string()) {
229            let columns: Vec<&str> = stmt
230                .columns
231                .iter()
232                .map(std::string::String::as_str)
233                .collect();
234            results = self.select_columns(results, &columns)?;
235        }
236
237        // Handle ORDER BY clause
238        if let Some(order_by_columns) = &stmt.order_by {
239            results = self.sort_results(results, order_by_columns)?;
240        }
241
242        // Handle OFFSET and LIMIT clauses
243        if let Some(offset) = stmt.offset {
244            results = results.into_iter().skip(offset).collect();
245        }
246
247        if let Some(limit) = stmt.limit {
248            results = results.into_iter().take(limit).collect();
249        }
250
251        Ok(results)
252    }
253
    /// Filters `data` by `where_clause`, delegating to
    /// `filter_results_with_options` with `case_insensitive` set to `false`.
    fn filter_results(&self, data: Vec<Value>, where_clause: &str) -> Result<Vec<Value>> {
        self.filter_results_with_options(data, where_clause, false)
    }
257
258    fn filter_results_with_options(
259        &self,
260        data: Vec<Value>,
261        where_clause: &str,
262        case_insensitive: bool,
263    ) -> Result<Vec<Value>> {
264        // Parse WHERE clause into AST with column context
265        let columns = self.headers.clone();
266        let expr = WhereParser::parse_with_options(where_clause, columns, case_insensitive)?;
267
268        let mut filtered = Vec::new();
269        for row in data {
270            if evaluate_where_expr_with_options(&expr, &row, case_insensitive)? {
271                filtered.push(row);
272            }
273        }
274
275        Ok(filtered)
276    }
277
278    fn select_columns(&self, data: Vec<Value>, columns: &[&str]) -> Result<Vec<Value>> {
279        let mut results = Vec::new();
280
281        for row in data {
282            if let Some(obj) = row.as_object() {
283                let mut new_row = serde_json::Map::new();
284
285                for &col in columns {
286                    let col_parsed = parse_column_name(col);
287
288                    if let Some((actual_key, value)) =
289                        find_column_case_insensitive(obj, col_parsed, &self.column_lookup)
290                    {
291                        new_row.insert(actual_key.clone(), value.clone());
292                    }
293                }
294
295                results.push(Value::Object(new_row));
296            }
297        }
298
299        Ok(results)
300    }
301
302    fn sort_results(
303        &self,
304        mut data: Vec<Value>,
305        order_by_columns: &[OrderByColumn],
306    ) -> Result<Vec<Value>> {
307        if order_by_columns.is_empty() {
308            return Ok(data);
309        }
310
311        // Sort by multiple columns with proper type-aware comparison
312        data.sort_by(|a, b| {
313            for order_col in order_by_columns {
314                let col_parsed = parse_column_name(&order_col.column);
315
316                let val_a = if let Some(obj_a) = a.as_object() {
317                    find_column_case_insensitive(obj_a, col_parsed, &self.column_lookup)
318                        .map(|(_, v)| v)
319                } else {
320                    None
321                };
322
323                let val_b = if let Some(obj_b) = b.as_object() {
324                    find_column_case_insensitive(obj_b, col_parsed, &self.column_lookup)
325                        .map(|(_, v)| v)
326                } else {
327                    None
328                };
329
330                let cmp = match (val_a, val_b) {
331                    (Some(Value::Number(a)), Some(Value::Number(b))) => {
332                        // Numeric comparison - handles integers and floats properly
333                        let a_f64 = a.as_f64().unwrap_or(0.0);
334                        let b_f64 = b.as_f64().unwrap_or(0.0);
335                        a_f64.partial_cmp(&b_f64).unwrap_or(Ordering::Equal)
336                    }
337                    (Some(Value::String(a)), Some(Value::String(b))) => {
338                        // String comparison
339                        a.cmp(b)
340                    }
341                    (Some(Value::Bool(a)), Some(Value::Bool(b))) => {
342                        // Boolean comparison (false < true)
343                        a.cmp(b)
344                    }
345                    (Some(Value::Null), Some(Value::Null)) => Ordering::Equal,
346                    (Some(Value::Null), Some(_)) => {
347                        // NULL comes first
348                        Ordering::Less
349                    }
350                    (Some(_), Some(Value::Null)) => {
351                        // NULL comes first
352                        Ordering::Greater
353                    }
354                    (None, None) => Ordering::Equal,
355                    (None, Some(_)) => {
356                        // Missing values come first
357                        Ordering::Less
358                    }
359                    (Some(_), None) => {
360                        // Missing values come first
361                        Ordering::Greater
362                    }
363                    // Mixed type comparisons - convert to strings
364                    (Some(a), Some(b)) => {
365                        let a_str = match a {
366                            Value::String(s) => s.clone(),
367                            Value::Number(n) => n.to_string(),
368                            Value::Bool(b) => b.to_string(),
369                            Value::Null => String::new(),
370                            _ => a.to_string(),
371                        };
372                        let b_str = match b {
373                            Value::String(s) => s.clone(),
374                            Value::Number(n) => n.to_string(),
375                            Value::Bool(b) => b.to_string(),
376                            Value::Null => String::new(),
377                            _ => b.to_string(),
378                        };
379                        a_str.cmp(&b_str)
380                    }
381                };
382
383                // Apply sort direction (ASC or DESC)
384                let final_cmp = match order_col.direction {
385                    SortDirection::Asc => cmp,
386                    SortDirection::Desc => cmp.reverse(),
387                };
388
389                // If this column comparison is not equal, return the result
390                if final_cmp != Ordering::Equal {
391                    return final_cmp;
392                }
393
394                // Otherwise, continue to the next column for tie-breaking
395            }
396
397            // All columns are equal
398            Ordering::Equal
399        });
400
401        Ok(data)
402    }
403
404    #[must_use]
405    pub fn get_headers(&self) -> &[String] {
406        &self.headers
407    }
408
409    #[must_use]
410    pub fn get_table_name(&self) -> &str {
411        &self.table_name
412    }
413
    /// Returns the number of rows currently held in `data`.
    #[must_use]
    pub fn get_row_count(&self) -> usize {
        self.data.len()
    }
418
419    /// V49: Convert `CsvDataSource` directly to `DataTable`
420    /// This avoids the JSON intermediate format
421    pub fn to_datatable(&self) -> DataTable {
422        debug!(
423            "V49: Converting CsvDataSource to DataTable for table '{}'",
424            self.table_name
425        );
426
427        let mut table = DataTable::new(&self.table_name);
428
429        // Create columns from headers
430        for header in &self.headers {
431            table.add_column(DataColumn::new(header.clone()));
432        }
433
434        // Convert each JSON row to DataRow
435        for (row_idx, json_row) in self.data.iter().enumerate() {
436            if let Some(obj) = json_row.as_object() {
437                let mut values = Vec::new();
438
439                // Get values in the same order as headers
440                for header in &self.headers {
441                    let value = obj
442                        .get(header)
443                        .map_or(DataValue::Null, json_value_to_data_value);
444                    values.push(value);
445                }
446
447                if let Err(e) = table.add_row(DataRow::new(values)) {
448                    debug!("V49: Failed to add row {}: {}", row_idx, e);
449                }
450            }
451        }
452
453        // Infer column types from the data
454        table.infer_column_types();
455
456        // Add metadata
457        table
458            .metadata
459            .insert("source".to_string(), "csv".to_string());
460        table
461            .metadata
462            .insert("original_count".to_string(), self.data.len().to_string());
463
464        debug!(
465            "V49: Created DataTable with {} rows and {} columns directly from CSV",
466            table.row_count(),
467            table.column_count()
468        );
469
470        table
471    }
472}
473
474/// V49: Helper function to convert JSON value to `DataValue`
475fn json_value_to_data_value(json: &Value) -> DataValue {
476    match json {
477        Value::Null => DataValue::Null,
478        Value::Bool(b) => DataValue::Boolean(*b),
479        Value::Number(n) => {
480            if let Some(i) = n.as_i64() {
481                DataValue::Integer(i)
482            } else if let Some(f) = n.as_f64() {
483                DataValue::Float(f)
484            } else {
485                DataValue::String(n.to_string())
486            }
487        }
488        Value::String(s) => {
489            // Try to detect if it's a date/time
490            if s.contains('-') && s.len() >= 8 && s.len() <= 30 {
491                // Simple heuristic for dates
492                DataValue::DateTime(s.clone())
493            } else {
494                DataValue::String(s.clone())
495            }
496        }
497        Value::Array(_) | Value::Object(_) => {
498            // Store complex types as JSON string
499            DataValue::String(json.to_string())
500        }
501    }
502}
503
504// Integration with ApiClient
/// Client-style wrapper around an optional `CsvDataSource`.
#[derive(Clone)]
pub struct CsvApiClient {
    /// Currently loaded data source; `None` until one of the load methods succeeds.
    datasource: Option<CsvDataSource>,
    /// Passed to `CsvDataSource::query_with_options` by `query_csv`.
    case_insensitive: bool,
}
510
/// The default client is the same as `CsvApiClient::new()`.
impl Default for CsvApiClient {
    fn default() -> Self {
        Self::new()
    }
}
516
517impl CsvApiClient {
    /// Creates a client with no data source loaded and case-insensitive
    /// matching turned off.
    #[must_use]
    pub fn new() -> Self {
        Self {
            datasource: None,
            case_insensitive: false,
        }
    }
525
    /// Sets the flag that `query_csv` forwards as `case_insensitive`.
    pub fn set_case_insensitive(&mut self, case_insensitive: bool) {
        self.case_insensitive = case_insensitive;
    }
529
530    pub fn load_csv<P: AsRef<Path>>(&mut self, path: P, table_name: &str) -> Result<()> {
531        self.datasource = Some(CsvDataSource::load_from_file(path, table_name)?);
532        Ok(())
533    }
534
535    pub fn load_json<P: AsRef<Path>>(&mut self, path: P, table_name: &str) -> Result<()> {
536        self.datasource = Some(CsvDataSource::load_from_json_file(path, table_name)?);
537        Ok(())
538    }
539
540    pub fn query_csv(&self, sql: &str) -> Result<QueryResponse> {
541        if let Some(ref ds) = self.datasource {
542            let data = ds.query_with_options(sql, self.case_insensitive)?;
543            let count = data.len();
544
545            Ok(QueryResponse {
546                data,
547                count,
548                query: QueryInfo {
549                    select: vec!["*".to_string()],
550                    where_clause: None,
551                    order_by: None,
552                },
553                source: Some("file".to_string()),
554                table: Some(ds.table_name.clone()),
555                cached: Some(false),
556            })
557        } else {
558            Err(anyhow::anyhow!("No CSV file loaded"))
559        }
560    }
561
562    #[must_use]
563    pub fn get_schema(&self) -> Option<HashMap<String, Vec<String>>> {
564        self.datasource.as_ref().map(|ds| {
565            let mut schema = HashMap::new();
566            schema.insert(ds.get_table_name().to_string(), ds.get_headers().to_vec());
567            schema
568        })
569    }
570
571    pub fn load_from_json(&mut self, data: Vec<Value>, table_name: &str) -> Result<()> {
572        // Extract headers from the first row
573        let headers: Vec<String> = if let Some(first_row) = data.first() {
574            if let Some(obj) = first_row.as_object() {
575                obj.keys().map(std::string::ToString::to_string).collect()
576            } else {
577                return Err(anyhow::anyhow!("Invalid JSON data format"));
578            }
579        } else {
580            return Err(anyhow::anyhow!("Empty data set"));
581        };
582
583        let column_lookup = build_column_lookup(&headers);
584
585        self.datasource = Some(CsvDataSource {
586            data: data.clone(),
587            headers,
588            table_name: table_name.to_string(),
589            column_lookup,
590        });
591
592        Ok(())
593    }
594
595    /// V49: Get `DataTable` directly from the datasource
596    /// This avoids JSON intermediate conversion
597    #[must_use]
598    pub fn get_datatable(&self) -> Option<DataTable> {
599        self.datasource.as_ref().map(|ds| {
600            debug!("V49: CsvApiClient returning DataTable directly");
601            ds.to_datatable()
602        })
603    }
604
    /// V49: Returns `true` when a data source is currently loaded.
    #[must_use]
    pub fn has_data(&self) -> bool {
        self.datasource.is_some()
    }
610}