Skip to main content

grafeo_core/execution/operators/
load_csv.rs

1//! LOAD CSV operator for reading CSV files and producing rows.
2
3use std::collections::BTreeMap;
4use std::fs::File;
5use std::io::{BufRead, BufReader};
6use std::sync::Arc;
7
8use super::{Operator, OperatorError, OperatorResult};
9use crate::execution::chunk::DataChunkBuilder;
10use grafeo_common::types::{ArcStr, LogicalType, PropertyKey, Value};
11
/// Streaming source operator for `LOAD CSV`: yields one row per CSV record.
///
/// The shape of each produced row depends on the header setting:
/// - with headers, a `Value::Map` keyed by the column names from the first line;
/// - without headers, a `Value::List` holding the fields in file order.
pub struct LoadCsvOperator {
    /// Open handle on the CSV file; `None` until the file has been opened
    /// and again after `reset`.
    reader: Option<BufReader<File>>,
    /// Column names parsed from the first line; only populated when
    /// `with_headers` is set and the file has been opened.
    headers: Option<Vec<String>>,
    /// `true` when the first line of the file is a header row.
    with_headers: bool,
    /// Location of the CSV file as supplied by the caller; kept so the file
    /// can be reopened after `reset`.
    path: String,
    /// Single-byte field separator handed to the row parser.
    delimiter: u8,
    /// Set once `open` has succeeded; cleared by `reset`.
    opened: bool,
}
30
31impl LoadCsvOperator {
32    /// Creates a new LOAD CSV operator.
33    pub fn new(
34        path: String,
35        with_headers: bool,
36        field_terminator: Option<char>,
37        _variable: String,
38    ) -> Self {
39        let delimiter = field_terminator.map_or(b',', |c| {
40            let mut buf = [0u8; 4];
41            c.encode_utf8(&mut buf);
42            buf[0]
43        });
44
45        Self {
46            reader: None,
47            headers: None,
48            with_headers,
49            path,
50            delimiter,
51            opened: false,
52        }
53    }
54
55    /// Opens the file and reads headers if needed.
56    fn open(&mut self) -> Result<(), OperatorError> {
57        // Strip file:/// prefix if present (Neo4j convention)
58        let file_path = self
59            .path
60            .strip_prefix("file:///")
61            .or_else(|| self.path.strip_prefix("file://"))
62            .unwrap_or(&self.path);
63
64        let file = File::open(file_path).map_err(|e| {
65            OperatorError::Execution(format!("Failed to open CSV file '{}': {}", self.path, e))
66        })?;
67        let mut reader = BufReader::new(file);
68
69        if self.with_headers {
70            let mut header_line = String::new();
71            reader.read_line(&mut header_line).map_err(|e| {
72                OperatorError::Execution(format!("Failed to read CSV headers: {e}"))
73            })?;
74            // Strip BOM if present
75            let header_line = header_line.strip_prefix('\u{feff}').unwrap_or(&header_line);
76            let header_line = header_line.trim_end_matches(['\r', '\n']);
77            self.headers = Some(parse_csv_row(header_line, self.delimiter));
78        }
79
80        self.reader = Some(reader);
81        self.opened = true;
82        Ok(())
83    }
84}
85
86impl Operator for LoadCsvOperator {
87    fn next(&mut self) -> OperatorResult {
88        if !self.opened {
89            self.open()?;
90        }
91
92        let reader = self
93            .reader
94            .as_mut()
95            .ok_or_else(|| OperatorError::Execution("CSV reader not initialized".to_string()))?;
96
97        let mut line = String::new();
98        loop {
99            line.clear();
100            let bytes_read = reader
101                .read_line(&mut line)
102                .map_err(|e| OperatorError::Execution(format!("Failed to read CSV line: {e}")))?;
103
104            if bytes_read == 0 {
105                return Ok(None); // EOF
106            }
107
108            let trimmed = line.trim_end_matches(['\r', '\n']);
109            if trimmed.is_empty() {
110                continue; // skip blank lines
111            }
112
113            let fields = parse_csv_row(trimmed, self.delimiter);
114
115            let row_value = if let Some(headers) = &self.headers {
116                // WITH HEADERS: produce a Map
117                let mut map = BTreeMap::new();
118                for (i, header) in headers.iter().enumerate() {
119                    let value = fields.get(i).map_or(Value::Null, |s| {
120                        if s.is_empty() {
121                            Value::Null
122                        } else {
123                            Value::String(ArcStr::from(s.as_str()))
124                        }
125                    });
126                    map.insert(PropertyKey::from(header.as_str()), value);
127                }
128                Value::Map(Arc::new(map))
129            } else {
130                // Without headers: produce a List
131                let values: Vec<Value> = fields
132                    .into_iter()
133                    .map(|s| {
134                        if s.is_empty() {
135                            Value::Null
136                        } else {
137                            Value::String(ArcStr::from(s.as_str()))
138                        }
139                    })
140                    .collect();
141                Value::List(Arc::from(values))
142            };
143
144            // Build a single-row DataChunk with one column (the row variable)
145            let mut builder = DataChunkBuilder::new(&[LogicalType::Any]);
146            if let Some(col) = builder.column_mut(0) {
147                col.push_value(row_value);
148            }
149            builder.advance_row();
150            return Ok(Some(builder.finish()));
151        }
152    }
153
154    fn reset(&mut self) {
155        self.reader = None;
156        self.headers = None;
157        self.opened = false;
158    }
159
160    fn name(&self) -> &'static str {
161        "LoadCsv"
162    }
163}
164
/// Parses a single CSV row into fields, respecting quoted fields.
///
/// `line` must already have its line terminator removed. `delimiter` is
/// widened to a `char` by code point, so bytes `0x80..=0xFF` denote the
/// characters U+0080..=U+00FF.
///
/// Handles:
/// - Unquoted fields separated by the delimiter.
/// - Double-quoted fields: a field whose first character is `"` runs to the
///   matching closing quote, and every character in between (including the
///   delimiter) is taken literally.
/// - Escaped quotes within quoted fields (`""` becomes `"`).
/// - Unterminated quoted fields: everything up to the end of the line is kept.
/// - Text between a closing quote and the next delimiter: it is appended to
///   the field rather than being split off into a field of its own.
///
/// The result always has exactly one more field than there are separating
/// delimiters, so a trailing delimiter yields a trailing empty field and an
/// empty `line` yields a single empty field.
fn parse_csv_row(line: &str, delimiter: u8) -> Vec<String> {
    let delim = char::from(delimiter);
    let mut fields = Vec::new();
    let mut chars = line.chars().peekable();

    loop {
        let mut field = String::new();

        // Quoted prefix: only recognised when the field starts with a quote.
        if chars.next_if_eq(&'"').is_some() {
            while let Some(c) = chars.next() {
                if c != '"' {
                    field.push(c);
                } else if chars.next_if_eq(&'"').is_some() {
                    // Doubled quote inside a quoted field is a literal quote.
                    field.push('"');
                } else {
                    // Closing quote.
                    break;
                }
            }
        }

        // Unquoted content (or stray text after a closing quote) up to the
        // next delimiter.
        let mut saw_delimiter = false;
        for c in chars.by_ref() {
            if c == delim {
                saw_delimiter = true;
                break;
            }
            field.push(c);
        }
        fields.push(field);

        // Continue only when a delimiter was consumed: it always introduces
        // another field, even an empty one at the end of the line.
        if !saw_delimiter {
            break;
        }
    }

    fields
}
229
#[cfg(test)]
mod tests {
    use super::*;

    // Plain unquoted fields are split on the delimiter.
    #[test]
    fn test_parse_csv_simple() {
        assert_eq!(parse_csv_row("a,b,c", b','), ["a", "b", "c"]);
    }

    // Surrounding quotes are removed from quoted fields.
    #[test]
    fn test_parse_csv_quoted() {
        let row = r#""hello","world""#;
        assert_eq!(parse_csv_row(row, b','), ["hello", "world"]);
    }

    // A doubled quote inside a quoted field collapses to one literal quote.
    #[test]
    fn test_parse_csv_escaped_quotes() {
        let row = r#""say ""hi""","ok""#;
        assert_eq!(parse_csv_row(row, b','), [r#"say "hi""#, "ok"]);
    }

    // A delimiter inside quotes does not split the field.
    #[test]
    fn test_parse_csv_delimiter_in_quoted() {
        let row = r#""a,b",c"#;
        assert_eq!(parse_csv_row(row, b','), ["a,b", "c"]);
    }

    // Adjacent delimiters produce an empty field between them.
    #[test]
    fn test_parse_csv_empty_fields() {
        assert_eq!(parse_csv_row("a,,c", b','), ["a", "", "c"]);
    }

    // The delimiter is configurable; here a tab.
    #[test]
    fn test_parse_csv_tab_delimiter() {
        assert_eq!(parse_csv_row("a\tb\tc", b'\t'), ["a", "b", "c"]);
    }

    // A line without any delimiter is a single field.
    #[test]
    fn test_parse_csv_single_field() {
        assert_eq!(parse_csv_row("hello", b','), ["hello"]);
    }
}