use std::{
collections::BTreeSet,
path::{Path, PathBuf},
};
use nautilus_system::event_store::RetentionMode;
use crate::{EventStore, EventStoreError, RedbBackend, RunManifest, RunStatus, SnapshotAnchor};
/// A sealed run considered by retention planning.
///
/// Pairs the run's manifest with the path of its store and the outcome of
/// validating its latest snapshot anchor.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RetentionRun {
    /// Manifest describing the run (identity, status, timestamps, watermark).
    pub manifest: RunManifest,
    /// Filesystem path of the run's store. For redb runs listed by
    /// `list_redb_sealed_runs` this is `<base_dir>/<instance_id>/<run_id>.redb`.
    pub path: PathBuf,
    /// Result of validating the run's latest snapshot anchor.
    pub snapshot_anchor: SnapshotAnchorStatus,
}
impl RetentionRun {
    /// Builds a [`RetentionRun`] from a manifest, the path of the run's store,
    /// and the result of validating its latest snapshot anchor.
    #[must_use]
    pub fn new(
        manifest: RunManifest,
        path: impl Into<PathBuf>,
        snapshot_anchor: SnapshotAnchorStatus,
    ) -> Self {
        let path: PathBuf = path.into();
        Self {
            snapshot_anchor,
            path,
            manifest,
        }
    }

    /// Returns the run identifier recorded in the manifest.
    #[must_use]
    pub fn run_id(&self) -> &str {
        &self.manifest.run_id
    }

    /// Returns `true` when this run can serve as a restore point.
    ///
    /// A run qualifies only if its status is neither `Running` nor
    /// `Quarantined` and its snapshot anchor validated as
    /// [`SnapshotAnchorStatus::Valid`].
    #[must_use]
    pub fn is_known_good_restore_point(&self) -> bool {
        let unusable_status = matches!(
            self.manifest.status,
            RunStatus::Quarantined | RunStatus::Running
        );
        if unusable_status {
            return false;
        }
        matches!(self.snapshot_anchor, SnapshotAnchorStatus::Valid(_))
    }
}
/// Outcome of validating a run's latest snapshot anchor.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum SnapshotAnchorStatus {
    /// No snapshot anchor was found for the run.
    Missing,
    /// The anchor's `high_watermark` does not exceed the run manifest's `high_watermark`.
    Valid(SnapshotAnchor),
    /// The anchor was rejected; the message explains why (e.g. its watermark
    /// exceeds the manifest's, or the store reported it as corrupted).
    Invalid(String),
}
/// Result of retention planning for a set of sealed runs.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct RetentionPlan {
    /// Sealed runs considered by the plan, ordered by ascending `start_ts_init`
    /// when produced by `plan_retention`.
    pub sealed_runs: Vec<RetentionRun>,
    /// Runs the retention mode allows to be reclaimed. Planning only selects
    /// them; it does not delete anything.
    pub reclaim_candidates: Vec<RetentionRun>,
}
/// Lists the sealed redb runs of `instance_id` under `base_dir` and builds a
/// retention plan for them according to `mode`.
///
/// # Errors
///
/// Returns any error produced while listing the sealed runs
/// (see `list_redb_sealed_runs`).
pub fn plan_redb_retention(
    base_dir: &Path,
    instance_id: &str,
    mode: RetentionMode,
) -> Result<RetentionPlan, EventStoreError> {
    list_redb_sealed_runs(base_dir, instance_id).map(|sealed_runs| plan_retention(sealed_runs, mode))
}
/// Lists the sealed runs of `instance_id` stored under `base_dir`. Each run is
/// returned with its redb file path and the result of validating its latest
/// snapshot anchor.
///
/// Runs whose manifest is not sealed are skipped. If the store reports a
/// snapshot anchor as corrupted, the run is recorded with
/// [`SnapshotAnchorStatus::Invalid`] instead of failing the listing.
///
/// # Errors
///
/// Returns an error if any of the following fail:
/// - listing the runs;
/// - opening a sealed run;
/// - reading a run's latest snapshot anchor, for any error other than
///   [`EventStoreError::Corrupted`].
pub fn list_redb_sealed_runs(
    base_dir: &Path,
    instance_id: &str,
) -> Result<Vec<RetentionRun>, EventStoreError> {
    let instance_dir = base_dir.join(instance_id);
    RedbBackend::list_runs(base_dir, instance_id)?
        .into_iter()
        .filter(|manifest| manifest.is_sealed())
        .map(|manifest| -> Result<RetentionRun, EventStoreError> {
            // NOTE(review): this mirrors the redb file layout assumed by this module;
            // keep it in sync with RedbBackend if its naming scheme changes.
            let path = instance_dir.join(format!("{}.redb", manifest.run_id));
            let reader =
                RedbBackend::open_sealed(base_dir, instance_id, manifest.run_id.as_str())?;
            let snapshot_anchor = match reader.latest_snapshot_anchor() {
                Ok(anchor) => snapshot_anchor_status(&manifest, anchor),
                // A corrupted anchor disqualifies this run as a restore point but
                // must not abort planning for the other runs.
                Err(EventStoreError::Corrupted(msg)) => SnapshotAnchorStatus::Invalid(msg),
                Err(e) => return Err(e),
            };
            Ok(RetentionRun::new(manifest, path, snapshot_anchor))
        })
        .collect()
}
/// Builds a [`RetentionPlan`] for `sealed_runs` under `mode`.
///
/// Runs whose manifest is not sealed are dropped. The remaining runs are sorted
/// by `start_ts_init`; the sort is stable, so ties keep their input order.
/// Reclaim candidates are then selected per mode:
///
/// - `Full`: nothing is reclaimed.
/// - `Bounded { keep_last }`: keeps the last `keep_last` runs plus the latest
///   known-good restore point; everything else is reclaimed.
/// - `SnapshotAnchored`: reclaims every run ordered before the latest
///   known-good restore point.
///
/// In the non-`Full` modes nothing is reclaimed if no run is a known-good
/// restore point.
#[must_use]
pub fn plan_retention(sealed_runs: Vec<RetentionRun>, mode: RetentionMode) -> RetentionPlan {
    let mut sealed_runs: Vec<RetentionRun> = sealed_runs
        .into_iter()
        .filter(|run| run.manifest.is_sealed())
        .collect();
    sealed_runs.sort_by_key(|run| run.manifest.start_ts_init);

    let reclaim_candidates = match mode {
        RetentionMode::SnapshotAnchored => snapshot_anchored_reclaim_candidates(&sealed_runs),
        RetentionMode::Bounded { keep_last } => bounded_reclaim_candidates(&sealed_runs, keep_last),
        RetentionMode::Full => Vec::new(),
    };

    RetentionPlan {
        reclaim_candidates,
        sealed_runs,
    }
}
/// Selects reclaim candidates for `RetentionMode::Bounded`.
///
/// Retains the last `keep_last` runs (all of them if `keep_last` exceeds the
/// number of runs). The latest known-good restore point is also retained, even
/// when it falls outside that tail. Every other run is a candidate. Returns no
/// candidates when there is no known-good restore point.
///
/// Expects `sealed_runs` to be ordered by ascending `start_ts_init`, as
/// `plan_retention` does before calling this.
fn bounded_reclaim_candidates(sealed_runs: &[RetentionRun], keep_last: usize) -> Vec<RetentionRun> {
    let Some(restore_point) = latest_known_good_restore_point(sealed_runs) else {
        return Vec::new();
    };

    // First index of the retained tail; saturates to 0 when keep_last >= len,
    // and equals len (an empty tail) when keep_last == 0.
    let tail_start = sealed_runs.len().saturating_sub(keep_last);
    let mut retained: BTreeSet<usize> = (tail_start..sealed_runs.len()).collect();
    retained.insert(restore_point);

    sealed_runs
        .iter()
        .enumerate()
        .filter_map(|(index, run)| (!retained.contains(&index)).then(|| run.clone()))
        .collect()
}
/// Selects reclaim candidates for `RetentionMode::SnapshotAnchored`.
///
/// Every run ordered before the latest known-good restore point is a
/// candidate; the restore point itself and every later run are retained.
/// Nothing is reclaimed when no restore point exists.
fn snapshot_anchored_reclaim_candidates(sealed_runs: &[RetentionRun]) -> Vec<RetentionRun> {
    latest_known_good_restore_point(sealed_runs).map_or_else(Vec::new, |restore_point| {
        sealed_runs.iter().take(restore_point).cloned().collect()
    })
}
fn latest_known_good_restore_point(sealed_runs: &[RetentionRun]) -> Option<usize> {
sealed_runs
.iter()
.rposition(RetentionRun::is_known_good_restore_point)
}
/// Classifies a run's latest snapshot anchor against the run's manifest.
///
/// Returns:
/// - `Missing` when there is no anchor;
/// - `Valid` when the anchor's `high_watermark` does not exceed the
///   manifest's;
/// - `Invalid` (with an explanatory message) when it does exceed it.
fn snapshot_anchor_status(
    manifest: &RunManifest,
    anchor: Option<SnapshotAnchor>,
) -> SnapshotAnchorStatus {
    match anchor {
        None => SnapshotAnchorStatus::Missing,
        // An anchor cannot legitimately cover more events than the run recorded.
        Some(anchor) if anchor.high_watermark > manifest.high_watermark => {
            SnapshotAnchorStatus::Invalid(format!(
                "snapshot anchor high_watermark {} exceeds manifest high_watermark {}",
                anchor.high_watermark, manifest.high_watermark,
            ))
        }
        Some(anchor) => SnapshotAnchorStatus::Valid(anchor),
    }
}
#[cfg(test)]
mod tests {
    use indexmap::IndexMap;
    use nautilus_core::UnixNanos;
    use rstest::rstest;

    use super::*;
    use crate::RegisteredComponents;

    #[rstest]
    fn snapshot_anchor_status_rejects_anchor_past_manifest_watermark() {
        let status = snapshot_anchor_status(
            &manifest_with_high_watermark(1),
            Some(SnapshotAnchor::new(2, "cache://snapshots/2", "blake3:abc")),
        );
        match status {
            SnapshotAnchorStatus::Invalid(msg) => {
                assert!(
                    msg.contains("exceeds manifest high_watermark"),
                    "msg was: {msg}",
                );
            }
            other => panic!("expected Invalid, was {other:?}"),
        }
    }

    #[rstest]
    fn snapshot_anchor_status_accepts_anchor_at_manifest_watermark() {
        let anchor = SnapshotAnchor::new(1, "cache://snapshots/1", "blake3:abc");
        let status =
            snapshot_anchor_status(&manifest_with_high_watermark(1), Some(anchor.clone()));
        assert_eq!(status, SnapshotAnchorStatus::Valid(anchor));
    }

    #[rstest]
    fn snapshot_anchor_status_reports_missing_anchor() {
        let status = snapshot_anchor_status(&manifest_with_high_watermark(1), None);
        assert_eq!(status, SnapshotAnchorStatus::Missing);
    }

    #[rstest]
    fn restore_point_requires_usable_status_and_valid_anchor() {
        let good = retention_run("run-a", UnixNanos::from(1), RunStatus::Ended, valid_anchor());
        let quarantined = retention_run(
            "run-q",
            UnixNanos::from(2),
            RunStatus::Quarantined,
            valid_anchor(),
        );
        let missing = retention_run(
            "run-m",
            UnixNanos::from(3),
            RunStatus::Ended,
            SnapshotAnchorStatus::Missing,
        );
        assert!(good.is_known_good_restore_point());
        assert!(!quarantined.is_known_good_restore_point());
        assert!(!missing.is_known_good_restore_point());
    }

    #[rstest]
    fn bounded_retains_tail_and_latest_restore_point() {
        let runs = sample_runs();

        // Restore point is run-b (index 1); keep_last = 1 retains run-d as well.
        let keep_one = bounded_reclaim_candidates(&runs, 1);
        assert_eq!(run_ids(&keep_one), vec!["run-a", "run-c"]);

        // keep_last = 0 still retains the latest restore point.
        let keep_none = bounded_reclaim_candidates(&runs, 0);
        assert_eq!(run_ids(&keep_none), vec!["run-a", "run-c", "run-d"]);

        // keep_last larger than the number of runs retains everything.
        assert!(bounded_reclaim_candidates(&runs, 10).is_empty());
    }

    #[rstest]
    fn snapshot_anchored_reclaims_runs_before_latest_restore_point() {
        let runs = sample_runs();
        let candidates = snapshot_anchored_reclaim_candidates(&runs);
        assert_eq!(run_ids(&candidates), vec!["run-a"]);
    }

    #[rstest]
    fn nothing_is_reclaimed_without_a_restore_point() {
        let runs = vec![
            retention_run(
                "run-a",
                UnixNanos::from(1),
                RunStatus::Ended,
                SnapshotAnchorStatus::Missing,
            ),
            retention_run(
                "run-b",
                UnixNanos::from(2),
                RunStatus::Ended,
                SnapshotAnchorStatus::Invalid("bad anchor".to_string()),
            ),
        ];
        assert!(bounded_reclaim_candidates(&runs, 0).is_empty());
        assert!(snapshot_anchored_reclaim_candidates(&runs).is_empty());
    }

    /// Runs ordered by `start_ts_init`: two good restore points (a, b), then a
    /// run without an anchor (c) and a quarantined run with a valid anchor (d).
    fn sample_runs() -> Vec<RetentionRun> {
        vec![
            retention_run("run-a", UnixNanos::from(1), RunStatus::Ended, valid_anchor()),
            retention_run("run-b", UnixNanos::from(2), RunStatus::Ended, valid_anchor()),
            retention_run(
                "run-c",
                UnixNanos::from(3),
                RunStatus::Ended,
                SnapshotAnchorStatus::Missing,
            ),
            retention_run(
                "run-d",
                UnixNanos::from(4),
                RunStatus::Quarantined,
                valid_anchor(),
            ),
        ]
    }

    fn valid_anchor() -> SnapshotAnchorStatus {
        SnapshotAnchorStatus::Valid(SnapshotAnchor::new(
            1,
            "cache://snapshots/1",
            "blake3:abc",
        ))
    }

    fn retention_run(
        run_id: &str,
        start_ts_init: UnixNanos,
        status: RunStatus,
        snapshot_anchor: SnapshotAnchorStatus,
    ) -> RetentionRun {
        let mut manifest = manifest_with_high_watermark(10);
        manifest.run_id = run_id.to_string();
        manifest.start_ts_init = start_ts_init;
        manifest.status = status;
        RetentionRun::new(manifest, format!("{run_id}.redb"), snapshot_anchor)
    }

    fn run_ids(runs: &[RetentionRun]) -> Vec<&str> {
        runs.iter().map(RetentionRun::run_id).collect()
    }

    fn manifest_with_high_watermark(high_watermark: u64) -> RunManifest {
        RunManifest {
            run_id: "run-1".to_string(),
            parent_run_id: None,
            instance_id: "trader-001".to_string(),
            binary_hash: "deadbeef".to_string(),
            schema_version: 1,
            crate_versions: "feedface".to_string(),
            feature_flags: Vec::new(),
            adapter_versions: IndexMap::new(),
            config_hash: "cafebabe".to_string(),
            registered_components: RegisteredComponents::default(),
            seed: None,
            start_ts_init: UnixNanos::from(1),
            end_ts_init: None,
            high_watermark,
            status: RunStatus::Ended,
        }
    }
}