use crate::catalog::Catalog;
use crate::commit::apply::apply_mutation;
use crate::commit::validation::Mutation;
use crate::error::AedbError;
use crate::snapshot::reader::SnapshotReadView;
use crate::storage::keyspace::{Keyspace, KeyspaceSnapshot};
use std::collections::VecDeque;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Instant;
#[derive(Debug, Clone)]
struct Version {
seq: u64,
keyspace: Option<Arc<KeyspaceSnapshot>>,
catalog: Option<Arc<Catalog>>,
delta: Option<Arc<CommitDelta>>,
created_at: Instant,
ref_count: Arc<AtomicU64>,
}
#[derive(Debug, Clone)]
pub struct VersionStore {
versions: VecDeque<Version>,
max_versions: usize,
full_snapshot_interval_deltas: usize,
deltas_since_full_snapshot: usize,
min_version_age_ms: u64,
}
#[derive(Debug, Clone)]
pub struct CommitDelta {
pub seq: u64,
pub mutations: Vec<Mutation>,
}
#[derive(Debug)]
pub struct ReadViewGuard {
view: SnapshotReadView,
ref_count: Arc<AtomicU64>,
}
impl Drop for ReadViewGuard {
fn drop(&mut self) {
self.ref_count.fetch_sub(1, Ordering::AcqRel);
}
}
impl ReadViewGuard {
pub fn view(&self) -> &SnapshotReadView {
&self.view
}
pub fn into_view(&self) -> SnapshotReadView {
self.view.clone()
}
}
impl VersionStore {
pub fn new(
max_versions: usize,
full_snapshot_interval_deltas: usize,
min_version_age_ms: u64,
) -> Self {
Self {
versions: VecDeque::new(),
max_versions,
full_snapshot_interval_deltas,
deltas_since_full_snapshot: 0,
min_version_age_ms,
}
}
pub fn bootstrap(&mut self, seq: u64, keyspace: KeyspaceSnapshot, catalog: Catalog) {
self.versions.clear();
self.deltas_since_full_snapshot = 0;
self.versions.push_back(Version {
seq,
keyspace: Some(Arc::new(keyspace)),
catalog: Some(Arc::new(catalog)),
delta: None,
created_at: Instant::now(),
ref_count: Arc::new(AtomicU64::new(0)),
});
}
pub fn publish_delta(&mut self, seq: u64, delta: Arc<CommitDelta>) -> bool {
self.versions.push_back(Version {
seq,
keyspace: None,
catalog: None,
delta: Some(delta),
created_at: Instant::now(),
ref_count: Arc::new(AtomicU64::new(0)),
});
self.deltas_since_full_snapshot = self.deltas_since_full_snapshot.saturating_add(1);
self.prune();
self.deltas_since_full_snapshot >= self.full_snapshot_interval_deltas
}
pub fn publish_full(&mut self, seq: u64, keyspace: KeyspaceSnapshot, catalog: Catalog) {
if let Some(existing) = self.versions.iter_mut().find(|v| v.seq == seq) {
existing.keyspace = Some(Arc::new(keyspace));
existing.catalog = Some(Arc::new(catalog));
self.deltas_since_full_snapshot = 0;
return;
}
self.versions.push_back(Version {
seq,
keyspace: Some(Arc::new(keyspace)),
catalog: Some(Arc::new(catalog)),
delta: None,
created_at: Instant::now(),
ref_count: Arc::new(AtomicU64::new(0)),
});
self.deltas_since_full_snapshot = 0;
self.prune();
}
pub fn gc(&mut self) {
self.prune();
}
pub fn acquire_latest(&mut self) -> Result<ReadViewGuard, AedbError> {
let Some(last_idx) = self.versions.len().checked_sub(1) else {
return Err(AedbError::Validation("version store is empty".into()));
};
self.materialize_at_index(last_idx)?;
let version = self
.versions
.get(last_idx)
.ok_or_else(|| AedbError::Validation("version store is empty".into()))?;
Ok(acquire(version))
}
pub fn acquire_at_seq(&mut self, seq: u64) -> Result<ReadViewGuard, AedbError> {
let Some(target_idx) = self.versions.iter().position(|v| v.seq == seq) else {
let oldest = self.versions.front().map(|v| v.seq).unwrap_or(0);
let newest = self.versions.back().map(|v| v.seq).unwrap_or(0);
if seq < oldest {
return Err(AedbError::Validation(format!(
"requested seq {seq} has been garbage collected (oldest retained seq: {oldest})"
)));
}
if seq > newest {
return Err(AedbError::Validation(format!(
"requested seq {seq} is not yet visible (latest visible seq: {newest})"
)));
}
return Err(AedbError::Validation(format!(
"requested seq {seq} not found in version store"
)));
};
self.materialize_at_index(target_idx)?;
let version = self
.versions
.get(target_idx)
.ok_or_else(|| AedbError::Validation("requested seq vanished".into()))?;
Ok(acquire(version))
}
pub fn oldest_seq(&self) -> u64 {
self.versions.front().map(|v| v.seq).unwrap_or(0)
}
pub fn deltas_since(
&self,
from_seq_exclusive: u64,
to_seq_inclusive: u64,
) -> Option<Vec<Arc<CommitDelta>>> {
if from_seq_exclusive >= to_seq_inclusive {
return Some(Vec::new());
}
let oldest = self.versions.front().map(|v| v.seq).unwrap_or(0);
if from_seq_exclusive < oldest {
return None;
}
let mut out = Vec::new();
for version in &self.versions {
if version.seq <= from_seq_exclusive || version.seq > to_seq_inclusive {
continue;
}
if let Some(delta) = &version.delta {
out.push(Arc::clone(delta));
}
}
Some(out)
}
fn prune(&mut self) {
while self.versions.len() > self.max_versions {
let has_other_full = self.versions.iter().skip(1).any(|v| v.keyspace.is_some());
let can_remove = self.versions.front().map(|v| {
let preserve_oldest_full = v.keyspace.is_some() && !has_other_full;
!preserve_oldest_full
&& v.ref_count.load(Ordering::Acquire) == 0
&& v.created_at.elapsed().as_millis() >= u128::from(self.min_version_age_ms)
});
let can_remove = can_remove.unwrap_or(false);
if !can_remove {
break;
}
self.versions.pop_front();
}
}
fn materialize_at_index(&mut self, target_idx: usize) -> Result<(), AedbError> {
let already_materialized = self
.versions
.get(target_idx)
.map(|v| v.keyspace.is_some() && v.catalog.is_some())
.unwrap_or(false);
if already_materialized {
return Ok(());
}
let base_idx = (0..=target_idx)
.rev()
.find(|idx| {
self.versions
.get(*idx)
.map(|v| v.keyspace.is_some() && v.catalog.is_some())
.unwrap_or(false)
})
.ok_or_else(|| AedbError::Validation("no materialized base version found".into()))?;
let base = self
.versions
.get(base_idx)
.ok_or_else(|| AedbError::Validation("base version missing".into()))?;
let base_keyspace = base
.keyspace
.as_ref()
.ok_or_else(|| AedbError::Validation("base keyspace missing".into()))?;
let base_catalog = base
.catalog
.as_ref()
.ok_or_else(|| AedbError::Validation("base catalog missing".into()))?;
let mut keyspace = snapshot_to_keyspace(base_keyspace);
let mut catalog = (**base_catalog).clone();
for version_index in (base_idx + 1)..=target_idx {
let version = self
.versions
.get(version_index)
.ok_or_else(|| AedbError::Validation("delta version missing".into()))?;
if let Some(delta) = &version.delta {
for mutation in &delta.mutations {
apply_mutation(
&mut catalog,
&mut keyspace,
mutation.clone(),
delta.seq,
None,
None,
)?;
}
} else if let (Some(ks), Some(cat)) = (&version.keyspace, &version.catalog) {
keyspace = snapshot_to_keyspace(ks);
catalog = (**cat).clone();
}
}
let target = self
.versions
.get_mut(target_idx)
.ok_or_else(|| AedbError::Validation("target version missing".into()))?;
target.keyspace = Some(Arc::new(keyspace.snapshot()));
target.catalog = Some(Arc::new(catalog));
Ok(())
}
}
fn acquire(version: &Version) -> ReadViewGuard {
let keyspace = version
.keyspace
.as_ref()
.expect("materialized keyspace required");
let catalog = version
.catalog
.as_ref()
.expect("materialized catalog required");
version.ref_count.fetch_add(1, Ordering::AcqRel);
ReadViewGuard {
view: SnapshotReadView {
keyspace: Arc::clone(keyspace),
catalog: Arc::clone(catalog),
seq: version.seq,
},
ref_count: Arc::clone(&version.ref_count),
}
}
fn snapshot_to_keyspace(snapshot: &KeyspaceSnapshot) -> Keyspace {
Keyspace {
primary_index_backend: snapshot.primary_index_backend,
namespaces: snapshot.namespaces.clone(),
async_indexes: snapshot.async_indexes.clone(),
}
}
#[cfg(test)]
mod tests {
use super::{CommitDelta, VersionStore};
use crate::commit::validation::Mutation;
use crate::storage::keyspace::Keyspace;
use std::sync::Arc;
#[test]
fn publish_delta_reuses_shared_arc_instance() {
let mut store = VersionStore::new(8, 4, 0);
store.bootstrap(
0,
Keyspace::default().snapshot(),
crate::catalog::Catalog::default(),
);
let delta = Arc::new(CommitDelta {
seq: 7,
mutations: vec![Mutation::KvSet {
project_id: "p".into(),
scope_id: "app".into(),
key: b"k".to_vec(),
value: b"v".to_vec(),
}],
});
assert!(!store.publish_delta(7, Arc::clone(&delta)));
let deltas = store.deltas_since(0, 7).expect("delta range");
assert_eq!(deltas.len(), 1);
assert!(Arc::ptr_eq(&delta, &deltas[0]));
}
#[test]
fn publish_delta_requests_full_snapshot_after_interval() {
let mut store = VersionStore::new(8, 2, 0);
store.bootstrap(
0,
Keyspace::default().snapshot(),
crate::catalog::Catalog::default(),
);
let first = Arc::new(CommitDelta {
seq: 1,
mutations: Vec::new(),
});
let second = Arc::new(CommitDelta {
seq: 2,
mutations: Vec::new(),
});
assert!(!store.publish_delta(1, first));
assert!(store.publish_delta(2, second));
store.publish_full(
2,
Keyspace::default().snapshot(),
crate::catalog::Catalog::default(),
);
let third = Arc::new(CommitDelta {
seq: 3,
mutations: Vec::new(),
});
assert!(!store.publish_delta(3, third));
}
}