// kermit_ds/relation.rs

1//! This module defines the `Relation` trait and file reading extensions.
2use {
3    arrow::array::AsArray,
4    csv::Error,
5    kermit_iters::JoinIterable,
6    parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder,
7    std::{fs::File, path::Path},
8};
9
/// Distinguishes how a relation's columns are identified: by position only,
/// or by attribute name.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ModelType {
    /// Columns are identified only by their index; no attribute names.
    Positional,
    /// Columns carry attribute names.
    Named,
}
14
/// Metadata describing a relation: its name, attribute names, and arity.
#[derive(Clone, Debug)]
pub struct RelationHeader {
    // Relation name; an empty string marks the relation as nameless.
    name: String,
    // Attribute (column) names; empty for positional relations.
    attrs: Vec<String>,
    // Number of columns; equals `attrs.len()` when attributes are present.
    arity: usize,
}
21
22impl RelationHeader {
23    /// Creates a new `RelationHeader` with the specified name, attributes, and
24    /// arity.
25    pub fn new(name: impl Into<String>, attrs: Vec<String>) -> Self {
26        let arity = attrs.len();
27        RelationHeader {
28            name: name.into(),
29            attrs,
30            arity,
31        }
32    }
33
34    pub fn new_nameless(attrs: Vec<String>) -> Self {
35        let arity = attrs.len();
36        RelationHeader {
37            name: String::new(),
38            attrs,
39            arity,
40        }
41    }
42
43    pub fn new_positional(name: impl Into<String>, arity: usize) -> Self {
44        RelationHeader {
45            name: name.into(),
46            attrs: vec![],
47            arity,
48        }
49    }
50
51    pub fn new_nameless_positional(arity: usize) -> Self {
52        RelationHeader {
53            name: String::new(),
54            attrs: vec![],
55            arity,
56        }
57    }
58
59    pub fn is_nameless(&self) -> bool { self.name.is_empty() }
60
61    pub fn name(&self) -> &str { &self.name }
62
63    pub fn attrs(&self) -> &[String] { &self.attrs }
64
65    pub fn arity(&self) -> usize { self.arity }
66
67    pub fn model_type(&self) -> ModelType {
68        if self.attrs.is_empty() {
69            ModelType::Positional
70        } else {
71            ModelType::Named
72        }
73    }
74}
75
76impl From<usize> for RelationHeader {
77    fn from(value: usize) -> RelationHeader { RelationHeader::new_nameless_positional(value) }
78}
79
/// A data structure that can be projected onto a subset of its columns.
pub trait Projectable {
    /// Returns a new instance restricted to the columns at the given indices.
    // NOTE(review): ordering semantics (input order vs. original column order)
    // are implementor-defined — confirm with implementations before relying
    // on them.
    fn project(&self, columns: Vec<usize>) -> Self;
}
83
/// The `Relation` trait defines a relational data structure.
pub trait Relation: JoinIterable + Projectable {
    /// Returns the header describing this relation's name, attributes, and
    /// arity.
    fn header(&self) -> &RelationHeader;

    /// Creates a new relation with the specified header.
    fn new(header: RelationHeader) -> Self;

    /// Creates a new relation with the specified header and given tuples.
    fn from_tuples(header: RelationHeader, tuples: Vec<Vec<usize>>) -> Self;

    /// Inserts a tuple into the relation, returning `true` if successful and
    /// `false` if otherwise.
    fn insert(&mut self, tuple: Vec<usize>) -> bool;

    /// Inserts multiple tuples into the relation, returning `true` if
    /// successful and `false` if otherwise.
    fn insert_all(&mut self, tuples: Vec<Vec<usize>>) -> bool;
}
102
/// Extension trait for `Relation` to add file reading capabilities.
pub trait RelationFileExt: Relation {
    /// Creates a new relation from a Parquet file with header.
    ///
    /// This method extracts column names from the Parquet schema and the
    /// relation name from the filename.
    fn from_parquet<P: AsRef<Path>>(filepath: P) -> Result<Self, Error>
    where
        Self: Sized;

    /// Creates a new relation from a CSV file.
    ///
    /// # Note
    /// * Each line represents a tuple, and each value in the line should be
    ///   parsable into `usize`.
    fn from_csv<P: AsRef<Path>>(filepath: P) -> Result<Self, Error>
    where
        Self: Sized;
}
122
123/// Blanket implementation of `RelationFileExt` for any type that
124/// implements `Relation`.
125impl<R> RelationFileExt for R
126where
127    R: Relation,
128{
129    fn from_csv<P: AsRef<Path>>(filepath: P) -> Result<Self, Error> {
130        let path = filepath.as_ref();
131        let file = File::open(path)?;
132
133        let mut rdr = csv::ReaderBuilder::new()
134            .has_headers(true)
135            .delimiter(b',')
136            .double_quote(false)
137            .escape(Some(b'\\'))
138            .flexible(false)
139            .comment(Some(b'#'))
140            .from_reader(file);
141
142        // Extract column names from CSV header
143        let attrs: Vec<String> = rdr.headers()?.iter().map(|s| s.to_string()).collect();
144
145        // Extract relation name from filename (without extension)
146        let relation_name = path
147            .file_stem()
148            .and_then(|s| s.to_str())
149            .unwrap_or("")
150            .to_string();
151
152        // Create header from the CSV header with the extracted name
153        let header = RelationHeader::new(relation_name, attrs);
154
155        let mut tuples = Vec::new();
156        for result in rdr.records() {
157            let record = result?;
158            let mut tuple: Vec<usize> = vec![];
159            for x in record.iter() {
160                if let Ok(y) = x.to_string().parse::<usize>() {
161                    tuple.push(y);
162                }
163            }
164            tuples.push(tuple);
165        }
166        Ok(R::from_tuples(header, tuples))
167    }
168
169    fn from_parquet<P: AsRef<Path>>(filepath: P) -> Result<Self, Error> {
170        let path = filepath.as_ref();
171        let file = File::open(path).map_err(|e| Error::from(std::io::Error::other(e)))?;
172
173        let builder = ParquetRecordBatchReaderBuilder::try_new(file)
174            .map_err(|e| Error::from(std::io::Error::other(e)))?;
175
176        // Extract schema to get column names
177        let schema = builder.schema();
178        let attrs: Vec<String> = schema
179            .fields()
180            .iter()
181            .map(|field| field.name().clone())
182            .collect();
183
184        // Extract relation name from filename (without extension)
185        let relation_name = path
186            .file_stem()
187            .and_then(|s| s.to_str())
188            .unwrap_or("")
189            .to_string();
190
191        // Create header from the parquet schema with the extracted name
192        let header = RelationHeader::new(relation_name, attrs);
193
194        // Build the reader
195        let reader = builder
196            .build()
197            .map_err(|e| Error::from(std::io::Error::other(e)))?;
198
199        // Collect all tuples first for efficient construction
200        let mut tuples = Vec::new();
201
202        // Read all record batches and collect tuples
203        for batch_result in reader {
204            let batch = batch_result.map_err(|e| Error::from(std::io::Error::other(e)))?;
205
206            let num_rows = batch.num_rows();
207            let num_cols = batch.num_columns();
208
209            // Convert columnar data to row format (tuples)
210            for row_idx in 0..num_rows {
211                let mut tuple: Vec<usize> = Vec::with_capacity(num_cols);
212
213                for col_idx in 0..num_cols {
214                    let column = batch.column(col_idx);
215                    let int_array = column.as_primitive::<arrow::datatypes::Int64Type>();
216
217                    if let Ok(value) = usize::try_from(int_array.value(row_idx)) {
218                        tuple.push(value);
219                    } else {
220                        return Err(Error::from(std::io::Error::new(
221                            std::io::ErrorKind::InvalidData,
222                            "Failed to convert parquet value",
223                        )));
224                    }
225                }
226
227                tuples.push(tuple);
228            }
229        }
230
231        // Use from_tuples for efficient construction (sorts before insertion)
232        Ok(R::from_tuples(header, tuples))
233    }
234}