1use crate::api_client::{QueryInfo, QueryResponse};
2use crate::csv_fixes::{build_column_lookup, find_column_case_insensitive, parse_column_name};
3use crate::data::datatable::{DataColumn, DataRow, DataTable, DataValue};
4use crate::recursive_parser::{OrderByColumn, Parser, SortDirection};
5use crate::where_ast::evaluate_where_expr_with_options;
6use crate::where_parser::WhereParser;
7use anyhow::Result;
8use csv;
9use serde_json::{json, Value};
10use std::cmp::Ordering;
11use std::collections::HashMap;
12use std::fs::File;
13use std::io::BufReader;
14use std::path::Path;
15use tracing::debug;
16
17#[derive(Clone, Debug)]
18pub struct CsvDataSource {
19 data: Vec<Value>,
20 headers: Vec<String>,
21 table_name: String,
22 column_lookup: HashMap<String, String>,
23}
24
25impl CsvDataSource {
26 pub fn load_from_file<P: AsRef<Path>>(path: P, table_name: &str) -> Result<Self> {
27 let file = File::open(&path)?;
28 let mut reader = csv::Reader::from_reader(file);
29
30 let headers: Vec<String> = reader.headers()?.iter().map(|h| h.to_string()).collect();
32
33 let mut data = Vec::new();
35 for result in reader.records() {
36 let record = result?;
37 let mut row = serde_json::Map::new();
38
39 for (i, field) in record.iter().enumerate() {
40 if let Some(header) = headers.get(i) {
41 let value = if field.is_empty() {
43 Value::Null
44 } else if let Ok(n) = field.parse::<f64>() {
45 json!(n)
46 } else {
47 Value::String(field.to_string())
48 };
49 row.insert(header.clone(), value);
50 }
51 }
52
53 data.push(Value::Object(row));
54 }
55
56 let column_lookup = build_column_lookup(&headers);
57
58 Ok(CsvDataSource {
59 data,
60 headers,
61 table_name: table_name.to_string(),
62 column_lookup,
63 })
64 }
65
66 pub fn load_from_json_file<P: AsRef<Path>>(path: P, table_name: &str) -> Result<Self> {
67 let file = File::open(&path)?;
68 let reader = BufReader::new(file);
69
70 let json_data: Vec<Value> = serde_json::from_reader(reader)?;
72
73 if json_data.is_empty() {
74 return Err(anyhow::anyhow!("JSON file contains no data"));
75 }
76
77 let headers = if let Some(first_record) = json_data.first() {
79 if let Some(obj) = first_record.as_object() {
80 obj.keys().cloned().collect()
81 } else {
82 return Err(anyhow::anyhow!("JSON records must be objects"));
83 }
84 } else {
85 Vec::new()
86 };
87
88 for (i, record) in json_data.iter().enumerate() {
90 if !record.is_object() {
91 return Err(anyhow::anyhow!("Record {} is not an object", i));
92 }
93 }
94
95 let column_lookup = build_column_lookup(&headers);
96
97 Ok(CsvDataSource {
98 data: json_data,
99 headers,
100 table_name: table_name.to_string(),
101 column_lookup,
102 })
103 }
104
105 pub fn query(&self, sql: &str) -> Result<Vec<Value>> {
106 self.query_with_options(sql, false)
107 }
108
109 pub fn query_with_options(&self, sql: &str, case_insensitive: bool) -> Result<Vec<Value>> {
110 let mut parser = Parser::new(sql);
112 let stmt = parser
113 .parse()
114 .map_err(|e| anyhow::anyhow!("Recursive parser error: {}", e))?;
115
116 let mut results = self.data.clone();
117
118 let sql_lower = sql.to_lowercase();
120 if let Some(where_pos) = sql_lower.find(" where ") {
121 let where_start = where_pos + 7;
123 let mut where_end = sql.len();
124
125 if let Some(order_pos) = sql_lower.find(" order by ") {
127 where_end = where_end.min(order_pos);
128 }
129 if let Some(limit_pos) = sql_lower.find(" limit ") {
130 where_end = where_end.min(limit_pos);
131 }
132 if let Some(offset_pos) = sql_lower.find(" offset ") {
133 where_end = where_end.min(offset_pos);
134 }
135
136 let where_clause = sql[where_start..where_end].trim();
137 results = self.filter_results_with_options(results, where_clause, case_insensitive)?;
138 }
139
140 if !stmt.columns.contains(&"*".to_string()) {
142 let columns: Vec<&str> = stmt.columns.iter().map(|s| s.as_str()).collect();
143 results = self.select_columns(results, &columns)?;
144 }
145
146 if let Some(order_by_columns) = &stmt.order_by {
148 results = self.sort_results(results, order_by_columns)?;
149 }
150
151 if let Some(offset) = stmt.offset {
153 results = results.into_iter().skip(offset).collect();
154 }
155
156 if let Some(limit) = stmt.limit {
157 results = results.into_iter().take(limit).collect();
158 }
159
160 Ok(results)
161 }
162
163 fn filter_results(&self, data: Vec<Value>, where_clause: &str) -> Result<Vec<Value>> {
164 self.filter_results_with_options(data, where_clause, false)
165 }
166
167 fn filter_results_with_options(
168 &self,
169 data: Vec<Value>,
170 where_clause: &str,
171 case_insensitive: bool,
172 ) -> Result<Vec<Value>> {
173 let columns = self.headers.clone();
175 let expr = WhereParser::parse_with_options(where_clause, columns, case_insensitive)?;
176
177 let mut filtered = Vec::new();
178 for row in data {
179 if evaluate_where_expr_with_options(&expr, &row, case_insensitive)? {
180 filtered.push(row);
181 }
182 }
183
184 Ok(filtered)
185 }
186
187 fn select_columns(&self, data: Vec<Value>, columns: &[&str]) -> Result<Vec<Value>> {
188 let mut results = Vec::new();
189
190 for row in data {
191 if let Some(obj) = row.as_object() {
192 let mut new_row = serde_json::Map::new();
193
194 for &col in columns {
195 let col_parsed = parse_column_name(col);
196
197 if let Some((actual_key, value)) =
198 find_column_case_insensitive(obj, col_parsed, &self.column_lookup)
199 {
200 new_row.insert(actual_key.clone(), value.clone());
201 }
202 }
203
204 results.push(Value::Object(new_row));
205 }
206 }
207
208 Ok(results)
209 }
210
211 fn sort_results(
212 &self,
213 mut data: Vec<Value>,
214 order_by_columns: &[OrderByColumn],
215 ) -> Result<Vec<Value>> {
216 if order_by_columns.is_empty() {
217 return Ok(data);
218 }
219
220 data.sort_by(|a, b| {
222 for order_col in order_by_columns {
223 let col_parsed = parse_column_name(&order_col.column);
224
225 let val_a = if let Some(obj_a) = a.as_object() {
226 find_column_case_insensitive(obj_a, col_parsed, &self.column_lookup)
227 .map(|(_, v)| v)
228 } else {
229 None
230 };
231
232 let val_b = if let Some(obj_b) = b.as_object() {
233 find_column_case_insensitive(obj_b, col_parsed, &self.column_lookup)
234 .map(|(_, v)| v)
235 } else {
236 None
237 };
238
239 let cmp = match (val_a, val_b) {
240 (Some(Value::Number(a)), Some(Value::Number(b))) => {
241 let a_f64 = a.as_f64().unwrap_or(0.0);
243 let b_f64 = b.as_f64().unwrap_or(0.0);
244 a_f64.partial_cmp(&b_f64).unwrap_or(Ordering::Equal)
245 }
246 (Some(Value::String(a)), Some(Value::String(b))) => {
247 a.cmp(b)
249 }
250 (Some(Value::Bool(a)), Some(Value::Bool(b))) => {
251 a.cmp(b)
253 }
254 (Some(Value::Null), Some(Value::Null)) => Ordering::Equal,
255 (Some(Value::Null), Some(_)) => {
256 Ordering::Less
258 }
259 (Some(_), Some(Value::Null)) => {
260 Ordering::Greater
262 }
263 (None, None) => Ordering::Equal,
264 (None, Some(_)) => {
265 Ordering::Less
267 }
268 (Some(_), None) => {
269 Ordering::Greater
271 }
272 (Some(a), Some(b)) => {
274 let a_str = match a {
275 Value::String(s) => s.clone(),
276 Value::Number(n) => n.to_string(),
277 Value::Bool(b) => b.to_string(),
278 Value::Null => "".to_string(),
279 _ => a.to_string(),
280 };
281 let b_str = match b {
282 Value::String(s) => s.clone(),
283 Value::Number(n) => n.to_string(),
284 Value::Bool(b) => b.to_string(),
285 Value::Null => "".to_string(),
286 _ => b.to_string(),
287 };
288 a_str.cmp(&b_str)
289 }
290 };
291
292 let final_cmp = match order_col.direction {
294 SortDirection::Asc => cmp,
295 SortDirection::Desc => cmp.reverse(),
296 };
297
298 if final_cmp != Ordering::Equal {
300 return final_cmp;
301 }
302
303 }
305
306 Ordering::Equal
308 });
309
310 Ok(data)
311 }
312
313 pub fn get_headers(&self) -> &[String] {
314 &self.headers
315 }
316
317 pub fn get_table_name(&self) -> &str {
318 &self.table_name
319 }
320
321 pub fn get_row_count(&self) -> usize {
322 self.data.len()
323 }
324
325 pub fn to_datatable(&self) -> DataTable {
328 debug!(
329 "V49: Converting CsvDataSource to DataTable for table '{}'",
330 self.table_name
331 );
332
333 let mut table = DataTable::new(&self.table_name);
334
335 for header in &self.headers {
337 table.add_column(DataColumn::new(header.clone()));
338 }
339
340 for (row_idx, json_row) in self.data.iter().enumerate() {
342 if let Some(obj) = json_row.as_object() {
343 let mut values = Vec::new();
344
345 for header in &self.headers {
347 let value = obj
348 .get(header)
349 .map(|v| json_value_to_data_value(v))
350 .unwrap_or(DataValue::Null);
351 values.push(value);
352 }
353
354 if let Err(e) = table.add_row(DataRow::new(values)) {
355 debug!("V49: Failed to add row {}: {}", row_idx, e);
356 }
357 }
358 }
359
360 table.infer_column_types();
362
363 table
365 .metadata
366 .insert("source".to_string(), "csv".to_string());
367 table
368 .metadata
369 .insert("original_count".to_string(), self.data.len().to_string());
370
371 debug!(
372 "V49: Created DataTable with {} rows and {} columns directly from CSV",
373 table.row_count(),
374 table.column_count()
375 );
376
377 table
378 }
379}
380
381fn json_value_to_data_value(json: &Value) -> DataValue {
383 match json {
384 Value::Null => DataValue::Null,
385 Value::Bool(b) => DataValue::Boolean(*b),
386 Value::Number(n) => {
387 if let Some(i) = n.as_i64() {
388 DataValue::Integer(i)
389 } else if let Some(f) = n.as_f64() {
390 DataValue::Float(f)
391 } else {
392 DataValue::String(n.to_string())
393 }
394 }
395 Value::String(s) => {
396 if s.contains('-') && s.len() >= 8 && s.len() <= 30 {
398 DataValue::DateTime(s.clone())
400 } else {
401 DataValue::String(s.clone())
402 }
403 }
404 Value::Array(_) | Value::Object(_) => {
405 DataValue::String(json.to_string())
407 }
408 }
409}
410
411#[derive(Clone)]
413pub struct CsvApiClient {
414 datasource: Option<CsvDataSource>,
415 case_insensitive: bool,
416}
417
418impl CsvApiClient {
419 pub fn new() -> Self {
420 Self {
421 datasource: None,
422 case_insensitive: false,
423 }
424 }
425
426 pub fn set_case_insensitive(&mut self, case_insensitive: bool) {
427 self.case_insensitive = case_insensitive;
428 }
429
430 pub fn load_csv<P: AsRef<Path>>(&mut self, path: P, table_name: &str) -> Result<()> {
431 self.datasource = Some(CsvDataSource::load_from_file(path, table_name)?);
432 Ok(())
433 }
434
435 pub fn load_json<P: AsRef<Path>>(&mut self, path: P, table_name: &str) -> Result<()> {
436 self.datasource = Some(CsvDataSource::load_from_json_file(path, table_name)?);
437 Ok(())
438 }
439
440 pub fn query_csv(&self, sql: &str) -> Result<QueryResponse> {
441 if let Some(ref ds) = self.datasource {
442 let data = ds.query_with_options(sql, self.case_insensitive)?;
443 let count = data.len();
444
445 Ok(QueryResponse {
446 data,
447 count,
448 query: QueryInfo {
449 select: vec!["*".to_string()],
450 where_clause: None,
451 order_by: None,
452 },
453 source: Some("file".to_string()),
454 table: Some(ds.table_name.clone()),
455 cached: Some(false),
456 })
457 } else {
458 Err(anyhow::anyhow!("No CSV file loaded"))
459 }
460 }
461
462 pub fn get_schema(&self) -> Option<HashMap<String, Vec<String>>> {
463 self.datasource.as_ref().map(|ds| {
464 let mut schema = HashMap::new();
465 schema.insert(ds.get_table_name().to_string(), ds.get_headers().to_vec());
466 schema
467 })
468 }
469
470 pub fn load_from_json(&mut self, data: Vec<Value>, table_name: &str) -> Result<()> {
471 let headers: Vec<String> = if let Some(first_row) = data.first() {
473 if let Some(obj) = first_row.as_object() {
474 obj.keys().map(|k| k.to_string()).collect()
475 } else {
476 return Err(anyhow::anyhow!("Invalid JSON data format"));
477 }
478 } else {
479 return Err(anyhow::anyhow!("Empty data set"));
480 };
481
482 let column_lookup = build_column_lookup(&headers);
483
484 self.datasource = Some(CsvDataSource {
485 data: data.clone(),
486 headers,
487 table_name: table_name.to_string(),
488 column_lookup,
489 });
490
491 Ok(())
492 }
493
494 pub fn get_datatable(&self) -> Option<DataTable> {
497 self.datasource.as_ref().map(|ds| {
498 debug!("V49: CsvApiClient returning DataTable directly");
499 ds.to_datatable()
500 })
501 }
502
503 pub fn has_data(&self) -> bool {
505 self.datasource.is_some()
506 }
507}