1use serde::{Deserialize, Serialize};
57use std::env;
58use std::path::Path;
59
60#[derive(Debug, Clone, Serialize, Deserialize)]
66pub struct Config {
67 #[serde(default)]
69 pub profile: Option<String>,
70
71 pub broker: BrokerConfig,
73
74 #[serde(default)]
76 pub worker: WorkerConfig,
77
78 #[serde(default)]
80 pub queues: Vec<String>,
81
82 #[serde(default)]
84 pub autoscale: Option<AutoScaleConfig>,
85
86 #[serde(default)]
88 pub alerts: Option<AlertConfig>,
89}
90
91#[derive(Debug, Clone, Serialize, Deserialize)]
93pub struct AutoScaleConfig {
94 #[serde(default)]
96 pub enabled: bool,
97
98 #[serde(default = "default_min_workers")]
100 pub min_workers: usize,
101
102 #[serde(default = "default_max_workers")]
104 pub max_workers: usize,
105
106 #[serde(default = "default_scale_up_threshold")]
108 pub scale_up_threshold: usize,
109
110 #[serde(default = "default_scale_down_threshold")]
112 pub scale_down_threshold: usize,
113
114 #[serde(default = "default_autoscale_check_interval")]
116 pub check_interval_secs: u64,
117}
118
119#[derive(Debug, Clone, Serialize, Deserialize)]
121pub struct AlertConfig {
122 #[serde(default)]
124 pub enabled: bool,
125
126 pub webhook_url: Option<String>,
128
129 #[serde(default = "default_dlq_threshold")]
131 pub dlq_threshold: usize,
132
133 #[serde(default = "default_failed_threshold")]
135 pub failed_threshold: usize,
136
137 #[serde(default = "default_alert_check_interval")]
139 pub check_interval_secs: u64,
140}
141
142#[derive(Debug, Clone, Serialize, Deserialize)]
144pub struct BrokerConfig {
145 #[serde(rename = "type")]
147 pub broker_type: String,
148
149 pub url: String,
151
152 #[serde(default)]
154 pub failover_urls: Vec<String>,
155
156 #[serde(default = "default_failover_retries")]
158 pub failover_retries: u32,
159
160 #[serde(default = "default_failover_timeout")]
162 pub failover_timeout_secs: u64,
163
164 #[serde(default = "default_queue_name")]
166 pub queue: String,
167
168 #[serde(default = "default_queue_mode")]
170 pub mode: String,
171}
172
173#[derive(Debug, Clone, Serialize, Deserialize)]
175pub struct WorkerConfig {
176 #[serde(default = "default_concurrency")]
178 pub concurrency: usize,
179
180 #[serde(default = "default_poll_interval")]
182 pub poll_interval_ms: u64,
183
184 #[serde(default = "default_max_retries")]
186 pub max_retries: u32,
187
188 #[serde(default = "default_timeout")]
190 pub default_timeout_secs: u64,
191}
192
193impl Default for WorkerConfig {
194 fn default() -> Self {
195 Self {
196 concurrency: default_concurrency(),
197 poll_interval_ms: default_poll_interval(),
198 max_retries: default_max_retries(),
199 default_timeout_secs: default_timeout(),
200 }
201 }
202}
203
204fn default_queue_name() -> String {
205 "celers".to_string()
206}
207
208fn default_queue_mode() -> String {
209 "fifo".to_string()
210}
211
212fn default_concurrency() -> usize {
213 4
214}
215
216fn default_poll_interval() -> u64 {
217 1000
218}
219
220fn default_max_retries() -> u32 {
221 3
222}
223
224fn default_timeout() -> u64 {
225 300
226}
227
228fn default_failover_retries() -> u32 {
229 3
230}
231
232fn default_failover_timeout() -> u64 {
233 5
234}
235
236fn default_min_workers() -> usize {
237 1
238}
239
240fn default_max_workers() -> usize {
241 10
242}
243
244fn default_scale_up_threshold() -> usize {
245 100
246}
247
248fn default_scale_down_threshold() -> usize {
249 10
250}
251
252fn default_autoscale_check_interval() -> u64 {
253 30
254}
255
256fn default_dlq_threshold() -> usize {
257 50
258}
259
260fn default_failed_threshold() -> usize {
261 100
262}
263
264fn default_alert_check_interval() -> u64 {
265 60
266}
267
268fn expand_env_vars(s: &str) -> String {
271 let mut result = s.to_string();
272 let mut start_idx = 0;
273
274 while let Some(start) = result[start_idx..].find("${") {
275 let start = start_idx + start;
276 if let Some(end) = result[start..].find('}') {
277 let end = start + end;
278 let var_expr = &result[start + 2..end];
279
280 let (var_name, default_value) = if let Some(colon_idx) = var_expr.find(':') {
282 let var_name = &var_expr[..colon_idx];
283 let default = &var_expr[colon_idx + 1..];
284 (var_name, Some(default))
285 } else {
286 (var_expr, None)
287 };
288
289 let value = env::var(var_name)
291 .ok()
292 .or_else(|| default_value.map(String::from));
293
294 if let Some(value) = value {
295 result.replace_range(start..=end, &value);
296 start_idx = start + value.len();
297 } else {
298 start_idx = end + 1;
300 }
301 } else {
302 break;
303 }
304 }
305
306 result
307}
308
309impl Config {
310 pub fn from_file<P: AsRef<Path>>(path: P) -> anyhow::Result<Self> {
312 let content = std::fs::read_to_string(path)?;
313 let expanded_content = expand_env_vars(&content);
314 let config: Config = toml::from_str(&expanded_content)?;
315 Ok(config)
316 }
317
318 pub fn to_file<P: AsRef<Path>>(&self, path: P) -> anyhow::Result<()> {
320 let content = toml::to_string_pretty(self)?;
321 std::fs::write(path, content)?;
322 Ok(())
323 }
324
325 pub fn default_config() -> Self {
327 Self {
328 profile: None,
329 broker: BrokerConfig {
330 broker_type: "redis".to_string(),
331 url: "redis://localhost:6379".to_string(),
332 failover_urls: vec![],
333 failover_retries: default_failover_retries(),
334 failover_timeout_secs: default_failover_timeout(),
335 queue: "celers".to_string(),
336 mode: "fifo".to_string(),
337 },
338 worker: WorkerConfig::default(),
339 queues: vec!["celers".to_string()],
340 autoscale: None,
341 alerts: None,
342 }
343 }
344
345 #[allow(dead_code)]
347 pub fn from_file_with_profile<P: AsRef<Path>>(path: P, profile: &str) -> anyhow::Result<Self> {
348 let base_config = Self::from_file(&path)?;
349
350 let profile_path = path
352 .as_ref()
353 .parent()
354 .unwrap_or_else(|| Path::new("."))
355 .join(format!("celers.{profile}.toml"));
356
357 if profile_path.exists() {
358 let profile_config = Self::from_file(profile_path)?;
359 Ok(base_config.merge_with(profile_config))
360 } else {
361 Ok(base_config)
362 }
363 }
364
365 #[allow(dead_code)]
367 fn merge_with(mut self, other: Self) -> Self {
368 if !other.broker.url.is_empty() {
370 self.broker.url = other.broker.url;
371 }
372 if !other.broker.broker_type.is_empty() {
373 self.broker.broker_type = other.broker.broker_type;
374 }
375 if !other.broker.failover_urls.is_empty() {
376 self.broker.failover_urls = other.broker.failover_urls;
377 }
378
379 if other.worker.concurrency > 0 {
381 self.worker.concurrency = other.worker.concurrency;
382 }
383
384 if !other.queues.is_empty() {
386 self.queues = other.queues;
387 }
388
389 if other.autoscale.is_some() {
391 self.autoscale = other.autoscale;
392 }
393
394 if other.alerts.is_some() {
396 self.alerts = other.alerts;
397 }
398
399 self
400 }
401
402 pub fn validate(&self) -> anyhow::Result<Vec<String>> {
404 let mut warnings = Vec::new();
405
406 let valid_broker_types = [
408 "redis",
409 "postgres",
410 "postgresql",
411 "mysql",
412 "amqp",
413 "rabbitmq",
414 "sqs",
415 ];
416 if !valid_broker_types.contains(&self.broker.broker_type.to_lowercase().as_str()) {
417 warnings.push(format!(
418 "Unknown broker type '{}'. Supported types: {}",
419 self.broker.broker_type,
420 valid_broker_types.join(", ")
421 ));
422 }
423
424 if self.broker.mode != "fifo" && self.broker.mode != "priority" {
426 warnings.push(format!(
427 "Unknown queue mode '{}'. Expected 'fifo' or 'priority'",
428 self.broker.mode
429 ));
430 }
431
432 if self.worker.concurrency == 0 {
434 warnings.push("Concurrency is 0 - worker will not process any tasks".to_string());
435 } else if self.worker.concurrency > 100 {
436 warnings.push(format!(
437 "High concurrency ({}) may cause resource exhaustion",
438 self.worker.concurrency
439 ));
440 }
441
442 if self.worker.poll_interval_ms < 100 {
443 warnings.push("Very low poll interval may cause excessive CPU usage".to_string());
444 }
445
446 if let Some(ref autoscale) = self.autoscale {
448 if autoscale.enabled {
449 if autoscale.min_workers == 0 {
450 warnings.push("Autoscale min_workers is 0".to_string());
451 }
452 if autoscale.max_workers < autoscale.min_workers {
453 warnings.push(format!(
454 "Autoscale max_workers ({}) is less than min_workers ({})",
455 autoscale.max_workers, autoscale.min_workers
456 ));
457 }
458 if autoscale.scale_down_threshold >= autoscale.scale_up_threshold {
459 warnings.push(
460 "Autoscale scale_down_threshold should be less than scale_up_threshold"
461 .to_string(),
462 );
463 }
464 }
465 }
466
467 if let Some(ref alerts) = self.alerts {
469 if alerts.enabled && alerts.webhook_url.is_none() {
470 warnings.push("Alerts enabled but no webhook_url configured".to_string());
471 }
472 }
473
474 Ok(warnings)
475 }
476}
477
478#[cfg(test)]
479mod tests {
480 use super::*;
481
482 #[test]
483 fn test_default_config() {
484 let config = Config::default_config();
485 assert_eq!(config.profile, None);
486 assert_eq!(config.broker.broker_type, "redis");
487 assert_eq!(config.broker.url, "redis://localhost:6379");
488 assert_eq!(config.broker.queue, "celers");
489 assert_eq!(config.broker.mode, "fifo");
490 assert_eq!(config.broker.failover_urls, Vec::<String>::new());
491 assert_eq!(config.broker.failover_retries, 3);
492 assert_eq!(config.broker.failover_timeout_secs, 5);
493 assert_eq!(config.worker.concurrency, 4);
494 assert_eq!(config.worker.poll_interval_ms, 1000);
495 assert_eq!(config.worker.max_retries, 3);
496 assert_eq!(config.worker.default_timeout_secs, 300);
497 assert_eq!(config.queues, vec!["celers"]);
498 assert!(config.autoscale.is_none());
499 assert!(config.alerts.is_none());
500 }
501
502 #[test]
503 fn test_config_serialization() {
504 let config = Config::default_config();
505 let toml_str = toml::to_string(&config).unwrap();
506 assert!(toml_str.contains("type = \"redis\""));
507 assert!(toml_str.contains("url = \"redis://localhost:6379\""));
508 assert!(toml_str.contains("concurrency = 4"));
509 }
510
511 #[test]
512 fn test_config_deserialization() {
513 let toml_str = r#"
514queues = ["queue1", "queue2"]
515
516[broker]
517type = "redis"
518url = "redis://127.0.0.1:6379"
519queue = "test_queue"
520mode = "priority"
521
522[worker]
523concurrency = 8
524poll_interval_ms = 500
525max_retries = 5
526default_timeout_secs = 600
527 "#;
528
529 let config: Config = toml::from_str(toml_str).unwrap();
530 assert_eq!(config.broker.broker_type, "redis");
531 assert_eq!(config.broker.url, "redis://127.0.0.1:6379");
532 assert_eq!(config.broker.queue, "test_queue");
533 assert_eq!(config.broker.mode, "priority");
534 assert_eq!(config.worker.concurrency, 8);
535 assert_eq!(config.worker.poll_interval_ms, 500);
536 assert_eq!(config.worker.max_retries, 5);
537 assert_eq!(config.worker.default_timeout_secs, 600);
538 assert_eq!(config.queues, vec!["queue1", "queue2"]);
539 }
540
541 #[test]
542 fn test_config_defaults() {
543 let toml_str = r#"
544 [broker]
545 type = "redis"
546 url = "redis://localhost:6379"
547 "#;
548
549 let config: Config = toml::from_str(toml_str).unwrap();
550 assert_eq!(config.broker.queue, "celers");
551 assert_eq!(config.broker.mode, "fifo");
552 assert_eq!(config.worker.concurrency, 4);
553 assert_eq!(config.worker.poll_interval_ms, 1000);
554 }
555
556 #[test]
557 fn test_config_validation_valid() {
558 let config = Config::default_config();
559 let warnings = config.validate().unwrap();
560 assert!(warnings.is_empty());
561 }
562
563 #[test]
564 fn test_config_validation_invalid_broker_type() {
565 let mut config = Config::default_config();
566 config.broker.broker_type = "invalid".to_string();
567 let warnings = config.validate().unwrap();
568 assert_eq!(warnings.len(), 1);
569 assert!(warnings[0].contains("Unknown broker type"));
570 }
571
572 #[test]
573 fn test_config_validation_invalid_queue_mode() {
574 let mut config = Config::default_config();
575 config.broker.mode = "invalid".to_string();
576 let warnings = config.validate().unwrap();
577 assert_eq!(warnings.len(), 1);
578 assert!(warnings[0].contains("Unknown queue mode"));
579 }
580
581 #[test]
582 fn test_config_validation_zero_concurrency() {
583 let mut config = Config::default_config();
584 config.worker.concurrency = 0;
585 let warnings = config.validate().unwrap();
586 assert_eq!(warnings.len(), 1);
587 assert!(warnings[0].contains("Concurrency is 0"));
588 }
589
590 #[test]
591 fn test_config_validation_high_concurrency() {
592 let mut config = Config::default_config();
593 config.worker.concurrency = 150;
594 let warnings = config.validate().unwrap();
595 assert_eq!(warnings.len(), 1);
596 assert!(warnings[0].contains("High concurrency"));
597 }
598
599 #[test]
600 fn test_config_validation_low_poll_interval() {
601 let mut config = Config::default_config();
602 config.worker.poll_interval_ms = 50;
603 let warnings = config.validate().unwrap();
604 assert_eq!(warnings.len(), 1);
605 assert!(warnings[0].contains("Very low poll interval"));
606 }
607
608 #[test]
609 fn test_config_validation_multiple_issues() {
610 let mut config = Config::default_config();
611 config.broker.broker_type = "unknown".to_string();
612 config.broker.mode = "invalid_mode".to_string();
613 config.worker.concurrency = 0;
614 config.worker.poll_interval_ms = 50;
615
616 let warnings = config.validate().unwrap();
617 assert_eq!(warnings.len(), 4);
618 }
619
620 #[test]
621 fn test_config_file_roundtrip() {
622 let temp_file = tempfile::NamedTempFile::new().unwrap();
623 let temp_path = temp_file.path();
624
625 let config = Config::default_config();
626 config.to_file(temp_path).unwrap();
627
628 let loaded_config = Config::from_file(temp_path).unwrap();
629 assert_eq!(config.broker.broker_type, loaded_config.broker.broker_type);
630 assert_eq!(config.broker.url, loaded_config.broker.url);
631 assert_eq!(config.worker.concurrency, loaded_config.worker.concurrency);
632 }
633
634 #[test]
635 fn test_worker_config_default() {
636 let worker_config = WorkerConfig::default();
637 assert_eq!(worker_config.concurrency, 4);
638 assert_eq!(worker_config.poll_interval_ms, 1000);
639 assert_eq!(worker_config.max_retries, 3);
640 assert_eq!(worker_config.default_timeout_secs, 300);
641 }
642
643 #[test]
644 fn test_expand_env_vars_simple() {
645 env::set_var("TEST_VAR", "test_value");
646 let result = super::expand_env_vars("prefix ${TEST_VAR} suffix");
647 assert_eq!(result, "prefix test_value suffix");
648 env::remove_var("TEST_VAR");
649 }
650
651 #[test]
652 fn test_expand_env_vars_with_default() {
653 env::remove_var("MISSING_VAR");
654 let result = super::expand_env_vars("value is ${MISSING_VAR:default_value}");
655 assert_eq!(result, "value is default_value");
656 }
657
658 #[test]
659 fn test_expand_env_vars_without_default() {
660 env::remove_var("MISSING_VAR");
661 let result = super::expand_env_vars("value is ${MISSING_VAR}");
662 assert_eq!(result, "value is ${MISSING_VAR}");
663 }
664
665 #[test]
666 fn test_expand_env_vars_multiple() {
667 env::set_var("VAR1", "value1");
668 env::set_var("VAR2", "value2");
669 let result = super::expand_env_vars("${VAR1} and ${VAR2}");
670 assert_eq!(result, "value1 and value2");
671 env::remove_var("VAR1");
672 env::remove_var("VAR2");
673 }
674
675 #[test]
676 fn test_expand_env_vars_mixed() {
677 env::set_var("EXISTING_VAR", "exists");
678 env::remove_var("MISSING_VAR");
679 let result = super::expand_env_vars("${EXISTING_VAR} and ${MISSING_VAR:default}");
680 assert_eq!(result, "exists and default");
681 env::remove_var("EXISTING_VAR");
682 }
683
684 #[test]
685 fn test_config_with_env_vars() {
686 env::set_var("TEST_REDIS_HOST", "localhost");
687 env::set_var("TEST_REDIS_PORT", "6379");
688
689 let toml_str = r#"
690[broker]
691type = "redis"
692url = "redis://${TEST_REDIS_HOST}:${TEST_REDIS_PORT}"
693queue = "celers"
694mode = "fifo"
695 "#;
696
697 let expanded = super::expand_env_vars(toml_str);
698 assert!(
699 expanded.contains("redis://localhost:6379"),
700 "Expanded content: {}",
701 expanded
702 );
703
704 let config: Config = toml::from_str(&expanded).unwrap();
705 assert_eq!(config.broker.url, "redis://localhost:6379");
706
707 env::remove_var("TEST_REDIS_HOST");
708 env::remove_var("TEST_REDIS_PORT");
709 }
710
711 #[test]
712 fn test_config_with_env_vars_and_defaults() {
713 env::remove_var("REDIS_HOST");
714
715 let toml_str = r#"
716[broker]
717type = "redis"
718url = "redis://${REDIS_HOST:localhost}:${REDIS_PORT:6379}"
719queue = "celers"
720mode = "fifo"
721 "#;
722
723 let expanded = super::expand_env_vars(toml_str);
724 assert!(expanded.contains("redis://localhost:6379"));
725
726 let config: Config = toml::from_str(&expanded).unwrap();
727 assert_eq!(config.broker.url, "redis://localhost:6379");
728 }
729
730 #[test]
731 fn test_broker_failover_config() {
732 let toml_str = r#"
733[broker]
734type = "redis"
735url = "redis://primary:6379"
736failover_urls = ["redis://backup1:6379", "redis://backup2:6379"]
737failover_retries = 5
738failover_timeout_secs = 10
739queue = "celers"
740mode = "fifo"
741 "#;
742
743 let config: Config = toml::from_str(toml_str).unwrap();
744 assert_eq!(config.broker.failover_urls.len(), 2);
745 assert_eq!(config.broker.failover_urls[0], "redis://backup1:6379");
746 assert_eq!(config.broker.failover_urls[1], "redis://backup2:6379");
747 assert_eq!(config.broker.failover_retries, 5);
748 assert_eq!(config.broker.failover_timeout_secs, 10);
749 }
750
751 #[test]
752 fn test_autoscale_config() {
753 let toml_str = r#"
754[broker]
755type = "redis"
756url = "redis://localhost:6379"
757
758[autoscale]
759enabled = true
760min_workers = 2
761max_workers = 20
762scale_up_threshold = 200
763scale_down_threshold = 20
764check_interval_secs = 60
765 "#;
766
767 let config: Config = toml::from_str(toml_str).unwrap();
768 assert!(config.autoscale.is_some());
769 let autoscale = config.autoscale.unwrap();
770 assert!(autoscale.enabled);
771 assert_eq!(autoscale.min_workers, 2);
772 assert_eq!(autoscale.max_workers, 20);
773 assert_eq!(autoscale.scale_up_threshold, 200);
774 assert_eq!(autoscale.scale_down_threshold, 20);
775 assert_eq!(autoscale.check_interval_secs, 60);
776 }
777
778 #[test]
779 fn test_alert_config() {
780 let toml_str = r#"
781[broker]
782type = "redis"
783url = "redis://localhost:6379"
784
785[alerts]
786enabled = true
787webhook_url = "https://hooks.slack.com/services/xxx"
788dlq_threshold = 100
789failed_threshold = 200
790check_interval_secs = 120
791 "#;
792
793 let config: Config = toml::from_str(toml_str).unwrap();
794 assert!(config.alerts.is_some());
795 let alerts = config.alerts.unwrap();
796 assert!(alerts.enabled);
797 assert_eq!(
798 alerts.webhook_url,
799 Some("https://hooks.slack.com/services/xxx".to_string())
800 );
801 assert_eq!(alerts.dlq_threshold, 100);
802 assert_eq!(alerts.failed_threshold, 200);
803 assert_eq!(alerts.check_interval_secs, 120);
804 }
805
806 #[test]
807 fn test_config_validation_autoscale_invalid() {
808 let mut config = Config::default_config();
809 config.autoscale = Some(AutoScaleConfig {
810 enabled: true,
811 min_workers: 0,
812 max_workers: 5,
813 scale_up_threshold: 100,
814 scale_down_threshold: 10,
815 check_interval_secs: 30,
816 });
817
818 let warnings = config.validate().unwrap();
819 assert!(warnings.iter().any(|w| w.contains("min_workers is 0")));
820 }
821
822 #[test]
823 fn test_config_validation_autoscale_max_less_than_min() {
824 let mut config = Config::default_config();
825 config.autoscale = Some(AutoScaleConfig {
826 enabled: true,
827 min_workers: 10,
828 max_workers: 5,
829 scale_up_threshold: 100,
830 scale_down_threshold: 10,
831 check_interval_secs: 30,
832 });
833
834 let warnings = config.validate().unwrap();
835 assert!(warnings
836 .iter()
837 .any(|w| w.contains("max_workers") && w.contains("min_workers")));
838 }
839
840 #[test]
841 fn test_config_validation_autoscale_threshold_invalid() {
842 let mut config = Config::default_config();
843 config.autoscale = Some(AutoScaleConfig {
844 enabled: true,
845 min_workers: 1,
846 max_workers: 10,
847 scale_up_threshold: 50,
848 scale_down_threshold: 100,
849 check_interval_secs: 30,
850 });
851
852 let warnings = config.validate().unwrap();
853 assert!(warnings.iter().any(|w| w.contains("scale_down_threshold")));
854 }
855
856 #[test]
857 fn test_config_validation_alert_no_webhook() {
858 let mut config = Config::default_config();
859 config.alerts = Some(AlertConfig {
860 enabled: true,
861 webhook_url: None,
862 dlq_threshold: 50,
863 failed_threshold: 100,
864 check_interval_secs: 60,
865 });
866
867 let warnings = config.validate().unwrap();
868 assert!(warnings.iter().any(|w| w.contains("webhook_url")));
869 }
870
871 #[test]
872 fn test_profile_config() {
873 let toml_str = r#"
874profile = "production"
875
876[broker]
877type = "redis"
878url = "redis://localhost:6379"
879 "#;
880
881 let config: Config = toml::from_str(toml_str).unwrap();
882 assert_eq!(config.profile, Some("production".to_string()));
883 }
884}