use tracing::{debug, info, warn};
use crate::acl::{AclEntry, delete_acl_entry};
use crate::error::AppError;
use crate::store::KeyspaceHandle;
/// Returns the current wall-clock time as whole seconds since the Unix epoch.
///
/// If the system clock reports a time earlier than the epoch, `0` is returned
/// instead of an error.
fn now_epoch() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        // Clock is set before 1970: fall back to zero rather than failing.
        Err(_) => 0,
    }
}
/// Deletes every expired entry from the ACL keyspace and audits each removal.
///
/// All rows under the `acl:` prefix of `acl_ks` are read and decoded as
/// `AclEntry`. An entry is removed via `delete_acl_entry` when its
/// `is_expired` check is true for a timestamp taken once, at the start of the
/// sweep. Rows that cannot be decoded are logged at debug level and skipped.
///
/// After each successful deletion an `acl.expire` record attributed to
/// `system:sweeper` is written to `audit_ks`. A failure to write that record
/// is logged as a warning and does not stop the sweep.
///
/// # Errors
///
/// Returns an error if the prefix scan fails or if any deletion fails. The
/// sweep stops at the first failed deletion; entries removed before that
/// point stay removed.
pub async fn sweep_expired(
    acl_ks: &KeyspaceHandle,
    audit_ks: &KeyspaceHandle,
) -> Result<(), AppError> {
    let sweep_time = now_epoch();
    let mut removed = 0usize;

    let acl_rows = acl_ks.prefix_iter_raw("acl:").await?;
    for (raw_key, raw_value) in acl_rows {
        let entry = match serde_json::from_slice::<AclEntry>(&raw_value) {
            Err(decode_err) => {
                debug!(
                    key = %String::from_utf8_lossy(&raw_key),
                    error = %decode_err,
                    "sweeper: skipping unreadable acl row",
                );
                continue;
            }
            Ok(decoded) => decoded,
        };

        // Live and permanent entries are left untouched.
        if !entry.is_expired(sweep_time) {
            continue;
        }

        let subject = entry.did.clone();
        let role_name = entry.role.to_string();
        let deadline = entry.expires_at;

        delete_acl_entry(acl_ks, &subject).await?;
        removed += 1;
        info!(
            did = %subject,
            role = %role_name,
            expired_at = ?deadline,
            now_epoch = sweep_time,
            reason = "expired",
            "acl sweeper deleted expired entry"
        );

        // The row is already gone, so an audit failure is reported but never
        // turned into an error for the caller.
        let audit_outcome = crate::audit::record(
            audit_ks,
            "acl.expire",
            "system:sweeper",
            Some(&subject),
            "success",
            None,
            None,
        )
        .await;
        if let Err(audit_err) = audit_outcome {
            warn!(
                did = %subject,
                error = %audit_err,
                "acl sweeper: deletion succeeded but audit::record failed; audit log will be missing this removal"
            );
        }
    }

    if removed != 0 {
        info!(acl_pruned = removed, "acl sweeper pruned expired rows");
    }
    Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::acl::{ConsumerKind, Role, store_acl_entry};
    use crate::store::Store;
    use vti_common::config::StoreConfig;

    /// Opens a store rooted in a new temporary directory and returns it with
    /// its `acl` and `audit` keyspaces. The `TempDir` guard is returned too so
    /// the caller decides how long the directory lives.
    async fn fresh_store() -> (Store, KeyspaceHandle, KeyspaceHandle, tempfile::TempDir) {
        let dir = tempfile::tempdir().unwrap();
        let data_dir = dir.path().to_path_buf();
        let config = StoreConfig { data_dir };
        let store = Store::open(&config).unwrap();
        let acl_ks = store.keyspace("acl").unwrap();
        let audit_ks = store.keyspace("audit").unwrap();
        (store, acl_ks, audit_ks, dir)
    }

    /// Builds an admin ACL entry for `did` with the given optional expiry and
    /// otherwise empty/default fields.
    fn entry(did: &str, expires_at: Option<u64>) -> AclEntry {
        AclEntry {
            did: did.into(),
            role: Role::Admin,
            label: None,
            allowed_contexts: Vec::new(),
            created_at: now_epoch(),
            created_by: "test".into(),
            expires_at,
            kind: ConsumerKind::default(),
            capabilities: vec![],
            device: None,
            version: 0,
        }
    }

    /// Reports whether a row for `entry`'s DID is currently stored in `acl_ks`.
    async fn acl_row_exists(acl_ks: &KeyspaceHandle, entry: &AclEntry) -> bool {
        crate::acl::get_acl_entry(acl_ks, &entry.did)
            .await
            .unwrap()
            .is_some()
    }

    #[tokio::test]
    async fn sweeper_deletes_expired_and_preserves_permanent() {
        let (_store, acl_ks, audit_ks, _dir) = fresh_store().await;

        let now = now_epoch();
        let expired = entry("did:key:zExpired", Some(now - 1));
        let live_ttl = entry("did:key:zLiveTtl", Some(now + 3600));
        let permanent = entry("did:key:zPermanent", None);

        store_acl_entry(&acl_ks, &expired).await.unwrap();
        store_acl_entry(&acl_ks, &live_ttl).await.unwrap();
        store_acl_entry(&acl_ks, &permanent).await.unwrap();

        sweep_expired(&acl_ks, &audit_ks).await.unwrap();

        assert!(
            !acl_row_exists(&acl_ks, &expired).await,
            "expired entry must be pruned"
        );
        assert!(
            acl_row_exists(&acl_ks, &live_ttl).await,
            "live-TTL entry must NOT be pruned"
        );
        assert!(
            acl_row_exists(&acl_ks, &permanent).await,
            "permanent entry must NOT be pruned"
        );

        // The raw audit rows are searched as text for both the action name
        // and the DID of the pruned entry.
        let found_expire_row_for_did = audit_ks
            .prefix_iter_raw("log:")
            .await
            .unwrap()
            .into_iter()
            .any(|(_, value)| {
                let text = String::from_utf8_lossy(&value);
                text.contains("acl.expire") && text.contains(&expired.did)
            });
        assert!(
            found_expire_row_for_did,
            "audit log must contain an acl.expire entry for the pruned DID"
        );
    }

    #[tokio::test]
    async fn sweeper_handles_empty_keyspace() {
        let (_store, acl_ks, audit_ks, _dir) = fresh_store().await;

        // Sweeping with no ACL rows at all must still succeed.
        sweep_expired(&acl_ks, &audit_ks).await.unwrap();
    }
}