sql_cli/data/
direct_csv_loader.rs1use crate::data::datatable::{DataColumn, DataRow, DataTable};
3use crate::data::value_parsing::parse_value;
4use anyhow::Result;
5use csv;
6use std::fs::File;
7use std::io::{BufRead, BufReader};
8use std::path::Path;
9use tracing::{debug, info};
10
/// Stateless entry point for loading CSV files straight into a [`DataTable`].
///
/// The type holds no data; its functionality is exposed as associated functions,
/// so it is never necessary to construct a value of it.
#[derive(Debug, Clone, Copy, Default)]
pub struct DirectCsvLoader;
12
13impl DirectCsvLoader {
14 pub fn load_csv_direct<P: AsRef<Path>>(path: P, table_name: &str) -> Result<DataTable> {
16 let path = path.as_ref();
17 info!("Direct CSV load: Loading {} into DataTable", path.display());
18
19 crate::utils::memory_tracker::track_memory("direct_csv_start");
21
22 let file = File::open(path)?;
23 let mut reader = csv::Reader::from_reader(file);
24
25 let headers = reader.headers()?.clone(); let mut table = DataTable::new(table_name);
28
29 for header in &headers {
30 table.add_column(DataColumn::new(header.to_string()));
31 }
32
33 crate::utils::memory_tracker::track_memory("direct_csv_headers");
34
35 let file2 = File::open(path)?;
37 let mut line_reader = BufReader::new(file2);
38 let mut raw_line = String::new();
39 line_reader.read_line(&mut raw_line)?;
41
42 let mut row_count = 0;
44 for result in reader.records() {
45 let record = result?;
46
47 raw_line.clear();
49 line_reader.read_line(&mut raw_line)?;
50
51 let mut values = Vec::with_capacity(headers.len());
52
53 for (i, field) in record.iter().enumerate() {
54 let is_null = field.is_empty() && Self::is_null_field(&raw_line, i);
56 let value = parse_value(field, is_null);
57 values.push(value);
58 }
59
60 table
61 .add_row(DataRow::new(values))
62 .map_err(|e| anyhow::anyhow!(e))?;
63 row_count += 1;
64
65 if row_count % 5000 == 0 {
67 crate::utils::memory_tracker::track_memory(&format!("direct_csv_{row_count}rows"));
68 }
69 }
70
71 table.infer_column_types();
73
74 crate::utils::memory_tracker::track_memory("direct_csv_complete");
75
76 info!(
77 "Direct CSV load complete: {} rows, {} columns, ~{} MB",
78 table.row_count(),
79 table.column_count(),
80 table.estimate_memory_size() / 1024 / 1024
81 );
82
83 Ok(table)
84 }
85
/// Reports whether field number `field_index` (zero-based) of `raw_line` is a bare,
/// unquoted empty field (`a,,b`) rather than a quoted empty string (`a,"",b`).
///
/// `raw_line` is one raw line of CSV text, with or without its line terminator. Fields
/// are split on commas that lie outside double quotes, and the selected field counts as
/// null when nothing but whitespace remains after trimming. Returns `false` when the
/// line has fewer than `field_index + 1` fields.
///
/// A `"` that directly follows a backslash does not toggle the quoted state.
/// NOTE(review): a `csv::Reader` built with default settings does not treat backslash
/// as an escape character; confirm this special case is intended.
fn is_null_field(raw_line: &str, field_index: usize) -> bool {
    let mut comma_count = 0;
    let mut in_quotes = false;
    let mut field_start = 0;
    let mut prev_char = ' ';

    // `char_indices` yields byte offsets, which is what slicing a `str` requires.
    // Counting characters instead would cut at the wrong place, or panic on a
    // non-boundary, as soon as the line contains a multi-byte character.
    for (offset, ch) in raw_line.char_indices() {
        if ch == '"' && prev_char != '\\' {
            in_quotes = !in_quotes;
        }

        if ch == ',' && !in_quotes {
            if comma_count == field_index {
                // A quoted empty string keeps its two quote characters here, so it is
                // non-empty and correctly reported as not null.
                return raw_line[field_start..offset].trim().is_empty();
            }
            comma_count += 1;
            // A comma is a single byte, so the next field starts right after it.
            field_start = offset + 1;
        }
        prev_char = ch;
    }

    // Last field on the line: it has no trailing comma, and `trim` also removes
    // any `\r` / `\n` terminator.
    comma_count == field_index && raw_line[field_start..].trim().is_empty()
}
143
144 pub fn query_datatable(table: &DataTable, sql: &str) -> Result<DataTable> {
146 debug!("Direct query on DataTable: {}", sql);
149
150 if sql.trim().to_uppercase().starts_with("SELECT *") {
152 Ok(table.clone())
153 } else {
154 Ok(table.clone())
156 }
157 }
158}