use crate::base::Broker;
use crate::components::ComponentLifecycle;
use crate::error::Result;
use chrono::Utc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::task::JoinHandle;
/// Settings that control how a [`Recoverer`] runs.
#[derive(Debug, Clone)]
pub struct RecovererConfig {
    /// Time between two consecutive recovery passes.
    pub interval: Duration,
    /// Names of the queues that are inspected on every pass.
    pub queues: Vec<String>,
}

impl Default for RecovererConfig {
    /// Builds a configuration with an 8-second interval that covers only the
    /// queue named `"default"`.
    fn default() -> Self {
        let interval = Duration::from_secs(8);
        let queues = vec![String::from("default")];
        Self { interval, queues }
    }
}
/// Background component that periodically repairs broker state.
///
/// On every pass it retries or archives the tasks returned by
/// `Broker::list_lease_expired` and asks the broker to reclaim stale
/// aggregation sets for each configured queue.
pub struct Recoverer {
    /// Broker that every recovery operation is issued against.
    broker: Arc<dyn Broker>,
    /// Polling interval and queue list used by the recovery loop.
    config: RecovererConfig,
    /// Stop flag: set by `shutdown`, polled by the loop spawned in `start`.
    done: Arc<AtomicBool>,
}
impl Recoverer {
    /// Interval used when the configured one is zero, because
    /// `tokio::time::interval` panics on a zero period.
    const FALLBACK_INTERVAL: Duration = Duration::from_secs(8);

    /// Creates a recoverer that operates on `broker` according to `config`.
    ///
    /// Nothing runs until [`Recoverer::start`] is called; the stop flag starts
    /// out cleared.
    pub fn new(broker: Arc<dyn Broker>, config: RecovererConfig) -> Self {
        Self {
            broker,
            config,
            done: Arc::new(AtomicBool::new(false)),
        }
    }

    /// Spawns the recovery loop on the Tokio runtime and returns its handle.
    ///
    /// The loop waits for the next interval tick, exits if
    /// [`Recoverer::shutdown`] has been called, and otherwise runs one recovery
    /// pass. Errors from a pass are logged and never stop the loop. Because the
    /// stop flag is only checked after a tick, shutdown can take up to one
    /// interval to be observed.
    pub fn start(self: Arc<Self>) -> JoinHandle<()> {
        tracing::info!("starting recoverer");
        tokio::spawn(async move {
            // A zero period would make `tokio::time::interval` panic and kill
            // this task silently, so substitute a sane period instead.
            let period = if self.config.interval.is_zero() {
                tracing::warn!(
                    "Recoverer: interval must be non-zero, falling back to {:?}",
                    Self::FALLBACK_INTERVAL
                );
                Self::FALLBACK_INTERVAL
            } else {
                self.config.interval
            };
            let mut interval = tokio::time::interval(period);
            loop {
                interval.tick().await;
                if self.done.load(Ordering::Relaxed) {
                    tracing::debug!("Recoverer: shutting down");
                    break;
                }
                if let Err(e) = self.recover().await {
                    tracing::error!("Recoverer error: {}", e);
                }
            }
        })
    }

    /// Runs one full recovery pass.
    ///
    /// Both sub-passes always run: a failure while recovering lease-expired
    /// tasks must not prevent stale aggregation sets from being reclaimed in
    /// the same tick.
    ///
    /// # Errors
    ///
    /// Returns the lease-expired error if that pass failed, otherwise the
    /// result of the aggregation-set pass.
    async fn recover(&self) -> Result<()> {
        let lease_outcome = self.recover_lease_expired_tasks().await;
        let aggregation_outcome = self.recover_stale_aggregation_sets().await;
        lease_outcome?;
        aggregation_outcome
    }

    /// Retries or archives every task the broker reports as lease-expired.
    ///
    /// The cutoff handed to the broker is 30 seconds before now. A task whose
    /// `retried` count has reached its `retry` limit is archived; any other
    /// task is scheduled for retry. Per-task failures are logged and skipped so
    /// that one bad task does not block the rest.
    ///
    /// # Errors
    ///
    /// Returns the broker error if the lease-expired tasks cannot be listed.
    async fn recover_lease_expired_tasks(&self) -> Result<()> {
        let cutoff = Utc::now() - chrono::Duration::seconds(30);
        let msgs = match self
            .broker
            .list_lease_expired(cutoff, &self.config.queues)
            .await
        {
            Ok(msgs) => msgs,
            Err(e) => {
                tracing::warn!("Recoverer: could not list lease expired tasks: {}", e);
                return Err(e);
            }
        };
        for msg in msgs {
            if msg.retried >= msg.retry {
                if let Err(e) = self.archive(&msg, "lease expired").await {
                    tracing::warn!("Recoverer: could not archive lease expired task: {}", e);
                }
            } else if let Err(e) = self.retry(&msg, "lease expired").await {
                tracing::warn!("Recoverer: could not retry lease expired task: {}", e);
            }
        }
        Ok(())
    }

    /// Asks the broker to reclaim stale aggregation sets in each configured queue.
    ///
    /// Failures are logged per queue and do not abort the remaining queues, so
    /// this currently always returns `Ok(())`.
    async fn recover_stale_aggregation_sets(&self) -> Result<()> {
        for q in &self.config.queues {
            if let Err(e) = self.broker.reclaim_stale_aggregation_sets(q).await {
                tracing::warn!(
                    "Recoverer: could not reclaim stale aggregation sets in queue {}: {}",
                    q,
                    e
                );
            }
        }
        Ok(())
    }

    /// Schedules `msg` for another attempt via the broker.
    ///
    /// The retry time is now plus the delay from `retry_delay_func`; if that
    /// delay cannot be represented as a `chrono::Duration`, one second is used.
    /// The value of `is_failure_func(err)` is forwarded to `Broker::retry`.
    async fn retry(&self, msg: &crate::proto::TaskMessage, err: &str) -> Result<()> {
        let delay = self.retry_delay_func(msg.retried, err, &msg.r#type);
        let backoff = chrono::Duration::from_std(delay)
            .unwrap_or_else(|_| chrono::Duration::seconds(1));
        let retry_at = Utc::now() + backoff;
        self.broker
            .retry(msg, retry_at, err, self.is_failure_func(err))
            .await
    }

    /// Moves `msg` to the archive via the broker, recording `err` as the reason.
    async fn archive(&self, msg: &crate::proto::TaskMessage, err: &str) -> Result<()> {
        self.broker.archive(msg, err).await
    }

    /// Computes the delay before the next attempt: `10 * (retried + 1)` seconds.
    ///
    /// A negative `retried` is treated as zero. A plain `as u64` cast would
    /// wrap it to a huge value and overflow the arithmetic below.
    fn retry_delay_func(&self, retried: i32, _err: &str, _task_type: &str) -> std::time::Duration {
        // After the conversion the value is at most `i32::MAX`, so the
        // arithmetic cannot overflow a `u64`.
        let attempts = u64::try_from(retried).unwrap_or(0);
        std::time::Duration::from_secs(10 * (attempts + 1))
    }

    /// Reports whether `err` counts as a failure; every error currently does.
    fn is_failure_func(&self, _err: &str) -> bool {
        true
    }

    /// Requests that the recovery loop stop.
    ///
    /// This only sets the stop flag; the loop notices it at its next tick.
    pub fn shutdown(&self) {
        // Relaxed suffices: the flag is a standalone signal and guards no other data.
        self.done.store(true, Ordering::Relaxed);
    }

    /// Returns `true` once [`Recoverer::shutdown`] has been called.
    pub fn is_done(&self) -> bool {
        self.done.load(Ordering::Relaxed)
    }
}
/// Exposes the recoverer through the shared `ComponentLifecycle` trait.
///
/// Each method forwards to the inherent `Recoverer` method of the same name.
/// The calls are spelled `Recoverer::method(self)` to make the delegation
/// target explicit.
impl ComponentLifecycle for Recoverer {
    /// Spawns the recovery loop; see the inherent `Recoverer::start`.
    fn start(self: Arc<Self>) -> JoinHandle<()> {
        Recoverer::start(self)
    }
    /// Sets the stop flag; see the inherent `Recoverer::shutdown`.
    fn shutdown(&self) {
        Recoverer::shutdown(self)
    }
    /// Reports whether the stop flag is set; see the inherent `Recoverer::is_done`.
    fn is_done(&self) -> bool {
        Recoverer::is_done(self)
    }
}
#[cfg(feature = "default")]
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration uses an 8-second interval and the single
    /// queue `"default"`.
    #[test]
    fn test_recoverer_config_default() {
        let RecovererConfig { interval, queues } = RecovererConfig::default();
        assert_eq!(interval, Duration::from_secs(8));
        assert_eq!(queues, vec![String::from("default")]);
    }

    /// `shutdown` flips the flag reported by `is_done`.
    ///
    /// NOTE(review): this builds a `RedisBroker` for `redis://localhost:6379`,
    /// so it presumably needs a reachable Redis instance; confirm.
    #[tokio::test]
    async fn test_recoverer_shutdown() {
        use crate::backend::{RedisBroker, RedisConnectionType};

        let connection = RedisConnectionType::single("redis://localhost:6379").unwrap();
        let redis_broker = RedisBroker::new(connection).await.unwrap();
        let broker = Arc::new(redis_broker);
        let recoverer = Recoverer::new(broker, RecovererConfig::default());

        assert!(!recoverer.is_done());
        recoverer.shutdown();
        assert!(recoverer.is_done());
    }
}