1use anyhow::{Context, Result};
5use csv::ReaderBuilder;
6use serde_json::Value as JsonValue;
7use std::collections::{HashMap, HashSet};
8use std::io::{BufRead, BufReader, Read};
9use tracing::{debug, info};
10
11use crate::data::advanced_csv_loader::StringInterner;
12use crate::data::datatable::{DataColumn, DataRow, DataTable, DataType, DataValue};
13
/// Options controlling how delimited (CSV-like) input is parsed.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CsvReadOptions {
    /// Field separator byte (for example `b','`, `b'\t'` or `b'|'`).
    pub delimiter: u8,
    /// Passed to the csv reader's `has_headers` setting: whether the first record is a
    /// header row rather than data.
    pub has_headers: bool,
}

impl Default for CsvReadOptions {
    /// Comma-delimited input whose first record is a header row.
    fn default() -> Self {
        Self {
            delimiter: b',',
            has_headers: true,
        }
    }
}
33
/// Chooses a default delimiter from the path's extension, compared case-insensitively.
///
/// `.tsv` selects a tab, `.psv` selects a pipe, and everything else (including paths with
/// no extension, or `-` for stdin) falls back to a comma.
pub fn detect_delimiter_from_path(path: &str) -> u8 {
    let lowered = path.to_ascii_lowercase();
    // Text after the final '.'; "tsv" contains no '.', so this equals an ends_with check.
    match lowered.rsplit_once('.') {
        Some((_, "tsv")) => b'\t',
        Some((_, "psv")) => b'|',
        _ => b',',
    }
}
49
50pub fn parse_delimiter_arg(s: &str) -> anyhow::Result<u8> {
60 match s {
61 "\\t" | "\t" => return Ok(b'\t'),
62 "\\n" => return Ok(b'\n'),
63 "\\r" => return Ok(b'\r'),
64 _ => {}
65 }
66 let bytes = s.as_bytes();
67 if bytes.len() == 1 && bytes[0].is_ascii() {
68 return Ok(bytes[0]);
69 }
70 Err(anyhow::anyhow!(
71 "delimiter must be a single ASCII character (or '\\t', '\\n', '\\r'); got {:?}",
72 s
73 ))
74}
75
76pub fn resolve_delimiter(path: &str, explicit: Option<u8>) -> u8 {
83 explicit.unwrap_or_else(|| detect_delimiter_from_path(path))
84}
85
/// Renders a delimiter byte for logs and table metadata.
///
/// Tab, newline and carriage return are shown as their backslash escapes; any other byte
/// is shown as the character it maps to.
fn delimiter_label(d: u8) -> String {
    let escaped = match d {
        b'\t' => "\\t",
        b'\n' => "\\n",
        b'\r' => "\\r",
        other => return char::from(other).to_string(),
    };
    escaped.to_owned()
}
95
/// Per-column sampling statistics computed by `StreamCsvLoader::analyze_columns`.
///
/// The loader reads only `index` and `is_categorical`. The underscore-prefixed fields are
/// not read anywhere in this file; presumably they are kept for diagnostics via `Debug`.
#[derive(Debug)]
struct ColumnAnalysis {
    // Position of the column in the header record.
    index: usize,
    // Header text for the column.
    _name: String,
    // Number of distinct non-empty values seen in the sample.
    _cardinality: usize,
    // Number of rows sampled: min(total rows, loader sample_size).
    _sample_size: usize,
    // _cardinality / _sample_size, or 1.0 when the sample is empty.
    _unique_ratio: f64,
    // True when the column's values will be string-interned.
    is_categorical: bool,
    // Mean byte length of the sampled non-empty values (integer division; 0 if none).
    _avg_string_length: usize,
}
107
/// CSV loader that reads the entire input into memory, samples each column to decide
/// which ones to string-intern, and builds a [`DataTable`].
pub struct StreamCsvLoader {
    // Maximum number of rows inspected per column when classifying columns.
    sample_size: usize,
    // Unique-value ratio below which a column is treated as categorical.
    cardinality_threshold: f64,
    // One interner per categorical column, keyed by column index.
    interners: HashMap<usize, StringInterner>,
}
114
115impl StreamCsvLoader {
116 pub fn new() -> Self {
117 Self {
118 sample_size: 1000,
119 cardinality_threshold: 0.3,
120 interners: HashMap::new(),
121 }
122 }
123
124 fn analyze_columns(
126 &self,
127 rows: &[Vec<String>],
128 headers: &csv::StringRecord,
129 ) -> Vec<ColumnAnalysis> {
130 let mut analyses = Vec::new();
131
132 for (col_idx, header) in headers.iter().enumerate() {
133 let mut unique_values = HashSet::new();
134 let mut total_length = 0;
135 let mut non_empty_count = 0;
136
137 for row in rows.iter().take(self.sample_size) {
139 if let Some(value) = row.get(col_idx) {
140 if !value.is_empty() {
141 unique_values.insert(value.clone());
142 total_length += value.len();
143 non_empty_count += 1;
144 }
145 }
146 }
147
148 let cardinality = unique_values.len();
149 let sample_size = rows.len().min(self.sample_size);
150 let unique_ratio = if sample_size > 0 {
151 cardinality as f64 / sample_size as f64
152 } else {
153 1.0
154 };
155
156 let avg_string_length = if non_empty_count > 0 {
157 total_length / non_empty_count
158 } else {
159 0
160 };
161
162 let is_categorical = unique_ratio < self.cardinality_threshold
164 || (avg_string_length < 20 && cardinality < sample_size / 2);
165
166 analyses.push(ColumnAnalysis {
167 index: col_idx,
168 _name: header.to_string(),
169 _cardinality: cardinality,
170 _sample_size: sample_size,
171 _unique_ratio: unique_ratio,
172 is_categorical,
173 _avg_string_length: avg_string_length,
174 });
175 }
176
177 analyses
178 }
179
180 pub fn load_csv_from_reader<R: Read>(
184 &mut self,
185 reader: R,
186 table_name: &str,
187 source_type: &str,
188 source_path: &str,
189 ) -> Result<DataTable> {
190 self.load_csv_from_reader_with_opts(
191 reader,
192 table_name,
193 source_type,
194 source_path,
195 &CsvReadOptions::default(),
196 )
197 }
198
199 pub fn load_csv_from_reader_with_opts<R: Read>(
202 &mut self,
203 mut reader: R,
204 table_name: &str,
205 source_type: &str,
206 source_path: &str,
207 opts: &CsvReadOptions,
208 ) -> Result<DataTable> {
209 info!(
210 "Stream CSV load: Loading {} with optimizations (delimiter={})",
211 source_path,
212 delimiter_label(opts.delimiter)
213 );
214
215 let mut buffer = Vec::new();
217 reader.read_to_end(&mut buffer)?;
218
219 let mut csv_reader = ReaderBuilder::new()
221 .has_headers(opts.has_headers)
222 .delimiter(opts.delimiter)
223 .from_reader(&buffer[..]);
224
225 let headers = csv_reader.headers()?.clone();
226 let mut table = DataTable::new(table_name);
227
228 table
230 .metadata
231 .insert("source_type".to_string(), source_type.to_string());
232 table
233 .metadata
234 .insert("source_path".to_string(), source_path.to_string());
235 table
236 .metadata
237 .insert("delimiter".to_string(), delimiter_label(opts.delimiter));
238
239 for header in &headers {
241 table.add_column(DataColumn::new(header));
242 }
243
244 let mut string_rows = Vec::new();
246 for result in csv_reader.records() {
247 let record = result?;
248 let row: Vec<String> = record.iter().map(|s| s.to_string()).collect();
249 string_rows.push(row);
250 }
251
252 let analyses = self.analyze_columns(&string_rows, &headers);
254 let categorical_columns: HashSet<usize> = analyses
255 .iter()
256 .filter(|a| a.is_categorical)
257 .map(|a| a.index)
258 .collect();
259
260 info!(
261 "Column analysis: {} of {} columns will use string interning",
262 categorical_columns.len(),
263 analyses.len()
264 );
265
266 for col_idx in &categorical_columns {
268 self.interners.insert(*col_idx, StringInterner::new());
269 }
270
271 let mut line_reader = BufReader::new(&buffer[..]);
273 let mut raw_lines = Vec::new();
274 let mut raw_line = String::new();
275
276 line_reader.read_line(&mut raw_line)?;
278 raw_line.clear();
279
280 for _ in 0..string_rows.len() {
282 line_reader.read_line(&mut raw_line)?;
283 raw_lines.push(raw_line.clone());
284 raw_line.clear();
285 }
286
287 let mut column_types = vec![DataType::Null; headers.len()];
289 let sample_size = string_rows.len().min(100);
290
291 for row in string_rows.iter().take(sample_size) {
292 for (col_idx, value) in row.iter().enumerate() {
293 if !value.is_empty() {
294 let inferred = DataType::infer_from_string(value);
295 column_types[col_idx] = column_types[col_idx].merge(&inferred);
296 }
297 }
298 }
299
300 for (col_idx, column) in table.columns.iter_mut().enumerate() {
302 column.data_type = column_types[col_idx].clone();
303 }
304
305 for (row_idx, string_row) in string_rows.iter().enumerate() {
307 let mut values = Vec::new();
308 let raw_line = &raw_lines[row_idx];
309
310 for (col_idx, value) in string_row.iter().enumerate() {
311 let data_value = if value.is_empty() {
312 if is_null_field(raw_line, col_idx, opts.delimiter as char) {
314 DataValue::Null
315 } else if categorical_columns.contains(&col_idx) {
316 if let Some(interner) = self.interners.get_mut(&col_idx) {
318 DataValue::InternedString(interner.intern(""))
319 } else {
320 DataValue::String(String::new())
321 }
322 } else {
323 DataValue::String(String::new())
324 }
325 } else if categorical_columns.contains(&col_idx)
326 && column_types[col_idx] == DataType::String
327 {
328 if let Some(interner) = self.interners.get_mut(&col_idx) {
330 DataValue::InternedString(interner.intern(value))
331 } else {
332 DataValue::from_string(value, &column_types[col_idx])
333 }
334 } else {
335 DataValue::from_string(value, &column_types[col_idx])
336 };
337 values.push(data_value);
338 }
339 table
340 .add_row(DataRow::new(values))
341 .map_err(|e| anyhow::anyhow!(e))?;
342 }
343
344 for (col_idx, interner) in &self.interners {
346 let stats = interner.stats();
347 if stats.memory_saved_bytes > 0 {
348 debug!(
349 "Column {} interning: {} unique strings, {} references, {} bytes saved",
350 headers.get(*col_idx).unwrap_or(&String::new()),
351 stats.unique_strings,
352 stats.total_references,
353 stats.memory_saved_bytes
354 );
355 }
356 }
357
358 table.infer_column_types();
360
361 Ok(table)
362 }
363}
364
365pub fn load_csv_from_reader<R: Read>(
368 reader: R,
369 table_name: &str,
370 source_type: &str,
371 source_path: &str,
372) -> Result<DataTable> {
373 let mut loader = StreamCsvLoader::new();
374 loader.load_csv_from_reader(reader, table_name, source_type, source_path)
375}
376
377pub fn load_csv_from_reader_with_opts<R: Read>(
380 reader: R,
381 table_name: &str,
382 source_type: &str,
383 source_path: &str,
384 opts: &CsvReadOptions,
385) -> Result<DataTable> {
386 let mut loader = StreamCsvLoader::new();
387 loader.load_csv_from_reader_with_opts(reader, table_name, source_type, source_path, opts)
388}
389
390pub fn parse_json_records(content: &str) -> Result<Vec<JsonValue>> {
398 let trimmed = content.trim_start();
399 if trimmed.starts_with('[') {
400 return serde_json::from_str(content).with_context(|| "Failed to parse JSON array");
401 }
402
403 let mut out = Vec::new();
404 for (idx, raw_line) in content.lines().enumerate() {
405 let line = raw_line.trim();
406 if line.is_empty() {
407 continue;
408 }
409 let value: JsonValue = serde_json::from_str(line)
410 .with_context(|| format!("Failed to parse JSONL at line {}", idx + 1))?;
411 out.push(value);
412 }
413 Ok(out)
414}
415
416pub fn collect_column_names(records: &[JsonValue], sample_size: usize) -> Vec<String> {
420 let mut seen: HashSet<String> = HashSet::new();
421 let mut names: Vec<String> = Vec::new();
422 for record in records.iter().take(sample_size) {
423 if let Some(obj) = record.as_object() {
424 for key in obj.keys() {
425 if seen.insert(key.clone()) {
426 names.push(key.clone());
427 }
428 }
429 }
430 }
431 names
432}
433
434pub fn load_json_from_reader<R: Read>(
439 mut reader: R,
440 table_name: &str,
441 source_type: &str,
442 source_path: &str,
443) -> Result<DataTable> {
444 let mut json_str = String::new();
445 reader.read_to_string(&mut json_str)?;
446
447 let json_data: Vec<JsonValue> = parse_json_records(&json_str)?;
448
449 if json_data.is_empty() {
450 return Ok(DataTable::new(table_name));
451 }
452
453 let column_names = collect_column_names(&json_data, 100);
457 if column_names.is_empty() {
458 return Err(anyhow::anyhow!(
459 "JSON data must contain objects (got non-object records)"
460 ));
461 }
462
463 let mut table = DataTable::new(table_name);
464
465 table
467 .metadata
468 .insert("source_type".to_string(), source_type.to_string());
469 table
470 .metadata
471 .insert("source_path".to_string(), source_path.to_string());
472
473 for name in &column_names {
474 table.add_column(DataColumn::new(name));
475 }
476
477 let mut string_rows = Vec::new();
479 for json_obj in &json_data {
480 if let Some(obj) = json_obj.as_object() {
481 let mut row = Vec::new();
482 for col_name in &column_names {
483 let value = obj
484 .get(col_name)
485 .map(|v| json_value_to_string(v))
486 .unwrap_or_default();
487 row.push(value);
488 }
489 string_rows.push(row);
490 }
491 }
492
493 let mut column_types = vec![DataType::Null; column_names.len()];
495 let sample_size = string_rows.len().min(100);
496
497 for row in string_rows.iter().take(sample_size) {
498 for (col_idx, value) in row.iter().enumerate() {
499 if !value.is_empty() && value != "null" {
500 let inferred = DataType::infer_from_string(value);
501 column_types[col_idx] = column_types[col_idx].merge(&inferred);
502 }
503 }
504 }
505
506 for (col_idx, column) in table.columns.iter_mut().enumerate() {
508 column.data_type = column_types[col_idx].clone();
509 }
510
511 for string_row in &string_rows {
513 let mut values = Vec::new();
514 for (col_idx, value) in string_row.iter().enumerate() {
515 let data_value = if value.is_empty() || value == "null" {
516 DataValue::Null
517 } else {
518 DataValue::from_string(value, &column_types[col_idx])
519 };
520 values.push(data_value);
521 }
522 table
523 .add_row(DataRow::new(values))
524 .map_err(|e| anyhow::anyhow!(e))?;
525 }
526
527 table.infer_column_types();
529
530 Ok(table)
531}
532
533fn json_value_to_string(value: &JsonValue) -> String {
535 match value {
536 JsonValue::Null => String::new(),
537 JsonValue::Bool(b) => b.to_string(),
538 JsonValue::Number(n) => n.to_string(),
539 JsonValue::String(s) => s.clone(),
540 JsonValue::Array(arr) => format!("{:?}", arr),
541 JsonValue::Object(obj) => format!("{:?}", obj),
542 }
543}
544
/// Reports whether field `field_index` of a raw CSV record is literally empty.
///
/// The csv parser yields an empty value for both `a,,b` and `a,"",b`. This inspects the
/// raw record text so that only the unquoted empty field counts as NULL.
///
/// Quoting follows RFC 4180 and the csv reader's defaults:
/// - a quote opens a quoted field only at the start of that field;
/// - a doubled `""` inside a quoted field is an escaped quote;
/// - backslash is an ordinary character, not an escape.
///
/// `raw_line` may contain embedded newlines (quoted multi-line fields) and a trailing line
/// terminator. Returns `false` when the record has fewer than `field_index + 1` fields.
fn is_null_field(raw_line: &str, field_index: usize, delimiter: char) -> bool {
    let mut delim_count = 0;
    let mut field_start = 0;
    // `quoted_field`: the current field opened with a quote.
    // `in_quotes`: we are currently inside that quoted section.
    let mut quoted_field = false;
    let mut in_quotes = false;

    for (i, ch) in raw_line.char_indices() {
        if ch == '"' && (i == field_start || quoted_field) {
            // Opening quote, closing quote, or either half of a doubled `""`.
            quoted_field = true;
            in_quotes = !in_quotes;
        } else if ch == delimiter && !in_quotes {
            if delim_count == field_index {
                // Empty only if the delimiter immediately follows the field start.
                return i == field_start;
            }
            delim_count += 1;
            field_start = i + ch.len_utf8();
            quoted_field = false;
        }
    }

    // The last field runs to the end of the record, minus the line terminator.
    delim_count == field_index && raw_line[field_start..].trim_end().is_empty()
}
577
// Unit tests for the CSV/JSON stream loaders and the delimiter helpers.
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    // Basic CSV load: header row, three data rows, one unquoted empty field.
    #[test]
    fn test_csv_from_reader() {
        let csv_data = "id,name,value\n1,Alice,100\n2,Bob,200\n3,,300";
        let reader = Cursor::new(csv_data);

        let table =
            load_csv_from_reader(reader, "test", "stream", "memory").expect("Failed to load CSV");

        assert_eq!(table.name, "test");
        assert_eq!(table.column_count(), 3);
        assert_eq!(table.row_count(), 3);

        // Row 2, column 1 is the unquoted empty `name` in `3,,300`; it must load as NULL.
        let value = table.get_value(2, 1).unwrap();
        assert!(matches!(value, DataValue::Null));
    }

    // JSON array form; an explicit JSON null must load as DataValue::Null.
    #[test]
    fn test_json_from_reader() {
        let json_data = r#"[
            {"id": 1, "name": "Alice", "value": 100},
            {"id": 2, "name": "Bob", "value": 200},
            {"id": 3, "name": null, "value": 300}
        ]"#;
        let reader = Cursor::new(json_data);

        let table =
            load_json_from_reader(reader, "test", "stream", "memory").expect("Failed to load JSON");

        assert_eq!(table.name, "test");
        assert_eq!(table.column_count(), 3);
        assert_eq!(table.row_count(), 3);

        // Row 2, column 1 is the `"name": null` entry.
        let value = table.get_value(2, 1).unwrap();
        assert!(matches!(value, DataValue::Null));
    }

    // JSON Lines form: one object per line.
    #[test]
    fn test_jsonl_from_reader() {
        let jsonl_data = "{\"id\":1,\"name\":\"Alice\"}\n{\"id\":2,\"name\":\"Bob\"}\n";
        let reader = Cursor::new(jsonl_data);

        let table = load_json_from_reader(reader, "test", "stream", "memory")
            .expect("Failed to load JSONL");

        assert_eq!(table.column_count(), 2);
        assert_eq!(table.row_count(), 2);
    }

    // Records with different key sets produce the union of their keys as columns.
    #[test]
    fn test_jsonl_heterogeneous_schema_unioned() {
        let jsonl_data = "{\"id\":1}\n{\"id\":2,\"extra\":\"hi\"}\n";
        let reader = Cursor::new(jsonl_data);
        let table = load_json_from_reader(reader, "test", "stream", "memory").expect("load");
        assert_eq!(table.column_count(), 2);
        assert_eq!(table.row_count(), 2);
    }

    // Blank lines between JSONL records are ignored rather than treated as errors.
    #[test]
    fn test_jsonl_skips_blank_lines() {
        let jsonl_data = "{\"id\":1}\n\n\n{\"id\":2}\n";
        let reader = Cursor::new(jsonl_data);
        let table = load_json_from_reader(reader, "test", "stream", "memory").expect("load");
        assert_eq!(table.row_count(), 2);
    }

    // Input starting with '[' is parsed as a single JSON array.
    #[test]
    fn test_parse_json_records_array_form() {
        let recs = parse_json_records(r#"[{"a":1},{"a":2}]"#).unwrap();
        assert_eq!(recs.len(), 2);
    }

    // Any other input is parsed line by line.
    #[test]
    fn test_parse_json_records_jsonl_form() {
        let recs = parse_json_records("{\"a\":1}\n{\"a\":2}\n").unwrap();
        assert_eq!(recs.len(), 2);
    }

    // A JSONL parse error reports the 1-based line number of the failing line.
    #[test]
    fn test_parse_json_records_jsonl_error_cites_line() {
        let err = parse_json_records("{\"a\":1}\nnot json\n").unwrap_err();
        assert!(err.to_string().contains("line 2"));
    }

    // Default options: comma delimiter with a header row.
    #[test]
    fn test_csv_options_default_is_comma() {
        let opts = CsvReadOptions::default();
        assert_eq!(opts.delimiter, b',');
        assert!(opts.has_headers);
    }

    // Extension-based detection is case-insensitive and falls back to comma.
    #[test]
    fn test_detect_delimiter_from_path() {
        assert_eq!(detect_delimiter_from_path("data.tsv"), b'\t');
        assert_eq!(detect_delimiter_from_path("data.TSV"), b'\t');
        assert_eq!(detect_delimiter_from_path("/tmp/foo.psv"), b'|');
        assert_eq!(detect_delimiter_from_path("data.PSV"), b'|');
        assert_eq!(detect_delimiter_from_path("data.csv"), b',');
        assert_eq!(detect_delimiter_from_path("noext"), b',');
        assert_eq!(detect_delimiter_from_path("-"), b',');
    }

    // Pipe-delimited input: integers are typed, and the distinct `name` values stay plain
    // (non-interned) strings.
    #[test]
    fn test_load_csv_with_pipe_delimiter() {
        let data = "id|name|score\n1|alice|10\n2|bob|20\n";
        let reader = Cursor::new(data);
        let opts = CsvReadOptions {
            delimiter: b'|',
            has_headers: true,
        };
        let table = load_csv_from_reader_with_opts(reader, "psv", "test", "memory", &opts)
            .expect("load failed");
        assert_eq!(table.column_count(), 3);
        assert_eq!(table.row_count(), 2);
        assert_eq!(table.get_value(0, 0).unwrap(), &DataValue::Integer(1));
        assert_eq!(
            table.get_value(1, 1).unwrap(),
            &DataValue::String("bob".to_string())
        );
    }

    // Tab-delimited input.
    #[test]
    fn test_load_csv_with_tab_delimiter() {
        let data = "id\tname\tscore\n1\talice\t10\n2\tbob\t20\n";
        let reader = Cursor::new(data);
        let opts = CsvReadOptions {
            delimiter: b'\t',
            has_headers: true,
        };
        let table = load_csv_from_reader_with_opts(reader, "tsv", "test", "memory", &opts)
            .expect("load failed");
        assert_eq!(table.column_count(), 3);
        assert_eq!(table.row_count(), 2);
        assert_eq!(table.get_value(0, 0).unwrap(), &DataValue::Integer(1));
    }

    // The delimiter is stored in table metadata using its printable label (tab -> "\t").
    #[test]
    fn test_metadata_records_delimiter() {
        let table = load_csv_from_reader(Cursor::new("a,b\n1,2\n"), "t", "test", "memory").unwrap();
        assert_eq!(
            table.metadata.get("delimiter").map(String::as_str),
            Some(",")
        );

        let opts = CsvReadOptions {
            delimiter: b'\t',
            has_headers: true,
        };
        let table = load_csv_from_reader_with_opts(
            Cursor::new("a\tb\n1\t2\n"),
            "t",
            "test",
            "memory",
            &opts,
        )
        .unwrap();
        assert_eq!(
            table.metadata.get("delimiter").map(String::as_str),
            Some("\\t")
        );
    }

    // Any single ASCII character is accepted as-is.
    #[test]
    fn test_parse_delimiter_arg_accepts_single_char() {
        assert_eq!(parse_delimiter_arg(",").unwrap(), b',');
        assert_eq!(parse_delimiter_arg("|").unwrap(), b'|');
        assert_eq!(parse_delimiter_arg(";").unwrap(), b';');
    }

    // Backslash escape spellings and a literal tab map to control bytes.
    #[test]
    fn test_parse_delimiter_arg_accepts_backslash_escapes() {
        assert_eq!(parse_delimiter_arg("\\t").unwrap(), b'\t');
        assert_eq!(parse_delimiter_arg("\t").unwrap(), b'\t');
        assert_eq!(parse_delimiter_arg("\\n").unwrap(), b'\n');
        assert_eq!(parse_delimiter_arg("\\r").unwrap(), b'\r');
    }

    // Multi-character arguments are rejected.
    #[test]
    fn test_parse_delimiter_arg_rejects_multi_char() {
        let err = parse_delimiter_arg("||").unwrap_err();
        assert!(err.to_string().contains("single ASCII character"));
    }

    // Non-ASCII characters (multi-byte in UTF-8) are rejected.
    #[test]
    fn test_parse_delimiter_arg_rejects_non_ascii() {
        let err = parse_delimiter_arg("ö").unwrap_err();
        assert!(err.to_string().contains("single ASCII character"));
    }

    // An explicit delimiter overrides the one implied by the extension.
    #[test]
    fn test_resolve_delimiter_explicit_wins() {
        assert_eq!(resolve_delimiter("data.psv", Some(b',')), b',');
        assert_eq!(resolve_delimiter("data.tsv", Some(b';')), b';');
        assert_eq!(resolve_delimiter("data.csv", Some(b'|')), b'|');
    }

    // Without an explicit delimiter, the extension decides.
    #[test]
    fn test_resolve_delimiter_falls_back_to_extension() {
        assert_eq!(resolve_delimiter("data.psv", None), b'|');
        assert_eq!(resolve_delimiter("data.tsv", None), b'\t');
        assert_eq!(resolve_delimiter("data.csv", None), b',');
        assert_eq!(resolve_delimiter("data.dat", None), b',');
    }

    // NULL detection for unquoted empty fields honours a non-comma delimiter.
    #[test]
    fn test_null_detection_works_with_pipe_delimiter() {
        let data = "id|name|score\n1||10\n";
        let opts = CsvReadOptions {
            delimiter: b'|',
            has_headers: true,
        };
        let table =
            load_csv_from_reader_with_opts(Cursor::new(data), "psv", "test", "memory", &opts)
                .expect("load failed");
        assert!(matches!(table.get_value(0, 1).unwrap(), DataValue::Null));
    }
}