1use crate::api_client::{QueryInfo, QueryResponse};
2use crate::csv_fixes::{build_column_lookup, find_column_case_insensitive, parse_column_name};
3use crate::data::datatable::{DataColumn, DataRow, DataTable, DataValue};
4use crate::recursive_parser::{OrderByColumn, Parser, SortDirection};
5use crate::where_ast::evaluate_where_expr_with_options;
6use crate::where_parser::WhereParser;
7use anyhow::Result;
8use csv;
9use serde_json::{json, Value};
10use std::cmp::Ordering;
11use std::collections::HashMap;
12use std::fs::File;
13use std::io::BufReader;
14use std::path::Path;
15use tracing::debug;
16
/// An in-memory, queryable table loaded from a CSV or JSON file.
///
/// Rows are stored as `serde_json::Value::Object`s keyed by header name, so
/// the same filtering/sorting/projection code serves both CSV and JSON input.
#[derive(Clone, Debug)]
pub struct CsvDataSource {
    // One JSON object per row; keys are entries of `headers`.
    data: Vec<Value>,
    // Column names in original file order.
    headers: Vec<String>,
    // Logical table name reported in schemas/metadata.
    table_name: String,
    // Built by `build_column_lookup`; presumably maps normalized column names
    // to the actual header spelling for case-insensitive resolution — see
    // crate::csv_fixes (TODO confirm).
    column_lookup: HashMap<String, String>,
}
24
25impl CsvDataSource {
26 pub fn load_from_file<P: AsRef<Path>>(path: P, table_name: &str) -> Result<Self> {
27 use std::io::{BufRead, BufReader as IOBufReader};
28
29 let file = File::open(&path)?;
30 let mut reader = csv::Reader::from_reader(file);
31
32 let headers: Vec<String> = reader
34 .headers()?
35 .iter()
36 .map(std::string::ToString::to_string)
37 .collect();
38
39 let file2 = File::open(&path)?;
41 let mut line_reader = IOBufReader::new(file2);
42 let mut raw_line = String::new();
43 line_reader.read_line(&mut raw_line)?;
45
46 let mut data = Vec::new();
48 for result in reader.records() {
49 let record = result?;
50
51 raw_line.clear();
53 line_reader.read_line(&mut raw_line)?;
54
55 let mut row = serde_json::Map::new();
56
57 for (i, field) in record.iter().enumerate() {
58 if let Some(header) = headers.get(i) {
59 let value = if field.is_empty() {
62 if Self::is_null_field(&raw_line, i) {
66 Value::Null
67 } else {
68 Value::String(String::new())
70 }
71 } else if let Ok(n) = field.parse::<f64>() {
72 json!(n)
73 } else {
74 Value::String(field.to_string())
75 };
76 row.insert(header.clone(), value);
77 }
78 }
79
80 data.push(Value::Object(row));
81 }
82
83 let column_lookup = build_column_lookup(&headers);
84
85 Ok(CsvDataSource {
86 data,
87 headers,
88 table_name: table_name.to_string(),
89 column_lookup,
90 })
91 }
92
93 fn is_null_field(raw_line: &str, field_index: usize) -> bool {
96 let mut comma_count = 0;
98 let mut in_quotes = false;
99 let mut field_start = 0;
100 let mut prev_char = ' ';
101
102 for (i, ch) in raw_line.chars().enumerate() {
103 if ch == '"' && prev_char != '\\' {
104 in_quotes = !in_quotes;
105 }
106
107 if ch == ',' && !in_quotes {
108 if comma_count == field_index {
109 let field_end = i;
110 let field_content = &raw_line[field_start..field_end].trim();
111 if field_content.is_empty() {
113 return true; }
115 if field_content.starts_with('"')
117 && field_content.ends_with('"')
118 && field_content.len() == 2
119 {
120 return false; }
122 return false; }
124 comma_count += 1;
125 field_start = i + 1;
126 }
127 prev_char = ch;
128 }
129
130 if comma_count == field_index {
132 let field_content = raw_line[field_start..]
133 .trim()
134 .trim_end_matches('\n')
135 .trim_end_matches('\r');
136 if field_content.is_empty() {
138 return true; }
140 if field_content.starts_with('"')
142 && field_content.ends_with('"')
143 && field_content.len() == 2
144 {
145 return false; }
147 return false; }
149
150 false }
152
153 pub fn load_from_json_file<P: AsRef<Path>>(path: P, table_name: &str) -> Result<Self> {
154 let file = File::open(&path)?;
155 let reader = BufReader::new(file);
156
157 let json_data: Vec<Value> = serde_json::from_reader(reader)?;
159
160 if json_data.is_empty() {
161 return Err(anyhow::anyhow!("JSON file contains no data"));
162 }
163
164 let headers = if let Some(first_record) = json_data.first() {
166 if let Some(obj) = first_record.as_object() {
167 obj.keys().cloned().collect()
168 } else {
169 return Err(anyhow::anyhow!("JSON records must be objects"));
170 }
171 } else {
172 Vec::new()
173 };
174
175 for (i, record) in json_data.iter().enumerate() {
177 if !record.is_object() {
178 return Err(anyhow::anyhow!("Record {} is not an object", i));
179 }
180 }
181
182 let column_lookup = build_column_lookup(&headers);
183
184 Ok(CsvDataSource {
185 data: json_data,
186 headers,
187 table_name: table_name.to_string(),
188 column_lookup,
189 })
190 }
191
192 pub fn query(&self, sql: &str) -> Result<Vec<Value>> {
193 self.query_with_options(sql, false)
194 }
195
    /// Parses and executes a SQL SELECT against the in-memory rows.
    ///
    /// Supported clauses: column projection, WHERE, ORDER BY, LIMIT, OFFSET.
    /// They are applied in that order: filter, project, sort, skip, take.
    ///
    /// # Errors
    /// Returns an error if the SQL cannot be parsed or the WHERE clause fails
    /// to evaluate against a row.
    pub fn query_with_options(&self, sql: &str, case_insensitive: bool) -> Result<Vec<Value>> {
        let mut parser = Parser::new(sql);
        let stmt = parser
            .parse()
            .map_err(|e| anyhow::anyhow!("Recursive parser error: {}", e))?;

        let mut results = self.data.clone();

        // The WHERE clause is re-extracted textually from the raw SQL rather
        // than taken from `stmt`.
        // NOTE(review): plain substring search for " where " / " order by " /
        // " limit " / " offset " will misfire if those words appear inside a
        // quoted string literal — confirm queries cannot contain them.
        let sql_lower = sql.to_lowercase();
        if let Some(where_pos) = sql_lower.find(" where ") {
            let where_start = where_pos + 7; // skip past " where " itself
            let mut where_end = sql.len();

            // Clip the clause at whichever trailing keyword comes first.
            if let Some(order_pos) = sql_lower.find(" order by ") {
                where_end = where_end.min(order_pos);
            }
            if let Some(limit_pos) = sql_lower.find(" limit ") {
                where_end = where_end.min(limit_pos);
            }
            if let Some(offset_pos) = sql_lower.find(" offset ") {
                where_end = where_end.min(offset_pos);
            }

            let where_clause = sql[where_start..where_end].trim();
            results = self.filter_results_with_options(results, where_clause, case_insensitive)?;
        }

        // Project columns unless the statement selects `*`.
        if !stmt.columns.contains(&"*".to_string()) {
            let columns: Vec<&str> = stmt
                .columns
                .iter()
                .map(std::string::String::as_str)
                .collect();
            results = self.select_columns(results, &columns)?;
        }

        if let Some(order_by_columns) = &stmt.order_by {
            results = self.sort_results(results, order_by_columns)?;
        }

        // OFFSET before LIMIT, matching SQL semantics.
        if let Some(offset) = stmt.offset {
            results = results.into_iter().skip(offset).collect();
        }

        if let Some(limit) = stmt.limit {
            results = results.into_iter().take(limit).collect();
        }

        Ok(results)
    }
253
    /// Case-sensitive convenience wrapper around
    /// [`Self::filter_results_with_options`].
    // NOTE(review): not called anywhere in this file; possibly dead code or
    // kept for symmetry with `query` — confirm before removing.
    fn filter_results(&self, data: Vec<Value>, where_clause: &str) -> Result<Vec<Value>> {
        self.filter_results_with_options(data, where_clause, false)
    }
257
258 fn filter_results_with_options(
259 &self,
260 data: Vec<Value>,
261 where_clause: &str,
262 case_insensitive: bool,
263 ) -> Result<Vec<Value>> {
264 let columns = self.headers.clone();
266 let expr = WhereParser::parse_with_options(where_clause, columns, case_insensitive)?;
267
268 let mut filtered = Vec::new();
269 for row in data {
270 if evaluate_where_expr_with_options(&expr, &row, case_insensitive)? {
271 filtered.push(row);
272 }
273 }
274
275 Ok(filtered)
276 }
277
278 fn select_columns(&self, data: Vec<Value>, columns: &[&str]) -> Result<Vec<Value>> {
279 let mut results = Vec::new();
280
281 for row in data {
282 if let Some(obj) = row.as_object() {
283 let mut new_row = serde_json::Map::new();
284
285 for &col in columns {
286 let col_parsed = parse_column_name(col);
287
288 if let Some((actual_key, value)) =
289 find_column_case_insensitive(obj, col_parsed, &self.column_lookup)
290 {
291 new_row.insert(actual_key.clone(), value.clone());
292 }
293 }
294
295 results.push(Value::Object(new_row));
296 }
297 }
298
299 Ok(results)
300 }
301
302 fn sort_results(
303 &self,
304 mut data: Vec<Value>,
305 order_by_columns: &[OrderByColumn],
306 ) -> Result<Vec<Value>> {
307 if order_by_columns.is_empty() {
308 return Ok(data);
309 }
310
311 data.sort_by(|a, b| {
313 for order_col in order_by_columns {
314 let col_parsed = parse_column_name(&order_col.column);
315
316 let val_a = if let Some(obj_a) = a.as_object() {
317 find_column_case_insensitive(obj_a, col_parsed, &self.column_lookup)
318 .map(|(_, v)| v)
319 } else {
320 None
321 };
322
323 let val_b = if let Some(obj_b) = b.as_object() {
324 find_column_case_insensitive(obj_b, col_parsed, &self.column_lookup)
325 .map(|(_, v)| v)
326 } else {
327 None
328 };
329
330 let cmp = match (val_a, val_b) {
331 (Some(Value::Number(a)), Some(Value::Number(b))) => {
332 let a_f64 = a.as_f64().unwrap_or(0.0);
334 let b_f64 = b.as_f64().unwrap_or(0.0);
335 a_f64.partial_cmp(&b_f64).unwrap_or(Ordering::Equal)
336 }
337 (Some(Value::String(a)), Some(Value::String(b))) => {
338 a.cmp(b)
340 }
341 (Some(Value::Bool(a)), Some(Value::Bool(b))) => {
342 a.cmp(b)
344 }
345 (Some(Value::Null), Some(Value::Null)) => Ordering::Equal,
346 (Some(Value::Null), Some(_)) => {
347 Ordering::Less
349 }
350 (Some(_), Some(Value::Null)) => {
351 Ordering::Greater
353 }
354 (None, None) => Ordering::Equal,
355 (None, Some(_)) => {
356 Ordering::Less
358 }
359 (Some(_), None) => {
360 Ordering::Greater
362 }
363 (Some(a), Some(b)) => {
365 let a_str = match a {
366 Value::String(s) => s.clone(),
367 Value::Number(n) => n.to_string(),
368 Value::Bool(b) => b.to_string(),
369 Value::Null => String::new(),
370 _ => a.to_string(),
371 };
372 let b_str = match b {
373 Value::String(s) => s.clone(),
374 Value::Number(n) => n.to_string(),
375 Value::Bool(b) => b.to_string(),
376 Value::Null => String::new(),
377 _ => b.to_string(),
378 };
379 a_str.cmp(&b_str)
380 }
381 };
382
383 let final_cmp = match order_col.direction {
385 SortDirection::Asc => cmp,
386 SortDirection::Desc => cmp.reverse(),
387 };
388
389 if final_cmp != Ordering::Equal {
391 return final_cmp;
392 }
393
394 }
396
397 Ordering::Equal
399 });
400
401 Ok(data)
402 }
403
    /// Returns the column headers in original file order.
    #[must_use]
    pub fn get_headers(&self) -> &[String] {
        &self.headers
    }
408
    /// Returns the logical table name this source was loaded as.
    #[must_use]
    pub fn get_table_name(&self) -> &str {
        &self.table_name
    }
413
    /// Returns the number of data rows (the header is not counted).
    #[must_use]
    pub fn get_row_count(&self) -> usize {
        self.data.len()
    }
418
419 pub fn to_datatable(&self) -> DataTable {
422 debug!(
423 "V49: Converting CsvDataSource to DataTable for table '{}'",
424 self.table_name
425 );
426
427 let mut table = DataTable::new(&self.table_name);
428
429 for header in &self.headers {
431 table.add_column(DataColumn::new(header.clone()));
432 }
433
434 for (row_idx, json_row) in self.data.iter().enumerate() {
436 if let Some(obj) = json_row.as_object() {
437 let mut values = Vec::new();
438
439 for header in &self.headers {
441 let value = obj
442 .get(header)
443 .map_or(DataValue::Null, json_value_to_data_value);
444 values.push(value);
445 }
446
447 if let Err(e) = table.add_row(DataRow::new(values)) {
448 debug!("V49: Failed to add row {}: {}", row_idx, e);
449 }
450 }
451 }
452
453 table.infer_column_types();
455
456 table
458 .metadata
459 .insert("source".to_string(), "csv".to_string());
460 table
461 .metadata
462 .insert("original_count".to_string(), self.data.len().to_string());
463
464 debug!(
465 "V49: Created DataTable with {} rows and {} columns directly from CSV",
466 table.row_count(),
467 table.column_count()
468 );
469
470 table
471 }
472}
473
/// Maps a `serde_json::Value` onto the crate's `DataValue` model.
fn json_value_to_data_value(json: &Value) -> DataValue {
    match json {
        Value::Null => DataValue::Null,
        Value::Bool(b) => DataValue::Boolean(*b),
        Value::Number(n) => {
            // Prefer exact integers, then floats, then the textual form for
            // numbers representable as neither (e.g. u64 above i64::MAX).
            if let Some(i) = n.as_i64() {
                DataValue::Integer(i)
            } else if let Some(f) = n.as_f64() {
                DataValue::Float(f)
            } else {
                DataValue::String(n.to_string())
            }
        }
        Value::String(s) => {
            // Heuristic date detection: any hyphenated string of 8..=30 chars
            // is classified as a DateTime.
            // NOTE(review): this also captures ordinary hyphenated text like
            // "follow-up meeting" — confirm downstream tolerates false
            // positives before relying on the DateTime tag.
            if s.contains('-') && s.len() >= 8 && s.len() <= 30 {
                DataValue::DateTime(s.clone())
            } else {
                DataValue::String(s.clone())
            }
        }
        // Nested structures are flattened to their JSON text.
        Value::Array(_) | Value::Object(_) => {
            DataValue::String(json.to_string())
        }
    }
}
503
/// Client facade that loads a CSV/JSON source and answers SQL-like queries,
/// wrapping results in the API's `QueryResponse` envelope.
#[derive(Clone)]
pub struct CsvApiClient {
    // The currently loaded table, if any.
    datasource: Option<CsvDataSource>,
    // Passed through to query/filter evaluation as the case-insensitive flag.
    case_insensitive: bool,
}
510
impl Default for CsvApiClient {
    /// Equivalent to [`CsvApiClient::new`]: no data loaded, case-sensitive.
    fn default() -> Self {
        Self::new()
    }
}
516
517impl CsvApiClient {
518 #[must_use]
519 pub fn new() -> Self {
520 Self {
521 datasource: None,
522 case_insensitive: false,
523 }
524 }
525
    /// Sets whether subsequent queries evaluate string matches
    /// case-insensitively.
    pub fn set_case_insensitive(&mut self, case_insensitive: bool) {
        self.case_insensitive = case_insensitive;
    }
529
530 pub fn load_csv<P: AsRef<Path>>(&mut self, path: P, table_name: &str) -> Result<()> {
531 self.datasource = Some(CsvDataSource::load_from_file(path, table_name)?);
532 Ok(())
533 }
534
535 pub fn load_json<P: AsRef<Path>>(&mut self, path: P, table_name: &str) -> Result<()> {
536 self.datasource = Some(CsvDataSource::load_from_json_file(path, table_name)?);
537 Ok(())
538 }
539
540 pub fn query_csv(&self, sql: &str) -> Result<QueryResponse> {
541 if let Some(ref ds) = self.datasource {
542 let data = ds.query_with_options(sql, self.case_insensitive)?;
543 let count = data.len();
544
545 Ok(QueryResponse {
546 data,
547 count,
548 query: QueryInfo {
549 select: vec!["*".to_string()],
550 where_clause: None,
551 order_by: None,
552 },
553 source: Some("file".to_string()),
554 table: Some(ds.table_name.clone()),
555 cached: Some(false),
556 })
557 } else {
558 Err(anyhow::anyhow!("No CSV file loaded"))
559 }
560 }
561
562 #[must_use]
563 pub fn get_schema(&self) -> Option<HashMap<String, Vec<String>>> {
564 self.datasource.as_ref().map(|ds| {
565 let mut schema = HashMap::new();
566 schema.insert(ds.get_table_name().to_string(), ds.get_headers().to_vec());
567 schema
568 })
569 }
570
571 pub fn load_from_json(&mut self, data: Vec<Value>, table_name: &str) -> Result<()> {
572 let headers: Vec<String> = if let Some(first_row) = data.first() {
574 if let Some(obj) = first_row.as_object() {
575 obj.keys().map(std::string::ToString::to_string).collect()
576 } else {
577 return Err(anyhow::anyhow!("Invalid JSON data format"));
578 }
579 } else {
580 return Err(anyhow::anyhow!("Empty data set"));
581 };
582
583 let column_lookup = build_column_lookup(&headers);
584
585 self.datasource = Some(CsvDataSource {
586 data: data.clone(),
587 headers,
588 table_name: table_name.to_string(),
589 column_lookup,
590 });
591
592 Ok(())
593 }
594
595 #[must_use]
598 pub fn get_datatable(&self) -> Option<DataTable> {
599 self.datasource.as_ref().map(|ds| {
600 debug!("V49: CsvApiClient returning DataTable directly");
601 ds.to_datatable()
602 })
603 }
604
    /// Returns `true` once a CSV/JSON source has been loaded.
    #[must_use]
    pub fn has_data(&self) -> bool {
        self.datasource.is_some()
    }
610}