use crate::base::Broker;
use crate::components::ComponentLifecycle;
use crate::error::Result;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::task::JoinHandle;
/// Settings that control how the [`Janitor`] background task runs.
#[derive(Debug, Clone)]
pub struct JanitorConfig {
    /// Time between two consecutive cleanup passes.
    pub interval: Duration,
    /// Batch size setting.
    ///
    /// NOTE(review): nothing in the visible part of this file reads this
    /// field — confirm where (or whether) it is consumed.
    pub batch_size: usize,
    /// Names of the queues visited on every cleanup pass.
    pub queues: Vec<String>,
}

impl Default for JanitorConfig {
    /// Builds the stock configuration: an 8 second interval, a batch size of
    /// 100 and a single queue named `"default"`.
    fn default() -> Self {
        let queues = vec![String::from("default")];

        JanitorConfig {
            interval: Duration::from_secs(8),
            batch_size: 100,
            queues,
        }
    }
}
/// Background component that periodically asks the broker to delete expired
/// completed tasks for a configured set of queues.
pub struct Janitor {
    /// Broker handle through which the per-queue cleanup calls are issued.
    broker: Arc<dyn Broker>,
    /// Settings captured when the janitor was constructed.
    config: JanitorConfig,
    /// Shutdown flag: starts out `false` and is set to `true` by `shutdown`.
    done: Arc<AtomicBool>,
}
impl Janitor {
pub fn new(broker: Arc<dyn Broker>, config: JanitorConfig) -> Self {
Self {
broker,
config,
done: Arc::new(AtomicBool::new(false)),
}
}
pub fn start(self: Arc<Self>) -> JoinHandle<()> {
tracing::info!("starting janitor");
tokio::spawn(async move {
let mut interval = tokio::time::interval(self.config.interval);
loop {
interval.tick().await;
if self.done.load(Ordering::Relaxed) {
tracing::debug!("Janitor: shutting down");
break;
}
if let Err(e) = self.cleanup().await {
tracing::error!("Janitor cleanup error: {}", e);
}
}
})
}
async fn cleanup(&self) -> Result<()> {
for queue in &self.config.queues {
if let Err(e) = self.broker.delete_expired_completed_tasks(queue).await {
tracing::warn!(
"Janitor: failed to cleanup expired completed tasks for queue {}: {}",
queue,
e
);
}
}
Ok(())
}
pub fn shutdown(&self) {
self.done.store(true, Ordering::Relaxed);
}
pub fn is_done(&self) -> bool {
self.done.load(Ordering::Relaxed)
}
}
/// Exposes the janitor through the shared component lifecycle interface.
///
/// Every method forwards to the inherent `Janitor` method of the same name;
/// the explicit `Janitor::` paths make it clear the inherent implementation
/// is the one being called.
impl ComponentLifecycle for Janitor {
    /// Spawns the cleanup loop and returns the task handle.
    fn start(self: Arc<Self>) -> JoinHandle<()> {
        Janitor::start(self)
    }

    /// Sets the shutdown flag.
    fn shutdown(&self) {
        Janitor::shutdown(self)
    }

    /// Reports whether the shutdown flag has been set.
    fn is_done(&self) -> bool {
        Janitor::is_done(self)
    }
}
// Compiled only for test builds that have the `default` feature enabled.
#[cfg(all(test, feature = "default"))]
mod tests {
    use super::*;

    /// The default configuration carries the expected interval, batch size
    /// and queue list.
    #[test]
    fn test_janitor_config_default() {
        let JanitorConfig {
            interval,
            batch_size,
            queues,
        } = JanitorConfig::default();

        assert_eq!(interval, Duration::from_secs(8));
        assert_eq!(batch_size, 100);
        assert_eq!(queues, vec!["default".to_string()]);
    }

    /// `shutdown` flips the done flag from `false` to `true`.
    ///
    /// NOTE(review): constructing `RedisBroker` presumably needs a Redis
    /// instance reachable at the URL below — confirm before running in CI.
    #[tokio::test]
    async fn test_janitor_shutdown() {
        use crate::backend::{RedisBroker, RedisConnectionType};

        let connection = RedisConnectionType::single("redis://localhost:6379").unwrap();
        let redis_broker = RedisBroker::new(connection).await.unwrap();
        let broker = Arc::new(redis_broker);
        let janitor = Janitor::new(broker, JanitorConfig::default());

        assert!(!janitor.is_done());
        janitor.shutdown();
        assert!(janitor.is_done());
    }
}