1use std::collections::BTreeMap;
4use std::fs::File;
5use std::io::{BufRead, BufReader};
6use std::sync::Arc;
7
8use super::{Operator, OperatorError, OperatorResult};
9use crate::execution::chunk::DataChunkBuilder;
10use grafeo_common::types::{ArcStr, LogicalType, PropertyKey, Value};
11
/// Source file formats supported by [`LoadDataOperator`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum LoadDataFormat {
    /// Character-separated values, parsed one line per row.
    Csv,
    /// Newline-delimited JSON: one JSON document per line.
    Jsonl,
    /// Apache Parquet; only usable when compiled with the
    /// `parquet-import` feature (otherwise `next()` returns an error).
    Parquet,
}
23
/// Operator that streams rows from an external file (CSV, JSONL, or
/// Parquet), emitting one single-column chunk per input row.
pub struct LoadDataOperator {
    /// Format the input file is parsed as.
    format: LoadDataFormat,
    /// Buffered reader for the line-based formats; `None` until opened.
    reader: Option<BufReader<File>>,
    /// CSV column names, populated when `with_headers` is set and the
    /// file has been opened.
    headers: Option<Vec<String>>,
    /// Whether the first CSV line is a header row.
    with_headers: bool,
    /// Source path, possibly carrying a `file://` prefix.
    path: String,
    /// Single-byte CSV field delimiter (defaults to `b','`).
    delimiter: u8,
    /// True once the underlying source has been opened.
    opened: bool,
    /// All Parquet rows, eagerly materialized at open time.
    #[cfg(feature = "parquet-import")]
    parquet_rows: Option<std::vec::IntoIter<Value>>,
}
49
impl LoadDataOperator {
    /// Creates a load operator for `path` in the given `format`.
    ///
    /// `field_terminator` overrides the default `,` CSV delimiter.
    /// NOTE(review): only the first UTF-8 byte of the terminator is kept
    /// (`buf[0]` below), so a multi-byte char silently degrades to its
    /// leading byte — confirm callers restrict this to ASCII.
    /// `_variable` is the query-level binding name; unused here.
    pub fn new(
        path: String,
        format: LoadDataFormat,
        with_headers: bool,
        field_terminator: Option<char>,
        _variable: String,
    ) -> Self {
        // Reduce the char terminator to a single byte (see NOTE above).
        let delimiter = field_terminator.map_or(b',', |c| {
            let mut buf = [0u8; 4];
            c.encode_utf8(&mut buf);
            buf[0]
        });

        Self {
            format,
            reader: None,
            headers: None,
            with_headers,
            path,
            delimiter,
            opened: false,
            #[cfg(feature = "parquet-import")]
            parquet_rows: None,
        }
    }

    /// Opens the file for the line-based formats (CSV/JSONL) and, for CSV
    /// with headers, consumes the first line as column names.
    ///
    /// A UTF-8 BOM at the start of the header line is stripped before it
    /// is split into names. Marks the operator as `opened` on success.
    fn open_text(&mut self) -> Result<(), OperatorError> {
        let file_path = strip_file_prefix(&self.path);

        let file = File::open(file_path).map_err(|e| {
            OperatorError::Execution(format!(
                "Failed to open {} file '{}': {}",
                format_name(self.format),
                self.path,
                e
            ))
        })?;
        let mut reader = BufReader::new(file);

        if self.format == LoadDataFormat::Csv && self.with_headers {
            let mut header_line = String::new();
            reader.read_line(&mut header_line).map_err(|e| {
                OperatorError::Execution(format!("Failed to read CSV headers: {e}"))
            })?;
            // Strip a UTF-8 BOM, then trailing CR/LF, before splitting.
            let header_line = header_line.strip_prefix('\u{feff}').unwrap_or(&header_line);
            let header_line = header_line.trim_end_matches(['\r', '\n']);
            self.headers = Some(parse_csv_row(header_line, self.delimiter));
        }

        self.reader = Some(reader);
        self.opened = true;
        Ok(())
    }

    /// Reads the next non-empty CSV line and emits it as a single-row chunk.
    ///
    /// With headers the row becomes a `Value::Map` keyed by header name;
    /// without, a positional `Value::List`. Empty fields become
    /// `Value::Null`. Returns `Ok(None)` at end of file.
    /// NOTE(review): parsing is line-based, so quoted fields containing
    /// embedded newlines are not supported — confirm acceptable upstream.
    fn next_csv(&mut self) -> OperatorResult {
        let reader = self
            .reader
            .as_mut()
            .ok_or_else(|| OperatorError::Execution("CSV reader not initialized".to_string()))?;

        let mut line = String::new();
        loop {
            line.clear();
            let bytes_read = reader
                .read_line(&mut line)
                .map_err(|e| OperatorError::Execution(format!("Failed to read CSV line: {e}")))?;

            // 0 bytes read means EOF.
            if bytes_read == 0 {
                return Ok(None);
            }

            let trimmed = line.trim_end_matches(['\r', '\n']);
            // Skip blank lines entirely rather than producing empty rows.
            if trimmed.is_empty() {
                continue;
            }

            let fields = parse_csv_row(trimmed, self.delimiter);

            let row_value = if let Some(headers) = &self.headers {
                // Header mode: map header name -> field value. Missing
                // trailing fields and empty strings both become Null.
                let mut map = BTreeMap::new();
                for (i, header) in headers.iter().enumerate() {
                    let value = fields.get(i).map_or(Value::Null, |s| {
                        if s.is_empty() {
                            Value::Null
                        } else {
                            Value::String(ArcStr::from(s.as_str()))
                        }
                    });
                    map.insert(PropertyKey::from(header.as_str()), value);
                }
                Value::Map(Arc::new(map))
            } else {
                // Positional mode: a list of values in column order.
                let values: Vec<Value> = fields
                    .into_iter()
                    .map(|s| {
                        if s.is_empty() {
                            Value::Null
                        } else {
                            Value::String(ArcStr::from(s.as_str()))
                        }
                    })
                    .collect();
                Value::List(Arc::from(values))
            };

            return Ok(Some(build_single_row_chunk(row_value)));
        }
    }

    /// Reads the next non-empty JSONL line, parses it as JSON, and emits
    /// the converted value as a single-row chunk. Returns `Ok(None)` at EOF.
    #[cfg(feature = "jsonl-import")]
    fn next_jsonl(&mut self) -> OperatorResult {
        let reader = self
            .reader
            .as_mut()
            .ok_or_else(|| OperatorError::Execution("JSONL reader not initialized".to_string()))?;

        let mut line = String::new();
        loop {
            line.clear();
            let bytes_read = reader
                .read_line(&mut line)
                .map_err(|e| OperatorError::Execution(format!("Failed to read JSONL line: {e}")))?;

            // 0 bytes read means EOF.
            if bytes_read == 0 {
                return Ok(None);
            }

            let trimmed = line.trim();
            // Skip blank lines.
            if trimmed.is_empty() {
                continue;
            }

            let json_value: serde_json::Value = serde_json::from_str(trimmed)
                .map_err(|e| OperatorError::Execution(format!("Failed to parse JSON line: {e}")))?;

            let row_value = json_to_value(&json_value);
            return Ok(Some(build_single_row_chunk(row_value)));
        }
    }

    /// Stub used when the `jsonl-import` feature is disabled: always errors.
    #[cfg(not(feature = "jsonl-import"))]
    fn next_jsonl(&mut self) -> OperatorResult {
        Err(OperatorError::Execution(
            "JSONL import not enabled (compile with --features jsonl-import)".to_string(),
        ))
    }

    /// Opens a Parquet file and eagerly converts every row to a `Value`.
    ///
    /// NOTE(review): this materializes the whole file in memory
    /// (`rows` Vec) before any row is emitted — fine for modest files,
    /// worth confirming for large imports.
    #[cfg(feature = "parquet-import")]
    fn open_parquet(&mut self) -> Result<(), OperatorError> {
        use parquet::file::reader::FileReader;

        let file_path = strip_file_prefix(&self.path);
        let file = File::open(file_path).map_err(|e| {
            OperatorError::Execution(format!(
                "Failed to open Parquet file '{}': {}",
                self.path, e
            ))
        })?;

        let reader = parquet::file::reader::SerializedFileReader::new(file).map_err(|e| {
            OperatorError::Execution(format!(
                "Failed to read Parquet file '{}': {}",
                self.path, e
            ))
        })?;

        let row_iter = reader.get_row_iter(None).map_err(|e| {
            OperatorError::Execution(format!("Failed to create Parquet row iterator: {e}"))
        })?;

        let mut rows = Vec::new();
        for row_result in row_iter {
            let row = row_result.map_err(|e| {
                OperatorError::Execution(format!("Failed to read Parquet row: {e}"))
            })?;
            rows.push(parquet_row_to_value(&row));
        }

        self.parquet_rows = Some(rows.into_iter());
        self.opened = true;
        Ok(())
    }

    /// Emits the next buffered Parquet row, or `Ok(None)` when exhausted.
    #[cfg(feature = "parquet-import")]
    fn next_parquet(&mut self) -> OperatorResult {
        let rows = self.parquet_rows.as_mut().ok_or_else(|| {
            OperatorError::Execution("Parquet reader not initialized".to_string())
        })?;

        match rows.next() {
            Some(row_value) => Ok(Some(build_single_row_chunk(row_value))),
            None => Ok(None),
        }
    }
}
256
impl Operator for LoadDataOperator {
    /// Produces the next single-row chunk, lazily opening the source on
    /// the first call. Returns `Ok(None)` once the input is exhausted;
    /// Parquet without the `parquet-import` feature always errors.
    fn next(&mut self) -> OperatorResult {
        match self.format {
            LoadDataFormat::Csv => {
                if !self.opened {
                    self.open_text()?;
                }
                self.next_csv()
            }
            LoadDataFormat::Jsonl => {
                if !self.opened {
                    self.open_text()?;
                }
                self.next_jsonl()
            }
            LoadDataFormat::Parquet => {
                #[cfg(feature = "parquet-import")]
                {
                    if !self.opened {
                        self.open_parquet()?;
                    }
                    self.next_parquet()
                }
                #[cfg(not(feature = "parquet-import"))]
                Err(OperatorError::Execution(
                    "Parquet import not enabled (compile with --features parquet-import)"
                        .to_string(),
                ))
            }
        }
    }

    /// Drops any open reader / buffered rows so the next `next()` call
    /// re-opens the source from the beginning.
    fn reset(&mut self) {
        self.reader = None;
        self.headers = None;
        self.opened = false;
        #[cfg(feature = "parquet-import")]
        {
            self.parquet_rows = None;
        }
    }

    /// Static operator name, specialized per input format for plan output.
    fn name(&self) -> &'static str {
        match self.format {
            LoadDataFormat::Csv => "LoadCsv",
            LoadDataFormat::Jsonl => "LoadJsonl",
            LoadDataFormat::Parquet => "LoadParquet",
        }
    }

    /// Type-erases the boxed operator so callers can downcast it.
    fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
        self
    }
}
311
/// Removes a leading `file:///` or `file://` URI scheme, if present,
/// returning the rest of the path; any other path is returned unchanged.
fn strip_file_prefix(path: &str) -> &str {
    // Longest prefix first so "file:///x" doesn't leave a leading slash.
    for scheme in ["file:///", "file://"] {
        if let Some(rest) = path.strip_prefix(scheme) {
            return rest;
        }
    }
    path
}
322
323fn format_name(format: LoadDataFormat) -> &'static str {
325 match format {
326 LoadDataFormat::Csv => "CSV",
327 LoadDataFormat::Jsonl => "JSONL",
328 LoadDataFormat::Parquet => "Parquet",
329 }
330}
331
/// Wraps a single `Value` into a one-row, one-column chunk typed as
/// `LogicalType::Any`.
fn build_single_row_chunk(value: Value) -> crate::execution::DataChunk {
    let mut builder = DataChunkBuilder::new(&[LogicalType::Any]);
    if let Some(col) = builder.column_mut(0) {
        col.push_value(value);
    }
    builder.advance_row();
    builder.finish()
}
341
/// Splits a single CSV line into fields using `delimiter`.
///
/// Supports RFC-4180-style quoting: a field wrapped in `"` may contain the
/// delimiter, and `""` inside a quoted field is an escaped literal quote.
/// Because the input is a single line, embedded newlines inside quoted
/// fields are not supported; an unterminated quoted field yields whatever
/// was read before end-of-line.
///
/// A trailing delimiter produces a trailing empty field (`"a,"` ->
/// `["a", ""]`), per standard CSV semantics (previously the final empty
/// field was silently dropped). An empty line yields one empty field.
fn parse_csv_row(line: &str, delimiter: u8) -> Vec<String> {
    let delim = delimiter as char;
    let mut fields = Vec::new();
    let mut chars = line.chars().peekable();
    let mut field = String::new();

    loop {
        // Tracks whether this field ended because we consumed a delimiter
        // (as opposed to simply hitting end-of-line).
        let mut consumed_delim = false;

        if chars.peek() == Some(&'"') {
            chars.next(); // opening quote
            loop {
                match chars.next() {
                    Some('"') => {
                        if chars.peek() == Some(&'"') {
                            chars.next();
                            field.push('"'); // "" -> literal quote
                        } else {
                            break; // closing quote
                        }
                    }
                    Some(c) => field.push(c),
                    None => break, // unterminated quote: keep what we have
                }
            }
            // Consume the delimiter following the closing quote, if any.
            if chars.peek() == Some(&delim) {
                chars.next();
                consumed_delim = true;
            }
        } else {
            loop {
                match chars.peek() {
                    Some(c) if *c == delim => {
                        chars.next();
                        consumed_delim = true;
                        break;
                    }
                    Some(_) => {
                        field.push(chars.next().unwrap());
                    }
                    None => break,
                }
            }
        }
        fields.push(std::mem::take(&mut field));

        if chars.peek().is_none() {
            // Bug fix: a line ending in a bare delimiter has one final
            // empty field that the loop above never gets to emit.
            if consumed_delim {
                fields.push(String::new());
            }
            break;
        }
    }

    fields
}
406
/// Recursively converts a parsed JSON document into an engine `Value`.
///
/// Numbers prefer `Int64`, fall back to `Float64`, and finally to their
/// decimal string form; arrays and objects recurse into lists and maps.
#[cfg(feature = "jsonl-import")]
fn json_to_value(json: &serde_json::Value) -> Value {
    use serde_json::Value as Json;

    match json {
        Json::Null => Value::Null,
        Json::Bool(flag) => Value::Bool(*flag),
        Json::Number(num) => num.as_i64().map(Value::Int64).unwrap_or_else(|| {
            num.as_f64()
                .map(Value::Float64)
                .unwrap_or_else(|| Value::String(ArcStr::from(num.to_string().as_str())))
        }),
        Json::String(text) => Value::String(ArcStr::from(text.as_str())),
        Json::Array(elements) => {
            let converted: Vec<Value> = elements.iter().map(json_to_value).collect();
            Value::List(Arc::from(converted))
        }
        Json::Object(fields) => {
            let converted: BTreeMap<_, _> = fields
                .iter()
                .map(|(key, val)| (PropertyKey::from(key.as_str()), json_to_value(val)))
                .collect();
            Value::Map(Arc::new(converted))
        }
    }
}
440
/// Converts a Parquet row into a `Value::Map` keyed by column name.
///
/// Per-field conversion is delegated to [`parquet_field_to_value`] so the
/// two functions cannot drift apart (previously this function duplicated
/// the entire `Field` match arm-for-arm).
#[cfg(feature = "parquet-import")]
fn parquet_row_to_value(row: &parquet::record::Row) -> Value {
    let mut map = BTreeMap::new();
    for (name, field) in row.get_column_iter() {
        map.insert(
            PropertyKey::from(name.as_str()),
            parquet_field_to_value(field),
        );
    }
    Value::Map(Arc::new(map))
}
509
/// Converts a single Parquet [`Field`] into an engine `Value`.
///
/// All integral widths widen to `Int64`; a `ULong` larger than `i64::MAX`
/// falls back to its decimal string form. Floats (including `Float16`)
/// widen to `Float64`. Groups, lists, and maps recurse; a non-string map
/// key uses its `Display` rendering. Temporal fields keep their raw
/// integer representation (millis/micros/days) rather than converting to
/// a date/time type.
#[cfg(feature = "parquet-import")]
fn parquet_field_to_value(field: &parquet::record::Field) -> Value {
    use parquet::record::Field;

    match field {
        Field::Null => Value::Null,
        Field::Bool(b) => Value::Bool(*b),
        // Signed integers: widen losslessly to i64.
        Field::Byte(b) => Value::Int64(i64::from(*b)),
        Field::Short(s) => Value::Int64(i64::from(*s)),
        Field::Int(i) => Value::Int64(i64::from(*i)),
        Field::Long(l) => Value::Int64(*l),
        // Unsigned integers: widen to i64; u64 may not fit.
        Field::UByte(b) => Value::Int64(i64::from(*b)),
        Field::UShort(s) => Value::Int64(i64::from(*s)),
        Field::UInt(i) => Value::Int64(i64::from(*i)),
        Field::ULong(l) => {
            if let Ok(i) = i64::try_from(*l) {
                Value::Int64(i)
            } else {
                // Too large for i64: preserve exactly as a decimal string.
                Value::String(ArcStr::from(l.to_string().as_str()))
            }
        }
        Field::Float(f) => Value::Float64(f64::from(*f)),
        Field::Double(d) => Value::Float64(*d),
        Field::Str(s) => Value::String(ArcStr::from(s.as_str())),
        Field::Bytes(b) => Value::Bytes(Arc::from(b.data().to_vec())),
        // Decimals are approximated as f64 (see decimal_to_f64).
        Field::Decimal(d) => Value::Float64(decimal_to_f64(d)),
        Field::Float16(f) => Value::Float64(f64::from(*f)),
        // Nested structures recurse.
        Field::Group(row) => parquet_row_to_value(row),
        Field::ListInternal(list) => {
            let items: Vec<Value> = list.elements().iter().map(parquet_field_to_value).collect();
            Value::List(Arc::from(items))
        }
        Field::MapInternal(map_internal) => {
            let mut inner_map = BTreeMap::new();
            for (key_field, val_field) in map_internal.entries() {
                // Non-string keys fall back to their Display rendering.
                let key_str = match key_field {
                    Field::Str(s) => s.clone(),
                    other => format!("{other}"),
                };
                inner_map.insert(
                    PropertyKey::from(key_str.as_str()),
                    parquet_field_to_value(val_field),
                );
            }
            Value::Map(Arc::new(inner_map))
        }
        // Temporal values are kept as raw integers.
        Field::TimestampMillis(ms) => Value::Int64(*ms),
        Field::TimestampMicros(us) => Value::Int64(*us),
        Field::TimeMillis(ms) => Value::Int64(i64::from(*ms)),
        Field::TimeMicros(us) => Value::Int64(*us),
        Field::Date(days) => Value::Int64(i64::from(*days)),
    }
}
564
/// Approximates a Parquet decimal (big-endian two's-complement unscaled
/// value plus a base-10 scale) as an `f64`.
#[cfg(feature = "parquet-import")]
fn decimal_to_f64(d: &parquet::data_type::Decimal) -> f64 {
    let bytes = d.data();
    // Seed with all-ones when the big-endian sign bit is set so shorter
    // payloads are correctly sign-extended into the i128 accumulator.
    let seed: i128 = match bytes.first() {
        Some(&msb) if msb & 0x80 != 0 => -1,
        _ => 0,
    };
    let unscaled = bytes
        .iter()
        .fold(seed, |acc, &byte| (acc << 8) | i128::from(byte));
    unscaled as f64 / 10f64.powi(d.scale())
}
581
/// Unit tests for the pure helpers (CSV parsing, path prefix stripping,
/// format naming) plus, behind `jsonl-import`, the JSON-to-Value mapping.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_parse_csv_simple() {
        let fields = parse_csv_row("a,b,c", b',');
        assert_eq!(fields, vec!["a", "b", "c"]);
    }

    #[test]
    fn test_parse_csv_quoted() {
        let fields = parse_csv_row(r#""hello","world""#, b',');
        assert_eq!(fields, vec!["hello", "world"]);
    }

    #[test]
    fn test_parse_csv_escaped_quotes() {
        // "" inside a quoted field decodes to a literal quote.
        let fields = parse_csv_row(r#""say ""hi""","ok""#, b',');
        assert_eq!(fields, vec![r#"say "hi""#, "ok"]);
    }

    #[test]
    fn test_parse_csv_delimiter_in_quoted() {
        // A quoted field may contain the delimiter.
        let fields = parse_csv_row(r#""a,b",c"#, b',');
        assert_eq!(fields, vec!["a,b", "c"]);
    }

    #[test]
    fn test_parse_csv_empty_fields() {
        let fields = parse_csv_row("a,,c", b',');
        assert_eq!(fields, vec!["a", "", "c"]);
    }

    #[test]
    fn test_parse_csv_tab_delimiter() {
        let fields = parse_csv_row("a\tb\tc", b'\t');
        assert_eq!(fields, vec!["a", "b", "c"]);
    }

    #[test]
    fn test_parse_csv_single_field() {
        let fields = parse_csv_row("hello", b',');
        assert_eq!(fields, vec!["hello"]);
    }

    #[test]
    fn test_strip_file_prefix() {
        assert_eq!(strip_file_prefix("file:///data.csv"), "data.csv");
        assert_eq!(strip_file_prefix("file://data.csv"), "data.csv");
        assert_eq!(strip_file_prefix("data.csv"), "data.csv");
        assert_eq!(strip_file_prefix("/tmp/data.csv"), "/tmp/data.csv");
    }

    #[test]
    fn test_format_name() {
        assert_eq!(format_name(LoadDataFormat::Csv), "CSV");
        assert_eq!(format_name(LoadDataFormat::Jsonl), "JSONL");
        assert_eq!(format_name(LoadDataFormat::Parquet), "Parquet");
    }

    #[test]
    fn test_load_data_into_any() {
        // into_any must allow downcasting back to the concrete operator.
        let op = LoadDataOperator::new(
            "test.csv".to_string(),
            LoadDataFormat::Csv,
            true,
            None,
            "row".to_string(),
        );
        let any = Box::new(op).into_any();
        assert!(any.downcast::<LoadDataOperator>().is_ok());
    }

    #[cfg(feature = "jsonl-import")]
    mod jsonl_tests {
        use super::*;

        #[test]
        fn test_json_to_value_null() {
            assert!(matches!(
                json_to_value(&serde_json::Value::Null),
                Value::Null
            ));
        }

        #[test]
        fn test_json_to_value_bool() {
            assert!(matches!(
                json_to_value(&serde_json::Value::Bool(true)),
                Value::Bool(true)
            ));
        }

        #[test]
        fn test_json_to_value_integer() {
            let json: serde_json::Value = serde_json::from_str("42").unwrap();
            assert!(matches!(json_to_value(&json), Value::Int64(42)));
        }

        #[test]
        fn test_json_to_value_float() {
            let json: serde_json::Value = serde_json::from_str("1.5").unwrap();
            match json_to_value(&json) {
                Value::Float64(f) => assert!((f - 1.5_f64).abs() < f64::EPSILON),
                other => panic!("expected Float64, got {other:?}"),
            }
        }

        #[test]
        fn test_json_to_value_string() {
            let json: serde_json::Value = serde_json::from_str(r#""hello""#).unwrap();
            match json_to_value(&json) {
                Value::String(s) => assert_eq!(s.as_str(), "hello"),
                other => panic!("expected String, got {other:?}"),
            }
        }

        #[test]
        fn test_json_to_value_array() {
            let json: serde_json::Value = serde_json::from_str("[1, 2, 3]").unwrap();
            match json_to_value(&json) {
                Value::List(items) => {
                    assert_eq!(items.len(), 3);
                    assert!(matches!(items[0], Value::Int64(1)));
                }
                other => panic!("expected List, got {other:?}"),
            }
        }

        #[test]
        fn test_json_to_value_object() {
            let json: serde_json::Value =
                serde_json::from_str(r#"{"name": "Alix", "age": 30}"#).unwrap();
            match json_to_value(&json) {
                Value::Map(map) => {
                    assert_eq!(map.len(), 2);
                    assert!(matches!(
                        map.get(&PropertyKey::from("age")),
                        Some(Value::Int64(30))
                    ));
                }
                other => panic!("expected Map, got {other:?}"),
            }
        }
    }
}