use asynq::error::{Error, Result};
use asynq::{server::Handler, task::Task};
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize, Debug)]
struct EmailPayload {
to: String,
subject: String,
body: String,
}
pub struct EmailProcessor;
#[async_trait::async_trait]
impl Handler for EmailProcessor {
async fn process_task(&self, task: Task) -> Result<()> {
println!("📨 Processing task: {}", task.get_type());
match task.get_type() {
"email:send" => {
let payload: EmailPayload = serde_json::from_slice(task.get_payload()).unwrap();
println!("✉️ Sending email to: {}", payload.to);
println!(" Subject: {}", payload.subject);
println!(" Body: {}", payload.body);
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
println!("✅ Email sent successfully to {}", payload.to);
Ok(())
}
"email:reminder" => {
let payload: EmailPayload = serde_json::from_slice(task.get_payload()).unwrap();
println!("⏰ Sending reminder email to: {}", payload.to);
println!(" Subject: {}", payload.subject);
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
println!("✅ Reminder email sent to {}", payload.to);
Ok(())
}
_ => {
println!("❌ Unknown task type: {}", task.get_type());
Err(Error::other(format!(
"Unknown task type: {}",
task.get_type()
)))
}
}
}
}
#[tokio::main]
async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::fmt::init();
println!("🚀 Starting Asynq worker server with Processor...");
println!(" The Processor module is compatible with Go asynq processor.go");
println!();
let redis_url =
std::env::var("REDIS_URL").unwrap_or_else(|_| "redis://localhost:6379".to_string());
println!("🔗 Using Redis URL: {redis_url}");
let redis_config = asynq::backend::RedisConnectionType::single(redis_url)?;
let mut queues = std::collections::HashMap::new();
queues.insert("critical".to_string(), 6); queues.insert("default".to_string(), 3); queues.insert("low".to_string(), 1);
let server_config = asynq::config::ServerConfig::new()
.concurrency(2) .queues(queues)
.strict_priority(false) .task_check_interval(std::time::Duration::from_secs(1))
.shutdown_timeout(std::time::Duration::from_secs(10));
let mut server = asynq::server::ServerBuilder::new()
.redis_config(redis_config)
.server_config(server_config)
.build()
.await?;
println!();
println!("✨ Processor features:");
println!(" • Semaphore-based concurrency control");
println!(" • Task timeout support");
println!(" • Queue priority selection (strict and weighted)");
println!(" • Graceful shutdown");
println!(" • Automatic retry with exponential backoff");
println!(" • Task archival after max retries");
println!();
let handler = EmailProcessor;
println!("🔄 Server is running and waiting for tasks...");
println!(" Press Ctrl+C to gracefully shutdown");
println!();
server.run(handler).await?;
println!();
println!("👋 Server shutdown complete");
Ok(())
}