partiql_extension_csv/
lib.rs1#![deny(rust_2018_idioms)]
2#![deny(clippy::all)]
3
4use partiql_catalog::call_defs::{CallDef, CallSpec, CallSpecArg};
5use partiql_catalog::catalog::Catalog;
6use partiql_catalog::context::SessionContext;
7use partiql_catalog::extension::{ExtensionError, ExtensionResultError};
8use partiql_catalog::table_fn::{
9 BaseTableExpr, BaseTableExprResult, BaseTableFunctionInfo, TableFunction,
10};
11use partiql_logical as logical;
12use partiql_value::{Tuple, Value};
13use std::borrow::Cow;
14use std::error::Error;
15use std::fmt::Debug;
16use std::fs::File;
17use std::path::PathBuf;
18use thiserror::Error;
19
20#[derive(Error, Debug)]
25#[non_exhaustive]
26pub enum CsvExtensionError {
27 #[error("`scan_csv` function error: `{}`", .0)]
29 FunctionError(String),
30
31 #[error("`scan_csv` io error: `{}`", .0)]
33 IoError(std::io::Error),
34
35 #[error("`scan_csv` io error: `{}`", .0)]
37 CsvReadError(Box<dyn Error>),
38
39 #[error("Data error: `{}`", .0)]
41 DataError(ExtensionError),
42
43 #[error("csv read error: unknown error")]
45 Unknown,
46}
47
48pub type CsvTableExprResult<'a> = Result<CsvTableExprResultValueIter<'a>, CsvExtensionError>;
49
50pub type CsvTableExprResultValueIter<'a> =
51 Box<dyn 'a + Iterator<Item = Result<Value, CsvExtensionError>>>;
52
53impl From<std::io::Error> for CsvExtensionError {
54 fn from(e: std::io::Error) -> Self {
55 CsvExtensionError::IoError(e)
56 }
57}
58
59impl From<csv::Error> for CsvExtensionError {
60 fn from(e: csv::Error) -> Self {
61 CsvExtensionError::CsvReadError(Box::new(e))
62 }
63}
64
65impl From<CsvExtensionError> for ExtensionResultError {
66 fn from(value: CsvExtensionError) -> Self {
67 match value {
68 CsvExtensionError::IoError(_) => ExtensionResultError::ReadError(Box::new(value)),
69 CsvExtensionError::Unknown => ExtensionResultError::ReadError(Box::new(value)),
70 CsvExtensionError::FunctionError(_) => ExtensionResultError::LoadError(Box::new(value)),
71 CsvExtensionError::CsvReadError(_) => ExtensionResultError::ReadError(Box::new(value)),
72 CsvExtensionError::DataError(_) => ExtensionResultError::DataError(Box::new(value)),
73 }
74 }
75}
76
77#[derive(Debug)]
78pub struct CsvExtension {}
79
80impl partiql_catalog::extension::Extension for CsvExtension {
81 fn name(&self) -> String {
82 "csv".into()
83 }
84
85 fn load(&self, catalog: &mut dyn Catalog) -> Result<(), ExtensionResultError> {
86 match catalog.add_table_function(TableFunction::new(Box::new(ScanCsvFunction::new()))) {
87 Ok(_) => Ok(()),
88 Err(e) => Err(ExtensionResultError::LoadError(e.into())),
89 }
90 }
91}
92
93#[derive(Debug)]
94pub(crate) struct ScanCsvFunction {
95 call_def: CallDef,
96}
97
98impl ScanCsvFunction {
100 pub fn new() -> Self {
101 ScanCsvFunction {
102 call_def: CallDef {
103 names: vec!["scan_csv"],
104 overloads: vec![CallSpec {
105 input: vec![CallSpecArg::Positional],
106 output: Box::new(|args| {
107 logical::ValueExpr::Call(logical::CallExpr {
108 name: logical::CallName::ByName("scan_csv".to_string()),
109 arguments: args,
110 })
111 }),
112 }],
113 },
114 }
115 }
116}
117
118impl BaseTableFunctionInfo for ScanCsvFunction {
119 fn call_def(&self) -> &CallDef {
120 &self.call_def
121 }
122
123 fn plan_eval(&self) -> Box<dyn BaseTableExpr> {
124 Box::new(EvalFnScanCsv {})
125 }
126}
127
128#[derive(Debug)]
129pub(crate) struct EvalFnScanCsv {}
130
131impl BaseTableExpr for EvalFnScanCsv {
132 fn evaluate<'c>(
133 &self,
134 args: &[Cow<'_, Value>],
135 _ctx: &'c dyn SessionContext,
136 ) -> BaseTableExprResult<'c> {
137 if let Some(arg1) = args.first() {
138 match arg1.as_ref() {
139 Value::String(path) => Ok(Box::new(
140 parse_csv_file(path)?.map(|it| it.map_err(Into::into)),
141 )),
142 _ => {
143 let error = CsvExtensionError::FunctionError(
144 "expected string path argument".to_string(),
145 );
146 Err(ExtensionResultError::ReadError(error.into()))
147 }
148 }
149 } else {
150 let error = CsvExtensionError::FunctionError("expected path argument".to_string());
151 Err(ExtensionResultError::ReadError(error.into()))
152 }
153 }
154}
155
156fn parse_csv_file<'a>(path: &str) -> CsvTableExprResult<'a> {
157 let path = PathBuf::from(path).canonicalize()?;
158 let file = File::open(path)?;
159
160 let mut rdr = csv::Reader::from_reader(file);
161
162 let keys = rdr.headers()?.clone();
163 let data = rdr.into_records();
164
165 let rows = data.map(move |row| {
166 match row {
167 Ok(row) => {
168 let vals = row.iter();
170 Ok(keys.iter().zip(vals).collect::<Tuple>().into())
171 }
172 Err(err) => Err(CsvExtensionError::DataError(err.into())),
173 }
174 });
175 Ok(Box::new(rows))
176}