use crate::api_client::{QueryInfo, QueryResponse};
use crate::csv_fixes::{build_column_lookup, find_column_case_insensitive, parse_column_name};
use crate::data::datatable::{DataColumn, DataRow, DataTable, DataValue};
use crate::recursive_parser::{OrderByColumn, Parser, SortDirection};
use crate::where_ast::evaluate_where_expr_with_options;
use crate::where_parser::WhereParser;
use anyhow::Result;
use serde_json::{json, Value};
use std::cmp::Ordering;
use std::collections::HashMap;
use std::fs::File;
use std::io::BufReader;
use std::path::Path;
use tracing::debug;

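/// In-memory data source backed by rows parsed from a CSV or JSON file.
/// Rows are stored as `serde_json::Value` objects keyed by column name;
/// `column_lookup` (built by `build_column_lookup`) supports case-insensitive
/// column resolution.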
#[derive(Clone, Debug)]
pub struct CsvDataSource {
    data: Vec<Value>,
    headers: Vec<String>,
    table_name: String,
    column_lookup: HashMap<String, String>,
}

impl CsvDataSource {
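    /// Loads a CSV file from `path` and exposes it under `table_name`.
    ///
    /// Each record becomes a JSON object keyed by the header row. Empty
    /// fields become `null`, fields that parse as `f64` become numbers, and
    /// everything else is kept as a string.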
    pub fn load_from_file<P: AsRef<Path>>(path: P, table_name: &str) -> Result<Self> {
        let file = File::open(&path)?;
        let mut reader = csv::Reader::from_reader(file);

        let headers: Vec<String> = reader
            .headers()?
            .iter()
            .map(std::string::ToString::to_string)
            .collect();

        let mut data = Vec::new();
        for result in reader.records() {
            let record = result?;
            let mut row = serde_json::Map::new();

            for (i, field) in record.iter().enumerate() {
                if let Some(header) = headers.get(i) {
                    // Empty fields become null, numeric-looking fields become
                    // f64 numbers, and everything else stays a string.
                    let value = if field.is_empty() {
                        Value::Null
                    } else if let Ok(n) = field.parse::<f64>() {
                        json!(n)
                    } else {
                        Value::String(field.to_string())
                    };
                    row.insert(header.clone(), value);
                }
            }

            data.push(Value::Object(row));
        }

        let column_lookup = build_column_lookup(&headers);

        Ok(CsvDataSource {
            data,
            headers,
            table_name: table_name.to_string(),
            column_lookup,
        })
    }

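    /// Loads a JSON file containing an array of objects and exposes it under
    /// `table_name`. Column headers are taken from the keys of the first
    /// record; every record must be a JSON object.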
    pub fn load_from_json_file<P: AsRef<Path>>(path: P, table_name: &str) -> Result<Self> {
        let file = File::open(&path)?;
        let reader = BufReader::new(file);

        let json_data: Vec<Value> = serde_json::from_reader(reader)?;

        if json_data.is_empty() {
            return Err(anyhow::anyhow!("JSON file contains no data"));
        }

        let headers = if let Some(first_record) = json_data.first() {
            if let Some(obj) = first_record.as_object() {
                obj.keys().cloned().collect()
            } else {
                return Err(anyhow::anyhow!("JSON records must be objects"));
            }
        } else {
            Vec::new()
        };

        for (i, record) in json_data.iter().enumerate() {
            if !record.is_object() {
                return Err(anyhow::anyhow!("Record {} is not an object", i));
            }
        }

        let column_lookup = build_column_lookup(&headers);

        Ok(CsvDataSource {
            data: json_data,
            headers,
            table_name: table_name.to_string(),
            column_lookup,
        })
    }

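    /// Runs a SQL query against the loaded data with case-sensitive matching.
    /// Equivalent to `query_with_options(sql, false)`.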
    pub fn query(&self, sql: &str) -> Result<Vec<Value>> {
        self.query_with_options(sql, false)
    }

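    /// Runs a SQL query against the loaded data.
    ///
    /// Supports a small subset of SQL: column projection, `WHERE`,
    /// `ORDER BY`, `LIMIT`, and `OFFSET`. The statement is parsed with the
    /// recursive parser, while the `WHERE` clause text is extracted from the
    /// raw SQL and evaluated by the dedicated `WHERE` parser.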
    pub fn query_with_options(&self, sql: &str, case_insensitive: bool) -> Result<Vec<Value>> {
        let mut parser = Parser::new(sql);
        let stmt = parser
            .parse()
            .map_err(|e| anyhow::anyhow!("Recursive parser error: {}", e))?;

        let mut results = self.data.clone();

        // Extract the raw WHERE clause text: it ends at the first ORDER BY,
        // LIMIT, or OFFSET keyword, or at the end of the statement.
        let sql_lower = sql.to_lowercase();
        if let Some(where_pos) = sql_lower.find(" where ") {
            let where_start = where_pos + 7;
            let mut where_end = sql.len();

            if let Some(order_pos) = sql_lower.find(" order by ") {
                where_end = where_end.min(order_pos);
            }
            if let Some(limit_pos) = sql_lower.find(" limit ") {
                where_end = where_end.min(limit_pos);
            }
            if let Some(offset_pos) = sql_lower.find(" offset ") {
                where_end = where_end.min(offset_pos);
            }

            let where_clause = sql[where_start..where_end].trim();
            results = self.filter_results_with_options(results, where_clause, case_insensitive)?;
        }

        // Sort before projecting so ORDER BY can reference columns that are
        // not part of the SELECT list.
        if let Some(order_by_columns) = &stmt.order_by {
            results = self.sort_results(results, order_by_columns)?;
        }

        if !stmt.columns.contains(&"*".to_string()) {
            let columns: Vec<&str> = stmt
                .columns
                .iter()
                .map(std::string::String::as_str)
                .collect();
            results = self.select_columns(results, &columns)?;
        }

        // OFFSET is applied before LIMIT, matching SQL semantics.
        if let Some(offset) = stmt.offset {
            results = results.into_iter().skip(offset).collect();
        }

        if let Some(limit) = stmt.limit {
            results = results.into_iter().take(limit).collect();
        }

        Ok(results)
    }

    fn filter_results(&self, data: Vec<Value>, where_clause: &str) -> Result<Vec<Value>> {
        self.filter_results_with_options(data, where_clause, false)
    }

    fn filter_results_with_options(
        &self,
        data: Vec<Value>,
        where_clause: &str,
        case_insensitive: bool,
    ) -> Result<Vec<Value>> {
        let columns = self.headers.clone();
        let expr = WhereParser::parse_with_options(where_clause, columns, case_insensitive)?;

        let mut filtered = Vec::new();
        for row in data {
            if evaluate_where_expr_with_options(&expr, &row, case_insensitive)? {
                filtered.push(row);
            }
        }

        Ok(filtered)
    }

    fn select_columns(&self, data: Vec<Value>, columns: &[&str]) -> Result<Vec<Value>> {
        let mut results = Vec::new();

        for row in data {
            if let Some(obj) = row.as_object() {
                let mut new_row = serde_json::Map::new();

                for &col in columns {
                    let col_parsed = parse_column_name(col);

                    if let Some((actual_key, value)) =
                        find_column_case_insensitive(obj, col_parsed, &self.column_lookup)
                    {
                        new_row.insert(actual_key.clone(), value.clone());
                    }
                }

                results.push(Value::Object(new_row));
            }
        }

        Ok(results)
    }

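    /// Sorts rows by the given ORDER BY columns. Numbers, strings, and
    /// booleans are compared by type, nulls and missing columns sort first
    /// in ascending order, and mixed-type values fall back to string
    /// comparison.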
    fn sort_results(
        &self,
        mut data: Vec<Value>,
        order_by_columns: &[OrderByColumn],
    ) -> Result<Vec<Value>> {
        if order_by_columns.is_empty() {
            return Ok(data);
        }

        data.sort_by(|a, b| {
            for order_col in order_by_columns {
                let col_parsed = parse_column_name(&order_col.column);

                let val_a = if let Some(obj_a) = a.as_object() {
                    find_column_case_insensitive(obj_a, col_parsed, &self.column_lookup)
                        .map(|(_, v)| v)
                } else {
                    None
                };

                let val_b = if let Some(obj_b) = b.as_object() {
                    find_column_case_insensitive(obj_b, col_parsed, &self.column_lookup)
                        .map(|(_, v)| v)
                } else {
                    None
                };

                let cmp = match (val_a, val_b) {
                    (Some(Value::Number(a)), Some(Value::Number(b))) => {
                        let a_f64 = a.as_f64().unwrap_or(0.0);
                        let b_f64 = b.as_f64().unwrap_or(0.0);
                        a_f64.partial_cmp(&b_f64).unwrap_or(Ordering::Equal)
                    }
                    (Some(Value::String(a)), Some(Value::String(b))) => a.cmp(b),
                    (Some(Value::Bool(a)), Some(Value::Bool(b))) => a.cmp(b),
                    // Nulls and missing columns sort before any other value.
                    (Some(Value::Null), Some(Value::Null)) => Ordering::Equal,
                    (Some(Value::Null), Some(_)) => Ordering::Less,
                    (Some(_), Some(Value::Null)) => Ordering::Greater,
                    (None, None) => Ordering::Equal,
                    (None, Some(_)) => Ordering::Less,
                    (Some(_), None) => Ordering::Greater,
                    // Mixed types fall back to comparing string representations.
                    (Some(a), Some(b)) => {
                        let a_str = match a {
                            Value::String(s) => s.clone(),
                            Value::Number(n) => n.to_string(),
                            Value::Bool(b) => b.to_string(),
                            Value::Null => String::new(),
                            _ => a.to_string(),
                        };
                        let b_str = match b {
                            Value::String(s) => s.clone(),
                            Value::Number(n) => n.to_string(),
                            Value::Bool(b) => b.to_string(),
                            Value::Null => String::new(),
                            _ => b.to_string(),
                        };
                        a_str.cmp(&b_str)
                    }
                };

                let final_cmp = match order_col.direction {
                    SortDirection::Asc => cmp,
                    SortDirection::Desc => cmp.reverse(),
                };

                if final_cmp != Ordering::Equal {
                    return final_cmp;
                }
            }

            Ordering::Equal
        });

        Ok(data)
    }

    #[must_use]
    pub fn get_headers(&self) -> &[String] {
        &self.headers
    }

    #[must_use]
    pub fn get_table_name(&self) -> &str {
        &self.table_name
    }

    #[must_use]
    pub fn get_row_count(&self) -> usize {
        self.data.len()
    }

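    /// Converts the loaded rows into a `DataTable`, preserving column order,
    /// inferring column types, and recording source metadata.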
    pub fn to_datatable(&self) -> DataTable {
        debug!(
            "V49: Converting CsvDataSource to DataTable for table '{}'",
            self.table_name
        );

        let mut table = DataTable::new(&self.table_name);

        for header in &self.headers {
            table.add_column(DataColumn::new(header.clone()));
        }

        for (row_idx, json_row) in self.data.iter().enumerate() {
            if let Some(obj) = json_row.as_object() {
                let mut values = Vec::new();

                for header in &self.headers {
                    let value = obj
                        .get(header)
                        .map_or(DataValue::Null, json_value_to_data_value);
                    values.push(value);
                }

                if let Err(e) = table.add_row(DataRow::new(values)) {
                    debug!("V49: Failed to add row {}: {}", row_idx, e);
                }
            }
        }

        table.infer_column_types();

        table
            .metadata
            .insert("source".to_string(), "csv".to_string());
        table
            .metadata
            .insert("original_count".to_string(), self.data.len().to_string());

        debug!(
            "V49: Created DataTable with {} rows and {} columns directly from CSV",
            table.row_count(),
            table.column_count()
        );

        table
    }
}

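/// Converts a `serde_json::Value` into a `DataValue`. Strings that contain a
/// dash and have a plausible length (8 to 30 characters) are treated as
/// date-time values; this is a heuristic and can also match hyphenated
/// non-date strings.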
fn json_value_to_data_value(json: &Value) -> DataValue {
    match json {
        Value::Null => DataValue::Null,
        Value::Bool(b) => DataValue::Boolean(*b),
        Value::Number(n) => {
            if let Some(i) = n.as_i64() {
                DataValue::Integer(i)
            } else if let Some(f) = n.as_f64() {
                DataValue::Float(f)
            } else {
                DataValue::String(n.to_string())
            }
        }
        Value::String(s) => {
            if s.contains('-') && s.len() >= 8 && s.len() <= 30 {
                DataValue::DateTime(s.clone())
            } else {
                DataValue::String(s.clone())
            }
        }
        Value::Array(_) | Value::Object(_) => DataValue::String(json.to_string()),
    }
}

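/// Client wrapper that exposes a locally loaded CSV or JSON file through the
/// same `QueryResponse` types as the remote API client, so callers can query
/// local files and API data uniformly.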
#[derive(Clone)]
pub struct CsvApiClient {
    datasource: Option<CsvDataSource>,
    case_insensitive: bool,
}

impl Default for CsvApiClient {
    fn default() -> Self {
        Self::new()
    }
}

impl CsvApiClient {
    #[must_use]
    pub fn new() -> Self {
        Self {
            datasource: None,
            case_insensitive: false,
        }
    }

    pub fn set_case_insensitive(&mut self, case_insensitive: bool) {
        self.case_insensitive = case_insensitive;
    }

    pub fn load_csv<P: AsRef<Path>>(&mut self, path: P, table_name: &str) -> Result<()> {
        self.datasource = Some(CsvDataSource::load_from_file(path, table_name)?);
        Ok(())
    }

    pub fn load_json<P: AsRef<Path>>(&mut self, path: P, table_name: &str) -> Result<()> {
        self.datasource = Some(CsvDataSource::load_from_json_file(path, table_name)?);
        Ok(())
    }

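    /// Runs a SQL query against the loaded file and wraps the rows in a
    /// `QueryResponse`. Returns an error if no file has been loaded.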
    pub fn query_csv(&self, sql: &str) -> Result<QueryResponse> {
        if let Some(ref ds) = self.datasource {
            let data = ds.query_with_options(sql, self.case_insensitive)?;
            let count = data.len();

            Ok(QueryResponse {
                data,
                count,
                query: QueryInfo {
                    select: vec!["*".to_string()],
                    where_clause: None,
                    order_by: None,
                },
                source: Some("file".to_string()),
                table: Some(ds.table_name.clone()),
                cached: Some(false),
            })
        } else {
            Err(anyhow::anyhow!("No CSV file loaded"))
        }
    }

    #[must_use]
    pub fn get_schema(&self) -> Option<HashMap<String, Vec<String>>> {
        self.datasource.as_ref().map(|ds| {
            let mut schema = HashMap::new();
            schema.insert(ds.get_table_name().to_string(), ds.get_headers().to_vec());
            schema
        })
    }

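    /// Loads already-parsed JSON rows directly from memory, taking column
    /// headers from the keys of the first row.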
    pub fn load_from_json(&mut self, data: Vec<Value>, table_name: &str) -> Result<()> {
        let headers: Vec<String> = if let Some(first_row) = data.first() {
            if let Some(obj) = first_row.as_object() {
                obj.keys().map(std::string::ToString::to_string).collect()
            } else {
                return Err(anyhow::anyhow!("Invalid JSON data format"));
            }
        } else {
            return Err(anyhow::anyhow!("Empty data set"));
        };

        let column_lookup = build_column_lookup(&headers);

        self.datasource = Some(CsvDataSource {
            // `data` is owned here, so it can be moved in without cloning.
            data,
            headers,
            table_name: table_name.to_string(),
            column_lookup,
        });

        Ok(())
    }

    #[must_use]
    pub fn get_datatable(&self) -> Option<DataTable> {
        self.datasource.as_ref().map(|ds| {
            debug!("V49: CsvApiClient returning DataTable directly");
            ds.to_datatable()
        })
    }

    #[must_use]
    pub fn has_data(&self) -> bool {
        self.datasource.is_some()
    }
}
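
#[cfg(test)]
mod tests {
    // Illustrative usage sketch rather than an exhaustive test suite: it
    // shows how a caller loads in-memory JSON rows through `CsvApiClient`
    // and runs a simple projection query. The table name "people" and the
    // sample rows are made up for the example, and the SQL assumes the
    // recursive parser handles a plain `SELECT col FROM table` statement.
    use super::*;

    #[test]
    fn query_in_memory_rows() {
        let rows = vec![
            json!({"name": "Alice", "age": 30}),
            json!({"name": "Bob", "age": 25}),
        ];

        let mut client = CsvApiClient::new();
        client
            .load_from_json(rows, "people")
            .expect("loading in-memory rows should succeed");

        // Project a single column; every row should come back.
        let response = client
            .query_csv("SELECT name FROM people")
            .expect("basic SELECT should parse and execute");
        assert_eq!(response.count, 2);
        assert!(response.data.iter().all(|row| row.get("name").is_some()));
    }
}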