Skip to main content

kermit_ds/
relation.rs

1//! Core relation abstraction: the [`Relation`] trait that every storage
2//! backend implements (see `TreeTrie` and `ColumnTrie` in [`crate::ds`]),
3//! plus the blanket [`RelationFileExt`] for loading from CSV or Parquet.
4//!
5//! The trait exists so join algorithms in `kermit-algos` can be written
6//! generically over different trie layouts without coupling to a specific
7//! representation. All tuple values are `usize` keys — typically
8//! dictionary-encoded IDs from a separate symbol table — so a relation never
9//! stores raw strings or domain values directly.
10use {
11    arrow::array::AsArray,
12    kermit_iters::JoinIterable,
13    parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder,
14    std::{fmt, fs::File, path::Path},
15};
16
17/// Error type for relation file operations (CSV and Parquet).
18#[derive(Debug)]
19pub enum RelationError {
20    /// A CSV library error.
21    Csv(csv::Error),
22    /// A filesystem I/O error.
23    Io(std::io::Error),
24    /// A Parquet library error.
25    Parquet(parquet::errors::ParquetError),
26    /// An Arrow conversion error.
27    Arrow(arrow::error::ArrowError),
28    /// A data value that could not be converted (e.g. non-integer in a CSV).
29    InvalidData(String),
30}
31
32impl fmt::Display for RelationError {
33    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
34        match self {
35            | RelationError::Csv(e) => write!(f, "CSV error: {e}"),
36            | RelationError::Io(e) => write!(f, "I/O error: {e}"),
37            | RelationError::Parquet(e) => write!(f, "Parquet error: {e}"),
38            | RelationError::Arrow(e) => write!(f, "Arrow error: {e}"),
39            | RelationError::InvalidData(msg) => write!(f, "Invalid data: {msg}"),
40        }
41    }
42}
43
44impl std::error::Error for RelationError {
45    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
46        match self {
47            | RelationError::Csv(e) => Some(e),
48            | RelationError::Io(e) => Some(e),
49            | RelationError::Parquet(e) => Some(e),
50            | RelationError::Arrow(e) => Some(e),
51            | RelationError::InvalidData(_) => None,
52        }
53    }
54}
55
56impl From<csv::Error> for RelationError {
57    fn from(e: csv::Error) -> Self { RelationError::Csv(e) }
58}
59
60impl From<std::io::Error> for RelationError {
61    fn from(e: std::io::Error) -> Self { RelationError::Io(e) }
62}
63
64impl From<parquet::errors::ParquetError> for RelationError {
65    fn from(e: parquet::errors::ParquetError) -> Self { RelationError::Parquet(e) }
66}
67
68impl From<arrow::error::ArrowError> for RelationError {
69    fn from(e: arrow::error::ArrowError) -> Self { RelationError::Arrow(e) }
70}
71
/// Whether a relation's attributes are identified by name or by position.
///
/// Returned by [`RelationHeader::model_type`]. A header is [`Named`] when it
/// carries explicit attribute names (e.g. from a CSV/Parquet schema), and
/// [`Positional`] otherwise — typical for intermediate relations produced
/// during query evaluation where only arity matters.
///
/// [`Named`]: ModelType::Named
/// [`Positional`]: ModelType::Positional
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum ModelType {
    /// Attributes are accessed by column index only; attribute names are
    /// absent.
    Positional,
    /// Attributes have explicit string names, typically sourced from a file
    /// header or schema.
    Named,
}

/// Metadata for a relation: its name, attribute names, and arity.
///
/// A header is **named** when `attrs` is non-empty (then `arity ==
/// attrs.len()`) and **positional** when `attrs` is empty (then `arity` is
/// the only authoritative column count). Orthogonally, a header is
/// **nameless** when its `name` is empty — used for intermediate or
/// projected relations whose origin no longer matters.
#[derive(Clone, Debug)]
pub struct RelationHeader {
    name: String,
    /// Attribute names. Empty iff this is a positional header.
    attrs: Vec<String>,
    /// Number of columns. For named headers this equals `attrs.len()`; for
    /// positional headers (`attrs.is_empty()`) it is the only authoritative
    /// column count.
    arity: usize,
}

impl RelationHeader {
    /// Creates a header with the given relation name and attribute names.
    /// Arity is derived from `attrs.len()`.
    ///
    /// If `attrs` is empty the result is a positional header of arity 0 (see
    /// [`model_type`](Self::model_type)); use
    /// [`new_positional`](Self::new_positional) to give a positional header a
    /// non-zero arity.
    pub fn new(name: impl Into<String>, attrs: Vec<String>) -> Self {
        let arity = attrs.len();
        RelationHeader {
            name: name.into(),
            attrs,
            arity,
        }
    }

    /// Creates a nameless header with the given attribute names. Arity is
    /// derived from `attrs.len()`.
    pub fn new_nameless(attrs: Vec<String>) -> Self { Self::new(String::new(), attrs) }

    /// Creates a header with the given relation name and `arity` positional
    /// (unnamed) attributes.
    pub fn new_positional(name: impl Into<String>, arity: usize) -> Self {
        RelationHeader {
            name: name.into(),
            attrs: Vec::new(),
            arity,
        }
    }

    /// Creates a nameless header with `arity` positional attributes.
    pub fn new_nameless_positional(arity: usize) -> Self {
        Self::new_positional(String::new(), arity)
    }

    /// Returns `true` if the relation name is empty.
    ///
    /// This holds for headers built with the `new_nameless*` constructors, and
    /// also for [`new`](Self::new) / [`new_positional`](Self::new_positional)
    /// when they are given an empty name.
    pub fn is_nameless(&self) -> bool { self.name.is_empty() }

    /// Returns the relation's name (empty string for nameless headers).
    pub fn name(&self) -> &str { &self.name }

    /// Returns the attribute names. Empty for positional headers.
    pub fn attrs(&self) -> &[String] { &self.attrs }

    /// Returns the arity (number of columns) of the relation.
    pub fn arity(&self) -> usize { self.arity }

    /// Returns [`ModelType::Named`] when attribute names are set, otherwise
    /// [`ModelType::Positional`].
    pub fn model_type(&self) -> ModelType {
        if self.attrs.is_empty() {
            ModelType::Positional
        } else {
            ModelType::Named
        }
    }
}

/// Converts an arity into a nameless, positional header of that arity.
impl From<usize> for RelationHeader {
    fn from(value: usize) -> RelationHeader { RelationHeader::new_nameless_positional(value) }
}
176
177/// A relation that can produce a new relation containing only the specified
178/// columns.
179///
180/// Projection is the π operator from relational algebra: given column indices
181/// `[c₀, c₁, …]` it yields a relation whose `i`-th column is the `cᵢ`-th
182/// column of the source. Duplicate and reordered indices are permitted; the
183/// resulting relation has arity `columns.len()`.
184pub trait Projectable {
185    /// Returns a new relation containing only the columns at the given
186    /// indices, in the order supplied.
187    ///
188    /// # Panics
189    ///
190    /// Panics if any element of `columns` is `>= self.header().arity()`.
191    fn project(&self, columns: Vec<usize>) -> Self;
192}
193
194/// A relational data structure that stores tuples of `usize` keys and can
195/// participate in joins.
196///
197/// Tuple values are `usize` keys (typically dictionary-encoded — see the
198/// module-level docs). The supertraits expose:
199///
200/// - [`JoinIterable`] — produces iterators that the join algorithms in
201///   `kermit-algos` consume. Implementors typically also implement
202///   [`TrieIterable`](kermit_iters::TrieIterable) so the iterator can be driven
203///   hierarchically.
204/// - [`Projectable`] — the relational π operator (column projection).
205pub trait Relation: JoinIterable + Projectable {
206    /// Returns the header describing this relation's name, attributes, and
207    /// arity.
208    fn header(&self) -> &RelationHeader;
209
210    /// Creates an empty relation matching `header`.
211    fn new(header: RelationHeader) -> Self;
212
213    /// Creates a relation populated with `tuples`, matching `header`.
214    /// Implementations may sort or deduplicate during bulk construction;
215    /// prefer this over `new` followed by repeated `insert` calls when all
216    /// tuples are known up front.
217    ///
218    /// # Panics
219    ///
220    /// Panics if any tuple's length does not equal `header.arity()`.
221    fn from_tuples(header: RelationHeader, tuples: Vec<Vec<usize>>) -> Self;
222
223    /// Inserts a tuple. Duplicate tuples are silently absorbed (the relation
224    /// behaves as a set).
225    ///
226    /// # Panics
227    ///
228    /// Panics if `tuple.len() != self.header().arity()`.
229    fn insert(&mut self, tuple: Vec<usize>);
230
231    /// Inserts every tuple in `tuples`. Equivalent to calling
232    /// [`insert`](Self::insert) in a loop; provided so implementations can
233    /// specialise bulk insertion.
234    ///
235    /// # Panics
236    ///
237    /// Panics if any tuple's length does not match the relation's arity.
238    fn insert_all(&mut self, tuples: Vec<Vec<usize>>);
239}
240
241/// Loads a [`Relation`] from a CSV or Parquet file.
242///
243/// Defined as an extension trait (with a blanket impl over every
244/// [`Relation`]) so file-loading is added without bloating the core trait
245/// or requiring each concrete data structure to reimplement it. Anything
246/// that implements [`Relation`] automatically gains
247/// [`from_csv`](Self::from_csv) and [`from_parquet`](Self::from_parquet).
248pub trait RelationFileExt: Relation {
249    /// Creates a new relation from a Parquet file.
250    ///
251    /// Column names are extracted from the Parquet schema and the relation
252    /// name is taken from the file stem. All columns must be `Int64` and
253    /// every value must be non-negative so it fits in `usize`.
254    ///
255    /// # Errors
256    ///
257    /// Returns a [`RelationError`] if any of the following occur:
258    /// - [`RelationError::Io`] — the file cannot be opened.
259    /// - [`RelationError::Parquet`] — the file is not a valid Parquet file or
260    ///   the reader cannot be constructed.
261    /// - [`RelationError::Arrow`] — a record batch fails to decode.
262    /// - [`RelationError::InvalidData`] — an `Int64` value cannot be converted
263    ///   to `usize` (e.g. it is negative).
264    fn from_parquet<P: AsRef<Path>>(filepath: P) -> Result<Self, RelationError>
265    where
266        Self: Sized;
267
268    /// Creates a new relation from a CSV file.
269    ///
270    /// The first row is treated as a header providing attribute names; each
271    /// subsequent row is one tuple. Every field must parse as a `usize`. The
272    /// relation name is taken from the file stem.
273    ///
274    /// # Errors
275    ///
276    /// Returns a [`RelationError`] if any of the following occur:
277    /// - [`RelationError::Io`] — the file cannot be opened.
278    /// - [`RelationError::Csv`] — the CSV reader cannot parse the header or a
279    ///   row (e.g. inconsistent column count).
280    /// - [`RelationError::InvalidData`] — a field cannot be parsed as a
281    ///   `usize`; the message identifies the offending row and column.
282    fn from_csv<P: AsRef<Path>>(filepath: P) -> Result<Self, RelationError>
283    where
284        Self: Sized;
285}
286
287/// Blanket implementation of `RelationFileExt` for any type that
288/// implements `Relation`.
289impl<R> RelationFileExt for R
290where
291    R: Relation,
292{
293    fn from_csv<P: AsRef<Path>>(filepath: P) -> Result<Self, RelationError> {
294        let path = filepath.as_ref();
295        let file = File::open(path)?;
296
297        let mut rdr = csv::ReaderBuilder::new()
298            .has_headers(true)
299            .delimiter(b',')
300            .double_quote(false)
301            .escape(Some(b'\\'))
302            .flexible(false)
303            .comment(Some(b'#'))
304            .from_reader(file);
305
306        // Extract column names from CSV header
307        let attrs: Vec<String> = rdr.headers()?.iter().map(|s| s.to_string()).collect();
308
309        // Extract relation name from filename (without extension)
310        let relation_name = path
311            .file_stem()
312            .and_then(|s| s.to_str())
313            .unwrap_or("")
314            .to_string();
315
316        // Create header from the CSV header with the extracted name
317        let header = RelationHeader::new(relation_name, attrs);
318
319        let mut tuples = Vec::new();
320        for (row_idx, result) in rdr.records().enumerate() {
321            let record = result?;
322            let mut tuple: Vec<usize> = Vec::with_capacity(record.len());
323            for (col_idx, field) in record.iter().enumerate() {
324                let value = field.parse::<usize>().map_err(|_| {
325                    RelationError::InvalidData(format!(
326                        "row {row_idx}, column {col_idx}: cannot parse {:?} as usize",
327                        field,
328                    ))
329                })?;
330                tuple.push(value);
331            }
332            tuples.push(tuple);
333        }
334        Ok(R::from_tuples(header, tuples))
335    }
336
337    fn from_parquet<P: AsRef<Path>>(filepath: P) -> Result<Self, RelationError> {
338        let path = filepath.as_ref();
339        let file = File::open(path)?;
340
341        let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
342
343        // Extract schema to get column names
344        let schema = builder.schema();
345        let attrs: Vec<String> = schema
346            .fields()
347            .iter()
348            .map(|field| field.name().clone())
349            .collect();
350
351        // Extract relation name from filename (without extension)
352        let relation_name = path
353            .file_stem()
354            .and_then(|s| s.to_str())
355            .unwrap_or("")
356            .to_string();
357
358        // Create header from the parquet schema with the extracted name
359        let header = RelationHeader::new(relation_name, attrs);
360
361        // Build the reader
362        let reader = builder.build()?;
363
364        // Collect all tuples first for efficient construction
365        let mut tuples = Vec::new();
366
367        // Read all record batches and collect tuples
368        for batch_result in reader {
369            let batch = batch_result?;
370
371            let num_rows = batch.num_rows();
372            let num_cols = batch.num_columns();
373
374            // Convert columnar data to row format (tuples)
375            for row_idx in 0..num_rows {
376                let mut tuple: Vec<usize> = Vec::with_capacity(num_cols);
377
378                for col_idx in 0..num_cols {
379                    let column = batch.column(col_idx);
380                    let int_array = column.as_primitive::<arrow::datatypes::Int64Type>();
381
382                    if let Ok(value) = usize::try_from(int_array.value(row_idx)) {
383                        tuple.push(value);
384                    } else {
385                        return Err(RelationError::InvalidData(
386                            "failed to convert Parquet value to usize".into(),
387                        ));
388                    }
389                }
390
391                tuples.push(tuple);
392            }
393        }
394
395        // Use from_tuples for efficient construction (sorts before insertion)
396        Ok(R::from_tuples(header, tuples))
397    }
398}
399
#[cfg(test)]
mod tests {
    use {
        super::*,
        std::path::{Path, PathBuf},
    };

    /// A fixture file in the system temp directory that is deleted on drop,
    /// so it is cleaned up even when an assertion fails mid-test. The process
    /// id is part of the name so concurrent test runs do not collide.
    struct TempFile(PathBuf);

    impl TempFile {
        fn with_contents(name: &str, contents: impl AsRef<[u8]>) -> Self {
            let path = std::env::temp_dir().join(format!("kermit_{}_{name}", std::process::id()));
            std::fs::write(&path, contents).unwrap();
            TempFile(path)
        }

        fn path(&self) -> &Path { &self.0 }
    }

    impl Drop for TempFile {
        fn drop(&mut self) {
            // Best-effort cleanup; a leftover temp file is not worth a panic.
            let _ = std::fs::remove_file(&self.0);
        }
    }

    /// Returns a path in the system temp directory that no test creates.
    fn missing_path(file_name: &str) -> PathBuf {
        std::env::temp_dir().join(format!(
            "kermit_missing_{}_{file_name}",
            std::process::id()
        ))
    }

    // ── RelationError Display ──────────────────────────────────────────

    #[test]
    fn relation_error_display_csv() {
        let csv_err = csv::Error::from(std::io::Error::new(
            std::io::ErrorKind::NotFound,
            "file not found",
        ));
        let err = RelationError::from(csv_err);
        let msg = err.to_string();
        assert!(msg.starts_with("CSV error:"), "got: {msg}");
    }

    #[test]
    fn relation_error_display_io() {
        let err = RelationError::from(std::io::Error::new(std::io::ErrorKind::NotFound, "gone"));
        assert!(err.to_string().starts_with("I/O error:"));
    }

    #[test]
    fn relation_error_display_parquet() {
        let err = RelationError::from(parquet::errors::ParquetError::General("boom".into()));
        let msg = err.to_string();
        assert!(msg.starts_with("Parquet error:"), "got: {msg}");
    }

    #[test]
    fn relation_error_display_arrow() {
        let err = RelationError::from(arrow::error::ArrowError::ComputeError("boom".into()));
        let msg = err.to_string();
        assert!(msg.starts_with("Arrow error:"), "got: {msg}");
    }

    #[test]
    fn relation_error_display_invalid_data() {
        let err = RelationError::InvalidData("bad value".into());
        assert_eq!(err.to_string(), "Invalid data: bad value");
    }

    #[test]
    fn relation_error_source_delegates() {
        use std::error::Error;

        let io_err = std::io::Error::other("inner");
        let err = RelationError::Io(io_err);
        assert!(err.source().is_some());

        let err = RelationError::InvalidData("no source".into());
        assert!(err.source().is_none());
    }

    // ── RelationHeader ─────────────────────────────────────────────────

    #[test]
    fn header_model_type_tracks_attrs() {
        let named = RelationHeader::new("r", vec!["a".to_string()]);
        assert!(matches!(named.model_type(), ModelType::Named));

        let positional = RelationHeader::new_positional("r", 2);
        assert!(matches!(positional.model_type(), ModelType::Positional));
        assert_eq!(positional.arity(), 2);
    }

    // ── from_csv error on invalid data ─────────────────────────────────

    #[test]
    fn from_csv_rejects_non_integer_values() {
        use crate::ds::TreeTrie;

        let fixture = TempFile::with_contents("test_csv_bad_value.csv", "a,b\n1,2\n3,hello\n");

        let result = TreeTrie::from_csv(fixture.path());
        assert!(result.is_err(), "expected error for non-integer CSV value");

        let msg = result.unwrap_err().to_string();
        assert!(
            msg.contains("hello"),
            "error should mention the bad value, got: {msg}"
        );
        assert!(
            msg.contains("row 1"),
            "error should mention the row, got: {msg}"
        );
        assert!(
            msg.contains("column 1"),
            "error should mention the column, got: {msg}"
        );
    }

    #[test]
    fn from_csv_missing_file_returns_error() {
        use crate::ds::TreeTrie;

        let result = TreeTrie::from_csv(missing_path("nonexistent_kermit_test_file.csv"));
        assert!(result.is_err());
        assert!(
            matches!(result.unwrap_err(), RelationError::Io(_)),
            "expected Io variant for missing file"
        );
    }

    // ── from_parquet error paths ───────────────────────────────────────

    #[test]
    fn from_parquet_missing_file_returns_error() {
        use crate::ds::TreeTrie;

        let result = TreeTrie::from_parquet(missing_path("nonexistent_kermit_test_file.parquet"));
        assert!(result.is_err());
        assert!(
            matches!(result.unwrap_err(), RelationError::Io(_)),
            "expected Io variant for missing file"
        );
    }

    #[test]
    fn from_parquet_invalid_file_returns_error() {
        use crate::ds::TreeTrie;

        let fixture =
            TempFile::with_contents("test_bad_parquet.parquet", b"this is not a parquet file");

        let result = TreeTrie::from_parquet(fixture.path());
        assert!(result.is_err());
        assert!(
            matches!(result.unwrap_err(), RelationError::Parquet(_)),
            "expected Parquet variant for corrupt file"
        );
    }
}