Skip to main content

sql_cli/data/
datatable_loaders.rs

1use crate::data::stream_loader::{detect_delimiter_from_path, CsvReadOptions};
2use crate::datatable::{DataColumn, DataRow, DataTable, DataType, DataValue};
3use anyhow::{Context, Result};
4use csv::ReaderBuilder;
5use serde_json::Value as JsonValue;
6use std::collections::HashSet;
7use std::fs::File;
8use std::io::{BufRead, BufReader, Read};
9use std::path::Path;
10
/// Reports whether field `field_index` (0-based) of a raw CSV line is an
/// unquoted empty field, which the loaders treat as NULL.
///
/// * Unquoted empty field (`a,,b`) -> `true`.
/// * Quoted empty field (`a,"",b`) -> `false` (an empty string, not NULL).
/// * Any non-empty field -> `false`.
/// * `field_index` past the last field -> `false`.
///
/// The line is split using the quoting rules the `csv` crate applies by
/// default, so the answer lines up with the parsed record: a quote opens a
/// quoted field only when it is the first character of the field, `""` inside
/// a quoted field is an escaped quote, and backslash has no special meaning.
///
/// `delimiter` is the field separator character used in the source.
fn is_null_field(raw_line: &str, field_index: usize, delimiter: char) -> bool {
    /// Where the scanner currently is relative to the field's quoting.
    #[derive(Clone, Copy, PartialEq)]
    enum Scan {
        /// No character of the current field has been consumed yet.
        FieldStart,
        /// Inside an unquoted field; quotes are literal characters here.
        Unquoted,
        /// Inside a quoted field; delimiters are literal characters here.
        Quoted,
        /// Just saw a quote inside a quoted field: it either closes the field
        /// or is the first half of an escaped `""`.
        QuoteSeen,
    }

    let mut state = Scan::FieldStart;
    let mut current_field = 0;
    let mut field_start = 0;
    // Defaults to end-of-line so the last field needs no special casing.
    let mut field_end = raw_line.len();

    for (i, ch) in raw_line.char_indices() {
        if ch == delimiter && state != Scan::Quoted {
            if current_field == field_index {
                field_end = i;
                break;
            }
            current_field += 1;
            field_start = i + ch.len_utf8();
            state = Scan::FieldStart;
            continue;
        }

        state = match (state, ch) {
            (Scan::FieldStart, '"') | (Scan::QuoteSeen, '"') => Scan::Quoted,
            (Scan::Quoted, '"') => Scan::QuoteSeen,
            (Scan::Quoted, _) => Scan::Quoted,
            _ => Scan::Unquoted,
        };
    }

    // `current_field` falls short of `field_index` only when the line has too
    // few fields. Otherwise the field is NULL exactly when nothing but
    // whitespace / line terminators sits between its delimiters; a quoted
    // empty field still contains its two quote characters.
    current_field == field_index && raw_line[field_start..field_end].trim().is_empty()
}
69
70/// Load a CSV file into a `DataTable`. Delimiter is auto-detected from the
71/// file extension (`.tsv` → tab, `.psv` → pipe, else comma). To override the
72/// auto-detect, use [`load_csv_to_datatable_with_opts`].
73pub fn load_csv_to_datatable<P: AsRef<Path>>(path: P, table_name: &str) -> Result<DataTable> {
74    let path_ref = path.as_ref();
75    let opts = CsvReadOptions {
76        delimiter: detect_delimiter_from_path(&path_ref.display().to_string()),
77        has_headers: true,
78    };
79    load_csv_to_datatable_with_opts(path, table_name, &opts)
80}
81
82/// Load a CSV file into a `DataTable` honouring caller-supplied options
83/// (delimiter, headers).
84pub fn load_csv_to_datatable_with_opts<P: AsRef<Path>>(
85    path: P,
86    table_name: &str,
87    opts: &CsvReadOptions,
88) -> Result<DataTable> {
89    let file = File::open(&path)
90        .with_context(|| format!("Failed to open CSV file: {:?}", path.as_ref()))?;
91
92    let mut reader = ReaderBuilder::new()
93        .has_headers(opts.has_headers)
94        .delimiter(opts.delimiter)
95        .from_reader(file);
96
97    // Get headers and create columns
98    let headers = reader.headers()?.clone();
99    let mut table = DataTable::new(table_name);
100
101    // Add metadata about the source
102    table
103        .metadata
104        .insert("source_type".to_string(), "csv".to_string());
105    table.metadata.insert(
106        "source_path".to_string(),
107        path.as_ref().display().to_string(),
108    );
109    table.metadata.insert(
110        "delimiter".to_string(),
111        match opts.delimiter {
112            b'\t' => "\\t".to_string(),
113            b'\n' => "\\n".to_string(),
114            b'\r' => "\\r".to_string(),
115            b => (b as char).to_string(),
116        },
117    );
118
119    // Create columns from headers (types will be inferred later)
120    for header in &headers {
121        table.add_column(DataColumn::new(header));
122    }
123
124    // Open a second file handle for raw line reading
125    let file2 = File::open(&path).with_context(|| {
126        format!(
127            "Failed to open CSV file for raw reading: {:?}",
128            path.as_ref()
129        )
130    })?;
131    let mut line_reader = BufReader::new(file2);
132    let mut raw_line = String::new();
133    // Skip header line
134    line_reader.read_line(&mut raw_line)?;
135
136    // Read all rows first to collect data
137    let mut string_rows = Vec::new();
138    let mut raw_lines = Vec::new();
139
140    for result in reader.records() {
141        let record = result?;
142        let row: Vec<String> = record
143            .iter()
144            .map(std::string::ToString::to_string)
145            .collect();
146
147        // Read the corresponding raw line
148        raw_line.clear();
149        line_reader.read_line(&mut raw_line)?;
150        raw_lines.push(raw_line.clone());
151
152        string_rows.push(row);
153    }
154
155    // Infer column types by sampling the data
156    let mut column_types = vec![DataType::Null; headers.len()];
157    let sample_size = string_rows.len().min(100); // Sample first 100 rows for type inference
158
159    for row in string_rows.iter().take(sample_size) {
160        for (col_idx, value) in row.iter().enumerate() {
161            if !value.is_empty() {
162                let inferred = DataType::infer_from_string(value);
163                column_types[col_idx] = column_types[col_idx].merge(&inferred);
164            }
165        }
166    }
167
168    // Update column types
169    for (col_idx, column) in table.columns.iter_mut().enumerate() {
170        column.data_type = column_types[col_idx].clone();
171    }
172
173    // Convert string data to typed DataValues and add rows
174    for (row_idx, string_row) in string_rows.iter().enumerate() {
175        let mut values = Vec::new();
176        let raw_line = &raw_lines[row_idx];
177
178        for (col_idx, value) in string_row.iter().enumerate() {
179            let data_value = if value.is_empty() {
180                // Distinguish between NULL (,,) and empty string ("")
181                if is_null_field(raw_line, col_idx, opts.delimiter as char) {
182                    DataValue::Null
183                } else {
184                    DataValue::String(String::new())
185                }
186            } else {
187                DataValue::from_string(value, &column_types[col_idx])
188            };
189            values.push(data_value);
190        }
191        table
192            .add_row(DataRow::new(values))
193            .map_err(|e| anyhow::anyhow!(e))?;
194    }
195
196    // Update column statistics
197    table.infer_column_types();
198
199    Ok(table)
200}
201
202/// Load a JSON file into a `DataTable`.
203///
204/// Accepts either a JSON array of objects (`[{...}, {...}]`) or JSONL
205/// (one JSON object per line). Format is auto-detected.
206pub fn load_json_to_datatable<P: AsRef<Path>>(path: P, table_name: &str) -> Result<DataTable> {
207    // Read file as string first to preserve key order
208    let mut file = File::open(&path)
209        .with_context(|| format!("Failed to open JSON file: {:?}", path.as_ref()))?;
210    let mut json_str = String::new();
211    file.read_to_string(&mut json_str)?;
212
213    let json_data: Vec<JsonValue> = crate::data::stream_loader::parse_json_records(&json_str)?;
214
215    if json_data.is_empty() {
216        return Ok(DataTable::new(table_name));
217    }
218
219    // Schema is the union of keys across the first 100 records so heterogeneous
220    // JSONL streams don't silently drop columns missing on the first object.
221    let column_names = crate::data::stream_loader::collect_column_names(&json_data, 100);
222    if column_names.is_empty() {
223        return Err(anyhow::anyhow!(
224            "JSON data must contain objects (got non-object records)"
225        ));
226    }
227
228    let mut table = DataTable::new(table_name);
229
230    // Add metadata
231    table
232        .metadata
233        .insert("source_type".to_string(), "json".to_string());
234    table.metadata.insert(
235        "source_path".to_string(),
236        path.as_ref().display().to_string(),
237    );
238
239    for name in &column_names {
240        table.add_column(DataColumn::new(name));
241    }
242
243    // Collect all values as strings first for type inference
244    let mut string_rows = Vec::new();
245    for json_obj in &json_data {
246        if let Some(obj) = json_obj.as_object() {
247            let mut row = Vec::new();
248            for name in &column_names {
249                let value_str = match obj.get(name) {
250                    Some(JsonValue::Null) | None => String::new(),
251                    Some(JsonValue::Bool(b)) => b.to_string(),
252                    Some(JsonValue::Number(n)) => n.to_string(),
253                    Some(JsonValue::String(s)) => s.clone(),
254                    Some(JsonValue::Array(arr)) => format!("{arr:?}"), // Arrays as debug string for now
255                    Some(JsonValue::Object(obj)) => format!("{obj:?}"), // Objects as debug string for now
256                };
257                row.push(value_str);
258            }
259            string_rows.push(row);
260        }
261    }
262
263    // Infer column types
264    let mut column_types = vec![DataType::Null; column_names.len()];
265    let sample_size = string_rows.len().min(100);
266
267    for row in string_rows.iter().take(sample_size) {
268        for (col_idx, value) in row.iter().enumerate() {
269            if !value.is_empty() {
270                let inferred = DataType::infer_from_string(value);
271                column_types[col_idx] = column_types[col_idx].merge(&inferred);
272            }
273        }
274    }
275
276    // Update column types
277    for (col_idx, column) in table.columns.iter_mut().enumerate() {
278        column.data_type = column_types[col_idx].clone();
279    }
280
281    // Convert to DataRows
282    for string_row in string_rows {
283        let mut values = Vec::new();
284        for (col_idx, value) in string_row.iter().enumerate() {
285            let data_value = DataValue::from_string(value, &column_types[col_idx]);
286            values.push(data_value);
287        }
288        table
289            .add_row(DataRow::new(values))
290            .map_err(|e| anyhow::anyhow!(e))?;
291    }
292
293    // Update statistics
294    table.infer_column_types();
295
296    Ok(table)
297}
298
299/// Load JSON data directly (already parsed) into a `DataTable`
300pub fn load_json_data_to_datatable(data: Vec<JsonValue>, table_name: &str) -> Result<DataTable> {
301    if data.is_empty() {
302        return Ok(DataTable::new(table_name));
303    }
304
305    // Extract column names from all objects (union of all keys)
306    let mut all_columns = HashSet::new();
307    for item in &data {
308        if let Some(obj) = item.as_object() {
309            for key in obj.keys() {
310                all_columns.insert(key.clone());
311            }
312        }
313    }
314
315    let column_names: Vec<String> = all_columns.into_iter().collect();
316    let mut table = DataTable::new(table_name);
317
318    // Add metadata
319    table
320        .metadata
321        .insert("source_type".to_string(), "json_data".to_string());
322
323    // Create columns
324    for name in &column_names {
325        table.add_column(DataColumn::new(name));
326    }
327
328    // Process data similar to file loading
329    let mut string_rows = Vec::new();
330    for json_obj in &data {
331        if let Some(obj) = json_obj.as_object() {
332            let mut row = Vec::new();
333            for name in &column_names {
334                let value_str = match obj.get(name) {
335                    Some(JsonValue::Null) | None => String::new(),
336                    Some(JsonValue::Bool(b)) => b.to_string(),
337                    Some(JsonValue::Number(n)) => n.to_string(),
338                    Some(JsonValue::String(s)) => s.clone(),
339                    Some(JsonValue::Array(arr)) => format!("{arr:?}"),
340                    Some(JsonValue::Object(obj)) => format!("{obj:?}"),
341                };
342                row.push(value_str);
343            }
344            string_rows.push(row);
345        }
346    }
347
348    // Infer types and convert to DataRows (same as above)
349    let mut column_types = vec![DataType::Null; column_names.len()];
350    let sample_size = string_rows.len().min(100);
351
352    for row in string_rows.iter().take(sample_size) {
353        for (col_idx, value) in row.iter().enumerate() {
354            if !value.is_empty() {
355                let inferred = DataType::infer_from_string(value);
356                column_types[col_idx] = column_types[col_idx].merge(&inferred);
357            }
358        }
359    }
360
361    for (col_idx, column) in table.columns.iter_mut().enumerate() {
362        column.data_type = column_types[col_idx].clone();
363    }
364
365    for string_row in string_rows {
366        let mut values = Vec::new();
367        for (col_idx, value) in string_row.iter().enumerate() {
368            let data_value = DataValue::from_string(value, &column_types[col_idx]);
369            values.push(data_value);
370        }
371        table
372            .add_row(DataRow::new(values))
373            .map_err(|e| anyhow::anyhow!(e))?;
374    }
375
376    table.infer_column_types();
377
378    Ok(table)
379}
380
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    /// Creates a temp file holding each entry of `lines` followed by a
    /// newline, flushed and ready to be read back through its path.
    fn temp_file_with_lines(lines: &[&str]) -> Result<NamedTempFile> {
        let mut file = NamedTempFile::new()?;
        for line in lines {
            writeln!(file, "{line}")?;
        }
        file.flush()?;
        Ok(file)
    }

    /// Comma-separated input: shape, inferred column types and a cell value.
    #[test]
    fn test_load_csv() -> Result<()> {
        let temp_file = temp_file_with_lines(&[
            "id,name,price,quantity",
            "1,Widget,9.99,100",
            "2,Gadget,19.99,50",
            "3,Doohickey,5.00,200",
        ])?;

        let table = load_csv_to_datatable(temp_file.path(), "products")?;

        assert_eq!(table.name, "products");
        assert_eq!(table.column_count(), 4);
        assert_eq!(table.row_count(), 3);

        // Column names and the types inferred for them, in file order.
        let expected_columns = [
            ("id", DataType::Integer),
            ("name", DataType::String),
            ("price", DataType::Float),
            ("quantity", DataType::Integer),
        ];
        for (column, (name, data_type)) in table.columns.iter().zip(expected_columns) {
            assert_eq!(column.name, name);
            assert_eq!(column.data_type, data_type);
        }

        // Spot-check one cell.
        let first_name = table.get_value_by_name(0, "name").unwrap();
        assert_eq!(first_name.to_string(), "Widget");

        Ok(())
    }

    /// JSON array input: shape, plus a JSON `null` loading as a null value.
    #[test]
    fn test_load_json() -> Result<()> {
        let temp_file = temp_file_with_lines(&[r#"[
            {"id": 1, "name": "Alice", "active": true, "score": 95.5},
            {"id": 2, "name": "Bob", "active": false, "score": 87.3},
            {"id": 3, "name": "Charlie", "active": true, "score": null}
        ]"#])?;

        let table = load_json_to_datatable(temp_file.path(), "users")?;

        assert_eq!(table.name, "users");
        assert_eq!(table.column_count(), 4);
        assert_eq!(table.row_count(), 3);

        // The third record has `"score": null`.
        assert!(table.get_value_by_name(2, "score").unwrap().is_null());

        Ok(())
    }

    /// An explicit `|` delimiter is honoured and recorded in the metadata.
    #[test]
    fn test_load_csv_with_pipe_delimiter_via_opts() -> Result<()> {
        let temp_file =
            temp_file_with_lines(&["id|name|price", "1|Widget|9.99", "2|Gadget|19.99"])?;

        let opts = CsvReadOptions {
            delimiter: b'|',
            has_headers: true,
        };
        let table = load_csv_to_datatable_with_opts(temp_file.path(), "psv_products", &opts)?;

        assert_eq!(table.column_count(), 3);
        assert_eq!(table.row_count(), 2);
        assert_eq!(table.columns[0].name, "id");
        assert_eq!(table.columns[1].name, "name");
        assert_eq!(table.columns[0].data_type, DataType::Integer);

        let first_name = table.get_value_by_name(0, "name").unwrap().to_string();
        assert_eq!(first_name, "Widget");

        let recorded_delimiter = table.metadata.get("delimiter").map(String::as_str);
        assert_eq!(recorded_delimiter, Some("|"));
        Ok(())
    }

    /// A temp file without a recognised extension loads with `,` recorded as
    /// the delimiter.
    #[test]
    fn test_default_load_csv_records_comma_delimiter() -> Result<()> {
        let temp_file = temp_file_with_lines(&["a,b", "1,2"])?;

        let table = load_csv_to_datatable(temp_file.path(), "t")?;

        let recorded_delimiter = table.metadata.get("delimiter").map(String::as_str);
        assert_eq!(recorded_delimiter, Some(","));
        Ok(())
    }
}