1use {
3 arrow::array::AsArray,
4 csv::Error,
5 kermit_iters::JoinIterable,
6 parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder,
7 std::{fs::File, path::Path},
8};
9
/// Whether a relation's schema identifies columns by position or by name.
///
/// Derived from a [`RelationHeader`]: headers with no attribute names are
/// positional, all others are named.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum ModelType {
    /// Columns are identified only by their index; no attribute names.
    Positional,
    /// Columns carry attribute names.
    Named,
}
14
/// Schema metadata for a relation: an optional name, optional attribute
/// (column) names, and the relation's arity.
///
/// An empty `name` marks a nameless relation (see `is_nameless`); empty
/// `attrs` marks a positional relation (see `model_type`).
#[derive(Clone, Debug)]
pub struct RelationHeader {
    /// Relation name; the empty string is the "nameless" sentinel.
    name: String,
    /// Attribute names; empty for positional relations.
    attrs: Vec<String>,
    /// Number of columns; equals `attrs.len()` when attrs are present.
    arity: usize,
}
21
22impl RelationHeader {
23 pub fn new(name: impl Into<String>, attrs: Vec<String>) -> Self {
26 let arity = attrs.len();
27 RelationHeader {
28 name: name.into(),
29 attrs,
30 arity,
31 }
32 }
33
34 pub fn new_nameless(attrs: Vec<String>) -> Self {
35 let arity = attrs.len();
36 RelationHeader {
37 name: String::new(),
38 attrs,
39 arity,
40 }
41 }
42
43 pub fn new_positional(name: impl Into<String>, arity: usize) -> Self {
44 RelationHeader {
45 name: name.into(),
46 attrs: vec![],
47 arity,
48 }
49 }
50
51 pub fn new_nameless_positional(arity: usize) -> Self {
52 RelationHeader {
53 name: String::new(),
54 attrs: vec![],
55 arity,
56 }
57 }
58
59 pub fn is_nameless(&self) -> bool { self.name.is_empty() }
60
61 pub fn name(&self) -> &str { &self.name }
62
63 pub fn attrs(&self) -> &[String] { &self.attrs }
64
65 pub fn arity(&self) -> usize { self.arity }
66
67 pub fn model_type(&self) -> ModelType {
68 if self.attrs.is_empty() {
69 ModelType::Positional
70 } else {
71 ModelType::Named
72 }
73 }
74}
75
76impl From<usize> for RelationHeader {
77 fn from(value: usize) -> RelationHeader { RelationHeader::new_nameless_positional(value) }
78}
79
/// Column projection over a relation.
pub trait Projectable {
    /// Returns a new value restricted to the columns identified by the
    /// given zero-based indices.
    /// NOTE(review): ordering and duplicate-index semantics are
    /// implementor-defined — confirm against concrete implementations.
    fn project(&self, columns: Vec<usize>) -> Self;
}
83
/// A relation of `usize` tuples described by a [`RelationHeader`].
pub trait Relation: JoinIterable + Projectable {
    /// Returns this relation's header (name, attributes, arity).
    fn header(&self) -> &RelationHeader;

    /// Creates an empty relation with the given header.
    fn new(header: RelationHeader) -> Self;

    /// Creates a relation with the given header, populated with `tuples`.
    fn from_tuples(header: RelationHeader, tuples: Vec<Vec<usize>>) -> Self;

    /// Inserts a single tuple.
    /// NOTE(review): the meaning of the returned bool (presumably "was
    /// newly inserted") is implementor-defined — verify.
    fn insert(&mut self, tuple: Vec<usize>) -> bool;

    /// Inserts every given tuple.
    /// NOTE(review): returned bool semantics are implementor-defined —
    /// verify against implementations.
    fn insert_all(&mut self, tuples: Vec<Vec<usize>>) -> bool;
}
102
/// File-loading extensions for [`Relation`] implementors.
///
/// Both loaders share `csv::Error` as the error type; parquet/arrow
/// failures are wrapped through `std::io::Error` in the blanket impl.
pub trait RelationFileExt: Relation {
    /// Loads a relation from a parquet file at `filepath`.
    fn from_parquet<P: AsRef<Path>>(filepath: P) -> Result<Self, Error>
    where
        Self: Sized;

    /// Loads a relation from a CSV file (with a header row) at `filepath`.
    fn from_csv<P: AsRef<Path>>(filepath: P) -> Result<Self, Error>
    where
        Self: Sized;
}
122
123impl<R> RelationFileExt for R
126where
127 R: Relation,
128{
129 fn from_csv<P: AsRef<Path>>(filepath: P) -> Result<Self, Error> {
130 let path = filepath.as_ref();
131 let file = File::open(path)?;
132
133 let mut rdr = csv::ReaderBuilder::new()
134 .has_headers(true)
135 .delimiter(b',')
136 .double_quote(false)
137 .escape(Some(b'\\'))
138 .flexible(false)
139 .comment(Some(b'#'))
140 .from_reader(file);
141
142 let attrs: Vec<String> = rdr.headers()?.iter().map(|s| s.to_string()).collect();
144
145 let relation_name = path
147 .file_stem()
148 .and_then(|s| s.to_str())
149 .unwrap_or("")
150 .to_string();
151
152 let header = RelationHeader::new(relation_name, attrs);
154
155 let mut tuples = Vec::new();
156 for result in rdr.records() {
157 let record = result?;
158 let mut tuple: Vec<usize> = vec![];
159 for x in record.iter() {
160 if let Ok(y) = x.to_string().parse::<usize>() {
161 tuple.push(y);
162 }
163 }
164 tuples.push(tuple);
165 }
166 Ok(R::from_tuples(header, tuples))
167 }
168
169 fn from_parquet<P: AsRef<Path>>(filepath: P) -> Result<Self, Error> {
170 let path = filepath.as_ref();
171 let file = File::open(path).map_err(|e| Error::from(std::io::Error::other(e)))?;
172
173 let builder = ParquetRecordBatchReaderBuilder::try_new(file)
174 .map_err(|e| Error::from(std::io::Error::other(e)))?;
175
176 let schema = builder.schema();
178 let attrs: Vec<String> = schema
179 .fields()
180 .iter()
181 .map(|field| field.name().clone())
182 .collect();
183
184 let relation_name = path
186 .file_stem()
187 .and_then(|s| s.to_str())
188 .unwrap_or("")
189 .to_string();
190
191 let header = RelationHeader::new(relation_name, attrs);
193
194 let reader = builder
196 .build()
197 .map_err(|e| Error::from(std::io::Error::other(e)))?;
198
199 let mut tuples = Vec::new();
201
202 for batch_result in reader {
204 let batch = batch_result.map_err(|e| Error::from(std::io::Error::other(e)))?;
205
206 let num_rows = batch.num_rows();
207 let num_cols = batch.num_columns();
208
209 for row_idx in 0..num_rows {
211 let mut tuple: Vec<usize> = Vec::with_capacity(num_cols);
212
213 for col_idx in 0..num_cols {
214 let column = batch.column(col_idx);
215 let int_array = column.as_primitive::<arrow::datatypes::Int64Type>();
216
217 if let Ok(value) = usize::try_from(int_array.value(row_idx)) {
218 tuple.push(value);
219 } else {
220 return Err(Error::from(std::io::Error::new(
221 std::io::ErrorKind::InvalidData,
222 "Failed to convert parquet value",
223 )));
224 }
225 }
226
227 tuples.push(tuple);
228 }
229 }
230
231 Ok(R::from_tuples(header, tuples))
233 }
234}