1use anyhow::{Context, Result};
5use csv::ReaderBuilder;
6use serde_json::Value as JsonValue;
7use std::collections::{HashMap, HashSet};
8use std::io::{BufRead, BufReader, Read};
9use tracing::{debug, info};
10
11use crate::data::advanced_csv_loader::StringInterner;
12use crate::data::datatable::{DataColumn, DataRow, DataTable, DataType, DataValue};
13
/// Parsing options for delimited-text input.
///
/// The [`Default`] value describes comma-separated data with headers enabled.
#[derive(Debug, Clone)]
pub struct CsvReadOptions {
    /// Single byte that separates fields within a record.
    pub delimiter: u8,
    /// Forwarded to the CSV reader's `has_headers` setting.
    pub has_headers: bool,
}

impl Default for CsvReadOptions {
    /// Returns options for comma-delimited input with `has_headers` set.
    fn default() -> Self {
        CsvReadOptions {
            has_headers: true,
            delimiter: b',',
        }
    }
}
33
/// Guesses the field delimiter from a path's extension.
///
/// The comparison is ASCII case-insensitive: a `.tsv` suffix selects tab, a
/// `.psv` suffix selects `|`, and every other path falls back to comma.
pub fn detect_delimiter_from_path(path: &str) -> u8 {
    // Suffix → delimiter table; anything not listed here is treated as CSV.
    const KNOWN_SUFFIXES: [(&str, u8); 2] = [(".tsv", b'\t'), (".psv", b'|')];

    let lowered = path.to_ascii_lowercase();
    KNOWN_SUFFIXES
        .iter()
        .find(|&&(suffix, _)| lowered.ends_with(suffix))
        .map_or(b',', |&(_, delimiter)| delimiter)
}
49
50pub fn parse_delimiter_arg(s: &str) -> anyhow::Result<u8> {
60 match s {
61 "\\t" | "\t" => return Ok(b'\t'),
62 "\\n" => return Ok(b'\n'),
63 "\\r" => return Ok(b'\r'),
64 _ => {}
65 }
66 let bytes = s.as_bytes();
67 if bytes.len() == 1 && bytes[0].is_ascii() {
68 return Ok(bytes[0]);
69 }
70 Err(anyhow::anyhow!(
71 "delimiter must be a single ASCII character (or '\\t', '\\n', '\\r'); got {:?}",
72 s
73 ))
74}
75
76pub fn resolve_delimiter(path: &str, explicit: Option<u8>) -> u8 {
83 explicit.unwrap_or_else(|| detect_delimiter_from_path(path))
84}
85
/// Renders a delimiter byte as a printable label.
///
/// Tab, line feed and carriage return become the two-character escapes
/// `\t`, `\n` and `\r`; any other byte is shown as the `char` with the same
/// code point.
fn delimiter_label(d: u8) -> String {
    let escaped = match d {
        b'\t' => "\\t",
        b'\n' => "\\n",
        b'\r' => "\\r",
        _ => return char::from(d).to_string(),
    };
    escaped.to_owned()
}
95
/// Per-column statistics gathered from a sample of rows.
///
/// Fields prefixed with `_` are stored on the struct but not read by the
/// code in this file.
#[derive(Debug)]
struct ColumnAnalysis {
    /// Zero-based position of the column within the header record.
    index: usize,
    /// Header text of the column.
    _name: String,
    /// Number of distinct non-empty values seen in the sample.
    _cardinality: usize,
    /// Number of rows that were sampled.
    _sample_size: usize,
    /// `_cardinality` divided by `_sample_size`.
    _unique_ratio: f64,
    /// Whether the column was judged categorical.
    is_categorical: bool,
    /// Mean byte length of the non-empty sampled values (integer division).
    _avg_string_length: usize,
}
107
108pub struct StreamCsvLoader {
110 sample_size: usize,
111 cardinality_threshold: f64,
112 interners: HashMap<usize, StringInterner>,
113}
114
115impl StreamCsvLoader {
116 pub fn new() -> Self {
117 Self {
118 sample_size: 1000,
119 cardinality_threshold: 0.3,
120 interners: HashMap::new(),
121 }
122 }
123
124 fn analyze_columns(
126 &self,
127 rows: &[Vec<String>],
128 headers: &csv::StringRecord,
129 ) -> Vec<ColumnAnalysis> {
130 let mut analyses = Vec::new();
131
132 for (col_idx, header) in headers.iter().enumerate() {
133 let mut unique_values = HashSet::new();
134 let mut total_length = 0;
135 let mut non_empty_count = 0;
136
137 for row in rows.iter().take(self.sample_size) {
139 if let Some(value) = row.get(col_idx) {
140 if !value.is_empty() {
141 unique_values.insert(value.clone());
142 total_length += value.len();
143 non_empty_count += 1;
144 }
145 }
146 }
147
148 let cardinality = unique_values.len();
149 let sample_size = rows.len().min(self.sample_size);
150 let unique_ratio = if sample_size > 0 {
151 cardinality as f64 / sample_size as f64
152 } else {
153 1.0
154 };
155
156 let avg_string_length = if non_empty_count > 0 {
157 total_length / non_empty_count
158 } else {
159 0
160 };
161
162 let is_categorical = unique_ratio < self.cardinality_threshold
164 || (avg_string_length < 20 && cardinality < sample_size / 2);
165
166 analyses.push(ColumnAnalysis {
167 index: col_idx,
168 _name: header.to_string(),
169 _cardinality: cardinality,
170 _sample_size: sample_size,
171 _unique_ratio: unique_ratio,
172 is_categorical,
173 _avg_string_length: avg_string_length,
174 });
175 }
176
177 analyses
178 }
179
180 pub fn load_csv_from_reader<R: Read>(
184 &mut self,
185 reader: R,
186 table_name: &str,
187 source_type: &str,
188 source_path: &str,
189 ) -> Result<DataTable> {
190 self.load_csv_from_reader_with_opts(
191 reader,
192 table_name,
193 source_type,
194 source_path,
195 &CsvReadOptions::default(),
196 )
197 }
198
199 pub fn load_csv_from_reader_with_opts<R: Read>(
202 &mut self,
203 mut reader: R,
204 table_name: &str,
205 source_type: &str,
206 source_path: &str,
207 opts: &CsvReadOptions,
208 ) -> Result<DataTable> {
209 info!(
210 "Stream CSV load: Loading {} with optimizations (delimiter={})",
211 source_path,
212 delimiter_label(opts.delimiter)
213 );
214
215 let mut buffer = Vec::new();
217 reader.read_to_end(&mut buffer)?;
218
219 let mut csv_reader = ReaderBuilder::new()
221 .has_headers(opts.has_headers)
222 .delimiter(opts.delimiter)
223 .from_reader(&buffer[..]);
224
225 let headers = csv_reader.headers()?.clone();
226 let mut table = DataTable::new(table_name);
227
228 table
230 .metadata
231 .insert("source_type".to_string(), source_type.to_string());
232 table
233 .metadata
234 .insert("source_path".to_string(), source_path.to_string());
235 table
236 .metadata
237 .insert("delimiter".to_string(), delimiter_label(opts.delimiter));
238
239 for header in &headers {
241 table.add_column(DataColumn::new(header));
242 }
243
244 let mut string_rows = Vec::new();
246 for result in csv_reader.records() {
247 let record = result?;
248 let row: Vec<String> = record.iter().map(|s| s.to_string()).collect();
249 string_rows.push(row);
250 }
251
252 let analyses = self.analyze_columns(&string_rows, &headers);
254 let categorical_columns: HashSet<usize> = analyses
255 .iter()
256 .filter(|a| a.is_categorical)
257 .map(|a| a.index)
258 .collect();
259
260 info!(
261 "Column analysis: {} of {} columns will use string interning",
262 categorical_columns.len(),
263 analyses.len()
264 );
265
266 for col_idx in &categorical_columns {
268 self.interners.insert(*col_idx, StringInterner::new());
269 }
270
271 let mut line_reader = BufReader::new(&buffer[..]);
273 let mut raw_lines = Vec::new();
274 let mut raw_line = String::new();
275
276 line_reader.read_line(&mut raw_line)?;
278 raw_line.clear();
279
280 for _ in 0..string_rows.len() {
282 line_reader.read_line(&mut raw_line)?;
283 raw_lines.push(raw_line.clone());
284 raw_line.clear();
285 }
286
287 let mut column_types = vec![DataType::Null; headers.len()];
289 let sample_size = string_rows.len().min(100);
290
291 for row in string_rows.iter().take(sample_size) {
292 for (col_idx, value) in row.iter().enumerate() {
293 if !value.is_empty() {
294 let inferred = DataType::infer_from_string(value);
295 column_types[col_idx] = column_types[col_idx].merge(&inferred);
296 }
297 }
298 }
299
300 for (col_idx, column) in table.columns.iter_mut().enumerate() {
302 column.data_type = column_types[col_idx].clone();
303 }
304
305 for (row_idx, string_row) in string_rows.iter().enumerate() {
307 let mut values = Vec::new();
308 let raw_line = &raw_lines[row_idx];
309
310 for (col_idx, value) in string_row.iter().enumerate() {
311 let data_value = if value.is_empty() {
312 if is_null_field(raw_line, col_idx, opts.delimiter as char) {
314 DataValue::Null
315 } else if categorical_columns.contains(&col_idx) {
316 if let Some(interner) = self.interners.get_mut(&col_idx) {
318 DataValue::InternedString(interner.intern(""))
319 } else {
320 DataValue::String(String::new())
321 }
322 } else {
323 DataValue::String(String::new())
324 }
325 } else if categorical_columns.contains(&col_idx)
326 && column_types[col_idx] == DataType::String
327 {
328 if let Some(interner) = self.interners.get_mut(&col_idx) {
330 DataValue::InternedString(interner.intern(value))
331 } else {
332 DataValue::from_string(value, &column_types[col_idx])
333 }
334 } else {
335 DataValue::from_string(value, &column_types[col_idx])
336 };
337 values.push(data_value);
338 }
339 table
340 .add_row(DataRow::new(values))
341 .map_err(|e| anyhow::anyhow!(e))?;
342 }
343
344 for (col_idx, interner) in &self.interners {
346 let stats = interner.stats();
347 if stats.memory_saved_bytes > 0 {
348 debug!(
349 "Column {} interning: {} unique strings, {} references, {} bytes saved",
350 headers.get(*col_idx).unwrap_or(&String::new()),
351 stats.unique_strings,
352 stats.total_references,
353 stats.memory_saved_bytes
354 );
355 }
356 }
357
358 table.infer_column_types();
360
361 Ok(table)
362 }
363}
364
365pub fn load_csv_from_reader<R: Read>(
368 reader: R,
369 table_name: &str,
370 source_type: &str,
371 source_path: &str,
372) -> Result<DataTable> {
373 let mut loader = StreamCsvLoader::new();
374 loader.load_csv_from_reader(reader, table_name, source_type, source_path)
375}
376
377pub fn load_csv_from_reader_with_opts<R: Read>(
380 reader: R,
381 table_name: &str,
382 source_type: &str,
383 source_path: &str,
384 opts: &CsvReadOptions,
385) -> Result<DataTable> {
386 let mut loader = StreamCsvLoader::new();
387 loader.load_csv_from_reader_with_opts(reader, table_name, source_type, source_path, opts)
388}
389
390pub fn parse_json_records(content: &str) -> Result<Vec<JsonValue>> {
398 let trimmed = content.trim_start();
399 if trimmed.starts_with('[') {
400 return serde_json::from_str(content).with_context(|| "Failed to parse JSON array");
401 }
402
403 let mut out = Vec::new();
404 for (idx, raw_line) in content.lines().enumerate() {
405 let line = raw_line.trim();
406 if line.is_empty() {
407 continue;
408 }
409 let value: JsonValue = serde_json::from_str(line)
410 .with_context(|| format!("Failed to parse JSONL at line {}", idx + 1))?;
411 out.push(value);
412 }
413 Ok(out)
414}
415
416pub fn navigate_json_path(value: &JsonValue, path: &str) -> Result<JsonValue> {
437 let parts: Vec<&str> = path.split('.').filter(|p| !p.is_empty()).collect();
438 walk_json_path(value, &parts)
439}
440
441fn walk_json_path(value: &JsonValue, parts: &[&str]) -> Result<JsonValue> {
442 let Some((head, tail)) = parts.split_first() else {
443 return Ok(value.clone());
444 };
445
446 if let Some(name) = head.strip_suffix("[]") {
449 let array_val = if name.is_empty() {
450 value
451 } else {
452 value
453 .get(name)
454 .ok_or_else(|| anyhow::anyhow!("Path '{}' not found in JSON", name))?
455 };
456 let arr = array_val.as_array().ok_or_else(|| {
457 anyhow::anyhow!(
458 "Expected array at '{}' for [] projection, got {}",
459 if name.is_empty() { "<root>" } else { name },
460 json_kind(array_val)
461 )
462 })?;
463 let mut projected = Vec::with_capacity(arr.len());
464 for el in arr {
465 projected.push(walk_json_path(el, tail)?);
466 }
467 return Ok(JsonValue::Array(projected));
468 }
469
470 let next = value
471 .get(head)
472 .ok_or_else(|| anyhow::anyhow!("Path '{}' not found in JSON", head))?;
473 walk_json_path(next, tail)
474}
475
476fn json_kind(value: &JsonValue) -> &'static str {
478 match value {
479 JsonValue::Null => "null",
480 JsonValue::Bool(_) => "bool",
481 JsonValue::Number(_) => "number",
482 JsonValue::String(_) => "string",
483 JsonValue::Array(_) => "array",
484 JsonValue::Object(_) => "object",
485 }
486}
487
488pub fn collect_column_names(records: &[JsonValue], sample_size: usize) -> Vec<String> {
492 let mut seen: HashSet<String> = HashSet::new();
493 let mut names: Vec<String> = Vec::new();
494 for record in records.iter().take(sample_size) {
495 if let Some(obj) = record.as_object() {
496 for key in obj.keys() {
497 if seen.insert(key.clone()) {
498 names.push(key.clone());
499 }
500 }
501 }
502 }
503 names
504}
505
506pub fn load_json_from_reader<R: Read>(
511 mut reader: R,
512 table_name: &str,
513 source_type: &str,
514 source_path: &str,
515) -> Result<DataTable> {
516 let mut json_str = String::new();
517 reader.read_to_string(&mut json_str)?;
518
519 let json_data: Vec<JsonValue> = parse_json_records(&json_str)?;
520
521 if json_data.is_empty() {
522 return Ok(DataTable::new(table_name));
523 }
524
525 let column_names = collect_column_names(&json_data, 100);
529 if column_names.is_empty() {
530 return Err(anyhow::anyhow!(
531 "JSON data must contain objects (got non-object records)"
532 ));
533 }
534
535 let mut table = DataTable::new(table_name);
536
537 table
539 .metadata
540 .insert("source_type".to_string(), source_type.to_string());
541 table
542 .metadata
543 .insert("source_path".to_string(), source_path.to_string());
544
545 for name in &column_names {
546 table.add_column(DataColumn::new(name));
547 }
548
549 let mut string_rows = Vec::new();
551 for json_obj in &json_data {
552 if let Some(obj) = json_obj.as_object() {
553 let mut row = Vec::new();
554 for col_name in &column_names {
555 let value = obj
556 .get(col_name)
557 .map(|v| json_value_to_string(v))
558 .unwrap_or_default();
559 row.push(value);
560 }
561 string_rows.push(row);
562 }
563 }
564
565 let mut column_types = vec![DataType::Null; column_names.len()];
567 let sample_size = string_rows.len().min(100);
568
569 for row in string_rows.iter().take(sample_size) {
570 for (col_idx, value) in row.iter().enumerate() {
571 if !value.is_empty() && value != "null" {
572 let inferred = DataType::infer_from_string(value);
573 column_types[col_idx] = column_types[col_idx].merge(&inferred);
574 }
575 }
576 }
577
578 for (col_idx, column) in table.columns.iter_mut().enumerate() {
580 column.data_type = column_types[col_idx].clone();
581 }
582
583 for string_row in &string_rows {
585 let mut values = Vec::new();
586 for (col_idx, value) in string_row.iter().enumerate() {
587 let data_value = if value.is_empty() || value == "null" {
588 DataValue::Null
589 } else {
590 DataValue::from_string(value, &column_types[col_idx])
591 };
592 values.push(data_value);
593 }
594 table
595 .add_row(DataRow::new(values))
596 .map_err(|e| anyhow::anyhow!(e))?;
597 }
598
599 table.infer_column_types();
601
602 Ok(table)
603}
604
605fn json_value_to_string(value: &JsonValue) -> String {
607 match value {
608 JsonValue::Null => String::new(),
609 JsonValue::Bool(b) => b.to_string(),
610 JsonValue::Number(n) => n.to_string(),
611 JsonValue::String(s) => s.clone(),
612 JsonValue::Array(arr) => format!("{:?}", arr),
613 JsonValue::Object(obj) => format!("{:?}", obj),
614 }
615}
616
/// Reports whether field `field_index` of `raw_line` was written with nothing
/// at all between its delimiters (as opposed to a quoted empty string `""`).
///
/// The scan follows the quoting rules of a CSV reader with default settings:
/// a `"` opens a quoted section only at the very start of a field, `""`
/// inside a quoted section is a literal quote, and a `"` anywhere else is
/// ordinary data. Backslash has no special meaning. All positions are byte
/// offsets, so non-ASCII content and multi-byte delimiters are handled.
///
/// For the last field, trailing `\r`/`\n` characters are ignored. Returns
/// `false` when the line has fewer than `field_index + 1` fields.
fn is_null_field(raw_line: &str, field_index: usize, delimiter: char) -> bool {
    /// Where the scanner currently is within a field.
    #[derive(Clone, Copy)]
    enum Scan {
        /// Nothing of the current field has been consumed yet.
        FieldStart,
        /// Inside unquoted data; quotes are literal here.
        Unquoted,
        /// Inside a quoted section; delimiters are literal here.
        Quoted,
        /// Just saw `"` inside a quoted section: either an escape or the end.
        QuoteInQuoted,
    }

    let mut state = Scan::FieldStart;
    let mut current_field = 0usize;
    let mut field_start = 0usize;

    for (pos, ch) in raw_line.char_indices() {
        state = match (state, ch) {
            (Scan::Quoted, '"') => Scan::QuoteInQuoted,
            (Scan::Quoted, _) => Scan::Quoted,
            (Scan::QuoteInQuoted, '"') => Scan::Quoted,
            (Scan::FieldStart, '"') => Scan::Quoted,
            (_, c) if c == delimiter => {
                if current_field == field_index {
                    // Null only if the delimiter sits right at the field's start.
                    return pos == field_start;
                }
                current_field += 1;
                // Advance by the delimiter's encoded width to stay on a char boundary.
                field_start = pos + c.len_utf8();
                Scan::FieldStart
            }
            _ => Scan::Unquoted,
        };
    }

    // Last field of the line: null when only the line terminator remains.
    current_field == field_index
        && raw_line[field_start..]
            .trim_end_matches(|c| c == '\r' || c == '\n')
            .is_empty()
}
649
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// Builds header-enabled read options for `delimiter`.
    fn headered(delimiter: u8) -> CsvReadOptions {
        CsvReadOptions {
            delimiter,
            has_headers: true,
        }
    }

    /// A single key step returns the value stored under that key.
    #[test]
    fn test_navigate_json_path_descends_object_key() {
        let doc = serde_json::json!({
            "count": 2,
            "project": [{"id": "a"}, {"id": "b"}]
        });
        let found = navigate_json_path(&doc, "project").unwrap();
        assert!(found.is_array());
        assert_eq!(found.as_array().unwrap().len(), 2);
    }

    /// Dotted paths descend through nested objects.
    #[test]
    fn test_navigate_json_path_nested_descent() {
        let doc = serde_json::json!({
            "projects": {"project": [{"id": "a"}, {"id": "b"}, {"id": "c"}]}
        });
        let found = navigate_json_path(&doc, "projects.project").unwrap();
        assert_eq!(found.as_array().unwrap().len(), 3);
    }

    /// A `[]` suffix maps the rest of the path over each array element.
    #[test]
    fn test_navigate_json_path_array_projection() {
        let doc = serde_json::json!({
            "hits": {"hits": [
                {"_source": {"id": 1}},
                {"_source": {"id": 2}}
            ]}
        });
        let found = navigate_json_path(&doc, "hits.hits[]._source").unwrap();
        let items = found.as_array().unwrap();
        assert_eq!(items.len(), 2);
        assert_eq!(items[1]["id"], serde_json::json!(2));
    }

    /// A bare `[]` projects over a root-level array.
    #[test]
    fn test_navigate_json_path_bare_projection_over_root_array() {
        let doc = serde_json::json!([{"v": {"x": 1}}, {"v": {"x": 2}}]);
        let found = navigate_json_path(&doc, "[].v").unwrap();
        let items = found.as_array().unwrap();
        assert_eq!(items[0]["x"], serde_json::json!(1));
    }

    /// An empty path returns the document unchanged.
    #[test]
    fn test_navigate_json_path_empty_path_is_identity() {
        let doc = serde_json::json!({"a": 1});
        let found = navigate_json_path(&doc, "").unwrap();
        assert_eq!(found, doc);
    }

    /// Asking for an absent key is an error that says so.
    #[test]
    fn test_navigate_json_path_missing_key_errors() {
        let doc = serde_json::json!({"a": 1});
        let err = navigate_json_path(&doc, "b").unwrap_err();
        assert!(err.to_string().contains("not found"), "{}", err);
    }

    /// Projecting over a non-array reports the kind that was found.
    #[test]
    fn test_navigate_json_path_projection_on_non_array_errors() {
        let doc = serde_json::json!({"a": {"not": "an array"}});
        let msg = navigate_json_path(&doc, "a[]").unwrap_err().to_string();
        assert!(msg.contains("Expected array"), "{}", msg);
        assert!(msg.contains("object"), "{}", msg);
    }

    /// Basic CSV load: shape is right and an unquoted empty cell is Null.
    #[test]
    fn test_csv_from_reader() {
        let csv_data = "id,name,value\n1,Alice,100\n2,Bob,200\n3,,300";

        let table = load_csv_from_reader(Cursor::new(csv_data), "test", "stream", "memory")
            .expect("Failed to load CSV");

        assert_eq!(table.name, "test");
        assert_eq!(table.column_count(), 3);
        assert_eq!(table.row_count(), 3);

        let cell = table.get_value(2, 1).unwrap();
        assert!(matches!(cell, DataValue::Null));
    }

    /// Basic JSON array load: shape is right and a JSON null is Null.
    #[test]
    fn test_json_from_reader() {
        let json_data = r#"[
            {"id": 1, "name": "Alice", "value": 100},
            {"id": 2, "name": "Bob", "value": 200},
            {"id": 3, "name": null, "value": 300}
        ]"#;

        let table = load_json_from_reader(Cursor::new(json_data), "test", "stream", "memory")
            .expect("Failed to load JSON");

        assert_eq!(table.name, "test");
        assert_eq!(table.column_count(), 3);
        assert_eq!(table.row_count(), 3);

        let cell = table.get_value(2, 1).unwrap();
        assert!(matches!(cell, DataValue::Null));
    }

    /// Newline-delimited JSON loads one row per line.
    #[test]
    fn test_jsonl_from_reader() {
        let jsonl_data = "{\"id\":1,\"name\":\"Alice\"}\n{\"id\":2,\"name\":\"Bob\"}\n";

        let table = load_json_from_reader(Cursor::new(jsonl_data), "test", "stream", "memory")
            .expect("Failed to load JSONL");

        assert_eq!(table.column_count(), 2);
        assert_eq!(table.row_count(), 2);
    }

    /// Keys that appear only in later records still become columns.
    #[test]
    fn test_jsonl_heterogeneous_schema_unioned() {
        let jsonl_data = "{\"id\":1}\n{\"id\":2,\"extra\":\"hi\"}\n";
        let table = load_json_from_reader(Cursor::new(jsonl_data), "test", "stream", "memory")
            .expect("load");
        assert_eq!(table.column_count(), 2);
        assert_eq!(table.row_count(), 2);
    }

    /// Blank lines between JSONL records are ignored.
    #[test]
    fn test_jsonl_skips_blank_lines() {
        let jsonl_data = "{\"id\":1}\n\n\n{\"id\":2}\n";
        let table = load_json_from_reader(Cursor::new(jsonl_data), "test", "stream", "memory")
            .expect("load");
        assert_eq!(table.row_count(), 2);
    }

    /// Array-form input yields one record per element.
    #[test]
    fn test_parse_json_records_array_form() {
        let records = parse_json_records(r#"[{"a":1},{"a":2}]"#).unwrap();
        assert_eq!(records.len(), 2);
    }

    /// Line-delimited input yields one record per line.
    #[test]
    fn test_parse_json_records_jsonl_form() {
        let records = parse_json_records("{\"a\":1}\n{\"a\":2}\n").unwrap();
        assert_eq!(records.len(), 2);
    }

    /// A bad JSONL line is reported with its line number.
    #[test]
    fn test_parse_json_records_jsonl_error_cites_line() {
        let err = parse_json_records("{\"a\":1}\nnot json\n").unwrap_err();
        assert!(err.to_string().contains("line 2"));
    }

    /// Default options are comma-delimited with headers.
    #[test]
    fn test_csv_options_default_is_comma() {
        let defaults = CsvReadOptions::default();
        assert_eq!(defaults.delimiter, b',');
        assert!(defaults.has_headers);
    }

    /// Extension sniffing is case-insensitive and falls back to comma.
    #[test]
    fn test_detect_delimiter_from_path() {
        let cases = [
            ("data.tsv", b'\t'),
            ("data.TSV", b'\t'),
            ("/tmp/foo.psv", b'|'),
            ("data.PSV", b'|'),
            ("data.csv", b','),
            ("noext", b','),
            ("-", b','),
        ];
        for (path, expected) in cases {
            assert_eq!(detect_delimiter_from_path(path), expected);
        }
    }

    /// Pipe-delimited input parses into typed cells.
    #[test]
    fn test_load_csv_with_pipe_delimiter() {
        let data = "id|name|score\n1|alice|10\n2|bob|20\n";
        let table = load_csv_from_reader_with_opts(
            Cursor::new(data),
            "psv",
            "test",
            "memory",
            &headered(b'|'),
        )
        .expect("load failed");
        assert_eq!(table.column_count(), 3);
        assert_eq!(table.row_count(), 2);
        assert_eq!(table.get_value(0, 0).unwrap(), &DataValue::Integer(1));
        assert_eq!(
            table.get_value(1, 1).unwrap(),
            &DataValue::String("bob".to_string())
        );
    }

    /// Tab-delimited input parses into typed cells.
    #[test]
    fn test_load_csv_with_tab_delimiter() {
        let data = "id\tname\tscore\n1\talice\t10\n2\tbob\t20\n";
        let table = load_csv_from_reader_with_opts(
            Cursor::new(data),
            "tsv",
            "test",
            "memory",
            &headered(b'\t'),
        )
        .expect("load failed");
        assert_eq!(table.column_count(), 3);
        assert_eq!(table.row_count(), 2);
        assert_eq!(table.get_value(0, 0).unwrap(), &DataValue::Integer(1));
    }

    /// The delimiter label is stored in the table metadata.
    #[test]
    fn test_metadata_records_delimiter() {
        let comma_table =
            load_csv_from_reader(Cursor::new("a,b\n1,2\n"), "t", "test", "memory").unwrap();
        assert_eq!(
            comma_table.metadata.get("delimiter").map(String::as_str),
            Some(",")
        );

        let tab_table = load_csv_from_reader_with_opts(
            Cursor::new("a\tb\n1\t2\n"),
            "t",
            "test",
            "memory",
            &headered(b'\t'),
        )
        .unwrap();
        assert_eq!(
            tab_table.metadata.get("delimiter").map(String::as_str),
            Some("\\t")
        );
    }

    /// Any single ASCII character is accepted verbatim.
    #[test]
    fn test_parse_delimiter_arg_accepts_single_char() {
        for (arg, expected) in [(",", b','), ("|", b'|'), (";", b';')] {
            assert_eq!(parse_delimiter_arg(arg).unwrap(), expected);
        }
    }

    /// Backslash escapes and a literal tab are both understood.
    #[test]
    fn test_parse_delimiter_arg_accepts_backslash_escapes() {
        for (arg, expected) in [
            ("\\t", b'\t'),
            ("\t", b'\t'),
            ("\\n", b'\n'),
            ("\\r", b'\r'),
        ] {
            assert_eq!(parse_delimiter_arg(arg).unwrap(), expected);
        }
    }

    /// More than one character is rejected.
    #[test]
    fn test_parse_delimiter_arg_rejects_multi_char() {
        let err = parse_delimiter_arg("||").unwrap_err();
        assert!(err.to_string().contains("single ASCII character"));
    }

    /// A non-ASCII character is rejected.
    #[test]
    fn test_parse_delimiter_arg_rejects_non_ascii() {
        let err = parse_delimiter_arg("ö").unwrap_err();
        assert!(err.to_string().contains("single ASCII character"));
    }

    /// An explicit delimiter overrides the extension.
    #[test]
    fn test_resolve_delimiter_explicit_wins() {
        for (path, explicit) in [("data.psv", b','), ("data.tsv", b';'), ("data.csv", b'|')] {
            assert_eq!(resolve_delimiter(path, Some(explicit)), explicit);
        }
    }

    /// Without an explicit delimiter the extension decides.
    #[test]
    fn test_resolve_delimiter_falls_back_to_extension() {
        for (path, expected) in [
            ("data.psv", b'|'),
            ("data.tsv", b'\t'),
            ("data.csv", b','),
            ("data.dat", b','),
        ] {
            assert_eq!(resolve_delimiter(path, None), expected);
        }
    }

    /// Null detection honors a non-comma delimiter.
    #[test]
    fn test_null_detection_works_with_pipe_delimiter() {
        let data = "id|name|score\n1||10\n";
        let table = load_csv_from_reader_with_opts(
            Cursor::new(data),
            "psv",
            "test",
            "memory",
            &headered(b'|'),
        )
        .expect("load failed");
        assert!(matches!(table.get_value(0, 1).unwrap(), DataValue::Null));
    }
}