use std::sync::Arc;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use aa_core::config::{ColdAction as CoreColdAction, RetentionConfig as CoreRetentionConfig};
use super::retention::ColdAction as StorageColdAction;
use super::retention_config::{RetentionConfig as StorageRetentionConfig, RetentionConfigError};
use super::retention_engine::RetentionEngine;
use super::StorageBackend;
/// Builds the storage-layer retention configuration from the core-crate one.
///
/// Every field is carried over unchanged; owned fields (`schedule`,
/// `archive_url`) are cloned so the caller keeps its configuration, and the
/// cold-tier action is translated variant by variant between the two enums.
fn core_to_storage_retention(cfg: &CoreRetentionConfig) -> StorageRetentionConfig {
    // No wildcard arm on purpose: every core variant is mapped explicitly.
    let cold_action = match cfg.cold_action {
        CoreColdAction::Archive => StorageColdAction::Archive,
        CoreColdAction::Drop => StorageColdAction::Drop,
    };
    let schedule = cfg.schedule.clone();
    let archive_url = cfg.archive_url.clone();

    StorageRetentionConfig {
        schedule,
        hot_days: cfg.hot_days,
        warm_days: cfg.warm_days,
        cold_action,
        archive_url,
        dry_run: cfg.dry_run,
    }
}
/// Validates the retention settings and starts a [`RetentionEngine`] on top of
/// the given storage backend.
///
/// The core configuration is first converted to the storage-layer type and
/// validated; the engine is only constructed and started once validation has
/// succeeded. `shutdown` is handed to [`RetentionEngine::start`].
///
/// On success, returns the shared engine together with the `JoinHandle`
/// produced by `start`.
///
/// # Errors
///
/// Returns a [`RetentionConfigError`] when validation of the converted
/// configuration fails or when `start` itself reports an error.
pub fn spawn_retention_engine(
    storage: Arc<dyn StorageBackend>,
    retention: &CoreRetentionConfig,
    shutdown: CancellationToken,
) -> Result<(Arc<RetentionEngine>, JoinHandle<()>), RetentionConfigError> {
    let validated_cfg = core_to_storage_retention(retention);
    // Reject bad configuration before any engine state exists.
    validated_cfg.validate()?;

    let engine = Arc::new(RetentionEngine::new(storage, validated_cfg));
    // `start` receives its own reference; the original `Arc` goes back to the caller.
    let task = Arc::clone(&engine).start(shutdown)?;

    Ok((engine, task))
}
#[cfg(test)]
mod tests {
    use std::path::PathBuf;
    use std::time::Duration;

    use super::*;
    use crate::storage::{open_sqlite_backend, StorageBackend};

    /// Creates a fresh temporary directory and a database path inside it.
    ///
    /// The `TempDir` guard is returned so the caller can keep the directory
    /// alive for as long as the path is in use.
    fn tmp_sqlite() -> (tempfile::TempDir, PathBuf) {
        let tmp = tempfile::tempdir().expect("tempdir");
        let path = tmp.path().join("retention-boot.db");
        (tmp, path)
    }

    /// Opens a SQLite-backed storage backend in a fresh temporary directory.
    ///
    /// Returns the `TempDir` guard alongside the backend; callers bind it
    /// (e.g. as `_tmp`) so the directory outlives the test body.
    async fn tmp_storage() -> (tempfile::TempDir, Arc<dyn StorageBackend>) {
        let (tmp, db_path) = tmp_sqlite();
        let storage: Arc<dyn StorageBackend> =
            open_sqlite_backend(&db_path).await.expect("open backend");
        (tmp, storage)
    }

    /// A valid config yields a running task that exits cleanly once the
    /// cancellation token fires.
    #[tokio::test]
    async fn spawn_returns_handle_and_token_drives_clean_shutdown() {
        let (_tmp, storage) = tmp_storage().await;
        let cfg = CoreRetentionConfig {
            schedule: "0 0 3 * * *".to_string(),
            ..CoreRetentionConfig::default()
        };
        let token = CancellationToken::new();

        let (engine, handle) = spawn_retention_engine(storage, &cfg, token.clone())
            .unwrap_or_else(|e| {
                panic!("spawn_retention_engine should return Ok with default config: {e:?}")
            });
        assert!(
            !handle.is_finished(),
            "engine loop must stay alive until shutdown is signalled"
        );

        token.cancel();

        // The outer `Result` only says whether the timeout elapsed; the inner
        // one carries the task's join outcome. Both must be checked, otherwise
        // a task that panicked would still count as a "clean" shutdown.
        let join_result = tokio::time::timeout(Duration::from_secs(2), handle)
            .await
            .expect("engine loop must exit within 2 s of shutdown signal");
        assert!(
            join_result.is_ok(),
            "engine task must finish without panicking or being aborted: {join_result:?}"
        );

        // The returned engine handle is still usable after the task has exited.
        assert!(Arc::strong_count(&engine) >= 1);
    }

    /// An unparsable schedule is reported as `InvalidSchedule` instead of
    /// producing an engine.
    #[tokio::test]
    async fn invalid_schedule_returns_error_before_spawn() {
        let (_tmp, storage) = tmp_storage().await;
        let cfg = CoreRetentionConfig {
            schedule: "not a cron expression".to_string(),
            ..CoreRetentionConfig::default()
        };

        match spawn_retention_engine(storage, &cfg, CancellationToken::new()) {
            Err(RetentionConfigError::InvalidSchedule { .. }) => {}
            Err(other) => panic!("expected InvalidSchedule, got {other:?}"),
            Ok(_) => panic!("invalid schedule must surface as Err"),
        }
    }

    /// Selecting the archive cold action without an archive URL is reported as
    /// `MissingArchiveUrl`.
    #[tokio::test]
    async fn archive_action_without_url_returns_missing_archive_url() {
        let (_tmp, storage) = tmp_storage().await;
        let cfg = CoreRetentionConfig {
            cold_action: CoreColdAction::Archive,
            archive_url: None,
            ..CoreRetentionConfig::default()
        };

        match spawn_retention_engine(storage, &cfg, CancellationToken::new()) {
            Err(RetentionConfigError::MissingArchiveUrl) => {}
            Err(other) => panic!("expected MissingArchiveUrl, got {other:?}"),
            Ok(_) => panic!("archive without url must surface as Err"),
        }
    }
}