Skip to main content

kyu_copy/
lib.rs

//! kyu-copy: COPY FROM readers for CSV, Parquet, Arrow IPC, and Kafka.
//!
//! Provides a unified `DataReader` trait and an `open_reader()` factory that
//! selects a file reader from the path's extension, falling back to CSV.
//! `open_reader()` does not construct a `KafkaReader`.
5
6mod arrow_reader;
7mod csv_reader;
8pub mod kafka_reader;
9mod parquet_reader;
10
11pub use arrow_reader::ArrowIpcReader;
12pub use csv_reader::CsvReader;
13pub use kafka_reader::KafkaReader;
14pub use parquet_reader::ParquetReader;
15
16use kyu_common::{KyuError, KyuResult};
17use kyu_types::{LogicalType, TypedValue};
18
19/// Trait for reading rows from an external data source.
20///
21/// Implementations yield one row at a time as `Vec<TypedValue>`,
22/// matching the target table's column schema.
23pub trait DataReader: Iterator<Item = KyuResult<Vec<TypedValue>>> {
24    /// The expected column types for each row.
25    fn schema(&self) -> &[LogicalType];
26}
27
28/// Open a data reader for the given file path, auto-detecting format by extension.
29///
30/// Supported extensions: `.csv`, `.tsv`, `.parquet`, `.arrow`, `.ipc`.
31pub fn open_reader(path: &str, schema: &[LogicalType]) -> KyuResult<Box<dyn DataReader>> {
32    let lower = path.to_lowercase();
33    if lower.ends_with(".parquet") {
34        Ok(Box::new(ParquetReader::open(path, schema)?))
35    } else if lower.ends_with(".arrow") || lower.ends_with(".ipc") {
36        Ok(Box::new(ArrowIpcReader::open(path, schema)?))
37    } else {
38        // Default to CSV (covers .csv, .tsv, and anything else).
39        Ok(Box::new(CsvReader::open(path, schema)?))
40    }
41}
42
43/// Parse a string field into a TypedValue based on the target LogicalType.
44pub(crate) fn parse_field(field: &str, ty: &LogicalType) -> KyuResult<TypedValue> {
45    if field.is_empty() {
46        return Ok(TypedValue::Null);
47    }
48    match ty {
49        LogicalType::Int8 => field
50            .parse::<i8>()
51            .map(TypedValue::Int8)
52            .map_err(|e| KyuError::Copy(format!("cannot parse '{field}' as INT8: {e}"))),
53        LogicalType::Int16 => field
54            .parse::<i16>()
55            .map(TypedValue::Int16)
56            .map_err(|e| KyuError::Copy(format!("cannot parse '{field}' as INT16: {e}"))),
57        LogicalType::Int32 => field
58            .parse::<i32>()
59            .map(TypedValue::Int32)
60            .map_err(|e| KyuError::Copy(format!("cannot parse '{field}' as INT32: {e}"))),
61        LogicalType::Int64 | LogicalType::Serial => field
62            .parse::<i64>()
63            .map(TypedValue::Int64)
64            .map_err(|e| KyuError::Copy(format!("cannot parse '{field}' as INT64: {e}"))),
65        LogicalType::Float => field
66            .parse::<f32>()
67            .map(TypedValue::Float)
68            .map_err(|e| KyuError::Copy(format!("cannot parse '{field}' as FLOAT: {e}"))),
69        LogicalType::Double => field
70            .parse::<f64>()
71            .map(TypedValue::Double)
72            .map_err(|e| KyuError::Copy(format!("cannot parse '{field}' as DOUBLE: {e}"))),
73        LogicalType::Bool => match field.to_lowercase().as_str() {
74            "true" | "1" | "t" | "yes" => Ok(TypedValue::Bool(true)),
75            "false" | "0" | "f" | "no" => Ok(TypedValue::Bool(false)),
76            _ => Err(KyuError::Copy(format!("cannot parse '{field}' as BOOL"))),
77        },
78        LogicalType::String => Ok(TypedValue::String(smol_str::SmolStr::new(field))),
79        _ => Err(KyuError::Copy(format!(
80            "unsupported type {} for import",
81            ty.type_name()
82        ))),
83    }
84}
85
#[cfg(test)]
mod tests {
    use super::*;

    /// Asserts that `input` parses successfully to `expected` under `ty`.
    fn assert_parses(input: &str, ty: LogicalType, expected: TypedValue) {
        assert_eq!(
            parse_field(input, &ty).unwrap(),
            expected,
            "input {input:?}"
        );
    }

    /// One successful parse per supported type.
    #[test]
    fn parse_field_types() {
        assert_parses("-128", LogicalType::Int8, TypedValue::Int8(-128));
        assert_parses("1234", LogicalType::Int16, TypedValue::Int16(1234));
        assert_parses("-70000", LogicalType::Int32, TypedValue::Int32(-70000));
        assert_parses("42", LogicalType::Int64, TypedValue::Int64(42));
        // Serial columns are read as plain 64-bit integers.
        assert_parses("7", LogicalType::Serial, TypedValue::Int64(7));
        // Exactly representable values keep the float comparisons exact and
        // avoid literals that clippy's `approx_constant` lint rejects.
        assert_parses("1.5", LogicalType::Float, TypedValue::Float(1.5));
        assert_parses("2.5", LogicalType::Double, TypedValue::Double(2.5));
        assert_parses("true", LogicalType::Bool, TypedValue::Bool(true));
        assert_parses(
            "hello",
            LogicalType::String,
            TypedValue::String(smol_str::SmolStr::new("hello")),
        );
    }

    /// Every accepted boolean spelling, in mixed case.
    #[test]
    fn parse_field_bool_spellings() {
        for truthy in ["true", "TRUE", "1", "t", "T", "yes", "Yes"] {
            assert_parses(truthy, LogicalType::Bool, TypedValue::Bool(true));
        }
        for falsy in ["false", "False", "0", "f", "F", "no", "NO"] {
            assert_parses(falsy, LogicalType::Bool, TypedValue::Bool(false));
        }
    }

    /// An empty field is NULL regardless of the target type.
    #[test]
    fn parse_field_empty_is_null() {
        assert_parses("", LogicalType::Int64, TypedValue::Null);
        assert_parses("", LogicalType::Double, TypedValue::Null);
        assert_parses("", LogicalType::Bool, TypedValue::Null);
        assert_parses("", LogicalType::String, TypedValue::Null);
    }

    /// Malformed or out-of-range input is reported as an error.
    #[test]
    fn parse_field_errors() {
        assert!(parse_field("not_a_number", &LogicalType::Int64).is_err());
        assert!(parse_field("not_bool", &LogicalType::Bool).is_err());
        // One past i8::MAX must not wrap or truncate.
        assert!(parse_field("128", &LogicalType::Int8).is_err());
        // Fractional text is not a valid integer.
        assert!(parse_field("1.5", &LogicalType::Int32).is_err());
        assert!(parse_field("abc", &LogicalType::Double).is_err());
    }
}