sql_cli/data/csv_datasource.rs

1use crate::api_client::{QueryInfo, QueryResponse};
2use crate::csv_fixes::{build_column_lookup, find_column_case_insensitive, parse_column_name};
3use crate::data::datatable::{DataColumn, DataRow, DataTable, DataValue};
4use crate::recursive_parser::{OrderByColumn, Parser, SortDirection};
5use crate::where_ast::evaluate_where_expr_with_options;
6use crate::where_parser::WhereParser;
7use anyhow::Result;
8use csv;
9use serde_json::{json, Value};
10use std::cmp::Ordering;
11use std::collections::HashMap;
12use std::fs::File;
13use std::io::BufReader;
14use std::path::Path;
15use tracing::debug;
16
17#[derive(Clone, Debug)]
18pub struct CsvDataSource {
19    data: Vec<Value>,
20    headers: Vec<String>,
21    table_name: String,
22    column_lookup: HashMap<String, String>,
23}
24
25impl CsvDataSource {
26    pub fn load_from_file<P: AsRef<Path>>(path: P, table_name: &str) -> Result<Self> {
27        let file = File::open(&path)?;
28        let mut reader = csv::Reader::from_reader(file);
29
30        // Get headers
31        let headers: Vec<String> = reader
32            .headers()?
33            .iter()
34            .map(std::string::ToString::to_string)
35            .collect();
36
37        // Read all records into JSON values
38        let mut data = Vec::new();
39        for result in reader.records() {
40            let record = result?;
41            let mut row = serde_json::Map::new();
42
43            for (i, field) in record.iter().enumerate() {
44                if let Some(header) = headers.get(i) {
45                    // Try to parse as number, otherwise store as string
46                    let value = if field.is_empty() {
47                        Value::Null
48                    } else if let Ok(n) = field.parse::<f64>() {
49                        json!(n)
50                    } else {
51                        Value::String(field.to_string())
52                    };
53                    row.insert(header.clone(), value);
54                }
55            }
56
57            data.push(Value::Object(row));
58        }
59
60        let column_lookup = build_column_lookup(&headers);
61
62        Ok(CsvDataSource {
63            data,
64            headers,
65            table_name: table_name.to_string(),
66            column_lookup,
67        })
68    }
69
70    pub fn load_from_json_file<P: AsRef<Path>>(path: P, table_name: &str) -> Result<Self> {
71        let file = File::open(&path)?;
72        let reader = BufReader::new(file);
73
74        // Parse JSON array
75        let json_data: Vec<Value> = serde_json::from_reader(reader)?;
76
77        if json_data.is_empty() {
78            return Err(anyhow::anyhow!("JSON file contains no data"));
79        }
80
81        // Extract headers from the first record
82        let headers = if let Some(first_record) = json_data.first() {
83            if let Some(obj) = first_record.as_object() {
84                obj.keys().cloned().collect()
85            } else {
86                return Err(anyhow::anyhow!("JSON records must be objects"));
87            }
88        } else {
89            Vec::new()
90        };
91
92        // Validate all records have the same structure
93        for (i, record) in json_data.iter().enumerate() {
94            if !record.is_object() {
95                return Err(anyhow::anyhow!("Record {} is not an object", i));
96            }
97        }
98
99        let column_lookup = build_column_lookup(&headers);
100
101        Ok(CsvDataSource {
102            data: json_data,
103            headers,
104            table_name: table_name.to_string(),
105            column_lookup,
106        })
107    }
108
109    pub fn query(&self, sql: &str) -> Result<Vec<Value>> {
110        self.query_with_options(sql, false)
111    }
112
113    pub fn query_with_options(&self, sql: &str, case_insensitive: bool) -> Result<Vec<Value>> {
114        // Parse SQL using the recursive parser - NO FALLBACK, recursive parser only
115        let mut parser = Parser::new(sql);
116        let stmt = parser
117            .parse()
118            .map_err(|e| anyhow::anyhow!("Recursive parser error: {}", e))?;
119
120        let mut results = self.data.clone();
121
122        // Handle WHERE clause using the existing WhereParser
123        let sql_lower = sql.to_lowercase();
124        if let Some(where_pos) = sql_lower.find(" where ") {
125            // Extract WHERE clause, but stop at ORDER BY, LIMIT, or OFFSET if present
126            let where_start = where_pos + 7;
127            let mut where_end = sql.len();
128
129            // Find the earliest of ORDER BY, LIMIT, or OFFSET to terminate WHERE clause
130            if let Some(order_pos) = sql_lower.find(" order by ") {
131                where_end = where_end.min(order_pos);
132            }
133            if let Some(limit_pos) = sql_lower.find(" limit ") {
134                where_end = where_end.min(limit_pos);
135            }
136            if let Some(offset_pos) = sql_lower.find(" offset ") {
137                where_end = where_end.min(offset_pos);
138            }
139
140            let where_clause = sql[where_start..where_end].trim();
141            results = self.filter_results_with_options(results, where_clause, case_insensitive)?;
142        }
143
144        // Handle specific column selection
145        if !stmt.columns.contains(&"*".to_string()) {
146            let columns: Vec<&str> = stmt
147                .columns
148                .iter()
149                .map(std::string::String::as_str)
150                .collect();
151            results = self.select_columns(results, &columns)?;
152        }
153
154        // Handle ORDER BY clause
155        if let Some(order_by_columns) = &stmt.order_by {
156            results = self.sort_results(results, order_by_columns)?;
157        }
158
159        // Handle OFFSET and LIMIT clauses
160        if let Some(offset) = stmt.offset {
161            results = results.into_iter().skip(offset).collect();
162        }
163
164        if let Some(limit) = stmt.limit {
165            results = results.into_iter().take(limit).collect();
166        }
167
168        Ok(results)
169    }
170
171    fn filter_results(&self, data: Vec<Value>, where_clause: &str) -> Result<Vec<Value>> {
172        self.filter_results_with_options(data, where_clause, false)
173    }
174
175    fn filter_results_with_options(
176        &self,
177        data: Vec<Value>,
178        where_clause: &str,
179        case_insensitive: bool,
180    ) -> Result<Vec<Value>> {
181        // Parse WHERE clause into AST with column context
182        let columns = self.headers.clone();
183        let expr = WhereParser::parse_with_options(where_clause, columns, case_insensitive)?;
184
185        let mut filtered = Vec::new();
186        for row in data {
187            if evaluate_where_expr_with_options(&expr, &row, case_insensitive)? {
188                filtered.push(row);
189            }
190        }
191
192        Ok(filtered)
193    }
194
195    fn select_columns(&self, data: Vec<Value>, columns: &[&str]) -> Result<Vec<Value>> {
196        let mut results = Vec::new();
197
198        for row in data {
199            if let Some(obj) = row.as_object() {
200                let mut new_row = serde_json::Map::new();
201
202                for &col in columns {
203                    let col_parsed = parse_column_name(col);
204
205                    if let Some((actual_key, value)) =
206                        find_column_case_insensitive(obj, col_parsed, &self.column_lookup)
207                    {
208                        new_row.insert(actual_key.clone(), value.clone());
209                    }
210                }
211
212                results.push(Value::Object(new_row));
213            }
214        }
215
216        Ok(results)
217    }
218
219    fn sort_results(
220        &self,
221        mut data: Vec<Value>,
222        order_by_columns: &[OrderByColumn],
223    ) -> Result<Vec<Value>> {
224        if order_by_columns.is_empty() {
225            return Ok(data);
226        }
227
228        // Sort by multiple columns with proper type-aware comparison
229        data.sort_by(|a, b| {
230            for order_col in order_by_columns {
231                let col_parsed = parse_column_name(&order_col.column);
232
233                let val_a = if let Some(obj_a) = a.as_object() {
234                    find_column_case_insensitive(obj_a, col_parsed, &self.column_lookup)
235                        .map(|(_, v)| v)
236                } else {
237                    None
238                };
239
240                let val_b = if let Some(obj_b) = b.as_object() {
241                    find_column_case_insensitive(obj_b, col_parsed, &self.column_lookup)
242                        .map(|(_, v)| v)
243                } else {
244                    None
245                };
246
247                let cmp = match (val_a, val_b) {
248                    (Some(Value::Number(a)), Some(Value::Number(b))) => {
249                        // Numeric comparison - handles integers and floats properly
250                        let a_f64 = a.as_f64().unwrap_or(0.0);
251                        let b_f64 = b.as_f64().unwrap_or(0.0);
252                        a_f64.partial_cmp(&b_f64).unwrap_or(Ordering::Equal)
253                    }
254                    (Some(Value::String(a)), Some(Value::String(b))) => {
255                        // String comparison
256                        a.cmp(b)
257                    }
258                    (Some(Value::Bool(a)), Some(Value::Bool(b))) => {
259                        // Boolean comparison (false < true)
260                        a.cmp(b)
261                    }
262                    (Some(Value::Null), Some(Value::Null)) => Ordering::Equal,
263                    (Some(Value::Null), Some(_)) => {
264                        // NULL comes first
265                        Ordering::Less
266                    }
267                    (Some(_), Some(Value::Null)) => {
268                        // NULL comes first
269                        Ordering::Greater
270                    }
271                    (None, None) => Ordering::Equal,
272                    (None, Some(_)) => {
273                        // Missing values come first
274                        Ordering::Less
275                    }
276                    (Some(_), None) => {
277                        // Missing values come first
278                        Ordering::Greater
279                    }
280                    // Mixed type comparisons - convert to strings
281                    (Some(a), Some(b)) => {
282                        let a_str = match a {
283                            Value::String(s) => s.clone(),
284                            Value::Number(n) => n.to_string(),
285                            Value::Bool(b) => b.to_string(),
286                            Value::Null => String::new(),
287                            _ => a.to_string(),
288                        };
289                        let b_str = match b {
290                            Value::String(s) => s.clone(),
291                            Value::Number(n) => n.to_string(),
292                            Value::Bool(b) => b.to_string(),
293                            Value::Null => String::new(),
294                            _ => b.to_string(),
295                        };
296                        a_str.cmp(&b_str)
297                    }
298                };
299
300                // Apply sort direction (ASC or DESC)
301                let final_cmp = match order_col.direction {
302                    SortDirection::Asc => cmp,
303                    SortDirection::Desc => cmp.reverse(),
304                };
305
306                // If this column comparison is not equal, return the result
307                if final_cmp != Ordering::Equal {
308                    return final_cmp;
309                }
310
311                // Otherwise, continue to the next column for tie-breaking
312            }
313
314            // All columns are equal
315            Ordering::Equal
316        });
317
318        Ok(data)
319    }
320
321    #[must_use]
322    pub fn get_headers(&self) -> &[String] {
323        &self.headers
324    }
325
326    #[must_use]
327    pub fn get_table_name(&self) -> &str {
328        &self.table_name
329    }
330
331    #[must_use]
332    pub fn get_row_count(&self) -> usize {
333        self.data.len()
334    }
335
336    /// V49: Convert `CsvDataSource` directly to `DataTable`
337    /// This avoids the JSON intermediate format
338    pub fn to_datatable(&self) -> DataTable {
339        debug!(
340            "V49: Converting CsvDataSource to DataTable for table '{}'",
341            self.table_name
342        );
343
344        let mut table = DataTable::new(&self.table_name);
345
346        // Create columns from headers
347        for header in &self.headers {
348            table.add_column(DataColumn::new(header.clone()));
349        }
350
351        // Convert each JSON row to DataRow
352        for (row_idx, json_row) in self.data.iter().enumerate() {
353            if let Some(obj) = json_row.as_object() {
354                let mut values = Vec::new();
355
356                // Get values in the same order as headers
357                for header in &self.headers {
358                    let value = obj
359                        .get(header)
360                        .map_or(DataValue::Null, json_value_to_data_value);
361                    values.push(value);
362                }
363
364                if let Err(e) = table.add_row(DataRow::new(values)) {
365                    debug!("V49: Failed to add row {}: {}", row_idx, e);
366                }
367            }
368        }
369
370        // Infer column types from the data
371        table.infer_column_types();
372
373        // Add metadata
374        table
375            .metadata
376            .insert("source".to_string(), "csv".to_string());
377        table
378            .metadata
379            .insert("original_count".to_string(), self.data.len().to_string());
380
381        debug!(
382            "V49: Created DataTable with {} rows and {} columns directly from CSV",
383            table.row_count(),
384            table.column_count()
385        );
386
387        table
388    }
389}
390
391/// V49: Helper function to convert JSON value to `DataValue`
392fn json_value_to_data_value(json: &Value) -> DataValue {
393    match json {
394        Value::Null => DataValue::Null,
395        Value::Bool(b) => DataValue::Boolean(*b),
396        Value::Number(n) => {
397            if let Some(i) = n.as_i64() {
398                DataValue::Integer(i)
399            } else if let Some(f) = n.as_f64() {
400                DataValue::Float(f)
401            } else {
402                DataValue::String(n.to_string())
403            }
404        }
405        Value::String(s) => {
406            // Try to detect if it's a date/time
407            if s.contains('-') && s.len() >= 8 && s.len() <= 30 {
408                // Simple heuristic for dates
409                DataValue::DateTime(s.clone())
410            } else {
411                DataValue::String(s.clone())
412            }
413        }
414        Value::Array(_) | Value::Object(_) => {
415            // Store complex types as JSON string
416            DataValue::String(json.to_string())
417        }
418    }
419}
420
421// Integration with ApiClient
422#[derive(Clone)]
423pub struct CsvApiClient {
424    datasource: Option<CsvDataSource>,
425    case_insensitive: bool,
426}
427
428impl Default for CsvApiClient {
429    fn default() -> Self {
430        Self::new()
431    }
432}
433
434impl CsvApiClient {
435    #[must_use]
436    pub fn new() -> Self {
437        Self {
438            datasource: None,
439            case_insensitive: false,
440        }
441    }
442
443    pub fn set_case_insensitive(&mut self, case_insensitive: bool) {
444        self.case_insensitive = case_insensitive;
445    }
446
447    pub fn load_csv<P: AsRef<Path>>(&mut self, path: P, table_name: &str) -> Result<()> {
448        self.datasource = Some(CsvDataSource::load_from_file(path, table_name)?);
449        Ok(())
450    }
451
452    pub fn load_json<P: AsRef<Path>>(&mut self, path: P, table_name: &str) -> Result<()> {
453        self.datasource = Some(CsvDataSource::load_from_json_file(path, table_name)?);
454        Ok(())
455    }
456
457    pub fn query_csv(&self, sql: &str) -> Result<QueryResponse> {
458        if let Some(ref ds) = self.datasource {
459            let data = ds.query_with_options(sql, self.case_insensitive)?;
460            let count = data.len();
461
462            Ok(QueryResponse {
463                data,
464                count,
465                query: QueryInfo {
466                    select: vec!["*".to_string()],
467                    where_clause: None,
468                    order_by: None,
469                },
470                source: Some("file".to_string()),
471                table: Some(ds.table_name.clone()),
472                cached: Some(false),
473            })
474        } else {
475            Err(anyhow::anyhow!("No CSV file loaded"))
476        }
477    }
478
479    #[must_use]
480    pub fn get_schema(&self) -> Option<HashMap<String, Vec<String>>> {
481        self.datasource.as_ref().map(|ds| {
482            let mut schema = HashMap::new();
483            schema.insert(ds.get_table_name().to_string(), ds.get_headers().to_vec());
484            schema
485        })
486    }
487
488    pub fn load_from_json(&mut self, data: Vec<Value>, table_name: &str) -> Result<()> {
489        // Extract headers from the first row
490        let headers: Vec<String> = if let Some(first_row) = data.first() {
491            if let Some(obj) = first_row.as_object() {
492                obj.keys().map(std::string::ToString::to_string).collect()
493            } else {
494                return Err(anyhow::anyhow!("Invalid JSON data format"));
495            }
496        } else {
497            return Err(anyhow::anyhow!("Empty data set"));
498        };
499
500        let column_lookup = build_column_lookup(&headers);
501
502        self.datasource = Some(CsvDataSource {
503            data: data.clone(),
504            headers,
505            table_name: table_name.to_string(),
506            column_lookup,
507        });
508
509        Ok(())
510    }
511
512    /// V49: Get `DataTable` directly from the datasource
513    /// This avoids JSON intermediate conversion
514    #[must_use]
515    pub fn get_datatable(&self) -> Option<DataTable> {
516        self.datasource.as_ref().map(|ds| {
517            debug!("V49: CsvApiClient returning DataTable directly");
518            ds.to_datatable()
519        })
520    }
521
522    /// V49: Check if we have a datasource loaded
523    #[must_use]
524    pub fn has_data(&self) -> bool {
525        self.datasource.is_some()
526    }
527}