Skip to main content

grafeo_core/execution/operators/
load_data.rs

1//! LOAD DATA operator for reading CSV, JSONL, and Parquet files.
2
3use std::collections::BTreeMap;
4use std::fs::File;
5use std::io::{BufRead, BufReader};
6use std::sync::Arc;
7
8use super::{Operator, OperatorError, OperatorResult};
9use crate::execution::chunk::DataChunkBuilder;
10use grafeo_common::types::{ArcStr, LogicalType, PropertyKey, Value};
11
/// File format for the load data operator.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LoadDataFormat {
    /// CSV (comma-separated values).
    Csv,
    /// JSON Lines (one JSON object per line).
    ///
    /// Requires the `jsonl-import` feature; without it, reading fails at
    /// runtime with an execution error.
    Jsonl,
    /// Apache Parquet columnar format.
    ///
    /// Requires the `parquet-import` feature; without it, reading fails at
    /// runtime with an execution error.
    Parquet,
}
22
/// Operator that reads a data file and produces one row per record.
///
/// CSV with headers: each row is a `Value::Map` with column names as keys.
/// CSV without headers: each row is a `Value::List` of string values.
/// JSONL: each row is a `Value::Map` from JSON object fields.
/// Parquet: each row is a `Value::Map` from column names.
///
/// The file is opened lazily on the first call to `next()`; `reset()` drops
/// all reader state so the following `next()` reopens from the beginning.
pub struct LoadDataOperator {
    /// File format.
    format: LoadDataFormat,
    /// Buffered reader for CSV/JSONL files (`None` until opened / after reset).
    reader: Option<BufReader<File>>,
    /// Column headers (populated only for CSV WITH HEADERS).
    headers: Option<Vec<String>>,
    /// Whether the CSV has headers.
    with_headers: bool,
    /// File path (kept so the file can be reopened after `reset`).
    path: String,
    /// Field separator (CSV only). Only a single byte is stored — the first
    /// byte of the configured terminator's UTF-8 encoding (see `new`).
    /// NOTE(review): a non-ASCII terminator therefore cannot match during
    /// parsing — confirm such terminators are rejected upstream.
    delimiter: u8,
    /// Whether the file has been opened.
    opened: bool,
    /// Buffered Parquet rows (the whole file is read upfront, then iterated).
    #[cfg(feature = "parquet-import")]
    parquet_rows: Option<std::vec::IntoIter<Value>>,
}
48
impl LoadDataOperator {
    /// Creates a new LOAD DATA operator.
    ///
    /// * `path` — file path, optionally prefixed with `file://`/`file:///`.
    /// * `format` — which reader to use.
    /// * `with_headers` — CSV only: treat the first line as column headers.
    /// * `field_terminator` — CSV only: field separator; defaults to `,`.
    /// * `_variable` — bound variable name; unused here (the binding is
    ///   handled elsewhere in the pipeline).
    ///
    /// The file is not opened until the first call to `next()`.
    pub fn new(
        path: String,
        format: LoadDataFormat,
        with_headers: bool,
        field_terminator: Option<char>,
        _variable: String,
    ) -> Self {
        // Keep only the first byte of the terminator's UTF-8 encoding.
        // NOTE(review): for a non-ASCII char this is a UTF-8 lead byte that
        // can never equal a decoded char in `parse_csv_row` — confirm
        // non-ASCII terminators are rejected before reaching this point.
        let delimiter = field_terminator.map_or(b',', |c| {
            let mut buf = [0u8; 4];
            c.encode_utf8(&mut buf);
            buf[0]
        });

        Self {
            format,
            reader: None,
            headers: None,
            with_headers,
            path,
            delimiter,
            opened: false,
            #[cfg(feature = "parquet-import")]
            parquet_rows: None,
        }
    }

    /// Opens the file and reads headers if needed (CSV/JSONL).
    ///
    /// # Errors
    /// Returns `OperatorError::Execution` if the file cannot be opened or the
    /// CSV header line cannot be read.
    fn open_text(&mut self) -> Result<(), OperatorError> {
        let file_path = strip_file_prefix(&self.path);

        let file = File::open(file_path).map_err(|e| {
            OperatorError::Execution(format!(
                "Failed to open {} file '{}': {}",
                format_name(self.format),
                self.path,
                e
            ))
        })?;
        let mut reader = BufReader::new(file);

        if self.format == LoadDataFormat::Csv && self.with_headers {
            let mut header_line = String::new();
            reader.read_line(&mut header_line).map_err(|e| {
                OperatorError::Execution(format!("Failed to read CSV headers: {e}"))
            })?;
            // Strip BOM if present.
            // NOTE(review): the BOM is only stripped here, on the header line;
            // a headerless CSV (or a JSONL file) keeps a leading BOM in its
            // first record — confirm whether that needs handling too.
            let header_line = header_line.strip_prefix('\u{feff}').unwrap_or(&header_line);
            let header_line = header_line.trim_end_matches(['\r', '\n']);
            self.headers = Some(parse_csv_row(header_line, self.delimiter));
        }

        self.reader = Some(reader);
        self.opened = true;
        Ok(())
    }

    /// Reads the next CSV record.
    ///
    /// Records are read one physical line at a time, so quoted fields cannot
    /// span lines. Blank lines are skipped. Empty fields become `Value::Null`;
    /// with headers, missing trailing fields become `Null` and extra fields
    /// beyond the header count are dropped.
    fn next_csv(&mut self) -> OperatorResult {
        let reader = self
            .reader
            .as_mut()
            .ok_or_else(|| OperatorError::Execution("CSV reader not initialized".to_string()))?;

        // Reuse a single line buffer across iterations to avoid per-line
        // allocations.
        let mut line = String::new();
        loop {
            line.clear();
            let bytes_read = reader
                .read_line(&mut line)
                .map_err(|e| OperatorError::Execution(format!("Failed to read CSV line: {e}")))?;

            if bytes_read == 0 {
                return Ok(None); // EOF
            }

            let trimmed = line.trim_end_matches(['\r', '\n']);
            if trimmed.is_empty() {
                continue; // skip blank lines
            }

            let fields = parse_csv_row(trimmed, self.delimiter);

            let row_value = if let Some(headers) = &self.headers {
                // WITH HEADERS: produce a Map keyed by column name.
                let mut map = BTreeMap::new();
                for (i, header) in headers.iter().enumerate() {
                    let value = fields.get(i).map_or(Value::Null, |s| {
                        if s.is_empty() {
                            Value::Null
                        } else {
                            Value::String(ArcStr::from(s.as_str()))
                        }
                    });
                    map.insert(PropertyKey::from(header.as_str()), value);
                }
                Value::Map(Arc::new(map))
            } else {
                // Without headers: produce a positional List.
                let values: Vec<Value> = fields
                    .into_iter()
                    .map(|s| {
                        if s.is_empty() {
                            Value::Null
                        } else {
                            Value::String(ArcStr::from(s.as_str()))
                        }
                    })
                    .collect();
                Value::List(Arc::from(values))
            };

            return Ok(Some(build_single_row_chunk(row_value)));
        }
    }

    /// Reads the next JSONL record.
    ///
    /// Blank lines are skipped; each non-blank line must be a complete JSON
    /// value (a malformed line aborts the scan with an execution error).
    #[cfg(feature = "jsonl-import")]
    fn next_jsonl(&mut self) -> OperatorResult {
        let reader = self
            .reader
            .as_mut()
            .ok_or_else(|| OperatorError::Execution("JSONL reader not initialized".to_string()))?;

        let mut line = String::new();
        loop {
            line.clear();
            let bytes_read = reader
                .read_line(&mut line)
                .map_err(|e| OperatorError::Execution(format!("Failed to read JSONL line: {e}")))?;

            if bytes_read == 0 {
                return Ok(None); // EOF
            }

            let trimmed = line.trim();
            if trimmed.is_empty() {
                continue; // skip blank lines
            }

            let json_value: serde_json::Value = serde_json::from_str(trimmed)
                .map_err(|e| OperatorError::Execution(format!("Failed to parse JSON line: {e}")))?;

            let row_value = json_to_value(&json_value);
            return Ok(Some(build_single_row_chunk(row_value)));
        }
    }

    /// Reads the next JSONL record (stub when feature disabled).
    ///
    /// Always fails; JSONL support is compiled out without `jsonl-import`.
    #[cfg(not(feature = "jsonl-import"))]
    fn next_jsonl(&mut self) -> OperatorResult {
        Err(OperatorError::Execution(
            "JSONL import not enabled (compile with --features jsonl-import)".to_string(),
        ))
    }

    /// Opens and reads all rows from a Parquet file into a buffer.
    ///
    /// Note: this loads the ENTIRE file into memory up front, so memory use
    /// is proportional to the row count.
    #[cfg(feature = "parquet-import")]
    fn open_parquet(&mut self) -> Result<(), OperatorError> {
        use parquet::file::reader::FileReader;

        let file_path = strip_file_prefix(&self.path);
        let file = File::open(file_path).map_err(|e| {
            OperatorError::Execution(format!(
                "Failed to open Parquet file '{}': {}",
                self.path, e
            ))
        })?;

        let reader = parquet::file::reader::SerializedFileReader::new(file).map_err(|e| {
            OperatorError::Execution(format!(
                "Failed to read Parquet file '{}': {}",
                self.path, e
            ))
        })?;

        let row_iter = reader.get_row_iter(None).map_err(|e| {
            OperatorError::Execution(format!("Failed to create Parquet row iterator: {e}"))
        })?;

        let mut rows = Vec::new();
        for row_result in row_iter {
            let row = row_result.map_err(|e| {
                OperatorError::Execution(format!("Failed to read Parquet row: {e}"))
            })?;
            rows.push(parquet_row_to_value(&row));
        }

        self.parquet_rows = Some(rows.into_iter());
        self.opened = true;
        Ok(())
    }

    /// Reads the next buffered Parquet record.
    ///
    /// Assumes `open_parquet` ran first; otherwise returns an execution error.
    #[cfg(feature = "parquet-import")]
    fn next_parquet(&mut self) -> OperatorResult {
        let rows = self.parquet_rows.as_mut().ok_or_else(|| {
            OperatorError::Execution("Parquet reader not initialized".to_string())
        })?;

        match rows.next() {
            Some(row_value) => Ok(Some(build_single_row_chunk(row_value))),
            None => Ok(None), // EOF
        }
    }
}
255
impl Operator for LoadDataOperator {
    /// Pulls the next single-row chunk, lazily opening the file on the first
    /// call (or the first call after `reset`).
    fn next(&mut self) -> OperatorResult {
        match self.format {
            LoadDataFormat::Csv => {
                if !self.opened {
                    self.open_text()?;
                }
                self.next_csv()
            }
            LoadDataFormat::Jsonl => {
                if !self.opened {
                    self.open_text()?;
                }
                // Without the `jsonl-import` feature this is a stub that
                // always returns an execution error.
                self.next_jsonl()
            }
            LoadDataFormat::Parquet => {
                #[cfg(feature = "parquet-import")]
                {
                    if !self.opened {
                        self.open_parquet()?;
                    }
                    self.next_parquet()
                }
                #[cfg(not(feature = "parquet-import"))]
                Err(OperatorError::Execution(
                    "Parquet import not enabled (compile with --features parquet-import)"
                        .to_string(),
                ))
            }
        }
    }

    /// Drops all reader state so the next `next()` call reopens the file and
    /// re-reads it from the beginning.
    fn reset(&mut self) {
        self.reader = None;
        self.headers = None;
        self.opened = false;
        #[cfg(feature = "parquet-import")]
        {
            self.parquet_rows = None;
        }
    }

    /// Operator name for plan display/profiling, specialized per format.
    fn name(&self) -> &'static str {
        match self.format {
            LoadDataFormat::Csv => "LoadCsv",
            LoadDataFormat::Jsonl => "LoadJsonl",
            LoadDataFormat::Parquet => "LoadParquet",
        }
    }
}
306
307// ============================================================================
308// Helper functions
309// ============================================================================
310
/// Strips a leading `file:///` or `file://` scheme from a path (Neo4j convention).
fn strip_file_prefix(path: &str) -> &str {
    // Try the longer scheme first so `file:///x` yields `x`, not `/x`.
    for scheme in ["file:///", "file://"] {
        if let Some(rest) = path.strip_prefix(scheme) {
            return rest;
        }
    }
    path
}
317
318/// Returns a human-readable format name.
319fn format_name(format: LoadDataFormat) -> &'static str {
320    match format {
321        LoadDataFormat::Csv => "CSV",
322        LoadDataFormat::Jsonl => "JSONL",
323        LoadDataFormat::Parquet => "Parquet",
324    }
325}
326
327/// Builds a single-row `DataChunk` with one column containing the given value.
328fn build_single_row_chunk(value: Value) -> crate::execution::DataChunk {
329    let mut builder = DataChunkBuilder::new(&[LogicalType::Any]);
330    if let Some(col) = builder.column_mut(0) {
331        col.push_value(value);
332    }
333    builder.advance_row();
334    builder.finish()
335}
336
/// Parses a single CSV row into fields, respecting quoted fields.
///
/// Handles:
/// - Unquoted fields separated by the delimiter
/// - Double-quoted fields (which may contain the delimiter)
/// - Escaped quotes within quoted fields (`""` becomes `"`)
/// - A trailing delimiter, which yields a final empty field (RFC 4180)
///
/// Records are parsed one physical line at a time, so a quoted field cannot
/// span multiple lines.
fn parse_csv_row(line: &str, delimiter: u8) -> Vec<String> {
    let delim = delimiter as char;
    let mut fields = Vec::new();
    let mut chars = line.chars().peekable();
    let mut field = String::new();

    loop {
        // Whether this field was terminated by a delimiter (rather than by
        // end-of-line); needed so `a,b,` produces a trailing empty field.
        let mut ended_with_delim = false;

        if chars.peek() == Some(&'"') {
            // Quoted field
            chars.next(); // consume opening quote
            loop {
                match chars.next() {
                    Some('"') => {
                        if chars.peek() == Some(&'"') {
                            // Escaped quote: `""` becomes `"`
                            chars.next();
                            field.push('"');
                        } else {
                            // End of quoted field
                            break;
                        }
                    }
                    Some(c) => field.push(c),
                    None => break, // Unterminated quote, take what we have
                }
            }
            // Consume the delimiter following the closing quote, if any.
            if chars.peek() == Some(&delim) {
                chars.next();
                ended_with_delim = true;
            }
        } else {
            // Unquoted field: read up to the next delimiter or end of line.
            loop {
                match chars.peek() {
                    Some(c) if *c == delim => {
                        chars.next();
                        ended_with_delim = true;
                        break;
                    }
                    Some(_) => {
                        field.push(chars.next().unwrap());
                    }
                    None => break,
                }
            }
        }
        fields.push(std::mem::take(&mut field));

        if chars.peek().is_none() {
            // Bug fix: a line ending in a delimiter has one more (empty)
            // field after it; the previous version silently dropped it.
            if ended_with_delim {
                fields.push(String::new());
            }
            break;
        }
    }

    fields
}
401
402// ============================================================================
403// JSONL helpers
404// ============================================================================
405
/// Converts a `serde_json::Value` to a `grafeo_common::types::Value`.
///
/// Numbers that fit in an `i64` become `Int64`. Unsigned integers larger than
/// `i64::MAX` are kept as decimal strings so no precision is lost (matching
/// the Parquet `ULong` overflow handling in this module); all remaining
/// numbers become `Float64`. Arrays and objects convert recursively.
#[cfg(feature = "jsonl-import")]
fn json_to_value(json: &serde_json::Value) -> Value {
    match json {
        serde_json::Value::Null => Value::Null,
        serde_json::Value::Bool(b) => Value::Bool(*b),
        serde_json::Value::Number(n) => {
            if let Some(i) = n.as_i64() {
                Value::Int64(i)
            } else if n.as_u64().is_some() {
                // Bug fix: a u64 above i64::MAX previously fell through to
                // as_f64 and silently lost precision; keep it exact as text.
                Value::String(ArcStr::from(n.to_string().as_str()))
            } else if let Some(f) = n.as_f64() {
                Value::Float64(f)
            } else {
                // Fallback for exotic numbers (e.g. arbitrary precision).
                Value::String(ArcStr::from(n.to_string().as_str()))
            }
        }
        serde_json::Value::String(s) => Value::String(ArcStr::from(s.as_str())),
        serde_json::Value::Array(arr) => {
            let items: Vec<Value> = arr.iter().map(json_to_value).collect();
            Value::List(Arc::from(items))
        }
        serde_json::Value::Object(obj) => {
            let mut map = BTreeMap::new();
            for (key, val) in obj {
                map.insert(PropertyKey::from(key.as_str()), json_to_value(val));
            }
            Value::Map(Arc::new(map))
        }
    }
}
435
436// ============================================================================
437// Parquet helpers
438// ============================================================================
439
/// Converts a Parquet `Row` to a `Value::Map` keyed by column name.
///
/// Per-field conversion is delegated to [`parquet_field_to_value`]; this
/// function previously duplicated that entire match arm-for-arm.
#[cfg(feature = "parquet-import")]
fn parquet_row_to_value(row: &parquet::record::Row) -> Value {
    let mut map = BTreeMap::new();
    for (name, field) in row.get_column_iter() {
        map.insert(
            PropertyKey::from(name.as_str()),
            parquet_field_to_value(field),
        );
    }
    Value::Map(Arc::new(map))
}
504
/// Converts a single Parquet field to a `Value`.
///
/// Integer-like fields widen to `Int64`; a `ULong` too large for `i64` is
/// kept as its decimal string. Floats widen to `Float64`, decimals are
/// approximated as `Float64`, and groups/lists/maps convert recursively.
#[cfg(feature = "parquet-import")]
fn parquet_field_to_value(field: &parquet::record::Field) -> Value {
    use parquet::record::Field as F;

    match field {
        F::Null => Value::Null,
        F::Bool(v) => Value::Bool(*v),
        F::Byte(v) => Value::Int64(i64::from(*v)),
        F::Short(v) => Value::Int64(i64::from(*v)),
        F::Int(v) => Value::Int64(i64::from(*v)),
        F::Long(v) => Value::Int64(*v),
        F::UByte(v) => Value::Int64(i64::from(*v)),
        F::UShort(v) => Value::Int64(i64::from(*v)),
        F::UInt(v) => Value::Int64(i64::from(*v)),
        F::ULong(v) => match i64::try_from(*v) {
            Ok(i) => Value::Int64(i),
            // Too large for i64: preserve the exact value as a string.
            Err(_) => Value::String(ArcStr::from(v.to_string().as_str())),
        },
        F::Float(v) => Value::Float64(f64::from(*v)),
        F::Double(v) => Value::Float64(*v),
        F::Str(v) => Value::String(ArcStr::from(v.as_str())),
        F::Bytes(v) => Value::Bytes(Arc::from(v.data().to_vec())),
        // Decimals are approximated as f64 for simplicity.
        F::Decimal(v) => Value::Float64(decimal_to_f64(v)),
        F::Float16(v) => Value::Float64(f64::from(*v)),
        F::Group(nested) => parquet_row_to_value(nested),
        F::ListInternal(list) => {
            let items: Vec<Value> = list.elements().iter().map(parquet_field_to_value).collect();
            Value::List(Arc::from(items))
        }
        F::MapInternal(entries) => {
            let mut inner = BTreeMap::new();
            for (key_field, val_field) in entries.entries() {
                // Non-string keys are rendered via their Display form.
                let key = match key_field {
                    F::Str(s) => s.clone(),
                    other => format!("{other}"),
                };
                inner.insert(
                    PropertyKey::from(key.as_str()),
                    parquet_field_to_value(val_field),
                );
            }
            Value::Map(Arc::new(inner))
        }
        F::TimestampMillis(v) => Value::Int64(*v),
        F::TimestampMicros(v) => Value::Int64(*v),
        F::TimeMillis(v) => Value::Int64(i64::from(*v)),
        F::TimeMicros(v) => Value::Int64(*v),
        F::Date(v) => Value::Int64(i64::from(*v)),
    }
}
559
/// Converts a Parquet `Decimal` to an approximate `f64`.
///
/// The decimal's raw bytes are a big-endian two's-complement integer; the
/// result is that integer divided by `10^scale`.
#[cfg(feature = "parquet-import")]
fn decimal_to_f64(d: &parquet::data_type::Decimal) -> f64 {
    let bytes = d.data();
    // Seed with all-ones when the sign bit of the leading byte is set, so the
    // subsequent shifts sign-extend negative values correctly.
    let seed: i128 = match bytes.first() {
        Some(b) if b & 0x80 != 0 => -1,
        _ => 0,
    };
    let unscaled = bytes
        .iter()
        .fold(seed, |acc, &b| (acc << 8) | i128::from(b));
    unscaled as f64 / 10f64.powi(d.scale())
}
576
#[cfg(test)]
mod tests {
    use super::*;

    // ---- CSV row parsing ----

    #[test]
    fn test_parse_csv_simple() {
        let fields = parse_csv_row("a,b,c", b',');
        assert_eq!(fields, vec!["a", "b", "c"]);
    }

    #[test]
    fn test_parse_csv_quoted() {
        let fields = parse_csv_row(r#""hello","world""#, b',');
        assert_eq!(fields, vec!["hello", "world"]);
    }

    #[test]
    fn test_parse_csv_escaped_quotes() {
        // `""` inside a quoted field decodes to a literal `"`.
        let fields = parse_csv_row(r#""say ""hi""","ok""#, b',');
        assert_eq!(fields, vec![r#"say "hi""#, "ok"]);
    }

    #[test]
    fn test_parse_csv_delimiter_in_quoted() {
        let fields = parse_csv_row(r#""a,b",c"#, b',');
        assert_eq!(fields, vec!["a,b", "c"]);
    }

    #[test]
    fn test_parse_csv_empty_fields() {
        let fields = parse_csv_row("a,,c", b',');
        assert_eq!(fields, vec!["a", "", "c"]);
    }

    #[test]
    fn test_parse_csv_tab_delimiter() {
        let fields = parse_csv_row("a\tb\tc", b'\t');
        assert_eq!(fields, vec!["a", "b", "c"]);
    }

    #[test]
    fn test_parse_csv_single_field() {
        let fields = parse_csv_row("hello", b',');
        assert_eq!(fields, vec!["hello"]);
    }

    // ---- Path / format helpers ----

    #[test]
    fn test_strip_file_prefix() {
        assert_eq!(strip_file_prefix("file:///data.csv"), "data.csv");
        assert_eq!(strip_file_prefix("file://data.csv"), "data.csv");
        assert_eq!(strip_file_prefix("data.csv"), "data.csv");
        assert_eq!(strip_file_prefix("/tmp/data.csv"), "/tmp/data.csv");
    }

    #[test]
    fn test_format_name() {
        assert_eq!(format_name(LoadDataFormat::Csv), "CSV");
        assert_eq!(format_name(LoadDataFormat::Jsonl), "JSONL");
        assert_eq!(format_name(LoadDataFormat::Parquet), "Parquet");
    }

    // ---- JSON conversion (only compiled with the jsonl-import feature) ----

    #[cfg(feature = "jsonl-import")]
    mod jsonl_tests {
        use super::*;

        #[test]
        fn test_json_to_value_null() {
            assert!(matches!(
                json_to_value(&serde_json::Value::Null),
                Value::Null
            ));
        }

        #[test]
        fn test_json_to_value_bool() {
            assert!(matches!(
                json_to_value(&serde_json::Value::Bool(true)),
                Value::Bool(true)
            ));
        }

        #[test]
        fn test_json_to_value_integer() {
            let json: serde_json::Value = serde_json::from_str("42").unwrap();
            assert!(matches!(json_to_value(&json), Value::Int64(42)));
        }

        #[test]
        fn test_json_to_value_float() {
            let json: serde_json::Value = serde_json::from_str("1.5").unwrap();
            match json_to_value(&json) {
                Value::Float64(f) => assert!((f - 1.5_f64).abs() < f64::EPSILON),
                other => panic!("expected Float64, got {other:?}"),
            }
        }

        #[test]
        fn test_json_to_value_string() {
            let json: serde_json::Value = serde_json::from_str(r#""hello""#).unwrap();
            match json_to_value(&json) {
                Value::String(s) => assert_eq!(s.as_str(), "hello"),
                other => panic!("expected String, got {other:?}"),
            }
        }

        #[test]
        fn test_json_to_value_array() {
            let json: serde_json::Value = serde_json::from_str("[1, 2, 3]").unwrap();
            match json_to_value(&json) {
                Value::List(items) => {
                    assert_eq!(items.len(), 3);
                    assert!(matches!(items[0], Value::Int64(1)));
                }
                other => panic!("expected List, got {other:?}"),
            }
        }

        #[test]
        fn test_json_to_value_object() {
            let json: serde_json::Value =
                serde_json::from_str(r#"{"name": "Alix", "age": 30}"#).unwrap();
            match json_to_value(&json) {
                Value::Map(map) => {
                    assert_eq!(map.len(), 2);
                    assert!(matches!(
                        map.get(&PropertyKey::from("age")),
                        Some(Value::Int64(30))
                    ));
                }
                other => panic!("expected Map, got {other:?}"),
            }
        }
    }
}