1use crate::api_client::{QueryInfo, QueryResponse};
2use crate::csv_fixes::{build_column_lookup, find_column_case_insensitive, parse_column_name};
3use crate::data::datatable::{DataColumn, DataRow, DataTable, DataValue};
4use crate::recursive_parser::{OrderByItem, Parser, SortDirection};
5use crate::sql::parser::ast::SqlExpression;
6use crate::where_ast::evaluate_where_expr_with_options;
7use crate::where_parser::WhereParser;
8use anyhow::Result;
9use csv;
10use serde_json::{json, Value};
11use std::cmp::Ordering;
12use std::collections::HashMap;
13use std::fs::File;
14use std::path::Path;
15use tracing::debug;
16
/// In-memory table loaded from a CSV or JSON file that can be filtered, projected,
/// sorted and paginated through `query` / `query_with_options`.
#[derive(Clone, Debug)]
pub struct CsvDataSource {
    /// One JSON value per record; the loaders in this file store JSON objects keyed by
    /// column name.
    data: Vec<Value>,
    /// Column names of the table.
    headers: Vec<String>,
    /// Name reported for this table by `get_table_name`.
    table_name: String,
    /// Result of `build_column_lookup(&headers)`; passed to `find_column_case_insensitive`
    /// whenever a column name from a query has to be resolved against a row.
    column_lookup: HashMap<String, String>,
}
24
25impl CsvDataSource {
26 pub fn load_from_file<P: AsRef<Path>>(path: P, table_name: &str) -> Result<Self> {
27 use std::io::{BufRead, BufReader as IOBufReader};
28
29 let file = File::open(&path)?;
30 let mut reader = csv::Reader::from_reader(file);
31
32 let headers: Vec<String> = reader
34 .headers()?
35 .iter()
36 .map(std::string::ToString::to_string)
37 .collect();
38
39 let file2 = File::open(&path)?;
41 let mut line_reader = IOBufReader::new(file2);
42 let mut raw_line = String::new();
43 line_reader.read_line(&mut raw_line)?;
45
46 let mut data = Vec::new();
48 for result in reader.records() {
49 let record = result?;
50
51 raw_line.clear();
53 line_reader.read_line(&mut raw_line)?;
54
55 let mut row = serde_json::Map::new();
56
57 for (i, field) in record.iter().enumerate() {
58 if let Some(header) = headers.get(i) {
59 let value = if field.is_empty() {
62 if Self::is_null_field(&raw_line, i) {
66 Value::Null
67 } else {
68 Value::String(String::new())
70 }
71 } else if let Ok(n) = field.parse::<f64>() {
72 json!(n)
73 } else {
74 Value::String(field.to_string())
75 };
76 row.insert(header.clone(), value);
77 }
78 }
79
80 data.push(Value::Object(row));
81 }
82
83 let column_lookup = build_column_lookup(&headers);
84
85 Ok(CsvDataSource {
86 data,
87 headers,
88 table_name: table_name.to_string(),
89 column_lookup,
90 })
91 }
92
93 fn is_null_field(raw_line: &str, field_index: usize) -> bool {
96 let mut comma_count = 0;
98 let mut in_quotes = false;
99 let mut field_start = 0;
100 let mut prev_char = ' ';
101
102 for (i, ch) in raw_line.chars().enumerate() {
103 if ch == '"' && prev_char != '\\' {
104 in_quotes = !in_quotes;
105 }
106
107 if ch == ',' && !in_quotes {
108 if comma_count == field_index {
109 let field_end = i;
110 let field_content = &raw_line[field_start..field_end].trim();
111 if field_content.is_empty() {
113 return true; }
115 if field_content.starts_with('"')
117 && field_content.ends_with('"')
118 && field_content.len() == 2
119 {
120 return false; }
122 return false; }
124 comma_count += 1;
125 field_start = i + 1;
126 }
127 prev_char = ch;
128 }
129
130 if comma_count == field_index {
132 let field_content = raw_line[field_start..]
133 .trim()
134 .trim_end_matches('\n')
135 .trim_end_matches('\r');
136 if field_content.is_empty() {
138 return true; }
140 if field_content.starts_with('"')
142 && field_content.ends_with('"')
143 && field_content.len() == 2
144 {
145 return false; }
147 return false; }
149
150 false }
152
153 pub fn load_from_json_file<P: AsRef<Path>>(path: P, table_name: &str) -> Result<Self> {
154 use std::io::Read;
155 let mut file = File::open(&path)?;
156 let mut json_str = String::new();
157 file.read_to_string(&mut json_str)?;
158
159 let json_data: Vec<Value> = crate::data::stream_loader::parse_json_records(&json_str)?;
162
163 if json_data.is_empty() {
164 return Err(anyhow::anyhow!("JSON file contains no data"));
165 }
166
167 let headers = crate::data::stream_loader::collect_column_names(&json_data, 100);
170 if headers.is_empty() {
171 return Err(anyhow::anyhow!("JSON records must be objects"));
172 }
173
174 for (i, record) in json_data.iter().enumerate() {
176 if !record.is_object() {
177 return Err(anyhow::anyhow!("Record {} is not an object", i));
178 }
179 }
180
181 let column_lookup = build_column_lookup(&headers);
182
183 Ok(CsvDataSource {
184 data: json_data,
185 headers,
186 table_name: table_name.to_string(),
187 column_lookup,
188 })
189 }
190
191 pub fn query(&self, sql: &str) -> Result<Vec<Value>> {
192 self.query_with_options(sql, false)
193 }
194
195 pub fn query_with_options(&self, sql: &str, case_insensitive: bool) -> Result<Vec<Value>> {
196 let mut parser = Parser::new(sql);
198 let stmt = parser
199 .parse()
200 .map_err(|e| anyhow::anyhow!("Recursive parser error: {}", e))?;
201
202 let mut results = self.data.clone();
203
204 let sql_lower = sql.to_lowercase();
206 if let Some(where_pos) = sql_lower.find(" where ") {
207 let where_start = where_pos + 7;
209 let mut where_end = sql.len();
210
211 if let Some(order_pos) = sql_lower.find(" order by ") {
213 where_end = where_end.min(order_pos);
214 }
215 if let Some(limit_pos) = sql_lower.find(" limit ") {
216 where_end = where_end.min(limit_pos);
217 }
218 if let Some(offset_pos) = sql_lower.find(" offset ") {
219 where_end = where_end.min(offset_pos);
220 }
221
222 let where_clause = sql[where_start..where_end].trim();
223 results = self.filter_results_with_options(results, where_clause, case_insensitive)?;
224 }
225
226 if !stmt.columns.contains(&"*".to_string()) {
228 let columns: Vec<&str> = stmt
229 .columns
230 .iter()
231 .map(std::string::String::as_str)
232 .collect();
233 results = self.select_columns(results, &columns)?;
234 }
235
236 if let Some(order_by_columns) = &stmt.order_by {
238 results = self.sort_results(results, order_by_columns)?;
239 }
240
241 if let Some(offset) = stmt.offset {
243 results = results.into_iter().skip(offset).collect();
244 }
245
246 if let Some(limit) = stmt.limit {
247 results = results.into_iter().take(limit).collect();
248 }
249
250 Ok(results)
251 }
252
253 fn filter_results(&self, data: Vec<Value>, where_clause: &str) -> Result<Vec<Value>> {
254 self.filter_results_with_options(data, where_clause, false)
255 }
256
257 fn filter_results_with_options(
258 &self,
259 data: Vec<Value>,
260 where_clause: &str,
261 case_insensitive: bool,
262 ) -> Result<Vec<Value>> {
263 let columns = self.headers.clone();
265 let expr = WhereParser::parse_with_options(where_clause, columns, case_insensitive)?;
266
267 let mut filtered = Vec::new();
268 for row in data {
269 if evaluate_where_expr_with_options(&expr, &row, case_insensitive)? {
270 filtered.push(row);
271 }
272 }
273
274 Ok(filtered)
275 }
276
277 fn select_columns(&self, data: Vec<Value>, columns: &[&str]) -> Result<Vec<Value>> {
278 let mut results = Vec::new();
279
280 for row in data {
281 if let Some(obj) = row.as_object() {
282 let mut new_row = serde_json::Map::new();
283
284 for &col in columns {
285 let col_parsed = parse_column_name(col);
286
287 if let Some((actual_key, value)) =
288 find_column_case_insensitive(obj, col_parsed, &self.column_lookup)
289 {
290 new_row.insert(actual_key.clone(), value.clone());
291 }
292 }
293
294 results.push(Value::Object(new_row));
295 }
296 }
297
298 Ok(results)
299 }
300
301 fn sort_results(
302 &self,
303 mut data: Vec<Value>,
304 order_by_columns: &[OrderByItem],
305 ) -> Result<Vec<Value>> {
306 if order_by_columns.is_empty() {
307 return Ok(data);
308 }
309
310 data.sort_by(|a, b| {
312 for order_col in order_by_columns {
313 let column_name = match &order_col.expr {
315 SqlExpression::Column(col_ref) => col_ref.name.clone(),
316 _ => continue, };
318
319 let col_parsed = parse_column_name(&column_name);
320
321 let val_a = if let Some(obj_a) = a.as_object() {
322 find_column_case_insensitive(obj_a, col_parsed, &self.column_lookup)
323 .map(|(_, v)| v)
324 } else {
325 None
326 };
327
328 let val_b = if let Some(obj_b) = b.as_object() {
329 find_column_case_insensitive(obj_b, col_parsed, &self.column_lookup)
330 .map(|(_, v)| v)
331 } else {
332 None
333 };
334
335 let cmp = match (val_a, val_b) {
336 (Some(Value::Number(a)), Some(Value::Number(b))) => {
337 let a_f64 = a.as_f64().unwrap_or(0.0);
339 let b_f64 = b.as_f64().unwrap_or(0.0);
340 a_f64.partial_cmp(&b_f64).unwrap_or(Ordering::Equal)
341 }
342 (Some(Value::String(a)), Some(Value::String(b))) => {
343 a.cmp(b)
345 }
346 (Some(Value::Bool(a)), Some(Value::Bool(b))) => {
347 a.cmp(b)
349 }
350 (Some(Value::Null), Some(Value::Null)) => Ordering::Equal,
351 (Some(Value::Null), Some(_)) => {
352 Ordering::Less
354 }
355 (Some(_), Some(Value::Null)) => {
356 Ordering::Greater
358 }
359 (None, None) => Ordering::Equal,
360 (None, Some(_)) => {
361 Ordering::Less
363 }
364 (Some(_), None) => {
365 Ordering::Greater
367 }
368 (Some(a), Some(b)) => {
370 let a_str = match a {
371 Value::String(s) => s.clone(),
372 Value::Number(n) => n.to_string(),
373 Value::Bool(b) => b.to_string(),
374 Value::Null => String::new(),
375 _ => a.to_string(),
376 };
377 let b_str = match b {
378 Value::String(s) => s.clone(),
379 Value::Number(n) => n.to_string(),
380 Value::Bool(b) => b.to_string(),
381 Value::Null => String::new(),
382 _ => b.to_string(),
383 };
384 a_str.cmp(&b_str)
385 }
386 };
387
388 let final_cmp = match order_col.direction {
390 SortDirection::Asc => cmp,
391 SortDirection::Desc => cmp.reverse(),
392 };
393
394 if final_cmp != Ordering::Equal {
396 return final_cmp;
397 }
398
399 }
401
402 Ordering::Equal
404 });
405
406 Ok(data)
407 }
408
409 #[must_use]
410 pub fn get_headers(&self) -> &[String] {
411 &self.headers
412 }
413
414 #[must_use]
415 pub fn get_table_name(&self) -> &str {
416 &self.table_name
417 }
418
    /// Returns the number of rows currently held in memory.
    #[must_use]
    pub fn get_row_count(&self) -> usize {
        self.data.len()
    }
423
424 pub fn to_datatable(&self) -> DataTable {
427 debug!(
428 "V49: Converting CsvDataSource to DataTable for table '{}'",
429 self.table_name
430 );
431
432 let mut table = DataTable::new(&self.table_name);
433
434 for header in &self.headers {
436 table.add_column(DataColumn::new(header.clone()));
437 }
438
439 for (row_idx, json_row) in self.data.iter().enumerate() {
441 if let Some(obj) = json_row.as_object() {
442 let mut values = Vec::new();
443
444 for header in &self.headers {
446 let value = obj
447 .get(header)
448 .map_or(DataValue::Null, json_value_to_data_value);
449 values.push(value);
450 }
451
452 if let Err(e) = table.add_row(DataRow::new(values)) {
453 debug!("V49: Failed to add row {}: {}", row_idx, e);
454 }
455 }
456 }
457
458 table.infer_column_types();
460
461 table
463 .metadata
464 .insert("source".to_string(), "csv".to_string());
465 table
466 .metadata
467 .insert("original_count".to_string(), self.data.len().to_string());
468
469 debug!(
470 "V49: Created DataTable with {} rows and {} columns directly from CSV",
471 table.row_count(),
472 table.column_count()
473 );
474
475 table
476 }
477}
478
479fn json_value_to_data_value(json: &Value) -> DataValue {
481 match json {
482 Value::Null => DataValue::Null,
483 Value::Bool(b) => DataValue::Boolean(*b),
484 Value::Number(n) => {
485 if let Some(i) = n.as_i64() {
486 DataValue::Integer(i)
487 } else if let Some(f) = n.as_f64() {
488 DataValue::Float(f)
489 } else {
490 DataValue::String(n.to_string())
491 }
492 }
493 Value::String(s) => {
494 if s.contains('-') && s.len() >= 8 && s.len() <= 30 {
496 DataValue::DateTime(s.clone())
498 } else {
499 DataValue::String(s.clone())
500 }
501 }
502 Value::Array(_) | Value::Object(_) => {
503 DataValue::String(json.to_string())
505 }
506 }
507}
508
/// Client-style wrapper around an optional `CsvDataSource`, exposing loading and querying
/// through an API that returns `QueryResponse` values.
#[derive(Clone)]
pub struct CsvApiClient {
    /// The loaded table, or `None` until one of the `load_*` methods succeeds.
    datasource: Option<CsvDataSource>,
    /// Passed to `query_with_options` by `query_csv`.
    case_insensitive: bool,
}
515
516impl Default for CsvApiClient {
517 fn default() -> Self {
518 Self::new()
519 }
520}
521
522impl CsvApiClient {
523 #[must_use]
524 pub fn new() -> Self {
525 Self {
526 datasource: None,
527 case_insensitive: false,
528 }
529 }
530
    /// Sets the flag that `query_csv` forwards to `query_with_options` as its
    /// `case_insensitive` argument.
    pub fn set_case_insensitive(&mut self, case_insensitive: bool) {
        self.case_insensitive = case_insensitive;
    }
534
535 pub fn load_csv<P: AsRef<Path>>(&mut self, path: P, table_name: &str) -> Result<()> {
536 self.datasource = Some(CsvDataSource::load_from_file(path, table_name)?);
537 Ok(())
538 }
539
540 pub fn load_json<P: AsRef<Path>>(&mut self, path: P, table_name: &str) -> Result<()> {
541 self.datasource = Some(CsvDataSource::load_from_json_file(path, table_name)?);
542 Ok(())
543 }
544
545 pub fn query_csv(&self, sql: &str) -> Result<QueryResponse> {
546 if let Some(ref ds) = self.datasource {
547 let data = ds.query_with_options(sql, self.case_insensitive)?;
548 let count = data.len();
549
550 Ok(QueryResponse {
551 data,
552 count,
553 query: QueryInfo {
554 select: vec!["*".to_string()],
555 where_clause: None,
556 order_by: None,
557 },
558 source: Some("file".to_string()),
559 table: Some(ds.table_name.clone()),
560 cached: Some(false),
561 })
562 } else {
563 Err(anyhow::anyhow!("No CSV file loaded"))
564 }
565 }
566
567 #[must_use]
568 pub fn get_schema(&self) -> Option<HashMap<String, Vec<String>>> {
569 self.datasource.as_ref().map(|ds| {
570 let mut schema = HashMap::new();
571 schema.insert(ds.get_table_name().to_string(), ds.get_headers().to_vec());
572 schema
573 })
574 }
575
576 pub fn load_from_json(&mut self, data: Vec<Value>, table_name: &str) -> Result<()> {
577 let headers: Vec<String> = if let Some(first_row) = data.first() {
579 if let Some(obj) = first_row.as_object() {
580 obj.keys().map(std::string::ToString::to_string).collect()
581 } else {
582 return Err(anyhow::anyhow!("Invalid JSON data format"));
583 }
584 } else {
585 return Err(anyhow::anyhow!("Empty data set"));
586 };
587
588 let column_lookup = build_column_lookup(&headers);
589
590 self.datasource = Some(CsvDataSource {
591 data: data.clone(),
592 headers,
593 table_name: table_name.to_string(),
594 column_lookup,
595 });
596
597 Ok(())
598 }
599
600 #[must_use]
603 pub fn get_datatable(&self) -> Option<DataTable> {
604 self.datasource.as_ref().map(|ds| {
605 debug!("V49: CsvApiClient returning DataTable directly");
606 ds.to_datatable()
607 })
608 }
609
    /// Returns `true` once a table has been loaded into this client.
    #[must_use]
    pub fn has_data(&self) -> bool {
        self.datasource.is_some()
    }
615}