use asynq::error::Result;
use asynq::server::Handler;
use asynq::task::Task;
use serde::{Deserialize, Serialize};
/// Input of a compute task, decoded from the task's JSON payload.
#[derive(Serialize, Deserialize, Debug)]
struct ComputePayload {
// Name of the arithmetic operation; the handler recognises "sum" and "multiply".
operation: String,
// Operands the operation is applied to.
values: Vec<i32>,
}
/// Outcome of a compute task, serialized to JSON and handed to the task's
/// result writer.
#[derive(Serialize, Deserialize, Debug)]
struct ComputeResult {
// Operation name copied from the originating payload.
operation: String,
// Operands copied from the originating payload.
input: Vec<i32>,
// Computed value (0 when the operation name was not recognised).
result: i32,
// RFC 3339 UTC time at which the result record was built.
timestamp: String,
}
/// Stateless task handler that computes a value for each compute task and
/// stores it through the task's result writer when one is available.
pub struct ResultWriterHandler;
#[async_trait::async_trait]
impl Handler for ResultWriterHandler {
    /// Dispatches an incoming task according to its type name.
    ///
    /// `default:sum` and `default:multiply` tasks have their payload decoded
    /// as JSON into a [`ComputePayload`] and are forwarded to
    /// `handle_compute`. Every other task type is logged and completed with
    /// `Ok(())`.
    ///
    /// # Errors
    ///
    /// Returns an error when the payload is not valid JSON for
    /// [`ComputePayload`], or when `handle_compute` returns an error.
    async fn process_task(&self, task: Task) -> Result<()> {
        match task.get_type() {
            "default:sum" | "default:multiply" => {
                // The payload is external input: a malformed one must fail this
                // task with an error instead of panicking the worker.
                let payload: ComputePayload = serde_json::from_slice(task.get_payload())?;
                self.handle_compute(task, payload).await
            }
            _ => {
                println!("Unknown task type: {}", task.get_type());
                Ok(())
            }
        }
    }
}
impl ResultWriterHandler {
    /// Evaluates the requested operation over the payload values and, when the
    /// task carries a result writer, stores the outcome as JSON.
    ///
    /// `"sum"` adds the values and `"multiply"` multiplies them; any other
    /// operation name yields `0`. A failed write is logged and does not fail
    /// the task.
    ///
    /// NOTE(review): the arithmetic is plain `i32`, so large inputs can
    /// overflow (a panic in builds with overflow checks enabled).
    ///
    /// # Errors
    ///
    /// Returns an error when the result record cannot be serialized to JSON.
    async fn handle_compute(&self, task: Task, payload: ComputePayload) -> Result<()> {
        println!("📊 Processing compute task: {}", payload.operation);
        println!(" Input values: {:?}", payload.values);

        let result: i32 = match payload.operation.as_str() {
            "sum" => payload.values.iter().sum(),
            "multiply" => payload.values.iter().product(),
            _ => 0,
        };

        // Presumably simulates a slow computation for the example.
        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
        println!(" Result: {}", result);

        if let Some(writer) = task.result_writer() {
            // `payload` is owned and not used again, so its fields are moved
            // into the result record instead of being cloned.
            let compute_result = ComputeResult {
                operation: payload.operation,
                input: payload.values,
                result,
                timestamp: chrono::Utc::now().to_rfc3339(),
            };
            let result_json = serde_json::to_vec(&compute_result)?;

            match writer.write(&result_json).await {
                Ok(bytes_written) => {
                    println!("✅ Result written successfully: {} bytes", bytes_written);
                    println!(" Task ID: {}", writer.task_id());
                }
                Err(e) => {
                    println!("❌ Failed to write result: {}", e);
                }
            }
        } else {
            println!("⚠️ No ResultWriter available (task not from processor)");
        }

        Ok(())
    }
}
/// Starts the example worker: initialises tracing, connects to Redis (URL
/// taken from `REDIS_URL`, falling back to `redis://localhost:6379`), builds
/// the server and runs it with a [`ResultWriterHandler`] until it returns.
#[tokio::main]
async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
    use asynq::backend::RedisConnectionType;
    use std::collections::HashMap;
    use std::time::Duration;

    tracing_subscriber::fmt::init();
    println!("🚀 Starting ResultWriter Example Server...");

    // Resolve the Redis endpoint, preferring the environment over the default.
    let redis_url = match std::env::var("REDIS_URL") {
        Ok(url) => url,
        Err(_) => String::from("redis://localhost:6379"),
    };
    println!("🔗 Using Redis URL: {redis_url}");
    let connection = RedisConnectionType::single(redis_url)?;

    // Only the "default" queue is consumed, with priority 3.
    let queue_priorities = HashMap::from([("default".to_string(), 3)]);
    let config = asynq::config::ServerConfig::new()
        .concurrency(2)
        .queues(queue_priorities)
        .strict_priority(false)
        .task_check_interval(Duration::from_secs(1))
        .shutdown_timeout(Duration::from_secs(10));

    let mut server = asynq::server::ServerBuilder::new()
        .redis_config(connection)
        .server_config(config)
        .build()
        .await?;

    println!("🔄 Server is running and waiting for tasks...");
    println!("💡 Run the producer to enqueue tasks with:");
    println!(" cargo run --example result_writer_producer");
    println!("Press Ctrl+C to gracefully shutdown");

    server.run(ResultWriterHandler).await?;

    println!("👋 Server shutdown complete");
    Ok(())
}