sql_cli/data/
csv_datasource.rs

1use crate::api_client::{QueryInfo, QueryResponse};
2use crate::csv_fixes::{build_column_lookup, find_column_case_insensitive, parse_column_name};
3use crate::data::datatable::{DataColumn, DataRow, DataTable, DataValue};
4use crate::recursive_parser::{OrderByColumn, Parser, SortDirection};
5use crate::where_ast::evaluate_where_expr_with_options;
6use crate::where_parser::WhereParser;
7use anyhow::Result;
8use csv;
9use serde_json::{json, Value};
10use std::cmp::Ordering;
11use std::collections::HashMap;
12use std::fs::File;
13use std::io::BufReader;
14use std::path::Path;
15use tracing::debug;
16
17#[derive(Clone, Debug)]
18pub struct CsvDataSource {
19    data: Vec<Value>,
20    headers: Vec<String>,
21    table_name: String,
22    column_lookup: HashMap<String, String>,
23}
24
25impl CsvDataSource {
26    pub fn load_from_file<P: AsRef<Path>>(path: P, table_name: &str) -> Result<Self> {
27        let file = File::open(&path)?;
28        let mut reader = csv::Reader::from_reader(file);
29
30        // Get headers
31        let headers: Vec<String> = reader.headers()?.iter().map(|h| h.to_string()).collect();
32
33        // Read all records into JSON values
34        let mut data = Vec::new();
35        for result in reader.records() {
36            let record = result?;
37            let mut row = serde_json::Map::new();
38
39            for (i, field) in record.iter().enumerate() {
40                if let Some(header) = headers.get(i) {
41                    // Try to parse as number, otherwise store as string
42                    let value = if field.is_empty() {
43                        Value::Null
44                    } else if let Ok(n) = field.parse::<f64>() {
45                        json!(n)
46                    } else {
47                        Value::String(field.to_string())
48                    };
49                    row.insert(header.clone(), value);
50                }
51            }
52
53            data.push(Value::Object(row));
54        }
55
56        let column_lookup = build_column_lookup(&headers);
57
58        Ok(CsvDataSource {
59            data,
60            headers,
61            table_name: table_name.to_string(),
62            column_lookup,
63        })
64    }
65
66    pub fn load_from_json_file<P: AsRef<Path>>(path: P, table_name: &str) -> Result<Self> {
67        let file = File::open(&path)?;
68        let reader = BufReader::new(file);
69
70        // Parse JSON array
71        let json_data: Vec<Value> = serde_json::from_reader(reader)?;
72
73        if json_data.is_empty() {
74            return Err(anyhow::anyhow!("JSON file contains no data"));
75        }
76
77        // Extract headers from the first record
78        let headers = if let Some(first_record) = json_data.first() {
79            if let Some(obj) = first_record.as_object() {
80                obj.keys().cloned().collect()
81            } else {
82                return Err(anyhow::anyhow!("JSON records must be objects"));
83            }
84        } else {
85            Vec::new()
86        };
87
88        // Validate all records have the same structure
89        for (i, record) in json_data.iter().enumerate() {
90            if !record.is_object() {
91                return Err(anyhow::anyhow!("Record {} is not an object", i));
92            }
93        }
94
95        let column_lookup = build_column_lookup(&headers);
96
97        Ok(CsvDataSource {
98            data: json_data,
99            headers,
100            table_name: table_name.to_string(),
101            column_lookup,
102        })
103    }
104
105    pub fn query(&self, sql: &str) -> Result<Vec<Value>> {
106        self.query_with_options(sql, false)
107    }
108
109    pub fn query_with_options(&self, sql: &str, case_insensitive: bool) -> Result<Vec<Value>> {
110        // Parse SQL using the recursive parser - NO FALLBACK, recursive parser only
111        let mut parser = Parser::new(sql);
112        let stmt = parser
113            .parse()
114            .map_err(|e| anyhow::anyhow!("Recursive parser error: {}", e))?;
115
116        let mut results = self.data.clone();
117
118        // Handle WHERE clause using the existing WhereParser
119        let sql_lower = sql.to_lowercase();
120        if let Some(where_pos) = sql_lower.find(" where ") {
121            // Extract WHERE clause, but stop at ORDER BY, LIMIT, or OFFSET if present
122            let where_start = where_pos + 7;
123            let mut where_end = sql.len();
124
125            // Find the earliest of ORDER BY, LIMIT, or OFFSET to terminate WHERE clause
126            if let Some(order_pos) = sql_lower.find(" order by ") {
127                where_end = where_end.min(order_pos);
128            }
129            if let Some(limit_pos) = sql_lower.find(" limit ") {
130                where_end = where_end.min(limit_pos);
131            }
132            if let Some(offset_pos) = sql_lower.find(" offset ") {
133                where_end = where_end.min(offset_pos);
134            }
135
136            let where_clause = sql[where_start..where_end].trim();
137            results = self.filter_results_with_options(results, where_clause, case_insensitive)?;
138        }
139
140        // Handle specific column selection
141        if !stmt.columns.contains(&"*".to_string()) {
142            let columns: Vec<&str> = stmt.columns.iter().map(|s| s.as_str()).collect();
143            results = self.select_columns(results, &columns)?;
144        }
145
146        // Handle ORDER BY clause
147        if let Some(order_by_columns) = &stmt.order_by {
148            results = self.sort_results(results, order_by_columns)?;
149        }
150
151        // Handle OFFSET and LIMIT clauses
152        if let Some(offset) = stmt.offset {
153            results = results.into_iter().skip(offset).collect();
154        }
155
156        if let Some(limit) = stmt.limit {
157            results = results.into_iter().take(limit).collect();
158        }
159
160        Ok(results)
161    }
162
163    fn filter_results(&self, data: Vec<Value>, where_clause: &str) -> Result<Vec<Value>> {
164        self.filter_results_with_options(data, where_clause, false)
165    }
166
167    fn filter_results_with_options(
168        &self,
169        data: Vec<Value>,
170        where_clause: &str,
171        case_insensitive: bool,
172    ) -> Result<Vec<Value>> {
173        // Parse WHERE clause into AST with column context
174        let columns = self.headers.clone();
175        let expr = WhereParser::parse_with_options(where_clause, columns, case_insensitive)?;
176
177        let mut filtered = Vec::new();
178        for row in data {
179            if evaluate_where_expr_with_options(&expr, &row, case_insensitive)? {
180                filtered.push(row);
181            }
182        }
183
184        Ok(filtered)
185    }
186
187    fn select_columns(&self, data: Vec<Value>, columns: &[&str]) -> Result<Vec<Value>> {
188        let mut results = Vec::new();
189
190        for row in data {
191            if let Some(obj) = row.as_object() {
192                let mut new_row = serde_json::Map::new();
193
194                for &col in columns {
195                    let col_parsed = parse_column_name(col);
196
197                    if let Some((actual_key, value)) =
198                        find_column_case_insensitive(obj, col_parsed, &self.column_lookup)
199                    {
200                        new_row.insert(actual_key.clone(), value.clone());
201                    }
202                }
203
204                results.push(Value::Object(new_row));
205            }
206        }
207
208        Ok(results)
209    }
210
211    fn sort_results(
212        &self,
213        mut data: Vec<Value>,
214        order_by_columns: &[OrderByColumn],
215    ) -> Result<Vec<Value>> {
216        if order_by_columns.is_empty() {
217            return Ok(data);
218        }
219
220        // Sort by multiple columns with proper type-aware comparison
221        data.sort_by(|a, b| {
222            for order_col in order_by_columns {
223                let col_parsed = parse_column_name(&order_col.column);
224
225                let val_a = if let Some(obj_a) = a.as_object() {
226                    find_column_case_insensitive(obj_a, col_parsed, &self.column_lookup)
227                        .map(|(_, v)| v)
228                } else {
229                    None
230                };
231
232                let val_b = if let Some(obj_b) = b.as_object() {
233                    find_column_case_insensitive(obj_b, col_parsed, &self.column_lookup)
234                        .map(|(_, v)| v)
235                } else {
236                    None
237                };
238
239                let cmp = match (val_a, val_b) {
240                    (Some(Value::Number(a)), Some(Value::Number(b))) => {
241                        // Numeric comparison - handles integers and floats properly
242                        let a_f64 = a.as_f64().unwrap_or(0.0);
243                        let b_f64 = b.as_f64().unwrap_or(0.0);
244                        a_f64.partial_cmp(&b_f64).unwrap_or(Ordering::Equal)
245                    }
246                    (Some(Value::String(a)), Some(Value::String(b))) => {
247                        // String comparison
248                        a.cmp(b)
249                    }
250                    (Some(Value::Bool(a)), Some(Value::Bool(b))) => {
251                        // Boolean comparison (false < true)
252                        a.cmp(b)
253                    }
254                    (Some(Value::Null), Some(Value::Null)) => Ordering::Equal,
255                    (Some(Value::Null), Some(_)) => {
256                        // NULL comes first
257                        Ordering::Less
258                    }
259                    (Some(_), Some(Value::Null)) => {
260                        // NULL comes first
261                        Ordering::Greater
262                    }
263                    (None, None) => Ordering::Equal,
264                    (None, Some(_)) => {
265                        // Missing values come first
266                        Ordering::Less
267                    }
268                    (Some(_), None) => {
269                        // Missing values come first
270                        Ordering::Greater
271                    }
272                    // Mixed type comparisons - convert to strings
273                    (Some(a), Some(b)) => {
274                        let a_str = match a {
275                            Value::String(s) => s.clone(),
276                            Value::Number(n) => n.to_string(),
277                            Value::Bool(b) => b.to_string(),
278                            Value::Null => "".to_string(),
279                            _ => a.to_string(),
280                        };
281                        let b_str = match b {
282                            Value::String(s) => s.clone(),
283                            Value::Number(n) => n.to_string(),
284                            Value::Bool(b) => b.to_string(),
285                            Value::Null => "".to_string(),
286                            _ => b.to_string(),
287                        };
288                        a_str.cmp(&b_str)
289                    }
290                };
291
292                // Apply sort direction (ASC or DESC)
293                let final_cmp = match order_col.direction {
294                    SortDirection::Asc => cmp,
295                    SortDirection::Desc => cmp.reverse(),
296                };
297
298                // If this column comparison is not equal, return the result
299                if final_cmp != Ordering::Equal {
300                    return final_cmp;
301                }
302
303                // Otherwise, continue to the next column for tie-breaking
304            }
305
306            // All columns are equal
307            Ordering::Equal
308        });
309
310        Ok(data)
311    }
312
313    pub fn get_headers(&self) -> &[String] {
314        &self.headers
315    }
316
317    pub fn get_table_name(&self) -> &str {
318        &self.table_name
319    }
320
321    pub fn get_row_count(&self) -> usize {
322        self.data.len()
323    }
324
325    /// V49: Convert CsvDataSource directly to DataTable
326    /// This avoids the JSON intermediate format
327    pub fn to_datatable(&self) -> DataTable {
328        debug!(
329            "V49: Converting CsvDataSource to DataTable for table '{}'",
330            self.table_name
331        );
332
333        let mut table = DataTable::new(&self.table_name);
334
335        // Create columns from headers
336        for header in &self.headers {
337            table.add_column(DataColumn::new(header.clone()));
338        }
339
340        // Convert each JSON row to DataRow
341        for (row_idx, json_row) in self.data.iter().enumerate() {
342            if let Some(obj) = json_row.as_object() {
343                let mut values = Vec::new();
344
345                // Get values in the same order as headers
346                for header in &self.headers {
347                    let value = obj
348                        .get(header)
349                        .map(|v| json_value_to_data_value(v))
350                        .unwrap_or(DataValue::Null);
351                    values.push(value);
352                }
353
354                if let Err(e) = table.add_row(DataRow::new(values)) {
355                    debug!("V49: Failed to add row {}: {}", row_idx, e);
356                }
357            }
358        }
359
360        // Infer column types from the data
361        table.infer_column_types();
362
363        // Add metadata
364        table
365            .metadata
366            .insert("source".to_string(), "csv".to_string());
367        table
368            .metadata
369            .insert("original_count".to_string(), self.data.len().to_string());
370
371        debug!(
372            "V49: Created DataTable with {} rows and {} columns directly from CSV",
373            table.row_count(),
374            table.column_count()
375        );
376
377        table
378    }
379}
380
381/// V49: Helper function to convert JSON value to DataValue
382fn json_value_to_data_value(json: &Value) -> DataValue {
383    match json {
384        Value::Null => DataValue::Null,
385        Value::Bool(b) => DataValue::Boolean(*b),
386        Value::Number(n) => {
387            if let Some(i) = n.as_i64() {
388                DataValue::Integer(i)
389            } else if let Some(f) = n.as_f64() {
390                DataValue::Float(f)
391            } else {
392                DataValue::String(n.to_string())
393            }
394        }
395        Value::String(s) => {
396            // Try to detect if it's a date/time
397            if s.contains('-') && s.len() >= 8 && s.len() <= 30 {
398                // Simple heuristic for dates
399                DataValue::DateTime(s.clone())
400            } else {
401                DataValue::String(s.clone())
402            }
403        }
404        Value::Array(_) | Value::Object(_) => {
405            // Store complex types as JSON string
406            DataValue::String(json.to_string())
407        }
408    }
409}
410
411// Integration with ApiClient
412#[derive(Clone)]
413pub struct CsvApiClient {
414    datasource: Option<CsvDataSource>,
415    case_insensitive: bool,
416}
417
418impl CsvApiClient {
419    pub fn new() -> Self {
420        Self {
421            datasource: None,
422            case_insensitive: false,
423        }
424    }
425
426    pub fn set_case_insensitive(&mut self, case_insensitive: bool) {
427        self.case_insensitive = case_insensitive;
428    }
429
430    pub fn load_csv<P: AsRef<Path>>(&mut self, path: P, table_name: &str) -> Result<()> {
431        self.datasource = Some(CsvDataSource::load_from_file(path, table_name)?);
432        Ok(())
433    }
434
435    pub fn load_json<P: AsRef<Path>>(&mut self, path: P, table_name: &str) -> Result<()> {
436        self.datasource = Some(CsvDataSource::load_from_json_file(path, table_name)?);
437        Ok(())
438    }
439
440    pub fn query_csv(&self, sql: &str) -> Result<QueryResponse> {
441        if let Some(ref ds) = self.datasource {
442            let data = ds.query_with_options(sql, self.case_insensitive)?;
443            let count = data.len();
444
445            Ok(QueryResponse {
446                data,
447                count,
448                query: QueryInfo {
449                    select: vec!["*".to_string()],
450                    where_clause: None,
451                    order_by: None,
452                },
453                source: Some("file".to_string()),
454                table: Some(ds.table_name.clone()),
455                cached: Some(false),
456            })
457        } else {
458            Err(anyhow::anyhow!("No CSV file loaded"))
459        }
460    }
461
462    pub fn get_schema(&self) -> Option<HashMap<String, Vec<String>>> {
463        self.datasource.as_ref().map(|ds| {
464            let mut schema = HashMap::new();
465            schema.insert(ds.get_table_name().to_string(), ds.get_headers().to_vec());
466            schema
467        })
468    }
469
470    pub fn load_from_json(&mut self, data: Vec<Value>, table_name: &str) -> Result<()> {
471        // Extract headers from the first row
472        let headers: Vec<String> = if let Some(first_row) = data.first() {
473            if let Some(obj) = first_row.as_object() {
474                obj.keys().map(|k| k.to_string()).collect()
475            } else {
476                return Err(anyhow::anyhow!("Invalid JSON data format"));
477            }
478        } else {
479            return Err(anyhow::anyhow!("Empty data set"));
480        };
481
482        let column_lookup = build_column_lookup(&headers);
483
484        self.datasource = Some(CsvDataSource {
485            data: data.clone(),
486            headers,
487            table_name: table_name.to_string(),
488            column_lookup,
489        });
490
491        Ok(())
492    }
493
494    /// V49: Get DataTable directly from the datasource
495    /// This avoids JSON intermediate conversion
496    pub fn get_datatable(&self) -> Option<DataTable> {
497        self.datasource.as_ref().map(|ds| {
498            debug!("V49: CsvApiClient returning DataTable directly");
499            ds.to_datatable()
500        })
501    }
502
503    /// V49: Check if we have a datasource loaded
504    pub fn has_data(&self) -> bool {
505        self.datasource.is_some()
506    }
507}