partiql_extension_csv/
lib.rs

1#![deny(rust_2018_idioms)]
2#![deny(clippy::all)]
3
4use partiql_catalog::call_defs::{CallDef, CallSpec, CallSpecArg};
5use partiql_catalog::catalog::Catalog;
6use partiql_catalog::context::SessionContext;
7use partiql_catalog::extension::{ExtensionError, ExtensionResultError};
8use partiql_catalog::table_fn::{
9    BaseTableExpr, BaseTableExprResult, BaseTableFunctionInfo, TableFunction,
10};
11use partiql_logical as logical;
12use partiql_value::{Tuple, Value};
13use std::borrow::Cow;
14use std::error::Error;
15use std::fmt::Debug;
16use std::fs::File;
17use std::path::PathBuf;
18use thiserror::Error;
19
20/// Errors in csv extension.
21///
22/// ### Notes
23/// This is marked `#[non_exhaustive]`, to reserve the right to add more variants in the future.
24#[derive(Error, Debug)]
25#[non_exhaustive]
26pub enum CsvExtensionError {
27    /// Function error.
28    #[error("`scan_csv` function error: `{}`", .0)]
29    FunctionError(String),
30
31    /// Io error.
32    #[error("`scan_csv` io error: `{}`", .0)]
33    IoError(std::io::Error),
34
35    /// CSV read error.
36    #[error("`scan_csv` io error: `{}`", .0)]
37    CsvReadError(Box<dyn Error>),
38
39    /// Data error. Generally this will result in a `MISSING` in place of this data item.
40    #[error("Data error: `{}`", .0)]
41    DataError(ExtensionError),
42
43    /// Any other reading error.
44    #[error("csv read error: unknown error")]
45    Unknown,
46}
47
48pub type CsvTableExprResult<'a> = Result<CsvTableExprResultValueIter<'a>, CsvExtensionError>;
49
50pub type CsvTableExprResultValueIter<'a> =
51    Box<dyn 'a + Iterator<Item = Result<Value, CsvExtensionError>>>;
52
53impl From<std::io::Error> for CsvExtensionError {
54    fn from(e: std::io::Error) -> Self {
55        CsvExtensionError::IoError(e)
56    }
57}
58
59impl From<csv::Error> for CsvExtensionError {
60    fn from(e: csv::Error) -> Self {
61        CsvExtensionError::CsvReadError(Box::new(e))
62    }
63}
64
65impl From<CsvExtensionError> for ExtensionResultError {
66    fn from(value: CsvExtensionError) -> Self {
67        match value {
68            CsvExtensionError::IoError(_) => ExtensionResultError::ReadError(Box::new(value)),
69            CsvExtensionError::Unknown => ExtensionResultError::ReadError(Box::new(value)),
70            CsvExtensionError::FunctionError(_) => ExtensionResultError::LoadError(Box::new(value)),
71            CsvExtensionError::CsvReadError(_) => ExtensionResultError::ReadError(Box::new(value)),
72            CsvExtensionError::DataError(_) => ExtensionResultError::DataError(Box::new(value)),
73        }
74    }
75}
76
/// The csv extension; loading it into a catalog registers the `scan_csv` table function.
///
/// Construct it with `CsvExtension {}` or, equivalently, `CsvExtension::default()`.
#[derive(Debug, Default)]
pub struct CsvExtension {}
79
80impl partiql_catalog::extension::Extension for CsvExtension {
81    fn name(&self) -> String {
82        "csv".into()
83    }
84
85    fn load(&self, catalog: &mut dyn Catalog) -> Result<(), ExtensionResultError> {
86        match catalog.add_table_function(TableFunction::new(Box::new(ScanCsvFunction::new()))) {
87            Ok(_) => Ok(()),
88            Err(e) => Err(ExtensionResultError::LoadError(e.into())),
89        }
90    }
91}
92
93#[derive(Debug)]
94pub(crate) struct ScanCsvFunction {
95    call_def: CallDef,
96}
97
98/// `scan_csv` scans csv data lazily, wrapping it into PartiQL Boxed Variants
99impl ScanCsvFunction {
100    pub fn new() -> Self {
101        ScanCsvFunction {
102            call_def: CallDef {
103                names: vec!["scan_csv"],
104                overloads: vec![CallSpec {
105                    input: vec![CallSpecArg::Positional],
106                    output: Box::new(|args| {
107                        logical::ValueExpr::Call(logical::CallExpr {
108                            name: logical::CallName::ByName("scan_csv".to_string()),
109                            arguments: args,
110                        })
111                    }),
112                }],
113            },
114        }
115    }
116}
117
118impl BaseTableFunctionInfo for ScanCsvFunction {
119    fn call_def(&self) -> &CallDef {
120        &self.call_def
121    }
122
123    fn plan_eval(&self) -> Box<dyn BaseTableExpr> {
124        Box::new(EvalFnScanCsv {})
125    }
126}
127
/// Field-less evaluator for `scan_csv` calls; the csv path is taken from the call's first argument.
#[derive(Debug)]
pub(crate) struct EvalFnScanCsv {}
130
131impl BaseTableExpr for EvalFnScanCsv {
132    fn evaluate<'c>(
133        &self,
134        args: &[Cow<'_, Value>],
135        _ctx: &'c dyn SessionContext,
136    ) -> BaseTableExprResult<'c> {
137        if let Some(arg1) = args.first() {
138            match arg1.as_ref() {
139                Value::String(path) => Ok(Box::new(
140                    parse_csv_file(path)?.map(|it| it.map_err(Into::into)),
141                )),
142                _ => {
143                    let error = CsvExtensionError::FunctionError(
144                        "expected string path argument".to_string(),
145                    );
146                    Err(ExtensionResultError::ReadError(error.into()))
147                }
148            }
149        } else {
150            let error = CsvExtensionError::FunctionError("expected path argument".to_string());
151            Err(ExtensionResultError::ReadError(error.into()))
152        }
153    }
154}
155
156fn parse_csv_file<'a>(path: &str) -> CsvTableExprResult<'a> {
157    let path = PathBuf::from(path).canonicalize()?;
158    let file = File::open(path)?;
159
160    let mut rdr = csv::Reader::from_reader(file);
161
162    let keys = rdr.headers()?.clone();
163    let data = rdr.into_records();
164
165    let rows = data.map(move |row| {
166        match row {
167            Ok(row) => {
168                //
169                let vals = row.iter();
170                Ok(keys.iter().zip(vals).collect::<Tuple>().into())
171            }
172            Err(err) => Err(CsvExtensionError::DataError(err.into())),
173        }
174    });
175    Ok(Box::new(rows))
176}