use anyhow::{Context, Result};
use csv::ReaderBuilder;
use serde_json::Value as JsonValue;
use std::collections::{HashMap, HashSet};
use std::io::{BufRead, BufReader, Read};
use tracing::{debug, info};

use crate::data::advanced_csv_loader::StringInterner;
use crate::data::datatable::{DataColumn, DataRow, DataTable, DataType, DataValue};

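/// Per-column statistics gathered from a sample of rows; `is_categorical`
/// drives the decision to intern a column's strings.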
#[derive(Debug)]
struct ColumnAnalysis {
    index: usize,
    name: String,
    cardinality: usize,
    sample_size: usize,
    unique_ratio: f64,
    is_categorical: bool,
    avg_string_length: usize,
}

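/// CSV loader that buffers its input, samples each column's cardinality, and
/// interns the values of low-cardinality ("categorical") string columns to
/// reduce memory usage.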
pub struct StreamCsvLoader {
    sample_size: usize,
    cardinality_threshold: f64,
    interners: HashMap<usize, StringInterner>,
}

impl StreamCsvLoader {
    pub fn new() -> Self {
        Self {
            sample_size: 1000,
            cardinality_threshold: 0.3,
            interners: HashMap::new(),
        }
    }

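    /// Samples up to `sample_size` rows per column and computes cardinality,
    /// uniqueness ratio, and average string length. A column is flagged as
    /// categorical when its uniqueness ratio falls below
    /// `cardinality_threshold`, or when its values are short (average under
    /// 20 bytes) with fewer unique values than half the sample.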
    fn analyze_columns(
        &self,
        rows: &[Vec<String>],
        headers: &csv::StringRecord,
    ) -> Vec<ColumnAnalysis> {
        let mut analyses = Vec::new();

        for (col_idx, header) in headers.iter().enumerate() {
            let mut unique_values = HashSet::new();
            let mut total_length = 0;
            let mut non_empty_count = 0;

            for row in rows.iter().take(self.sample_size) {
                if let Some(value) = row.get(col_idx) {
                    if !value.is_empty() {
                        unique_values.insert(value.clone());
                        total_length += value.len();
                        non_empty_count += 1;
                    }
                }
            }

            let cardinality = unique_values.len();
            let sample_size = rows.len().min(self.sample_size);
            let unique_ratio = if sample_size > 0 {
                cardinality as f64 / sample_size as f64
            } else {
                1.0
            };

            let avg_string_length = if non_empty_count > 0 {
                total_length / non_empty_count
            } else {
                0
            };

            // Categorical when few distinct values appear relative to the
            // sample, or when values are short and repetitive.
            let is_categorical = unique_ratio < self.cardinality_threshold
                || (avg_string_length < 20 && cardinality < sample_size / 2);

            analyses.push(ColumnAnalysis {
                index: col_idx,
                name: header.to_string(),
                cardinality,
                sample_size,
                unique_ratio,
                is_categorical,
                avg_string_length,
            });
        }

        analyses
    }

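    /// Loads a CSV from any `Read` source via several passes over an
    /// in-memory buffer: parse records, analyze column cardinality, capture
    /// raw lines to distinguish nulls from empty strings, infer types from a
    /// sample, then materialize rows (interning categorical string values).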
    pub fn load_csv_from_reader<R: Read>(
        &mut self,
        mut reader: R,
        table_name: &str,
        source_type: &str,
        source_path: &str,
    ) -> Result<DataTable> {
        info!("Stream CSV load: Loading {} with optimizations", source_path);

        // Buffer the whole input so it can be scanned more than once.
        let mut buffer = Vec::new();
        reader.read_to_end(&mut buffer)?;

        let mut csv_reader = ReaderBuilder::new()
            .has_headers(true)
            .from_reader(&buffer[..]);

        let headers = csv_reader.headers()?.clone();
        let mut table = DataTable::new(table_name);

        table
            .metadata
            .insert("source_type".to_string(), source_type.to_string());
        table
            .metadata
            .insert("source_path".to_string(), source_path.to_string());

        for header in &headers {
            table.add_column(DataColumn::new(header));
        }

        // First pass: parse every record into owned strings for analysis.
        let mut string_rows = Vec::new();
        for result in csv_reader.records() {
            let record = result?;
            let row: Vec<String> = record.iter().map(|s| s.to_string()).collect();
            string_rows.push(row);
        }

        let analyses = self.analyze_columns(&string_rows, &headers);
        let categorical_columns: HashSet<usize> = analyses
            .iter()
            .filter(|a| a.is_categorical)
            .map(|a| a.index)
            .collect();

        info!(
            "Column analysis: {} of {} columns will use string interning",
            categorical_columns.len(),
            analyses.len()
        );

        for col_idx in &categorical_columns {
            self.interners.insert(*col_idx, StringInterner::new());
        }

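        // Re-read the raw bytes line by line so that unquoted empty fields
        // can later be told apart from quoted empty strings. This assumes one
        // CSV record per physical line (no embedded newlines in quoted fields).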
        let mut line_reader = BufReader::new(&buffer[..]);
        let mut raw_lines = Vec::new();
        let mut raw_line = String::new();

        // Skip the header line.
        line_reader.read_line(&mut raw_line)?;
        raw_line.clear();

        for _ in 0..string_rows.len() {
            line_reader.read_line(&mut raw_line)?;
            raw_lines.push(raw_line.clone());
            raw_line.clear();
        }

        // Infer column types from a sample of up to 100 rows.
        let mut column_types = vec![DataType::Null; headers.len()];
        let sample_size = string_rows.len().min(100);

        for row in string_rows.iter().take(sample_size) {
            for (col_idx, value) in row.iter().enumerate() {
                if !value.is_empty() {
                    let inferred = DataType::infer_from_string(value);
                    column_types[col_idx] = column_types[col_idx].merge(&inferred);
                }
            }
        }

        for (col_idx, column) in table.columns.iter_mut().enumerate() {
            column.data_type = column_types[col_idx].clone();
        }

        for (row_idx, string_row) in string_rows.iter().enumerate() {
            let mut values = Vec::new();
            let raw_line = &raw_lines[row_idx];

            for (col_idx, value) in string_row.iter().enumerate() {
                let data_value = if value.is_empty() {
                    // An unquoted empty field (e.g. `1,,3`) becomes null; a
                    // quoted empty field (`1,"",3`) stays an empty string.
                    if is_null_field(raw_line, col_idx) {
                        DataValue::Null
                    } else if categorical_columns.contains(&col_idx) {
                        if let Some(interner) = self.interners.get_mut(&col_idx) {
                            DataValue::InternedString(interner.intern(""))
                        } else {
                            DataValue::String(String::new())
                        }
                    } else {
                        DataValue::String(String::new())
                    }
                } else if categorical_columns.contains(&col_idx)
                    && column_types[col_idx] == DataType::String
                {
                    if let Some(interner) = self.interners.get_mut(&col_idx) {
                        DataValue::InternedString(interner.intern(value))
                    } else {
                        DataValue::from_string(value, &column_types[col_idx])
                    }
                } else {
                    DataValue::from_string(value, &column_types[col_idx])
                };
                values.push(data_value);
            }
            table
                .add_row(DataRow::new(values))
                .map_err(|e| anyhow::anyhow!(e))?;
        }

        for (col_idx, interner) in &self.interners {
            let stats = interner.stats();
            if stats.memory_saved_bytes > 0 {
                debug!(
                    "Column {} interning: {} unique strings, {} references, {} bytes saved",
                    headers.get(*col_idx).unwrap_or(""),
                    stats.unique_strings,
                    stats.total_references,
                    stats.memory_saved_bytes
                );
            }
        }

        table.infer_column_types();

        Ok(table)
    }
}

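// A `Default` impl is the conventional companion to `new()` (it is also what
// Clippy's `new_without_default` lint asks for); it simply forwards to it.
impl Default for StreamCsvLoader {
    fn default() -> Self {
        Self::new()
    }
}

/// Convenience wrapper that runs a one-off [`StreamCsvLoader`] over `reader`.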
pub fn load_csv_from_reader<R: Read>(
    reader: R,
    table_name: &str,
    source_type: &str,
    source_path: &str,
) -> Result<DataTable> {
    let mut loader = StreamCsvLoader::new();
    loader.load_csv_from_reader(reader, table_name, source_type, source_path)
}

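/// Loads a JSON array of objects into a [`DataTable`], deriving columns from
/// the first object's keys and inferring column types from a sample of rows.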
pub fn load_json_from_reader<R: Read>(
    mut reader: R,
    table_name: &str,
    source_type: &str,
    source_path: &str,
) -> Result<DataTable> {
    let mut json_str = String::new();
    reader.read_to_string(&mut json_str)?;

    let json_data: Vec<JsonValue> =
        serde_json::from_str(&json_str).with_context(|| "Failed to parse JSON data")?;

    if json_data.is_empty() {
        return Ok(DataTable::new(table_name));
    }

    let first_obj = json_data[0]
        .as_object()
        .context("JSON data must be an array of objects")?;

    let mut table = DataTable::new(table_name);

    table
        .metadata
        .insert("source_type".to_string(), source_type.to_string());
    table
        .metadata
        .insert("source_path".to_string(), source_path.to_string());

    // Derive the column set from the first object's keys; keys missing from
    // later objects become empty strings (and ultimately nulls).
    let column_names: Vec<String> = first_obj.keys().cloned().collect();
    for name in &column_names {
        table.add_column(DataColumn::new(name));
    }

    let mut string_rows = Vec::new();
    for json_obj in &json_data {
        if let Some(obj) = json_obj.as_object() {
            let mut row = Vec::new();
            for col_name in &column_names {
                let value = obj
                    .get(col_name)
                    .map(json_value_to_string)
                    .unwrap_or_default();
                row.push(value);
            }
            string_rows.push(row);
        }
    }

    // Infer column types from a sample of up to 100 rows, skipping empty and
    // literal "null" values.
    let mut column_types = vec![DataType::Null; column_names.len()];
    let sample_size = string_rows.len().min(100);

    for row in string_rows.iter().take(sample_size) {
        for (col_idx, value) in row.iter().enumerate() {
            if !value.is_empty() && value != "null" {
                let inferred = DataType::infer_from_string(value);
                column_types[col_idx] = column_types[col_idx].merge(&inferred);
            }
        }
    }

    for (col_idx, column) in table.columns.iter_mut().enumerate() {
        column.data_type = column_types[col_idx].clone();
    }

    for string_row in &string_rows {
        let mut values = Vec::new();
        for (col_idx, value) in string_row.iter().enumerate() {
            // Empty strings (JSON null or missing key) and the literal
            // "null" both map to DataValue::Null.
            let data_value = if value.is_empty() || value == "null" {
                DataValue::Null
            } else {
                DataValue::from_string(value, &column_types[col_idx])
            };
            values.push(data_value);
        }
        table
            .add_row(DataRow::new(values))
            .map_err(|e| anyhow::anyhow!(e))?;
    }

    table.infer_column_types();

    Ok(table)
}

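/// Renders a JSON scalar as a plain string; arrays and objects fall back to
/// their `Debug` representation.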
fn json_value_to_string(value: &JsonValue) -> String {
    match value {
        JsonValue::Null => String::new(),
        JsonValue::Bool(b) => b.to_string(),
        JsonValue::Number(n) => n.to_string(),
        JsonValue::String(s) => s.clone(),
        JsonValue::Array(arr) => format!("{:?}", arr),
        JsonValue::Object(obj) => format!("{:?}", obj),
    }
}

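/// Scans a raw CSV line to decide whether the field at `field_index` is a
/// true null (`1,,3`) rather than a quoted empty string (`1,"",3`).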
fn is_null_field(raw_line: &str, field_index: usize) -> bool {
    let mut comma_count = 0;
    let mut in_quotes = false;
    let mut field_start = 0;
    let mut prev_char = ' ';

    for (i, ch) in raw_line.char_indices() {
        if ch == '"' && prev_char != '\\' {
            in_quotes = !in_quotes;
        } else if ch == ',' && !in_quotes {
            if comma_count == field_index {
                // A zero-width field (the separator follows immediately after
                // the field start) is a true null.
                return i == field_start;
            }
            comma_count += 1;
            field_start = i + 1;
        }
        prev_char = ch;
    }

    // The requested field is the last one on the line.
    if comma_count == field_index {
        return raw_line[field_start..].trim_end().is_empty();
    }

    false
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    #[test]
    fn test_csv_from_reader() {
        let csv_data = "id,name,value\n1,Alice,100\n2,Bob,200\n3,,300";
        let reader = Cursor::new(csv_data);

        let table =
            load_csv_from_reader(reader, "test", "stream", "memory").expect("Failed to load CSV");

        assert_eq!(table.name, "test");
        assert_eq!(table.column_count(), 3);
        assert_eq!(table.row_count(), 3);

        // The empty `name` field in the last row is unquoted, so it loads as null.
        let value = table.get_value(2, 1).unwrap();
        assert!(matches!(value, DataValue::Null));
    }

    #[test]
    fn test_json_from_reader() {
        let json_data = r#"[
            {"id": 1, "name": "Alice", "value": 100},
            {"id": 2, "name": "Bob", "value": 200},
            {"id": 3, "name": null, "value": 300}
        ]"#;
        let reader = Cursor::new(json_data);

        let table =
            load_json_from_reader(reader, "test", "stream", "memory").expect("Failed to load JSON");

        assert_eq!(table.name, "test");
        assert_eq!(table.column_count(), 3);
        assert_eq!(table.row_count(), 3);

        // The JSON null for `name` in the last row loads as DataValue::Null.
        let value = table.get_value(2, 1).unwrap();
        assert!(matches!(value, DataValue::Null));
    }
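
    // A minimal sketch of the interning path, assuming the default 0.3
    // cardinality threshold: a column with only two distinct short values
    // across ten rows should be classified as categorical. Since the exact
    // stored variant depends on `StringInterner` and later type inference,
    // the assertion accepts either an interned or a plain string.
    #[test]
    fn test_categorical_column_interning() {
        let mut csv_data = String::from("id,color\n");
        for i in 0..10 {
            let color = if i % 2 == 0 { "red" } else { "blue" };
            csv_data.push_str(&format!("{},{}\n", i, color));
        }
        let reader = Cursor::new(csv_data);

        let table = load_csv_from_reader(reader, "test", "stream", "memory")
            .expect("Failed to load CSV");

        assert_eq!(table.row_count(), 10);
        let value = table.get_value(0, 1).unwrap();
        assert!(matches!(
            value,
            DataValue::InternedString(_) | DataValue::String(_)
        ));
    }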
}