use crate::{Job, JobId, JobStatus, Result};
use chrono::{DateTime, Duration, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use uuid::Uuid;
/// Identifier used to key archival policies (currently a plain string).
pub type ArchivalPolicyId = String;
/// Why a job was moved into the archive.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum ArchivalReason {
    /// Archived by a policy-driven (scheduled/background) run.
    Automatic,
    /// Archived explicitly by an operator or API call.
    Manual,
    /// Archived to satisfy a data-retention/compliance requirement.
    Compliance,
    /// Archived as part of system maintenance.
    Maintenance,
}
impl Default for ArchivalReason {
fn default() -> Self {
Self::Automatic
}
}
impl std::fmt::Display for ArchivalReason {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Automatic => write!(f, "Automatic"),
Self::Manual => write!(f, "Manual"),
Self::Compliance => write!(f, "Compliance"),
Self::Maintenance => write!(f, "Maintenance"),
}
}
}
/// Events emitted during archival operations, suitable for publishing to an
/// event bus or progress listener.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ArchiveEvent {
    /// A single job was moved into the archive.
    JobArchived {
        job_id: JobId,
        queue: String,
        reason: ArchivalReason,
    },
    /// An archived job was restored back to its queue.
    JobRestored {
        job_id: JobId,
        queue: String,
        /// Actor that requested the restore, when known.
        restored_by: Option<String>,
    },
    /// A bulk archive run began; `estimated_jobs` is a best-effort count.
    BulkArchiveStarted {
        operation_id: String,
        estimated_jobs: u64,
    },
    /// Progress update for an in-flight bulk archive run.
    BulkArchiveProgress {
        operation_id: String,
        jobs_processed: u64,
        total: u64,
    },
    /// A bulk archive run finished with the given statistics.
    BulkArchiveCompleted {
        operation_id: String,
        stats: ArchivalStats,
    },
    /// Archived jobs older than `older_than` were permanently deleted.
    JobsPurged {
        count: u64,
        older_than: DateTime<Utc>,
    },
}
/// Rules controlling when terminal jobs are archived and when archived jobs
/// are purged.
///
/// A `None` threshold disables archival for that job status; setting
/// `enabled` to `false` disables the whole policy (see
/// [`ArchivalPolicy::should_archive`]).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ArchivalPolicy {
    /// Age after which `Completed` jobs become eligible for archival.
    pub archive_completed_after: Option<Duration>,
    /// Age after which `Failed` jobs become eligible for archival.
    pub archive_failed_after: Option<Duration>,
    /// Age after which `Dead` jobs become eligible for archival.
    pub archive_dead_after: Option<Duration>,
    /// Age after which `TimedOut` jobs become eligible for archival.
    pub archive_timed_out_after: Option<Duration>,
    /// Age after which archived jobs may be permanently deleted.
    pub purge_archived_after: Option<Duration>,
    /// Whether payloads should be compressed when archived.
    pub compress_payloads: bool,
    /// Number of jobs processed per archival batch.
    pub batch_size: usize,
    /// Master switch; when `false` nothing is ever archived by this policy.
    pub enabled: bool,
}
impl Default for ArchivalPolicy {
    /// Conservative defaults: completed jobs archive after 30 days, all
    /// failure-like states after 90 days, and archived rows purge after a
    /// year, with compression on and 1000-job batches.
    fn default() -> Self {
        let failure_threshold = Duration::days(90);
        Self {
            archive_completed_after: Some(Duration::days(30)),
            archive_failed_after: Some(failure_threshold),
            archive_dead_after: Some(failure_threshold),
            archive_timed_out_after: Some(failure_threshold),
            purge_archived_after: Some(Duration::days(365)),
            compress_payloads: true,
            batch_size: 1000,
            enabled: true,
        }
    }
}
impl ArchivalPolicy {
    /// Creates a policy with the default thresholds.
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets how old a `Completed` job must be before it is archived.
    pub fn archive_completed_after(mut self, duration: Duration) -> Self {
        self.archive_completed_after = Some(duration);
        self
    }

    /// Sets how old a `Failed` job must be before it is archived.
    pub fn archive_failed_after(mut self, duration: Duration) -> Self {
        self.archive_failed_after = Some(duration);
        self
    }

    /// Sets how old a `Dead` job must be before it is archived.
    pub fn archive_dead_after(mut self, duration: Duration) -> Self {
        self.archive_dead_after = Some(duration);
        self
    }

    /// Sets how old a `TimedOut` job must be before it is archived.
    pub fn archive_timed_out_after(mut self, duration: Duration) -> Self {
        self.archive_timed_out_after = Some(duration);
        self
    }

    /// Sets how old an archived job must be before it may be purged.
    pub fn purge_archived_after(mut self, duration: Duration) -> Self {
        self.purge_archived_after = Some(duration);
        self
    }

    /// Enables or disables payload compression for archived jobs.
    pub fn compress_archived_payloads(mut self, compress: bool) -> Self {
        self.compress_payloads = compress;
        self
    }

    /// Sets the number of jobs handled per archival batch.
    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
        self.batch_size = batch_size;
        self
    }

    /// Turns the whole policy on or off.
    pub fn enabled(mut self, enabled: bool) -> Self {
        self.enabled = enabled;
        self
    }

    /// Returns `true` when a job in `status` that is `age` old should be
    /// archived under this policy.
    ///
    /// Only terminal states (`Completed`, `Failed`, `Dead`, `TimedOut`) are
    /// ever eligible, thresholds are inclusive (`age >= threshold`), and a
    /// disabled policy archives nothing.
    pub fn should_archive(&self, status: &JobStatus, age: Duration) -> bool {
        if !self.enabled {
            return false;
        }
        // Pick the threshold matching the job's terminal state, if any.
        let threshold = match status {
            JobStatus::Completed => self.archive_completed_after,
            JobStatus::Failed => self.archive_failed_after,
            JobStatus::Dead => self.archive_dead_after,
            JobStatus::TimedOut => self.archive_timed_out_after,
            _ => None,
        };
        threshold.is_some_and(|limit| age >= limit)
    }
}
/// Compression and size settings applied when payloads are archived.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ArchivalConfig {
    /// Compression level, clamped to at most 9 by the builder
    /// (see [`ArchivalConfig::with_compression_level`]).
    pub compression_level: u32,
    /// Maximum payload size in bytes that will be archived.
    pub max_payload_size: usize,
    /// Whether compressed payloads are verified after compression.
    pub verify_compression: bool,
}
impl Default for ArchivalConfig {
    /// Balanced defaults: mid-range compression, 1 MiB payload ceiling,
    /// verification enabled.
    fn default() -> Self {
        Self {
            // Mid-point of the 0-9 deflate-style scale: balances speed/ratio.
            compression_level: 6,
            // 1 MiB.
            max_payload_size: 1024 * 1024,
            verify_compression: true,
        }
    }
}
impl ArchivalConfig {
    /// Equivalent to [`ArchivalConfig::default`].
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the compression level; values above 9 are clamped to 9
    /// (presumably matching a deflate-style 0-9 scale — confirm against the
    /// compressor in use).
    pub fn with_compression_level(mut self, level: u32) -> Self {
        self.compression_level = if level > 9 { 9 } else { level };
        self
    }

    /// Sets the maximum payload size (bytes) eligible for archival.
    pub fn with_max_payload_size(mut self, size: usize) -> Self {
        self.max_payload_size = size;
        self
    }

    /// Enables or disables post-compression verification.
    pub fn with_compression_verification(mut self, verify: bool) -> Self {
        self.verify_compression = verify;
        self
    }
}
/// Outcome counters for a single archival/purge run.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ArchivalStats {
    /// Number of jobs moved into the archive.
    pub jobs_archived: u64,
    /// Number of archived jobs permanently deleted.
    pub jobs_purged: u64,
    /// Bytes archived — assumed pre-compression payload size; TODO confirm
    /// against the backend that populates this.
    pub bytes_archived: u64,
    /// Bytes freed by purging archived jobs.
    pub bytes_purged: u64,
    /// Achieved compression ratio (1.0 = no compression).
    pub compression_ratio: f64,
    /// Wall-clock duration of the run.
    pub operation_duration: std::time::Duration,
    /// When this run (or stats snapshot) was recorded.
    pub last_run_at: DateTime<Utc>,
}
impl Default for ArchivalStats {
fn default() -> Self {
Self {
jobs_archived: 0,
jobs_purged: 0,
bytes_archived: 0,
bytes_purged: 0,
compression_ratio: 1.0,
operation_duration: std::time::Duration::from_secs(0),
last_run_at: Utc::now(),
}
}
}
/// Metadata describing a job that has been moved into the archive.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ArchivedJob {
    /// The archived job's original identifier.
    pub id: JobId,
    /// Queue the job belonged to.
    pub queue_name: String,
    /// Terminal status the job had when archived.
    pub status: JobStatus,
    /// When the job was originally created.
    pub created_at: DateTime<Utc>,
    /// When the job was archived.
    pub archived_at: DateTime<Utc>,
    /// Why the job was archived.
    pub archival_reason: ArchivalReason,
    /// Payload size in bytes before archival, when recorded.
    pub original_payload_size: Option<usize>,
    /// Whether the stored payload is compressed.
    pub payload_compressed: bool,
    /// Actor that triggered the archival, when known.
    pub archived_by: Option<String>,
}
/// Coordinates per-queue archival policies and shared configuration for a
/// database-backed job queue.
#[derive(Debug)]
pub struct JobArchiver<DB>
where
    DB: sqlx::Database,
{
    // Held for future direct-database operations; all archival in this
    // module currently goes through the `DatabaseQueue` passed to each
    // method, hence the dead_code allowance.
    #[allow(dead_code)]
    pool: sqlx::Pool<DB>,
    // Archival policies keyed by queue name.
    policies: HashMap<String, ArchivalPolicy>,
    // Compression/size settings shared by all archival runs.
    config: ArchivalConfig,
}
impl<DB> JobArchiver<DB>
where
    DB: sqlx::Database,
{
    /// Creates an archiver over `pool` with no per-queue policies and the
    /// default [`ArchivalConfig`].
    pub fn new(pool: sqlx::Pool<DB>) -> Self {
        Self {
            pool,
            policies: HashMap::new(),
            config: ArchivalConfig::default(),
        }
    }

    /// Installs (or replaces) the archival policy for `queue_name`.
    pub fn set_policy(&mut self, queue_name: impl Into<String>, policy: ArchivalPolicy) {
        self.policies.insert(queue_name.into(), policy);
    }

    /// Returns the policy registered for `queue_name`, if any.
    pub fn get_policy(&self, queue_name: &str) -> Option<&ArchivalPolicy> {
        self.policies.get(queue_name)
    }

    /// Removes and returns the policy registered for `queue_name`, if any.
    pub fn remove_policy(&mut self, queue_name: &str) -> Option<ArchivalPolicy> {
        self.policies.remove(queue_name)
    }

    /// Replaces the shared archival configuration.
    pub fn set_config(&mut self, config: ArchivalConfig) {
        self.config = config;
    }

    /// Returns the current shared archival configuration.
    pub fn get_config(&self) -> &ArchivalConfig {
        &self.config
    }

    /// Archives eligible jobs, optionally reporting coarse progress.
    ///
    /// The policy is resolved from `queue_name` (falling back to
    /// `ArchivalPolicy::default()` when no policy is registered or no queue
    /// is named); the actual work is delegated to
    /// `DatabaseQueue::archive_jobs`.
    ///
    /// `progress_callback`, when given, is invoked exactly twice: once with
    /// `(0, estimated)` before the run and once with `(archived, estimated)`
    /// after it completes — this implementation emits no intermediate
    /// progress calls.
    ///
    /// Returns a freshly generated operation id (UUID v4) together with the
    /// stats reported by the queue.
    pub async fn archive_jobs_with_progress<Q>(
        &self,
        queue: &Q,
        queue_name: Option<&str>,
        reason: ArchivalReason,
        archived_by: Option<&str>,
        progress_callback: Option<Box<dyn Fn(u64, u64) + Send + Sync>>,
    ) -> Result<(String, ArchivalStats)>
    where
        Q: crate::queue::DatabaseQueue,
    {
        let operation_id = Uuid::new_v4().to_string();
        let default_policy = ArchivalPolicy::default();
        let policy = queue_name
            .and_then(|name| self.policies.get(name))
            .unwrap_or(&default_policy);
        let estimated_jobs = self
            .estimate_archival_jobs(queue, queue_name, policy)
            .await?;
        if let Some(callback) = &progress_callback {
            callback(0, estimated_jobs);
        }
        let stats = queue
            .archive_jobs(queue_name, policy, &self.config, reason, archived_by)
            .await?;
        if let Some(callback) = &progress_callback {
            callback(stats.jobs_archived, estimated_jobs);
        }
        Ok((operation_id, stats))
    }

    /// Archives eligible jobs, publishing lifecycle events.
    ///
    /// Same policy resolution and delegation as
    /// [`Self::archive_jobs_with_progress`], but emits
    /// `ArchiveEvent::BulkArchiveStarted` before the run and
    /// `ArchiveEvent::BulkArchiveCompleted` after it. No
    /// `BulkArchiveProgress` events are emitted by this implementation.
    pub async fn archive_jobs_with_events<Q, F>(
        &self,
        queue: &Q,
        queue_name: Option<&str>,
        reason: ArchivalReason,
        archived_by: Option<&str>,
        event_publisher: F,
    ) -> Result<(String, ArchivalStats)>
    where
        Q: crate::queue::DatabaseQueue,
        F: Fn(ArchiveEvent) + Send + Sync,
    {
        let operation_id = Uuid::new_v4().to_string();
        let default_policy = ArchivalPolicy::default();
        let policy = queue_name
            .and_then(|name| self.policies.get(name))
            .unwrap_or(&default_policy);
        let estimated_jobs = self
            .estimate_archival_jobs(queue, queue_name, policy)
            .await?;
        event_publisher(ArchiveEvent::BulkArchiveStarted {
            operation_id: operation_id.clone(),
            estimated_jobs,
        });
        let stats = queue
            .archive_jobs(queue_name, policy, &self.config, reason, archived_by)
            .await?;
        event_publisher(ArchiveEvent::BulkArchiveCompleted {
            operation_id: operation_id.clone(),
            stats: stats.clone(),
        });
        Ok((operation_id, stats))
    }

    /// Best-effort estimate of how many jobs a run would archive; used only
    /// for progress reporting, never for correctness.
    ///
    /// With a concrete queue name the estimate sums the queue's terminal
    /// counters for each status the policy archives.
    /// NOTE(review): the failed count is read from
    /// `stats.statistics.failed` while the other counts come from top-level
    /// `*_count` fields — confirm these are consistent views of the same
    /// data in the `DatabaseQueue` stats type.
    ///
    /// Without a queue name there are no stats to consult, so fixed
    /// heuristic constants are returned (2000 / 1000 / 100 depending on how
    /// many archival thresholds are set).
    async fn estimate_archival_jobs<Q>(
        &self,
        queue: &Q,
        queue_name: Option<&str>,
        policy: &ArchivalPolicy,
    ) -> Result<u64>
    where
        Q: crate::queue::DatabaseQueue,
    {
        if let Some(queue_name) = queue_name {
            let stats = queue.get_queue_stats(queue_name).await?;
            let mut estimate = 0u64;
            if policy.archive_completed_after.is_some() {
                estimate += stats.completed_count;
            }
            if policy.archive_failed_after.is_some() {
                estimate += stats.statistics.failed;
            }
            if policy.archive_dead_after.is_some() {
                estimate += stats.dead_count;
            }
            if policy.archive_timed_out_after.is_some() {
                estimate += stats.timed_out_count;
            }
            Ok(estimate)
        } else {
            // Heuristic fallback when no per-queue stats are available.
            let base_estimate = if policy.archive_completed_after.is_some()
                && policy.archive_failed_after.is_some()
            {
                2000
            } else if policy.archive_completed_after.is_some()
                || policy.archive_failed_after.is_some()
            {
                1000
            } else {
                100
            };
            Ok(base_estimate)
        }
    }
}
/// Archival operations a queue backend must provide.
///
/// Methods return `impl Future` (return-position impl trait) so
/// implementations can be async without boxing.
pub trait ArchivalOperations {
    /// Archives jobs matching `policy` (optionally restricted to
    /// `queue_name`), recording `reason` and the optional actor, and
    /// returns the run's statistics.
    fn archive_jobs(
        &self,
        queue_name: Option<&str>,
        policy: &ArchivalPolicy,
        config: &ArchivalConfig,
        reason: ArchivalReason,
        archived_by: Option<&str>,
    ) -> impl std::future::Future<Output = Result<ArchivalStats>> + Send;

    /// Restores an archived job back into its queue and returns it.
    fn restore_job(&self, job_id: JobId) -> impl std::future::Future<Output = Result<Job>> + Send;

    /// Lists archived jobs, optionally filtered by queue and paginated via
    /// `limit`/`offset`.
    fn list_archived_jobs(
        &self,
        queue_name: Option<&str>,
        limit: Option<u32>,
        offset: Option<u32>,
    ) -> impl std::future::Future<Output = Result<Vec<ArchivedJob>>> + Send;

    /// Permanently deletes archived jobs older than `older_than`, returning
    /// the number removed.
    fn purge_archived_jobs(
        &self,
        older_than: DateTime<Utc>,
    ) -> impl std::future::Future<Output = Result<u64>> + Send;

    /// Returns aggregate archival statistics, optionally scoped to a queue.
    fn get_archival_stats(
        &self,
        queue_name: Option<&str>,
    ) -> impl std::future::Future<Output = Result<ArchivalStats>> + Send;
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_archival_policy_default() {
        let defaults = ArchivalPolicy::default();
        assert!(defaults.enabled);
        assert!(defaults.compress_payloads);
        assert_eq!(defaults.batch_size, 1000);
        assert!(defaults.archive_completed_after.is_some());
    }

    #[test]
    fn test_archival_policy_builder() {
        let week = Duration::days(7);
        let month = Duration::days(30);
        let year = Duration::days(365);
        let built = ArchivalPolicy::new()
            .archive_completed_after(week)
            .archive_failed_after(month)
            .purge_archived_after(year)
            .compress_archived_payloads(true)
            .with_batch_size(500)
            .enabled(true);
        assert_eq!(built.archive_completed_after, Some(week));
        assert_eq!(built.archive_failed_after, Some(month));
        assert_eq!(built.purge_archived_after, Some(year));
        assert!(built.compress_payloads);
        assert_eq!(built.batch_size, 500);
        assert!(built.enabled);
    }

    #[test]
    fn test_should_archive() {
        let policy = ArchivalPolicy::new()
            .archive_completed_after(Duration::days(7))
            .archive_failed_after(Duration::days(30));
        // One day past / one day short of each threshold.
        assert!(policy.should_archive(&JobStatus::Completed, Duration::days(8)));
        assert!(!policy.should_archive(&JobStatus::Completed, Duration::days(6)));
        assert!(policy.should_archive(&JobStatus::Failed, Duration::days(31)));
        assert!(!policy.should_archive(&JobStatus::Failed, Duration::days(29)));
        // Non-terminal states are never archived, no matter how old.
        for status in [JobStatus::Pending, JobStatus::Running] {
            assert!(!policy.should_archive(&status, Duration::days(100)));
        }
    }

    #[test]
    fn test_should_archive_disabled_policy() {
        let disabled = ArchivalPolicy::new()
            .archive_completed_after(Duration::days(1))
            .enabled(false);
        // A disabled policy archives nothing, even well past the threshold.
        assert!(!disabled.should_archive(&JobStatus::Completed, Duration::days(10)));
    }

    #[test]
    fn test_archival_config_default() {
        let defaults = ArchivalConfig::default();
        assert_eq!(defaults.compression_level, 6);
        assert_eq!(defaults.max_payload_size, 1024 * 1024);
        assert!(defaults.verify_compression);
    }

    #[test]
    fn test_archival_config_builder() {
        let built = ArchivalConfig::new()
            .with_compression_level(9)
            .with_max_payload_size(2048)
            .with_compression_verification(false);
        assert_eq!(built.compression_level, 9);
        assert_eq!(built.max_payload_size, 2048);
        assert!(!built.verify_compression);
    }

    #[test]
    fn test_archival_reason_display() {
        let cases = [
            (ArchivalReason::Automatic, "Automatic"),
            (ArchivalReason::Manual, "Manual"),
            (ArchivalReason::Compliance, "Compliance"),
            (ArchivalReason::Maintenance, "Maintenance"),
        ];
        for (reason, expected) in cases {
            assert_eq!(reason.to_string(), expected);
        }
    }
}