use crate::base::constants::DEFAULT_QUEUE_NAME;
use crate::error::{Error, Result};
use std::collections::HashMap;
use std::time::Duration;
#[derive(Debug, Clone)]
pub struct ServerConfig {
pub concurrency: usize,
pub queues: HashMap<String, i32>,
pub strict_priority: bool,
pub task_check_interval: Duration,
pub delayed_task_check_interval: Duration,
pub shutdown_timeout: Duration,
pub health_check_interval: Duration,
pub group_grace_period: Duration,
pub group_max_delay: Option<Duration>,
pub group_max_size: Option<usize>,
pub janitor_interval: Duration,
pub janitor_batch_size: usize,
pub heartbeat_interval: Duration,
pub group_aggregator_enabled: bool,
pub periodic_task_manager_enabled: bool,
pub periodic_task_manager_check_interval: Duration,
pub acl_tenant: Option<String>,
}
impl Default for ServerConfig {
fn default() -> Self {
let mut queues = HashMap::new();
queues.insert(DEFAULT_QUEUE_NAME.to_string(), 1);
Self {
concurrency: num_cpus::get(),
queues,
strict_priority: false,
task_check_interval: Duration::from_secs(1),
delayed_task_check_interval: Duration::from_secs(5),
shutdown_timeout: Duration::from_secs(8),
health_check_interval: Duration::from_secs(15),
group_grace_period: Duration::from_secs(60),
group_max_delay: None,
group_max_size: None,
janitor_interval: Duration::from_secs(8),
janitor_batch_size: 100,
heartbeat_interval: Duration::from_secs(5),
group_aggregator_enabled: false,
periodic_task_manager_enabled: false,
periodic_task_manager_check_interval: Duration::from_secs(60),
acl_tenant: None,
}
}
}
impl ServerConfig {
pub fn new() -> Self {
Self::default()
}
pub fn concurrency(mut self, concurrency: usize) -> Self {
self.concurrency = concurrency.max(1);
self
}
pub fn queues(mut self, queues: HashMap<String, i32>) -> Self {
if queues.is_empty() {
let mut default_queues = HashMap::new();
default_queues.insert(DEFAULT_QUEUE_NAME.to_string(), 1);
self.queues = default_queues;
} else {
self.queues = queues;
}
self
}
pub fn add_queue<S: AsRef<str>>(mut self, name: S, priority: i32) -> Result<Self> {
let name = name.as_ref();
if name.trim().is_empty() {
return Err(Error::InvalidQueueName {
name: name.to_string(),
});
}
if priority <= 0 {
return Err(Error::config("Queue priority must be positive"));
}
self.queues.insert(name.to_string(), priority);
Ok(self)
}
pub fn strict_priority(mut self, strict: bool) -> Self {
self.strict_priority = strict;
self
}
pub fn task_check_interval(mut self, interval: Duration) -> Self {
self.task_check_interval = interval;
self
}
pub fn delayed_task_check_interval(mut self, interval: Duration) -> Self {
self.delayed_task_check_interval = interval;
self
}
pub fn shutdown_timeout(mut self, timeout: Duration) -> Self {
self.shutdown_timeout = timeout;
self
}
pub fn health_check_interval(mut self, interval: Duration) -> Self {
self.health_check_interval = interval;
self
}
pub fn group_grace_period(mut self, grace_period: Duration) -> Result<Self> {
if grace_period < Duration::from_secs(1) {
return Err(Error::config(
"Group grace period cannot be less than 1 second",
));
}
self.group_grace_period = grace_period;
Ok(self)
}
pub fn group_max_delay(mut self, max_delay: Duration) -> Self {
self.group_max_delay = Some(max_delay);
self
}
pub fn group_max_size(mut self, max_size: usize) -> Self {
self.group_max_size = Some(max_size);
self
}
pub fn janitor_interval(mut self, interval: Duration) -> Self {
self.janitor_interval = interval;
self
}
pub fn janitor_batch_size(mut self, batch_size: usize) -> Self {
self.janitor_batch_size = batch_size.max(1);
self
}
pub fn enable_group_aggregator(mut self, enabled: bool) -> Self {
self.group_aggregator_enabled = enabled;
self
}
pub fn enable_periodic_task_manager(mut self, enabled: bool) -> Self {
self.periodic_task_manager_enabled = enabled;
self
}
pub fn periodic_task_manager_check_interval(mut self, interval: Duration) -> Self {
self.periodic_task_manager_check_interval = interval;
self
}
pub fn acl_tenant<S: Into<String>>(mut self, tenant: S) -> Self {
self.acl_tenant = Some(tenant.into());
self
}
pub fn get_queue_name_with_prefix(&self, queue: &str) -> String {
if queue == DEFAULT_QUEUE_NAME {
return queue.to_string();
}
if let Some(tenant) = &self.acl_tenant {
return format!("{}:{}", tenant, queue);
}
queue.to_string()
}
pub fn get_queues_with_prefix(&self) -> HashMap<String, i32> {
if self.acl_tenant.is_some() {
self
.queues
.iter()
.map(|(name, priority)| (self.get_queue_name_with_prefix(name), *priority))
.collect()
} else {
self.queues.clone()
}
}
pub fn validate(&self) -> Result<()> {
if self.concurrency == 0 {
return Err(Error::config("Concurrency must be greater than 0"));
}
if self.queues.is_empty() {
return Err(Error::config("At least one queue must be configured"));
}
for (name, priority) in &self.queues {
if name.trim().is_empty() {
return Err(Error::InvalidQueueName { name: name.clone() });
}
if *priority <= 0 {
return Err(Error::config("Queue priority must be positive"));
}
}
if self.group_grace_period < Duration::from_secs(1) {
return Err(Error::config(
"Group grace period cannot be less than 1 second",
));
}
Ok(())
}
}
#[derive(Debug, Clone)]
pub struct ClientConfig {
pub connection_timeout: Duration,
pub request_timeout: Duration,
pub max_retries: usize,
pub retry_interval: Duration,
pub acl_tenant: Option<String>,
}
impl Default for ClientConfig {
fn default() -> Self {
Self {
connection_timeout: Duration::from_secs(30),
request_timeout: Duration::from_secs(60),
max_retries: 3,
retry_interval: Duration::from_secs(1),
acl_tenant: None,
}
}
}
impl ClientConfig {
pub fn new() -> Self {
Self::default()
}
pub fn connection_timeout(mut self, timeout: Duration) -> Self {
self.connection_timeout = timeout;
self
}
pub fn request_timeout(mut self, timeout: Duration) -> Self {
self.request_timeout = timeout;
self
}
pub fn max_retries(mut self, max_retries: usize) -> Self {
self.max_retries = max_retries;
self
}
pub fn retry_interval(mut self, interval: Duration) -> Self {
self.retry_interval = interval;
self
}
pub fn acl_tenant<S: Into<String>>(mut self, tenant: S) -> Self {
self.acl_tenant = Some(tenant.into());
self
}
pub fn get_queue_name_with_prefix(&self, queue: &str) -> String {
if queue == DEFAULT_QUEUE_NAME {
return queue.to_string();
}
if let Some(tenant) = &self.acl_tenant {
return format!("{}:{}", tenant, queue);
}
queue.to_string()
}
}
pub type RetryDelayFunc = Box<dyn Fn(i32, &str, &str) -> Duration + Send + Sync>;
pub fn default_retry_delay(retried: i32, _error: &str, _task_type: &str) -> Duration {
let base_delay = (retried as f64).powf(4.0) as i64 + 15;
let jitter = rand::random::<i64>() % (30 * (retried as i64 + 1));
Duration::from_secs((base_delay + jitter).max(1) as u64)
}
pub type ErrorHandlerFunc = Box<dyn Fn(&str, &str, &str) + Send + Sync>;
pub type HealthCheckFunc = Box<dyn Fn(Option<&str>) + Send + Sync>;
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_server_config_default() {
let config = ServerConfig::default();
assert!(config.concurrency > 0);
assert_eq!(config.queues.len(), 1);
assert!(config.queues.contains_key(DEFAULT_QUEUE_NAME));
assert!(!config.strict_priority);
}
#[test]
fn test_server_config_builder() {
let mut queues = HashMap::new();
queues.insert("high".to_string(), 10);
queues.insert("low".to_string(), 1);
let config = ServerConfig::new()
.concurrency(4)
.queues(queues.clone())
.strict_priority(true);
assert_eq!(config.concurrency, 4);
assert_eq!(config.queues, queues);
assert!(config.strict_priority);
}
#[test]
fn test_server_config_add_queue() {
let config = ServerConfig::new().add_queue("test", 5).unwrap();
assert!(config.queues.contains_key("test"));
assert_eq!(config.queues.get("test"), Some(&5));
}
#[test]
fn test_server_config_validation() {
let config = ServerConfig::new();
assert!(config.validate().is_ok());
let invalid_config = ServerConfig {
concurrency: 0,
..ServerConfig::default()
};
assert!(invalid_config.validate().is_err());
}
#[test]
fn test_client_config_default() {
let config = ClientConfig::default();
assert_eq!(config.connection_timeout, Duration::from_secs(30));
assert_eq!(config.request_timeout, Duration::from_secs(60));
assert_eq!(config.max_retries, 3);
}
#[test]
fn test_default_retry_delay() {
let delay1 = default_retry_delay(0, "error", "task");
let delay2 = default_retry_delay(1, "error", "task");
let delay3 = default_retry_delay(2, "error", "task");
assert!(delay1 >= Duration::from_secs(1));
assert!(delay2 >= Duration::from_secs(1));
assert!(delay3 >= Duration::from_secs(1));
let base_delay_0 = default_retry_delay(0, "error", "task");
let base_delay_5 = default_retry_delay(5, "error", "task");
assert!(base_delay_5.as_secs() >= base_delay_0.as_secs());
}
#[test]
fn test_aggregator_enabled_in_config() {
let config = ServerConfig::new().enable_group_aggregator(true);
assert!(config.group_aggregator_enabled);
}
#[test]
fn test_aggregator_disabled_by_default() {
let config = ServerConfig::default();
assert!(!config.group_aggregator_enabled);
}
#[test]
fn test_aggregator_config_with_group_settings() {
let config = ServerConfig::new()
.group_grace_period(Duration::from_secs(30))
.unwrap()
.group_max_delay(Duration::from_secs(120))
.group_max_size(50)
.enable_group_aggregator(true);
assert!(config.group_aggregator_enabled);
assert_eq!(config.group_grace_period, Duration::from_secs(30));
assert_eq!(config.group_max_delay, Some(Duration::from_secs(120)));
assert_eq!(config.group_max_size, Some(50));
}
#[test]
fn test_periodic_task_manager_enabled_in_config() {
let config = ServerConfig::new().enable_periodic_task_manager(true);
assert!(config.periodic_task_manager_enabled);
}
#[test]
fn test_periodic_task_manager_disabled_by_default() {
let config = ServerConfig::default();
assert!(!config.periodic_task_manager_enabled);
}
#[test]
fn test_periodic_task_manager_check_interval() {
let config = ServerConfig::new()
.periodic_task_manager_check_interval(Duration::from_secs(30))
.enable_periodic_task_manager(true);
assert!(config.periodic_task_manager_enabled);
assert_eq!(
config.periodic_task_manager_check_interval,
Duration::from_secs(30)
);
}
#[test]
fn test_acl_disabled_by_default() {
let server_config = ServerConfig::default();
assert!(server_config.acl_tenant.is_none());
let client_config = ClientConfig::default();
assert!(client_config.acl_tenant.is_none());
}
#[test]
fn test_server_config_acl_configuration() {
let config = ServerConfig::new().acl_tenant("tenant1");
assert_eq!(config.acl_tenant, Some("tenant1".to_string()));
}
#[test]
fn test_client_config_acl_configuration() {
let config = ClientConfig::new().acl_tenant("tenant1");
assert_eq!(config.acl_tenant, Some("tenant1".to_string()));
}
#[test]
fn test_server_config_queue_name_with_prefix() {
let config = ServerConfig::new().acl_tenant("tenant1");
assert_eq!(config.get_queue_name_with_prefix("default"), "default");
assert_eq!(
config.get_queue_name_with_prefix("critical"),
"tenant1:critical"
);
}
#[test]
fn test_server_config_queue_name_without_acl() {
let config = ServerConfig::new();
assert_eq!(config.get_queue_name_with_prefix("default"), "default");
assert_eq!(config.get_queue_name_with_prefix("critical"), "critical");
}
#[test]
fn test_server_config_get_queues_with_prefix() {
let mut queues = HashMap::new();
queues.insert("default".to_string(), 3);
queues.insert("critical".to_string(), 6);
let config = ServerConfig::new().queues(queues).acl_tenant("tenant1");
let prefixed_queues = config.get_queues_with_prefix();
assert_eq!(prefixed_queues.len(), 2);
assert_eq!(prefixed_queues.get("default"), Some(&3));
assert_eq!(prefixed_queues.get("tenant1:critical"), Some(&6));
assert!(!prefixed_queues.contains_key("tenant1:default"));
}
#[test]
fn test_server_config_get_queues_without_acl() {
let mut queues = HashMap::new();
queues.insert("default".to_string(), 3);
queues.insert("critical".to_string(), 6);
let config = ServerConfig::new().queues(queues.clone());
let result_queues = config.get_queues_with_prefix();
assert_eq!(result_queues, queues);
}
#[test]
fn test_client_config_queue_name_with_prefix() {
let config = ClientConfig::new().acl_tenant("tenant1");
assert_eq!(config.get_queue_name_with_prefix("default"), "default");
assert_eq!(
config.get_queue_name_with_prefix("critical"),
"tenant1:critical"
);
}
#[test]
fn test_client_config_queue_name_without_acl() {
let config = ClientConfig::new();
assert_eq!(config.get_queue_name_with_prefix("default"), "default");
assert_eq!(config.get_queue_name_with_prefix("critical"), "critical");
}
#[test]
fn test_default_queue_not_prefixed() {
let server_config = ServerConfig::new().acl_tenant("tenant1");
assert_eq!(
server_config.get_queue_name_with_prefix(DEFAULT_QUEUE_NAME),
DEFAULT_QUEUE_NAME
);
let client_config = ClientConfig::new().acl_tenant("tenant1");
assert_eq!(
client_config.get_queue_name_with_prefix(DEFAULT_QUEUE_NAME),
DEFAULT_QUEUE_NAME
);
}
}