use crate::api_client::{QueryInfo, QueryResponse};
use crate::csv_fixes::{build_column_lookup, find_column_case_insensitive, parse_column_name};
use crate::data::datatable::{DataColumn, DataRow, DataTable, DataValue};
use crate::recursive_parser::{OrderByItem, Parser, SortDirection};
use crate::sql::parser::ast::SqlExpression;
use crate::where_ast::evaluate_where_expr_with_options;
use crate::where_parser::WhereParser;
use anyhow::Result;
use csv;
use serde_json::{json, Value};
use std::cmp::Ordering;
use std::collections::HashMap;
use std::fs::File;
use std::io::BufReader;
use std::path::Path;
use tracing::debug;

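/// In-memory data source backed by a CSV or JSON file. Rows are stored as
/// `serde_json::Value` objects keyed by column header and can be queried with
/// a small SQL-like dialect (column projection, WHERE, ORDER BY, LIMIT, OFFSET).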
#[derive(Clone, Debug)]
pub struct CsvDataSource {
    data: Vec<Value>,
    headers: Vec<String>,
    table_name: String,
    column_lookup: HashMap<String, String>,
}

impl CsvDataSource {
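    /// Loads a CSV file and parses every record into a JSON object keyed by
    /// header. Fields that parse as numbers become JSON numbers; empty fields
    /// become `null` when the raw line shows nothing between the delimiters,
    /// or an empty string when the field was explicitly quoted (`""`).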
    pub fn load_from_file<P: AsRef<Path>>(path: P, table_name: &str) -> Result<Self> {
        use std::io::{BufRead, BufReader as IOBufReader};

        let file = File::open(&path)?;
        let mut reader = csv::Reader::from_reader(file);

        let headers: Vec<String> = reader
            .headers()?
            .iter()
            .map(std::string::ToString::to_string)
            .collect();

        // Open the file a second time so the raw text of each line can be
        // inspected to distinguish unquoted-empty from quoted-empty fields.
        let file2 = File::open(&path)?;
        let mut line_reader = IOBufReader::new(file2);
        let mut raw_line = String::new();
        // Skip the header line in the raw reader.
        line_reader.read_line(&mut raw_line)?;

        let mut data = Vec::new();
        for result in reader.records() {
            let record = result?;

            // Read the raw text of this record (assumes one record per line,
            // i.e. no embedded newlines inside quoted fields).
            raw_line.clear();
            line_reader.read_line(&mut raw_line)?;

            let mut row = serde_json::Map::new();

            for (i, field) in record.iter().enumerate() {
                if let Some(header) = headers.get(i) {
                    let value = if field.is_empty() {
                        if Self::is_null_field(&raw_line, i) {
                            Value::Null
                        } else {
                            Value::String(String::new())
                        }
                    } else if let Ok(n) = field.parse::<f64>() {
                        json!(n)
                    } else {
                        Value::String(field.to_string())
                    };
                    row.insert(header.clone(), value);
                }
            }

            data.push(Value::Object(row));
        }

        let column_lookup = build_column_lookup(&headers);

        Ok(CsvDataSource {
            data,
            headers,
            table_name: table_name.to_string(),
            column_lookup,
        })
    }

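    /// Inspects the raw CSV line to decide whether the field at `field_index`
    /// was genuinely absent (nothing between the delimiters, i.e. null) rather
    /// than an explicitly quoted empty string.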
    fn is_null_field(raw_line: &str, field_index: usize) -> bool {
        let mut comma_count = 0;
        let mut in_quotes = false;
        let mut field_start = 0;
        let mut prev_char = ' ';

        // Walk the line, tracking quoting so commas inside quoted fields are
        // not treated as delimiters. char_indices() yields byte offsets, which
        // keeps the slices below valid for non-ASCII content.
        for (i, ch) in raw_line.char_indices() {
            if ch == '"' && prev_char != '\\' {
                in_quotes = !in_quotes;
            }

            if ch == ',' && !in_quotes {
                if comma_count == field_index {
                    let field_content = raw_line[field_start..i].trim();
                    // Nothing between the delimiters: treat the field as null.
                    if field_content.is_empty() {
                        return true;
                    }
                    // A quoted empty string ("") or any other content is not null.
                    return false;
                }
                comma_count += 1;
                field_start = i + 1;
            }
            prev_char = ch;
        }

        // The requested field is the last one on the line (no trailing comma).
        if comma_count == field_index {
            let field_content = raw_line[field_start..]
                .trim()
                .trim_end_matches('\n')
                .trim_end_matches('\r');
            if field_content.is_empty() {
                return true;
            }
            return false;
        }

        false
    }

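    /// Loads a JSON file containing an array of objects. Column headers are
    /// taken from the keys of the first record, and every record must be an
    /// object.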
    pub fn load_from_json_file<P: AsRef<Path>>(path: P, table_name: &str) -> Result<Self> {
        let file = File::open(&path)?;
        let reader = BufReader::new(file);

        let json_data: Vec<Value> = serde_json::from_reader(reader)?;

        if json_data.is_empty() {
            return Err(anyhow::anyhow!("JSON file contains no data"));
        }

        let headers = if let Some(first_record) = json_data.first() {
            if let Some(obj) = first_record.as_object() {
                obj.keys().cloned().collect()
            } else {
                return Err(anyhow::anyhow!("JSON records must be objects"));
            }
        } else {
            Vec::new()
        };

        for (i, record) in json_data.iter().enumerate() {
            if !record.is_object() {
                return Err(anyhow::anyhow!("Record {} is not an object", i));
            }
        }

        let column_lookup = build_column_lookup(&headers);

        Ok(CsvDataSource {
            data: json_data,
            headers,
            table_name: table_name.to_string(),
            column_lookup,
        })
    }

    pub fn query(&self, sql: &str) -> Result<Vec<Value>> {
        self.query_with_options(sql, false)
    }

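    /// Executes a SQL-like query against the loaded rows. The statement is
    /// parsed for its column list, ORDER BY, LIMIT and OFFSET, while the WHERE
    /// clause is extracted textually from the original SQL and evaluated per
    /// row.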
    pub fn query_with_options(&self, sql: &str, case_insensitive: bool) -> Result<Vec<Value>> {
        let mut parser = Parser::new(sql);
        let stmt = parser
            .parse()
            .map_err(|e| anyhow::anyhow!("Recursive parser error: {}", e))?;

        let mut results = self.data.clone();

        // Extract the WHERE clause textually: it runs from " where " up to the
        // first of ORDER BY, LIMIT or OFFSET (or the end of the statement).
        let sql_lower = sql.to_lowercase();
        if let Some(where_pos) = sql_lower.find(" where ") {
            let where_start = where_pos + 7;
            let mut where_end = sql.len();

            if let Some(order_pos) = sql_lower.find(" order by ") {
                where_end = where_end.min(order_pos);
            }
            if let Some(limit_pos) = sql_lower.find(" limit ") {
                where_end = where_end.min(limit_pos);
            }
            if let Some(offset_pos) = sql_lower.find(" offset ") {
                where_end = where_end.min(offset_pos);
            }

            let where_clause = sql[where_start..where_end].trim();
            results = self.filter_results_with_options(results, where_clause, case_insensitive)?;
        }

        // Project columns unless the statement selects "*".
        if !stmt.columns.contains(&"*".to_string()) {
            let columns: Vec<&str> = stmt
                .columns
                .iter()
                .map(std::string::String::as_str)
                .collect();
            results = self.select_columns(results, &columns)?;
        }

        if let Some(order_by_columns) = &stmt.order_by {
            results = self.sort_results(results, order_by_columns)?;
        }

        // Apply OFFSET before LIMIT, matching SQL semantics.
        if let Some(offset) = stmt.offset {
            results = results.into_iter().skip(offset).collect();
        }

        if let Some(limit) = stmt.limit {
            results = results.into_iter().take(limit).collect();
        }

        Ok(results)
    }

    fn filter_results(&self, data: Vec<Value>, where_clause: &str) -> Result<Vec<Value>> {
        self.filter_results_with_options(data, where_clause, false)
    }

    fn filter_results_with_options(
        &self,
        data: Vec<Value>,
        where_clause: &str,
        case_insensitive: bool,
    ) -> Result<Vec<Value>> {
        let columns = self.headers.clone();
        let expr = WhereParser::parse_with_options(where_clause, columns, case_insensitive)?;

        let mut filtered = Vec::new();
        for row in data {
            if evaluate_where_expr_with_options(&expr, &row, case_insensitive)? {
                filtered.push(row);
            }
        }

        Ok(filtered)
    }

    fn select_columns(&self, data: Vec<Value>, columns: &[&str]) -> Result<Vec<Value>> {
        let mut results = Vec::new();

        for row in data {
            if let Some(obj) = row.as_object() {
                let mut new_row = serde_json::Map::new();

                for &col in columns {
                    let col_parsed = parse_column_name(col);

                    // Columns that cannot be resolved against the headers are
                    // silently omitted from the projected row.
                    if let Some((actual_key, value)) =
                        find_column_case_insensitive(obj, col_parsed, &self.column_lookup)
                    {
                        new_row.insert(actual_key.clone(), value.clone());
                    }
                }

                results.push(Value::Object(new_row));
            }
        }

        Ok(results)
    }

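    /// Sorts rows by the ORDER BY items. Numbers, strings and booleans are
    /// compared natively, nulls and missing columns compare as less than any
    /// other value, and mixed-type pairs fall back to string comparison.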
    fn sort_results(
        &self,
        mut data: Vec<Value>,
        order_by_columns: &[OrderByItem],
    ) -> Result<Vec<Value>> {
        if order_by_columns.is_empty() {
            return Ok(data);
        }

        data.sort_by(|a, b| {
            for order_col in order_by_columns {
                // Only plain column references are supported; other ORDER BY
                // expressions are skipped.
                let column_name = match &order_col.expr {
                    SqlExpression::Column(col_ref) => col_ref.name.clone(),
                    _ => continue,
                };

                let col_parsed = parse_column_name(&column_name);

                let val_a = if let Some(obj_a) = a.as_object() {
                    find_column_case_insensitive(obj_a, col_parsed, &self.column_lookup)
                        .map(|(_, v)| v)
                } else {
                    None
                };

                let val_b = if let Some(obj_b) = b.as_object() {
                    find_column_case_insensitive(obj_b, col_parsed, &self.column_lookup)
                        .map(|(_, v)| v)
                } else {
                    None
                };

                let cmp = match (val_a, val_b) {
                    (Some(Value::Number(a)), Some(Value::Number(b))) => {
                        let a_f64 = a.as_f64().unwrap_or(0.0);
                        let b_f64 = b.as_f64().unwrap_or(0.0);
                        a_f64.partial_cmp(&b_f64).unwrap_or(Ordering::Equal)
                    }
                    (Some(Value::String(a)), Some(Value::String(b))) => a.cmp(b),
                    (Some(Value::Bool(a)), Some(Value::Bool(b))) => a.cmp(b),
                    // Nulls and missing columns sort before any other value.
                    (Some(Value::Null), Some(Value::Null)) => Ordering::Equal,
                    (Some(Value::Null), Some(_)) => Ordering::Less,
                    (Some(_), Some(Value::Null)) => Ordering::Greater,
                    (None, None) => Ordering::Equal,
                    (None, Some(_)) => Ordering::Less,
                    (Some(_), None) => Ordering::Greater,
                    // Mixed types: compare their string representations.
                    (Some(a), Some(b)) => {
                        let a_str = match a {
                            Value::String(s) => s.clone(),
                            Value::Number(n) => n.to_string(),
                            Value::Bool(b) => b.to_string(),
                            Value::Null => String::new(),
                            _ => a.to_string(),
                        };
                        let b_str = match b {
                            Value::String(s) => s.clone(),
                            Value::Number(n) => n.to_string(),
                            Value::Bool(b) => b.to_string(),
                            Value::Null => String::new(),
                            _ => b.to_string(),
                        };
                        a_str.cmp(&b_str)
                    }
                };

                let final_cmp = match order_col.direction {
                    SortDirection::Asc => cmp,
                    SortDirection::Desc => cmp.reverse(),
                };

                if final_cmp != Ordering::Equal {
                    return final_cmp;
                }
            }

            Ordering::Equal
        });

        Ok(data)
    }

    #[must_use]
    pub fn get_headers(&self) -> &[String] {
        &self.headers
    }

    #[must_use]
    pub fn get_table_name(&self) -> &str {
        &self.table_name
    }

    #[must_use]
    pub fn get_row_count(&self) -> usize {
        self.data.len()
    }

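    /// Converts the loaded rows into a `DataTable`, adding one column per
    /// header, inferring column types, and recording source metadata.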
    pub fn to_datatable(&self) -> DataTable {
        debug!(
            "V49: Converting CsvDataSource to DataTable for table '{}'",
            self.table_name
        );

        let mut table = DataTable::new(&self.table_name);

        for header in &self.headers {
            table.add_column(DataColumn::new(header.clone()));
        }

        for (row_idx, json_row) in self.data.iter().enumerate() {
            if let Some(obj) = json_row.as_object() {
                let mut values = Vec::new();

                // Emit values in header order; missing keys become null.
                for header in &self.headers {
                    let value = obj
                        .get(header)
                        .map_or(DataValue::Null, json_value_to_data_value);
                    values.push(value);
                }

                if let Err(e) = table.add_row(DataRow::new(values)) {
                    debug!("V49: Failed to add row {}: {}", row_idx, e);
                }
            }
        }

        table.infer_column_types();

        table
            .metadata
            .insert("source".to_string(), "csv".to_string());
        table
            .metadata
            .insert("original_count".to_string(), self.data.len().to_string());

        debug!(
            "V49: Created DataTable with {} rows and {} columns directly from CSV",
            table.row_count(),
            table.column_count()
        );

        table
    }
}

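/// Maps a `serde_json::Value` to the corresponding `DataValue`. Strings that
/// contain a '-' and are 8 to 30 bytes long are heuristically treated as
/// date/time values.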
fn json_value_to_data_value(json: &Value) -> DataValue {
    match json {
        Value::Null => DataValue::Null,
        Value::Bool(b) => DataValue::Boolean(*b),
        Value::Number(n) => {
            if let Some(i) = n.as_i64() {
                DataValue::Integer(i)
            } else if let Some(f) = n.as_f64() {
                DataValue::Float(f)
            } else {
                DataValue::String(n.to_string())
            }
        }
        Value::String(s) => {
            // Heuristic: strings that look like dates (contain '-' and have a
            // plausible length) are stored as DateTime values.
            if s.contains('-') && s.len() >= 8 && s.len() <= 30 {
                DataValue::DateTime(s.clone())
            } else {
                DataValue::String(s.clone())
            }
        }
        // Nested arrays and objects are stored as their JSON text.
        Value::Array(_) | Value::Object(_) => DataValue::String(json.to_string()),
    }
}

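/// Thin client over a single optional `CsvDataSource`, returning the same
/// `QueryResponse` type as the API client so local CSV/JSON files can be
/// queried through a common interface.
///
/// Illustrative usage (a sketch only; the file name and the query text are
/// assumptions, not taken from this crate):
///
/// ```ignore
/// let mut client = CsvApiClient::new();
/// client.set_case_insensitive(true);
/// client.load_csv("trades.csv", "trades")?;
/// let response = client.query_csv("SELECT * FROM trades WHERE price > 100 LIMIT 10")?;
/// println!("{} rows", response.count);
/// ```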
#[derive(Clone)]
pub struct CsvApiClient {
    datasource: Option<CsvDataSource>,
    case_insensitive: bool,
}

impl Default for CsvApiClient {
    fn default() -> Self {
        Self::new()
    }
}

impl CsvApiClient {
    #[must_use]
    pub fn new() -> Self {
        Self {
            datasource: None,
            case_insensitive: false,
        }
    }

    pub fn set_case_insensitive(&mut self, case_insensitive: bool) {
        self.case_insensitive = case_insensitive;
    }

    pub fn load_csv<P: AsRef<Path>>(&mut self, path: P, table_name: &str) -> Result<()> {
        self.datasource = Some(CsvDataSource::load_from_file(path, table_name)?);
        Ok(())
    }

    pub fn load_json<P: AsRef<Path>>(&mut self, path: P, table_name: &str) -> Result<()> {
        self.datasource = Some(CsvDataSource::load_from_json_file(path, table_name)?);
        Ok(())
    }

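    /// Runs a query against the loaded data and wraps the rows in a
    /// `QueryResponse`. The returned `QueryInfo` is a placeholder (`select`
    /// is always `["*"]`, with no WHERE or ORDER BY echoed back); returns an
    /// error if no file has been loaded.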
    pub fn query_csv(&self, sql: &str) -> Result<QueryResponse> {
        if let Some(ref ds) = self.datasource {
            let data = ds.query_with_options(sql, self.case_insensitive)?;
            let count = data.len();

            Ok(QueryResponse {
                data,
                count,
                query: QueryInfo {
                    select: vec!["*".to_string()],
                    where_clause: None,
                    order_by: None,
                },
                source: Some("file".to_string()),
                table: Some(ds.table_name.clone()),
                cached: Some(false),
            })
        } else {
            Err(anyhow::anyhow!("No CSV file loaded"))
        }
    }

    #[must_use]
    pub fn get_schema(&self) -> Option<HashMap<String, Vec<String>>> {
        self.datasource.as_ref().map(|ds| {
            let mut schema = HashMap::new();
            schema.insert(ds.get_table_name().to_string(), ds.get_headers().to_vec());
            schema
        })
    }

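    /// Loads rows from an in-memory JSON array instead of a file. Headers are
    /// taken from the keys of the first record; fails if the data set is empty
    /// or the first record is not an object.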
    pub fn load_from_json(&mut self, data: Vec<Value>, table_name: &str) -> Result<()> {
        let headers: Vec<String> = if let Some(first_row) = data.first() {
            if let Some(obj) = first_row.as_object() {
                obj.keys().map(std::string::ToString::to_string).collect()
            } else {
                return Err(anyhow::anyhow!("Invalid JSON data format"));
            }
        } else {
            return Err(anyhow::anyhow!("Empty data set"));
        };

        let column_lookup = build_column_lookup(&headers);

        self.datasource = Some(CsvDataSource {
            data,
            headers,
            table_name: table_name.to_string(),
            column_lookup,
        });

        Ok(())
    }

    #[must_use]
    pub fn get_datatable(&self) -> Option<DataTable> {
        self.datasource.as_ref().map(|ds| {
            debug!("V49: CsvApiClient returning DataTable directly");
            ds.to_datatable()
        })
    }

    #[must_use]
    pub fn has_data(&self) -> bool {
        self.datasource.is_some()
    }
}