sql_cli/data/
direct_csv_loader.rs1use crate::data::datatable::{DataColumn, DataRow, DataTable, DataValue};
3use anyhow::Result;
4use csv;
5use std::fs::File;
6use std::io::{BufRead, BufReader};
7use std::path::Path;
8use tracing::{debug, info};
9
/// Stateless entry point for loading CSV files directly into a `DataTable`.
///
/// The type carries no data; all functionality is exposed through associated
/// functions, so it is cheap to copy and can be created with `Default`.
#[derive(Debug, Clone, Copy, Default)]
pub struct DirectCsvLoader;
11
12impl DirectCsvLoader {
13 pub fn load_csv_direct<P: AsRef<Path>>(path: P, table_name: &str) -> Result<DataTable> {
15 let path = path.as_ref();
16 info!("Direct CSV load: Loading {} into DataTable", path.display());
17
18 crate::utils::memory_tracker::track_memory("direct_csv_start");
20
21 let file = File::open(path)?;
22 let mut reader = csv::Reader::from_reader(file);
23
24 let headers = reader.headers()?.clone(); let mut table = DataTable::new(table_name);
27
28 for header in &headers {
29 table.add_column(DataColumn::new(header.to_string()));
30 }
31
32 crate::utils::memory_tracker::track_memory("direct_csv_headers");
33
34 let file2 = File::open(path)?;
36 let mut line_reader = BufReader::new(file2);
37 let mut raw_line = String::new();
38 line_reader.read_line(&mut raw_line)?;
40
41 let mut row_count = 0;
43 for result in reader.records() {
44 let record = result?;
45
46 raw_line.clear();
48 line_reader.read_line(&mut raw_line)?;
49
50 let mut values = Vec::with_capacity(headers.len());
51
52 for (i, field) in record.iter().enumerate() {
53 let value = if field.is_empty() {
55 if Self::is_null_field(&raw_line, i) {
56 DataValue::Null
57 } else {
58 DataValue::String(String::new())
59 }
60 } else if let Ok(b) = field.parse::<bool>() {
61 DataValue::Boolean(b)
62 } else if let Ok(i) = field.parse::<i64>() {
63 DataValue::Integer(i)
64 } else if let Ok(f) = field.parse::<f64>() {
65 DataValue::Float(f)
66 } else {
67 if field.contains('-') && field.len() >= 8 && field.len() <= 30 {
69 DataValue::DateTime(field.to_string())
70 } else {
71 DataValue::String(field.to_string())
72 }
73 };
74 values.push(value);
75 }
76
77 table
78 .add_row(DataRow::new(values))
79 .map_err(|e| anyhow::anyhow!(e))?;
80 row_count += 1;
81
82 if row_count % 5000 == 0 {
84 crate::utils::memory_tracker::track_memory(&format!("direct_csv_{row_count}rows"));
85 }
86 }
87
88 table.infer_column_types();
90
91 crate::utils::memory_tracker::track_memory("direct_csv_complete");
92
93 info!(
94 "Direct CSV load complete: {} rows, {} columns, ~{} MB",
95 table.row_count(),
96 table.column_count(),
97 table.estimate_memory_size() / 1024 / 1024
98 );
99
100 Ok(table)
101 }
102
/// Reports whether field number `field_index` (0-based) of a raw CSV record was
/// written as an unquoted empty field, i.e. should be treated as NULL.
///
/// `raw_line` is the record exactly as it appears in the file, including any
/// trailing line terminator. Fields are separated by commas that lie outside
/// double quotes. A field is NULL when nothing but whitespace sits between its
/// delimiters; a quoted empty string (`""`) is not NULL.
///
/// Returns `false` when the record has fewer than `field_index + 1` fields.
fn is_null_field(raw_line: &str, field_index: usize) -> bool {
    let mut in_quotes = false;
    let mut current_field = 0;
    let mut field_start = 0;

    // `char_indices` yields byte offsets, which is what string slicing needs;
    // counting characters instead would mis-slice (or panic) on non-ASCII rows.
    for (offset, ch) in raw_line.char_indices() {
        match ch {
            // Every quote toggles the state. A doubled quote (`""`) inside a
            // quoted field toggles twice and so leaves the state unchanged.
            // Backslash is deliberately not an escape: the default `csv::Reader`
            // used by the loader has no escape character.
            '"' => in_quotes = !in_quotes,
            ',' if !in_quotes => {
                if current_field == field_index {
                    return raw_line[field_start..offset].trim().is_empty();
                }
                current_field += 1;
                // ',' is a single byte, so the next field starts right after it.
                field_start = offset + 1;
            }
            _ => {}
        }
    }

    // The last field has no trailing comma; `trim` also drops the line terminator.
    current_field == field_index && raw_line[field_start..].trim().is_empty()
}
160
161 pub fn query_datatable(table: &DataTable, sql: &str) -> Result<DataTable> {
163 debug!("Direct query on DataTable: {}", sql);
166
167 if sql.trim().to_uppercase().starts_with("SELECT *") {
169 Ok(table.clone())
170 } else {
171 Ok(table.clone())
173 }
174 }
175}