use crate::base::Broker;
use crate::components::ComponentLifecycle;
use crate::error::Result;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::task::JoinHandle;
/// Settings that control how a forwarder polls the broker.
#[derive(Debug, Clone)]
pub struct ForwarderConfig {
    /// Period of the forwarding loop's ticker.
    pub interval: Duration,
    /// Names of the queues handed to the broker on every forwarding pass.
    pub queues: Vec<String>,
}

impl Default for ForwarderConfig {
    /// Returns a configuration that ticks every five seconds over the single
    /// queue named `default`.
    fn default() -> Self {
        let interval = Duration::from_secs(5);
        let queues = vec![String::from("default")];
        Self { interval, queues }
    }
}
/// Background component that periodically asks the broker to forward ready
/// tasks for the configured queues.
pub struct Forwarder {
/// Broker whose `forward_if_ready` is invoked on every tick of the loop.
broker: Arc<dyn Broker>,
/// Tick interval and the queue names passed to the broker.
config: ForwarderConfig,
/// Shutdown flag: set by `shutdown`, polled by the loop spawned in `start`.
done: Arc<AtomicBool>,
}
impl Forwarder {
pub fn new(broker: Arc<dyn Broker>, config: ForwarderConfig) -> Self {
Self {
broker,
config,
done: Arc::new(AtomicBool::new(false)),
}
}
pub fn start(self: Arc<Self>) -> JoinHandle<()> {
tracing::info!("starting forwarder");
tokio::spawn(async move {
let mut interval = tokio::time::interval(self.config.interval);
loop {
interval.tick().await;
if self.done.load(Ordering::Relaxed) {
tracing::debug!("Forwarder: shutting down");
break;
}
if let Err(e) = self.forward().await {
tracing::error!("Forwarder error: {}", e);
}
}
})
}
async fn forward(&self) -> Result<()> {
if let Err(e) = self.broker.forward_if_ready(&self.config.queues).await {
tracing::warn!("Forwarder: failed to forward ready tasks: {}", e);
return Err(e);
}
Ok(())
}
pub fn shutdown(&self) {
self.done.store(true, Ordering::Relaxed);
}
pub fn is_done(&self) -> bool {
self.done.load(Ordering::Relaxed)
}
}
/// Exposes the forwarder through the shared component lifecycle trait.
///
/// Every method delegates to the inherent `Forwarder` method of the same
/// name. The calls are written as `Forwarder::method(self)` on purpose:
/// path resolution prefers inherent methods over trait methods, so these
/// do not recurse into the trait implementation.
impl ComponentLifecycle for Forwarder {
/// Spawns the forwarding loop; see the inherent `Forwarder::start`.
fn start(self: Arc<Self>) -> JoinHandle<()> {
Forwarder::start(self)
}
/// Sets the shutdown flag; see the inherent `Forwarder::shutdown`.
fn shutdown(&self) {
Forwarder::shutdown(self)
}
/// Reports whether shutdown has been requested; see the inherent
/// `Forwarder::is_done`.
fn is_done(&self) -> bool {
Forwarder::is_done(self)
}
}
#[cfg(all(test, feature = "default"))]
mod tests {
    use super::*;

    /// The default configuration ticks every five seconds over the `default` queue.
    #[test]
    fn test_forwarder_config_default() {
        let ForwarderConfig { interval, queues } = ForwarderConfig::default();
        assert_eq!(interval, Duration::from_secs(5));
        assert_eq!(queues, vec!["default".to_string()]);
    }

    /// Calling `shutdown` flips `is_done` from `false` to `true`.
    ///
    /// NOTE(review): builds a real `RedisBroker` against
    /// `redis://localhost:6379`, so this presumably needs a reachable Redis
    /// server — confirm before running in CI.
    #[tokio::test]
    async fn test_forwarder_shutdown() {
        use crate::backend::{RedisBroker, RedisConnectionType};

        let connection = RedisConnectionType::single("redis://localhost:6379").unwrap();
        let redis_broker = RedisBroker::new(connection).await.unwrap();
        let forwarder = Forwarder::new(Arc::new(redis_broker), ForwarderConfig::default());

        assert!(!forwarder.is_done());
        forwarder.shutdown();
        assert!(forwarder.is_done());
    }
}