asynq 0.1.8

Simple, reliable & efficient distributed task queue in Rust, inspired by hibiken/asynq
Documentation
//! 消费者示例
//! Consumer example
//!
//! 演示如何使用 asynq 服务器处理任务
//! Demonstrates how to use asynq server to process tasks

use asynq::error::{Error, Result};
use asynq::server::Handler;
use asynq::task::Task;
use serde::{Deserialize, Serialize};

/// JSON payload carried by the `email:send` and `email:reminder` tasks.
#[derive(Serialize, Deserialize)]
struct EmailPayload {
  /// Recipient the email is addressed to.
  to: String,
  /// Subject line of the email.
  subject: String,
  /// Message body; only the `email:send` handler prints it.
  body: String,
}

/// JSON payload carried by the `image:resize` and `image:process` tasks.
#[derive(Serialize, Deserialize)]
struct ImageResizePayload {
  /// Location of the source image.
  src_url: String,
  /// Requested target width.
  width: u32,
  /// Requested target height.
  height: u32,
}
/// Task processor: the stateless handler that dispatches each incoming task
/// to the routine matching its type name.
pub struct TaskProcessor;

#[async_trait::async_trait]
impl Handler for TaskProcessor {
  async fn process_task(&self, task: Task) -> Result<()> {
    match task.get_type() {
      "email:send" => {
        let payload: EmailPayload = serde_json::from_slice(task.get_payload()).unwrap();
        self.handle_email_send(payload).await
      }
      "email:reminder" => {
        let payload: EmailPayload = serde_json::from_slice(task.get_payload()).unwrap();
        self.handle_email_reminder(payload).await
      }
      "image:resize" => {
        let payload: ImageResizePayload = serde_json::from_slice(task.get_payload()).unwrap();
        self.handle_image_resize(payload).await
      }
      "report:daily" => {
        let payload: serde_json::Value = serde_json::from_slice(task.get_payload()).unwrap();
        self.handle_daily_report(payload).await
      }
      "batch:process" => {
        let payload: serde_json::Value = serde_json::from_slice(task.get_payload()).unwrap();
        self.handle_batch_process(payload).await
      }
      "payment:process" => {
        let payload: serde_json::Value = serde_json::from_slice(task.get_payload()).unwrap();
        self.handle_payment_process(payload).await
      }
      "image:process" => {
        let payload: ImageResizePayload = serde_json::from_slice(task.get_payload()).unwrap();
        self.handle_image_process(payload).await
      }
      "demo:periodic_task" => {
        println!(
          "⏰ Received demo:periodic_task, payload: {:?}",
          String::from_utf8_lossy(task.get_payload())
        );
        Ok(())
      }
      _ => {
        println!("Unknown task type: {}", task.get_type());
        Err(Error::other(format!(
          "Unknown task type: {}",
          task.get_type()
        )))
      }
    }
  }
}

impl TaskProcessor {
  async fn handle_email_send(&self, payload: EmailPayload) -> Result<()> {
    println!("📧 Sending email to: {}", payload.to);
    println!("   Subject: {}", payload.subject);
    println!("   Body: {}", payload.body);

    // 模拟邮件发送处理(延长耗时以测试任务取消)
    tokio::time::sleep(std::time::Duration::from_secs(10)).await;

    println!("✅ Email sent successfully to {}", payload.to);
    Ok(())
  }

  async fn handle_email_reminder(&self, payload: EmailPayload) -> Result<()> {
    println!("⏰ Sending reminder email to: {}", payload.to);
    println!("   Subject: {}", payload.subject);

    // 模拟提醒邮件处理
    tokio::time::sleep(std::time::Duration::from_secs(50)).await;

    println!("✅ Reminder email sent to {}", payload.to);
    Ok(())
  }

  async fn handle_image_resize(&self, payload: ImageResizePayload) -> Result<()> {
    println!("🖼️  Resizing image: {}", payload.src_url);
    println!("   Target size: {}x{}", payload.width, payload.height);

    // 模拟图片处理(延长耗时以测试任务取消)
    tokio::time::sleep(std::time::Duration::from_secs(60)).await;

    println!("✅ Image resized successfully: {}", payload.src_url);
    Ok(())
  }

  async fn handle_daily_report(&self, payload: serde_json::Value) -> Result<()> {
    println!("📊 Generating daily report for: {payload}");

    // 模拟报告生成
    tokio::time::sleep(std::time::Duration::from_secs(20)).await;

    println!("✅ Daily report generated successfully");
    Ok(())
  }

  async fn handle_batch_process(&self, payload: serde_json::Value) -> Result<()> {
    println!("🔄 Processing batch item: {payload}");
    // 模拟批处理
    tokio::time::sleep(std::time::Duration::from_secs(50)).await;
    println!("✅ Batch item processed: {payload}");
    Ok(())
  }

  async fn handle_payment_process(&self, payload: serde_json::Value) -> Result<()> {
    println!("💰 Processing payment: {payload}");

    // 模拟支付处理 - 可能需要重试
    let success_rate = 0.8; // 80% 成功率
    if rand::random::<f64>() < success_rate {
      tokio::time::sleep(std::time::Duration::from_secs(20)).await;
      println!("✅ Payment processed successfully: {payload}");
      Ok(())
    } else {
      // 模拟支付失败,需要重试
      Err(Error::other("Payment gateway temporarily unavailable"))
    }
  }

  async fn handle_image_process(&self, payload: ImageResizePayload) -> Result<()> {
    println!(
      "🖼️  Processing image with advanced retry: {}",
      payload.src_url
    );
    println!("   Target size: {}x{}", payload.width, payload.height);

    // 模拟可能失败的图片处理
    let success_rate = 0.7; // 70% 成功率
    if rand::random::<f64>() < success_rate {
      tokio::time::sleep(std::time::Duration::from_secs(80)).await;
      println!("✅ Image processed successfully: {}", payload.src_url);
      Ok(())
    } else {
      // 模拟处理失败,将会使用指数退避重试
      Err(Error::other("Image processing service overloaded"))
    }
  }
}

/// 自定义聚合器示例 - 将多个任务的 payload 合并
/// Custom aggregator example - combines payloads from multiple tasks
fn aggregate_tasks(group: &str, tasks: Vec<Task>) -> Result<Task> {
  println!(
    "📦 Aggregating {} tasks from group '{}'",
    tasks.len(),
    group
  );

  // 合并所有任务的 payload
  // Combine payloads from all tasks
  let mut combined_payload = Vec::new();
  for (idx, task) in tasks.iter().enumerate() {
    println!(
      "   Task {}: type='{}', payload size={} bytes",
      idx + 1,
      task.get_type(),
      task.get_payload().len()
    );

    combined_payload.push(String::from_utf8_lossy(task.get_payload()));
  }
  println!("   ✅ Created aggregated task with combined payload");

  // 创建聚合后的任务
  // Create the aggregated task
  asynq::task::Task::new(
    "batch:process",
    &serde_json::to_vec(&combined_payload).unwrap_or_default(),
  )
}

/// Entry point: configures and runs the asynq worker server until it shuts down.
///
/// The Redis URL is read from the `REDIS_URL` environment variable, falling
/// back to a local Redis instance when the variable is not set.
#[tokio::main]
async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
  // Masks the password of a `scheme://user:password@host` URL so credentials
  // never end up in logs. URLs without a `user:password@` part are returned
  // unchanged. The last '@' is used as the separator so a password that itself
  // contains '@' is still fully masked.
  fn redact_password(url: &str) -> String {
    url
      .split_once("://")
      .and_then(|(scheme, rest)| {
        let (userinfo, host) = rest.rsplit_once('@')?;
        let (user, _password) = userinfo.split_once(':')?;
        Some(format!("{scheme}://{user}:***@{host}"))
      })
      .unwrap_or_else(|| url.to_string())
  }

  tracing_subscriber::fmt::init();

  println!("🚀 Starting Asynq worker server...");

  // Build the Redis configuration: prefer the environment variable, otherwise
  // use a local Redis instance.
  let redis_url = std::env::var("REDIS_URL")
    .unwrap_or_else(|_| "redis://tenant1:secure_pass123@localhost:6379".to_string());
  println!("🔗 Using Redis URL: {}", redact_password(&redis_url));
  let redis_config = asynq::backend::RedisConnectionType::single(redis_url.clone())?;

  // Configure queue priorities.
  let mut queues = std::collections::HashMap::new();
  queues.insert("critical".to_string(), 6); // highest priority
  queues.insert("default".to_string(), 3); // default priority
  queues.insert("image_processing".to_string(), 2); // image processing queue
  queues.insert("low".to_string(), 1); // low priority
  let aggregator = asynq::components::aggregator::GroupAggregatorFunc::new(aggregate_tasks);

  // Build the server configuration.
  let server_config = asynq::config::ServerConfig::new()
    .concurrency(4) // 4 concurrent workers
    .queues(queues)
    .strict_priority(false) // do not use strict priority
    .task_check_interval(std::time::Duration::from_secs(1))
    .shutdown_timeout(std::time::Duration::from_secs(10))
    .group_grace_period(std::time::Duration::from_secs(5))? // group aggregation grace period
    .group_max_size(10) // maximum group size
    .enable_group_aggregator(true); // enable group aggregator

  // If the `acl` feature is enabled, use the username from the Redis URL as
  // the ACL tenant. Shadowing (rather than `let mut`) avoids an unused-`mut`
  // warning when the feature is disabled.
  #[cfg(feature = "acl")]
  let server_config = match extract_username_from_redis_url(&redis_url) {
    Some(username) => {
      println!("🔐 ACL enabled with tenant: {username}");
      server_config.acl_tenant(username)
    }
    None => server_config,
  };

  // Create the server.
  let mut server = asynq::server::ServerBuilder::new()
    .redis_config(redis_config)
    .server_config(server_config)
    .build()
    .await?;
  server.set_group_aggregator(aggregator);

  // Create the task handler.
  let handler = TaskProcessor;

  // Run the server.
  println!("🔄 Server is running and waiting for tasks...");
  println!("Press Ctrl+C to gracefully shutdown");

  server.run(handler).await?;

  println!("👋 Server shutdown complete");

  Ok(())
}

/// Extracts the username from a Redis URL.
///
/// Accepted formats are `redis://username:password@host:port` and
/// `redis://username@host:port`. Only the authority component (the text
/// between `://` and the first `/`, `?` or `#`) is inspected, so an `@` that
/// appears later in the path or query is never mistaken for a credentials
/// separator. The last `@` of the authority is used as the separator, so a
/// password containing `@` does not confuse the host detection.
///
/// Returns `None` when the URL has no scheme separator, carries no
/// credentials, or has an empty username (e.g. `redis://:password@host`).
#[cfg(feature = "acl")]
fn extract_username_from_redis_url(url: &str) -> Option<String> {
  let (_scheme, after_scheme) = url.split_once("://")?;

  // Restrict the search to the authority component.
  let authority_end = after_scheme
    .find(|c: char| matches!(c, '/' | '?' | '#'))
    .unwrap_or(after_scheme.len());
  let authority = &after_scheme[..authority_end];

  let (userinfo, _host) = authority.rsplit_once('@')?;

  // The username is everything before the first ':'; with no ':' the whole
  // userinfo is the username.
  let username = userinfo
    .split_once(':')
    .map_or(userinfo, |(name, _password)| name);

  (!username.is_empty()).then(|| username.to_string())
}