// grafeo_core/execution/operators/load_data.rs

1//! LOAD DATA operator for reading CSV, JSONL, and Parquet files.
2
3use std::collections::BTreeMap;
4use std::fs::File;
5use std::io::{BufRead, BufReader};
6use std::sync::Arc;
7
8use super::{Operator, OperatorError, OperatorResult};
9use crate::execution::chunk::DataChunkBuilder;
10use grafeo_common::types::{ArcStr, LogicalType, PropertyKey, Value};
11
/// File format for the load data operator.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum LoadDataFormat {
    /// Comma-separated values text.
    Csv,
    /// JSON Lines: one JSON object per line of text.
    Jsonl,
    /// Apache Parquet columnar files.
    Parquet,
}
23
/// Operator that reads a data file and produces one row per record.
///
/// CSV with headers: each row is a `Value::Map` with column names as keys.
/// CSV without headers: each row is a `Value::List` of string values.
/// JSONL: each row is a `Value::Map` from JSON object fields.
/// Parquet: each row is a `Value::Map` from column names.
///
/// The file is opened lazily on the first `next()` call; `reset()` drops all
/// reader state so a subsequent `next()` reopens the file from the start.
pub struct LoadDataOperator {
    /// File format.
    format: LoadDataFormat,
    /// Buffered reader for CSV/JSONL files. `None` until the file is opened
    /// (and again after `reset`).
    reader: Option<BufReader<File>>,
    /// Column headers (if CSV WITH HEADERS); `None` otherwise.
    headers: Option<Vec<String>>,
    /// Whether the CSV has headers.
    with_headers: bool,
    /// File path, retained so `reset` + `next` can reopen the file.
    path: String,
    /// Field separator (CSV only), stored as a single byte.
    delimiter: u8,
    /// Whether the file has been opened (gates lazy open in `next`).
    opened: bool,
    /// Buffered Parquet rows (all rows are read upfront, then iterated).
    #[cfg(feature = "parquet-import")]
    parquet_rows: Option<std::vec::IntoIter<Value>>,
}
49
50impl LoadDataOperator {
51    /// Creates a new LOAD DATA operator.
52    pub fn new(
53        path: String,
54        format: LoadDataFormat,
55        with_headers: bool,
56        field_terminator: Option<char>,
57        _variable: String,
58    ) -> Self {
59        let delimiter = field_terminator.map_or(b',', |c| {
60            let mut buf = [0u8; 4];
61            c.encode_utf8(&mut buf);
62            buf[0]
63        });
64
65        Self {
66            format,
67            reader: None,
68            headers: None,
69            with_headers,
70            path,
71            delimiter,
72            opened: false,
73            #[cfg(feature = "parquet-import")]
74            parquet_rows: None,
75        }
76    }
77
78    /// Opens the file and reads headers if needed (CSV/JSONL).
79    fn open_text(&mut self) -> Result<(), OperatorError> {
80        let file_path = strip_file_prefix(&self.path);
81
82        let file = File::open(file_path).map_err(|e| {
83            OperatorError::Execution(format!(
84                "Failed to open {} file '{}': {}",
85                format_name(self.format),
86                self.path,
87                e
88            ))
89        })?;
90        let mut reader = BufReader::new(file);
91
92        if self.format == LoadDataFormat::Csv && self.with_headers {
93            let mut header_line = String::new();
94            reader.read_line(&mut header_line).map_err(|e| {
95                OperatorError::Execution(format!("Failed to read CSV headers: {e}"))
96            })?;
97            // Strip BOM if present
98            let header_line = header_line.strip_prefix('\u{feff}').unwrap_or(&header_line);
99            let header_line = header_line.trim_end_matches(['\r', '\n']);
100            self.headers = Some(parse_csv_row(header_line, self.delimiter));
101        }
102
103        self.reader = Some(reader);
104        self.opened = true;
105        Ok(())
106    }
107
108    /// Reads the next CSV record.
109    fn next_csv(&mut self) -> OperatorResult {
110        let reader = self
111            .reader
112            .as_mut()
113            .ok_or_else(|| OperatorError::Execution("CSV reader not initialized".to_string()))?;
114
115        let mut line = String::new();
116        loop {
117            line.clear();
118            let bytes_read = reader
119                .read_line(&mut line)
120                .map_err(|e| OperatorError::Execution(format!("Failed to read CSV line: {e}")))?;
121
122            if bytes_read == 0 {
123                return Ok(None); // EOF
124            }
125
126            let trimmed = line.trim_end_matches(['\r', '\n']);
127            if trimmed.is_empty() {
128                continue; // skip blank lines
129            }
130
131            let fields = parse_csv_row(trimmed, self.delimiter);
132
133            let row_value = if let Some(headers) = &self.headers {
134                // WITH HEADERS: produce a Map
135                let mut map = BTreeMap::new();
136                for (i, header) in headers.iter().enumerate() {
137                    let value = fields.get(i).map_or(Value::Null, |s| {
138                        if s.is_empty() {
139                            Value::Null
140                        } else {
141                            Value::String(ArcStr::from(s.as_str()))
142                        }
143                    });
144                    map.insert(PropertyKey::from(header.as_str()), value);
145                }
146                Value::Map(Arc::new(map))
147            } else {
148                // Without headers: produce a List
149                let values: Vec<Value> = fields
150                    .into_iter()
151                    .map(|s| {
152                        if s.is_empty() {
153                            Value::Null
154                        } else {
155                            Value::String(ArcStr::from(s.as_str()))
156                        }
157                    })
158                    .collect();
159                Value::List(Arc::from(values))
160            };
161
162            return Ok(Some(build_single_row_chunk(row_value)));
163        }
164    }
165
166    /// Reads the next JSONL record.
167    #[cfg(feature = "jsonl-import")]
168    fn next_jsonl(&mut self) -> OperatorResult {
169        let reader = self
170            .reader
171            .as_mut()
172            .ok_or_else(|| OperatorError::Execution("JSONL reader not initialized".to_string()))?;
173
174        let mut line = String::new();
175        loop {
176            line.clear();
177            let bytes_read = reader
178                .read_line(&mut line)
179                .map_err(|e| OperatorError::Execution(format!("Failed to read JSONL line: {e}")))?;
180
181            if bytes_read == 0 {
182                return Ok(None); // EOF
183            }
184
185            let trimmed = line.trim();
186            if trimmed.is_empty() {
187                continue; // skip blank lines
188            }
189
190            let json_value: serde_json::Value = serde_json::from_str(trimmed)
191                .map_err(|e| OperatorError::Execution(format!("Failed to parse JSON line: {e}")))?;
192
193            let row_value = json_to_value(&json_value);
194            return Ok(Some(build_single_row_chunk(row_value)));
195        }
196    }
197
198    /// Reads the next JSONL record (stub when feature disabled).
199    #[cfg(not(feature = "jsonl-import"))]
200    fn next_jsonl(&mut self) -> OperatorResult {
201        Err(OperatorError::Execution(
202            "JSONL import not enabled (compile with --features jsonl-import)".to_string(),
203        ))
204    }
205
206    /// Opens and reads all rows from a Parquet file into a buffer.
207    #[cfg(feature = "parquet-import")]
208    fn open_parquet(&mut self) -> Result<(), OperatorError> {
209        use parquet::file::reader::FileReader;
210
211        let file_path = strip_file_prefix(&self.path);
212        let file = File::open(file_path).map_err(|e| {
213            OperatorError::Execution(format!(
214                "Failed to open Parquet file '{}': {}",
215                self.path, e
216            ))
217        })?;
218
219        let reader = parquet::file::reader::SerializedFileReader::new(file).map_err(|e| {
220            OperatorError::Execution(format!(
221                "Failed to read Parquet file '{}': {}",
222                self.path, e
223            ))
224        })?;
225
226        let row_iter = reader.get_row_iter(None).map_err(|e| {
227            OperatorError::Execution(format!("Failed to create Parquet row iterator: {e}"))
228        })?;
229
230        let mut rows = Vec::new();
231        for row_result in row_iter {
232            let row = row_result.map_err(|e| {
233                OperatorError::Execution(format!("Failed to read Parquet row: {e}"))
234            })?;
235            rows.push(parquet_row_to_value(&row));
236        }
237
238        self.parquet_rows = Some(rows.into_iter());
239        self.opened = true;
240        Ok(())
241    }
242
243    /// Reads the next buffered Parquet record.
244    #[cfg(feature = "parquet-import")]
245    fn next_parquet(&mut self) -> OperatorResult {
246        let rows = self.parquet_rows.as_mut().ok_or_else(|| {
247            OperatorError::Execution("Parquet reader not initialized".to_string())
248        })?;
249
250        match rows.next() {
251            Some(row_value) => Ok(Some(build_single_row_chunk(row_value))),
252            None => Ok(None), // EOF
253        }
254    }
255}
256
impl Operator for LoadDataOperator {
    /// Returns the next single-row `DataChunk`, or `Ok(None)` at end of file.
    ///
    /// The input file is opened lazily on the first call for each format.
    fn next(&mut self) -> OperatorResult {
        match self.format {
            LoadDataFormat::Csv => {
                if !self.opened {
                    self.open_text()?;
                }
                self.next_csv()
            }
            LoadDataFormat::Jsonl => {
                if !self.opened {
                    self.open_text()?;
                }
                self.next_jsonl()
            }
            LoadDataFormat::Parquet => {
                // Exactly one of the two cfg'd expressions below is compiled:
                // with the feature on, the block is this arm's value; with it
                // off, the `Err` expression is.
                #[cfg(feature = "parquet-import")]
                {
                    if !self.opened {
                        self.open_parquet()?;
                    }
                    self.next_parquet()
                }
                #[cfg(not(feature = "parquet-import"))]
                Err(OperatorError::Execution(
                    "Parquet import not enabled (compile with --features parquet-import)"
                        .to_string(),
                ))
            }
        }
    }

    /// Clears all reader state so the next `next()` call reopens the file
    /// from the beginning.
    fn reset(&mut self) {
        self.reader = None;
        self.headers = None;
        self.opened = false;
        #[cfg(feature = "parquet-import")]
        {
            self.parquet_rows = None;
        }
    }

    /// Operator name for plan display, differentiated by format.
    fn name(&self) -> &'static str {
        match self.format {
            LoadDataFormat::Csv => "LoadCsv",
            LoadDataFormat::Jsonl => "LoadJsonl",
            LoadDataFormat::Parquet => "LoadParquet",
        }
    }
}
307
308// ============================================================================
309// Helper functions
310// ============================================================================
311
/// Removes a leading `file:///` or `file://` scheme from a path (the Neo4j
/// LOAD CSV convention). Paths without the scheme are returned unchanged.
fn strip_file_prefix(path: &str) -> &str {
    ["file:///", "file://"]
        .iter()
        .find_map(|scheme| path.strip_prefix(scheme))
        .unwrap_or(path)
}
318
319/// Returns a human-readable format name.
320fn format_name(format: LoadDataFormat) -> &'static str {
321    match format {
322        LoadDataFormat::Csv => "CSV",
323        LoadDataFormat::Jsonl => "JSONL",
324        LoadDataFormat::Parquet => "Parquet",
325    }
326}
327
/// Builds a single-row `DataChunk` with one column containing the given value.
///
/// The chunk schema is a single `LogicalType::Any` column so the caller can
/// store a heterogeneous row value (Map, List, or scalar) without declaring
/// its shape up front.
fn build_single_row_chunk(value: Value) -> crate::execution::DataChunk {
    let mut builder = DataChunkBuilder::new(&[LogicalType::Any]);
    // Column 0 should always exist for the one-column schema above. If
    // `column_mut(0)` ever returned None, the value would be silently dropped
    // and an empty row emitted — NOTE(review): confirm that is acceptable.
    if let Some(col) = builder.column_mut(0) {
        col.push_value(value);
    }
    builder.advance_row();
    builder.finish()
}
337
/// Parses a single CSV row into fields, respecting quoted fields.
///
/// Handles:
/// - Unquoted fields separated by the delimiter
/// - Double-quoted fields (can contain delimiters, newlines, and escaped quotes)
/// - Escaped quotes within quoted fields (`""` becomes `"`)
/// - A trailing delimiter, which yields a final empty field
///   (`"a,"` parses as `["a", ""]`)
fn parse_csv_row(line: &str, delimiter: u8) -> Vec<String> {
    let delim = delimiter as char;
    let mut fields = Vec::new();
    let mut chars = line.chars().peekable();

    loop {
        let mut field = String::new();
        // Track whether this field was terminated by a delimiter so a
        // trailing empty field is not lost at end of line (the previous
        // version dropped it, turning "a," into ["a"]).
        let mut saw_delim = false;

        if chars.peek() == Some(&'"') {
            // Quoted field
            chars.next(); // consume opening quote
            loop {
                match chars.next() {
                    Some('"') => {
                        if chars.peek() == Some(&'"') {
                            // Escaped quote
                            chars.next();
                            field.push('"');
                        } else {
                            // End of quoted field
                            break;
                        }
                    }
                    Some(c) => field.push(c),
                    None => break, // Unterminated quote, take what we have
                }
            }
            // Consume the delimiter following the closing quote, if any
            if chars.peek() == Some(&delim) {
                chars.next();
                saw_delim = true;
            }
        } else {
            // Unquoted field
            loop {
                match chars.peek() {
                    Some(&c) if c == delim => {
                        chars.next();
                        saw_delim = true;
                        break;
                    }
                    Some(_) => {
                        field.push(chars.next().expect("peeked character exists"));
                    }
                    None => break,
                }
            }
        }

        fields.push(field);

        if chars.peek().is_none() {
            // A line ending in a delimiter has one more (empty) field.
            if saw_delim {
                fields.push(String::new());
            }
            break;
        }
    }

    fields
}
402
403// ============================================================================
404// JSONL helpers
405// ============================================================================
406
/// Converts a `serde_json::Value` to a `grafeo_common::types::Value`.
///
/// Numbers prefer `Int64`, then `Float64`, and fall back to a string
/// representation when neither fits. Arrays and objects convert recursively.
#[cfg(feature = "jsonl-import")]
fn json_to_value(json: &serde_json::Value) -> Value {
    use serde_json::Value as Json;

    match json {
        Json::Null => Value::Null,
        Json::Bool(flag) => Value::Bool(*flag),
        Json::Number(num) => num
            .as_i64()
            .map(Value::Int64)
            .or_else(|| num.as_f64().map(Value::Float64))
            .unwrap_or_else(|| Value::String(ArcStr::from(num.to_string().as_str()))),
        Json::String(text) => Value::String(ArcStr::from(text.as_str())),
        Json::Array(elements) => Value::List(Arc::from(
            elements.iter().map(json_to_value).collect::<Vec<Value>>(),
        )),
        Json::Object(entries) => {
            let converted: BTreeMap<_, _> = entries
                .iter()
                .map(|(key, val)| (PropertyKey::from(key.as_str()), json_to_value(val)))
                .collect();
            Value::Map(Arc::new(converted))
        }
    }
}
436
437// ============================================================================
438// Parquet helpers
439// ============================================================================
440
/// Converts a Parquet `Row` to a `Value::Map` keyed by column name.
///
/// Each column is converted with [`parquet_field_to_value`]. This match logic
/// was previously duplicated inline here, arm for arm; delegating keeps the
/// two conversion paths from drifting apart.
#[cfg(feature = "parquet-import")]
fn parquet_row_to_value(row: &parquet::record::Row) -> Value {
    let mut map = BTreeMap::new();
    for (name, field) in row.get_column_iter() {
        map.insert(
            PropertyKey::from(name.as_str()),
            parquet_field_to_value(field),
        );
    }
    Value::Map(Arc::new(map))
}
505
/// Converts a single Parquet field to a Value.
///
/// Integer widths (signed and unsigned up to 32 bits) widen losslessly to
/// `Int64`; `u64` values that exceed `i64::MAX` fall back to a string.
/// Decimals are approximated as `Float64`. Nested groups, lists, and maps
/// convert recursively. Time/timestamp/date fields keep their raw integer
/// representation (millis/micros/days) — the unit itself is not recorded in
/// the resulting `Value`.
#[cfg(feature = "parquet-import")]
fn parquet_field_to_value(field: &parquet::record::Field) -> Value {
    use parquet::record::Field;

    match field {
        Field::Null => Value::Null,
        Field::Bool(b) => Value::Bool(*b),
        // All sub-64-bit integers widen losslessly.
        Field::Byte(b) => Value::Int64(i64::from(*b)),
        Field::Short(s) => Value::Int64(i64::from(*s)),
        Field::Int(i) => Value::Int64(i64::from(*i)),
        Field::Long(l) => Value::Int64(*l),
        Field::UByte(b) => Value::Int64(i64::from(*b)),
        Field::UShort(s) => Value::Int64(i64::from(*s)),
        Field::UInt(i) => Value::Int64(i64::from(*i)),
        Field::ULong(l) => {
            // u64 may overflow i64; keep the exact value as a string then.
            if let Ok(i) = i64::try_from(*l) {
                Value::Int64(i)
            } else {
                Value::String(ArcStr::from(l.to_string().as_str()))
            }
        }
        Field::Float(f) => Value::Float64(f64::from(*f)),
        Field::Double(d) => Value::Float64(*d),
        Field::Str(s) => Value::String(ArcStr::from(s.as_str())),
        Field::Bytes(b) => Value::Bytes(Arc::from(b.data().to_vec())),
        // Lossy: decimal precision beyond f64 is not preserved.
        Field::Decimal(d) => Value::Float64(decimal_to_f64(d)),
        Field::Float16(f) => Value::Float64(f64::from(*f)),
        Field::Group(row) => parquet_row_to_value(row),
        Field::ListInternal(list) => {
            let items: Vec<Value> = list.elements().iter().map(parquet_field_to_value).collect();
            Value::List(Arc::from(items))
        }
        Field::MapInternal(map_internal) => {
            let mut inner_map = BTreeMap::new();
            for (key_field, val_field) in map_internal.entries() {
                // Non-string map keys are stringified via Display.
                let key_str = match key_field {
                    Field::Str(s) => s.clone(),
                    other => format!("{other}"),
                };
                inner_map.insert(
                    PropertyKey::from(key_str.as_str()),
                    parquet_field_to_value(val_field),
                );
            }
            Value::Map(Arc::new(inner_map))
        }
        // Temporal values keep their raw epoch-relative integers.
        Field::TimestampMillis(ms) => Value::Int64(*ms),
        Field::TimestampMicros(us) => Value::Int64(*us),
        Field::TimeMillis(ms) => Value::Int64(i64::from(*ms)),
        Field::TimeMicros(us) => Value::Int64(*us),
        Field::Date(days) => Value::Int64(i64::from(*days)),
    }
}
560
/// Converts a Parquet Decimal to f64.
///
/// The raw bytes hold a big-endian two's-complement unscaled integer; the
/// result is that integer divided by `10^scale`. Values wider than 16 bytes
/// do not fit in `i128` and would wrap — assumed not to occur for typical
/// Parquet decimals (TODO confirm against producer schemas).
#[cfg(feature = "parquet-import")]
fn decimal_to_f64(d: &parquet::data_type::Decimal) -> f64 {
    let raw = d.data();
    // Seed with all-ones when the sign bit of the leading byte is set, so
    // shifting in the remaining bytes performs proper sign extension.
    let seed: i128 = match raw.first() {
        Some(&hi) if hi & 0x80 != 0 => -1,
        _ => 0,
    };
    let unscaled = raw
        .iter()
        .fold(seed, |acc, &byte| (acc << 8) | i128::from(byte));
    unscaled as f64 / 10f64.powi(d.scale())
}
577
/// Unit tests for the pure helpers: CSV row parsing, `file://` prefix
/// stripping, format naming, and (feature-gated) JSON-to-Value conversion.
#[cfg(test)]
mod tests {
    use super::*;

    // --- parse_csv_row ------------------------------------------------------

    #[test]
    fn test_parse_csv_simple() {
        let fields = parse_csv_row("a,b,c", b',');
        assert_eq!(fields, vec!["a", "b", "c"]);
    }

    #[test]
    fn test_parse_csv_quoted() {
        let fields = parse_csv_row(r#""hello","world""#, b',');
        assert_eq!(fields, vec!["hello", "world"]);
    }

    #[test]
    fn test_parse_csv_escaped_quotes() {
        // `""` inside a quoted field decodes to a literal quote.
        let fields = parse_csv_row(r#""say ""hi""","ok""#, b',');
        assert_eq!(fields, vec![r#"say "hi""#, "ok"]);
    }

    #[test]
    fn test_parse_csv_delimiter_in_quoted() {
        // Delimiters inside quotes are field content, not separators.
        let fields = parse_csv_row(r#""a,b",c"#, b',');
        assert_eq!(fields, vec!["a,b", "c"]);
    }

    #[test]
    fn test_parse_csv_empty_fields() {
        let fields = parse_csv_row("a,,c", b',');
        assert_eq!(fields, vec!["a", "", "c"]);
    }

    #[test]
    fn test_parse_csv_tab_delimiter() {
        let fields = parse_csv_row("a\tb\tc", b'\t');
        assert_eq!(fields, vec!["a", "b", "c"]);
    }

    #[test]
    fn test_parse_csv_single_field() {
        let fields = parse_csv_row("hello", b',');
        assert_eq!(fields, vec!["hello"]);
    }

    // --- path / name helpers ------------------------------------------------

    #[test]
    fn test_strip_file_prefix() {
        assert_eq!(strip_file_prefix("file:///data.csv"), "data.csv");
        assert_eq!(strip_file_prefix("file://data.csv"), "data.csv");
        assert_eq!(strip_file_prefix("data.csv"), "data.csv");
        assert_eq!(strip_file_prefix("/tmp/data.csv"), "/tmp/data.csv");
    }

    #[test]
    fn test_format_name() {
        assert_eq!(format_name(LoadDataFormat::Csv), "CSV");
        assert_eq!(format_name(LoadDataFormat::Jsonl), "JSONL");
        assert_eq!(format_name(LoadDataFormat::Parquet), "Parquet");
    }

    // --- JSON conversion (feature-gated) -------------------------------------

    #[cfg(feature = "jsonl-import")]
    mod jsonl_tests {
        use super::*;

        #[test]
        fn test_json_to_value_null() {
            assert!(matches!(
                json_to_value(&serde_json::Value::Null),
                Value::Null
            ));
        }

        #[test]
        fn test_json_to_value_bool() {
            assert!(matches!(
                json_to_value(&serde_json::Value::Bool(true)),
                Value::Bool(true)
            ));
        }

        #[test]
        fn test_json_to_value_integer() {
            let json: serde_json::Value = serde_json::from_str("42").unwrap();
            assert!(matches!(json_to_value(&json), Value::Int64(42)));
        }

        #[test]
        fn test_json_to_value_float() {
            let json: serde_json::Value = serde_json::from_str("1.5").unwrap();
            match json_to_value(&json) {
                Value::Float64(f) => assert!((f - 1.5_f64).abs() < f64::EPSILON),
                other => panic!("expected Float64, got {other:?}"),
            }
        }

        #[test]
        fn test_json_to_value_string() {
            let json: serde_json::Value = serde_json::from_str(r#""hello""#).unwrap();
            match json_to_value(&json) {
                Value::String(s) => assert_eq!(s.as_str(), "hello"),
                other => panic!("expected String, got {other:?}"),
            }
        }

        #[test]
        fn test_json_to_value_array() {
            let json: serde_json::Value = serde_json::from_str("[1, 2, 3]").unwrap();
            match json_to_value(&json) {
                Value::List(items) => {
                    assert_eq!(items.len(), 3);
                    assert!(matches!(items[0], Value::Int64(1)));
                }
                other => panic!("expected List, got {other:?}"),
            }
        }

        #[test]
        fn test_json_to_value_object() {
            let json: serde_json::Value =
                serde_json::from_str(r#"{"name": "Alix", "age": 30}"#).unwrap();
            match json_to_value(&json) {
                Value::Map(map) => {
                    assert_eq!(map.len(), 2);
                    assert!(matches!(
                        map.get(&PropertyKey::from("age")),
                        Some(Value::Int64(30))
                    ));
                }
                other => panic!("expected Map, got {other:?}"),
            }
        }
    }
}