use crate::authn::event::AuthEvent;
use std::time::Duration;
pub trait AuditArchiver: Send + Sync + 'static {
type Error: std::error::Error + Send + Sync + 'static;
fn archive_batch(
&self,
events: &[AuthEvent],
) -> impl std::future::Future<Output = Result<(), Self::Error>> + Send;
fn name(&self) -> &'static str;
}
#[derive(Debug, Clone)]
pub struct AuditRetentionPolicy {
pub archive_after: Duration,
pub purge_hot_after_archive: Duration,
pub delete_archive_after: Option<Duration>,
}
impl Default for AuditRetentionPolicy {
fn default() -> Self {
Self {
archive_after: Duration::from_secs(90 * 24 * 60 * 60),
purge_hot_after_archive: Duration::from_secs(7 * 24 * 60 * 60),
delete_archive_after: None,
}
}
}
impl AuditRetentionPolicy {
pub fn with_archive_after(mut self, d: Duration) -> Self {
self.archive_after = d;
self
}
pub fn with_purge_hot_after_archive(mut self, d: Duration) -> Self {
self.purge_hot_after_archive = d;
self
}
pub fn with_delete_archive_after(mut self, d: Duration) -> Self {
self.delete_archive_after = Some(d);
self
}
pub fn keep_archive_forever(mut self) -> Self {
self.delete_archive_after = None;
self
}
}
#[derive(Debug, Default, Clone, Copy)]
pub struct NoopAuditArchiver;
impl AuditArchiver for NoopAuditArchiver {
type Error = std::convert::Infallible;
async fn archive_batch(&self, events: &[AuthEvent]) -> Result<(), Self::Error> {
tracing::trace!(
target: "axess::audit::archive",
count = events.len(),
"NoopAuditArchiver: batch acknowledged, no cold-storage copy made",
);
Ok(())
}
fn name(&self) -> &'static str {
"noop"
}
}
pub trait AuditRetentionSource: Send + Sync + 'static {
type EventId: Clone + Send + Sync + 'static;
type Error: std::error::Error + Send + Sync + 'static;
fn select_unarchived_before(
&self,
cutoff: chrono::DateTime<chrono::Utc>,
limit: usize,
) -> impl std::future::Future<Output = Result<Vec<(Self::EventId, AuthEvent)>, Self::Error>> + Send;
fn mark_archived(
&self,
ids: &[Self::EventId],
at: chrono::DateTime<chrono::Utc>,
) -> impl std::future::Future<Output = Result<(), Self::Error>> + Send;
fn purge_hot_archived_before(
&self,
cutoff: chrono::DateTime<chrono::Utc>,
) -> impl std::future::Future<Output = Result<u64, Self::Error>> + Send;
}
#[derive(Debug, Default, Clone, Copy)]
pub struct RetentionTickReport {
pub archived: u64,
pub purged: u64,
}
#[derive(Debug, thiserror::Error)]
pub enum RetentionError<S, A>
where
S: std::error::Error + Send + Sync + 'static,
A: std::error::Error + Send + Sync + 'static,
{
#[error("retention source: {0}")]
Source(#[source] S),
#[error("archiver: {0}")]
Archiver(#[source] A),
}
pub struct AuditRetentionLoop<S, A>
where
S: AuditRetentionSource,
A: AuditArchiver,
{
source: std::sync::Arc<S>,
archiver: std::sync::Arc<A>,
policy: AuditRetentionPolicy,
tick_interval: Duration,
batch_size: usize,
clock: std::sync::Arc<dyn axess_clock::Clock>,
}
impl<S, A> AuditRetentionLoop<S, A>
where
S: AuditRetentionSource,
A: AuditArchiver,
{
pub fn new(source: S, archiver: A, policy: AuditRetentionPolicy) -> Self {
Self {
source: std::sync::Arc::new(source),
archiver: std::sync::Arc::new(archiver),
policy,
tick_interval: Duration::from_secs(3600),
batch_size: 10_000,
clock: std::sync::Arc::new(axess_clock::SystemClock),
}
}
pub fn with_tick_interval(mut self, d: Duration) -> Self {
self.tick_interval = d;
self
}
pub fn with_batch_size(mut self, n: usize) -> Self {
self.batch_size = n.max(1);
self
}
pub fn with_clock(mut self, c: std::sync::Arc<dyn axess_clock::Clock>) -> Self {
self.clock = c;
self
}
pub async fn tick(&self) -> Result<RetentionTickReport, RetentionError<S::Error, A::Error>> {
let now = self.clock.now();
let archive_cutoff = now
- chrono::Duration::from_std(self.policy.archive_after)
.unwrap_or(chrono::Duration::zero());
let purge_cutoff = now
- chrono::Duration::from_std(self.policy.purge_hot_after_archive)
.unwrap_or(chrono::Duration::zero());
let batch = self
.source
.select_unarchived_before(archive_cutoff, self.batch_size)
.await
.map_err(RetentionError::Source)?;
let mut report = RetentionTickReport::default();
if !batch.is_empty() {
let events: Vec<AuthEvent> = batch.iter().map(|(_, e)| e.clone()).collect();
self.archiver
.archive_batch(&events)
.await
.map_err(RetentionError::Archiver)?;
let ids: Vec<S::EventId> = batch.into_iter().map(|(id, _)| id).collect();
self.source
.mark_archived(&ids, now)
.await
.map_err(RetentionError::Source)?;
report.archived = events.len() as u64;
}
let purged = self
.source
.purge_hot_archived_before(purge_cutoff)
.await
.map_err(RetentionError::Source)?;
report.purged = purged;
Ok(report)
}
pub fn spawn(self) -> tokio::task::JoinHandle<()> {
let interval_dur = self.tick_interval;
tokio::spawn(async move {
let mut interval = tokio::time::interval(interval_dur);
interval.tick().await; loop {
interval.tick().await;
match self.tick().await {
Ok(report) if report.archived > 0 || report.purged > 0 => {
tracing::debug!(
archived = report.archived,
purged = report.purged,
"audit retention tick",
);
}
Ok(_) => {}
Err(e) => {
tracing::warn!(error = %e, "audit retention tick failed; loop continuing");
}
}
}
})
}
}
#[cfg(feature = "audit-archive-fs")]
mod filesystem;
#[cfg(feature = "audit-archive-fs")]
pub use filesystem::{FilesystemArchiveError, FilesystemAuditArchiver};
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn noop_archiver_accepts_any_batch() {
let archiver = NoopAuditArchiver;
let result = archiver.archive_batch(&[]).await;
assert!(result.is_ok());
}
#[test]
fn default_policy_archives_at_90_days_with_indefinite_retention() {
let p = AuditRetentionPolicy::default();
assert_eq!(p.archive_after, Duration::from_secs(90 * 24 * 60 * 60));
assert_eq!(
p.purge_hot_after_archive,
Duration::from_secs(7 * 24 * 60 * 60)
);
assert!(p.delete_archive_after.is_none());
}
#[test]
fn policy_builder_chain_overrides_each_threshold() {
let p = AuditRetentionPolicy::default()
.with_archive_after(Duration::from_secs(60 * 24 * 60 * 60))
.with_purge_hot_after_archive(Duration::from_secs(14 * 24 * 60 * 60))
.with_delete_archive_after(Duration::from_secs(10 * 365 * 24 * 60 * 60));
assert_eq!(p.archive_after.as_secs(), 60 * 24 * 60 * 60);
assert_eq!(p.purge_hot_after_archive.as_secs(), 14 * 24 * 60 * 60);
assert_eq!(
p.delete_archive_after.map(|d| d.as_secs()),
Some(10 * 365 * 24 * 60 * 60)
);
}
#[test]
fn keep_archive_forever_unsets_delete_threshold() {
let p = AuditRetentionPolicy::default()
.with_delete_archive_after(Duration::from_secs(60))
.keep_archive_forever();
assert!(p.delete_archive_after.is_none());
}
}