1use std::collections::BTreeMap;
4use std::fs::File;
5use std::io::{BufRead, BufReader};
6use std::sync::Arc;
7
8use super::{Operator, OperatorError, OperatorResult};
9use crate::execution::chunk::DataChunkBuilder;
10use grafeo_common::types::{ArcStr, LogicalType, PropertyKey, Value};
11
/// Supported on-disk formats for the data-load operator.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LoadDataFormat {
    /// Delimiter-separated values (comma by default), optionally with a header line.
    Csv,
    /// Newline-delimited JSON: one JSON value per line (requires `jsonl-import`).
    Jsonl,
    /// Apache Parquet (requires the `parquet-import` feature).
    Parquet,
}
22
/// Operator that streams rows from an external file (CSV, JSONL, or Parquet),
/// emitting each row as a single-row data chunk.
pub struct LoadDataOperator {
    // Which on-disk format to parse.
    format: LoadDataFormat,
    // Buffered reader for the line-oriented formats; `None` until opened.
    reader: Option<BufReader<File>>,
    // CSV header names, populated only when `with_headers` is set.
    headers: Option<Vec<String>>,
    // Whether the first CSV line should be consumed as a header row.
    with_headers: bool,
    // Input path, possibly carrying a `file://` / `file:///` prefix.
    path: String,
    // CSV field delimiter (first UTF-8 byte of the configured terminator).
    delimiter: u8,
    // True once the input has been opened (opening is lazy, on first `next()`).
    opened: bool,
    // Parquet rows materialized up front, drained one per `next()` call.
    #[cfg(feature = "parquet-import")]
    parquet_rows: Option<std::vec::IntoIter<Value>>,
}
48
impl LoadDataOperator {
    /// Creates a new load operator for `path`.
    ///
    /// * `format` — on-disk format to parse.
    /// * `with_headers` — CSV only: treat the first line as column headers.
    /// * `field_terminator` — CSV delimiter; defaults to `,` when `None`.
    ///   NOTE(review): only the first UTF-8 byte of the character is kept, so a
    ///   multi-byte delimiter is silently truncated — confirm ASCII-only
    ///   delimiters are the intended contract.
    /// * `_variable` — currently unused; presumably the binding name from the
    ///   query plan — verify against the planner before removing.
    pub fn new(
        path: String,
        format: LoadDataFormat,
        with_headers: bool,
        field_terminator: Option<char>,
        _variable: String,
    ) -> Self {
        // Reduce the delimiter char to its first UTF-8 byte (see note above).
        let delimiter = field_terminator.map_or(b',', |c| {
            let mut buf = [0u8; 4];
            c.encode_utf8(&mut buf);
            buf[0]
        });

        Self {
            format,
            reader: None,
            headers: None,
            with_headers,
            path,
            delimiter,
            opened: false,
            #[cfg(feature = "parquet-import")]
            parquet_rows: None,
        }
    }

    /// Opens the input file for the line-oriented formats (CSV/JSONL) and, for
    /// CSV with headers, consumes and parses the header line.
    fn open_text(&mut self) -> Result<(), OperatorError> {
        let file_path = strip_file_prefix(&self.path);

        let file = File::open(file_path).map_err(|e| {
            OperatorError::Execution(format!(
                "Failed to open {} file '{}': {}",
                format_name(self.format),
                self.path,
                e
            ))
        })?;
        let mut reader = BufReader::new(file);

        if self.format == LoadDataFormat::Csv && self.with_headers {
            let mut header_line = String::new();
            reader.read_line(&mut header_line).map_err(|e| {
                OperatorError::Execution(format!("Failed to read CSV headers: {e}"))
            })?;
            // Drop a UTF-8 BOM if present, then strip the line terminator.
            let header_line = header_line.strip_prefix('\u{feff}').unwrap_or(&header_line);
            let header_line = header_line.trim_end_matches(['\r', '\n']);
            self.headers = Some(parse_csv_row(header_line, self.delimiter));
        }

        self.reader = Some(reader);
        self.opened = true;
        Ok(())
    }

    /// Reads the next non-empty CSV line and emits it as a single-row chunk.
    ///
    /// With headers, the row becomes a `Value::Map` keyed by header name
    /// (missing or empty cells become `Value::Null`); without headers it
    /// becomes a positional `Value::List`. Returns `Ok(None)` at end of file.
    fn next_csv(&mut self) -> OperatorResult {
        let reader = self
            .reader
            .as_mut()
            .ok_or_else(|| OperatorError::Execution("CSV reader not initialized".to_string()))?;

        let mut line = String::new();
        loop {
            line.clear();
            let bytes_read = reader
                .read_line(&mut line)
                .map_err(|e| OperatorError::Execution(format!("Failed to read CSV line: {e}")))?;

            // read_line returns 0 only at EOF.
            if bytes_read == 0 {
                return Ok(None);
            }

            let trimmed = line.trim_end_matches(['\r', '\n']);
            if trimmed.is_empty() {
                // Blank lines are skipped rather than emitted as empty rows.
                continue;
            }

            let fields = parse_csv_row(trimmed, self.delimiter);

            let row_value = if let Some(headers) = &self.headers {
                // Header mode: zip fields with header names; rows shorter than
                // the header list are padded with nulls, extra fields dropped.
                let mut map = BTreeMap::new();
                for (i, header) in headers.iter().enumerate() {
                    let value = fields.get(i).map_or(Value::Null, |s| {
                        if s.is_empty() {
                            Value::Null
                        } else {
                            Value::String(ArcStr::from(s.as_str()))
                        }
                    });
                    map.insert(PropertyKey::from(header.as_str()), value);
                }
                Value::Map(Arc::new(map))
            } else {
                // Positional mode: all fields are strings, empty cells null.
                let values: Vec<Value> = fields
                    .into_iter()
                    .map(|s| {
                        if s.is_empty() {
                            Value::Null
                        } else {
                            Value::String(ArcStr::from(s.as_str()))
                        }
                    })
                    .collect();
                Value::List(Arc::from(values))
            };

            return Ok(Some(build_single_row_chunk(row_value)));
        }
    }

    /// Reads the next non-empty JSONL line, parses it as JSON, and emits the
    /// converted value as a single-row chunk. Returns `Ok(None)` at EOF.
    #[cfg(feature = "jsonl-import")]
    fn next_jsonl(&mut self) -> OperatorResult {
        let reader = self
            .reader
            .as_mut()
            .ok_or_else(|| OperatorError::Execution("JSONL reader not initialized".to_string()))?;

        let mut line = String::new();
        loop {
            line.clear();
            let bytes_read = reader
                .read_line(&mut line)
                .map_err(|e| OperatorError::Execution(format!("Failed to read JSONL line: {e}")))?;

            if bytes_read == 0 {
                return Ok(None);
            }

            let trimmed = line.trim();
            if trimmed.is_empty() {
                continue;
            }

            let json_value: serde_json::Value = serde_json::from_str(trimmed)
                .map_err(|e| OperatorError::Execution(format!("Failed to parse JSON line: {e}")))?;

            let row_value = json_to_value(&json_value);
            return Ok(Some(build_single_row_chunk(row_value)));
        }
    }

    /// Stub used when the `jsonl-import` feature is compiled out.
    #[cfg(not(feature = "jsonl-import"))]
    fn next_jsonl(&mut self) -> OperatorResult {
        Err(OperatorError::Execution(
            "JSONL import not enabled (compile with --features jsonl-import)".to_string(),
        ))
    }

    /// Opens a Parquet file and eagerly materializes every row into memory.
    ///
    /// NOTE(review): the whole file is buffered before the first row is
    /// emitted, so very large inputs pay the full memory cost up front.
    #[cfg(feature = "parquet-import")]
    fn open_parquet(&mut self) -> Result<(), OperatorError> {
        use parquet::file::reader::FileReader;

        let file_path = strip_file_prefix(&self.path);
        let file = File::open(file_path).map_err(|e| {
            OperatorError::Execution(format!(
                "Failed to open Parquet file '{}': {}",
                self.path, e
            ))
        })?;

        let reader = parquet::file::reader::SerializedFileReader::new(file).map_err(|e| {
            OperatorError::Execution(format!(
                "Failed to read Parquet file '{}': {}",
                self.path, e
            ))
        })?;

        let row_iter = reader.get_row_iter(None).map_err(|e| {
            OperatorError::Execution(format!("Failed to create Parquet row iterator: {e}"))
        })?;

        let mut rows = Vec::new();
        for row_result in row_iter {
            let row = row_result.map_err(|e| {
                OperatorError::Execution(format!("Failed to read Parquet row: {e}"))
            })?;
            rows.push(parquet_row_to_value(&row));
        }

        self.parquet_rows = Some(rows.into_iter());
        self.opened = true;
        Ok(())
    }

    /// Emits the next buffered Parquet row as a single-row chunk, or
    /// `Ok(None)` once the buffer is exhausted.
    #[cfg(feature = "parquet-import")]
    fn next_parquet(&mut self) -> OperatorResult {
        let rows = self.parquet_rows.as_mut().ok_or_else(|| {
            OperatorError::Execution("Parquet reader not initialized".to_string())
        })?;

        match rows.next() {
            Some(row_value) => Ok(Some(build_single_row_chunk(row_value))),
            None => Ok(None),
        }
    }
}
255
impl Operator for LoadDataOperator {
    /// Pulls the next single-row chunk, lazily opening the input on the first
    /// call. Returns `Ok(None)` once the input is exhausted.
    fn next(&mut self) -> OperatorResult {
        match self.format {
            LoadDataFormat::Csv => {
                if !self.opened {
                    self.open_text()?;
                }
                self.next_csv()
            }
            LoadDataFormat::Jsonl => {
                if !self.opened {
                    self.open_text()?;
                }
                self.next_jsonl()
            }
            LoadDataFormat::Parquet => {
                // With the feature enabled, the block below is this arm's
                // value; without it, only the Err expression is compiled.
                #[cfg(feature = "parquet-import")]
                {
                    if !self.opened {
                        self.open_parquet()?;
                    }
                    self.next_parquet()
                }
                #[cfg(not(feature = "parquet-import"))]
                Err(OperatorError::Execution(
                    "Parquet import not enabled (compile with --features parquet-import)"
                        .to_string(),
                ))
            }
        }
    }

    /// Drops all open state so the next `next()` call re-opens and re-reads
    /// the file from the beginning.
    fn reset(&mut self) {
        self.reader = None;
        self.headers = None;
        self.opened = false;
        #[cfg(feature = "parquet-import")]
        {
            self.parquet_rows = None;
        }
    }

    /// Operator name for plan display, specialized per input format.
    fn name(&self) -> &'static str {
        match self.format {
            LoadDataFormat::Csv => "LoadCsv",
            LoadDataFormat::Jsonl => "LoadJsonl",
            LoadDataFormat::Parquet => "LoadParquet",
        }
    }
}
306
/// Removes a leading `file:///` or `file://` scheme prefix, returning the
/// path unchanged when neither is present.
///
/// NOTE(review): because the longer prefix is tried first, `file:///x` maps
/// to the *relative* path `x` rather than `/x`; the unit tests pin this
/// behavior, so it is preserved here — confirm it is intended for absolute
/// file URIs.
fn strip_file_prefix(path: &str) -> &str {
    for prefix in ["file:///", "file://"] {
        if let Some(rest) = path.strip_prefix(prefix) {
            return rest;
        }
    }
    path
}
317
318fn format_name(format: LoadDataFormat) -> &'static str {
320 match format {
321 LoadDataFormat::Csv => "CSV",
322 LoadDataFormat::Jsonl => "JSONL",
323 LoadDataFormat::Parquet => "Parquet",
324 }
325}
326
327fn build_single_row_chunk(value: Value) -> crate::execution::DataChunk {
329 let mut builder = DataChunkBuilder::new(&[LogicalType::Any]);
330 if let Some(col) = builder.column_mut(0) {
331 col.push_value(value);
332 }
333 builder.advance_row();
334 builder.finish()
335}
336
/// Splits one CSV line into fields, honoring double-quoted fields and `""`
/// escapes inside quotes.
///
/// Fix: a trailing delimiter now yields a trailing empty field
/// (`"a,"` → `["a", ""]`), consistent with `"a,,c"` → `["a", "", "c"]` and
/// with RFC 4180; previously the final empty field was silently dropped.
///
/// NOTE(review): `delimiter as char` maps bytes >= 0x80 to their Latin-1 code
/// points, so only single-byte (ASCII) delimiters match reliably; the
/// operator truncates multi-byte terminators to their first byte upstream.
fn parse_csv_row(line: &str, delimiter: u8) -> Vec<String> {
    let delim = delimiter as char;
    let mut fields = Vec::new();
    let mut chars = line.chars().peekable();
    let mut field = String::new();

    loop {
        // Tracks whether this field was terminated by a delimiter, so a line
        // ending in a delimiter produces the empty field it implies.
        let mut consumed_delim = false;

        if chars.peek() == Some(&'"') {
            // Quoted field: consume the opening quote, then read until the
            // closing quote; a doubled quote is an escaped literal quote.
            chars.next();
            loop {
                match chars.next() {
                    Some('"') => {
                        if chars.peek() == Some(&'"') {
                            chars.next();
                            field.push('"');
                        } else {
                            break;
                        }
                    }
                    Some(c) => field.push(c),
                    // Unterminated quote: keep what was read so far.
                    None => break,
                }
            }
            // Skip the delimiter following the closing quote, if any.
            if chars.peek() == Some(&delim) {
                chars.next();
                consumed_delim = true;
            }
            fields.push(std::mem::take(&mut field));
        } else {
            // Unquoted field: read up to the next delimiter or end of line.
            loop {
                match chars.peek() {
                    Some(c) if *c == delim => {
                        chars.next();
                        consumed_delim = true;
                        break;
                    }
                    Some(_) => {
                        field.push(chars.next().unwrap());
                    }
                    None => break,
                }
            }
            fields.push(std::mem::take(&mut field));
        }

        if chars.peek().is_none() {
            if consumed_delim {
                // Line ended right after a delimiter: emit the final empty
                // field (e.g. "a,b," has three fields).
                fields.push(String::new());
            }
            break;
        }
    }

    fields
}
401
/// Recursively converts a parsed `serde_json::Value` into an engine `Value`.
///
/// Numbers prefer `Int64` when the value fits in `i64`, then fall back to
/// `Float64`, then to the number's decimal string representation.
#[cfg(feature = "jsonl-import")]
fn json_to_value(json: &serde_json::Value) -> Value {
    use serde_json::Value as Json;

    match json {
        Json::Null => Value::Null,
        Json::Bool(flag) => Value::Bool(*flag),
        Json::Number(num) => match (num.as_i64(), num.as_f64()) {
            (Some(int), _) => Value::Int64(int),
            (None, Some(float)) => Value::Float64(float),
            (None, None) => Value::String(ArcStr::from(num.to_string().as_str())),
        },
        Json::String(text) => Value::String(ArcStr::from(text.as_str())),
        Json::Array(elements) => {
            let converted: Vec<Value> = elements.iter().map(json_to_value).collect();
            Value::List(Arc::from(converted))
        }
        Json::Object(entries) => {
            let converted: BTreeMap<_, _> = entries
                .iter()
                .map(|(key, val)| (PropertyKey::from(key.as_str()), json_to_value(val)))
                .collect();
            Value::Map(Arc::new(converted))
        }
    }
}
435
/// Converts a top-level Parquet row into a `Value::Map` keyed by column name.
///
/// Fix: the previous implementation duplicated the entire `Field` conversion
/// match from [`parquet_field_to_value`] verbatim; delegating to that helper
/// removes ~50 lines of copy-paste and keeps the two conversion paths from
/// drifting apart. Nested groups, lists, and maps are handled recursively by
/// the helper.
#[cfg(feature = "parquet-import")]
fn parquet_row_to_value(row: &parquet::record::Row) -> Value {
    let mut map = BTreeMap::new();
    for (name, field) in row.get_column_iter() {
        map.insert(PropertyKey::from(name.as_str()), parquet_field_to_value(field));
    }
    Value::Map(Arc::new(map))
}
504
/// Converts a single Parquet `Field` into an engine `Value`.
///
/// All signed integer widths widen to `Int64`; unsigned values that fit in
/// `i64` do too, while an overflowing `u64` is rendered as its decimal
/// string. Temporal fields keep their raw integer payloads (millis, micros,
/// or days) — NOTE(review): the unit is not encoded in the resulting value,
/// so consumers must know the column's semantics.
#[cfg(feature = "parquet-import")]
fn parquet_field_to_value(field: &parquet::record::Field) -> Value {
    use parquet::record::Field;

    match field {
        Field::Null => Value::Null,
        Field::Bool(b) => Value::Bool(*b),
        Field::Byte(b) => Value::Int64(i64::from(*b)),
        Field::Short(s) => Value::Int64(i64::from(*s)),
        Field::Int(i) => Value::Int64(i64::from(*i)),
        Field::Long(l) => Value::Int64(*l),
        Field::UByte(b) => Value::Int64(i64::from(*b)),
        Field::UShort(s) => Value::Int64(i64::from(*s)),
        Field::UInt(i) => Value::Int64(i64::from(*i)),
        Field::ULong(l) => {
            // u64 values above i64::MAX fall back to a decimal string.
            if let Ok(i) = i64::try_from(*l) {
                Value::Int64(i)
            } else {
                Value::String(ArcStr::from(l.to_string().as_str()))
            }
        }
        Field::Float(f) => Value::Float64(f64::from(*f)),
        Field::Double(d) => Value::Float64(*d),
        Field::Str(s) => Value::String(ArcStr::from(s.as_str())),
        Field::Bytes(b) => Value::Bytes(Arc::from(b.data().to_vec())),
        // Decimals are lossily converted to f64 (see decimal_to_f64).
        Field::Decimal(d) => Value::Float64(decimal_to_f64(d)),
        Field::Float16(f) => Value::Float64(f64::from(*f)),
        // Nested group -> nested map, via the row-level converter.
        Field::Group(row) => parquet_row_to_value(row),
        Field::ListInternal(list) => {
            let items: Vec<Value> = list.elements().iter().map(parquet_field_to_value).collect();
            Value::List(Arc::from(items))
        }
        Field::MapInternal(map_internal) => {
            let mut inner_map = BTreeMap::new();
            for (key_field, val_field) in map_internal.entries() {
                // Non-string keys are stringified via Display.
                let key_str = match key_field {
                    Field::Str(s) => s.clone(),
                    other => format!("{other}"),
                };
                inner_map.insert(
                    PropertyKey::from(key_str.as_str()),
                    parquet_field_to_value(val_field),
                );
            }
            Value::Map(Arc::new(inner_map))
        }
        Field::TimestampMillis(ms) => Value::Int64(*ms),
        Field::TimestampMicros(us) => Value::Int64(*us),
        Field::TimeMillis(ms) => Value::Int64(i64::from(*ms)),
        Field::TimeMicros(us) => Value::Int64(*us),
        Field::Date(days) => Value::Int64(i64::from(*days)),
    }
}
559
/// Lossily converts a Parquet decimal (big-endian two's-complement bytes plus
/// a decimal scale) to `f64`.
///
/// NOTE(review): payloads wider than 16 bytes would overflow the i128
/// accumulator — presumably real inputs are at most 16 bytes; confirm.
#[cfg(feature = "parquet-import")]
fn decimal_to_f64(d: &parquet::data_type::Decimal) -> f64 {
    let bytes = d.data();
    // Seed with -1 when the most significant bit is set, sign-extending the
    // two's-complement value as bytes are shifted in.
    let seed: i128 = match bytes.first() {
        Some(first) if first & 0x80 != 0 => -1,
        _ => 0,
    };
    let unscaled = bytes
        .iter()
        .fold(seed, |acc, &byte| (acc << 8) | i128::from(byte));
    unscaled as f64 / 10f64.powi(d.scale())
}
576
#[cfg(test)]
mod tests {
    use super::*;

    // --- parse_csv_row ---

    #[test]
    fn test_parse_csv_simple() {
        let fields = parse_csv_row("a,b,c", b',');
        assert_eq!(fields, vec!["a", "b", "c"]);
    }

    #[test]
    fn test_parse_csv_quoted() {
        let fields = parse_csv_row(r#""hello","world""#, b',');
        assert_eq!(fields, vec!["hello", "world"]);
    }

    #[test]
    fn test_parse_csv_escaped_quotes() {
        // A doubled quote inside a quoted field is an escaped literal quote.
        let fields = parse_csv_row(r#""say ""hi""","ok""#, b',');
        assert_eq!(fields, vec![r#"say "hi""#, "ok"]);
    }

    #[test]
    fn test_parse_csv_delimiter_in_quoted() {
        // Delimiters inside quotes must not split the field.
        let fields = parse_csv_row(r#""a,b",c"#, b',');
        assert_eq!(fields, vec!["a,b", "c"]);
    }

    #[test]
    fn test_parse_csv_empty_fields() {
        let fields = parse_csv_row("a,,c", b',');
        assert_eq!(fields, vec!["a", "", "c"]);
    }

    #[test]
    fn test_parse_csv_tab_delimiter() {
        let fields = parse_csv_row("a\tb\tc", b'\t');
        assert_eq!(fields, vec!["a", "b", "c"]);
    }

    #[test]
    fn test_parse_csv_single_field() {
        let fields = parse_csv_row("hello", b',');
        assert_eq!(fields, vec!["hello"]);
    }

    // --- helpers ---

    #[test]
    fn test_strip_file_prefix() {
        assert_eq!(strip_file_prefix("file:///data.csv"), "data.csv");
        assert_eq!(strip_file_prefix("file://data.csv"), "data.csv");
        assert_eq!(strip_file_prefix("data.csv"), "data.csv");
        assert_eq!(strip_file_prefix("/tmp/data.csv"), "/tmp/data.csv");
    }

    #[test]
    fn test_format_name() {
        assert_eq!(format_name(LoadDataFormat::Csv), "CSV");
        assert_eq!(format_name(LoadDataFormat::Jsonl), "JSONL");
        assert_eq!(format_name(LoadDataFormat::Parquet), "Parquet");
    }

    // --- json_to_value (feature-gated) ---

    #[cfg(feature = "jsonl-import")]
    mod jsonl_tests {
        use super::*;

        #[test]
        fn test_json_to_value_null() {
            assert!(matches!(
                json_to_value(&serde_json::Value::Null),
                Value::Null
            ));
        }

        #[test]
        fn test_json_to_value_bool() {
            assert!(matches!(
                json_to_value(&serde_json::Value::Bool(true)),
                Value::Bool(true)
            ));
        }

        #[test]
        fn test_json_to_value_integer() {
            let json: serde_json::Value = serde_json::from_str("42").unwrap();
            assert!(matches!(json_to_value(&json), Value::Int64(42)));
        }

        #[test]
        fn test_json_to_value_float() {
            let json: serde_json::Value = serde_json::from_str("1.5").unwrap();
            match json_to_value(&json) {
                Value::Float64(f) => assert!((f - 1.5_f64).abs() < f64::EPSILON),
                other => panic!("expected Float64, got {other:?}"),
            }
        }

        #[test]
        fn test_json_to_value_string() {
            let json: serde_json::Value = serde_json::from_str(r#""hello""#).unwrap();
            match json_to_value(&json) {
                Value::String(s) => assert_eq!(s.as_str(), "hello"),
                other => panic!("expected String, got {other:?}"),
            }
        }

        #[test]
        fn test_json_to_value_array() {
            let json: serde_json::Value = serde_json::from_str("[1, 2, 3]").unwrap();
            match json_to_value(&json) {
                Value::List(items) => {
                    assert_eq!(items.len(), 3);
                    assert!(matches!(items[0], Value::Int64(1)));
                }
                other => panic!("expected List, got {other:?}"),
            }
        }

        #[test]
        fn test_json_to_value_object() {
            let json: serde_json::Value =
                serde_json::from_str(r#"{"name": "Alix", "age": 30}"#).unwrap();
            match json_to_value(&json) {
                Value::Map(map) => {
                    assert_eq!(map.len(), 2);
                    assert!(matches!(
                        map.get(&PropertyKey::from("age")),
                        Some(Value::Int64(30))
                    ));
                }
                other => panic!("expected Map, got {other:?}"),
            }
        }
    }
}