use asynq::error::{Error, Result};
use asynq::server::Handler;
use asynq::task::Task;
use serde::{Deserialize, Serialize};
/// JSON payload carried by `email:send` and `email:reminder` tasks.
#[derive(Serialize, Deserialize)]
struct EmailPayload {
/// Recipient the handlers report sending to.
to: String,
/// Subject line of the message.
subject: String,
/// Message body; only the `email:send` handler prints it.
body: String,
}
/// JSON payload carried by `image:resize` and `image:process` tasks.
#[derive(Serialize, Deserialize)]
struct ImageResizePayload {
/// Location of the source image, as echoed in the handlers' log lines.
src_url: String,
/// Target width (presumably pixels; the code never states a unit).
width: u32,
/// Target height (presumably pixels; the code never states a unit).
height: u32,
}
/// Stateless task handler; its `Handler` implementation dispatches each task on its type string.
pub struct TaskProcessor;
#[async_trait::async_trait]
impl Handler for TaskProcessor {
async fn process_task(&self, task: Task) -> Result<()> {
match task.get_type() {
"email:send" => {
let payload: EmailPayload = serde_json::from_slice(task.get_payload()).unwrap();
self.handle_email_send(payload).await
}
"email:reminder" => {
let payload: EmailPayload = serde_json::from_slice(task.get_payload()).unwrap();
self.handle_email_reminder(payload).await
}
"image:resize" => {
let payload: ImageResizePayload = serde_json::from_slice(task.get_payload()).unwrap();
self.handle_image_resize(payload).await
}
"report:daily" => {
let payload: serde_json::Value = serde_json::from_slice(task.get_payload()).unwrap();
self.handle_daily_report(payload).await
}
"batch:process" => {
let payload: serde_json::Value = serde_json::from_slice(task.get_payload()).unwrap();
self.handle_batch_process(payload).await
}
"payment:process" => {
let payload: serde_json::Value = serde_json::from_slice(task.get_payload()).unwrap();
self.handle_payment_process(payload).await
}
"image:process" => {
let payload: ImageResizePayload = serde_json::from_slice(task.get_payload()).unwrap();
self.handle_image_process(payload).await
}
"demo:periodic_task" => {
println!(
"⏰ Received demo:periodic_task, payload: {:?}",
String::from_utf8_lossy(task.get_payload())
);
Ok(())
}
_ => {
println!("Unknown task type: {}", task.get_type());
Err(Error::other(format!(
"Unknown task type: {}",
task.get_type()
)))
}
}
}
}
impl TaskProcessor {
async fn handle_email_send(&self, payload: EmailPayload) -> Result<()> {
println!("📧 Sending email to: {}", payload.to);
println!(" Subject: {}", payload.subject);
println!(" Body: {}", payload.body);
tokio::time::sleep(std::time::Duration::from_secs(10)).await;
println!("✅ Email sent successfully to {}", payload.to);
Ok(())
}
async fn handle_email_reminder(&self, payload: EmailPayload) -> Result<()> {
println!("⏰ Sending reminder email to: {}", payload.to);
println!(" Subject: {}", payload.subject);
tokio::time::sleep(std::time::Duration::from_secs(50)).await;
println!("✅ Reminder email sent to {}", payload.to);
Ok(())
}
async fn handle_image_resize(&self, payload: ImageResizePayload) -> Result<()> {
println!("🖼️ Resizing image: {}", payload.src_url);
println!(" Target size: {}x{}", payload.width, payload.height);
tokio::time::sleep(std::time::Duration::from_secs(60)).await;
println!("✅ Image resized successfully: {}", payload.src_url);
Ok(())
}
async fn handle_daily_report(&self, payload: serde_json::Value) -> Result<()> {
println!("📊 Generating daily report for: {payload}");
tokio::time::sleep(std::time::Duration::from_secs(20)).await;
println!("✅ Daily report generated successfully");
Ok(())
}
async fn handle_batch_process(&self, payload: serde_json::Value) -> Result<()> {
println!("🔄 Processing batch item: {payload}");
tokio::time::sleep(std::time::Duration::from_secs(50)).await;
println!("✅ Batch item processed: {payload}");
Ok(())
}
async fn handle_payment_process(&self, payload: serde_json::Value) -> Result<()> {
println!("💰 Processing payment: {payload}");
let success_rate = 0.8; if rand::random::<f64>() < success_rate {
tokio::time::sleep(std::time::Duration::from_secs(20)).await;
println!("✅ Payment processed successfully: {payload}");
Ok(())
} else {
Err(Error::other("Payment gateway temporarily unavailable"))
}
}
async fn handle_image_process(&self, payload: ImageResizePayload) -> Result<()> {
println!(
"🖼️ Processing image with advanced retry: {}",
payload.src_url
);
println!(" Target size: {}x{}", payload.width, payload.height);
let success_rate = 0.7; if rand::random::<f64>() < success_rate {
tokio::time::sleep(std::time::Duration::from_secs(80)).await;
println!("✅ Image processed successfully: {}", payload.src_url);
Ok(())
} else {
Err(Error::other("Image processing service overloaded"))
}
}
}
fn aggregate_tasks(group: &str, tasks: Vec<Task>) -> Result<Task> {
println!(
"📦 Aggregating {} tasks from group '{}'",
tasks.len(),
group
);
let mut combined_payload = Vec::new();
for (idx, task) in tasks.iter().enumerate() {
println!(
" Task {}: type='{}', payload size={} bytes",
idx + 1,
task.get_type(),
task.get_payload().len()
);
combined_payload.push(String::from_utf8_lossy(task.get_payload()));
}
println!(" ✅ Created aggregated task with combined payload");
asynq::task::Task::new(
"batch:process",
&serde_json::to_vec(&combined_payload).unwrap_or_default(),
)
}
/// Returns `url` with any password in its credentials replaced by `***`.
///
/// Only the authority component (the part between `://` and the first `/`, `?`
/// or `#`) is inspected. Credentials without a `:` separator are hidden
/// entirely, since they cannot be told apart from a bare password. URLs without
/// a scheme separator or without credentials are returned unchanged.
fn redact_redis_url(url: &str) -> String {
    let (scheme, rest) = match url.split_once("://") {
        Some(parts) => parts,
        None => return url.to_string(),
    };
    let authority_end = rest
        .find(|c: char| matches!(c, '/' | '?' | '#'))
        .unwrap_or(rest.len());
    let (authority, tail) = rest.split_at(authority_end);
    // Split on the LAST '@' so a password containing a raw '@' is hidden in full.
    match authority.rsplit_once('@') {
        Some((credentials, host)) => {
            let user = credentials.split_once(':').map_or("", |(user, _)| user);
            format!("{scheme}://{user}:***@{host}{tail}")
        }
        None => url.to_string(),
    }
}

/// Starts the worker server and runs it until shutdown.
///
/// The Redis URL is read from `REDIS_URL`, falling back to a local development
/// URL. Four weighted queues are registered, group aggregation is enabled with
/// `aggregate_tasks`, and every task is handled by `TaskProcessor`.
///
/// # Errors
///
/// Returns any error raised while building the Redis configuration, the server
/// configuration or the server itself, or while the server is running.
#[tokio::main]
async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
    tracing_subscriber::fmt::init();
    println!("🚀 Starting Asynq worker server...");

    let redis_url = std::env::var("REDIS_URL")
        .unwrap_or_else(|_| "redis://tenant1:secure_pass123@localhost:6379".to_string());
    // Never log the raw URL: it embeds the Redis password.
    println!("🔗 Using Redis URL: {}", redact_redis_url(&redis_url));
    let redis_config = asynq::backend::RedisConnectionType::single(redis_url.clone())?;

    // Queue name -> weight passed to the server configuration.
    let mut queues = std::collections::HashMap::new();
    queues.insert("critical".to_string(), 6);
    queues.insert("default".to_string(), 3);
    queues.insert("image_processing".to_string(), 2);
    queues.insert("low".to_string(), 1);

    let aggregator = asynq::components::aggregator::GroupAggregatorFunc::new(aggregate_tasks);

    let mut server_config = asynq::config::ServerConfig::new()
        .concurrency(4)
        .queues(queues)
        .strict_priority(false)
        .task_check_interval(std::time::Duration::from_secs(1))
        .shutdown_timeout(std::time::Duration::from_secs(10))
        .group_grace_period(std::time::Duration::from_secs(5))?
        .group_max_size(10)
        .enable_group_aggregator(true);

    #[cfg(feature = "acl")]
    {
        // With ACL support compiled in, the URL's username is used as the tenant.
        if let Some(username) = extract_username_from_redis_url(&redis_url) {
            println!("🔐 ACL enabled with tenant: {username}");
            server_config = server_config.acl_tenant(username);
        }
    }

    let mut server = asynq::server::ServerBuilder::new()
        .redis_config(redis_config)
        .server_config(server_config)
        .build()
        .await?;
    server.set_group_aggregator(aggregator);

    let handler = TaskProcessor;
    println!("🔄 Server is running and waiting for tasks...");
    println!("Press Ctrl+C to gracefully shutdown");
    server.run(handler).await?;

    println!("👋 Server shutdown complete");
    Ok(())
}
#[cfg(feature = "acl")]
/// Extracts the username from the credentials of a Redis URL.
///
/// For `scheme://username:password@host...` this returns `Some(username)`.
/// It returns `None` when the URL has no `://` separator, no credentials, no
/// `:` inside the credentials, or an empty username.
///
/// Credentials are looked up only in the authority component (the part before
/// the first `/`, `?` or `#`), so an `@` appearing later in the path or query
/// is not mistaken for the credentials separator.
fn extract_username_from_redis_url(url: &str) -> Option<String> {
    let (_scheme, after_scheme) = url.split_once("://")?;
    let authority_end = after_scheme
        .find(|c: char| matches!(c, '/' | '?' | '#'))
        .unwrap_or(after_scheme.len());
    let authority = &after_scheme[..authority_end];
    let (credentials, _host) = authority.split_once('@')?;
    let (username, _password) = credentials.split_once(':')?;
    if username.is_empty() {
        None
    } else {
        Some(username.to_string())
    }
}