use std::{sync::Arc, time::Duration};
use tokio_util::sync::CancellationToken;
use tracing::{info, warn};
use crate::{
resolver::state::ResolverState,
storage::{Error, query_log::QueryLogRepository},
time::{self, Clock},
};
/// Period of the ticker that drives purge cycles in `QueryLogPurger::run` (one hour).
const PURGE_INTERVAL: Duration = Duration::from_secs(60 * 60);
/// Value passed as the second argument of every `purge_older_than` call
/// (presumably the maximum number of rows deleted per call — confirm against the repository).
const PURGE_BATCH: i64 = 1000;
/// Background worker that removes aged-out query-log rows through a repository `R`.
///
/// The retention window is not stored here: it is read from the shared
/// [`ResolverState`] each time a purge cycle runs.
pub struct QueryLogPurger<R> {
    // Storage handle used to delete old rows and to request a vacuum afterwards.
    repo: R,
    // Shared resolver state; source of the current `query_log_retention_days` setting.
    state: Arc<ResolverState>,
}
impl<R> QueryLogPurger<R>
where
    R: QueryLogRepository,
{
    /// Creates a purger that deletes through `repo` and reads its retention
    /// setting from `state`.
    pub fn new(repo: R, state: Arc<ResolverState>) -> Self {
        Self { repo, state }
    }

    /// Runs a single purge cycle and returns the total number of rows removed.
    ///
    /// The retention period is read from the current settings on every call, so
    /// a settings change takes effect on the next cycle. Rows older than the
    /// resulting cutoff are removed by calling `purge_older_than` repeatedly
    /// with [`PURGE_BATCH`] until a call reports zero rows, after which the
    /// repository is asked to perform an incremental vacuum.
    ///
    /// # Errors
    ///
    /// Returns the first error reported by the repository. If a delete call
    /// fails, the vacuum step is skipped and the partial count is discarded.
    pub async fn purge_once(&self) -> Result<u64, Error> {
        let retention_days = u64::from(self.state.settings().query_log_retention_days);
        let cutoff_ms = Clock::millis_ago(time::days(retention_days));

        let mut removed = 0u64;
        loop {
            let n = self.repo.purge_older_than(cutoff_ms, PURGE_BATCH).await?;
            if n == 0 {
                break;
            }
            removed += n;
        }

        self.repo.incremental_vacuum().await?;
        Ok(removed)
    }

    /// Drives purge cycles every [`PURGE_INTERVAL`] until `token` is cancelled.
    ///
    /// The first tick of a tokio interval completes immediately, so one cycle
    /// runs right after start-up. Failures are logged and do not stop the loop.
    /// A cycle that has already started is awaited to completion; cancellation
    /// is only observed between cycles.
    pub async fn run(self, token: CancellationToken) {
        let mut ticker = tokio::time::interval(PURGE_INTERVAL);
        // The default `Burst` behaviour replays every missed tick back-to-back
        // when a cycle overruns the interval, which would trigger redundant
        // purge/vacuum cycles. `Delay` schedules the next tick a full interval
        // after the late one instead.
        ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

        loop {
            tokio::select! {
                // Poll cancellation first so a cancelled token always wins
                // over a tick that is ready at the same time.
                biased;
                _ = token.cancelled() => break,
                _ = ticker.tick() => match self.purge_once().await {
                    Ok(removed) => info!(removed, "query-log purge cycle complete"),
                    Err(e) => warn!(error = %e, "query-log purge cycle failed"),
                },
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use tempfile::TempDir;

    use super::*;
    use crate::{
        resolver::pipeline::Outcome,
        resolver::state::RuntimeSettings,
        storage::{Db, query_log::QueryLogRecord},
    };

    /// Opens a temporary database and hydrates resolver state from it.
    ///
    /// The `TempDir` is returned so the caller keeps it alive for the test's duration.
    async fn setup() -> (TempDir, Db, Arc<ResolverState>) {
        let (tmp, database) = crate::test_support::temp_db().await;
        let hydrated = ResolverState::hydrate(&database).await.expect("hydrate");
        (tmp, database, hydrated)
    }

    /// Builds a forwarded `A` query record whose only varying field is the timestamp.
    fn record(ts: i64) -> QueryLogRecord {
        QueryLogRecord {
            id: 0,
            ts,
            client: String::from("10.0.0.1"),
            qname: String::from("x.test."),
            qtype: String::from("A"),
            outcome: Outcome::Forwarded,
            rcode: Some(0),
            upstream: None,
            latency_ms: 1,
            blocklist_id: None,
        }
    }

    /// Stores a copy of the current settings with only the retention period changed.
    fn set_retention(state: &ResolverState, days: u32) {
        let mut updated: RuntimeSettings = (*state.settings_full()).clone();
        updated.query_log_retention_days = days;
        state.store_settings(updated);
    }

    /// Length of `days` whole days expressed in milliseconds.
    fn days_in_millis(days: u64) -> i64 {
        time::days(days).as_millis() as i64
    }

    #[tokio::test]
    async fn removes_old_rows_keeps_recent() {
        let (_dir, db, state) = setup().await;
        let log = db.query_log();

        // One row well past the retention window, one written a second ago.
        let now = Clock::now_millis();
        let stale_ts = now - days_in_millis(40);
        let fresh_ts = now - 1_000;
        let seeded = [record(stale_ts), record(fresh_ts)];
        log.insert_batch(&seeded).await.expect("insert");

        let purger = QueryLogPurger::new(db.query_log(), state);
        let removed = purger.purge_once().await.expect("purge");
        assert_eq!(removed, 1, "only the aged-out row is removed");

        let rows = log.page(None, 10).await.expect("page");
        assert_eq!(rows.len(), 1);
        assert!(rows[0].ts >= fresh_ts, "the recent row survives");
    }

    #[tokio::test]
    async fn empty_table_purges_nothing() {
        let (_dir, db, state) = setup().await;

        let removed = QueryLogPurger::new(db.query_log(), state)
            .purge_once()
            .await
            .expect("purge empty");

        assert_eq!(removed, 0);
    }

    #[tokio::test]
    async fn honors_changed_retention_from_snapshot() {
        let (_dir, db, state) = setup().await;
        let log = db.query_log();

        let two_days_ago = Clock::now_millis() - days_in_millis(2);
        log.insert_batch(&[record(two_days_ago)])
            .await
            .expect("insert");

        let purger = QueryLogPurger::new(db.query_log(), Arc::clone(&state));

        // Under the initial retention setting the two-day-old row is kept.
        let removed_before = purger.purge_once().await.expect("purge 30d");
        assert_eq!(removed_before, 0);

        // After shrinking retention to one day the same purger must drop it.
        set_retention(&state, 1);
        let removed_after = purger.purge_once().await.expect("purge 1d");
        assert_eq!(removed_after, 1);

        let remaining = log.page(None, 10).await.expect("page");
        assert!(remaining.is_empty());
    }

    #[tokio::test]
    async fn run_exits_promptly_on_cancel() {
        let (_dir, db, state) = setup().await;
        let purger = QueryLogPurger::new(db.query_log(), state);

        let token = CancellationToken::new();
        let worker = tokio::spawn(purger.run(token.clone()));
        token.cancel();

        let joined = tokio::time::timeout(Duration::from_secs(5), worker).await;
        joined
            .expect("run must exit promptly after cancel")
            .expect("task join");
    }
}