asynq 0.1.8

Simple, reliable & efficient distributed task queue in Rust, inspired by hibiken/asynq
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
//! 配置模块
//! Configuration module
//!
//! 定义了服务器和客户端的配置选项
//! Defines configuration options for server and client

use crate::base::constants::DEFAULT_QUEUE_NAME;
use crate::error::{Error, Result};
use std::collections::HashMap;
use std::time::Duration;

/// Configuration for a task-processing server.
///
/// Start from [`ServerConfig::new`] / [`ServerConfig::default`], adjust values
/// through the builder-style setters, and call [`ServerConfig::validate`]
/// before use. All fields are public, so a value can also be assembled with
/// struct-update syntax; in that case `validate` is the only safety net.
#[derive(Debug, Clone)]
pub struct ServerConfig {
  /// Number of concurrent workers. Must be greater than zero to pass
  /// validation; defaults to the detected CPU count.
  pub concurrency: usize,
  /// Queues served by this server: key is the queue name, value is its
  /// priority. Validation requires at least one entry, non-blank names and
  /// strictly positive priorities.
  pub queues: HashMap<String, i32>,
  /// Whether to use strict priority.
  // NOTE(review): presumably "always drain higher-priority queues first";
  // the consumer of this flag is not in this file — confirm.
  pub strict_priority: bool,
  /// Task check interval (default: 1 second).
  pub task_check_interval: Duration,
  /// Delayed task check interval (default: 5 seconds).
  pub delayed_task_check_interval: Duration,
  /// Shutdown timeout (default: 8 seconds).
  pub shutdown_timeout: Duration,
  /// Health check interval (default: 15 seconds).
  pub health_check_interval: Duration,
  /// Group grace period. Must be at least 1 second; defaults to 60 seconds.
  pub group_grace_period: Duration,
  /// Maximum group delay; `None` (the default) means no value is configured.
  pub group_max_delay: Option<Duration>,
  /// Maximum group size; `None` (the default) means no value is configured.
  pub group_max_size: Option<usize>,
  /// Janitor (cleanup task) interval (default: 8 seconds).
  pub janitor_interval: Duration,
  /// Janitor (cleanup task) batch size (default: 100). The setter clamps it
  /// to at least 1.
  pub janitor_batch_size: usize,
  /// Heartbeat interval (default: 5 seconds).
  pub heartbeat_interval: Duration,
  /// Whether the group aggregator is enabled (default: `false`).
  pub group_aggregator_enabled: bool,
  /// Whether the periodic task manager is enabled (default: `false`).
  pub periodic_task_manager_enabled: bool,
  /// Periodic task manager check interval (default: 60 seconds).
  pub periodic_task_manager_check_interval: Duration,
  /// ACL tenant name (username), used as the ACL prefix.
  ///
  /// When set, every queue name except the default queue is rewritten to
  /// `{tenant}:{queue}` by the prefix helpers on this type. `None` (the
  /// default) leaves queue names untouched.
  pub acl_tenant: Option<String>,
}

impl Default for ServerConfig {
  fn default() -> Self {
    let mut queues = HashMap::new();
    queues.insert(DEFAULT_QUEUE_NAME.to_string(), 1);
    Self {
      concurrency: num_cpus::get(),
      queues,
      strict_priority: false,
      task_check_interval: Duration::from_secs(1),
      delayed_task_check_interval: Duration::from_secs(5),
      shutdown_timeout: Duration::from_secs(8),
      health_check_interval: Duration::from_secs(15),
      group_grace_period: Duration::from_secs(60),
      group_max_delay: None,
      group_max_size: None,
      janitor_interval: Duration::from_secs(8),
      janitor_batch_size: 100,
      heartbeat_interval: Duration::from_secs(5),
      group_aggregator_enabled: false,
      periodic_task_manager_enabled: false,
      periodic_task_manager_check_interval: Duration::from_secs(60),
      acl_tenant: None,
    }
  }
}

impl ServerConfig {
  /// 创建新的服务器配置
  /// Create a new server configuration
  pub fn new() -> Self {
    Self::default()
  }

  /// 设置并发数
  /// Set the number of concurrent workers
  pub fn concurrency(mut self, concurrency: usize) -> Self {
    self.concurrency = concurrency.max(1);
    self
  }

  /// 设置队列配置
  /// Set the queue configuration
  pub fn queues(mut self, queues: HashMap<String, i32>) -> Self {
    if queues.is_empty() {
      let mut default_queues = HashMap::new();
      default_queues.insert(DEFAULT_QUEUE_NAME.to_string(), 1);
      self.queues = default_queues;
    } else {
      self.queues = queues;
    }
    self
  }

  /// 添加队列
  /// Add a queue
  pub fn add_queue<S: AsRef<str>>(mut self, name: S, priority: i32) -> Result<Self> {
    let name = name.as_ref();
    if name.trim().is_empty() {
      return Err(Error::InvalidQueueName {
        name: name.to_string(),
      });
    }
    if priority <= 0 {
      return Err(Error::config("Queue priority must be positive"));
    }
    self.queues.insert(name.to_string(), priority);
    Ok(self)
  }

  /// 设置严格优先级
  /// Set strict priority
  pub fn strict_priority(mut self, strict: bool) -> Self {
    self.strict_priority = strict;
    self
  }

  /// 设置任务检查间隔
  /// Set the task check interval
  pub fn task_check_interval(mut self, interval: Duration) -> Self {
    self.task_check_interval = interval;
    self
  }

  /// 设置延迟任务检查间隔
  /// Set the delayed task check interval
  pub fn delayed_task_check_interval(mut self, interval: Duration) -> Self {
    self.delayed_task_check_interval = interval;
    self
  }

  /// 设置关闭超时时间
  /// Set the shutdown timeout
  pub fn shutdown_timeout(mut self, timeout: Duration) -> Self {
    self.shutdown_timeout = timeout;
    self
  }

  /// 设置健康检查间隔
  /// Set the health check interval
  pub fn health_check_interval(mut self, interval: Duration) -> Self {
    self.health_check_interval = interval;
    self
  }

  /// 设置组宽限期
  /// Set the group grace period
  pub fn group_grace_period(mut self, grace_period: Duration) -> Result<Self> {
    if grace_period < Duration::from_secs(1) {
      return Err(Error::config(
        "Group grace period cannot be less than 1 second",
      ));
    }
    self.group_grace_period = grace_period;
    Ok(self)
  }

  /// 设置组最大延迟
  /// Set the maximum group delay
  pub fn group_max_delay(mut self, max_delay: Duration) -> Self {
    self.group_max_delay = Some(max_delay);
    self
  }

  /// 设置组最大大小
  /// Set the maximum group size
  pub fn group_max_size(mut self, max_size: usize) -> Self {
    self.group_max_size = Some(max_size);
    self
  }

  /// 设置清理任务间隔
  /// Set the janitor interval
  pub fn janitor_interval(mut self, interval: Duration) -> Self {
    self.janitor_interval = interval;
    self
  }

  /// 设置清理任务批量大小
  /// Set the janitor batch size
  pub fn janitor_batch_size(mut self, batch_size: usize) -> Self {
    self.janitor_batch_size = batch_size.max(1);
    self
  }

  /// 启用组聚合器
  /// Enable group aggregator
  pub fn enable_group_aggregator(mut self, enabled: bool) -> Self {
    self.group_aggregator_enabled = enabled;
    self
  }

  /// 启用周期性任务管理器
  /// Enable periodic task manager
  pub fn enable_periodic_task_manager(mut self, enabled: bool) -> Self {
    self.periodic_task_manager_enabled = enabled;
    self
  }

  /// 设置周期性任务管理器检查间隔
  /// Set periodic task manager check interval
  pub fn periodic_task_manager_check_interval(mut self, interval: Duration) -> Self {
    self.periodic_task_manager_check_interval = interval;
    self
  }

  /// 设置 ACL 租户名称
  /// Set ACL tenant name
  ///
  /// 当设置租户名称时,ACL 特性会自动启用,队列名称将自动添加租户前缀
  /// When tenant name is set, ACL feature is automatically enabled and queue names will have the tenant prefix added
  pub fn acl_tenant<S: Into<String>>(mut self, tenant: S) -> Self {
    self.acl_tenant = Some(tenant.into());
    self
  }

  /// 获取带 ACL 前缀的队列名称
  /// Get queue name with ACL prefix
  ///
  /// 如果配置了租户,返回 `{tenant:queue}`,否则返回原始队列名
  /// If tenant is configured, returns `{tenant:queue}`, otherwise returns original queue name
  /// 注意:默认队列(DEFAULT_QUEUE_NAME)不添加前缀,这是所有租户共享的公共队列
  /// Note: Default queue (DEFAULT_QUEUE_NAME) is not prefixed, it's a shared public queue for all tenants
  pub fn get_queue_name_with_prefix(&self, queue: &str) -> String {
    // 默认队列不添加前缀
    // Default queue is not prefixed
    if queue == DEFAULT_QUEUE_NAME {
      return queue.to_string();
    }

    if let Some(tenant) = &self.acl_tenant {
      return format!("{}:{}", tenant, queue);
    }
    queue.to_string()
  }

  /// 获取所有带 ACL 前缀的队列
  /// Get all queues with ACL prefix
  ///
  /// 返回一个新的 HashMap,其中所有队列名称都添加了 ACL 前缀(如果配置了租户)
  /// Returns a new HashMap where all queue names have the ACL prefix added (if tenant is configured)
  pub fn get_queues_with_prefix(&self) -> HashMap<String, i32> {
    if self.acl_tenant.is_some() {
      self
        .queues
        .iter()
        .map(|(name, priority)| (self.get_queue_name_with_prefix(name), *priority))
        .collect()
    } else {
      self.queues.clone()
    }
  }

  /// 验证配置
  /// Validate the configuration
  pub fn validate(&self) -> Result<()> {
    if self.concurrency == 0 {
      return Err(Error::config("Concurrency must be greater than 0"));
    }

    if self.queues.is_empty() {
      return Err(Error::config("At least one queue must be configured"));
    }

    for (name, priority) in &self.queues {
      if name.trim().is_empty() {
        return Err(Error::InvalidQueueName { name: name.clone() });
      }
      if *priority <= 0 {
        return Err(Error::config("Queue priority must be positive"));
      }
    }

    if self.group_grace_period < Duration::from_secs(1) {
      return Err(Error::config(
        "Group grace period cannot be less than 1 second",
      ));
    }

    Ok(())
  }
}

/// Configuration for a task-enqueuing client.
///
/// Create one with `ClientConfig::default()` and adjust the public fields or
/// use the builder-style setters.
#[derive(Debug, Clone)]
pub struct ClientConfig {
  /// Connection timeout.
  pub connection_timeout: Duration,
  /// Request timeout.
  pub request_timeout: Duration,
  /// Maximum number of retries.
  pub max_retries: usize,
  /// Interval between retries.
  pub retry_interval: Duration,
  /// ACL tenant name (username), used as the ACL prefix.
  ///
  /// When set, non-default queue names are prefixed with `{tenant}:`.
  pub acl_tenant: Option<String>,
}

impl Default for ClientConfig {
  /// Baseline client settings: 30 s connection timeout, 60 s request timeout,
  /// 3 retries spaced 1 s apart, and no ACL tenant.
  fn default() -> Self {
    let connection_timeout = Duration::from_secs(30);
    let request_timeout = Duration::from_secs(60);
    let retry_interval = Duration::from_secs(1);

    ClientConfig {
      connection_timeout,
      request_timeout,
      max_retries: 3,
      retry_interval,
      acl_tenant: None,
    }
  }
}

impl ClientConfig {
  /// Creates a client configuration populated with the default values.
  pub fn new() -> Self {
    Default::default()
  }

  /// Sets the connection timeout.
  pub fn connection_timeout(self, timeout: Duration) -> Self {
    Self {
      connection_timeout: timeout,
      ..self
    }
  }

  /// Sets the request timeout.
  pub fn request_timeout(self, timeout: Duration) -> Self {
    Self {
      request_timeout: timeout,
      ..self
    }
  }

  /// Sets the maximum number of retries.
  pub fn max_retries(self, max_retries: usize) -> Self {
    Self {
      max_retries,
      ..self
    }
  }

  /// Sets the interval between retries.
  pub fn retry_interval(self, interval: Duration) -> Self {
    Self {
      retry_interval: interval,
      ..self
    }
  }

  /// Sets the ACL tenant name.
  ///
  /// Once a tenant is set, `get_queue_name_with_prefix` adds the tenant
  /// prefix to every queue name except the default queue.
  pub fn acl_tenant<S: Into<String>>(self, tenant: S) -> Self {
    Self {
      acl_tenant: Some(tenant.into()),
      ..self
    }
  }

  /// Returns `queue` with the ACL tenant prefix applied.
  ///
  /// With a tenant configured the result is `{tenant}:{queue}`; without one
  /// the name is returned unchanged. The default queue (`DEFAULT_QUEUE_NAME`)
  /// is never prefixed: it is the public queue shared by all tenants.
  pub fn get_queue_name_with_prefix(&self, queue: &str) -> String {
    match &self.acl_tenant {
      // Only non-default queues are tenant-scoped.
      Some(tenant) if queue != DEFAULT_QUEUE_NAME => format!("{}:{}", tenant, queue),
      _ => queue.to_string(),
    }
  }
}

/// Boxed callback that decides how long to wait before retrying a task.
///
/// The signature matches [`default_retry_delay`]: retry count, error text and
/// task type in, wait duration out. The `Send + Sync` bounds allow the
/// callback to be shared across threads.
pub type RetryDelayFunc = Box<dyn Fn(i32, &str, &str) -> Duration + Send + Sync>;

/// Default retry delay: polynomial backoff with additive random jitter.
///
/// The delay in seconds is `retried^4 + 15 + jitter`, where `jitter` is drawn
/// from `0..30 * (retried + 1)`. Jitter only ever lengthens the delay, so the
/// result is never shorter than the 15-second floor and grows with the retry
/// count.
///
/// # Arguments
/// * `retried` - number of retries already performed; negative values are
///   treated as `0`.
/// * `_error` - error text of the failed attempt (unused by this strategy).
/// * `_task_type` - type name of the task (unused by this strategy).
///
/// # Returns
/// The duration to wait before the next attempt.
pub fn default_retry_delay(retried: i32, _error: &str, _task_type: &str) -> Duration {
  // Clamp first: a negative count has no meaning, and `retried == -1` would
  // otherwise make the jitter modulus zero (remainder-by-zero panic).
  let attempt = u64::from(retried.max(0).unsigned_abs());

  // Saturating integer math: attempt^4 exceeds 64 bits for very large counts,
  // and the result must not wrap or panic on overflow.
  let base_delay = attempt.saturating_pow(4).saturating_add(15);

  // Unsigned jitter in [0, jitter_span): a signed remainder could be negative
  // and pull the delay below the base. `attempt <= i32::MAX`, so the span
  // cannot overflow and is always at least 30.
  let jitter_span = 30 * (attempt + 1);
  let jitter = rand::random::<u64>() % jitter_span;

  Duration::from_secs(base_delay.saturating_add(jitter))
}

/// Boxed callback invoked to report an error.
///
/// Takes three string slices and returns nothing; the `Send + Sync` bounds
/// allow the callback to be shared across threads.
// NOTE(review): the meaning and order of the three `&str` arguments is not
// visible in this file — confirm against the call site before relying on it.
pub type ErrorHandlerFunc = Box<dyn Fn(&str, &str, &str) + Send + Sync>;

/// Boxed callback invoked with the outcome of a health check.
///
/// Takes an optional string slice and returns nothing; the `Send + Sync`
/// bounds allow the callback to be shared across threads.
// NOTE(review): presumably `None` means healthy and `Some(msg)` carries the
// failure description — confirm against the call site.
pub type HealthCheckFunc = Box<dyn Fn(Option<&str>) + Send + Sync>;

#[cfg(test)]
mod tests {
  use super::*;

  /// Builds an owned queue table from `(name, priority)` pairs.
  fn queue_map(entries: &[(&str, i32)]) -> HashMap<String, i32> {
    entries
      .iter()
      .map(|&(name, priority)| (name.to_string(), priority))
      .collect()
  }

  #[test]
  fn test_server_config_default() {
    let defaults = ServerConfig::default();

    assert!(defaults.concurrency > 0);
    assert_eq!(defaults.queues.len(), 1);
    assert!(defaults.queues.contains_key(DEFAULT_QUEUE_NAME));
    assert!(!defaults.strict_priority);
  }

  #[test]
  fn test_server_config_builder() {
    let expected_queues = queue_map(&[("high", 10), ("low", 1)]);

    let built = ServerConfig::new()
      .concurrency(4)
      .queues(expected_queues.clone())
      .strict_priority(true);

    assert_eq!(built.concurrency, 4);
    assert_eq!(built.queues, expected_queues);
    assert!(built.strict_priority);
  }

  #[test]
  fn test_server_config_add_queue() {
    let with_queue = ServerConfig::new().add_queue("test", 5).unwrap();

    assert!(with_queue.queues.contains_key("test"));
    assert_eq!(with_queue.queues.get("test").copied(), Some(5));
  }

  #[test]
  fn test_server_config_validation() {
    // The defaults must always validate.
    assert!(ServerConfig::new().validate().is_ok());

    // Public fields allow bypassing the builder clamp, so validate must catch it.
    let zero_workers = ServerConfig {
      concurrency: 0,
      ..ServerConfig::default()
    };
    assert!(zero_workers.validate().is_err());
  }

  #[test]
  fn test_client_config_default() {
    let defaults = ClientConfig::default();

    assert_eq!(defaults.connection_timeout, Duration::from_secs(30));
    assert_eq!(defaults.request_timeout, Duration::from_secs(60));
    assert_eq!(defaults.max_retries, 3);
  }

  #[test]
  fn test_default_retry_delay() {
    // The delay is randomized, so only a sane lower bound is checked for the
    // first few retry counts.
    let one_second = Duration::from_secs(1);
    for retried in 0..3 {
      assert!(default_retry_delay(retried, "error", "task") >= one_second);
    }

    // A higher retry count has a larger base delay, so the fifth retry must
    // not wait less than the very first one.
    let first_attempt = default_retry_delay(0, "error", "task");
    let fifth_attempt = default_retry_delay(5, "error", "task");
    assert!(fifth_attempt.as_secs() >= first_attempt.as_secs());
  }

  #[test]
  fn test_aggregator_enabled_in_config() {
    // The group aggregator can be switched on through the builder.
    let enabled = ServerConfig::new().enable_group_aggregator(true);

    assert!(enabled.group_aggregator_enabled);
  }

  #[test]
  fn test_aggregator_disabled_by_default() {
    // The group aggregator is opt-in.
    assert!(!ServerConfig::default().group_aggregator_enabled);
  }

  #[test]
  fn test_aggregator_config_with_group_settings() {
    // Aggregator flag and group limits can be combined in one builder chain.
    let grace = Duration::from_secs(30);
    let max_delay = Duration::from_secs(120);

    let grouped = ServerConfig::new()
      .group_grace_period(grace)
      .expect("a 30 second grace period is valid")
      .group_max_delay(max_delay)
      .group_max_size(50)
      .enable_group_aggregator(true);

    assert!(grouped.group_aggregator_enabled);
    assert_eq!(grouped.group_grace_period, grace);
    assert_eq!(grouped.group_max_delay, Some(max_delay));
    assert_eq!(grouped.group_max_size, Some(50));
  }

  #[test]
  fn test_periodic_task_manager_enabled_in_config() {
    // The periodic task manager can be switched on through the builder.
    let enabled = ServerConfig::new().enable_periodic_task_manager(true);

    assert!(enabled.periodic_task_manager_enabled);
  }

  #[test]
  fn test_periodic_task_manager_disabled_by_default() {
    // The periodic task manager is opt-in.
    assert!(!ServerConfig::default().periodic_task_manager_enabled);
  }

  #[test]
  fn test_periodic_task_manager_check_interval() {
    // The check interval is configurable independently of the enable flag.
    let interval = Duration::from_secs(30);

    let scheduled = ServerConfig::new()
      .periodic_task_manager_check_interval(interval)
      .enable_periodic_task_manager(true);

    assert!(scheduled.periodic_task_manager_enabled);
    assert_eq!(scheduled.periodic_task_manager_check_interval, interval);
  }

  #[test]
  fn test_acl_disabled_by_default() {
    // Neither config type has a tenant unless one is set explicitly.
    assert!(ServerConfig::default().acl_tenant.is_none());
    assert!(ClientConfig::default().acl_tenant.is_none());
  }

  #[test]
  fn test_server_config_acl_configuration() {
    // Setting a tenant on the server config stores it verbatim.
    let tenant_scoped = ServerConfig::new().acl_tenant("tenant1");

    assert_eq!(tenant_scoped.acl_tenant.as_deref(), Some("tenant1"));
  }

  #[test]
  fn test_client_config_acl_configuration() {
    // Setting a tenant on the client config stores it verbatim.
    let tenant_scoped = ClientConfig::new().acl_tenant("tenant1");

    assert_eq!(tenant_scoped.acl_tenant.as_deref(), Some("tenant1"));
  }

  #[test]
  fn test_server_config_queue_name_with_prefix() {
    let tenant_scoped = ServerConfig::new().acl_tenant("tenant1");

    // The default queue is the shared public queue and stays unprefixed.
    assert_eq!(
      tenant_scoped.get_queue_name_with_prefix("default"),
      "default"
    );
    // Every other queue is scoped to the tenant.
    assert_eq!(
      tenant_scoped.get_queue_name_with_prefix("critical"),
      "tenant1:critical"
    );
  }

  #[test]
  fn test_server_config_queue_name_without_acl() {
    // Without a tenant, queue names pass through unchanged.
    let plain = ServerConfig::new();

    for queue in ["default", "critical"] {
      assert_eq!(plain.get_queue_name_with_prefix(queue), queue);
    }
  }

  #[test]
  fn test_server_config_get_queues_with_prefix() {
    let tenant_scoped = ServerConfig::new()
      .queues(queue_map(&[("default", 3), ("critical", 6)]))
      .acl_tenant("tenant1");

    let prefixed = tenant_scoped.get_queues_with_prefix();

    assert_eq!(prefixed.len(), 2);
    // The default queue keeps its name; only "critical" gains the prefix.
    assert_eq!(prefixed.get("default"), Some(&3));
    assert_eq!(prefixed.get("tenant1:critical"), Some(&6));
    assert!(!prefixed.contains_key("tenant1:default"));
  }

  #[test]
  fn test_server_config_get_queues_without_acl() {
    // Without a tenant, the prefixed view equals the configured queue table.
    let configured = queue_map(&[("default", 3), ("critical", 6)]);

    let plain = ServerConfig::new().queues(configured.clone());

    assert_eq!(plain.get_queues_with_prefix(), configured);
  }

  #[test]
  fn test_client_config_queue_name_with_prefix() {
    let tenant_scoped = ClientConfig::new().acl_tenant("tenant1");

    // The default queue is the shared public queue and stays unprefixed.
    assert_eq!(
      tenant_scoped.get_queue_name_with_prefix("default"),
      "default"
    );
    // Every other queue is scoped to the tenant.
    assert_eq!(
      tenant_scoped.get_queue_name_with_prefix("critical"),
      "tenant1:critical"
    );
  }

  #[test]
  fn test_client_config_queue_name_without_acl() {
    // Without a tenant, queue names pass through unchanged on the client too.
    let plain = ClientConfig::new();

    for queue in ["default", "critical"] {
      assert_eq!(plain.get_queue_name_with_prefix(queue), queue);
    }
  }

  #[test]
  fn test_default_queue_not_prefixed() {
    // DEFAULT_QUEUE_NAME is shared by all tenants, so it is never prefixed,
    // on either the server or the client side.
    let server_side = ServerConfig::new().acl_tenant("tenant1");
    assert_eq!(
      server_side.get_queue_name_with_prefix(DEFAULT_QUEUE_NAME),
      DEFAULT_QUEUE_NAME
    );

    let client_side = ClientConfig::new().acl_tenant("tenant1");
    assert_eq!(
      client_side.get_queue_name_with_prefix(DEFAULT_QUEUE_NAME),
      DEFAULT_QUEUE_NAME
    );
  }
}