use reifydb_core::{
common::CommitVersion,
interface::catalog::{
id::{NamespaceId, RingBufferId},
ringbuffer::RingBuffer,
},
};
use crate::materialized::{MaterializedCatalog, MultiVersionRingBuffer};
impl MaterializedCatalog {
    /// Looks up the ring buffer definition for `ringbuffer` as seen at `version`.
    ///
    /// Returns `None` when the id has never been registered, or when the
    /// multi-version container yields no definition for `version` (for
    /// example a version before the first write, or one at which the ring
    /// buffer was removed).
    pub fn find_ringbuffer_at(&self, ringbuffer: RingBufferId, version: CommitVersion) -> Option<RingBuffer> {
        let versions = self.ringbuffers.get(&ringbuffer)?;
        versions.value().get(version)
    }

    /// Resolves `name` inside `namespace` to a ring buffer id and returns the
    /// definition of that id as seen at `version`.
    ///
    /// The name index holds only the name most recently written through
    /// [`Self::set_ringbuffer`]; a name that was replaced by a rename or
    /// dropped by a removal no longer resolves, whatever `version` is asked for.
    pub fn find_ringbuffer_by_name_at(
        &self,
        namespace: NamespaceId,
        name: &str,
        version: CommitVersion,
    ) -> Option<RingBuffer> {
        let key = (namespace, name.to_string());
        // Copy the id out so the index entry is released before the second lookup.
        let id = *self.ringbuffers_by_name.get(&key)?.value();
        self.find_ringbuffer_at(id, version)
    }

    /// Returns the latest definition stored for `ringbuffer`, or `None` when
    /// the id is unknown or its container currently has no latest definition.
    pub fn find_ringbuffer(&self, ringbuffer: RingBufferId) -> Option<RingBuffer> {
        let versions = self.ringbuffers.get(&ringbuffer)?;
        versions.value().get_latest()
    }

    /// Collects the latest definition of every registered ring buffer,
    /// skipping ids whose container has no latest definition.
    pub fn list_ringbuffers(&self) -> Vec<RingBuffer> {
        let mut latest = Vec::new();
        for entry in self.ringbuffers.iter() {
            if let Some(definition) = entry.value().get_latest() {
                latest.push(definition);
            }
        }
        latest
    }

    /// Resolves `name` inside `namespace` to a ring buffer id and returns the
    /// latest definition stored for that id.
    pub fn find_ringbuffer_by_name(&self, namespace: NamespaceId, name: &str) -> Option<RingBuffer> {
        let key = (namespace, name.to_string());
        let id = *self.ringbuffers_by_name.get(&key)?.value();
        self.find_ringbuffer(id)
    }

    /// Records the state of ring buffer `id` at `version`.
    ///
    /// * `Some(definition)` stores the definition at `version` and points the
    ///   `(namespace, name)` index at `id`.
    /// * `None` calls `remove(version)` on the id's version container and
    ///   leaves no name mapping behind.
    ///
    /// In both cases the name mapping of the previously latest definition is
    /// dropped first, which is what makes renames invalidate the old name.
    pub fn set_ringbuffer(&self, id: RingBufferId, version: CommitVersion, ringbuffer: Option<RingBuffer>) {
        // Unregister the name carried by the current latest definition, if any.
        let previous = self.ringbuffers.get(&id).and_then(|existing| existing.value().get_latest());
        if let Some(stale) = previous {
            self.ringbuffers_by_name.remove(&(stale.namespace, stale.name.clone()));
        }

        // Create the per-id version container on first use.
        let versions = self.ringbuffers.get_or_insert_with(id, MultiVersionRingBuffer::new);
        match ringbuffer {
            Some(definition) => {
                self.ringbuffers_by_name.insert((definition.namespace, definition.name.clone()), id);
                versions.value().insert(version, definition);
            }
            None => {
                versions.value().remove(version);
            }
        }
    }
}
#[cfg(test)]
pub mod tests {
    use reifydb_core::interface::catalog::{
        column::{Column, ColumnIndex},
        id::ColumnId,
    };
    use reifydb_type::value::{constraint::TypeConstraint, r#type::Type};

    use super::*;

    /// Builds a two-column (`id: Int4`, `data: Utf8`) ring buffer fixture with
    /// a capacity of 1000 and the given identity.
    fn create_test_ringbuffer(id: RingBufferId, namespace: NamespaceId, name: &str) -> RingBuffer {
        let id_column = Column {
            id: ColumnId(1),
            name: "id".to_string(),
            constraint: TypeConstraint::unconstrained(Type::Int4),
            properties: vec![],
            index: ColumnIndex(0),
            auto_increment: true,
            dictionary_id: None,
        };
        let data_column = Column {
            id: ColumnId(2),
            name: "data".to_string(),
            constraint: TypeConstraint::unconstrained(Type::Utf8),
            properties: vec![],
            index: ColumnIndex(1),
            auto_increment: false,
            dictionary_id: None,
        };

        RingBuffer {
            id,
            namespace,
            name: name.to_string(),
            columns: vec![id_column, data_column],
            capacity: 1000,
            primary_key: None,
            partition_by: vec![],
            underlying: false,
        }
    }

    /// A definition written at version 1 is visible at version 1 and later,
    /// but not before.
    #[test]
    fn test_set_and_find_ringbuffer() {
        let catalog = MaterializedCatalog::new();
        let id = RingBufferId(1);
        let expected = create_test_ringbuffer(id, NamespaceId::SYSTEM, "test_rb");

        catalog.set_ringbuffer(id, CommitVersion(1), Some(expected.clone()));

        // Exactly at the write version.
        assert_eq!(catalog.find_ringbuffer_at(id, CommitVersion(1)), Some(expected.clone()));
        // At a later version with no newer write.
        assert_eq!(catalog.find_ringbuffer_at(id, CommitVersion(5)), Some(expected));
        // Before the first write.
        assert!(catalog.find_ringbuffer_at(id, CommitVersion(0)).is_none());
    }

    /// Name lookup matches only the exact `(namespace, name)` pair.
    #[test]
    fn test_find_ringbuffer_by_name() {
        let catalog = MaterializedCatalog::new();
        let id = RingBufferId(1);
        let ns = NamespaceId::SYSTEM;
        let expected = create_test_ringbuffer(id, ns, "named_rb");

        catalog.set_ringbuffer(id, CommitVersion(1), Some(expected.clone()));

        let hit = catalog.find_ringbuffer_by_name_at(ns, "named_rb", CommitVersion(1));
        assert_eq!(hit, Some(expected));

        // Unknown name in the right namespace.
        let wrong_name = catalog.find_ringbuffer_by_name_at(ns, "wrong_name", CommitVersion(1));
        assert!(wrong_name.is_none());

        // Right name in a different namespace.
        let wrong_namespace = catalog.find_ringbuffer_by_name_at(NamespaceId::DEFAULT, "named_rb", CommitVersion(1));
        assert!(wrong_namespace.is_none());
    }

    /// Renaming moves the name index to the new name while both versions stay
    /// reachable by id.
    #[test]
    fn test_ringbuffer_rename() {
        let catalog = MaterializedCatalog::new();
        let id = RingBufferId(1);
        let ns = NamespaceId::SYSTEM;

        let original = create_test_ringbuffer(id, ns, "old_name");
        catalog.set_ringbuffer(id, CommitVersion(1), Some(original.clone()));

        assert!(catalog.find_ringbuffer_by_name_at(ns, "old_name", CommitVersion(1)).is_some());
        assert!(catalog.find_ringbuffer_by_name_at(ns, "new_name", CommitVersion(1)).is_none());

        let mut renamed = original.clone();
        renamed.name = "new_name".to_string();
        catalog.set_ringbuffer(id, CommitVersion(2), Some(renamed.clone()));

        // The old name is gone from the index; the new one resolves.
        assert!(catalog.find_ringbuffer_by_name_at(ns, "old_name", CommitVersion(2)).is_none());
        assert_eq!(
            catalog.find_ringbuffer_by_name_at(ns, "new_name", CommitVersion(2)),
            Some(renamed.clone())
        );

        // Lookups by id still see each version's own definition.
        assert_eq!(catalog.find_ringbuffer_at(id, CommitVersion(1)), Some(original));
        assert_eq!(catalog.find_ringbuffer_at(id, CommitVersion(2)), Some(renamed));
    }

    /// Setting `None` hides the ring buffer from that version on and drops its
    /// name, while the earlier version remains readable by id.
    #[test]
    fn test_ringbuffer_deletion() {
        let catalog = MaterializedCatalog::new();
        let id = RingBufferId(1);
        let ns = NamespaceId::SYSTEM;
        let definition = create_test_ringbuffer(id, ns, "deletable_rb");

        catalog.set_ringbuffer(id, CommitVersion(1), Some(definition.clone()));
        assert_eq!(catalog.find_ringbuffer_at(id, CommitVersion(1)), Some(definition.clone()));
        assert!(catalog.find_ringbuffer_by_name_at(ns, "deletable_rb", CommitVersion(1)).is_some());

        catalog.set_ringbuffer(id, CommitVersion(2), None);
        assert!(catalog.find_ringbuffer_at(id, CommitVersion(2)).is_none());
        assert!(catalog.find_ringbuffer_by_name_at(ns, "deletable_rb", CommitVersion(2)).is_none());

        // History before the deletion is preserved.
        assert_eq!(catalog.find_ringbuffer_at(id, CommitVersion(1)), Some(definition));
    }

    /// The unversioned lookup returns the most recently written definition.
    #[test]
    fn test_find_latest_ringbuffer() {
        let catalog = MaterializedCatalog::new();
        let id = RingBufferId(1);

        // Nothing registered yet.
        assert!(catalog.find_ringbuffer(id).is_none());

        let first = create_test_ringbuffer(id, NamespaceId::SYSTEM, "rb_v1");
        let mut second = first.clone();
        second.name = "rb_v2".to_string();

        catalog.set_ringbuffer(id, CommitVersion(10), Some(first));
        catalog.set_ringbuffer(id, CommitVersion(20), Some(second.clone()));

        assert_eq!(catalog.find_ringbuffer(id), Some(second));
    }

    /// The unversioned name lookup follows renames: only the latest name resolves.
    #[test]
    fn test_find_latest_ringbuffer_by_name() {
        let catalog = MaterializedCatalog::new();
        let ns = NamespaceId::SYSTEM;
        let id = RingBufferId(1);

        // Nothing registered yet.
        assert!(catalog.find_ringbuffer_by_name(ns, "test_rb").is_none());

        let first = create_test_ringbuffer(id, ns, "test_rb");
        let mut second = first.clone();
        second.name = "renamed_rb".to_string();

        catalog.set_ringbuffer(id, CommitVersion(10), Some(first));
        catalog.set_ringbuffer(id, CommitVersion(20), Some(second.clone()));

        assert!(catalog.find_ringbuffer_by_name(ns, "test_rb").is_none());
        assert_eq!(catalog.find_ringbuffer_by_name(ns, "renamed_rb"), Some(second));
    }
}