celers_cli/
config.rs

1//! Configuration file support for `CeleRS` CLI.
2//!
3//! This module provides configuration management for the `CeleRS` CLI tool, including:
4//! - TOML file parsing and validation
5//! - Environment variable expansion
6//! - Configuration profiles (dev, staging, prod)
7//! - Broker and worker settings
8//! - Auto-scaling and alerting configuration
9//!
10//! # Configuration File Format
11//!
12//! The configuration file uses TOML format and supports the following sections:
13//! - `[broker]`: Broker connection settings (Redis, `PostgreSQL`, etc.)
14//! - `[worker]`: Worker runtime settings (concurrency, retries, timeouts)
15//! - `[autoscale]`: Auto-scaling configuration
16//! - `[alerts]`: Alert and notification settings
17//!
18//! # Environment Variables
19//!
20//! Configuration values can reference environment variables using the syntax:
//! - `${VAR_NAME}` - Replaced with the variable's value; left unexpanded if the variable is unset
//! - `${VAR_NAME:default}` - Replaced with the variable's value, or with `default` if the variable is unset
23//!
24//! # Examples
25//!
26//! ```toml
27//! [broker]
28//! type = "redis"
29//! url = "${REDIS_URL:redis://localhost:6379}"
30//! queue = "my_queue"
31//!
32//! [worker]
33//! concurrency = 4
34//! max_retries = 3
35//! ```
36//!
37//! # Usage
38//!
39//! ```no_run
40//! use celers_cli::config::Config;
41//! use std::path::PathBuf;
42//!
43//! # fn main() -> anyhow::Result<()> {
44//! // Load from file
45//! let config = Config::from_file(PathBuf::from("celers.toml"))?;
46//!
47//! // Get default configuration
48//! let default_config = Config::default_config();
49//!
50//! // Save to file
51//! default_config.to_file("celers.toml")?;
52//! # Ok(())
53//! # }
54//! ```
55
56use serde::{Deserialize, Serialize};
57use std::env;
58use std::path::Path;
59
60/// Main CLI configuration structure.
61///
62/// Contains all settings for broker connection, worker configuration,
63/// auto-scaling, and alerting. Can be loaded from TOML files with
64/// environment variable support.
65#[derive(Debug, Clone, Serialize, Deserialize)]
66pub struct Config {
67    /// Configuration profile name (dev, staging, prod, etc.)
68    #[serde(default)]
69    pub profile: Option<String>,
70
71    /// Broker configuration
72    pub broker: BrokerConfig,
73
74    /// Worker configuration
75    #[serde(default)]
76    pub worker: WorkerConfig,
77
78    /// Queue names
79    #[serde(default)]
80    pub queues: Vec<String>,
81
82    /// Auto-scaling configuration
83    #[serde(default)]
84    pub autoscale: Option<AutoScaleConfig>,
85
86    /// Alert configuration
87    #[serde(default)]
88    pub alerts: Option<AlertConfig>,
89}
90
91/// Auto-scaling configuration
92#[derive(Debug, Clone, Serialize, Deserialize)]
93pub struct AutoScaleConfig {
94    /// Enable auto-scaling
95    #[serde(default)]
96    pub enabled: bool,
97
98    /// Minimum number of workers
99    #[serde(default = "default_min_workers")]
100    pub min_workers: usize,
101
102    /// Maximum number of workers
103    #[serde(default = "default_max_workers")]
104    pub max_workers: usize,
105
106    /// Queue depth threshold for scaling up
107    #[serde(default = "default_scale_up_threshold")]
108    pub scale_up_threshold: usize,
109
110    /// Queue depth threshold for scaling down
111    #[serde(default = "default_scale_down_threshold")]
112    pub scale_down_threshold: usize,
113
114    /// Check interval in seconds
115    #[serde(default = "default_autoscale_check_interval")]
116    pub check_interval_secs: u64,
117}
118
119/// Alert configuration
120#[derive(Debug, Clone, Serialize, Deserialize)]
121pub struct AlertConfig {
122    /// Enable alerts
123    #[serde(default)]
124    pub enabled: bool,
125
126    /// Webhook URL for notifications
127    pub webhook_url: Option<String>,
128
129    /// DLQ size threshold for alerts
130    #[serde(default = "default_dlq_threshold")]
131    pub dlq_threshold: usize,
132
133    /// Failed task threshold for alerts
134    #[serde(default = "default_failed_threshold")]
135    pub failed_threshold: usize,
136
137    /// Alert check interval in seconds
138    #[serde(default = "default_alert_check_interval")]
139    pub check_interval_secs: u64,
140}
141
142/// Broker configuration
143#[derive(Debug, Clone, Serialize, Deserialize)]
144pub struct BrokerConfig {
145    /// Broker type (redis or postgres)
146    #[serde(rename = "type")]
147    pub broker_type: String,
148
149    /// Connection URL
150    pub url: String,
151
152    /// Failover broker URLs (optional)
153    #[serde(default)]
154    pub failover_urls: Vec<String>,
155
156    /// Failover retry attempts
157    #[serde(default = "default_failover_retries")]
158    pub failover_retries: u32,
159
160    /// Failover timeout in seconds
161    #[serde(default = "default_failover_timeout")]
162    pub failover_timeout_secs: u64,
163
164    /// Default queue name
165    #[serde(default = "default_queue_name")]
166    pub queue: String,
167
168    /// Queue mode (fifo or priority)
169    #[serde(default = "default_queue_mode")]
170    pub mode: String,
171}
172
173/// Worker configuration
174#[derive(Debug, Clone, Serialize, Deserialize)]
175pub struct WorkerConfig {
176    /// Number of concurrent tasks
177    #[serde(default = "default_concurrency")]
178    pub concurrency: usize,
179
180    /// Poll interval in milliseconds
181    #[serde(default = "default_poll_interval")]
182    pub poll_interval_ms: u64,
183
184    /// Maximum number of retries
185    #[serde(default = "default_max_retries")]
186    pub max_retries: u32,
187
188    /// Default task timeout in seconds
189    #[serde(default = "default_timeout")]
190    pub default_timeout_secs: u64,
191}
192
193impl Default for WorkerConfig {
194    fn default() -> Self {
195        Self {
196            concurrency: default_concurrency(),
197            poll_interval_ms: default_poll_interval(),
198            max_retries: default_max_retries(),
199            default_timeout_secs: default_timeout(),
200        }
201    }
202}
203
/// Queue name used when the broker section does not set `queue`.
fn default_queue_name() -> String {
    String::from("celers")
}
207
/// Queue mode used when the broker section does not set `mode`.
fn default_queue_mode() -> String {
    String::from("fifo")
}
211
/// Number of concurrent tasks used when `concurrency` is not configured.
fn default_concurrency() -> usize {
    const DEFAULT_CONCURRENCY: usize = 4;
    DEFAULT_CONCURRENCY
}
215
/// Poll interval, in milliseconds, used when `poll_interval_ms` is not configured.
fn default_poll_interval() -> u64 {
    const DEFAULT_POLL_INTERVAL_MS: u64 = 1000;
    DEFAULT_POLL_INTERVAL_MS
}
219
/// Retry limit used when `max_retries` is not configured.
fn default_max_retries() -> u32 {
    const DEFAULT_MAX_RETRIES: u32 = 3;
    DEFAULT_MAX_RETRIES
}
223
/// Task timeout, in seconds, used when `default_timeout_secs` is not configured.
fn default_timeout() -> u64 {
    const DEFAULT_TIMEOUT_SECS: u64 = 300;
    DEFAULT_TIMEOUT_SECS
}
227
/// Failover retry attempts used when `failover_retries` is not configured.
fn default_failover_retries() -> u32 {
    const DEFAULT_FAILOVER_RETRIES: u32 = 3;
    DEFAULT_FAILOVER_RETRIES
}
231
/// Failover timeout, in seconds, used when `failover_timeout_secs` is not configured.
fn default_failover_timeout() -> u64 {
    const DEFAULT_FAILOVER_TIMEOUT_SECS: u64 = 5;
    DEFAULT_FAILOVER_TIMEOUT_SECS
}
235
/// Minimum worker count used when `min_workers` is not configured.
fn default_min_workers() -> usize {
    const DEFAULT_MIN_WORKERS: usize = 1;
    DEFAULT_MIN_WORKERS
}
239
/// Maximum worker count used when `max_workers` is not configured.
fn default_max_workers() -> usize {
    const DEFAULT_MAX_WORKERS: usize = 10;
    DEFAULT_MAX_WORKERS
}
243
/// Scale-up queue depth threshold used when `scale_up_threshold` is not configured.
fn default_scale_up_threshold() -> usize {
    const DEFAULT_SCALE_UP_THRESHOLD: usize = 100;
    DEFAULT_SCALE_UP_THRESHOLD
}
247
/// Scale-down queue depth threshold used when `scale_down_threshold` is not configured.
fn default_scale_down_threshold() -> usize {
    const DEFAULT_SCALE_DOWN_THRESHOLD: usize = 10;
    DEFAULT_SCALE_DOWN_THRESHOLD
}
251
/// Auto-scaling check interval, in seconds, used when `check_interval_secs` is not configured.
fn default_autoscale_check_interval() -> u64 {
    const DEFAULT_AUTOSCALE_CHECK_INTERVAL_SECS: u64 = 30;
    DEFAULT_AUTOSCALE_CHECK_INTERVAL_SECS
}
255
/// DLQ size alert threshold used when `dlq_threshold` is not configured.
fn default_dlq_threshold() -> usize {
    const DEFAULT_DLQ_THRESHOLD: usize = 50;
    DEFAULT_DLQ_THRESHOLD
}
259
/// Failed-task alert threshold used when `failed_threshold` is not configured.
fn default_failed_threshold() -> usize {
    const DEFAULT_FAILED_THRESHOLD: usize = 100;
    DEFAULT_FAILED_THRESHOLD
}
263
/// Alert check interval, in seconds, used when `check_interval_secs` is not configured.
fn default_alert_check_interval() -> u64 {
    const DEFAULT_ALERT_CHECK_INTERVAL_SECS: u64 = 60;
    DEFAULT_ALERT_CHECK_INTERVAL_SECS
}
267
/// Expand environment variable references in `s`.
///
/// Two forms are recognised:
/// - `${VAR}` is replaced with the value of `VAR`. If `VAR` cannot be read
///   (unset, or not valid Unicode) the reference is left in the output as-is.
/// - `${VAR:default_value}` is replaced with the value of `VAR`, or with
///   `default_value` if `VAR` cannot be read. The name ends at the first `:`,
///   so the default itself may contain colons (for example a URL).
///
/// Substituted text is never re-scanned, so a value that itself contains `${`
/// is emitted verbatim. A `${` without a closing `}` ends expansion; the rest
/// of the input is copied through unchanged.
fn expand_env_vars(s: &str) -> String {
    // Build the output in a single pass. Splicing each replacement into a copy
    // of the input would shift the whole tail on every substitution.
    let mut expanded = String::with_capacity(s.len());
    let mut rest = s;

    while let Some(open) = rest.find("${") {
        let body = &rest[open + 2..];
        let close = match body.find('}') {
            Some(close) => close,
            // Unterminated reference: stop and keep the remainder verbatim.
            None => break,
        };

        let expr = &body[..close];
        let (name, default) = match expr.split_once(':') {
            Some((name, default)) => (name, Some(default)),
            None => (expr, None),
        };

        // Literal text that precedes the reference.
        expanded.push_str(&rest[..open]);

        match (env::var(name), default) {
            (Ok(value), _) => expanded.push_str(&value),
            (Err(_), Some(default)) => expanded.push_str(default),
            // No value and no default: keep the original `${...}` text.
            (Err(_), None) => expanded.push_str(&rest[open..open + close + 3]),
        }

        rest = &body[close + 1..];
    }

    expanded.push_str(rest);
    expanded
}
308
309impl Config {
310    /// Load configuration from a TOML file with environment variable expansion
311    pub fn from_file<P: AsRef<Path>>(path: P) -> anyhow::Result<Self> {
312        let content = std::fs::read_to_string(path)?;
313        let expanded_content = expand_env_vars(&content);
314        let config: Config = toml::from_str(&expanded_content)?;
315        Ok(config)
316    }
317
318    /// Save configuration to a TOML file
319    pub fn to_file<P: AsRef<Path>>(&self, path: P) -> anyhow::Result<()> {
320        let content = toml::to_string_pretty(self)?;
321        std::fs::write(path, content)?;
322        Ok(())
323    }
324
325    /// Create a default configuration file
326    pub fn default_config() -> Self {
327        Self {
328            profile: None,
329            broker: BrokerConfig {
330                broker_type: "redis".to_string(),
331                url: "redis://localhost:6379".to_string(),
332                failover_urls: vec![],
333                failover_retries: default_failover_retries(),
334                failover_timeout_secs: default_failover_timeout(),
335                queue: "celers".to_string(),
336                mode: "fifo".to_string(),
337            },
338            worker: WorkerConfig::default(),
339            queues: vec!["celers".to_string()],
340            autoscale: None,
341            alerts: None,
342        }
343    }
344
345    /// Load configuration for a specific profile
346    #[allow(dead_code)]
347    pub fn from_file_with_profile<P: AsRef<Path>>(path: P, profile: &str) -> anyhow::Result<Self> {
348        let base_config = Self::from_file(&path)?;
349
350        // Try to load profile-specific configuration
351        let profile_path = path
352            .as_ref()
353            .parent()
354            .unwrap_or_else(|| Path::new("."))
355            .join(format!("celers.{profile}.toml"));
356
357        if profile_path.exists() {
358            let profile_config = Self::from_file(profile_path)?;
359            Ok(base_config.merge_with(profile_config))
360        } else {
361            Ok(base_config)
362        }
363    }
364
365    /// Merge this configuration with another, with the other taking precedence
366    #[allow(dead_code)]
367    fn merge_with(mut self, other: Self) -> Self {
368        // Merge broker config
369        if !other.broker.url.is_empty() {
370            self.broker.url = other.broker.url;
371        }
372        if !other.broker.broker_type.is_empty() {
373            self.broker.broker_type = other.broker.broker_type;
374        }
375        if !other.broker.failover_urls.is_empty() {
376            self.broker.failover_urls = other.broker.failover_urls;
377        }
378
379        // Merge worker config
380        if other.worker.concurrency > 0 {
381            self.worker.concurrency = other.worker.concurrency;
382        }
383
384        // Merge queues
385        if !other.queues.is_empty() {
386            self.queues = other.queues;
387        }
388
389        // Merge autoscale
390        if other.autoscale.is_some() {
391            self.autoscale = other.autoscale;
392        }
393
394        // Merge alerts
395        if other.alerts.is_some() {
396            self.alerts = other.alerts;
397        }
398
399        self
400    }
401
402    /// Validate configuration settings
403    pub fn validate(&self) -> anyhow::Result<Vec<String>> {
404        let mut warnings = Vec::new();
405
406        // Validate broker type
407        let valid_broker_types = [
408            "redis",
409            "postgres",
410            "postgresql",
411            "mysql",
412            "amqp",
413            "rabbitmq",
414            "sqs",
415        ];
416        if !valid_broker_types.contains(&self.broker.broker_type.to_lowercase().as_str()) {
417            warnings.push(format!(
418                "Unknown broker type '{}'. Supported types: {}",
419                self.broker.broker_type,
420                valid_broker_types.join(", ")
421            ));
422        }
423
424        // Validate queue mode
425        if self.broker.mode != "fifo" && self.broker.mode != "priority" {
426            warnings.push(format!(
427                "Unknown queue mode '{}'. Expected 'fifo' or 'priority'",
428                self.broker.mode
429            ));
430        }
431
432        // Validate worker configuration
433        if self.worker.concurrency == 0 {
434            warnings.push("Concurrency is 0 - worker will not process any tasks".to_string());
435        } else if self.worker.concurrency > 100 {
436            warnings.push(format!(
437                "High concurrency ({}) may cause resource exhaustion",
438                self.worker.concurrency
439            ));
440        }
441
442        if self.worker.poll_interval_ms < 100 {
443            warnings.push("Very low poll interval may cause excessive CPU usage".to_string());
444        }
445
446        // Validate autoscale configuration
447        if let Some(ref autoscale) = self.autoscale {
448            if autoscale.enabled {
449                if autoscale.min_workers == 0 {
450                    warnings.push("Autoscale min_workers is 0".to_string());
451                }
452                if autoscale.max_workers < autoscale.min_workers {
453                    warnings.push(format!(
454                        "Autoscale max_workers ({}) is less than min_workers ({})",
455                        autoscale.max_workers, autoscale.min_workers
456                    ));
457                }
458                if autoscale.scale_down_threshold >= autoscale.scale_up_threshold {
459                    warnings.push(
460                        "Autoscale scale_down_threshold should be less than scale_up_threshold"
461                            .to_string(),
462                    );
463                }
464            }
465        }
466
467        // Validate alert configuration
468        if let Some(ref alerts) = self.alerts {
469            if alerts.enabled && alerts.webhook_url.is_none() {
470                warnings.push("Alerts enabled but no webhook_url configured".to_string());
471            }
472        }
473
474        Ok(warnings)
475    }
476}
477
/// Unit tests for configuration parsing, defaults, validation, and
/// environment variable expansion.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_default_config() {
        let config = Config::default_config();
        assert_eq!(config.profile, None);
        assert_eq!(config.broker.broker_type, "redis");
        assert_eq!(config.broker.url, "redis://localhost:6379");
        assert_eq!(config.broker.queue, "celers");
        assert_eq!(config.broker.mode, "fifo");
        assert_eq!(config.broker.failover_urls, Vec::<String>::new());
        assert_eq!(config.broker.failover_retries, 3);
        assert_eq!(config.broker.failover_timeout_secs, 5);
        assert_eq!(config.worker.concurrency, 4);
        assert_eq!(config.worker.poll_interval_ms, 1000);
        assert_eq!(config.worker.max_retries, 3);
        assert_eq!(config.worker.default_timeout_secs, 300);
        assert_eq!(config.queues, vec!["celers"]);
        assert!(config.autoscale.is_none());
        assert!(config.alerts.is_none());
    }

    #[test]
    fn test_config_serialization() {
        let config = Config::default_config();
        let toml_str = toml::to_string(&config).unwrap();
        assert!(toml_str.contains("type = \"redis\""));
        assert!(toml_str.contains("url = \"redis://localhost:6379\""));
        assert!(toml_str.contains("concurrency = 4"));
    }

    #[test]
    fn test_config_deserialization() {
        let toml_str = r#"
queues = ["queue1", "queue2"]

[broker]
type = "redis"
url = "redis://127.0.0.1:6379"
queue = "test_queue"
mode = "priority"

[worker]
concurrency = 8
poll_interval_ms = 500
max_retries = 5
default_timeout_secs = 600
        "#;

        let config: Config = toml::from_str(toml_str).unwrap();
        assert_eq!(config.broker.broker_type, "redis");
        assert_eq!(config.broker.url, "redis://127.0.0.1:6379");
        assert_eq!(config.broker.queue, "test_queue");
        assert_eq!(config.broker.mode, "priority");
        assert_eq!(config.worker.concurrency, 8);
        assert_eq!(config.worker.poll_interval_ms, 500);
        assert_eq!(config.worker.max_retries, 5);
        assert_eq!(config.worker.default_timeout_secs, 600);
        assert_eq!(config.queues, vec!["queue1", "queue2"]);
    }

    #[test]
    fn test_config_defaults() {
        let toml_str = r#"
            [broker]
            type = "redis"
            url = "redis://localhost:6379"
        "#;

        let config: Config = toml::from_str(toml_str).unwrap();
        assert_eq!(config.broker.queue, "celers");
        assert_eq!(config.broker.mode, "fifo");
        assert_eq!(config.worker.concurrency, 4);
        assert_eq!(config.worker.poll_interval_ms, 1000);
    }

    #[test]
    fn test_config_validation_valid() {
        let config = Config::default_config();
        let warnings = config.validate().unwrap();
        assert!(warnings.is_empty());
    }

    #[test]
    fn test_config_validation_invalid_broker_type() {
        let mut config = Config::default_config();
        config.broker.broker_type = "invalid".to_string();
        let warnings = config.validate().unwrap();
        assert_eq!(warnings.len(), 1);
        assert!(warnings[0].contains("Unknown broker type"));
    }

    #[test]
    fn test_config_validation_invalid_queue_mode() {
        let mut config = Config::default_config();
        config.broker.mode = "invalid".to_string();
        let warnings = config.validate().unwrap();
        assert_eq!(warnings.len(), 1);
        assert!(warnings[0].contains("Unknown queue mode"));
    }

    #[test]
    fn test_config_validation_zero_concurrency() {
        let mut config = Config::default_config();
        config.worker.concurrency = 0;
        let warnings = config.validate().unwrap();
        assert_eq!(warnings.len(), 1);
        assert!(warnings[0].contains("Concurrency is 0"));
    }

    #[test]
    fn test_config_validation_high_concurrency() {
        let mut config = Config::default_config();
        config.worker.concurrency = 150;
        let warnings = config.validate().unwrap();
        assert_eq!(warnings.len(), 1);
        assert!(warnings[0].contains("High concurrency"));
    }

    #[test]
    fn test_config_validation_low_poll_interval() {
        let mut config = Config::default_config();
        config.worker.poll_interval_ms = 50;
        let warnings = config.validate().unwrap();
        assert_eq!(warnings.len(), 1);
        assert!(warnings[0].contains("Very low poll interval"));
    }

    #[test]
    fn test_config_validation_multiple_issues() {
        let mut config = Config::default_config();
        config.broker.broker_type = "unknown".to_string();
        config.broker.mode = "invalid_mode".to_string();
        config.worker.concurrency = 0;
        config.worker.poll_interval_ms = 50;

        let warnings = config.validate().unwrap();
        assert_eq!(warnings.len(), 4);
    }

    #[test]
    fn test_config_file_roundtrip() {
        let temp_file = tempfile::NamedTempFile::new().unwrap();
        let temp_path = temp_file.path();

        let config = Config::default_config();
        config.to_file(temp_path).unwrap();

        let loaded_config = Config::from_file(temp_path).unwrap();
        assert_eq!(config.broker.broker_type, loaded_config.broker.broker_type);
        assert_eq!(config.broker.url, loaded_config.broker.url);
        assert_eq!(config.worker.concurrency, loaded_config.worker.concurrency);
    }

    #[test]
    fn test_worker_config_default() {
        let worker_config = WorkerConfig::default();
        assert_eq!(worker_config.concurrency, 4);
        assert_eq!(worker_config.poll_interval_ms, 1000);
        assert_eq!(worker_config.max_retries, 3);
        assert_eq!(worker_config.default_timeout_secs, 300);
    }

    #[test]
    fn test_expand_env_vars_simple() {
        env::set_var("TEST_VAR", "test_value");
        let result = super::expand_env_vars("prefix ${TEST_VAR} suffix");
        assert_eq!(result, "prefix test_value suffix");
        env::remove_var("TEST_VAR");
    }

    #[test]
    fn test_expand_env_vars_with_default() {
        env::remove_var("MISSING_VAR");
        let result = super::expand_env_vars("value is ${MISSING_VAR:default_value}");
        assert_eq!(result, "value is default_value");
    }

    #[test]
    fn test_expand_env_vars_without_default() {
        env::remove_var("MISSING_VAR");
        let result = super::expand_env_vars("value is ${MISSING_VAR}");
        assert_eq!(result, "value is ${MISSING_VAR}");
    }

    #[test]
    fn test_expand_env_vars_multiple() {
        env::set_var("VAR1", "value1");
        env::set_var("VAR2", "value2");
        let result = super::expand_env_vars("${VAR1} and ${VAR2}");
        assert_eq!(result, "value1 and value2");
        env::remove_var("VAR1");
        env::remove_var("VAR2");
    }

    #[test]
    fn test_expand_env_vars_mixed() {
        env::set_var("EXISTING_VAR", "exists");
        env::remove_var("MISSING_VAR");
        let result = super::expand_env_vars("${EXISTING_VAR} and ${MISSING_VAR:default}");
        assert_eq!(result, "exists and default");
        env::remove_var("EXISTING_VAR");
    }

    #[test]
    fn test_config_with_env_vars() {
        env::set_var("TEST_REDIS_HOST", "localhost");
        env::set_var("TEST_REDIS_PORT", "6379");

        let toml_str = r#"
[broker]
type = "redis"
url = "redis://${TEST_REDIS_HOST}:${TEST_REDIS_PORT}"
queue = "celers"
mode = "fifo"
        "#;

        let expanded = super::expand_env_vars(toml_str);
        assert!(
            expanded.contains("redis://localhost:6379"),
            "Expanded content: {}",
            expanded
        );

        let config: Config = toml::from_str(&expanded).unwrap();
        assert_eq!(config.broker.url, "redis://localhost:6379");

        env::remove_var("TEST_REDIS_HOST");
        env::remove_var("TEST_REDIS_PORT");
    }

    #[test]
    fn test_config_with_env_vars_and_defaults() {
        // Use test-only names and clear both of them: an ambient REDIS_HOST or
        // REDIS_PORT in the developer/CI environment must not be able to
        // change the expected expansion.
        env::remove_var("CELERS_TEST_UNSET_HOST");
        env::remove_var("CELERS_TEST_UNSET_PORT");

        let toml_str = r#"
[broker]
type = "redis"
url = "redis://${CELERS_TEST_UNSET_HOST:localhost}:${CELERS_TEST_UNSET_PORT:6379}"
queue = "celers"
mode = "fifo"
        "#;

        let expanded = super::expand_env_vars(toml_str);
        assert!(expanded.contains("redis://localhost:6379"));

        let config: Config = toml::from_str(&expanded).unwrap();
        assert_eq!(config.broker.url, "redis://localhost:6379");
    }

    #[test]
    fn test_broker_failover_config() {
        let toml_str = r#"
[broker]
type = "redis"
url = "redis://primary:6379"
failover_urls = ["redis://backup1:6379", "redis://backup2:6379"]
failover_retries = 5
failover_timeout_secs = 10
queue = "celers"
mode = "fifo"
        "#;

        let config: Config = toml::from_str(toml_str).unwrap();
        assert_eq!(config.broker.failover_urls.len(), 2);
        assert_eq!(config.broker.failover_urls[0], "redis://backup1:6379");
        assert_eq!(config.broker.failover_urls[1], "redis://backup2:6379");
        assert_eq!(config.broker.failover_retries, 5);
        assert_eq!(config.broker.failover_timeout_secs, 10);
    }

    #[test]
    fn test_autoscale_config() {
        let toml_str = r#"
[broker]
type = "redis"
url = "redis://localhost:6379"

[autoscale]
enabled = true
min_workers = 2
max_workers = 20
scale_up_threshold = 200
scale_down_threshold = 20
check_interval_secs = 60
        "#;

        let config: Config = toml::from_str(toml_str).unwrap();
        assert!(config.autoscale.is_some());
        let autoscale = config.autoscale.unwrap();
        assert!(autoscale.enabled);
        assert_eq!(autoscale.min_workers, 2);
        assert_eq!(autoscale.max_workers, 20);
        assert_eq!(autoscale.scale_up_threshold, 200);
        assert_eq!(autoscale.scale_down_threshold, 20);
        assert_eq!(autoscale.check_interval_secs, 60);
    }

    #[test]
    fn test_alert_config() {
        let toml_str = r#"
[broker]
type = "redis"
url = "redis://localhost:6379"

[alerts]
enabled = true
webhook_url = "https://hooks.slack.com/services/xxx"
dlq_threshold = 100
failed_threshold = 200
check_interval_secs = 120
        "#;

        let config: Config = toml::from_str(toml_str).unwrap();
        assert!(config.alerts.is_some());
        let alerts = config.alerts.unwrap();
        assert!(alerts.enabled);
        assert_eq!(
            alerts.webhook_url,
            Some("https://hooks.slack.com/services/xxx".to_string())
        );
        assert_eq!(alerts.dlq_threshold, 100);
        assert_eq!(alerts.failed_threshold, 200);
        assert_eq!(alerts.check_interval_secs, 120);
    }

    #[test]
    fn test_config_validation_autoscale_invalid() {
        let mut config = Config::default_config();
        config.autoscale = Some(AutoScaleConfig {
            enabled: true,
            min_workers: 0,
            max_workers: 5,
            scale_up_threshold: 100,
            scale_down_threshold: 10,
            check_interval_secs: 30,
        });

        let warnings = config.validate().unwrap();
        assert!(warnings.iter().any(|w| w.contains("min_workers is 0")));
    }

    #[test]
    fn test_config_validation_autoscale_max_less_than_min() {
        let mut config = Config::default_config();
        config.autoscale = Some(AutoScaleConfig {
            enabled: true,
            min_workers: 10,
            max_workers: 5,
            scale_up_threshold: 100,
            scale_down_threshold: 10,
            check_interval_secs: 30,
        });

        let warnings = config.validate().unwrap();
        assert!(warnings
            .iter()
            .any(|w| w.contains("max_workers") && w.contains("min_workers")));
    }

    #[test]
    fn test_config_validation_autoscale_threshold_invalid() {
        let mut config = Config::default_config();
        config.autoscale = Some(AutoScaleConfig {
            enabled: true,
            min_workers: 1,
            max_workers: 10,
            scale_up_threshold: 50,
            scale_down_threshold: 100,
            check_interval_secs: 30,
        });

        let warnings = config.validate().unwrap();
        assert!(warnings.iter().any(|w| w.contains("scale_down_threshold")));
    }

    #[test]
    fn test_config_validation_alert_no_webhook() {
        let mut config = Config::default_config();
        config.alerts = Some(AlertConfig {
            enabled: true,
            webhook_url: None,
            dlq_threshold: 50,
            failed_threshold: 100,
            check_interval_secs: 60,
        });

        let warnings = config.validate().unwrap();
        assert!(warnings.iter().any(|w| w.contains("webhook_url")));
    }

    #[test]
    fn test_profile_config() {
        let toml_str = r#"
profile = "production"

[broker]
type = "redis"
url = "redis://localhost:6379"
        "#;

        let config: Config = toml::from_str(toml_str).unwrap();
        assert_eq!(config.profile, Some("production".to_string()));
    }
}