use std::collections::{btree_map::Entry, BTreeMap};
use timely::dataflow::Scope;
use crate::VecCollection;
/// Registry associating CTE names with the [`VecCollection`] registered under each name.
///
/// Entries are kept in a [`BTreeMap`] keyed by name, so iterating the names
/// follows the map's ascending key order. The diff type `R` defaults to `isize`.
pub struct CteRegistry<G: Scope, D, R = isize> {
    /// Registered collections, keyed by the name they were registered under.
    collections: BTreeMap<String, VecCollection<G, D, R>>,
}
/// The default registry is empty; only the `G: Scope` bound is needed to build one.
impl<G: Scope, D, R> Default for CteRegistry<G, D, R> {
    /// Creates a registry with no registered names.
    fn default() -> Self {
        let collections = BTreeMap::new();
        Self { collections }
    }
}
/// Debug output shows only the registered names; the collections themselves
/// are not printed.
impl<G: Scope, D, R> std::fmt::Debug for CteRegistry<G, D, R> {
    /// Formats the registry as a `CteRegistry` struct with a single `names` field.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Keys come out of the map in ascending order.
        let names: Vec<&String> = self.collections.keys().collect();
        let mut out = f.debug_struct("CteRegistry");
        out.field("names", &names);
        out.finish()
    }
}
/// Error reported when a name is registered while an entry with that name
/// already exists.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CteAlreadyRegistered {
    /// The name that was already present.
    pub name: String,
}

impl std::fmt::Display for CteAlreadyRegistered {
    /// Writes `cte '<name>' is already registered`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { name } = self;
        write!(f, "cte '{}' is already registered", name)
    }
}

impl std::error::Error for CteAlreadyRegistered {}
impl<G, D, R> CteRegistry<G, D, R>
where
    G: Scope,
    D: Clone + 'static,
    R: Clone + 'static,
{
    /// Creates an empty registry (same as [`Default::default`]).
    #[must_use]
    pub fn new() -> Self {
        Default::default()
    }

    /// Registers `collection` under `name`.
    ///
    /// # Errors
    ///
    /// Returns [`CteAlreadyRegistered`] carrying the name when an entry with
    /// that name already exists. The existing entry is left untouched and the
    /// passed `collection` is dropped.
    pub fn register(
        &mut self,
        name: impl Into<String>,
        collection: VecCollection<G, D, R>,
    ) -> Result<(), CteAlreadyRegistered> {
        match self.collections.entry(name.into()) {
            Entry::Occupied(existing) => {
                // `entry` took ownership of the key we passed, so clone the stored one.
                let name = existing.key().clone();
                Err(CteAlreadyRegistered { name })
            }
            Entry::Vacant(vacancy) => {
                vacancy.insert(collection);
                Ok(())
            }
        }
    }

    /// Returns a clone of the collection registered under `name`, or `None`
    /// when no such name has been registered.
    #[must_use]
    pub fn cte_ref(&self, name: &str) -> Option<VecCollection<G, D, R>> {
        let registered = self.collections.get(name)?;
        Some(registered.clone())
    }

    /// Iterates over the registered names in the map's ascending key order.
    pub fn names(&self) -> impl ExactSizeIterator<Item = &str> {
        self.collections.keys().map(|name| name.as_str())
    }

    /// Number of registered names.
    #[must_use]
    pub fn len(&self) -> usize {
        self.collections.len()
    }

    /// Returns `true` when nothing has been registered.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.collections.is_empty()
    }
}
#[cfg(test)]
mod tests {
    use super::CteRegistry;
    use crate::input::Input;

    /// A handle obtained through `cte_ref` must match the collection that was registered.
    #[test]
    fn cte_ref_returns_clone_of_registered_collection() {
        timely::example(|scope| {
            let mut ctes = CteRegistry::<_, i32, isize>::new();
            let (_, source) = scope.new_collection_from(vec![1_i32, 2, 3]);

            let registered = ctes.register("recent_posts", source.clone());
            registered.expect("first register should succeed");

            let resolved = ctes.cte_ref("recent_posts");
            let handle = resolved.expect("registered CTE should resolve");
            handle.assert_eq(&source);
        });
    }

    /// Looking up a name that was never registered yields `None`.
    #[test]
    fn cte_ref_returns_none_for_unknown_names() {
        timely::example(|scope| {
            let mut ctes = CteRegistry::<_, u8, isize>::new();
            let (_, known) = scope.new_collection_from(vec![0_u8]);
            ctes.register("known", known).unwrap();

            let lookup = ctes.cte_ref("missing");
            assert!(lookup.is_none());
        });
    }

    /// Registering the same name twice fails and reports the offending name.
    #[test]
    fn duplicate_registration_returns_error() {
        timely::example(|scope| {
            let mut ctes = CteRegistry::<_, i32, isize>::new();
            let (_, shared) = scope.new_collection_from(vec![1_i32]);
            ctes.register("x", shared.clone()).unwrap();

            let second_attempt = ctes.register("x", shared);
            let err = second_attempt.expect_err("duplicate name should error");
            assert_eq!(err.name, "x");
        });
    }

    /// Names come back sorted regardless of registration order.
    #[test]
    fn names_iterates_in_canonical_order() {
        timely::example(|scope| {
            let mut ctes = CteRegistry::<_, i32, isize>::new();
            let (_, shared) = scope.new_collection_from(vec![1_i32]);
            ctes.register("b", shared.clone()).unwrap();
            ctes.register("a", shared).unwrap();

            let listed: Vec<_> = ctes.names().collect();
            assert_eq!(listed, ["a", "b"]);
            assert_eq!(ctes.len(), 2);
            assert!(!ctes.is_empty());
        });
    }
}