1mod arrow_reader;
7mod csv_reader;
8pub mod kafka_reader;
9mod parquet_reader;
10
11pub use arrow_reader::ArrowIpcReader;
12pub use csv_reader::CsvReader;
13pub use kafka_reader::KafkaReader;
14pub use parquet_reader::ParquetReader;
15
16use kyu_common::{KyuError, KyuResult};
17use kyu_types::{LogicalType, TypedValue};
18
19pub trait DataReader: Iterator<Item = KyuResult<Vec<TypedValue>>> {
24 fn schema(&self) -> &[LogicalType];
26}
27
28pub fn open_reader(path: &str, schema: &[LogicalType]) -> KyuResult<Box<dyn DataReader>> {
32 let lower = path.to_lowercase();
33 if lower.ends_with(".parquet") {
34 Ok(Box::new(ParquetReader::open(path, schema)?))
35 } else if lower.ends_with(".arrow") || lower.ends_with(".ipc") {
36 Ok(Box::new(ArrowIpcReader::open(path, schema)?))
37 } else {
38 Ok(Box::new(CsvReader::open(path, schema)?))
40 }
41}
42
43pub(crate) fn parse_field(field: &str, ty: &LogicalType) -> KyuResult<TypedValue> {
45 if field.is_empty() {
46 return Ok(TypedValue::Null);
47 }
48 match ty {
49 LogicalType::Int8 => field
50 .parse::<i8>()
51 .map(TypedValue::Int8)
52 .map_err(|e| KyuError::Copy(format!("cannot parse '{field}' as INT8: {e}"))),
53 LogicalType::Int16 => field
54 .parse::<i16>()
55 .map(TypedValue::Int16)
56 .map_err(|e| KyuError::Copy(format!("cannot parse '{field}' as INT16: {e}"))),
57 LogicalType::Int32 => field
58 .parse::<i32>()
59 .map(TypedValue::Int32)
60 .map_err(|e| KyuError::Copy(format!("cannot parse '{field}' as INT32: {e}"))),
61 LogicalType::Int64 | LogicalType::Serial => field
62 .parse::<i64>()
63 .map(TypedValue::Int64)
64 .map_err(|e| KyuError::Copy(format!("cannot parse '{field}' as INT64: {e}"))),
65 LogicalType::Float => field
66 .parse::<f32>()
67 .map(TypedValue::Float)
68 .map_err(|e| KyuError::Copy(format!("cannot parse '{field}' as FLOAT: {e}"))),
69 LogicalType::Double => field
70 .parse::<f64>()
71 .map(TypedValue::Double)
72 .map_err(|e| KyuError::Copy(format!("cannot parse '{field}' as DOUBLE: {e}"))),
73 LogicalType::Bool => match field.to_lowercase().as_str() {
74 "true" | "1" | "t" | "yes" => Ok(TypedValue::Bool(true)),
75 "false" | "0" | "f" | "no" => Ok(TypedValue::Bool(false)),
76 _ => Err(KyuError::Copy(format!("cannot parse '{field}' as BOOL"))),
77 },
78 LogicalType::String => Ok(TypedValue::String(smol_str::SmolStr::new(field))),
79 _ => Err(KyuError::Copy(format!(
80 "unsupported type {} for import",
81 ty.type_name()
82 ))),
83 }
84}
85
#[cfg(test)]
mod tests {
    use super::*;

    /// Each supported sample parses to the expected value, and an empty field
    /// becomes `Null`.
    #[test]
    fn parse_field_types() {
        let cases = [
            ("42", LogicalType::Int64, TypedValue::Int64(42)),
            ("3.14", LogicalType::Double, TypedValue::Double(3.14)),
            ("true", LogicalType::Bool, TypedValue::Bool(true)),
            (
                "hello",
                LogicalType::String,
                TypedValue::String(smol_str::SmolStr::new("hello")),
            ),
            ("", LogicalType::Int64, TypedValue::Null),
        ];

        for (input, ty, expected) in cases.iter() {
            let parsed = parse_field(input, ty).unwrap();
            assert_eq!(&parsed, expected);
        }
    }

    /// Text that does not match the requested type is rejected.
    #[test]
    fn parse_field_errors() {
        let rejected = [
            ("not_a_number", LogicalType::Int64),
            ("not_bool", LogicalType::Bool),
        ];

        for (input, ty) in rejected.iter() {
            assert!(parse_field(input, ty).is_err());
        }
    }
}