use std::sync::Arc;
use dashmap::DashMap;
use reifydb_core::{
common::CommitVersion,
interface::catalog::id::{SeriesId, TableId},
};
use crate::{
bucket::BucketId,
snapshot::{Snapshot, SnapshotId, SnapshotMeta, SnapshotSource},
};
#[derive(Clone)]
pub struct SnapshotRegistry {
snapshots: Arc<DashMap<SnapshotId, Arc<Snapshot>>>,
latest_table: Arc<DashMap<TableId, CommitVersion>>,
latest_series: Arc<DashMap<SeriesId, BucketId>>,
}
impl SnapshotRegistry {
pub fn new() -> Self {
Self {
snapshots: Arc::new(DashMap::new()),
latest_table: Arc::new(DashMap::new()),
latest_series: Arc::new(DashMap::new()),
}
}
pub fn insert(&self, snapshot: Arc<Snapshot>) {
let id = snapshot.id;
match &snapshot.source {
SnapshotSource::Table {
table_id,
commit_version,
} => {
self.latest_table
.entry(*table_id)
.and_modify(|cv| {
if *commit_version > *cv {
*cv = *commit_version;
}
})
.or_insert(*commit_version);
}
SnapshotSource::Series {
series_id,
bucket,
..
} => {
let bid = bucket.id();
self.latest_series
.entry(*series_id)
.and_modify(|b| {
if bid > *b {
*b = bid;
}
})
.or_insert(bid);
}
}
self.snapshots.insert(id, snapshot);
}
pub fn get(&self, id: &SnapshotId) -> Option<Arc<Snapshot>> {
self.snapshots.get(id).map(|e| Arc::clone(e.value()))
}
pub fn latest_table(&self, table_id: TableId) -> Option<Arc<Snapshot>> {
let cv = *self.latest_table.get(&table_id)?;
self.get(&SnapshotId::Table {
table_id,
commit_version: cv,
})
}
pub fn latest_series_bucket(&self, series_id: SeriesId) -> Option<Arc<Snapshot>> {
let bucket = *self.latest_series.get(&series_id)?;
self.get(&SnapshotId::Series {
series_id,
bucket,
})
}
pub fn series_bucket(&self, series_id: SeriesId, bucket: BucketId) -> Option<Arc<Snapshot>> {
self.get(&SnapshotId::Series {
series_id,
bucket,
})
}
pub fn series_buckets(&self, series_id: SeriesId) -> Vec<BucketId> {
let mut buckets: Vec<BucketId> = self
.snapshots
.iter()
.filter_map(|e| match e.key() {
SnapshotId::Series {
series_id: sid,
bucket,
} if *sid == series_id => Some(*bucket),
_ => None,
})
.collect();
buckets.sort();
buckets
}
pub fn list(&self) -> Vec<SnapshotMeta> {
self.snapshots.iter().map(|e| e.value().meta()).collect()
}
pub fn len(&self) -> usize {
self.snapshots.len()
}
pub fn is_empty(&self) -> bool {
self.snapshots.is_empty()
}
}
impl Default for SnapshotRegistry {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use reifydb_core::interface::catalog::id::{SeriesId, TableId};
use reifydb_runtime::context::clock::Clock;
use super::*;
use crate::{
bucket::Bucket,
snapshot::{ColumnBlock, Snapshot},
};
fn empty_block() -> ColumnBlock {
ColumnBlock::new(Arc::new(vec![]), vec![])
}
fn mktable_snapshot(table_id: TableId, cv: CommitVersion) -> Snapshot {
Snapshot {
id: SnapshotId::Table {
table_id,
commit_version: cv,
},
source: SnapshotSource::Table {
table_id,
commit_version: cv,
},
namespace: "test".into(),
name: "t".into(),
created_at: Clock::Real.instant(),
block: empty_block(),
}
}
fn mkseries_snapshot(series_id: SeriesId, bucket: Bucket, seq: u64) -> Snapshot {
Snapshot {
id: SnapshotId::Series {
series_id,
bucket: bucket.id(),
},
source: SnapshotSource::Series {
series_id,
bucket,
sequence_counter: seq,
},
namespace: "test".into(),
name: "s".into(),
created_at: Clock::Real.instant(),
block: empty_block(),
}
}
#[test]
fn inserts_and_retrieves_table_snapshots() {
let r = SnapshotRegistry::new();
r.insert(Arc::new(mktable_snapshot(TableId(1), CommitVersion(5))));
r.insert(Arc::new(mktable_snapshot(TableId(1), CommitVersion(10))));
assert_eq!(r.len(), 2);
let latest = r.latest_table(TableId(1)).unwrap();
assert!(matches!(
latest.id,
SnapshotId::Table {
commit_version: CommitVersion(10),
..
}
));
}
#[test]
fn latest_table_tracks_highest_commit_version() {
let r = SnapshotRegistry::new();
r.insert(Arc::new(mktable_snapshot(TableId(1), CommitVersion(10))));
r.insert(Arc::new(mktable_snapshot(TableId(1), CommitVersion(5))));
let latest = r.latest_table(TableId(1)).unwrap();
assert!(matches!(
latest.id,
SnapshotId::Table {
commit_version: CommitVersion(10),
..
}
));
}
#[test]
fn series_bucket_replacement_overwrites_atomically() {
let r = SnapshotRegistry::new();
let bucket = Bucket {
start: 0,
end: 100,
width: 100,
};
r.insert(Arc::new(mkseries_snapshot(SeriesId(1), bucket, 5)));
r.insert(Arc::new(mkseries_snapshot(SeriesId(1), bucket, 10)));
assert_eq!(r.len(), 1, "replacement should not grow registry");
let snap = r.series_bucket(SeriesId(1), bucket.id()).unwrap();
let SnapshotSource::Series {
sequence_counter,
..
} = snap.source
else {
panic!();
};
assert_eq!(sequence_counter, 10);
}
#[test]
fn series_buckets_returns_ordered_list() {
let r = SnapshotRegistry::new();
for start in [0u64, 200, 100, 300] {
let bucket = Bucket {
start,
end: start + 100,
width: 100,
};
r.insert(Arc::new(mkseries_snapshot(SeriesId(7), bucket, 1)));
}
let buckets = r.series_buckets(SeriesId(7));
assert_eq!(buckets, vec![BucketId(0), BucketId(100), BucketId(200), BucketId(300)]);
}
#[test]
fn latest_series_bucket_returns_newest() {
let r = SnapshotRegistry::new();
let b1 = Bucket {
start: 0,
end: 100,
width: 100,
};
let b2 = Bucket {
start: 100,
end: 200,
width: 100,
};
r.insert(Arc::new(mkseries_snapshot(SeriesId(9), b1, 1)));
r.insert(Arc::new(mkseries_snapshot(SeriesId(9), b2, 2)));
let latest = r.latest_series_bucket(SeriesId(9)).unwrap();
assert!(matches!(
latest.id,
SnapshotId::Series {
bucket: BucketId(100),
..
}
));
}
}