/// Task cancellation example: publishes cancellation events through a Redis
/// broker and reads them back through a `Subscriber`.
///
/// Steps performed:
/// 1. Connect to Redis at `redis://localhost:6379`.
/// 2. Create a subscriber, take its event receiver, and start it.
/// 3. Publish a cancellation for three task ids, 100 ms apart.
/// 4. Receive events until all three cancellations have arrived, the channel
///    closes, or a single receive waits longer than 3 seconds.
/// 5. Print a summary, shut the subscriber down, and report whether every
///    published event was actually received.
///
/// # Errors
///
/// Returns an error if the Redis connection configuration cannot be built,
/// the broker cannot be created, or publishing a cancellation fails.
///
/// # Panics
///
/// Panics if the subscriber does not hand out an event receiver.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    use asynq::backend::RedisBroker;
    use asynq::backend::RedisConnectionType;
    use asynq::base::Broker;
    use asynq::components::subscriber::{Subscriber, SubscriberConfig, SubscriptionEvent};
    use std::sync::Arc;
    use std::time::Duration;

    tracing_subscriber::fmt::init();

    println!("🚀 Task Cancellation Example");
    println!("=============================\n");

    // Connect to Redis and wrap the broker so it can be shared with the subscriber.
    let redis_config = RedisConnectionType::single("redis://localhost:6379")?;
    let broker: Arc<dyn Broker> = Arc::new(RedisBroker::new(redis_config).await?);
    println!("✅ Connected to Redis\n");

    // The receiver must be taken while we still have exclusive (`&mut`) access,
    // i.e. before the subscriber is moved into an `Arc`.
    let subscriber_config = SubscriberConfig { buffer_size: 100 };
    let mut subscriber = Subscriber::new(Arc::clone(&broker), subscriber_config);
    let mut event_rx = subscriber
        .take_receiver()
        .expect("Failed to get event receiver");

    let subscriber_arc = Arc::new(subscriber);
    let handle = Arc::clone(&subscriber_arc).start();
    println!("📢 Subscriber started and listening for cancellation events\n");

    // NOTE(review): presumably this pause lets the subscriber finish setting up
    // its subscription before anything is published — confirm against `start()`.
    tokio::time::sleep(Duration::from_millis(200)).await;

    println!("📤 Publishing cancellation events...\n");
    let task_ids = ["task_001", "task_002", "task_003"];
    let total = task_ids.len();
    for task_id in &task_ids {
        println!(" Cancelling task: {task_id}");
        broker.publish_cancellation(task_id).await?;
        tokio::time::sleep(Duration::from_millis(100)).await;
    }

    println!("\n📥 Receiving cancellation events:\n");
    let mut received_count = 0;
    // The timeout applies to each individual `recv`, not to the loop as a whole.
    let timeout_duration = Duration::from_secs(3);
    while received_count < total {
        match tokio::time::timeout(timeout_duration, event_rx.recv()).await {
            Ok(Some(SubscriptionEvent::TaskCancelled { task_id })) => {
                received_count += 1;
                println!(" ✓ Task cancelled: {task_id} ({received_count}/{total})");
            }
            Ok(Some(event)) => {
                println!(" ℹ Other event received: {event:?}");
            }
            Ok(None) => {
                println!(" ⚠ Channel closed");
                break;
            }
            Err(_) => {
                println!(" ⏱ Timeout waiting for events");
                break;
            }
        }
    }

    println!("\n📊 Summary:");
    println!(" Total cancellation events published: {total}");
    println!(" Total cancellation events received: {received_count}");

    println!("\n🛑 Shutting down subscriber...");
    subscriber_arc.shutdown();
    // Wait at most one second for the subscriber task to finish, and say so if
    // it does not, instead of silently discarding the timeout.
    if tokio::time::timeout(Duration::from_secs(1), handle)
        .await
        .is_err()
    {
        println!(" ⚠ Subscriber did not stop within 1 second");
    }

    // Only claim success when every published cancellation was observed; the
    // receive loop can exit early on a timeout or a closed channel.
    if received_count == total {
        println!("✅ Example completed successfully!");
    } else {
        println!(
            "⚠ Example finished, but only {received_count}/{total} cancellation events were received"
        );
    }
    Ok(())
}