kermit_ds/relation.rs
1//! Core relation abstraction: the [`Relation`] trait that every storage
2//! backend implements (see `TreeTrie` and `ColumnTrie` in [`crate::ds`]),
3//! plus the blanket [`RelationFileExt`] for loading from CSV or Parquet.
4//!
5//! The trait exists so join algorithms in `kermit-algos` can be written
6//! generically over different trie layouts without coupling to a specific
7//! representation. All tuple values are `usize` keys — typically
8//! dictionary-encoded IDs from a separate symbol table — so a relation never
9//! stores raw strings or domain values directly.
10use {
11 arrow::array::AsArray,
12 kermit_iters::JoinIterable,
13 parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder,
14 std::{fmt, fs::File, path::Path},
15};
16
17/// Error type for relation file operations (CSV and Parquet).
18#[derive(Debug)]
19pub enum RelationError {
20 /// A CSV library error.
21 Csv(csv::Error),
22 /// A filesystem I/O error.
23 Io(std::io::Error),
24 /// A Parquet library error.
25 Parquet(parquet::errors::ParquetError),
26 /// An Arrow conversion error.
27 Arrow(arrow::error::ArrowError),
28 /// A data value that could not be converted (e.g. non-integer in a CSV).
29 InvalidData(String),
30}
31
32impl fmt::Display for RelationError {
33 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
34 match self {
35 | RelationError::Csv(e) => write!(f, "CSV error: {e}"),
36 | RelationError::Io(e) => write!(f, "I/O error: {e}"),
37 | RelationError::Parquet(e) => write!(f, "Parquet error: {e}"),
38 | RelationError::Arrow(e) => write!(f, "Arrow error: {e}"),
39 | RelationError::InvalidData(msg) => write!(f, "Invalid data: {msg}"),
40 }
41 }
42}
43
44impl std::error::Error for RelationError {
45 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
46 match self {
47 | RelationError::Csv(e) => Some(e),
48 | RelationError::Io(e) => Some(e),
49 | RelationError::Parquet(e) => Some(e),
50 | RelationError::Arrow(e) => Some(e),
51 | RelationError::InvalidData(_) => None,
52 }
53 }
54}
55
56impl From<csv::Error> for RelationError {
57 fn from(e: csv::Error) -> Self { RelationError::Csv(e) }
58}
59
60impl From<std::io::Error> for RelationError {
61 fn from(e: std::io::Error) -> Self { RelationError::Io(e) }
62}
63
64impl From<parquet::errors::ParquetError> for RelationError {
65 fn from(e: parquet::errors::ParquetError) -> Self { RelationError::Parquet(e) }
66}
67
68impl From<arrow::error::ArrowError> for RelationError {
69 fn from(e: arrow::error::ArrowError) -> Self { RelationError::Arrow(e) }
70}
71
/// Whether a relation's attributes are identified by name or by position.
///
/// Returned by [`RelationHeader::model_type`]. A header is [`Named`] when it
/// carries explicit attribute names (e.g. from a CSV/Parquet schema), and
/// [`Positional`] otherwise — typical for intermediate relations produced
/// during query evaluation where only arity matters.
///
/// The type is a plain two-state tag, so it is `Copy` and supports equality
/// comparison and debug formatting; callers can write
/// `header.model_type() == ModelType::Named` or use it in `assert_eq!`.
///
/// [`Named`]: ModelType::Named
/// [`Positional`]: ModelType::Positional
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ModelType {
    /// Attributes are accessed by column index only; attribute names are
    /// absent.
    Positional,
    /// Attributes have explicit string names, typically sourced from a file
    /// header or schema.
    Named,
}
89
/// Descriptive metadata attached to a relation: its name, its attribute
/// names, and its arity.
///
/// Two independent properties are encoded here:
///
/// - **named vs. positional** — decided by `attrs`. A non-empty `attrs` means
///   the header is named and `arity == attrs.len()`; an empty `attrs` means
///   the header is positional and `arity` alone states the column count.
/// - **nameless** — decided by `name`. An empty `name` marks intermediate or
///   projected relations whose origin no longer matters.
#[derive(Debug, Clone)]
pub struct RelationHeader {
    /// Relation name; the empty string for nameless headers.
    name: String,
    /// Attribute names. Empty iff this is a positional header.
    attrs: Vec<String>,
    /// Column count. Equal to `attrs.len()` for named headers; for positional
    /// headers (`attrs.is_empty()`) this is the only authoritative count.
    arity: usize,
}
107
108impl RelationHeader {
109 /// Creates a named header with the given attribute names. Arity is
110 /// derived from `attrs.len()`.
111 pub fn new(name: impl Into<String>, attrs: Vec<String>) -> Self {
112 let arity = attrs.len();
113 RelationHeader {
114 name: name.into(),
115 attrs,
116 arity,
117 }
118 }
119
120 /// Creates a nameless header with the given attribute names. Arity is
121 /// inferred from the length of `attrs`.
122 pub fn new_nameless(attrs: Vec<String>) -> Self {
123 let arity = attrs.len();
124 RelationHeader {
125 name: String::new(),
126 attrs,
127 arity,
128 }
129 }
130
131 /// Creates a named header with positional (unnamed) attributes.
132 pub fn new_positional(name: impl Into<String>, arity: usize) -> Self {
133 RelationHeader {
134 name: name.into(),
135 attrs: vec![],
136 arity,
137 }
138 }
139
140 /// Creates a nameless header with positional attributes of the given arity.
141 pub fn new_nameless_positional(arity: usize) -> Self {
142 RelationHeader {
143 name: String::new(),
144 attrs: vec![],
145 arity,
146 }
147 }
148
149 /// Returns `true` if this header has an empty name (i.e. was created via
150 /// one of the `new_nameless*` constructors).
151 pub fn is_nameless(&self) -> bool { self.name.is_empty() }
152
153 /// Returns the relation's name (empty string for nameless headers).
154 pub fn name(&self) -> &str { &self.name }
155
156 /// Returns the attribute names. Empty for positional headers.
157 pub fn attrs(&self) -> &[String] { &self.attrs }
158
159 /// Returns the arity (number of columns) of the relation.
160 pub fn arity(&self) -> usize { self.arity }
161
162 /// Returns [`ModelType::Named`] when attribute names are set, otherwise
163 /// [`ModelType::Positional`].
164 pub fn model_type(&self) -> ModelType {
165 if self.attrs.is_empty() {
166 ModelType::Positional
167 } else {
168 ModelType::Named
169 }
170 }
171}
172
173impl From<usize> for RelationHeader {
174 fn from(value: usize) -> RelationHeader { RelationHeader::new_nameless_positional(value) }
175}
176
/// Column projection — the π operator of relational algebra.
///
/// Implementors can derive a new relation made of a chosen subset of their
/// columns. Given indices `[c₀, c₁, …]`, column `i` of the result is column
/// `cᵢ` of the source. Indices may repeat and may appear in any order; the
/// result always has arity `columns.len()`.
pub trait Projectable {
    /// Builds a new relation from the columns at the given indices, keeping
    /// the order in which the indices are supplied.
    ///
    /// # Panics
    ///
    /// Panics if any element of `columns` is `>= self.header().arity()`.
    fn project(&self, columns: Vec<usize>) -> Self;
}
193
194/// A relational data structure that stores tuples of `usize` keys and can
195/// participate in joins.
196///
197/// Tuple values are `usize` keys (typically dictionary-encoded — see the
198/// module-level docs). The supertraits expose:
199///
200/// - [`JoinIterable`] — produces iterators that the join algorithms in
201/// `kermit-algos` consume. Implementors typically also implement
202/// [`TrieIterable`](kermit_iters::TrieIterable) so the iterator can be driven
203/// hierarchically.
204/// - [`Projectable`] — the relational π operator (column projection).
205pub trait Relation: JoinIterable + Projectable {
206 /// Returns the header describing this relation's name, attributes, and
207 /// arity.
208 fn header(&self) -> &RelationHeader;
209
210 /// Creates an empty relation matching `header`.
211 fn new(header: RelationHeader) -> Self;
212
213 /// Creates a relation populated with `tuples`, matching `header`.
214 /// Implementations may sort or deduplicate during bulk construction;
215 /// prefer this over `new` followed by repeated `insert` calls when all
216 /// tuples are known up front.
217 ///
218 /// # Panics
219 ///
220 /// Panics if any tuple's length does not equal `header.arity()`.
221 fn from_tuples(header: RelationHeader, tuples: Vec<Vec<usize>>) -> Self;
222
223 /// Inserts a tuple. Duplicate tuples are silently absorbed (the relation
224 /// behaves as a set).
225 ///
226 /// # Panics
227 ///
228 /// Panics if `tuple.len() != self.header().arity()`.
229 fn insert(&mut self, tuple: Vec<usize>);
230
231 /// Inserts every tuple in `tuples`. Equivalent to calling
232 /// [`insert`](Self::insert) in a loop; provided so implementations can
233 /// specialise bulk insertion.
234 ///
235 /// # Panics
236 ///
237 /// Panics if any tuple's length does not match the relation's arity.
238 fn insert_all(&mut self, tuples: Vec<Vec<usize>>);
239}
240
241/// Loads a [`Relation`] from a CSV or Parquet file.
242///
243/// Defined as an extension trait (with a blanket impl over every
244/// [`Relation`]) so file-loading is added without bloating the core trait
245/// or requiring each concrete data structure to reimplement it. Anything
246/// that implements [`Relation`] automatically gains
247/// [`from_csv`](Self::from_csv) and [`from_parquet`](Self::from_parquet).
248pub trait RelationFileExt: Relation {
249 /// Creates a new relation from a Parquet file.
250 ///
251 /// Column names are extracted from the Parquet schema and the relation
252 /// name is taken from the file stem. All columns must be `Int64` and
253 /// every value must be non-negative so it fits in `usize`.
254 ///
255 /// # Errors
256 ///
257 /// Returns a [`RelationError`] if any of the following occur:
258 /// - [`RelationError::Io`] — the file cannot be opened.
259 /// - [`RelationError::Parquet`] — the file is not a valid Parquet file or
260 /// the reader cannot be constructed.
261 /// - [`RelationError::Arrow`] — a record batch fails to decode.
262 /// - [`RelationError::InvalidData`] — an `Int64` value cannot be converted
263 /// to `usize` (e.g. it is negative).
264 fn from_parquet<P: AsRef<Path>>(filepath: P) -> Result<Self, RelationError>
265 where
266 Self: Sized;
267
268 /// Creates a new relation from a CSV file.
269 ///
270 /// The first row is treated as a header providing attribute names; each
271 /// subsequent row is one tuple. Every field must parse as a `usize`. The
272 /// relation name is taken from the file stem.
273 ///
274 /// # Errors
275 ///
276 /// Returns a [`RelationError`] if any of the following occur:
277 /// - [`RelationError::Io`] — the file cannot be opened.
278 /// - [`RelationError::Csv`] — the CSV reader cannot parse the header or a
279 /// row (e.g. inconsistent column count).
280 /// - [`RelationError::InvalidData`] — a field cannot be parsed as a
281 /// `usize`; the message identifies the offending row and column.
282 fn from_csv<P: AsRef<Path>>(filepath: P) -> Result<Self, RelationError>
283 where
284 Self: Sized;
285}
286
287/// Blanket implementation of `RelationFileExt` for any type that
288/// implements `Relation`.
289impl<R> RelationFileExt for R
290where
291 R: Relation,
292{
293 fn from_csv<P: AsRef<Path>>(filepath: P) -> Result<Self, RelationError> {
294 let path = filepath.as_ref();
295 let file = File::open(path)?;
296
297 let mut rdr = csv::ReaderBuilder::new()
298 .has_headers(true)
299 .delimiter(b',')
300 .double_quote(false)
301 .escape(Some(b'\\'))
302 .flexible(false)
303 .comment(Some(b'#'))
304 .from_reader(file);
305
306 // Extract column names from CSV header
307 let attrs: Vec<String> = rdr.headers()?.iter().map(|s| s.to_string()).collect();
308
309 // Extract relation name from filename (without extension)
310 let relation_name = path
311 .file_stem()
312 .and_then(|s| s.to_str())
313 .unwrap_or("")
314 .to_string();
315
316 // Create header from the CSV header with the extracted name
317 let header = RelationHeader::new(relation_name, attrs);
318
319 let mut tuples = Vec::new();
320 for (row_idx, result) in rdr.records().enumerate() {
321 let record = result?;
322 let mut tuple: Vec<usize> = Vec::with_capacity(record.len());
323 for (col_idx, field) in record.iter().enumerate() {
324 let value = field.parse::<usize>().map_err(|_| {
325 RelationError::InvalidData(format!(
326 "row {row_idx}, column {col_idx}: cannot parse {:?} as usize",
327 field,
328 ))
329 })?;
330 tuple.push(value);
331 }
332 tuples.push(tuple);
333 }
334 Ok(R::from_tuples(header, tuples))
335 }
336
337 fn from_parquet<P: AsRef<Path>>(filepath: P) -> Result<Self, RelationError> {
338 let path = filepath.as_ref();
339 let file = File::open(path)?;
340
341 let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
342
343 // Extract schema to get column names
344 let schema = builder.schema();
345 let attrs: Vec<String> = schema
346 .fields()
347 .iter()
348 .map(|field| field.name().clone())
349 .collect();
350
351 // Extract relation name from filename (without extension)
352 let relation_name = path
353 .file_stem()
354 .and_then(|s| s.to_str())
355 .unwrap_or("")
356 .to_string();
357
358 // Create header from the parquet schema with the extracted name
359 let header = RelationHeader::new(relation_name, attrs);
360
361 // Build the reader
362 let reader = builder.build()?;
363
364 // Collect all tuples first for efficient construction
365 let mut tuples = Vec::new();
366
367 // Read all record batches and collect tuples
368 for batch_result in reader {
369 let batch = batch_result?;
370
371 let num_rows = batch.num_rows();
372 let num_cols = batch.num_columns();
373
374 // Convert columnar data to row format (tuples)
375 for row_idx in 0..num_rows {
376 let mut tuple: Vec<usize> = Vec::with_capacity(num_cols);
377
378 for col_idx in 0..num_cols {
379 let column = batch.column(col_idx);
380 let int_array = column.as_primitive::<arrow::datatypes::Int64Type>();
381
382 if let Ok(value) = usize::try_from(int_array.value(row_idx)) {
383 tuple.push(value);
384 } else {
385 return Err(RelationError::InvalidData(
386 "failed to convert Parquet value to usize".into(),
387 ));
388 }
389 }
390
391 tuples.push(tuple);
392 }
393 }
394
395 // Use from_tuples for efficient construction (sorts before insertion)
396 Ok(R::from_tuples(header, tuples))
397 }
398}
399
#[cfg(test)]
mod tests {
    use super::*;

    /// Returns a path inside the platform temp directory. The process id is
    /// part of the file name so concurrent test processes do not collide.
    fn scratch_path(file_name: &str) -> std::path::PathBuf {
        std::env::temp_dir().join(format!("kermit_{}_{file_name}", std::process::id()))
    }

    // ── RelationError Display ──────────────────────────────────────────

    #[test]
    fn relation_error_display_csv() {
        let csv_err = csv::Error::from(std::io::Error::new(
            std::io::ErrorKind::NotFound,
            "file not found",
        ));
        let err = RelationError::from(csv_err);
        let msg = err.to_string();
        assert!(msg.starts_with("CSV error:"), "got: {msg}");
    }

    #[test]
    fn relation_error_display_io() {
        let err = RelationError::from(std::io::Error::new(std::io::ErrorKind::NotFound, "gone"));
        assert!(err.to_string().starts_with("I/O error:"));
    }

    #[test]
    fn relation_error_display_invalid_data() {
        let err = RelationError::InvalidData("bad value".into());
        assert_eq!(err.to_string(), "Invalid data: bad value");
    }

    #[test]
    fn relation_error_source_delegates() {
        use std::error::Error;

        let io_err = std::io::Error::other("inner");
        let err = RelationError::Io(io_err);
        assert!(err.source().is_some());

        let err = RelationError::InvalidData("no source".into());
        assert!(err.source().is_none());
    }

    // ── from_csv error on invalid data ─────────────────────────────────

    #[test]
    fn from_csv_rejects_non_integer_values() {
        use crate::ds::TreeTrie;

        let path = scratch_path("test_csv_bad_value.csv");
        std::fs::write(&path, "a,b\n1,2\n3,hello\n").unwrap();

        let result = TreeTrie::from_csv(&path);
        // Clean up before asserting so a failed assertion cannot leak the file.
        std::fs::remove_file(&path).ok();

        assert!(result.is_err(), "expected error for non-integer CSV value");

        let err = result.unwrap_err();
        let msg = err.to_string();
        assert!(
            msg.contains("hello"),
            "error should mention the bad value, got: {msg}"
        );
        assert!(
            msg.contains("row 1"),
            "error should mention the row, got: {msg}"
        );
        assert!(
            msg.contains("column 1"),
            "error should mention the column, got: {msg}"
        );
    }

    #[test]
    fn from_csv_missing_file_returns_error() {
        use crate::ds::TreeTrie;

        // Built from the platform temp directory rather than a hard-coded `/tmp`.
        let result = TreeTrie::from_csv(scratch_path("nonexistent_kermit_test_file.csv"));
        assert!(result.is_err());
        assert!(
            matches!(result.unwrap_err(), RelationError::Io(_)),
            "expected Io variant for missing file"
        );
    }

    // ── from_parquet error paths ───────────────────────────────────────

    #[test]
    fn from_parquet_missing_file_returns_error() {
        use crate::ds::TreeTrie;

        let result = TreeTrie::from_parquet(scratch_path("nonexistent_kermit_test_file.parquet"));
        assert!(result.is_err());
        assert!(
            matches!(result.unwrap_err(), RelationError::Io(_)),
            "expected Io variant for missing file"
        );
    }

    #[test]
    fn from_parquet_invalid_file_returns_error() {
        use crate::ds::TreeTrie;

        let path = scratch_path("test_bad_parquet.parquet");
        std::fs::write(&path, b"this is not a parquet file").unwrap();

        let result = TreeTrie::from_parquet(&path);
        // Clean up before asserting so a failed assertion cannot leak the file.
        std::fs::remove_file(&path).ok();

        assert!(result.is_err());
        assert!(
            matches!(result.unwrap_err(), RelationError::Parquet(_)),
            "expected Parquet variant for corrupt file"
        );
    }
}