1use crate::data::stream_loader::{detect_delimiter_from_path, CsvReadOptions};
2use crate::datatable::{DataColumn, DataRow, DataTable, DataType, DataValue};
3use anyhow::{Context, Result};
4use csv::ReaderBuilder;
5use serde_json::Value as JsonValue;
6use std::collections::HashSet;
7use std::fs::File;
8use std::io::{BufRead, BufReader, Read};
9use std::path::Path;
10
/// Decides whether field `field_index` (0-based) of a raw, unparsed CSV line
/// should be treated as NULL.
///
/// A field is NULL only when nothing but whitespace sits between its
/// delimiters. A quoted empty string (`""`) or any other content is not NULL.
/// Delimiters inside double-quoted sections do not split fields.
///
/// Quote handling follows the doubled-quote convention (`""` inside a quoted
/// field toggles the state twice and so nets out). A backslash is an ordinary
/// character here: the CSV reader used by this module is built without an
/// escape character, so treating `\"` as an escaped quote would make this
/// scanner disagree with the parser about where fields end.
///
/// Returns `false` when the line has fewer than `field_index + 1` fields.
fn is_null_field(raw_line: &str, field_index: usize, delimiter: char) -> bool {
    let mut in_quotes = false;
    let mut current_field = 0usize;
    let mut field_start = 0usize;

    for (pos, ch) in raw_line.char_indices() {
        if ch == '"' {
            in_quotes = !in_quotes;
        }

        if ch == delimiter && !in_quotes {
            if current_field == field_index {
                // The requested field ends at this delimiter.
                return raw_line[field_start..pos].trim().is_empty();
            }
            current_field += 1;
            field_start = pos + ch.len_utf8();
        }
    }

    // The requested field may be the last one on the line; `trim` also strips
    // the trailing `\r` / `\n` left by line-based reading.
    current_field == field_index && raw_line[field_start..].trim().is_empty()
}
69
70pub fn load_csv_to_datatable<P: AsRef<Path>>(path: P, table_name: &str) -> Result<DataTable> {
74 let path_ref = path.as_ref();
75 let opts = CsvReadOptions {
76 delimiter: detect_delimiter_from_path(&path_ref.display().to_string()),
77 has_headers: true,
78 };
79 load_csv_to_datatable_with_opts(path, table_name, &opts)
80}
81
82pub fn load_csv_to_datatable_with_opts<P: AsRef<Path>>(
85 path: P,
86 table_name: &str,
87 opts: &CsvReadOptions,
88) -> Result<DataTable> {
89 let file = File::open(&path)
90 .with_context(|| format!("Failed to open CSV file: {:?}", path.as_ref()))?;
91
92 let mut reader = ReaderBuilder::new()
93 .has_headers(opts.has_headers)
94 .delimiter(opts.delimiter)
95 .from_reader(file);
96
97 let headers = reader.headers()?.clone();
99 let mut table = DataTable::new(table_name);
100
101 table
103 .metadata
104 .insert("source_type".to_string(), "csv".to_string());
105 table.metadata.insert(
106 "source_path".to_string(),
107 path.as_ref().display().to_string(),
108 );
109 table.metadata.insert(
110 "delimiter".to_string(),
111 match opts.delimiter {
112 b'\t' => "\\t".to_string(),
113 b'\n' => "\\n".to_string(),
114 b'\r' => "\\r".to_string(),
115 b => (b as char).to_string(),
116 },
117 );
118
119 for header in &headers {
121 table.add_column(DataColumn::new(header));
122 }
123
124 let file2 = File::open(&path).with_context(|| {
126 format!(
127 "Failed to open CSV file for raw reading: {:?}",
128 path.as_ref()
129 )
130 })?;
131 let mut line_reader = BufReader::new(file2);
132 let mut raw_line = String::new();
133 line_reader.read_line(&mut raw_line)?;
135
136 let mut string_rows = Vec::new();
138 let mut raw_lines = Vec::new();
139
140 for result in reader.records() {
141 let record = result?;
142 let row: Vec<String> = record
143 .iter()
144 .map(std::string::ToString::to_string)
145 .collect();
146
147 raw_line.clear();
149 line_reader.read_line(&mut raw_line)?;
150 raw_lines.push(raw_line.clone());
151
152 string_rows.push(row);
153 }
154
155 let mut column_types = vec![DataType::Null; headers.len()];
157 let sample_size = string_rows.len().min(100); for row in string_rows.iter().take(sample_size) {
160 for (col_idx, value) in row.iter().enumerate() {
161 if !value.is_empty() {
162 let inferred = DataType::infer_from_string(value);
163 column_types[col_idx] = column_types[col_idx].merge(&inferred);
164 }
165 }
166 }
167
168 for (col_idx, column) in table.columns.iter_mut().enumerate() {
170 column.data_type = column_types[col_idx].clone();
171 }
172
173 for (row_idx, string_row) in string_rows.iter().enumerate() {
175 let mut values = Vec::new();
176 let raw_line = &raw_lines[row_idx];
177
178 for (col_idx, value) in string_row.iter().enumerate() {
179 let data_value = if value.is_empty() {
180 if is_null_field(raw_line, col_idx, opts.delimiter as char) {
182 DataValue::Null
183 } else {
184 DataValue::String(String::new())
185 }
186 } else {
187 DataValue::from_string(value, &column_types[col_idx])
188 };
189 values.push(data_value);
190 }
191 table
192 .add_row(DataRow::new(values))
193 .map_err(|e| anyhow::anyhow!(e))?;
194 }
195
196 table.infer_column_types();
198
199 Ok(table)
200}
201
202pub fn load_json_to_datatable<P: AsRef<Path>>(path: P, table_name: &str) -> Result<DataTable> {
207 let mut file = File::open(&path)
209 .with_context(|| format!("Failed to open JSON file: {:?}", path.as_ref()))?;
210 let mut json_str = String::new();
211 file.read_to_string(&mut json_str)?;
212
213 let json_data: Vec<JsonValue> = crate::data::stream_loader::parse_json_records(&json_str)?;
214
215 if json_data.is_empty() {
216 return Ok(DataTable::new(table_name));
217 }
218
219 let column_names = crate::data::stream_loader::collect_column_names(&json_data, 100);
222 if column_names.is_empty() {
223 return Err(anyhow::anyhow!(
224 "JSON data must contain objects (got non-object records)"
225 ));
226 }
227
228 let mut table = DataTable::new(table_name);
229
230 table
232 .metadata
233 .insert("source_type".to_string(), "json".to_string());
234 table.metadata.insert(
235 "source_path".to_string(),
236 path.as_ref().display().to_string(),
237 );
238
239 for name in &column_names {
240 table.add_column(DataColumn::new(name));
241 }
242
243 let mut string_rows = Vec::new();
245 for json_obj in &json_data {
246 if let Some(obj) = json_obj.as_object() {
247 let mut row = Vec::new();
248 for name in &column_names {
249 let value_str = match obj.get(name) {
250 Some(JsonValue::Null) | None => String::new(),
251 Some(JsonValue::Bool(b)) => b.to_string(),
252 Some(JsonValue::Number(n)) => n.to_string(),
253 Some(JsonValue::String(s)) => s.clone(),
254 Some(JsonValue::Array(arr)) => format!("{arr:?}"), Some(JsonValue::Object(obj)) => format!("{obj:?}"), };
257 row.push(value_str);
258 }
259 string_rows.push(row);
260 }
261 }
262
263 let mut column_types = vec![DataType::Null; column_names.len()];
265 let sample_size = string_rows.len().min(100);
266
267 for row in string_rows.iter().take(sample_size) {
268 for (col_idx, value) in row.iter().enumerate() {
269 if !value.is_empty() {
270 let inferred = DataType::infer_from_string(value);
271 column_types[col_idx] = column_types[col_idx].merge(&inferred);
272 }
273 }
274 }
275
276 for (col_idx, column) in table.columns.iter_mut().enumerate() {
278 column.data_type = column_types[col_idx].clone();
279 }
280
281 for string_row in string_rows {
283 let mut values = Vec::new();
284 for (col_idx, value) in string_row.iter().enumerate() {
285 let data_value = DataValue::from_string(value, &column_types[col_idx]);
286 values.push(data_value);
287 }
288 table
289 .add_row(DataRow::new(values))
290 .map_err(|e| anyhow::anyhow!(e))?;
291 }
292
293 table.infer_column_types();
295
296 Ok(table)
297}
298
299pub fn load_json_data_to_datatable(data: Vec<JsonValue>, table_name: &str) -> Result<DataTable> {
301 if data.is_empty() {
302 return Ok(DataTable::new(table_name));
303 }
304
305 let mut all_columns = HashSet::new();
307 for item in &data {
308 if let Some(obj) = item.as_object() {
309 for key in obj.keys() {
310 all_columns.insert(key.clone());
311 }
312 }
313 }
314
315 let column_names: Vec<String> = all_columns.into_iter().collect();
316 let mut table = DataTable::new(table_name);
317
318 table
320 .metadata
321 .insert("source_type".to_string(), "json_data".to_string());
322
323 for name in &column_names {
325 table.add_column(DataColumn::new(name));
326 }
327
328 let mut string_rows = Vec::new();
330 for json_obj in &data {
331 if let Some(obj) = json_obj.as_object() {
332 let mut row = Vec::new();
333 for name in &column_names {
334 let value_str = match obj.get(name) {
335 Some(JsonValue::Null) | None => String::new(),
336 Some(JsonValue::Bool(b)) => b.to_string(),
337 Some(JsonValue::Number(n)) => n.to_string(),
338 Some(JsonValue::String(s)) => s.clone(),
339 Some(JsonValue::Array(arr)) => format!("{arr:?}"),
340 Some(JsonValue::Object(obj)) => format!("{obj:?}"),
341 };
342 row.push(value_str);
343 }
344 string_rows.push(row);
345 }
346 }
347
348 let mut column_types = vec![DataType::Null; column_names.len()];
350 let sample_size = string_rows.len().min(100);
351
352 for row in string_rows.iter().take(sample_size) {
353 for (col_idx, value) in row.iter().enumerate() {
354 if !value.is_empty() {
355 let inferred = DataType::infer_from_string(value);
356 column_types[col_idx] = column_types[col_idx].merge(&inferred);
357 }
358 }
359 }
360
361 for (col_idx, column) in table.columns.iter_mut().enumerate() {
362 column.data_type = column_types[col_idx].clone();
363 }
364
365 for string_row in string_rows {
366 let mut values = Vec::new();
367 for (col_idx, value) in string_row.iter().enumerate() {
368 let data_value = DataValue::from_string(value, &column_types[col_idx]);
369 values.push(data_value);
370 }
371 table
372 .add_row(DataRow::new(values))
373 .map_err(|e| anyhow::anyhow!(e))?;
374 }
375
376 table.infer_column_types();
377
378 Ok(table)
379}
380
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    /// Creates a temporary file containing `lines`, each followed by a
    /// newline, and flushes it so the loaders can re-open it by path.
    fn temp_file_with_lines<I, S>(lines: I) -> Result<NamedTempFile>
    where
        I: IntoIterator<Item = S>,
        S: AsRef<str>,
    {
        let mut file = NamedTempFile::new()?;
        for line in lines {
            writeln!(file, "{}", line.as_ref())?;
        }
        file.flush()?;
        Ok(file)
    }

    /// A plain comma-separated file loads with the expected shape, column
    /// names, inferred types and cell values.
    #[test]
    fn test_load_csv() -> Result<()> {
        let temp_file = temp_file_with_lines([
            "id,name,price,quantity",
            "1,Widget,9.99,100",
            "2,Gadget,19.99,50",
            "3,Doohickey,5.00,200",
        ])?;

        let table = load_csv_to_datatable(temp_file.path(), "products")?;

        assert_eq!(table.name, "products");
        assert_eq!(table.column_count(), 4);
        assert_eq!(table.row_count(), 3);

        let expected_columns = [
            ("id", DataType::Integer),
            ("name", DataType::String),
            ("price", DataType::Float),
            ("quantity", DataType::Integer),
        ];
        for (idx, (name, data_type)) in expected_columns.iter().enumerate() {
            assert_eq!(table.columns[idx].name, *name);
            assert_eq!(table.columns[idx].data_type, *data_type);
        }

        let value = table.get_value_by_name(0, "name").unwrap();
        assert_eq!(value.to_string(), "Widget");

        Ok(())
    }

    /// A fractional value that first appears after the 100-row inference
    /// sample must still end up as a float in a float-typed column.
    #[test]
    fn test_fractional_value_beyond_sample_window_promotes_to_float() -> Result<()> {
        let mut lines = vec!["id,area".to_string()];
        lines.extend((0..120).map(|i| format!("{i},{}", i * 10)));
        lines.push("999,34.2".to_string());
        let temp_file = temp_file_with_lines(lines)?;

        let table = load_csv_to_datatable(temp_file.path(), "areas")?;

        let area_idx = table.get_column_index("area").unwrap();
        assert_eq!(table.columns[area_idx].data_type, DataType::Float);

        let last = table.get_value(120, area_idx).unwrap();
        assert!(
            matches!(last, DataValue::Float(f) if (*f - 34.2).abs() < 1e-9),
            "expected Float(34.2), got {last:?}"
        );

        Ok(())
    }

    /// A JSON array of objects loads with one row per object, and a JSON
    /// `null` is surfaced as a null cell.
    #[test]
    fn test_load_json() -> Result<()> {
        let temp_file = temp_file_with_lines([r#"[
            {"id": 1, "name": "Alice", "active": true, "score": 95.5},
            {"id": 2, "name": "Bob", "active": false, "score": 87.3},
            {"id": 3, "name": "Charlie", "active": true, "score": null}
        ]"#])?;

        let table = load_json_to_datatable(temp_file.path(), "users")?;

        assert_eq!(table.name, "users");
        assert_eq!(table.column_count(), 4);
        assert_eq!(table.row_count(), 3);

        let score = table.get_value_by_name(2, "score").unwrap();
        assert!(score.is_null());

        Ok(())
    }

    /// An explicit `|` delimiter passed through the options is honoured and
    /// recorded in the table metadata.
    #[test]
    fn test_load_csv_with_pipe_delimiter_via_opts() -> Result<()> {
        let temp_file =
            temp_file_with_lines(["id|name|price", "1|Widget|9.99", "2|Gadget|19.99"])?;

        let opts = CsvReadOptions {
            delimiter: b'|',
            has_headers: true,
        };
        let table = load_csv_to_datatable_with_opts(temp_file.path(), "psv_products", &opts)?;

        assert_eq!(table.column_count(), 3);
        assert_eq!(table.row_count(), 2);
        assert_eq!(table.columns[0].name, "id");
        assert_eq!(table.columns[1].name, "name");
        assert_eq!(table.columns[0].data_type, DataType::Integer);
        assert_eq!(
            table.get_value_by_name(0, "name").unwrap().to_string(),
            "Widget"
        );
        assert_eq!(
            table.metadata.get("delimiter").map(String::as_str),
            Some("|")
        );
        Ok(())
    }

    /// The default loader records a comma delimiter for this temp file.
    #[test]
    fn test_default_load_csv_records_comma_delimiter() -> Result<()> {
        let temp_file = temp_file_with_lines(["a,b", "1,2"])?;

        let table = load_csv_to_datatable(temp_file.path(), "t")?;
        assert_eq!(
            table.metadata.get("delimiter").map(String::as_str),
            Some(",")
        );
        Ok(())
    }
}