grafeo_core/execution/operators/
load_csv.rs1use std::collections::BTreeMap;
4use std::fs::File;
5use std::io::{BufRead, BufReader};
6use std::sync::Arc;
7
8use super::{Operator, OperatorError, OperatorResult};
9use crate::execution::chunk::DataChunkBuilder;
10use grafeo_common::types::{ArcStr, LogicalType, PropertyKey, Value};
11
/// Streams records from a CSV file, emitting one record per call to `next`.
pub struct LoadCsvOperator {
    /// Source location as supplied by the query; may carry a `file://` or `file:///` prefix.
    path: String,
    /// Field delimiter byte; the parser compares it as a Latin-1 `char`.
    delimiter: u8,
    /// Whether the first line of the file holds column names.
    with_headers: bool,
    /// Column names parsed from the first line when `with_headers` is set.
    headers: Option<Vec<String>>,
    /// Buffered reader over the opened file; `None` before `open` runs and after `reset`.
    reader: Option<BufReader<File>>,
    /// Whether `open` has run since construction or the last `reset`.
    opened: bool,
}
30
31impl LoadCsvOperator {
32 pub fn new(
34 path: String,
35 with_headers: bool,
36 field_terminator: Option<char>,
37 _variable: String,
38 ) -> Self {
39 let delimiter = field_terminator.map_or(b',', |c| {
40 let mut buf = [0u8; 4];
41 c.encode_utf8(&mut buf);
42 buf[0]
43 });
44
45 Self {
46 reader: None,
47 headers: None,
48 with_headers,
49 path,
50 delimiter,
51 opened: false,
52 }
53 }
54
55 fn open(&mut self) -> Result<(), OperatorError> {
57 let file_path = self
59 .path
60 .strip_prefix("file:///")
61 .or_else(|| self.path.strip_prefix("file://"))
62 .unwrap_or(&self.path);
63
64 let file = File::open(file_path).map_err(|e| {
65 OperatorError::Execution(format!("Failed to open CSV file '{}': {}", self.path, e))
66 })?;
67 let mut reader = BufReader::new(file);
68
69 if self.with_headers {
70 let mut header_line = String::new();
71 reader.read_line(&mut header_line).map_err(|e| {
72 OperatorError::Execution(format!("Failed to read CSV headers: {e}"))
73 })?;
74 let header_line = header_line.strip_prefix('\u{feff}').unwrap_or(&header_line);
76 let header_line = header_line.trim_end_matches(['\r', '\n']);
77 self.headers = Some(parse_csv_row(header_line, self.delimiter));
78 }
79
80 self.reader = Some(reader);
81 self.opened = true;
82 Ok(())
83 }
84}
85
86impl Operator for LoadCsvOperator {
87 fn next(&mut self) -> OperatorResult {
88 if !self.opened {
89 self.open()?;
90 }
91
92 let reader = self
93 .reader
94 .as_mut()
95 .ok_or_else(|| OperatorError::Execution("CSV reader not initialized".to_string()))?;
96
97 let mut line = String::new();
98 loop {
99 line.clear();
100 let bytes_read = reader
101 .read_line(&mut line)
102 .map_err(|e| OperatorError::Execution(format!("Failed to read CSV line: {e}")))?;
103
104 if bytes_read == 0 {
105 return Ok(None); }
107
108 let trimmed = line.trim_end_matches(['\r', '\n']);
109 if trimmed.is_empty() {
110 continue; }
112
113 let fields = parse_csv_row(trimmed, self.delimiter);
114
115 let row_value = if let Some(headers) = &self.headers {
116 let mut map = BTreeMap::new();
118 for (i, header) in headers.iter().enumerate() {
119 let value = fields.get(i).map_or(Value::Null, |s| {
120 if s.is_empty() {
121 Value::Null
122 } else {
123 Value::String(ArcStr::from(s.as_str()))
124 }
125 });
126 map.insert(PropertyKey::from(header.as_str()), value);
127 }
128 Value::Map(Arc::new(map))
129 } else {
130 let values: Vec<Value> = fields
132 .into_iter()
133 .map(|s| {
134 if s.is_empty() {
135 Value::Null
136 } else {
137 Value::String(ArcStr::from(s.as_str()))
138 }
139 })
140 .collect();
141 Value::List(Arc::from(values))
142 };
143
144 let mut builder = DataChunkBuilder::new(&[LogicalType::Any]);
146 if let Some(col) = builder.column_mut(0) {
147 col.push_value(row_value);
148 }
149 builder.advance_row();
150 return Ok(Some(builder.finish()));
151 }
152 }
153
154 fn reset(&mut self) {
155 self.reader = None;
156 self.headers = None;
157 self.opened = false;
158 }
159
160 fn name(&self) -> &'static str {
161 "LoadCsv"
162 }
163}
164
/// Splits a single CSV record (without its line terminator) into field strings.
///
/// Quoting follows RFC 4180:
/// - A field whose first character is `"` is quoted.
/// - Inside a quoted field, `""` is an escaped quote and the delimiter is literal.
/// - An unterminated quote runs to the end of the line.
///
/// Lenient handling of malformed or edge-case input:
/// - Text between a closing quote and the next delimiter is appended to the same field
///   (`"ab"cd` yields `abcd`) instead of starting a new field.
/// - A quote anywhere else in a field is kept verbatim.
///
/// Every delimiter separates two fields. A trailing delimiter therefore yields a trailing
/// empty field, and the empty string yields one empty field. No whitespace is trimmed.
///
/// `delimiter` is interpreted as a Latin-1 code point (`char::from(u8)`).
fn parse_csv_row(line: &str, delimiter: u8) -> Vec<String> {
    let delim = char::from(delimiter);
    let mut fields = Vec::new();
    let mut field = String::new();
    let mut chars = line.chars().peekable();

    loop {
        // A field is quoted only when its very first character is a quote.
        if chars.peek() == Some(&'"') {
            chars.next();
            while let Some(c) = chars.next() {
                if c == '"' {
                    if chars.peek() == Some(&'"') {
                        // `""` inside quotes is an escaped literal quote.
                        chars.next();
                        field.push('"');
                    } else {
                        break;
                    }
                } else {
                    field.push(c);
                }
            }
        }

        // Unquoted content, or stray text after a closing quote, runs to the delimiter.
        let mut hit_delimiter = false;
        for c in chars.by_ref() {
            if c == delim {
                hit_delimiter = true;
                break;
            }
            field.push(c);
        }
        fields.push(std::mem::take(&mut field));

        // Only a consumed delimiter implies another (possibly empty) field follows.
        if !hit_delimiter {
            break;
        }
    }

    fields
}
229
#[cfg(test)]
mod tests {
    // Unit tests for the single-record CSV splitter.
    use super::*;

    #[test]
    fn test_parse_csv_simple() {
        assert_eq!(parse_csv_row("a,b,c", b','), ["a", "b", "c"]);
    }

    #[test]
    fn test_parse_csv_quoted() {
        assert_eq!(parse_csv_row(r#""hello","world""#, b','), ["hello", "world"]);
    }

    #[test]
    fn test_parse_csv_escaped_quotes() {
        // A doubled quote inside a quoted field collapses to one literal quote.
        let expected = [r#"say "hi""#, "ok"];
        assert_eq!(parse_csv_row(r#""say ""hi""","ok""#, b','), expected);
    }

    #[test]
    fn test_parse_csv_delimiter_in_quoted() {
        assert_eq!(parse_csv_row(r#""a,b",c"#, b','), ["a,b", "c"]);
    }

    #[test]
    fn test_parse_csv_empty_fields() {
        assert_eq!(parse_csv_row("a,,c", b','), ["a", "", "c"]);
    }

    #[test]
    fn test_parse_csv_tab_delimiter() {
        assert_eq!(parse_csv_row("a\tb\tc", b'\t'), ["a", "b", "c"]);
    }

    #[test]
    fn test_parse_csv_single_field() {
        assert_eq!(parse_csv_row("hello", b','), ["hello"]);
    }
}