use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize)]
struct EmailPayload {
to: String,
subject: String,
body: String,
}
#[derive(Serialize, Deserialize)]
struct ImageResizePayload {
src_url: String,
width: u32,
height: u32,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
use asynq::backend::RedisConnectionType;
use asynq::serve_mux::ServeMux;
use asynq::task::Task;
tracing_subscriber::fmt::init();
let redis_url =
std::env::var("REDIS_URL").unwrap_or_else(|_| "redis://localhost:6379".to_string());
println!("🔗 Using Redis URL: {redis_url}");
let redis_config = RedisConnectionType::single(redis_url)?;
let mut queues = std::collections::HashMap::new();
queues.insert("critical".to_string(), 6);
queues.insert("default".to_string(), 3);
queues.insert("image_processing".to_string(), 2);
queues.insert("low".to_string(), 1);
let server_config = asynq::config::ServerConfig::new()
.concurrency(2) .queues(queues)
.strict_priority(false)
.task_check_interval(std::time::Duration::from_secs(1))
.shutdown_timeout(std::time::Duration::from_secs(10));
let mut mux = ServeMux::new();
mux.handle_func("email:send", |task: Task| {
println!("📧 Rust Consumer: Processing email:send task");
if let Ok(payload) = serde_json::from_slice::<EmailPayload>(task.get_payload()) {
println!(" To: {}", payload.to);
println!(" Subject: {}", payload.subject);
} else {
println!(
" Payload: {:?}",
String::from_utf8_lossy(task.get_payload())
);
}
Ok(())
});
mux.handle_async_func("image:resize", |task: Task| async move {
println!("🖼️ Rust Consumer: Processing image:resize task");
if let Ok(payload) = serde_json::from_slice::<ImageResizePayload>(task.get_payload()) {
println!(" Source: {}", payload.src_url);
println!(" Dimensions: {}x{}", payload.width, payload.height);
} else {
println!(
" Payload: {:?}",
String::from_utf8_lossy(task.get_payload())
);
}
Ok(())
});
mux.handle_async_func("payment:process", |task: Task| async move {
println!("💰 Rust Consumer: Processing payment:process task");
println!(
" Payload: {:?}",
String::from_utf8_lossy(task.get_payload())
);
Ok(())
});
mux.handle_func("report:daily", |task: Task| {
println!("📊 Rust Consumer: Processing report:daily task");
println!(
" Payload: {:?}",
String::from_utf8_lossy(task.get_payload())
);
Ok(())
});
mux.handle_func("batch:process", |task: Task| {
println!("🔄 Rust Consumer: Processing batch:process task");
println!(
" Payload: {:?}",
String::from_utf8_lossy(task.get_payload())
);
Ok(())
});
let mut server = asynq::server::ServerBuilder::new()
.redis_config(redis_config)
.server_config(server_config)
.build()
.await?;
println!("🚀 Rust Consumer: Starting server with ServeMux...");
println!("🔄 Server is running and waiting for tasks...");
println!("Press Ctrl+C to gracefully shutdown");
server.run(mux).await?;
println!("👋 Server shutdown complete");
Ok(())
}