1use anyhow::{Context, Result};
5use csv::ReaderBuilder;
6use serde_json::Value as JsonValue;
7use std::collections::{HashMap, HashSet};
8use std::io::{BufRead, BufReader, Read};
9use tracing::{debug, info};
10
11use crate::data::advanced_csv_loader::StringInterner;
12use crate::data::datatable::{DataColumn, DataRow, DataTable, DataType, DataValue};
13
/// Per-column statistics produced by `StreamCsvLoader::analyze_columns` from a
/// sample of parsed CSV rows.
///
/// Only `index` and `is_categorical` are read by the loader in this file; the
/// underscore-prefixed fields are not read here (the prefix silences dead-code
/// lints) but are included in the derived `Debug` output.
#[derive(Debug)]
struct ColumnAnalysis {
    /// Zero-based position of the column in the CSV header.
    index: usize,
    /// Header text of the column.
    _name: String,
    /// Number of distinct non-empty values seen in the sample.
    _cardinality: usize,
    /// Number of rows sampled (rows with an empty cell included).
    _sample_size: usize,
    /// `_cardinality / _sample_size`, or `1.0` when no rows were sampled.
    _unique_ratio: f64,
    /// Whether this column's string values should be interned.
    is_categorical: bool,
    /// Mean byte length of the non-empty sampled values (integer division).
    _avg_string_length: usize,
}
25
/// CSV loader that buffers its whole input, samples each column to decide
/// whether it is categorical, and string-interns the values of categorical
/// columns.
pub struct StreamCsvLoader {
    /// Maximum number of rows examined per column when classifying columns.
    sample_size: usize,
    /// Distinct-to-sampled ratio below which a column counts as categorical.
    cardinality_threshold: f64,
    /// One interner per categorical column, keyed by zero-based column index.
    interners: HashMap<usize, StringInterner>,
}
32
33impl StreamCsvLoader {
34 pub fn new() -> Self {
35 Self {
36 sample_size: 1000,
37 cardinality_threshold: 0.3,
38 interners: HashMap::new(),
39 }
40 }
41
42 fn analyze_columns(
44 &self,
45 rows: &[Vec<String>],
46 headers: &csv::StringRecord,
47 ) -> Vec<ColumnAnalysis> {
48 let mut analyses = Vec::new();
49
50 for (col_idx, header) in headers.iter().enumerate() {
51 let mut unique_values = HashSet::new();
52 let mut total_length = 0;
53 let mut non_empty_count = 0;
54
55 for row in rows.iter().take(self.sample_size) {
57 if let Some(value) = row.get(col_idx) {
58 if !value.is_empty() {
59 unique_values.insert(value.clone());
60 total_length += value.len();
61 non_empty_count += 1;
62 }
63 }
64 }
65
66 let cardinality = unique_values.len();
67 let sample_size = rows.len().min(self.sample_size);
68 let unique_ratio = if sample_size > 0 {
69 cardinality as f64 / sample_size as f64
70 } else {
71 1.0
72 };
73
74 let avg_string_length = if non_empty_count > 0 {
75 total_length / non_empty_count
76 } else {
77 0
78 };
79
80 let is_categorical = unique_ratio < self.cardinality_threshold
82 || (avg_string_length < 20 && cardinality < sample_size / 2);
83
84 analyses.push(ColumnAnalysis {
85 index: col_idx,
86 _name: header.to_string(),
87 _cardinality: cardinality,
88 _sample_size: sample_size,
89 _unique_ratio: unique_ratio,
90 is_categorical,
91 _avg_string_length: avg_string_length,
92 });
93 }
94
95 analyses
96 }
97
98 pub fn load_csv_from_reader<R: Read>(
100 &mut self,
101 mut reader: R,
102 table_name: &str,
103 source_type: &str,
104 source_path: &str,
105 ) -> Result<DataTable> {
106 info!(
107 "Stream CSV load: Loading {} with optimizations",
108 source_path
109 );
110
111 let mut buffer = Vec::new();
113 reader.read_to_end(&mut buffer)?;
114
115 let mut csv_reader = ReaderBuilder::new()
117 .has_headers(true)
118 .from_reader(&buffer[..]);
119
120 let headers = csv_reader.headers()?.clone();
121 let mut table = DataTable::new(table_name);
122
123 table
125 .metadata
126 .insert("source_type".to_string(), source_type.to_string());
127 table
128 .metadata
129 .insert("source_path".to_string(), source_path.to_string());
130
131 for header in &headers {
133 table.add_column(DataColumn::new(header));
134 }
135
136 let mut string_rows = Vec::new();
138 for result in csv_reader.records() {
139 let record = result?;
140 let row: Vec<String> = record.iter().map(|s| s.to_string()).collect();
141 string_rows.push(row);
142 }
143
144 let analyses = self.analyze_columns(&string_rows, &headers);
146 let categorical_columns: HashSet<usize> = analyses
147 .iter()
148 .filter(|a| a.is_categorical)
149 .map(|a| a.index)
150 .collect();
151
152 info!(
153 "Column analysis: {} of {} columns will use string interning",
154 categorical_columns.len(),
155 analyses.len()
156 );
157
158 for col_idx in &categorical_columns {
160 self.interners.insert(*col_idx, StringInterner::new());
161 }
162
163 let mut line_reader = BufReader::new(&buffer[..]);
165 let mut raw_lines = Vec::new();
166 let mut raw_line = String::new();
167
168 line_reader.read_line(&mut raw_line)?;
170 raw_line.clear();
171
172 for _ in 0..string_rows.len() {
174 line_reader.read_line(&mut raw_line)?;
175 raw_lines.push(raw_line.clone());
176 raw_line.clear();
177 }
178
179 let mut column_types = vec![DataType::Null; headers.len()];
181 let sample_size = string_rows.len().min(100);
182
183 for row in string_rows.iter().take(sample_size) {
184 for (col_idx, value) in row.iter().enumerate() {
185 if !value.is_empty() {
186 let inferred = DataType::infer_from_string(value);
187 column_types[col_idx] = column_types[col_idx].merge(&inferred);
188 }
189 }
190 }
191
192 for (col_idx, column) in table.columns.iter_mut().enumerate() {
194 column.data_type = column_types[col_idx].clone();
195 }
196
197 for (row_idx, string_row) in string_rows.iter().enumerate() {
199 let mut values = Vec::new();
200 let raw_line = &raw_lines[row_idx];
201
202 for (col_idx, value) in string_row.iter().enumerate() {
203 let data_value = if value.is_empty() {
204 if is_null_field(raw_line, col_idx) {
206 DataValue::Null
207 } else if categorical_columns.contains(&col_idx) {
208 if let Some(interner) = self.interners.get_mut(&col_idx) {
210 DataValue::InternedString(interner.intern(""))
211 } else {
212 DataValue::String(String::new())
213 }
214 } else {
215 DataValue::String(String::new())
216 }
217 } else if categorical_columns.contains(&col_idx)
218 && column_types[col_idx] == DataType::String
219 {
220 if let Some(interner) = self.interners.get_mut(&col_idx) {
222 DataValue::InternedString(interner.intern(value))
223 } else {
224 DataValue::from_string(value, &column_types[col_idx])
225 }
226 } else {
227 DataValue::from_string(value, &column_types[col_idx])
228 };
229 values.push(data_value);
230 }
231 table
232 .add_row(DataRow::new(values))
233 .map_err(|e| anyhow::anyhow!(e))?;
234 }
235
236 for (col_idx, interner) in &self.interners {
238 let stats = interner.stats();
239 if stats.memory_saved_bytes > 0 {
240 debug!(
241 "Column {} interning: {} unique strings, {} references, {} bytes saved",
242 headers.get(*col_idx).unwrap_or(&String::new()),
243 stats.unique_strings,
244 stats.total_references,
245 stats.memory_saved_bytes
246 );
247 }
248 }
249
250 table.infer_column_types();
252
253 Ok(table)
254 }
255}
256
257pub fn load_csv_from_reader<R: Read>(
259 reader: R,
260 table_name: &str,
261 source_type: &str,
262 source_path: &str,
263) -> Result<DataTable> {
264 let mut loader = StreamCsvLoader::new();
265 loader.load_csv_from_reader(reader, table_name, source_type, source_path)
266}
267
268pub fn parse_json_records(content: &str) -> Result<Vec<JsonValue>> {
276 let trimmed = content.trim_start();
277 if trimmed.starts_with('[') {
278 return serde_json::from_str(content).with_context(|| "Failed to parse JSON array");
279 }
280
281 let mut out = Vec::new();
282 for (idx, raw_line) in content.lines().enumerate() {
283 let line = raw_line.trim();
284 if line.is_empty() {
285 continue;
286 }
287 let value: JsonValue = serde_json::from_str(line)
288 .with_context(|| format!("Failed to parse JSONL at line {}", idx + 1))?;
289 out.push(value);
290 }
291 Ok(out)
292}
293
294pub fn collect_column_names(records: &[JsonValue], sample_size: usize) -> Vec<String> {
298 let mut seen: HashSet<String> = HashSet::new();
299 let mut names: Vec<String> = Vec::new();
300 for record in records.iter().take(sample_size) {
301 if let Some(obj) = record.as_object() {
302 for key in obj.keys() {
303 if seen.insert(key.clone()) {
304 names.push(key.clone());
305 }
306 }
307 }
308 }
309 names
310}
311
312pub fn load_json_from_reader<R: Read>(
317 mut reader: R,
318 table_name: &str,
319 source_type: &str,
320 source_path: &str,
321) -> Result<DataTable> {
322 let mut json_str = String::new();
323 reader.read_to_string(&mut json_str)?;
324
325 let json_data: Vec<JsonValue> = parse_json_records(&json_str)?;
326
327 if json_data.is_empty() {
328 return Ok(DataTable::new(table_name));
329 }
330
331 let column_names = collect_column_names(&json_data, 100);
335 if column_names.is_empty() {
336 return Err(anyhow::anyhow!(
337 "JSON data must contain objects (got non-object records)"
338 ));
339 }
340
341 let mut table = DataTable::new(table_name);
342
343 table
345 .metadata
346 .insert("source_type".to_string(), source_type.to_string());
347 table
348 .metadata
349 .insert("source_path".to_string(), source_path.to_string());
350
351 for name in &column_names {
352 table.add_column(DataColumn::new(name));
353 }
354
355 let mut string_rows = Vec::new();
357 for json_obj in &json_data {
358 if let Some(obj) = json_obj.as_object() {
359 let mut row = Vec::new();
360 for col_name in &column_names {
361 let value = obj
362 .get(col_name)
363 .map(|v| json_value_to_string(v))
364 .unwrap_or_default();
365 row.push(value);
366 }
367 string_rows.push(row);
368 }
369 }
370
371 let mut column_types = vec![DataType::Null; column_names.len()];
373 let sample_size = string_rows.len().min(100);
374
375 for row in string_rows.iter().take(sample_size) {
376 for (col_idx, value) in row.iter().enumerate() {
377 if !value.is_empty() && value != "null" {
378 let inferred = DataType::infer_from_string(value);
379 column_types[col_idx] = column_types[col_idx].merge(&inferred);
380 }
381 }
382 }
383
384 for (col_idx, column) in table.columns.iter_mut().enumerate() {
386 column.data_type = column_types[col_idx].clone();
387 }
388
389 for string_row in &string_rows {
391 let mut values = Vec::new();
392 for (col_idx, value) in string_row.iter().enumerate() {
393 let data_value = if value.is_empty() || value == "null" {
394 DataValue::Null
395 } else {
396 DataValue::from_string(value, &column_types[col_idx])
397 };
398 values.push(data_value);
399 }
400 table
401 .add_row(DataRow::new(values))
402 .map_err(|e| anyhow::anyhow!(e))?;
403 }
404
405 table.infer_column_types();
407
408 Ok(table)
409}
410
411fn json_value_to_string(value: &JsonValue) -> String {
413 match value {
414 JsonValue::Null => String::new(),
415 JsonValue::Bool(b) => b.to_string(),
416 JsonValue::Number(n) => n.to_string(),
417 JsonValue::String(s) => s.clone(),
418 JsonValue::Array(arr) => format!("{:?}", arr),
419 JsonValue::Object(obj) => format!("{:?}", obj),
420 }
421}
422
/// Returns `true` when field `field_index` (zero-based) of the raw CSV record
/// text is completely empty, i.e. there is nothing at all between its
/// delimiters (`a,,b`), as opposed to an explicitly quoted empty string
/// (`a,"",b`).
///
/// Quoting follows RFC 4180, which matches the default `csv::ReaderBuilder`
/// configuration used in this file: a field may be wrapped in double quotes,
/// a literal quote inside is written as `""`, and backslash is an ordinary
/// character. Trailing whitespace (including the line terminator) after the
/// last field is ignored. Returns `false` if the record has fewer fields.
fn is_null_field(raw_line: &str, field_index: usize) -> bool {
    let mut in_quotes = false;
    let mut current_field = 0usize;
    let mut field_start = 0usize;

    for (i, ch) in raw_line.char_indices() {
        match ch {
            // A doubled quote toggles twice, so escaped quotes need no special case.
            '"' => in_quotes = !in_quotes,
            ',' if !in_quotes => {
                if current_field == field_index {
                    return i == field_start;
                }
                current_field += 1;
                // ',' is one byte, so this is always a char boundary.
                field_start = i + 1;
            }
            _ => {}
        }
    }

    current_field == field_index && raw_line[field_start..].trim_end().is_empty()
}
453
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    #[test]
    fn test_csv_from_reader() {
        let csv_data = "id,name,value\n1,Alice,100\n2,Bob,200\n3,,300";
        let reader = Cursor::new(csv_data);

        let table =
            load_csv_from_reader(reader, "test", "stream", "memory").expect("Failed to load CSV");

        assert_eq!(table.name, "test");
        assert_eq!(table.column_count(), 3);
        assert_eq!(table.row_count(), 3);

        // `3,,300`: nothing between the delimiters, so the name is NULL.
        let value = table.get_value(2, 1).unwrap();
        assert!(matches!(value, DataValue::Null));
    }

    #[test]
    fn test_csv_quoted_empty_is_not_null() {
        // `1,""` is an explicit empty string and must not become NULL.
        let csv_data = "id,name\n1,\"\"\n2,Bob\n";
        let table = load_csv_from_reader(Cursor::new(csv_data), "test", "stream", "memory")
            .expect("Failed to load CSV");

        assert_eq!(table.row_count(), 2);
        let value = table.get_value(0, 1).unwrap();
        assert!(!matches!(value, DataValue::Null));
    }

    #[test]
    fn test_is_null_field_basic() {
        assert!(is_null_field("3,,300", 1));
        assert!(!is_null_field("3,\"\",300", 1));
        assert!(is_null_field("1,2,\r\n", 2));
        assert!(!is_null_field("1,2,3\n", 2));
        assert!(is_null_field(",a", 0));
        assert!(!is_null_field("a,b", 5));
    }

    #[test]
    fn test_json_from_reader() {
        let json_data = r#"[
            {"id": 1, "name": "Alice", "value": 100},
            {"id": 2, "name": "Bob", "value": 200},
            {"id": 3, "name": null, "value": 300}
        ]"#;
        let reader = Cursor::new(json_data);

        let table =
            load_json_from_reader(reader, "test", "stream", "memory").expect("Failed to load JSON");

        assert_eq!(table.name, "test");
        assert_eq!(table.column_count(), 3);
        assert_eq!(table.row_count(), 3);

        let value = table.get_value(2, 1).unwrap();
        assert!(matches!(value, DataValue::Null));
    }

    #[test]
    fn test_jsonl_from_reader() {
        let jsonl_data = "{\"id\":1,\"name\":\"Alice\"}\n{\"id\":2,\"name\":\"Bob\"}\n";
        let reader = Cursor::new(jsonl_data);

        let table = load_json_from_reader(reader, "test", "stream", "memory")
            .expect("Failed to load JSONL");

        assert_eq!(table.column_count(), 2);
        assert_eq!(table.row_count(), 2);
    }

    #[test]
    fn test_jsonl_heterogeneous_schema_unioned() {
        let jsonl_data = "{\"id\":1}\n{\"id\":2,\"extra\":\"hi\"}\n";
        let reader = Cursor::new(jsonl_data);
        let table = load_json_from_reader(reader, "test", "stream", "memory").expect("load");
        assert_eq!(table.column_count(), 2);
        assert_eq!(table.row_count(), 2);
    }

    #[test]
    fn test_jsonl_missing_key_is_null() {
        // Single column, so the check does not depend on map key ordering.
        let jsonl_data = "{\"a\":1}\n{}\n";
        let table =
            load_json_from_reader(Cursor::new(jsonl_data), "test", "stream", "memory").expect("load");
        assert_eq!(table.row_count(), 2);
        let value = table.get_value(1, 0).unwrap();
        assert!(matches!(value, DataValue::Null));
    }

    #[test]
    fn test_jsonl_skips_blank_lines() {
        let jsonl_data = "{\"id\":1}\n\n\n{\"id\":2}\n";
        let reader = Cursor::new(jsonl_data);
        let table = load_json_from_reader(reader, "test", "stream", "memory").expect("load");
        assert_eq!(table.row_count(), 2);
    }

    #[test]
    fn test_parse_json_records_array_form() {
        let recs = parse_json_records(r#"[{"a":1},{"a":2}]"#).unwrap();
        assert_eq!(recs.len(), 2);
    }

    #[test]
    fn test_parse_json_records_jsonl_form() {
        let recs = parse_json_records("{\"a\":1}\n{\"a\":2}\n").unwrap();
        assert_eq!(recs.len(), 2);
    }

    #[test]
    fn test_parse_json_records_jsonl_error_cites_line() {
        let err = parse_json_records("{\"a\":1}\nnot json\n").unwrap_err();
        assert!(err.to_string().contains("line 2"));
    }
}