pub mod memory;
use std::{collections::Bound, sync};
use memory::MemoryCdcStorage;
use reifydb_core::{
common::CommitVersion,
encoded::key::EncodedKey,
interface::cdc::{Cdc, CdcBatch},
};
use crate::error::CdcError;
/// Result type returned by the CDC storage operations declared in this module.
///
/// The error side is always [`CdcError`].
pub type CdcStorageResult<T> = Result<T, CdcError>;
/// Describes one entry that was removed from CDC storage.
///
/// Instances are reported through [`DropBeforeResult::entries`].
#[derive(Debug, Clone)]
pub struct DroppedCdcEntry {
/// Encoded key of the removed entry.
pub key: EncodedKey,
/// Size, in bytes, attributed to the removed entry's value.
/// NOTE(review): presumably the length of the encoded row payload; confirm against the backend.
pub value_bytes: u64,
}
/// Outcome of a [`CdcStorage::drop_before`] call.
///
/// The `Default` value (zero count, no entries) represents "nothing was dropped".
#[derive(Debug, Clone, Default)]
pub struct DropBeforeResult {
/// Number of items removed.
/// NOTE(review): whether this counts whole CDC records or individual changes is decided by the
/// backend and is not visible here; confirm before relying on it.
pub count: usize,
/// Per-entry details (key and value size) of what was removed.
pub entries: Vec<DroppedCdcEntry>,
}
pub trait CdcStorage: Send + Sync + Clone + 'static {
fn write(&self, cdc: &Cdc) -> CdcStorageResult<()>;
fn read(&self, version: CommitVersion) -> CdcStorageResult<Option<Cdc>>;
fn read_range(
&self,
start: Bound<CommitVersion>,
end: Bound<CommitVersion>,
batch_size: u64,
) -> CdcStorageResult<CdcBatch>;
fn count(&self, version: CommitVersion) -> CdcStorageResult<usize>;
fn min_version(&self) -> CdcStorageResult<Option<CommitVersion>>;
fn max_version(&self) -> CdcStorageResult<Option<CommitVersion>>;
fn exists(&self, version: CommitVersion) -> CdcStorageResult<bool> {
Ok(self.read(version)?.is_some())
}
fn drop_before(&self, version: CommitVersion) -> CdcStorageResult<DropBeforeResult>;
fn range(&self, start: Bound<CommitVersion>, end: Bound<CommitVersion>) -> CdcStorageResult<CdcBatch> {
self.read_range(start, end, 1024)
}
fn scan(&self, batch_size: u64) -> CdcStorageResult<CdcBatch> {
self.read_range(Bound::Unbounded, Bound::Unbounded, batch_size)
}
}
/// Lets a storage shared behind an [`Arc`](sync::Arc) be used wherever a [`CdcStorage`] is
/// expected.
///
/// Every method, including the ones that have default implementations on the trait
/// (`exists`, `range`, `scan`), is forwarded to the inner `T`. Forwarding the provided methods
/// matters: without it, `Arc<T>` would silently fall back to the trait defaults and ignore any
/// specialised override the inner storage supplies.
impl<T: CdcStorage> CdcStorage for sync::Arc<T> {
    fn write(&self, cdc: &Cdc) -> CdcStorageResult<()> {
        (**self).write(cdc)
    }

    fn read(&self, version: CommitVersion) -> CdcStorageResult<Option<Cdc>> {
        (**self).read(version)
    }

    fn read_range(
        &self,
        start: Bound<CommitVersion>,
        end: Bound<CommitVersion>,
        batch_size: u64,
    ) -> CdcStorageResult<CdcBatch> {
        (**self).read_range(start, end, batch_size)
    }

    fn count(&self, version: CommitVersion) -> CdcStorageResult<usize> {
        (**self).count(version)
    }

    fn min_version(&self) -> CdcStorageResult<Option<CommitVersion>> {
        (**self).min_version()
    }

    fn max_version(&self) -> CdcStorageResult<Option<CommitVersion>> {
        (**self).max_version()
    }

    // Forwarded explicitly so an override on `T` is honoured through the `Arc`.
    fn exists(&self, version: CommitVersion) -> CdcStorageResult<bool> {
        (**self).exists(version)
    }

    fn drop_before(&self, version: CommitVersion) -> CdcStorageResult<DropBeforeResult> {
        (**self).drop_before(version)
    }

    // Forwarded explicitly so an override on `T` is honoured through the `Arc`.
    fn range(&self, start: Bound<CommitVersion>, end: Bound<CommitVersion>) -> CdcStorageResult<CdcBatch> {
        (**self).range(start, end)
    }

    // Forwarded explicitly so an override on `T` is honoured through the `Arc`.
    fn scan(&self, batch_size: u64) -> CdcStorageResult<CdcBatch> {
        (**self).scan(batch_size)
    }
}
/// Concrete, cloneable CDC store that dispatches to one of the available backends.
///
/// Only an in-memory backend is defined here.
#[derive(Clone)]
pub enum CdcStore {
/// Store backed by [`MemoryCdcStorage`].
Memory(MemoryCdcStorage),
}
/// Inherent API of [`CdcStore`]: each method dispatches to the backend held by the active
/// variant.
impl CdcStore {
    /// Creates a store backed by a fresh [`MemoryCdcStorage`].
    pub fn memory() -> Self {
        Self::Memory(MemoryCdcStorage::new())
    }

    /// Stores `cdc` in the underlying backend.
    pub fn write(&self, cdc: &Cdc) -> CdcStorageResult<()> {
        match self {
            Self::Memory(backend) => backend.write(cdc),
        }
    }

    /// Reads the record stored for `version` from the underlying backend.
    pub fn read(&self, version: CommitVersion) -> CdcStorageResult<Option<Cdc>> {
        match self {
            Self::Memory(backend) => backend.read(version),
        }
    }

    /// Reads a batch of records between `start` and `end`, passing `batch_size` through to the
    /// underlying backend.
    pub fn read_range(
        &self,
        start: Bound<CommitVersion>,
        end: Bound<CommitVersion>,
        batch_size: u64,
    ) -> CdcStorageResult<CdcBatch> {
        match self {
            Self::Memory(backend) => backend.read_range(start, end, batch_size),
        }
    }

    /// Returns the backend's count for `version`.
    pub fn count(&self, version: CommitVersion) -> CdcStorageResult<usize> {
        match self {
            Self::Memory(backend) => backend.count(version),
        }
    }

    /// Returns the backend's lowest stored version, if any.
    pub fn min_version(&self) -> CdcStorageResult<Option<CommitVersion>> {
        match self {
            Self::Memory(backend) => backend.min_version(),
        }
    }

    /// Returns the backend's highest stored version, if any.
    pub fn max_version(&self) -> CdcStorageResult<Option<CommitVersion>> {
        match self {
            Self::Memory(backend) => backend.max_version(),
        }
    }

    /// Removes records preceding `version` by calling the backend's `drop_before`.
    ///
    /// Note the naming difference: this inherent method is `delete_before`, while the backend
    /// operation it delegates to is called `drop_before`.
    pub fn delete_before(&self, version: CommitVersion) -> CdcStorageResult<DropBeforeResult> {
        match self {
            Self::Memory(backend) => backend.drop_before(version),
        }
    }
}
/// Exposes [`CdcStore`] through the [`CdcStorage`] trait by delegating to its inherent methods.
///
/// The fully qualified `Self::method(self, ..)` form is deliberate: for a type path, inherent
/// associated functions take precedence over trait methods, so these calls reach the inherent
/// implementations rather than recursing into this trait impl.
impl CdcStorage for CdcStore {
    fn write(&self, cdc: &Cdc) -> CdcStorageResult<()> {
        Self::write(self, cdc)
    }

    fn read(&self, version: CommitVersion) -> CdcStorageResult<Option<Cdc>> {
        Self::read(self, version)
    }

    fn read_range(
        &self,
        start: Bound<CommitVersion>,
        end: Bound<CommitVersion>,
        batch_size: u64,
    ) -> CdcStorageResult<CdcBatch> {
        Self::read_range(self, start, end, batch_size)
    }

    fn count(&self, version: CommitVersion) -> CdcStorageResult<usize> {
        Self::count(self, version)
    }

    fn min_version(&self) -> CdcStorageResult<Option<CommitVersion>> {
        Self::min_version(self)
    }

    fn max_version(&self) -> CdcStorageResult<Option<CommitVersion>> {
        Self::max_version(self)
    }

    // The trait calls this operation `drop_before`; the inherent method is `delete_before`.
    fn drop_before(&self, version: CommitVersion) -> CdcStorageResult<DropBeforeResult> {
        Self::delete_before(self, version)
    }
}
#[cfg(test)]
pub mod tests {
    use reifydb_core::{
        encoded::{key::EncodedKey, row::EncodedRow},
        interface::cdc::SystemChange,
    };
    use reifydb_type::util::cowvec::CowVec;

    use super::*;

    /// Builds a CDC record for `version` that carries `num_changes` system inserts.
    ///
    /// Each insert uses a one-byte key derived from its index and an empty row payload.
    fn create_test_cdc(version: u64, num_changes: usize) -> Cdc {
        let mut system_changes: Vec<SystemChange> = Vec::with_capacity(num_changes);
        for index in 0..num_changes {
            system_changes.push(SystemChange::Insert {
                key: EncodedKey::new(vec![index as u8]),
                post: EncodedRow(CowVec::new(vec![])),
            });
        }
        Cdc::new(CommitVersion(version), 12345, Vec::new(), system_changes)
    }

    /// Returns an in-memory storage holding one single-change CDC record per given version,
    /// written in iteration order.
    fn storage_with_versions(versions: impl IntoIterator<Item = u64>) -> MemoryCdcStorage {
        let storage = MemoryCdcStorage::new();
        for version in versions {
            storage.write(&create_test_cdc(version, 1)).unwrap();
        }
        storage
    }

    #[test]
    fn test_memory_storage_write_read() {
        let storage = MemoryCdcStorage::new();
        storage.write(&create_test_cdc(1, 3)).unwrap();

        let stored = storage
            .read(CommitVersion(1))
            .unwrap()
            .expect("record for version 1 should be readable after write");
        assert_eq!(stored.version, CommitVersion(1));
        assert_eq!(stored.system_changes.len(), 3);
    }

    #[test]
    fn test_memory_storage_read_nonexistent() {
        let storage = MemoryCdcStorage::new();
        let missing = storage.read(CommitVersion(999)).unwrap();
        assert!(missing.is_none());
    }

    #[test]
    fn test_memory_storage_range() {
        let storage = storage_with_versions(1..=10);

        let lower = Bound::Included(CommitVersion(3));
        let upper = Bound::Included(CommitVersion(7));
        let batch = storage.read_range(lower, upper, 100).unwrap();

        assert_eq!(batch.items.len(), 5);
        assert!(!batch.has_more);
        assert_eq!(batch.items[0].version, CommitVersion(3));
        assert_eq!(batch.items[4].version, CommitVersion(7));
    }

    #[test]
    fn test_memory_storage_range_batch_size() {
        let storage = storage_with_versions(1..=10);

        // Ten records are stored but only three are requested, so more must remain.
        let batch = storage.read_range(Bound::Unbounded, Bound::Unbounded, 3).unwrap();
        assert_eq!(batch.items.len(), 3);
        assert!(batch.has_more);
    }

    #[test]
    fn test_memory_storage_count() {
        let storage = MemoryCdcStorage::new();
        storage.write(&create_test_cdc(1, 5)).unwrap();

        assert_eq!(storage.count(CommitVersion(1)).unwrap(), 5);
        assert_eq!(storage.count(CommitVersion(2)).unwrap(), 0);
    }

    #[test]
    fn test_memory_storage_min_max_version() {
        let storage = MemoryCdcStorage::new();
        assert_eq!(storage.min_version().unwrap(), None);
        assert_eq!(storage.max_version().unwrap(), None);

        // Written out of order on purpose.
        for version in [5, 3, 8] {
            storage.write(&create_test_cdc(version, 1)).unwrap();
        }

        assert_eq!(storage.min_version().unwrap(), Some(CommitVersion(3)));
        assert_eq!(storage.max_version().unwrap(), Some(CommitVersion(8)));
    }

    #[test]
    fn test_delete_before_empty_storage() {
        let storage = MemoryCdcStorage::new();
        let dropped = storage.drop_before(CommitVersion(10)).unwrap();
        assert_eq!(dropped.count, 0);
        assert!(dropped.entries.is_empty());
    }

    #[test]
    fn test_delete_before_some_entries() {
        let storage = storage_with_versions([1, 3, 5, 7, 9]);

        let dropped = storage.drop_before(CommitVersion(5)).unwrap();
        assert_eq!(dropped.count, 2);
        assert_eq!(dropped.entries.len(), 2);

        for removed in [1, 3] {
            assert!(storage.read(CommitVersion(removed)).unwrap().is_none());
        }
        for kept in [5, 7, 9] {
            assert!(storage.read(CommitVersion(kept)).unwrap().is_some());
        }
        assert_eq!(storage.min_version().unwrap(), Some(CommitVersion(5)));
    }

    #[test]
    fn test_delete_before_all_entries() {
        let storage = storage_with_versions(1..=3);

        let dropped = storage.drop_before(CommitVersion(10)).unwrap();
        assert_eq!(dropped.count, 3);
        assert_eq!(dropped.entries.len(), 3);
        assert!(storage.min_version().unwrap().is_none());
        assert!(storage.max_version().unwrap().is_none());
    }

    #[test]
    fn test_delete_before_none_when_version_too_low() {
        let storage = storage_with_versions(5..=7);

        let dropped = storage.drop_before(CommitVersion(3)).unwrap();
        assert_eq!(dropped.count, 0);
        assert!(dropped.entries.is_empty());
        assert_eq!(storage.min_version().unwrap(), Some(CommitVersion(5)));
        assert_eq!(storage.max_version().unwrap(), Some(CommitVersion(7)));
    }

    #[test]
    fn test_delete_before_boundary_condition() {
        let storage = storage_with_versions(1..=5);

        // The cut-off version itself must survive.
        let dropped = storage.drop_before(CommitVersion(3)).unwrap();
        assert_eq!(dropped.count, 2);
        assert_eq!(dropped.entries.len(), 2);
        assert!(storage.read(CommitVersion(3)).unwrap().is_some());
        assert_eq!(storage.min_version().unwrap(), Some(CommitVersion(3)));
    }

    #[test]
    fn test_drop_before_returns_entry_stats() {
        let storage = MemoryCdcStorage::new();
        let insert = SystemChange::Insert {
            key: EncodedKey::new(vec![1, 2, 3]),
            post: EncodedRow(CowVec::new(vec![10, 20, 30, 40, 50])),
        };
        let cdc = Cdc::new(CommitVersion(1), 12345, Vec::new(), vec![insert]);
        storage.write(&cdc).unwrap();

        let dropped = storage.drop_before(CommitVersion(2)).unwrap();
        assert_eq!(dropped.count, 1);
        assert_eq!(dropped.entries.len(), 1);
        assert_eq!(dropped.entries[0].key.as_ref(), &[1, 2, 3]);
        assert_eq!(dropped.entries[0].value_bytes, 5);
    }
}