use std::{
collections::{BTreeMap, BTreeSet},
error::Error,
fmt,
};
use crate::traits::FiniteRelation;
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum NaryRelationError {
EmptySchema,
BlankColumnName {
index: usize,
},
DuplicateColumn {
column: String,
},
RowArityMismatch {
expected: usize,
actual: usize,
},
UnknownColumn {
column: String,
},
MissingColumn {
column: String,
},
UnexpectedColumn {
column: String,
},
SchemaMismatch {
left: Vec<String>,
right: Vec<String>,
},
}
impl fmt::Display for NaryRelationError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::EmptySchema => f.write_str("n-ary relation schema cannot be empty"),
Self::BlankColumnName { index } => {
write!(f, "column at index {index} must have a non-blank name")
}
Self::DuplicateColumn { column } => {
write!(f, "column `{column}` appears more than once")
}
Self::RowArityMismatch { expected, actual } => write!(
f,
"row arity mismatch: expected {expected} values but found {actual}"
),
Self::UnknownColumn { column } => {
write!(f, "column `{column}` is not present in the schema")
}
Self::MissingColumn { column } => {
write!(f, "named row is missing required column `{column}`")
}
Self::UnexpectedColumn { column } => {
write!(f, "named row contains unexpected column `{column}`")
}
Self::SchemaMismatch { left, right } => {
write!(f, "schema mismatch: left = {:?}, right = {:?}", left, right)
}
}
}
}
impl Error for NaryRelationError {}
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct GroupedRelation<T: Ord> {
key_schema: Vec<String>,
member_schema: Vec<String>,
groups: BTreeMap<Vec<T>, NaryRelation<T>>,
}
impl<T: Ord> GroupedRelation<T> {
#[must_use]
pub fn key_schema(&self) -> &[String] {
&self.key_schema
}
#[must_use]
pub fn member_schema(&self) -> &[String] {
&self.member_schema
}
#[must_use]
pub fn len(&self) -> usize {
self.groups.len()
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.groups.is_empty()
}
#[must_use]
pub fn group(&self, key: &[T]) -> Option<&NaryRelation<T>> {
self.groups.get(key)
}
pub fn iter(&self) -> impl Iterator<Item = (&[T], &NaryRelation<T>)> {
self.groups
.iter()
.map(|(key, group)| (key.as_slice(), group))
}
#[must_use]
pub fn counts(&self) -> Vec<(Vec<T>, usize)>
where
T: Clone,
{
self.groups
.iter()
.map(|(key, group)| (key.clone(), group.len()))
.collect()
}
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct NaryRelation<T: Ord> {
schema: Vec<String>,
rows: BTreeSet<Vec<T>>,
}
impl<T: Ord> NaryRelation<T> {
pub fn new<I, S>(schema: I) -> Result<Self, NaryRelationError>
where
I: IntoIterator<Item = S>,
S: Into<String>,
{
Ok(Self {
schema: Self::validate_schema(schema)?,
rows: BTreeSet::new(),
})
}
pub fn from_rows<I, S, R, Row>(schema: I, rows: R) -> Result<Self, NaryRelationError>
where
I: IntoIterator<Item = S>,
S: Into<String>,
R: IntoIterator<Item = Row>,
Row: IntoIterator<Item = T>,
{
let schema = Self::validate_schema(schema)?;
let arity = schema.len();
let mut dedup_rows = BTreeSet::new();
for row in rows {
dedup_rows.insert(Self::collect_row(arity, row)?);
}
Ok(Self {
schema,
rows: dedup_rows,
})
}
pub fn from_named_rows<I, S, R, K>(schema: I, rows: R) -> Result<Self, NaryRelationError>
where
I: IntoIterator<Item = S>,
S: Into<String>,
R: IntoIterator<Item = BTreeMap<K, T>>,
K: Into<String> + Ord,
{
let schema = Self::validate_schema(schema)?;
let mut dedup_rows = BTreeSet::new();
for named_row in rows {
dedup_rows.insert(Self::collect_named_row(&schema, named_row)?);
}
Ok(Self {
schema,
rows: dedup_rows,
})
}
#[must_use]
pub fn schema(&self) -> &[String] {
&self.schema
}
#[must_use]
pub fn arity(&self) -> usize {
self.schema.len()
}
pub fn column_index(&self, column: &str) -> Result<usize, NaryRelationError> {
self.schema
.iter()
.position(|candidate| candidate == column)
.ok_or_else(|| NaryRelationError::UnknownColumn {
column: column.to_string(),
})
}
pub fn insert_row<I>(&mut self, row: I) -> Result<bool, NaryRelationError>
where
I: IntoIterator<Item = T>,
{
Ok(self.rows.insert(Self::collect_row(self.arity(), row)?))
}
#[must_use]
pub fn contains_row(&self, row: &[T]) -> bool {
self.rows.contains(row)
}
pub fn iter(&self) -> impl Iterator<Item = &[T]> {
self.rows.iter().map(Vec::as_slice)
}
#[must_use]
pub fn to_rows(&self) -> Vec<Vec<T>>
where
T: Clone,
{
self.rows.iter().cloned().collect()
}
#[must_use]
pub fn to_named_rows(&self) -> Vec<BTreeMap<String, T>>
where
T: Clone,
{
self.rows
.iter()
.map(|row| {
self.schema
.iter()
.cloned()
.zip(row.iter().cloned())
.collect()
})
.collect()
}
pub fn group_by<I, S>(&self, columns: I) -> Result<GroupedRelation<T>, NaryRelationError>
where
I: IntoIterator<Item = S>,
S: AsRef<str>,
T: Clone,
{
let columns: Vec<String> = columns
.into_iter()
.map(|column| column.as_ref().to_string())
.collect();
let key_schema = Self::validate_schema(columns)?;
let mut indices = Vec::with_capacity(key_schema.len());
for column in &key_schema {
indices.push(self.column_index(column)?);
}
let mut raw_groups: BTreeMap<Vec<T>, BTreeSet<Vec<T>>> = BTreeMap::new();
for row in &self.rows {
let key = indices.iter().map(|index| row[*index].clone()).collect();
raw_groups.entry(key).or_default().insert(row.clone());
}
let member_schema = self.schema.clone();
let groups = raw_groups
.into_iter()
.map(|(key, rows)| {
(
key,
Self {
schema: member_schema.clone(),
rows,
},
)
})
.collect();
Ok(GroupedRelation {
key_schema,
member_schema,
groups,
})
}
#[must_use]
pub fn select<F>(&self, mut predicate: F) -> Self
where
F: FnMut(&[T]) -> bool,
T: Clone,
{
let rows = self
.rows
.iter()
.filter(|row| predicate(row))
.cloned()
.collect();
Self {
schema: self.schema.clone(),
rows,
}
}
pub fn project<I, S>(&self, columns: I) -> Result<Self, NaryRelationError>
where
I: IntoIterator<Item = S>,
S: AsRef<str>,
T: Clone,
{
let columns: Vec<String> = columns
.into_iter()
.map(|column| column.as_ref().to_string())
.collect();
let projected_schema = Self::validate_schema(columns)?;
let mut indices = Vec::with_capacity(projected_schema.len());
for column in &projected_schema {
indices.push(self.column_index(column)?);
}
let rows = self
.rows
.iter()
.map(|row| indices.iter().map(|index| row[*index].clone()).collect())
.collect();
Ok(Self {
schema: projected_schema,
rows,
})
}
pub fn rename<S>(&self, from: &str, to: S) -> Result<Self, NaryRelationError>
where
S: Into<String>,
T: Clone,
{
let to = to.into();
let index = self.column_index(from)?;
Self::validate_column_name(&to, index)?;
if from != to && self.schema.iter().any(|column| column == &to) {
return Err(NaryRelationError::DuplicateColumn { column: to });
}
let mut schema = self.schema.clone();
schema[index] = to;
Ok(Self {
schema,
rows: self.rows.clone(),
})
}
pub fn union(&self, other: &Self) -> Result<Self, NaryRelationError>
where
T: Clone,
{
self.ensure_same_schema(other)?;
Ok(Self {
schema: self.schema.clone(),
rows: self.rows.union(&other.rows).cloned().collect(),
})
}
pub fn intersection(&self, other: &Self) -> Result<Self, NaryRelationError>
where
T: Clone,
{
self.ensure_same_schema(other)?;
Ok(Self {
schema: self.schema.clone(),
rows: self.rows.intersection(&other.rows).cloned().collect(),
})
}
pub fn difference(&self, other: &Self) -> Result<Self, NaryRelationError>
where
T: Clone,
{
self.ensure_same_schema(other)?;
Ok(Self {
schema: self.schema.clone(),
rows: self.rows.difference(&other.rows).cloned().collect(),
})
}
pub fn natural_join(&self, other: &Self) -> Self
where
T: Clone,
{
let right_index_by_name: BTreeMap<&str, usize> = other
.schema
.iter()
.enumerate()
.map(|(index, column)| (column.as_str(), index))
.collect();
let mut shared_indices = Vec::new();
let mut shared_right_indices = BTreeSet::new();
for (left_index, column) in self.schema.iter().enumerate() {
if let Some(&right_index) = right_index_by_name.get(column.as_str()) {
shared_indices.push((left_index, right_index));
shared_right_indices.insert(right_index);
}
}
let mut schema = self.schema.clone();
let mut right_only_indices = Vec::new();
for (right_index, column) in other.schema.iter().enumerate() {
if !shared_right_indices.contains(&right_index) {
schema.push(column.clone());
right_only_indices.push(right_index);
}
}
let mut rows = BTreeSet::new();
for left_row in &self.rows {
for right_row in &other.rows {
if shared_indices.iter().all(|(left_index, right_index)| {
left_row[*left_index] == right_row[*right_index]
}) {
let mut joined_row =
Vec::with_capacity(self.arity() + right_only_indices.len());
joined_row.extend(left_row.iter().cloned());
for right_index in &right_only_indices {
joined_row.push(right_row[*right_index].clone());
}
rows.insert(joined_row);
}
}
}
Self { schema, rows }
}
fn validate_schema<I, S>(schema: I) -> Result<Vec<String>, NaryRelationError>
where
I: IntoIterator<Item = S>,
S: Into<String>,
{
let schema: Vec<String> = schema.into_iter().map(Into::into).collect();
if schema.is_empty() {
return Err(NaryRelationError::EmptySchema);
}
let mut seen = BTreeSet::new();
for (index, column) in schema.iter().enumerate() {
Self::validate_column_name(column, index)?;
if !seen.insert(column.clone()) {
return Err(NaryRelationError::DuplicateColumn {
column: column.clone(),
});
}
}
Ok(schema)
}
fn collect_row<I>(arity: usize, row: I) -> Result<Vec<T>, NaryRelationError>
where
I: IntoIterator<Item = T>,
{
let row: Vec<T> = row.into_iter().collect();
if row.len() != arity {
return Err(NaryRelationError::RowArityMismatch {
expected: arity,
actual: row.len(),
});
}
Ok(row)
}
fn collect_named_row<K>(
schema: &[String],
named_row: BTreeMap<K, T>,
) -> Result<Vec<T>, NaryRelationError>
where
K: Into<String> + Ord,
{
let mut normalized_row = BTreeMap::new();
for (column, value) in named_row {
let column = column.into();
if normalized_row.insert(column.clone(), value).is_some() {
return Err(NaryRelationError::DuplicateColumn { column });
}
}
let mut row = Vec::with_capacity(schema.len());
for column in schema {
row.push(normalized_row.remove(column.as_str()).ok_or_else(|| {
NaryRelationError::MissingColumn {
column: column.clone(),
}
})?);
}
if let Some((column, _)) = normalized_row.into_iter().next() {
return Err(NaryRelationError::UnexpectedColumn { column });
}
Ok(row)
}
fn validate_column_name(column: &str, index: usize) -> Result<(), NaryRelationError> {
if column.trim().is_empty() {
Err(NaryRelationError::BlankColumnName { index })
} else {
Ok(())
}
}
fn ensure_same_schema(&self, other: &Self) -> Result<(), NaryRelationError> {
if self.schema == other.schema {
Ok(())
} else {
Err(NaryRelationError::SchemaMismatch {
left: self.schema.clone(),
right: other.schema.clone(),
})
}
}
}
impl<T: Ord> FiniteRelation for NaryRelation<T> {
fn len(&self) -> usize {
self.rows.len()
}
}