asynq 0.1.8

Simple, reliable & efficient distributed task queue in Rust, inspired by hibiken/asynq
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
//! 处理器模块
//! Processor module
//!
//! 实现与 Go asynq processor.go 兼容的任务处理器
//! Implements task processor compatible with Go asynq processor.go
//!
//! ## 概述 / Overview
//!
//! Processor 是任务处理的核心组件,负责从 Redis 队列中取出任务并执行。
//! 它实现了与 Go 版本 asynq processor.go 相似的架构和功能。
//!
//! The Processor is the core component for task processing, responsible for dequeuing
//! tasks from Redis queues and executing them. It implements an architecture and
//! functionality similar to Go's asynq processor.go.
//!
//! ## 主要特性 / Key Features
//!
//! - **信号量并发控制**: 使用 Tokio Semaphore 限制并发工作者数量
//!   - **Semaphore-based concurrency**: Uses Tokio Semaphore to limit concurrent workers
//!
//! - **队列优先级**: 支持严格优先级和加权优先级两种模式
//!   - **Queue priority**: Supports both strict priority and weighted priority modes
//!
//! - **任务超时**: 支持任务级别和全局超时设置
//!   - **Task timeout**: Supports task-level and global timeout settings
//!
//! - **优雅关闭**: 等待所有活跃工作者完成后再关闭
//!   - **Graceful shutdown**: Waits for all active workers to finish before shutting down
//!
//! - **自动重试**: 失败任务自动重试,支持指数退避策略
//!   - **Automatic retry**: Failed tasks are automatically retried with exponential backoff
//!
//! - **任务归档**: 达到最大重试次数后自动归档任务
//!   - **Task archival**: Tasks are automatically archived after max retries
//!
//! ## 与 Go 版本的兼容性 / Compatibility with Go version
//!
//! 本实现与 Go 版本的 processor.go 在以下方面保持兼容:
//! This implementation maintains compatibility with Go's processor.go in the following aspects:
//!
//! - 相同的队列优先级算法 / Same queue priority algorithm
//! - 相同的重试延迟计算 / Same retry delay calculation
//! - 相同的任务超时处理 / Same task timeout handling
//! - 相同的并发控制机制 / Same concurrency control mechanism
//!
//! ## 使用示例 / Usage Example
//!
//! ```rust,no_run
//! use asynq::components::processor::{Processor, ProcessorParams};
//! use asynq::server::Handler;
//! use std::collections::HashMap;
//! use std::sync::Arc;
//! use std::sync::atomic::AtomicUsize;
//! use std::time::Duration;
//! #[cfg(feature = "default")]
//! async fn example() -> Result<(), Box<dyn std::error::Error>> {
//! // 创建 broker 和其他必要组件
//! // Create broker and other necessary components
//! # use asynq::backend::RedisConnectionType;
//! # use asynq::backend::RedisBroker;
//! # let redis_config = asynq::backend::RedisConnectionType::single("redis://localhost:6379")?;
//! # let broker = Arc::new(RedisBroker::new(redis_config).await?);
//!
//! let mut queues = HashMap::new();
//! queues.insert("default".to_string(), 3);
//! queues.insert("critical".to_string(), 6);
//!
//! let params = ProcessorParams {
//!   broker: broker.clone(),
//!   inspector: Arc::new(asynq::inspector::Inspector::from_broker(broker)),
//!   queues,
//!   concurrency: 10,
//!   strict_priority: false,
//!   task_check_interval: Duration::from_secs(1),
//!   shutdown_timeout: Duration::from_secs(30),
//!   worker_event_sender:None,
//! };
//!
//! let mut processor = Processor::new(params);
//!
//! // 启动处理器
//! // Start processor
//! # struct MyHandler;
//! # #[async_trait::async_trait]
//! # impl Handler for MyHandler {
//! #   async fn process_task(&self, _task: asynq::task::Task) -> asynq::error::Result<()> { Ok(()) }
//! # }
//! processor.start(Arc::new(MyHandler));
//!
//! // 停止处理器
//! // Stop processor
//! processor.shutdown().await;
//! # Ok(())
//! # }
//! ```

use crate::base::Broker;
use crate::error::Error;
use crate::inspector::InspectorTrait;
use crate::server::Handler;
use crate::task::Task;
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio::sync::{mpsc, Semaphore};
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;

/// 处理器参数
/// Processor parameters
pub struct ProcessorParams {
  pub broker: Arc<dyn Broker>,
  pub inspector: Arc<dyn InspectorTrait>,
  pub queues: HashMap<String, i32>,
  pub concurrency: usize,
  pub strict_priority: bool,
  pub task_check_interval: Duration,
  pub shutdown_timeout: Duration,
  /// Worker 事件发送器(发送给 Heartbeat,支持多生产者)
  /// Worker event sender (send to Heartbeat, supports multi-producer)
  pub worker_event_sender: Option<crate::components::heartbeat::WorkerEventSender>,
}

/// 任务取消追踪结构
/// Task cancellation tracking structure
///
/// 对应 Go asynq 的 internal/base/base.go 中的 Cancellations
/// Corresponds to Cancellations in Go asynq's internal/base/base.go
#[derive(Clone)]
pub struct CancellationMap {
  // 正在运行的任务及其取消令牌
  // Running tasks with their cancellation tokens
  tasks: Arc<Mutex<HashMap<String, CancellationToken>>>,
}

impl CancellationMap {
  /// 创建新的 Cancellations 实例
  /// Create a new Cancellations instance
  pub fn new() -> Self {
    Self {
      tasks: Arc::new(Mutex::new(HashMap::new())),
    }
  }

  /// 添加任务取消令牌
  /// Add task cancellation token
  pub fn add(&self, task_id: String, token: CancellationToken) {
    if let Ok(mut tasks) = self.tasks.lock() {
      tasks.insert(task_id, token);
    };
  }

  /// 移除任务取消令牌
  /// Remove task cancellation token
  pub fn remove(&self, task_id: &str) {
    if let Ok(mut tasks) = self.tasks.lock() {
      tasks.remove(task_id);
    }
  }

  /// 取消指定的任务
  /// Cancel specified task
  ///
  /// 对应 Go 的 cancelations.Cancel(taskID)
  /// Corresponds to Go's cancelations.Cancel(taskID)
  pub fn cancel(&self, task_id: &str) -> bool {
    tracing::info!("canceling task {}", task_id);
    if let Ok(tasks) = self.tasks.lock() {
      if let Some(token) = tasks.get(task_id) {
        token.cancel();
        true
      } else {
        false
      }
    } else {
      false
    }
  }
  /// 获取正在运行的任务数量
  /// Get the number of running tasks
  pub fn len(&self) -> usize {
    if let Ok(tasks) = self.tasks.lock() {
      tasks.len()
    } else {
      0
    }
  }

  /// 检查是否为空
  /// Check if empty
  pub fn is_empty(&self) -> bool {
    self.len() == 0
  }
}

impl Default for CancellationMap {
  fn default() -> Self {
    Self::new()
  }
}

/// 处理器 - 负责从队列中取出任务并处理
/// Processor - responsible for dequeuing and processing tasks
pub struct Processor {
  broker: Arc<dyn Broker>,
  inspector: Arc<dyn InspectorTrait>,
  queue_config: HashMap<String, i32>,
  ordered_queues: Option<Vec<String>>,
  task_check_interval: Duration,
  shutdown_timeout: Duration,

  // 信号量用于限制并发工作者数量
  // Semaphore to limit number of concurrent workers
  sema: Arc<Semaphore>,

  // 运行状态标志
  // Running state flag
  running: Arc<AtomicBool>,

  // 退出信号通道
  // Quit signal channel
  quit_tx: Option<mpsc::Sender<()>>,
  quit_rx: Option<mpsc::Receiver<()>>,

  // 中止信号通道 - 用于强制停止所有工作者
  // Abort signal channel - used to forcefully stop all workers
  abort_tx: Option<mpsc::Sender<()>>,

  // 处理器主循环句柄
  // Main processor loop handle
  handle: Option<JoinHandle<()>>,

  // 活跃工作者计数
  // Active worker count
  active_workers: Arc<AtomicUsize>,

  // 任务取消追踪
  // Task cancellation tracking
  /// 对应 Go asynq 的 cancelations *base.Cancellations
  /// Corresponds to Go asynq's cancelations *base.Cancellations
  cancellations: CancellationMap,

  // Worker 事件发送器(发送给 Heartbeat,支持多生产者)
  // Worker event sender (send to Heartbeat, supports multi-producer)
  worker_event_sender: Option<crate::components::heartbeat::WorkerEventSender>,
}

impl Processor {
  /// 创建新的处理器
  /// Create a new processor
  pub fn new(params: ProcessorParams) -> Self {
    let queues = normalize_queues(params.queues);
    let ordered_queues = if params.strict_priority {
      Some(sort_by_priority(&queues))
    } else {
      None
    };

    let (quit_tx, quit_rx) = mpsc::channel(1);
    let (abort_tx, _abort_rx) = mpsc::channel(1);

    Self {
      broker: params.broker,
      inspector: params.inspector,
      queue_config: queues,
      ordered_queues,
      task_check_interval: params.task_check_interval,
      shutdown_timeout: params.shutdown_timeout,
      sema: Arc::new(Semaphore::new(params.concurrency)),
      running: Arc::new(AtomicBool::new(false)),
      quit_tx: Some(quit_tx),
      quit_rx: Some(quit_rx),
      abort_tx: Some(abort_tx),
      handle: None,
      active_workers: Arc::new(AtomicUsize::new(0)),
      cancellations: CancellationMap::new(),
      worker_event_sender: params.worker_event_sender,
    }
  }

  /// 获取任务取消追踪器的克隆
  /// Get a clone of the task cancellation tracker
  ///
  /// 用于在服务器中接收取消事件后调用 cancel 方法
  /// Used to call the cancel method after receiving cancellation events in the server
  pub fn cancellations(&self) -> CancellationMap {
    self.cancellations.clone()
  }

  /// 启动处理器
  /// Start the processor
  pub fn start<H>(&mut self, handler: Arc<H>)
  where
    H: Handler + 'static,
  {
    self.running.store(true, Ordering::SeqCst);

    let broker = Arc::clone(&self.broker);
    let inspector = Arc::clone(&self.inspector);
    let running = Arc::clone(&self.running);
    let sema = Arc::clone(&self.sema);
    let queue_config = self.queue_config.clone();
    let ordered_queues = self.ordered_queues.clone();
    let task_check_interval = self.task_check_interval;
    let active_workers = Arc::clone(&self.active_workers);
    let cancelations = self.cancellations.clone();
    let worker_event_sender = self.worker_event_sender.clone();
    if let Some(mut quit_rx) = self.quit_rx.take() {
      let handle = tokio::spawn(async move {
        loop {
          // 检查是否收到退出信号
          // Check if quit signal received
          if quit_rx.try_recv().is_ok() {
            tracing::debug!("Processor received quit signal");
            break;
          }

          if !running.load(Ordering::SeqCst) {
            break;
          }

          // 尝试获取信号量令牌
          // Try to acquire semaphore permit
          let permit = match sema.clone().try_acquire_owned() {
            Ok(permit) => permit,
            Err(_) => {
              // 没有可用的工作者槽位,短暂等待
              // No available worker slots, wait briefly
              tokio::time::sleep(Duration::from_millis(100)).await;
              continue;
            }
          };

          // 获取队列列表
          // Get queue list
          let queues = get_queues(&queue_config, ordered_queues.as_ref());

          // 从队列中取出任务
          // Dequeue a task from the queue
          match broker.dequeue(&queues).await {
            Ok(Some(task_msg)) => {
              // 增加活跃工作者计数
              // Increment active worker count
              active_workers.fetch_add(1, Ordering::Relaxed);

              let handler = Arc::clone(&handler);
              let broker = Arc::clone(&broker);
              let inspector = Arc::clone(&inspector);
              let active_workers = Arc::clone(&active_workers);
              let cancelations = cancelations.clone();
              let worker_event_sender = worker_event_sender.clone();

              // 在新的任务中处理
              // Process in a new task
              tokio::spawn(async move {
                let _permit = permit; // 持有许可直到任务完成

                let task_id = task_msg.id.clone();

                // 发送 worker 开始事件到 Heartbeat(对应 Go 的 h.starting <- w)
                // Send worker starting event to Heartbeat (corresponds to Go's h.starting <- w)
                if let Some(ref sender) = worker_event_sender {
                  let worker_info = crate::components::heartbeat::WorkerInfoEntry {
                    msg: task_msg.clone(),
                    started: std::time::SystemTime::now(),
                    deadline: if task_msg.deadline > 0 {
                      std::time::UNIX_EPOCH
                        + std::time::Duration::from_secs(task_msg.deadline as u64)
                    } else {
                      std::time::SystemTime::now() + std::time::Duration::from_secs(3600)
                    },
                  };
                  if let Err(e) = sender.send_started(worker_info).await {
                    tracing::warn!("Failed to send worker starting event: {}", e);
                  }
                }

                // 创建任务,包含头部信息
                // Create task with headers to preserve workflow path and other metadata
                let mut task = match Task::new_with_headers(
                  &task_msg.r#type,
                  &task_msg.payload,
                  task_msg.headers.clone(),
                ) {
                  Ok(task) => task,
                  Err(e) => {
                    tracing::error!("Failed to create task: {}", e);
                    // 发送 worker 完成事件(即使失败也需要发送)
                    // Send worker finished event (needs to send even on failure)
                    if let Some(ref sender) = worker_event_sender {
                      if let Err(e) = sender.send_finished(task_id.clone()).await {
                        tracing::warn!("Failed to send worker finished event: {}", e);
                      }
                    }
                    active_workers.fetch_sub(1, Ordering::Relaxed);
                    return;
                  }
                };

                // 创建并附加 ResultWriter
                // Create and attach ResultWriter
                // Note: ResultWriter is created for every task to match Go asynq behavior.
                // The broker's write_result will only persist data when retention is configured.
                let result_writer = Arc::new(crate::task::ResultWriter::new(
                  task_msg.id.clone(),
                  task_msg.queue.clone(),
                  broker.clone(),
                ));
                task = task.with_result_writer(result_writer);
                task = task.with_inspector(inspector);
                // 创建取消令牌
                // Create cancellation token
                let cancel_token = CancellationToken::new();

                // 注册取消令牌
                // Register cancellation token
                cancelations.add(task_id.clone(), cancel_token.clone());

                // 计算任务超时
                // Calculate task timeout
                let timeout_duration = calculate_task_timeout(&task_msg);

                // 执行任务,支持超时和取消
                // Execute task with timeout and cancellation support
                let result = if let Some(timeout) = timeout_duration {
                  tokio::select! {
                    // 任务执行
                    // Task execution
                    result = handler.process_task(task.clone()) => result,
                    // 超时
                    // Timeout
                    _ = tokio::time::sleep(timeout) => {
                      tracing::warn!("Task {} timed out after {:?}", task_id, timeout);
                      Err(Error::other("Task execution timeout"))
                    }
                    // 取消信号
                    // Cancellation signal
                    _ = cancel_token.cancelled() => {
                      tracing::info!("Task {} was cancelled", task_id);
                      Ok(())
                    }
                  }
                } else {
                  tokio::select! {
                    // 任务执行
                    // Task execution
                    result = handler.process_task(task.clone()) => result,
                    // 取消信号
                    // Cancellation signal
                    _ = cancel_token.cancelled() => {
                      tracing::info!("Task {} was cancelled", task_id);
                      Err(Error::other("Task cancelled"))
                    }
                  }
                };

                // 移除取消令牌
                // Remove cancellation token
                cancelations.remove(&task_id);

                // 处理结果
                // Handle result
                match result {
                  Ok(()) => {
                    // 任务成功
                    // Task succeeded
                    if task_msg.retention == 0 {
                      if let Err(e) = broker.done(&task_msg).await {
                        tracing::error!("Failed to mark task as done: {}", e);
                      }
                    } else if let Err(e) = broker.mark_as_complete(&task_msg).await {
                      tracing::error!("Failed to mark task as complete: {}", e);
                    }
                  }
                  Err(e) => {
                    // 任务失败,决定重试还是归档
                    // Task failed, decide retry or archive
                    if should_retry_task(&task_msg, &e) {
                      // 计算重试延迟
                      // Calculate retry delay
                      let retry_delay =
                        calculate_retry_delay(task_msg.retried, task.options.retry_policy.as_ref());
                      let retry_at = chrono::Utc::now()
                        + chrono::Duration::seconds(retry_delay.as_secs() as i64);

                      if let Err(e) = broker.requeue(&task_msg, retry_at, &e.to_string()).await {
                        tracing::error!("Failed to requeue task: {}", e);
                      }
                    } else {
                      // 归档任务
                      // Archive task
                      if let Err(e) = broker.archive(&task_msg, &e.to_string()).await {
                        tracing::error!("Failed to archive task: {}", e);
                      }
                    }
                  }
                }

                // 发送 worker 完成事件到 Heartbeat(对应 Go 的 h.finished <- msg)
                // Send worker finished event to Heartbeat (corresponds to Go's h.finished <- msg)
                if let Some(ref sender) = worker_event_sender {
                  if let Err(e) = sender.send_finished(task_id.clone()).await {
                    tracing::warn!("Failed to send worker finished event: {}", e);
                  }
                }

                // 减少活跃工作者计数
                // Decrement active worker count
                active_workers.fetch_sub(1, Ordering::Relaxed);
              });
            }
            Ok(None) => {
              // 没有任务,等待后重试
              // No tasks, wait and retry
              drop(permit); // 释放许可
              tokio::time::sleep(task_check_interval).await;
            }
            Err(e) => {
              // 出队错误
              // Dequeue error
              tracing::error!("Dequeue error: {}", e);
              drop(permit); // 释放许可
              tokio::time::sleep(Duration::from_secs(1)).await;
            }
          }
        }

        tracing::debug!("Processor loop exited");
      });
      self.handle = Some(handle);
    } else {
      self.handle = None;
    };
  }

  /// 停止处理器(不等待工作者完成)
  /// Stop the processor (without waiting for workers)
  pub fn stop(&mut self) {
    self.running.store(false, Ordering::SeqCst);
    if let Some(tx) = self.quit_tx.take() {
      let _ = tx.try_send(());
    }
  }

  /// 关闭处理器并等待所有工作者完成
  /// Shutdown the processor and wait for all workers to finish
  pub async fn shutdown(&mut self) {
    self.stop();

    // 启动超时定时器,之后发送中止信号
    // Start timeout timer, then send abort signal
    let abort_tx = self.abort_tx.clone();
    let shutdown_timeout = self.shutdown_timeout;
    tokio::spawn(async move {
      tokio::time::sleep(shutdown_timeout).await;
      if let Some(tx) = abort_tx {
        let _ = tx.send(()).await;
      }
    });

    // 等待处理器主循环退出
    // Wait for processor main loop to exit
    if let Some(handle) = self.handle.take() {
      let _ = handle.await;
    }

    tracing::info!("Waiting for all workers to finish...");

    // 等待所有信号量令牌被释放(即所有工作者完成)
    // Wait for all semaphore permits to be released (i.e., all workers finished)
    let sema = Arc::clone(&self.sema);
    let concurrency = sema.available_permits();
    for _ in 0..concurrency {
      let _ = sema.acquire().await;
    }

    tracing::info!("All workers have finished");
  }
}
/// 标准化队列配置,确保优先级为正数
/// Normalize queue config, ensure priorities are positive
fn normalize_queues(queues: HashMap<String, i32>) -> HashMap<String, i32> {
  queues
    .into_iter()
    .map(|(name, priority)| (name, priority.max(1)))
    .collect()
}

/// 按优先级排序队列(降序)
/// Sort queues by priority (descending)
fn sort_by_priority(queues: &HashMap<String, i32>) -> Vec<String> {
  let mut queue_vec: Vec<_> = queues.iter().collect();
  queue_vec.sort_by(|a, b| b.1.cmp(a.1)); // 降序排序
  queue_vec
    .into_iter()
    .map(|(name, _)| name.clone())
    .collect()
}

/// 获取队列列表,基于优先级
/// Get queue list based on priority
fn get_queues(
  queue_config: &HashMap<String, i32>,
  ordered_queues: Option<&Vec<String>>,
) -> Vec<String> {
  // 如果只有一个队列,直接返回
  // If only one queue, return directly
  if queue_config.len() == 1 {
    return queue_config.keys().cloned().collect();
  }

  // 如果有排序的队列(严格优先级模式),返回排序后的队列
  // If ordered queues exist (strict priority mode), return ordered queues
  if let Some(ordered) = ordered_queues {
    return ordered.clone();
  }

  // 否则,基于优先级加权随机选择
  // Otherwise, weighted random selection based on priority
  let mut names = Vec::new();
  for (name, &priority) in queue_config {
    for _ in 0..priority {
      names.push(name.clone());
    }
  }

  // 随机打乱
  // Shuffle randomly
  use rand::seq::SliceRandom;
  let mut rng = rand::rng();
  names.shuffle(&mut rng);

  // 去重并返回
  // Deduplicate and return
  let mut seen = std::collections::HashSet::new();
  let mut result = Vec::new();
  for name in names {
    if seen.insert(name.clone()) {
      result.push(name);
    }
    if result.len() == queue_config.len() {
      break;
    }
  }
  result
}

/// 计算任务超时时间
/// Calculate task timeout
fn calculate_task_timeout(task_msg: &crate::proto::TaskMessage) -> Option<Duration> {
  use crate::base::constants::DEFAULT_TIMEOUT;

  // 优先使用任务级别的超时设置
  // Prefer task-level timeout settings
  if task_msg.timeout > 0 {
    return Some(Duration::from_secs(task_msg.timeout as u64));
  }

  // 如果任务有截止时间,计算剩余时间作为超时
  // If the task has a deadline, calculate remaining time as timeout
  if task_msg.deadline > 0 {
    let now = chrono::Utc::now().timestamp();
    let remaining = task_msg.deadline - now;
    if remaining > 0 {
      return Some(Duration::from_secs(remaining as u64));
    }
  }

  // 使用默认超时
  // Use default timeout
  Some(DEFAULT_TIMEOUT)
}

/// 计算重试延迟
/// Calculate retry delay
fn calculate_retry_delay(
  retried: i32,
  retry_policy: Option<&crate::backend::option::RetryPolicy>,
) -> Duration {
  match retry_policy {
    Some(policy) => policy.calculate_delay(retried),
    None => {
      // 默认指数退避策略 - 与 Go 版本兼容
      // Default exponential backoff strategy - compatible with Go version
      let base_delay = (retried as f64).powf(4.0) as u64 + 15;
      let jitter = rand::random::<u64>() % (30 * (retried as u64 + 1));
      Duration::from_secs(base_delay + jitter)
    }
  }
}

/// 判断任务失败后是否应重试
/// Determine whether a failed task should be retried
fn should_retry_task(task_msg: &crate::proto::TaskMessage, err: &Error) -> bool {
  task_msg.retried < task_msg.retry && !matches!(err, Error::SkipRetry(_))
}

#[cfg(test)]
mod tests {
  use super::*;
  use crate::error::SkipRetryError;

  #[test]
  fn test_normalize_queues() {
    let mut queues = HashMap::new();
    queues.insert("default".to_string(), 3);
    queues.insert("low".to_string(), 0);
    queues.insert("high".to_string(), -5);

    let normalized = normalize_queues(queues);
    assert_eq!(normalized.get("default"), Some(&3));
    assert_eq!(normalized.get("low"), Some(&1));
    assert_eq!(normalized.get("high"), Some(&1));
  }

  #[test]
  fn test_sort_by_priority() {
    let mut queues = HashMap::new();
    queues.insert("default".to_string(), 3);
    queues.insert("low".to_string(), 1);
    queues.insert("high".to_string(), 6);

    let sorted = sort_by_priority(&queues);
    assert_eq!(sorted[0], "high");
    assert_eq!(sorted[1], "default");
    assert_eq!(sorted[2], "low");
  }

  #[test]
  fn test_get_queues_single() {
    let mut queues = HashMap::new();
    queues.insert("default".to_string(), 3);

    let result = get_queues(&queues, None);
    assert_eq!(result.len(), 1);
    assert_eq!(result[0], "default");
  }

  #[test]
  fn test_get_queues_strict_priority() {
    let mut queues = HashMap::new();
    queues.insert("default".to_string(), 3);
    queues.insert("low".to_string(), 1);
    queues.insert("high".to_string(), 6);

    let ordered = vec!["high".to_string(), "default".to_string(), "low".to_string()];
    let result = get_queues(&queues, Some(&ordered));
    assert_eq!(result, ordered);
  }

  #[test]
  fn test_should_retry_task_normal_error() {
    let task_msg = crate::proto::TaskMessage {
      retried: 0,
      retry: 5,
      ..Default::default()
    };

    assert!(should_retry_task(&task_msg, &Error::other("failed")));
  }

  #[test]
  fn test_should_retry_task_skip_retry_error() {
    let task_msg = crate::proto::TaskMessage {
      retried: 0,
      retry: 5,
      ..Default::default()
    };
    let skip_retry = Error::from(SkipRetryError::new(std::io::Error::other(
      "invalid payload",
    )));

    assert!(!should_retry_task(&task_msg, &skip_retry));
  }

  #[test]
  fn test_should_retry_task_reached_limit() {
    let task_msg = crate::proto::TaskMessage {
      retried: 3,
      retry: 3,
      ..Default::default()
    };

    assert!(!should_retry_task(&task_msg, &Error::other("failed")));
  }

  #[test]
  fn test_cancelations_new() {
    let cancelations = CancellationMap::new();
    assert!(cancelations.is_empty());
    assert_eq!(cancelations.len(), 0);
  }

  #[test]
  fn test_cancelations_add_remove() {
    let cancelations = CancellationMap::new();
    let token = CancellationToken::new();

    cancelations.add("task1".to_string(), token.clone());
    assert_eq!(cancelations.len(), 1);
    assert!(!cancelations.is_empty());

    cancelations.remove("task1");
    assert_eq!(cancelations.len(), 0);
    assert!(cancelations.is_empty());
  }

  #[tokio::test]
  async fn test_cancelations_cancel() {
    let cancellations = CancellationMap::new();
    let token = CancellationToken::new();
    let task_id = "task1".to_string();

    // Add token
    cancellations.add(task_id.clone(), token.clone());

    // Verify token is not cancelled
    assert!(!token.is_cancelled());

    // Cancel the task
    let cancelled = cancellations.cancel(&task_id);
    assert!(cancelled);

    // Verify token is now cancelled
    assert!(token.is_cancelled());
  }

  #[tokio::test]
  async fn test_cancelations_cancel_nonexistent() {
    let cancellations = CancellationMap::new();

    // Try to cancel a non-existent task
    let cancelled = cancellations.cancel("nonexistent");
    assert!(!cancelled);
  }

  #[tokio::test]
  async fn test_cancelations_multiple_tasks() {
    let cancellations = CancellationMap::new();

    let token1 = CancellationToken::new();
    let token2 = CancellationToken::new();
    let token3 = CancellationToken::new();

    cancellations.add("task1".to_string(), token1.clone());
    cancellations.add("task2".to_string(), token2.clone());
    cancellations.add("task3".to_string(), token3.clone());

    assert_eq!(cancellations.len(), 3);

    // Cancel task2
    cancellations.cancel("task2");
    assert!(!token1.is_cancelled());
    assert!(token2.is_cancelled());
    assert!(!token3.is_cancelled());

    // Remove task1
    cancellations.remove("task1");
    assert_eq!(cancellations.len(), 2);

    // Cancel task3
    cancellations.cancel("task3");
    assert!(token3.is_cancelled());
  }
}