Skip to main content

camel_api/
multicast.rs

1use crate::error::CamelError;
2use crate::exchange::Exchange;
3use std::sync::Arc;
4use std::time::Duration;
5
6pub type MulticastAggregationFn = Arc<dyn Fn(Exchange, Exchange) -> Exchange + Send + Sync>;
7
8#[derive(Clone, Default)]
9pub enum MulticastStrategy {
10    #[default]
11    LastWins,
12    CollectAll,
13    Original,
14    Custom(MulticastAggregationFn),
15}
16
17#[derive(Clone)]
18pub struct MulticastConfig {
19    pub parallel: bool,
20    pub parallel_limit: Option<usize>,
21    pub stop_on_exception: bool,
22    pub timeout: Option<Duration>,
23    pub aggregation: MulticastStrategy,
24}
25
26impl MulticastConfig {
27    pub fn new() -> Self {
28        Self {
29            parallel: false,
30            parallel_limit: None,
31            stop_on_exception: false,
32            timeout: None,
33            aggregation: MulticastStrategy::default(),
34        }
35    }
36
37    pub fn parallel(mut self, parallel: bool) -> Self {
38        self.parallel = parallel;
39        self
40    }
41
42    pub fn parallel_limit(mut self, limit: usize) -> Self {
43        self.parallel_limit = Some(limit);
44        self
45    }
46
47    pub fn stop_on_exception(mut self, stop: bool) -> Self {
48        self.stop_on_exception = stop;
49        self
50    }
51
52    pub fn timeout(mut self, duration: Duration) -> Self {
53        self.timeout = Some(duration);
54        self
55    }
56
57    pub fn aggregation(mut self, strategy: MulticastStrategy) -> Self {
58        self.aggregation = strategy;
59        self
60    }
61
62    /// Validates the configuration.
63    ///
64    /// Returns `Err(CamelError::Config)` if `parallel_limit` is set to 0,
65    /// which would cause a `Semaphore::new(0)` panic at runtime.
66    pub fn validate(&self) -> Result<(), CamelError> {
67        if self.parallel && self.parallel_limit == Some(0) {
68            return Err(CamelError::Config(
69                "multicast parallel_limit must be > 0".to_string(),
70            ));
71        }
72        Ok(())
73    }
74}
75
76impl Default for MulticastConfig {
77    fn default() -> Self {
78        Self::new()
79    }
80}
81
82#[cfg(test)]
83mod tests {
84    use super::*;
85    use std::time::Duration;
86
87    #[test]
88    fn test_multicast_config_defaults() {
89        let config = MulticastConfig::new();
90        assert!(!config.parallel);
91        assert!(config.parallel_limit.is_none());
92        assert!(!config.stop_on_exception);
93        assert!(config.timeout.is_none());
94        assert!(matches!(config.aggregation, MulticastStrategy::LastWins));
95    }
96
97    #[test]
98    fn test_multicast_config_builder() {
99        let config = MulticastConfig::new()
100            .parallel(true)
101            .parallel_limit(4)
102            .stop_on_exception(true)
103            .timeout(Duration::from_millis(500));
104
105        assert!(config.parallel);
106        assert_eq!(config.parallel_limit, Some(4));
107        assert!(config.stop_on_exception);
108        assert_eq!(config.timeout, Some(Duration::from_millis(500)));
109    }
110}