#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
use asynq::backend::RedisBroker;
use asynq::backend::RedisConnectionType;
use asynq::components::aggregator::{Aggregator, AggregatorConfig};
use asynq::components::forwarder::{Forwarder, ForwarderConfig};
use asynq::components::healthcheck::{Healthcheck, HealthcheckConfig};
use asynq::components::janitor::{Janitor, JanitorConfig};
use asynq::components::recoverer::{Recoverer, RecovererConfig};
use asynq::components::subscriber::{Subscriber, SubscriberConfig};
use std::sync::Arc;
tracing_subscriber::fmt::init();
println!("🚀 Starting asynq components example...\n");
let redis_url =
std::env::var("REDIS_URL").unwrap_or_else(|_| "redis://localhost:6379".to_string());
println!("📡 Connecting to Redis: {redis_url}");
let redis_config = RedisConnectionType::single(redis_url)?;
let broker = RedisBroker::new(redis_config.clone()).await?;
let broker: Arc<dyn asynq::base::Broker> = std::sync::Arc::new(broker);
println!("✅ Connected to Redis\n");
println!("🧹 Starting Janitor component...");
let janitor_config = JanitorConfig {
interval: std::time::Duration::from_secs(10),
batch_size: 100,
queues: vec!["default".to_string(), "critical".to_string()],
};
let janitor = std::sync::Arc::new(Janitor::new(Arc::clone(&broker), janitor_config));
let janitor_handle = janitor.clone().start();
println!(" ✓ Janitor started (interval: 10s)\n");
println!("🔄 Starting Recoverer component...");
let recoverer_config = RecovererConfig {
interval: std::time::Duration::from_secs(8),
queues: vec!["default".to_string(), "critical".to_string()],
};
let recoverer = std::sync::Arc::new(Recoverer::new(Arc::clone(&broker), recoverer_config));
let recoverer_handle = recoverer.clone().start();
println!(" ✓ Recoverer started (interval: 8s)\n");
println!("⏩ Starting Forwarder component...");
let forwarder_config = ForwarderConfig {
interval: std::time::Duration::from_secs(5),
queues: vec!["default".to_string(), "critical".to_string()],
};
let forwarder = std::sync::Arc::new(Forwarder::new(Arc::clone(&broker), forwarder_config));
let forwarder_handle = forwarder.clone().start();
println!(" ✓ Forwarder started (interval: 5s)\n");
println!("🏥 Starting Healthcheck component...");
let healthcheck_config = HealthcheckConfig {
interval: std::time::Duration::from_secs(15),
};
let healthcheck = std::sync::Arc::new(Healthcheck::new(Arc::clone(&broker), healthcheck_config));
let healthcheck_handle = healthcheck.clone().start();
println!(" ✓ Healthcheck started (interval: 15s)\n");
println!("📦 Starting Aggregator component...");
let aggregator_config = AggregatorConfig {
interval: std::time::Duration::from_secs(5),
queues: vec!["default".to_string()],
grace_period: std::time::Duration::from_secs(60),
max_delay: Some(std::time::Duration::from_secs(300)),
max_size: Some(100),
group_aggregator: None,
};
let aggregator = std::sync::Arc::new(Aggregator::new(Arc::clone(&broker), aggregator_config));
let aggregator_handle = aggregator.clone().start();
println!(" ✓ Aggregator started (interval: 5s)\n");
println!("📢 Starting Subscriber component...");
let subscriber_config = SubscriberConfig { buffer_size: 100 };
let subscriber = Subscriber::new(Arc::clone(&broker), subscriber_config);
let subscriber_arc = std::sync::Arc::new(subscriber);
let subscriber_handle = subscriber_arc.clone().start();
println!(" ✓ Subscriber started (buffer size: 100)\n");
println!("⏰ Note: Periodic Task Manager requires Scheduler to be started separately");
println!(" See periodic_task_manager_example.rs for full demonstration\n");
println!("🎉 All components started successfully!\n");
println!("📊 Component Status:");
println!(" • Janitor: Running");
println!(" • Recoverer: Running");
println!(" • Forwarder: Running");
println!(" • Healthcheck: Healthy = {}", healthcheck.is_healthy());
println!(" • Aggregator: Running");
println!(" • Subscriber: Running\n");
println!("⏳ Running for 30 seconds...");
println!(" (Components are working in the background)\n");
tokio::time::sleep(std::time::Duration::from_secs(30)).await;
println!("🛑 Shutting down components...");
janitor.shutdown();
recoverer.shutdown();
forwarder.shutdown();
healthcheck.shutdown();
aggregator.shutdown();
subscriber_arc.shutdown();
let _ = tokio::time::timeout(std::time::Duration::from_secs(5), janitor_handle).await;
let _ = tokio::time::timeout(std::time::Duration::from_secs(5), recoverer_handle).await;
let _ = tokio::time::timeout(std::time::Duration::from_secs(5), forwarder_handle).await;
let _ = tokio::time::timeout(std::time::Duration::from_secs(5), healthcheck_handle).await;
let _ = tokio::time::timeout(std::time::Duration::from_secs(5), aggregator_handle).await;
let _ = tokio::time::timeout(std::time::Duration::from_secs(5), subscriber_handle).await;
println!("✅ All components shut down successfully!\n");
println!("👋 Example completed!");
Ok(())
}