use asynq::error::{Error, Result};
use serde::{Deserialize, Serialize};
fn aggregate_tasks(group: &str, tasks: Vec<asynq::task::Task>) -> Result<asynq::task::Task> {
println!(
"📦 Aggregating {} tasks from group '{}'",
tasks.len(),
group
);
let mut combined_payload: Vec<serde_json::Value> = Vec::new();
for (idx, task) in tasks.iter().enumerate() {
println!(
" asynq::task::Task {}: type='{}', payload size={} bytes",
idx + 1,
task.get_type(),
task.get_payload().len()
);
if let Ok(payload_str) = serde_json::from_slice(task.get_payload()) {
combined_payload.push(payload_str);
}
}
println!(" ✅ Created aggregated task with combined payload");
let data =
serde_json::to_vec(&combined_payload).map_err(|e| Error::Serialization(e.to_string()))?;
asynq::task::Task::new("batch:process", &data)
}
#[derive(Serialize, Deserialize)]
struct EmailPayload {
to: String,
subject: String,
body: String,
}
#[derive(Serialize, Deserialize)]
struct ImageResizePayload {
src_url: String,
width: u32,
height: u32,
}
pub struct TaskProcessor;
#[async_trait::async_trait]
impl asynq::server::Handler for TaskProcessor {
async fn process_task(&self, task: asynq::task::Task) -> Result<()> {
match task.get_type() {
"email:send" => {
let payload: EmailPayload = serde_json::from_slice(task.get_payload())
.map_err(|e| Error::other(format!("Failed to deserialize email:send payload: {}", e)))?;
self.handle_email_send(payload).await
}
"email:reminder" => {
let payload: EmailPayload = serde_json::from_slice(task.get_payload()).map_err(|e| {
Error::other(format!(
"Failed to deserialize email:reminder payload: {}",
e
))
})?;
self.handle_email_reminder(payload).await
}
"image:resize" => {
let payload: ImageResizePayload =
serde_json::from_slice(task.get_payload()).map_err(|e| {
Error::other(format!("Failed to deserialize image:resize payload: {}", e))
})?;
self.handle_image_resize(payload).await
}
"report:daily" => {
let payload: serde_json::Value =
serde_json::from_slice(task.get_payload()).map_err(|e| {
Error::other(format!("Failed to deserialize report:daily payload: {}", e))
})?;
self.handle_daily_report(payload).await
}
"batch:process" => {
let payload: serde_json::Value =
serde_json::from_slice(task.get_payload()).map_err(|e| {
Error::other(format!(
"Failed to deserialize batch:process payload: {}",
e
))
})?;
self.handle_batch_process(payload).await
}
"group:process" => {
let payload: serde_json::Value =
serde_json::from_slice(task.get_payload()).map_err(|e| {
Error::other(format!(
"Failed to deserialize group:process payload: {}",
e
))
})?;
self.handle_group_process(payload).await
}
_ => {
println!("❌ Unknown task type: {}", task.get_type());
Err(Error::other(format!(
"Unknown task type: {}",
task.get_type()
)))
}
}
}
}
impl TaskProcessor {
async fn handle_email_send(&self, payload: EmailPayload) -> Result<()> {
println!("📧 Sending email to: {}", payload.to);
println!(" Subject: {}", payload.subject);
println!(" Body: {}", payload.body);
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
println!("✅ Email sent successfully to {}", payload.to);
Ok(())
}
async fn handle_email_reminder(&self, payload: EmailPayload) -> Result<()> {
println!("⏰ Sending reminder email to: {}", payload.to);
println!(" Subject: {}", payload.subject);
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
println!("✅ Reminder email sent to {}", payload.to);
Ok(())
}
async fn handle_image_resize(&self, payload: ImageResizePayload) -> Result<()> {
println!("🖼️ Resizing image: {}", payload.src_url);
println!(" Target size: {}x{}", payload.width, payload.height);
tokio::time::sleep(std::time::Duration::from_secs(3)).await;
println!("✅ Image resized successfully: {}", payload.src_url);
Ok(())
}
async fn handle_daily_report(&self, payload: serde_json::Value) -> Result<()> {
println!("📊 Generating daily report for: {payload}");
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
println!("✅ Daily report generated successfully");
Ok(())
}
async fn handle_batch_process(&self, payload: serde_json::Value) -> Result<()> {
println!("🔄 Processing batch item: {payload}");
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
println!("✅ Batch item processed: {payload}");
Ok(())
}
async fn handle_group_process(&self, payload: serde_json::Value) -> Result<()> {
println!("🔄 Processing group item: {payload}");
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
println!("✅ Group item processed: {payload}");
Ok(())
}
}
#[cfg(not(feature = "postgres"))]
fn main() {}
#[cfg(feature = "postgres")]
#[tokio::main]
async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
use asynq::backend::PostgresBroker;
tracing_subscriber::fmt::init();
println!("🚀 Starting Asynq worker with PostgresSQL...");
let database_url = std::env::var("DATABASE_URL")
.unwrap_or_else(|_| "postgres://postgres:postgres@localhost:5432/asynq".to_string());
println!("🔗 Using PostgresSQL URL: {database_url}");
let broker = std::sync::Arc::new(
PostgresBroker::builder()
.database_url(&database_url)
.build()
.await?,
);
println!("✅ Connected to PostgresSQL");
let mut queues = std::collections::HashMap::new();
queues.insert("critical".to_string(), 6); queues.insert("default".to_string(), 3); queues.insert("image_processing".to_string(), 2); queues.insert("low".to_string(), 1);
let server_config = asynq::config::ServerConfig::new()
.concurrency(4) .queues(queues)
.strict_priority(false) .enable_group_aggregator(true)
.task_check_interval(std::time::Duration::from_secs(1))
.shutdown_timeout(std::time::Duration::from_secs(10));
let mut server = asynq::server::ServerBuilder::new()
.postgres_broker(broker)
.server_config(server_config)
.build()
.await?;
println!("📦 Setting up group aggregator function...");
let aggregator = asynq::components::aggregator::GroupAggregatorFunc::new(aggregate_tasks);
server.set_group_aggregator(aggregator);
println!(" ✅ Group aggregator configured");
let handler = TaskProcessor;
println!("🔄 Server is running and waiting for tasks...");
println!("Press Ctrl+C to gracefully shutdown");
server.run(handler).await?;
println!("👋 Server shutdown complete");
Ok(())
}