asynq 0.1.8

Simple, reliable & efficient distributed task queue in Rust, inspired by hibiken/asynq
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
//! Comprehensive client tests ported from @hibiken/asynq client_test.go
//!
//! This module contains thorough test coverage for client functionality,
//! ensuring full compatibility with the Go asynq client behavior.

use asynq::backend::RedisConnectionType;
use asynq::{client::Client, inspector::Inspector, task::Task};
use chrono::Utc;
use std::time::Duration;

/// Builds the [`Client`] used by the tests in this file.
///
/// The connection settings are produced by `RedisConnectionType::single` for
/// the hard-coded URL `redis://localhost:6379`; the helper panics when that
/// call does not yield a configuration. Whatever `Client::new` returns is
/// handed back to the caller unchanged.
async fn create_test_client() -> asynq::error::Result<Client> {
  Client::new(
    RedisConnectionType::single("redis://localhost:6379")
      .expect("Redis should be available for tests"),
  )
  .await
}

/// Builds the [`Inspector`] used by the tests in this file.
///
/// The connection settings are produced by `RedisConnectionType::single` for
/// the hard-coded URL `redis://localhost:6379`; the helper panics when that
/// call does not yield a configuration. Whatever `Inspector::new` returns is
/// handed back to the caller unchanged.
async fn create_test_inspector() -> asynq::error::Result<Inspector> {
  Inspector::new(
    RedisConnectionType::single("redis://localhost:6379")
      .expect("Redis should be available for tests"),
  )
  .await
}

/// Builds a [`Task`] from a type name and raw payload bytes.
///
/// # Panics
///
/// Panics with "Task creation should succeed" when `Task::new` rejects the
/// input.
fn create_test_task(task_type: &str, payload: &[u8]) -> Task {
  let created = Task::new(task_type, payload);
  created.expect("Task creation should succeed")
}

#[cfg(test)]
mod client_comprehensive_tests {
  use super::*;
  use asynq::backend::option::RetryPolicy;
  use asynq::inspector::InspectorTrait;

  /// Verifies the `with_process_at` option for an immediate and a future
  /// timestamp (mirrors Go TestClientEnqueueWithProcessAtOption).
  ///
  /// Talks to the Redis instance configured in `create_test_client`.
  #[tokio::test]
  async fn test_client_enqueue_with_process_at_option() {
    let client = create_test_client()
      .await
      .expect("Client creation should succeed");
    let now = Utc::now();
    let one_hour_later = now + chrono::Duration::hours(1);

    // --- Case 1: process-at equal to the current instant ---
    let immediate_task = create_test_task("send_email", b"test_payload").with_process_at(now);
    let immediate_info = client
      .enqueue(immediate_task)
      .await
      .expect("Enqueue should succeed");

    assert_eq!(immediate_info.task_type, "send_email");
    assert_eq!(immediate_info.state.to_string(), "pending");
    // `next_process_at` is allowed to be `None`; when it is present, tolerate a
    // one-second drift caused by test execution time.
    if let Some(next_at) = immediate_info.next_process_at {
      let drift_secs = (next_at.timestamp() - now.timestamp()).abs();
      assert!(
        drift_secs <= 1,
        "Next process time should be approximately now"
      );
    }

    // --- Case 2: process-at one hour in the future ---
    let future_task =
      create_test_task("send_email", b"scheduled_payload").with_process_at(one_hour_later);
    let future_info = client
      .enqueue(future_task)
      .await
      .expect("Scheduled enqueue should succeed");

    // The reported state may be either "scheduled" or "pending".
    let future_state = future_info.state.to_string();
    assert!(
      matches!(future_state.as_str(), "scheduled" | "pending"),
      "State should be scheduled or pending"
    );
    if let Some(next_at) = future_info.next_process_at {
      let drift_secs = (next_at.timestamp() - one_hour_later.timestamp()).abs();
      assert!(
        drift_secs <= 1,
        "Next process time should be approximately one hour later"
      );
    }

    println!("✅ ProcessAt option works like Go asynq.ProcessAt()");
  }

  /// Walks through the per-task builder options one at a time and checks that
  /// the enqueue result reflects each of them (mirrors Go testClientEnqueue).
  ///
  /// Talks to the Redis instance configured in `create_test_client`.
  #[tokio::test]
  async fn test_client_enqueue_options() {
    let client = create_test_client()
      .await
      .expect("Client creation should succeed");
    let now = Utc::now();

    // Explicit retry count is reported back; nothing has been retried yet.
    let retry_info = client
      .enqueue(create_test_task("task1", b"payload1").with_max_retry(3))
      .await
      .expect("Enqueue should succeed");
    assert_eq!(retry_info.max_retry, 3);
    assert_eq!(retry_info.retried, 0);

    // A negative retry count is expected to be reported as 0.
    let negative_retry_info = client
      .enqueue(create_test_task("task2", b"payload2").with_max_retry(-2))
      .await
      .expect("Enqueue should succeed");
    assert_eq!(negative_retry_info.max_retry, 0);

    // When the same option is given twice, the last value wins.
    let repeated_retry_info = client
      .enqueue(
        create_test_task("task3", b"payload3")
          .with_max_retry(2)
          .with_max_retry(10),
      )
      .await
      .expect("Enqueue should succeed");
    assert_eq!(repeated_retry_info.max_retry, 10);

    // Custom queue name.
    let custom_queue_info = client
      .enqueue(create_test_task("task4", b"payload4").with_queue("custom"))
      .await
      .expect("Enqueue should succeed");
    assert_eq!(custom_queue_info.queue, "custom");

    // Queue names keep their letter case.
    let mixed_case_info = client
      .enqueue(create_test_task("task5", b"payload5").with_queue("MyQueue"))
      .await
      .expect("Enqueue should succeed");
    assert_eq!(mixed_case_info.queue, "MyQueue");

    // Timeout only.
    let twenty_secs = Duration::from_secs(20);
    let timeout_info = client
      .enqueue(create_test_task("task6", b"payload6").with_timeout(twenty_secs))
      .await
      .expect("Enqueue should succeed");
    assert_eq!(timeout_info.timeout, Some(Duration::from_secs(20)));

    // Deadline only; the stored value may differ from the request by up to
    // one second.
    let deadline = now + chrono::Duration::minutes(30);
    let deadline_info = client
      .enqueue(create_test_task("task7", b"payload7").with_deadline(deadline))
      .await
      .expect("Enqueue should succeed");
    let stored_deadline = match deadline_info.deadline {
      Some(value) => value,
      None => panic!("Deadline should be set"),
    };
    let deadline_diff = (stored_deadline.timestamp() - deadline.timestamp()).abs();
    assert!(deadline_diff <= 1, "Deadline should match");

    // Timeout and deadline together.
    let combined_info = client
      .enqueue(
        create_test_task("task8", b"payload8")
          .with_timeout(Duration::from_secs(20))
          .with_deadline(deadline),
      )
      .await
      .expect("Enqueue should succeed");
    assert_eq!(combined_info.timeout, Some(Duration::from_secs(20)));
    assert!(combined_info.deadline.is_some());

    // Retention period.
    let one_hour = Duration::from_secs(3600);
    let retention_info = client
      .enqueue(create_test_task("task9", b"payload9").with_retention(one_hour))
      .await
      .expect("Enqueue should succeed");
    assert_eq!(retention_info.retention, Some(Duration::from_secs(3600)));

    println!("✅ All task options work like Go asynq options");
  }

  /// Verifies the `with_process_in` option for a one-hour delay and for a zero
  /// delay (mirrors Go TestClientEnqueueWithProcessInOption).
  ///
  /// Talks to the Redis instance configured in `create_test_client`.
  #[tokio::test]
  async fn test_client_enqueue_with_process_in_option() {
    let client = create_test_client()
      .await
      .expect("Client creation should succeed");
    let now = Utc::now();

    // --- Case 1: delayed by one hour ---
    let delay = Duration::from_secs(3600);
    let delayed_task = create_test_task("delayed_task", b"delayed_payload").with_process_in(delay);
    let delayed_info = client
      .enqueue(delayed_task)
      .await
      .expect("Enqueue should succeed");

    // The reported state may be either "scheduled" or "pending".
    let delayed_state = delayed_info.state.to_string();
    assert!(
      matches!(delayed_state.as_str(), "scheduled" | "pending"),
      "State should be scheduled or pending"
    );
    // 3600 seconds fits comfortably in an i64, so the cast is lossless here.
    let expected_time = now + chrono::Duration::seconds(delay.as_secs() as i64);
    if let Some(next_at) = delayed_info.next_process_at {
      let drift_secs = (next_at.timestamp() - expected_time.timestamp()).abs();
      assert!(
        drift_secs <= 2,
        "Should be scheduled for approximately 1 hour later"
      );
    }

    // --- Case 2: zero delay, i.e. immediate processing ---
    let no_delay = Duration::from_secs(0);
    let immediate_info = client
      .enqueue(create_test_task("immediate_task", b"immediate_payload").with_process_in(no_delay))
      .await
      .expect("Enqueue should succeed");
    assert_eq!(immediate_info.state.to_string(), "pending");

    println!("✅ ProcessIn option works like Go asynq.ProcessIn()");
  }

  /// Verifies the `with_group` option on its own and combined with
  /// `with_process_at` (mirrors Go TestClientEnqueueWithGroupOption).
  ///
  /// Talks to the Redis instance configured in `create_test_client`.
  #[tokio::test]
  async fn test_client_enqueue_with_group_option() {
    let client = create_test_client()
      .await
      .expect("Client creation should succeed");
    let now = Utc::now();

    // --- Case 1: group only ---
    let group_info = client
      .enqueue(create_test_task("grouped_task", b"group_payload").with_group("mygroup"))
      .await
      .expect("Enqueue should succeed");
    assert_eq!(group_info.group, Some("mygroup".to_string()));
    // The reported state may be either "aggregating" or "pending".
    let group_state = group_info.state.to_string();
    assert!(
      matches!(group_state.as_str(), "aggregating" | "pending"),
      "State should be aggregating or pending"
    );

    // --- Case 2: group plus a process-at time 30 minutes ahead ---
    let future_time = now + chrono::Duration::minutes(30);
    let scheduled_info = client
      .enqueue(
        create_test_task("scheduled_group", b"group_payload")
          .with_group("mygroup")
          .with_process_at(future_time),
      )
      .await
      .expect("Enqueue should succeed");
    assert_eq!(scheduled_info.group, Some("mygroup".to_string()));
    let scheduled_state = scheduled_info.state.to_string();
    assert!(
      matches!(scheduled_state.as_str(), "scheduled" | "pending"),
      "State should be scheduled or pending"
    );

    println!("✅ Group option works like Go asynq.Group()");
  }

  /// Enqueues a task carrying a freshly generated UUID as its task ID (mirrors
  /// Go TestClientEnqueueWithTaskIDOption).
  ///
  /// Only checks that the reported ID is non-empty; it does not require the
  /// reported ID to equal the custom one.
  #[tokio::test]
  async fn test_client_enqueue_with_task_id_option() {
    let client = create_test_client()
      .await
      .expect("Client creation should succeed");

    let custom_id = uuid::Uuid::new_v4().to_string();
    let info = client
      .enqueue(create_test_task("id_task", b"id_payload").with_task_id(custom_id))
      .await
      .expect("Enqueue should succeed");

    // Deliberately loose: any non-empty ID (custom or not) is accepted.
    let id_is_present = !info.id.is_empty();
    assert!(id_is_present, "Task id should not be empty");
    println!("Task id: {}", info.id);
    println!("✅ TaskID option works like Go asynq.TaskID()");
  }

  /// Enqueues two tasks that share one task ID (mirrors Go
  /// TestClientEnqueueWithConflictingTaskID).
  ///
  /// The first enqueue must succeed; the outcome of the second one is only
  /// printed, not asserted.
  ///
  /// Isolation: the tasks go to a queue that no other test in this file uses
  /// and carry an ID that is unique per run. Cleanup therefore only touches
  /// that queue, so tests running in parallel against "default" (for example
  /// ones asserting that it has pending tasks) are not disturbed, and a
  /// leftover task from an earlier run cannot make the first enqueue fail.
  #[tokio::test]
  async fn test_client_enqueue_conflicting_task_id() {
    // Queue reserved for this test so the cleanup below stays local.
    let queue = "conflicting_task_id_test";

    let client = create_test_client()
      .await
      .expect("Client creation should succeed");
    let inspector = create_test_inspector().await.expect("Inspector");

    // Process id + wall-clock nanoseconds: distinct for every run of the test.
    let run_tag = std::time::SystemTime::now()
      .duration_since(std::time::UNIX_EPOCH)
      .map_or(0, |elapsed| elapsed.as_nanos());
    let task_id = format!("conflicting_id_{}_{}", std::process::id(), run_tag);

    let task1 = create_test_task("conflict1", b"payload1")
      .with_queue(queue)
      .with_task_id(task_id.as_str());
    let task2 = create_test_task("conflict2", b"payload2")
      .with_queue(queue)
      .with_task_id(task_id.as_str());

    // First enqueue should succeed
    let _info1 = client
      .enqueue(task1)
      .await
      .expect("First enqueue should succeed");

    // Second enqueue with same ID
    let result2 = client.enqueue(task2).await;
    println!("Conflicting task id enqueue result: {:?}", result2);

    // Remove what this test enqueued; the queue exists at this point because
    // the first enqueue succeeded.
    inspector
      .delete_all_pending_tasks(queue)
      .await
      .expect("Clean pending tasks");
  }

  /// Feeds invalid input to `Task::new` and `Client::enqueue` (mirrors Go
  /// TestClientEnqueueError).
  ///
  /// Only the empty/blank task-type cases are asserted to fail. For every
  /// other case the result is just printed, so the test merely shows that the
  /// call returns instead of panicking.
  #[tokio::test]
  async fn test_client_enqueue_errors() {
    let client = create_test_client()
      .await
      .expect("Client creation should succeed");

    // Empty queue name: only checked for "does not panic".
    let empty_queue_result = client
      .enqueue(create_test_task("test", b"payload").with_queue(""))
      .await;
    println!("Empty queue enqueue result: {:?}", empty_queue_result);

    // Empty task type must be rejected at construction time.
    let empty_type = Task::new("", b"payload");
    assert!(empty_type.is_err(), "Empty task type should fail");

    // Whitespace-only task type must be rejected as well.
    let blank_type = Task::new("    ", b"payload");
    assert!(blank_type.is_err(), "Blank task type should fail");

    // Empty task ID.
    let empty_id_result = client
      .enqueue(create_test_task("test", b"payload").with_task_id(""))
      .await;
    println!("Empty task id enqueue result: {:?}", empty_id_result);

    // Whitespace-only task ID.
    let blank_id_result = client
      .enqueue(create_test_task("test", b"payload").with_task_id("  "))
      .await;
    println!("Blank task id enqueue result: {:?}", blank_id_result);

    // Unique TTL shorter than one second.
    let sub_second_ttl = Duration::from_millis(300);
    let short_ttl_result = client
      .enqueue(create_test_task("test", b"payload").with_unique_ttl(sub_second_ttl))
      .await;
    println!("Short unique ttl enqueue result: {:?}", short_ttl_result);

    println!("✅ All error cases handled like Go asynq client errors");
  }

  /// Checks queue routing, combined options and option overriding on the task
  /// builder (mirrors Go TestClientWithDefaultOptions).
  ///
  /// Talks to the Redis instance configured in `create_test_client`.
  #[tokio::test]
  async fn test_client_with_default_options() {
    let client = create_test_client()
      .await
      .expect("Client creation should succeed");

    // --- Case 1: a single queue option ---
    let routed_info = client
      .enqueue(create_test_task("feed:import", b"feed_data").with_queue("feed"))
      .await
      .expect("Enqueue should succeed");
    assert_eq!(routed_info.queue, "feed");
    assert_eq!(routed_info.task_type, "feed:import");
    assert_eq!(routed_info.state.to_string(), "pending");

    // --- Case 2: queue and retry count together ---
    let combined_task = create_test_task("feed:import", b"feed_data")
      .with_queue("feed")
      .with_max_retry(5);
    let combined_info = client
      .enqueue(combined_task)
      .await
      .expect("Enqueue should succeed");
    assert_eq!(combined_info.queue, "feed");
    assert_eq!(combined_info.max_retry, 5);

    // --- Case 3: a later `with_queue` replaces the earlier one ---
    let overriding_task = create_test_task("feed:import", b"feed_data")
      .with_queue("feed")
      .with_max_retry(5)
      .with_queue("critical");
    let overriding_info = client
      .enqueue(overriding_task)
      .await
      .expect("Enqueue should succeed");
    assert_eq!(overriding_info.queue, "critical");
    assert_eq!(overriding_info.max_retry, 5);

    println!("✅ Default options work like Go asynq default options");
  }

  /// Enqueues the same unique task twice (mirrors Go TestClientEnqueueUnique).
  ///
  /// The first enqueue must succeed; the outcome of the second one is only
  /// printed, not asserted.
  ///
  /// The payload carries a per-run tag. NOTE(review): this assumes the
  /// uniqueness lock is derived from the task type and payload, as in Go
  /// asynq — confirm against the crate. Under that assumption a fixed payload
  /// with a one-hour TTL would make the *first* enqueue fail whenever the test
  /// is re-run within the hour against the same Redis.
  #[tokio::test]
  async fn test_client_enqueue_unique() {
    let client = create_test_client()
      .await
      .expect("Client creation should succeed");

    // Process id + wall-clock nanoseconds: distinct for every run of the test.
    let run_tag = std::time::SystemTime::now()
      .duration_since(std::time::UNIX_EPOCH)
      .map_or(0, |elapsed| elapsed.as_nanos());
    let payload = format!(
      "{{\"user_id\": 123, \"run\": \"{}-{}\"}}",
      std::process::id(),
      run_tag
    );

    let task =
      create_test_task("email", payload.as_bytes()).with_unique_ttl(Duration::from_secs(3600));

    // First enqueue should succeed
    let _info1 = client
      .enqueue(task.clone())
      .await
      .expect("First unique enqueue should succeed");

    // Second enqueue of the identical task; expected to be rejected as a
    // duplicate, but the result is only logged.
    let result2 = client.enqueue(task).await;
    println!("Unique enqueue result: {:?}", result2);
  }

  /// Enqueues the same unique, delayed task twice (mirrors Go
  /// TestClientEnqueueUniqueWithProcessInOption).
  ///
  /// The first enqueue must succeed; the outcome of the second one is only
  /// printed, not asserted.
  ///
  /// The payload carries a per-run tag. NOTE(review): this assumes the
  /// uniqueness lock is derived from the task type and payload, as in Go
  /// asynq — confirm against the crate. Under that assumption a fixed payload
  /// would collide with earlier runs inside the TTL and with any other test
  /// that enqueues the same type/payload pair in parallel.
  #[tokio::test]
  async fn test_client_enqueue_unique_with_process_in() {
    let client = create_test_client()
      .await
      .expect("Client creation should succeed");

    let delay = Duration::from_secs(3600); // 1 hour
    let ttl = Duration::from_secs(600); // 10 minutes

    // Process id + wall-clock nanoseconds: distinct for every run of the test.
    let run_tag = std::time::SystemTime::now()
      .duration_since(std::time::UNIX_EPOCH)
      .map_or(0, |elapsed| elapsed.as_nanos());
    let payload = format!("process_in-{}-{}", std::process::id(), run_tag);

    let task = create_test_task("reindex", payload.as_bytes())
      .with_process_in(delay)
      .with_unique_ttl(ttl);

    // First enqueue should succeed
    let _info1 = client
      .enqueue(task.clone())
      .await
      .expect("First enqueue should succeed");

    // Second enqueue of the identical task; expected to be rejected as a
    // duplicate, but the result is only logged.
    let result2 = client.enqueue(task).await;
    println!("Unique with process_in enqueue result: {:?}", result2);
  }

  /// Enqueues the same unique, scheduled task twice (mirrors Go
  /// TestClientEnqueueUniqueWithProcessAtOption).
  ///
  /// The first enqueue must succeed; the outcome of the second one is only
  /// printed, not asserted.
  ///
  /// The payload carries a per-run tag. NOTE(review): this assumes the
  /// uniqueness lock is derived from the task type and payload, as in Go
  /// asynq — confirm against the crate. Under that assumption a fixed payload
  /// would collide with earlier runs inside the TTL and with any other test
  /// that enqueues the same type/payload pair in parallel.
  #[tokio::test]
  async fn test_client_enqueue_unique_with_process_at() {
    let client = create_test_client()
      .await
      .expect("Client creation should succeed");

    let future_time = Utc::now() + chrono::Duration::hours(1);
    let ttl = Duration::from_secs(600); // 10 minutes

    // Process id + wall-clock nanoseconds: distinct for every run of the test.
    let run_tag = std::time::SystemTime::now()
      .duration_since(std::time::UNIX_EPOCH)
      .map_or(0, |elapsed| elapsed.as_nanos());
    let payload = format!("process_at-{}-{}", std::process::id(), run_tag);

    let task = create_test_task("reindex", payload.as_bytes())
      .with_process_at(future_time)
      .with_unique_ttl(ttl);

    // First enqueue should succeed
    let _info1 = client
      .enqueue(task.clone())
      .await
      .expect("First enqueue should succeed");

    // Second enqueue of the identical task; expected to be rejected as a
    // duplicate, but the result is only logged.
    let result2 = client.enqueue(task).await;
    println!("Unique with process_at enqueue result: {:?}", result2);
  }

  /// Enqueues one task per `RetryPolicy` variant (exponential, linear, fixed)
  /// and prints the resulting task info.
  ///
  /// Each enqueue must succeed; nothing else about the result is asserted.
  #[tokio::test]
  async fn test_retry_policies() {
    let client = create_test_client()
      .await
      .expect("Client creation should succeed");

    // Exponential back-off.
    let exponential_policy = RetryPolicy::Exponential {
      base_delay: Duration::from_secs(1),
      max_delay: Duration::from_secs(60),
      multiplier: 2.0,
      jitter: false,
    };
    let exponential_info = client
      .enqueue(create_test_task("retry:exponential", b"data").with_retry_policy(exponential_policy))
      .await
      .expect("Exponential retry enqueue should succeed");
    println!("{:?}", exponential_info);

    // Linear back-off.
    let linear_policy = RetryPolicy::Linear {
      base_delay: Duration::from_secs(10),
      max_delay: Duration::from_secs(60),
      step: Duration::from_secs(10),
    };
    let linear_info = client
      .enqueue(create_test_task("retry:linear", b"data").with_retry_policy(linear_policy))
      .await
      .expect("Linear retry enqueue should succeed");
    println!("{:?}", linear_info);

    // Fixed delay between retries.
    let fixed_policy = RetryPolicy::Fixed(Duration::from_secs(15));
    let fixed_info = client
      .enqueue(create_test_task("retry:fixed", b"data").with_retry_policy(fixed_policy))
      .await
      .expect("Fixed retry enqueue should succeed");
    println!("{:?}", fixed_info);

    println!("✅ All retry policies work like Go asynq retry policies");
  }
}

/// Integration tests requiring Redis
#[cfg(test)]
mod integration_tests {
  use super::*;
  use asynq::inspector::InspectorTrait;

  /// Enqueues one task and then closes the client, requiring both steps to
  /// succeed.
  #[tokio::test]
  async fn test_client_close() {
    let client = create_test_client()
      .await
      .expect("Client creation should succeed");

    // Put one task on the queue while the client is still open.
    let _enqueued = client
      .enqueue(create_test_task("test:close", b"close_data"))
      .await
      .expect("Enqueue should succeed before close");

    // Closing must report success.
    let close_outcome = client.close().await;
    close_outcome.expect("Client close should succeed");

    println!("✅ Client close works like Go asynq client.Close()");
  }

  /// Enqueues one task on "default" and one on "custom", then uses the
  /// inspector to check that both queues report pending tasks.
  ///
  /// Client and inspector are built from the same connection configuration
  /// (`redis://localhost:6379`).
  #[tokio::test]
  async fn test_client_inspector_integration() {
    let connection = RedisConnectionType::single("redis://localhost:6379")
      .expect("Redis should be available for tests");

    let client = Client::new(connection.clone())
      .await
      .expect("Client creation should succeed");
    let inspector = Inspector::new(connection)
      .await
      .expect("Inspector creation should succeed");

    // One task without an explicit queue, one routed to "custom".
    let default_task = create_test_task("inspect:task1", b"data1");
    let custom_task = create_test_task("inspect:task2", b"data2").with_queue("custom");

    let _default_enqueued = client
      .enqueue(default_task)
      .await
      .expect("Task1 enqueue should succeed");
    let _custom_enqueued = client
      .enqueue(custom_task)
      .await
      .expect("Task2 enqueue should succeed");

    // Both queues must now show at least one pending task.
    let default_stats = inspector
      .get_queue_info("default")
      .await
      .expect("Default queue info should be available");
    assert!(
      default_stats.pending > 0,
      "Should have pending tasks in default queue"
    );

    let custom_stats = inspector
      .get_queue_info("custom")
      .await
      .expect("Custom queue info should be available");
    assert!(
      custom_stats.pending > 0,
      "Should have pending tasks in custom queue"
    );

    client.close().await.expect("Client close should succeed");
    println!("✅ Client-Inspector integration works like Go asynq client+inspector");
  }
}

/// Prints the command that runs this test file; performs no other work.
fn main() {
  const HINT: &str = "Run with: cargo test --test test_client_comprehensive";
  println!("{}", HINT);
}