use asynq::error::{Error, Result};
use asynq::{server::Handler, task::Task};
fn aggregate_tasks(group: &str, tasks: Vec<Task>) -> Result<Task> {
println!(
"📦 Aggregating {} tasks from group '{}'",
tasks.len(),
group
);
let mut combined_payload = String::new();
for (idx, task) in tasks.iter().enumerate() {
println!(
" Task {}: type='{}', payload size={} bytes",
idx + 1,
task.get_type(),
task.get_payload().len()
);
if let Ok(payload_str) = std::str::from_utf8(task.get_payload()) {
if !combined_payload.is_empty() {
combined_payload.push('\n');
}
combined_payload.push_str(payload_str);
}
}
println!(" ✅ Created aggregated task with combined payload");
asynq::task::Task::new("batch:process", combined_payload.as_bytes())
}
pub struct BatchProcessor;
#[async_trait::async_trait]
impl Handler for BatchProcessor {
async fn process_task(&self, task: Task) -> Result<()> {
match task.get_type() {
"batch:process" => {
println!("\n🔄 Processing aggregated batch task");
if let Ok(payload) = std::str::from_utf8(task.get_payload()) {
println!(" Combined payload:");
for (idx, line) in payload.lines().enumerate() {
println!(" {}. {}", idx + 1, line);
}
}
println!(" ✅ Batch processing completed\n");
Ok(())
}
"email:send" => {
println!("📧 Processing individual email:send task");
Ok(())
}
_ => {
println!("❌ Unknown task type: {}", task.get_type());
Err(Error::other(format!(
"Unknown task type: {}",
task.get_type()
)))
}
}
}
}
/// Starts an asynq server with the group aggregator enabled and runs it with
/// [`BatchProcessor`] as the task handler.
///
/// The Redis URL is read from the `REDIS_URL` environment variable and falls
/// back to `redis://localhost:6379` when the variable is not set. Any error
/// from building the configuration, building the server, or running it is
/// returned to the caller.
#[tokio::main]
async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
    use asynq::backend::RedisConnectionType;
    use asynq::config::ServerConfig;
    use std::time::Duration;

    tracing_subscriber::fmt::init();
    println!("🚀 Starting Asynq server with Group Aggregator...");
    println!(" Group Aggregator aggregates multiple tasks into one before processing");
    println!();

    let redis_url =
        std::env::var("REDIS_URL").unwrap_or_else(|_| "redis://localhost:6379".to_string());
    println!("🔗 Using Redis URL: {redis_url}");
    let redis_config = RedisConnectionType::single(redis_url)?;

    let mut queues = std::collections::HashMap::new();
    // NOTE(review): the value is presumably the queue's priority/weight — confirm.
    queues.insert("default".to_string(), 1);

    // Single source of truth for the aggregation settings: the same values are
    // handed to the server and printed below, so the summary cannot drift.
    let grace_period = Duration::from_secs(10);
    let max_group_size = 5;
    let max_delay = Duration::from_secs(30);

    let server_config = ServerConfig::default()
        .concurrency(2)
        .queues(queues)
        .enable_group_aggregator(true)
        .group_grace_period(grace_period)?
        .group_max_size(max_group_size)
        .group_max_delay(max_delay);

    println!("⚙️ Server configuration:");
    println!(" • Group aggregator: enabled");
    println!(" • Grace period: {} seconds", grace_period.as_secs());
    println!(" • Max group size: {} tasks", max_group_size);
    println!(" • Max delay: {} seconds", max_delay.as_secs());
    println!();

    // `redis_config` is not needed afterwards, so it is moved rather than cloned.
    let mut server = asynq::server::ServerBuilder::new()
        .redis_config(redis_config)
        .server_config(server_config)
        .build()
        .await?;

    println!("📦 Setting up group aggregator function...");
    let aggregator = asynq::components::aggregator::GroupAggregatorFunc::new(aggregate_tasks);
    server.set_group_aggregator(aggregator);
    println!(" ✅ Group aggregator configured");
    println!();

    println!("💡 To use GroupAggregator in your application:");
    println!(" 1. Create tasks with .with_group(\"your-group-name\")");
    println!(" 2. Call server.set_group_aggregator(your_aggregator)");
    println!(" 3. Tasks in the same group will be aggregated");
    println!();

    let handler = BatchProcessor;
    println!("🔄 Server is running...");
    println!(" Press Ctrl+C to gracefully shutdown");
    println!();

    server.run(handler).await?;
    println!("👋 Server shutdown complete");
    Ok(())
}