use std::path::Path;
use chrono::{Duration, Utc};
use tracing::{debug, info, warn};
use crate::backup_bundle_store::{self, BundleRecord, BundleState};
use crate::error::AppError;
use crate::store::KeyspaceHandle;
/// Age, in hours since a record's `created_at`, after which a terminal
/// bundle record (and any blob still attached to it) is removed by the
/// sweeper.
pub const RETENTION_DURATION_HOURS: i64 = 24;

/// Counters describing the work done by a single [`sweep_bundles`] pass.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct SweepStats {
    /// Non-terminal bundles past `expires_at` that were persisted as `Expired`.
    pub expired: usize,
    /// Terminal bundles past the retention window whose records were deleted.
    pub deleted: usize,
    /// Blob files that were actually unlinked from disk.
    pub blobs_removed: usize,
}
/// Runs one pass of the backup-bundle sweeper over every stored record.
///
/// Each record is handled by at most one of two rules:
///
/// * **TTL expiry**: a record that is *not* in a terminal state and whose
///   `expires_at` is at or before the current time has its blob file (if any)
///   deleted. It is then rewritten with state [`BundleState::Expired`] and
///   `blob_path` cleared.
/// * **Retention**: a record that *is* in a terminal state and whose
///   `created_at` is at least [`RETENTION_DURATION_HOURS`] old has its blob
///   file (if any) deleted, and the record itself is removed from the store.
///
/// A failure on an individual record (deleting its blob, or writing/removing
/// the record) is logged at `warn` level and that record is skipped, so a later
/// pass can retry it. A blob that is already missing on disk is not treated as
/// an error and is not counted.
///
/// `blob_dir` is accepted but not currently consulted. Blob locations come
/// from each record's `blob_path`.
///
/// # Errors
///
/// Returns an error only if listing the bundles from `bundles_ks` fails.
pub async fn sweep_bundles(
    bundles_ks: &KeyspaceHandle,
    blob_dir: &Path,
) -> Result<SweepStats, AppError> {
    let mut stats = SweepStats::default();
    let now = Utc::now();
    let retention_cutoff = now - Duration::hours(RETENTION_DURATION_HOURS);
    let all = backup_bundle_store::list_bundles(bundles_ks).await?;
    for record in all {
        let terminal = record.state.is_terminal();
        if !terminal && record.expires_at <= now {
            if let Some(path) = record.blob_path.as_deref() {
                match remove_blob_file(path).await {
                    // Count the unlink as soon as it happens. If persisting the
                    // Expired state fails below, the next pass only sees
                    // NotFound, so deferring the count would lose it for good.
                    Ok(true) => stats.blobs_removed += 1,
                    Ok(false) => {}
                    Err(e) => {
                        warn!(
                            bundle_id = %record.bundle_id,
                            path = %path.display(),
                            error = %e,
                            "sweeper: failed to delete blob during TTL expiry"
                        );
                        continue;
                    }
                }
            }
            // `record` is owned by the loop, so it can be turned into the
            // Expired version in place rather than cloned.
            let mut expired = record;
            expired.state = BundleState::Expired;
            expired.blob_path = None;
            if let Err(e) = backup_bundle_store::store_bundle(bundles_ks, &expired).await {
                warn!(
                    bundle_id = %expired.bundle_id,
                    error = %e,
                    "sweeper: failed to persist Expired state; retry next pass"
                );
                continue;
            }
            stats.expired += 1;
            debug!(
                bundle_id = %expired.bundle_id,
                expired_at = %expired.expires_at,
                "sweeper: bundle expired"
            );
        } else if terminal && record.created_at <= retention_cutoff {
            if let Some(path) = record.blob_path.as_deref() {
                match remove_blob_file(path).await {
                    Ok(true) => stats.blobs_removed += 1,
                    Ok(false) => {}
                    Err(e) => {
                        warn!(
                            bundle_id = %record.bundle_id,
                            path = %path.display(),
                            error = %e,
                            "sweeper: failed to delete orphan blob during retention pass"
                        );
                        // Keep the record so the blob is retried next pass.
                        continue;
                    }
                }
            }
            if let Err(e) = backup_bundle_store::delete_bundle(bundles_ks, &record.bundle_id).await
            {
                warn!(
                    bundle_id = %record.bundle_id,
                    error = %e,
                    "sweeper: failed to delete terminal record; retry next pass"
                );
                continue;
            }
            stats.deleted += 1;
            debug!(
                bundle_id = %record.bundle_id,
                state = ?record.state,
                created_at = %record.created_at,
                "sweeper: terminal bundle past retention; record removed"
            );
        }
    }
    // Parameter kept for interface stability; see the doc comment above.
    let _ = blob_dir;
    if stats.expired > 0 || stats.deleted > 0 || stats.blobs_removed > 0 {
        info!(
            expired = stats.expired,
            deleted = stats.deleted,
            blobs_removed = stats.blobs_removed,
            "backup-bundle sweeper pruned bundles"
        );
    }
    Ok(stats)
}

/// Deletes the blob file at `path`.
///
/// Returns `Ok(true)` if a file was removed, `Ok(false)` if it was already
/// absent (`NotFound`), and propagates any other I/O error unchanged.
async fn remove_blob_file(path: &Path) -> std::io::Result<bool> {
    match tokio::fs::remove_file(path).await {
        Ok(()) => Ok(true),
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(false),
        Err(e) => Err(e),
    }
}
#[allow(dead_code)]
pub fn is_terminal(record: &BundleRecord) -> bool {
record.state.is_terminal()
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::backup_bundle_store::{BundleKind, BundleRecord};
    use uuid::Uuid;
    use vti_common::config::StoreConfig as VtiStoreConfig;

    /// Opens a throwaway store and creates an empty `backups` blob directory.
    ///
    /// Returns the temp-dir guard (bind it so it lives for the whole test),
    /// the sweeper test keyspace, and the blob directory path.
    async fn setup() -> (tempfile::TempDir, KeyspaceHandle, std::path::PathBuf) {
        let tmp = tempfile::tempdir().unwrap();
        let config = VtiStoreConfig {
            data_dir: tmp.path().into(),
        };
        let store = crate::store::Store::open(&config).unwrap();
        let keyspace = store
            .keyspace(crate::keyspaces::BACKUP_BUNDLES_SWEEPER_TEST)
            .unwrap();
        let blobs = tmp.path().join("backups");
        tokio::fs::create_dir_all(&blobs).await.unwrap();
        (tmp, keyspace, blobs)
    }

    /// Builds a bundle record with a fresh id and fixed filler metadata.
    fn record(
        kind: BundleKind,
        state: BundleState,
        created_at: chrono::DateTime<Utc>,
        expires_at: chrono::DateTime<Utc>,
        blob_path: Option<std::path::PathBuf>,
    ) -> BundleRecord {
        BundleRecord {
            bundle_id: Uuid::new_v4(),
            kind,
            state,
            created_at,
            expires_at,
            created_by: "did:example:admin".into(),
            algorithm: "stream".into(),
            expected_sha256: "0".repeat(64),
            expected_size_bytes: 1,
            token_hash: [0u8; 32],
            blob_path,
        }
    }

    /// Writes `bundle` to the keyspace, panicking on failure.
    async fn persist(ks: &KeyspaceHandle, bundle: &BundleRecord) {
        backup_bundle_store::store_bundle(ks, bundle).await.unwrap();
    }

    /// Reports whether a record with `id` is still stored.
    async fn is_present(ks: &KeyspaceHandle, id: Uuid) -> bool {
        backup_bundle_store::get_bundle(ks, &id)
            .await
            .unwrap()
            .is_some()
    }

    /// Returns the stored state for `id`, panicking if the record is gone.
    async fn state_of(ks: &KeyspaceHandle, id: Uuid) -> BundleState {
        backup_bundle_store::get_bundle(ks, &id)
            .await
            .unwrap()
            .expect("record should still be stored")
            .state
    }

    #[tokio::test]
    async fn ttl_pass_expires_non_terminal_records_past_deadline() {
        let (_tmp, ks, blob_dir) = setup().await;
        let now = Utc::now();
        let blob = blob_dir.join("expired.vtabak");
        tokio::fs::write(&blob, b"bytes").await.unwrap();
        let bundle = record(
            BundleKind::Export,
            BundleState::ExportReady,
            now - Duration::minutes(10),
            now - Duration::minutes(5),
            Some(blob.clone()),
        );
        persist(&ks, &bundle).await;

        let stats = sweep_bundles(&ks, &blob_dir).await.unwrap();

        assert_eq!(
            stats,
            SweepStats {
                expired: 1,
                deleted: 0,
                blobs_removed: 1,
            }
        );
        let stored = backup_bundle_store::get_bundle(&ks, &bundle.bundle_id)
            .await
            .unwrap()
            .expect("expired record should still be stored");
        assert_eq!(stored.state, BundleState::Expired);
        assert!(stored.blob_path.is_none());
        assert!(!blob.exists(), "blob file should be deleted");
    }

    #[tokio::test]
    async fn ttl_pass_ignores_records_still_within_ttl() {
        let (_tmp, ks, blob_dir) = setup().await;
        let now = Utc::now();
        let pending = record(
            BundleKind::Import,
            BundleState::ImportPending,
            now,
            now + Duration::minutes(5),
            None,
        );
        persist(&ks, &pending).await;

        let stats = sweep_bundles(&ks, &blob_dir).await.unwrap();

        assert_eq!(stats, SweepStats::default());
    }

    #[tokio::test]
    async fn ttl_pass_ignores_already_terminal_records() {
        let (_tmp, ks, blob_dir) = setup().await;
        let now = Utc::now();
        let acked = record(
            BundleKind::Export,
            BundleState::ExportAcked,
            now - Duration::minutes(30),
            now - Duration::minutes(10),
            None,
        );
        persist(&ks, &acked).await;

        let stats = sweep_bundles(&ks, &blob_dir).await.unwrap();

        assert_eq!(stats, SweepStats::default());
        assert_eq!(
            state_of(&ks, acked.bundle_id).await,
            BundleState::ExportAcked
        );
    }

    #[tokio::test]
    async fn retention_pass_deletes_terminal_records_past_cutoff() {
        let (_tmp, ks, blob_dir) = setup().await;
        let now = Utc::now();
        let committed = record(
            BundleKind::Import,
            BundleState::ImportCommitted,
            now - Duration::hours(48),
            now - Duration::hours(47),
            None,
        );
        persist(&ks, &committed).await;

        let stats = sweep_bundles(&ks, &blob_dir).await.unwrap();

        assert_eq!(stats.deleted, 1);
        assert_eq!(stats.expired, 0);
        assert!(!is_present(&ks, committed.bundle_id).await);
    }

    #[tokio::test]
    async fn retention_pass_keeps_fresh_terminal_records() {
        let (_tmp, ks, blob_dir) = setup().await;
        let now = Utc::now();
        let aborted = record(
            BundleKind::Export,
            BundleState::Aborted,
            now - Duration::hours(1),
            now - Duration::minutes(30),
            None,
        );
        persist(&ks, &aborted).await;

        let stats = sweep_bundles(&ks, &blob_dir).await.unwrap();

        assert_eq!(stats, SweepStats::default());
        assert!(is_present(&ks, aborted.bundle_id).await);
    }

    #[tokio::test]
    async fn retention_pass_removes_orphan_blob_alongside_record() {
        let (_tmp, ks, blob_dir) = setup().await;
        let now = Utc::now();
        let blob = blob_dir.join("orphan.vtabak");
        tokio::fs::write(&blob, b"stale").await.unwrap();
        let downloaded = record(
            BundleKind::Export,
            BundleState::ExportDownloaded,
            now - Duration::hours(48),
            now - Duration::hours(47),
            Some(blob.clone()),
        );
        persist(&ks, &downloaded).await;

        let stats = sweep_bundles(&ks, &blob_dir).await.unwrap();

        assert_eq!(stats.deleted, 1);
        assert_eq!(stats.blobs_removed, 1);
        assert!(!blob.exists());
        assert!(!is_present(&ks, downloaded.bundle_id).await);
    }

    #[tokio::test]
    async fn sweep_combines_ttl_and_retention_in_one_pass() {
        let (_tmp, ks, blob_dir) = setup().await;
        let now = Utc::now();
        // Non-terminal and past its deadline: should become Expired.
        let just_expired = record(
            BundleKind::Export,
            BundleState::ExportReady,
            now - Duration::minutes(10),
            now - Duration::minutes(1),
            None,
        );
        // Terminal and older than the retention window: should be deleted.
        let stale_aborted = record(
            BundleKind::Export,
            BundleState::Aborted,
            now - Duration::hours(48),
            now - Duration::hours(47),
            None,
        );
        // Terminal but still inside the retention window: untouched.
        let fresh_committed = record(
            BundleKind::Import,
            BundleState::ImportCommitted,
            now - Duration::hours(1),
            now - Duration::minutes(55),
            None,
        );
        // Non-terminal and still within its TTL: untouched.
        let live_pending = record(
            BundleKind::Import,
            BundleState::ImportPending,
            now,
            now + Duration::minutes(5),
            None,
        );
        for bundle in [&just_expired, &stale_aborted, &fresh_committed, &live_pending] {
            persist(&ks, bundle).await;
        }

        let stats = sweep_bundles(&ks, &blob_dir).await.unwrap();

        assert_eq!(
            stats,
            SweepStats {
                expired: 1,
                deleted: 1,
                blobs_removed: 0,
            }
        );
        assert_eq!(
            state_of(&ks, just_expired.bundle_id).await,
            BundleState::Expired
        );
        assert!(!is_present(&ks, stale_aborted.bundle_id).await);
        assert_eq!(
            state_of(&ks, fresh_committed.bundle_id).await,
            BundleState::ImportCommitted
        );
        assert_eq!(
            state_of(&ks, live_pending.bundle_id).await,
            BundleState::ImportPending
        );
    }
}