asynq 0.1.8

Simple, reliable & efficient distributed task queue in Rust, inspired by hibiken/asynq
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
//! Redis 键名常量 - 与 Go 版本保持兼容
//! Redis key name constants - Compatible with Go version

use crate::base::constants::TIME_LAYOUT_YMD;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::str::FromStr;
pub const QUEUE_START: &str = "{";
pub const QUEUE_END: &str = "}";
/// 全局 Redis 键
/// Global Redis keys
pub const ALL_SERVERS: &str = "asynq:servers";
pub const ALL_WORKERS: &str = "asynq:workers";
pub const ALL_SCHEDULERS: &str = "asynq:schedulers";
pub const ALL_QUEUES: &str = "asynq:queues";
pub const CANCEL_CHANNEL: &str = "asynq:cancel";
pub const SCHEDULER_EVENTS: &str = "asynq:scheduler:events";

// 为了向后兼容,保留旧常量
// For backward compatibility, keep old constants
pub const QUEUE_PREFIX: &str = "asynq:";
pub const SCHEDULED_PREFIX: &str = "asynq:scheduled:";
pub const AGGREGATING_PREFIX: &str = "asynq:aggregating:";
pub const SERVERS_PREFIX: &str = "asynq:servers:";
pub const WORKERS_PREFIX: &str = "asynq:workers:";
pub const TASK_RESULT_PREFIX: &str = "asynq:result:";
/// 任务状态
/// Task state
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum TaskState {
  /// 任务正在被处理
  /// Task is being processed
  Active,
  /// 任务准备好被处理
  /// Task is ready to be processed
  Pending,
  /// 任务被安排在将来某个时间处理
  /// Task is scheduled to be processed at a later time
  Scheduled,
  /// 任务之前失败了,安排在将来某个时间重试
  /// Task has failed before, scheduled to retry at a later time
  Retry,
  /// 任务被归档并存储以供检查
  /// Task is archived and stored for inspection
  Archived,
  /// 任务处理成功并保留到保留 TTL 过期
  /// Task is successfully processed and retained until retention TTL expires
  Completed,
  /// 任务在组中等待聚合
  /// Task is waiting for aggregation in the group
  Aggregating,
}

impl TaskState {
  /// 将任务状态转换为字符串
  /// Convert task state to string
  pub fn as_str(&self) -> &'static str {
    match self {
      Self::Active => "active",
      Self::Pending => "pending",
      Self::Scheduled => "scheduled",
      Self::Retry => "retry",
      Self::Archived => "archived",
      Self::Completed => "completed",
      Self::Aggregating => "aggregating",
    }
  }
  pub fn queue_key(&self, qname: &str, gname: Option<&str>) -> String {
    match self {
      Self::Active => active_key(qname),
      Self::Pending => pending_key(qname),
      Self::Scheduled => scheduled_key(qname),
      Self::Retry => retry_key(qname),
      Self::Archived => archived_key(qname),
      Self::Completed => completed_key(qname),
      Self::Aggregating => aggregating_key(qname, gname.unwrap_or("")), // 需要提供组名
    }
  }
}

impl FromStr for TaskState {
  type Err = ();

  fn from_str(s: &str) -> Result<Self, Self::Err> {
    match s {
      "active" => Ok(Self::Active),
      "pending" => Ok(Self::Pending),
      "scheduled" => Ok(Self::Scheduled),
      "retry" => Ok(Self::Retry),
      "archived" => Ok(Self::Archived),
      "completed" => Ok(Self::Completed),
      "aggregating" => Ok(Self::Aggregating),
      _ => Err(()),
    }
  }
}

impl std::fmt::Display for TaskState {
  fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
    write!(f, "{}", self.as_str())
  }
}

/// 生成队列键前缀 - 与 Go 版本兼容: asynq:{qname}:
/// Generate queue key prefix - Compatible with Go version: asynq:{qname}:
pub fn queue_key_prefix(qname: &str) -> String {
  format!("asynq:{{{qname}}}:")
}

/// 生成任务键前缀
/// Generate task key prefix
pub fn task_key_prefix(qname: &str) -> String {
  format!("{}t:", queue_key_prefix(qname))
}

/// 生成任务键
/// Generate task key
pub fn task_key(qname: &str, id: &str) -> String {
  format!("{}{}", task_key_prefix(qname), id)
}

/// 生成队列键 - 对应 Go 的 PendingKey
/// Generate queue key - Corresponds to Go's PendingKey
pub fn pending_key(qname: &str) -> String {
  format!("{}{}", queue_key_prefix(qname), TaskState::Pending)
}

/// 生成活跃任务键 - 对应 Go 的 ActiveKey  
/// Generate active task key - Corresponds to Go's ActiveKey
pub fn active_key(qname: &str) -> String {
  format!("{}{}", queue_key_prefix(qname), TaskState::Active)
}

/// 生成调度任务键 - 对应 Go 的 ScheduledKey
/// Generate scheduled task key - Corresponds to Go's ScheduledKey
pub fn scheduled_key(qname: &str) -> String {
  format!("{}{}", queue_key_prefix(qname), TaskState::Scheduled)
}

/// 生成重试任务键 - 对应 Go 的 RetryKey
/// Generate retry task key - Corresponds to Go's RetryKey
pub fn retry_key(qname: &str) -> String {
  format!("{}{}", queue_key_prefix(qname), TaskState::Retry)
}

/// 生成已归档任务键 - 对应 Go 的 ArchivedKey
/// Generate archived task key - Corresponds to Go's ArchivedKey
pub fn archived_key(qname: &str) -> String {
  format!("{}{}", queue_key_prefix(qname), TaskState::Archived)
}

/// 生成已完成任务键 - 对应 Go 的 CompletedKey
/// Generate completed task key - Corresponds to Go's CompletedKey
pub fn completed_key(qname: &str) -> String {
  format!("{}{}", queue_key_prefix(qname), TaskState::Completed)
}

/// 生成暂停键 - 对应 Go 的 PausedKey
/// Generate paused key - Corresponds to Go's PausedKey
pub fn paused_key(qname: &str) -> String {
  format!("{}paused", queue_key_prefix(qname))
}

/// 生成租约键 - 对应 Go 的 LeaseKey
/// Generate lease key - Corresponds to Go's LeaseKey
pub fn lease_key(qname: &str) -> String {
  format!("{}lease", queue_key_prefix(qname))
}

/// 生成聚合任务键(向后兼容) - 映射到组键
/// Generate aggregating task key (backward compatible) - Mapped to group key
pub fn aggregating_key(queue: &str, group: &str) -> String {
  group_key(queue, group)
}

/// 生成唯一键 - 对应 Go 的 UniqueKey,使用 MD5 校验和
/// Generate unique key - Corresponds to Go's UniqueKey, using MD5 checksum
pub fn unique_key(qname: &str, task_type: &str, payload: &[u8]) -> String {
  if payload.is_empty() {
    return format!("{}unique:{}:", queue_key_prefix(qname), task_type);
  }

  // 使用 MD5 哈希与 Go 版本保持兼容
  // Use MD5 hash to保持兼容 with Go version
  let digest = md5::compute(payload);
  let checksum = format!("{digest:x}");

  format!(
    "{}unique:{}:{}",
    queue_key_prefix(qname),
    task_type,
    checksum
  )
}

/// 生成组键前缀
/// Generate group key prefix
pub fn group_key_prefix(qname: &str) -> String {
  format!("{}g:", queue_key_prefix(qname))
}

/// 生成组键 - 对应 Go 的 GroupKey
/// Generate group key - Corresponds to Go's GroupKey
pub fn group_key(qname: &str, group_key: &str) -> String {
  format!("{}{}", group_key_prefix(qname), group_key)
}

/// 生成聚合集合键 - 对应 Go 的 AggregationSetKey
/// Generate aggregation set key - Corresponds to Go's AggregationSetKey
pub fn aggregation_set_key(qname: &str, group_name: &str, set_id: &str) -> String {
  format!("{}:{}", group_key(qname, group_name), set_id)
}

/// 生成所有组键 - 对应 Go 的 AllGroups
/// Generate all group keys - Corresponds to Go's AllGroups
pub fn all_groups(qname: &str) -> String {
  format!("{}groups", queue_key_prefix(qname))
}

/// 生成组键(别名 all_groups)
/// Generate groups key (alias for all_groups)
pub fn groups_key(qname: &str) -> String {
  all_groups(qname)
}

/// 生成所有聚合集合键 - 对应 Go 的 AllAggregationSets
/// Generate all aggregation set keys - Corresponds to Go's AllAggregationSets
pub fn all_aggregation_sets(qname: &str) -> String {
  format!("{}aggregation_sets", queue_key_prefix(qname))
}

/// 生成服务器信息键 - 对应 Go 的 ServerInfoKey
/// Generate server info key - Corresponds to Go's ServerInfoKey
pub fn server_info_key(hostname: &str, pid: i32, server_id: &str) -> String {
  format!("{SERVERS_PREFIX}{{{hostname}:{pid}:{server_id}}}")
}

/// 生成带租户隔离的服务器信息键
/// Generate server info key with tenant isolation
///
/// 返回格式: `asynq:servers:{tenant:hostname:pid:server_id}`
/// Returns format: `asynq:servers:{tenant:hostname:pid:server_id}`
pub fn server_info_key_with_tenant(
  tenant: &str,
  hostname: &str,
  pid: i32,
  server_id: &str,
) -> String {
  format!("{SERVERS_PREFIX}{{{tenant}:{hostname}:{pid}:{server_id}}}")
}

/// 生成工作者键 - 对应 Go 的 WorkersKey
/// Generate workers key - Corresponds to Go's WorkersKey
pub fn workers_key(hostname: &str, pid: i32, server_id: &str) -> String {
  format!("{WORKERS_PREFIX}{{{hostname}:{pid}:{server_id}}}")
}

/// 生成带租户隔离的工作者键
/// Generate workers key with tenant isolation
///
/// 返回格式: `asynq:workers:{tenant:hostname:pid:server_id}`
/// Returns format: `asynq:workers:{tenant:hostname:pid:server_id}`
pub fn workers_key_with_tenant(tenant: &str, hostname: &str, pid: i32, server_id: &str) -> String {
  format!("{WORKERS_PREFIX}{{{tenant}:{hostname}:{pid}:{server_id}}}")
}

/// 生成服务器和工作者键对
/// Generate server and workers key pair
///
/// 当提供租户时,返回带租户隔离的键格式
/// When tenant is provided, returns tenant-isolated key format
///
/// 返回: (server_key, workers_key)
/// Returns: (server_key, workers_key)
pub fn server_and_workers_keys(
  tenant: Option<&str>,
  hostname: &str,
  pid: i32,
  server_id: &str,
) -> (String, String) {
  if let Some(t) = tenant {
    (
      server_info_key_with_tenant(t, hostname, pid, server_id),
      workers_key_with_tenant(t, hostname, pid, server_id),
    )
  } else {
    (
      server_info_key(hostname, pid, server_id),
      workers_key(hostname, pid, server_id),
    )
  }
}

/// 生成调度器条目键 - 对应 Go 的 SchedulerEntriesKey
/// Generate scheduler entries key - Corresponds to Go's SchedulerEntriesKey
pub fn scheduler_entries_key(scheduler_id: &str) -> String {
  format!("{ALL_SCHEDULERS}:{{{scheduler_id}}}")
}

/// 生成带租户隔离的调度器条目键
/// Generate scheduler entries key with tenant isolation
///
/// 返回格式: `asynq:schedulers:{tenant:scheduler_id}`
/// Returns format: `asynq:schedulers:{tenant:scheduler_id}`
pub fn scheduler_entries_key_with_tenant(tenant: &str, scheduler_id: &str) -> String {
  format!("{ALL_SCHEDULERS}:{{{tenant}:{scheduler_id}}}")
}

/// 生成调度器历史键 - 对应 Go 的 SchedulerHistoryKey
/// Generate scheduler history key - Corresponds to Go's SchedulerHistoryKey
pub fn scheduler_history_key(entry_id: &str) -> String {
  format!("asynq:scheduler_history:{entry_id}")
}

/// 生成处理总数键 - 对应 Go 的 ProcessedTotalKey
/// Generate processed total key - Corresponds to Go's ProcessedTotalKey
pub fn processed_total_key(qname: &str) -> String {
  format!("{}processed", queue_key_prefix(qname))
}

/// 生成失败总数键 - 对应 Go 的 FailedTotalKey  
/// Generate failed total key - Corresponds to Go's FailedTotalKey
pub fn failed_total_key(qname: &str) -> String {
  format!("{}failed", queue_key_prefix(qname))
}

/// 生成按日处理数键 - 对应 Go 的 ProcessedKey
/// Generate daily processed key - Corresponds to Go's ProcessedKey
pub fn processed_key(qname: &str, date: &DateTime<Utc>) -> String {
  format!(
    "{}processed:{}",
    queue_key_prefix(qname),
    date.format(TIME_LAYOUT_YMD)
  )
}

/// 生成按日失败数键 - 对应 Go 的 FailedKey
/// Generate daily failed key - Corresponds to Go's FailedKey
pub fn failed_key(qname: &str, date: &DateTime<Utc>) -> String {
  format!(
    "{}failed:{}",
    queue_key_prefix(qname),
    date.format(TIME_LAYOUT_YMD)
  )
}

// 保留这些函数以便向后兼容
// Keep these functions for backward compatibility
pub fn server_info_key_legacy(server_id: &str) -> String {
  format!("{SERVERS_PREFIX}{server_id}")
}

/// 完整的服务器信息键生成函数 - 对应 Go 的 ServerInfoKey
/// Full server info key generation function - Corresponds to Go's ServerInfoKey
pub fn server_info_key_full(hostname: &str, pid: i32, server_id: &str) -> String {
  format!("{SERVERS_PREFIX}{{{hostname}:{pid}:{server_id}}}")
}

/// 完整的带租户隔离的服务器信息键生成函数
/// Full server info key generation function with tenant isolation
///
/// 返回格式: `asynq:servers:{tenant:hostname:pid:server_id}`
/// Returns format: `asynq:servers:{tenant:hostname:pid:server_id}`
///
/// 注意:这是 `server_info_key_with_tenant` 的别名,保留以便保持命名一致性
/// Note: This is an alias for `server_info_key_with_tenant`, kept for naming consistency
#[inline]
pub fn server_info_key_full_with_tenant(
  tenant: &str,
  hostname: &str,
  pid: i32,
  server_id: &str,
) -> String {
  server_info_key_with_tenant(tenant, hostname, pid, server_id)
}

/// 完整的工作者键生成函数 - 对应 Go 的 WorkersKey
/// Full workers key generation function - Corresponds to Go's WorkersKey
pub fn workers_key_full(hostname: &str, pid: i32, server_id: &str) -> String {
  format!("{WORKERS_PREFIX}{{{hostname}:{pid}:{server_id}}}")
}

/// 完整的带租户隔离的工作者键生成函数
/// Full workers key generation function with tenant isolation
///
/// 返回格式: `asynq:workers:{tenant:hostname:pid:server_id}`
/// Returns format: `asynq:workers:{tenant:hostname:pid:server_id}`
///
/// 注意:这是 `workers_key_with_tenant` 的别名,保留以便保持命名一致性
/// Note: This is an alias for `workers_key_with_tenant`, kept for naming consistency
#[inline]
pub fn workers_key_full_with_tenant(
  tenant: &str,
  hostname: &str,
  pid: i32,
  server_id: &str,
) -> String {
  workers_key_with_tenant(tenant, hostname, pid, server_id)
}

#[cfg(test)]
mod tests {
  use crate::base::keys;

  #[test]
  fn test_keys_generation() {
    // 测试与 Go 版本兼容的键生成
    // Test key generation compatible with Go version
    assert_eq!(keys::pending_key("default"), "asynq:{default}:pending");
    assert_eq!(keys::pending_key("default"), "asynq:{default}:pending"); // 别名
    assert_eq!(keys::active_key("default"), "asynq:{default}:active");
    assert_eq!(keys::scheduled_key("default"), "asynq:{default}:scheduled");
    assert_eq!(keys::retry_key("default"), "asynq:{default}:retry");
    assert_eq!(keys::archived_key("default"), "asynq:{default}:archived");
    assert_eq!(keys::completed_key("default"), "asynq:{default}:completed");

    // 测试全局键
    // Test global keys
    assert_eq!(keys::ALL_SERVERS, "asynq:servers");
    assert_eq!(keys::ALL_WORKERS, "asynq:workers");
    assert_eq!(keys::ALL_QUEUES, "asynq:queues");

    // 测试服务器和工作者键(新格式)
    // Test server and worker keys (new format)
    assert_eq!(
      keys::server_info_key("localhost", 12345, "server1"),
      "asynq:servers:{localhost:12345:server1}"
    );
    assert_eq!(
      keys::workers_key("localhost", 12345, "server1"),
      "asynq:workers:{localhost:12345:server1}"
    );

    // 测试任务和组相关键
    // Test task and group related keys
    assert_eq!(
      keys::task_key("default", "task1"),
      "asynq:{default}:t:task1"
    );
    assert_eq!(
      keys::group_key("default", "group1"),
      "asynq:{default}:g:group1"
    );

    // 测试唯一键
    // Test unique key
    let unique_key = keys::unique_key("default", "email:send", b"test payload");
    assert!(unique_key.starts_with("asynq:{default}:unique:email:send:"));

    let empty_unique_key = keys::unique_key("default", "email:send", b"");
    assert_eq!(empty_unique_key, "asynq:{default}:unique:email:send:");
    assert_eq!(
      keys::task_key("default", "task1"),
      "asynq:{default}:t:task1"
    );
  }

  #[test]
  fn test_tenant_keys_generation() {
    // 测试带租户隔离的服务器和工作者键
    // Test server and worker keys with tenant isolation

    // 测试服务器信息键(带租户)
    // Test server info key with tenant
    assert_eq!(
      keys::server_info_key_with_tenant(
        "tenant1",
        "Arch",
        6492,
        "10b398de-d250-4bdf-b513-4b5f52247352"
      ),
      "asynq:servers:{tenant1:Arch:6492:10b398de-d250-4bdf-b513-4b5f52247352}"
    );

    // 测试工作者键(带租户)
    // Test workers key with tenant
    assert_eq!(
      keys::workers_key_with_tenant(
        "tenant1",
        "Arch",
        6492,
        "10b398de-d250-4bdf-b513-4b5f52247352"
      ),
      "asynq:workers:{tenant1:Arch:6492:10b398de-d250-4bdf-b513-4b5f52247352}"
    );

    // 测试完整服务器信息键(带租户)
    // Test full server info key with tenant
    assert_eq!(
      keys::server_info_key_full_with_tenant("tenant1", "localhost", 12345, "server1"),
      "asynq:servers:{tenant1:localhost:12345:server1}"
    );

    // 测试完整工作者键(带租户)
    // Test full workers key with tenant
    assert_eq!(
      keys::workers_key_full_with_tenant("tenant1", "localhost", 12345, "server1"),
      "asynq:workers:{tenant1:localhost:12345:server1}"
    );

    // 测试不同租户的键隔离
    // Test key isolation between different tenants
    let tenant1_server_key = keys::server_info_key_with_tenant("tenant1", "host", 1234, "server1");
    let tenant2_server_key = keys::server_info_key_with_tenant("tenant2", "host", 1234, "server1");
    assert_ne!(tenant1_server_key, tenant2_server_key);
    assert!(tenant1_server_key.contains("tenant1"));
    assert!(tenant2_server_key.contains("tenant2"));

    // 测试 server_and_workers_keys 辅助函数
    // Test server_and_workers_keys helper function
    let (server_key, workers_key) =
      keys::server_and_workers_keys(Some("tenant1"), "host", 1234, "server1");
    assert_eq!(server_key, "asynq:servers:{tenant1:host:1234:server1}");
    assert_eq!(workers_key, "asynq:workers:{tenant1:host:1234:server1}");

    // 测试无租户的 server_and_workers_keys
    // Test server_and_workers_keys without tenant
    let (server_key_no_tenant, workers_key_no_tenant) =
      keys::server_and_workers_keys(None, "host", 1234, "server1");
    assert_eq!(server_key_no_tenant, "asynq:servers:{host:1234:server1}");
    assert_eq!(workers_key_no_tenant, "asynq:workers:{host:1234:server1}");
  }

  #[test]
  fn test_scheduler_entries_key() {
    // Test scheduler entries key without tenant
    assert_eq!(
      keys::scheduler_entries_key("arch:51547:b7ae5325-3c4a-491a-bd29-25ce9fcc00e9"),
      "asynq:schedulers:{arch:51547:b7ae5325-3c4a-491a-bd29-25ce9fcc00e9}"
    );

    // Test scheduler entries key with tenant via scheduler_entries_key_with_tenant
    assert_eq!(
      keys::scheduler_entries_key_with_tenant(
        "tenant1",
        "arch:51547:b7ae5325-3c4a-491a-bd29-25ce9fcc00e9"
      ),
      "asynq:schedulers:{tenant1:arch:51547:b7ae5325-3c4a-491a-bd29-25ce9fcc00e9}"
    );

    // Test different tenants produce different keys
    let tenant1_key = keys::scheduler_entries_key_with_tenant("tenant1", "host:1234:uuid1");
    let tenant2_key = keys::scheduler_entries_key_with_tenant("tenant2", "host:1234:uuid1");
    assert_ne!(tenant1_key, tenant2_key);
    assert!(tenant1_key.contains("tenant1"));
    assert!(tenant2_key.contains("tenant2"));

    // Same scheduler_id without tenant produces a different (non-tenant) key
    let no_tenant_key = keys::scheduler_entries_key("host:1234:uuid1");
    assert_ne!(no_tenant_key, tenant1_key);
  }
}