Skip to main content

camel_api/
multicast.rs

1use crate::exchange::Exchange;
2use std::sync::Arc;
3use std::time::Duration;
4
5pub type MulticastAggregationFn = Arc<dyn Fn(Exchange, Exchange) -> Exchange + Send + Sync>;
6
7#[derive(Clone, Default)]
8pub enum MulticastStrategy {
9    #[default]
10    LastWins,
11    CollectAll,
12    Original,
13    Custom(MulticastAggregationFn),
14}
15
16#[derive(Clone)]
17pub struct MulticastConfig {
18    pub parallel: bool,
19    pub parallel_limit: Option<usize>,
20    pub stop_on_exception: bool,
21    pub timeout: Option<Duration>,
22    pub aggregation: MulticastStrategy,
23}
24
25impl MulticastConfig {
26    pub fn new() -> Self {
27        Self {
28            parallel: false,
29            parallel_limit: None,
30            stop_on_exception: false,
31            timeout: None,
32            aggregation: MulticastStrategy::default(),
33        }
34    }
35
36    pub fn parallel(mut self, parallel: bool) -> Self {
37        self.parallel = parallel;
38        self
39    }
40
41    pub fn parallel_limit(mut self, limit: usize) -> Self {
42        self.parallel_limit = Some(limit);
43        self
44    }
45
46    pub fn stop_on_exception(mut self, stop: bool) -> Self {
47        self.stop_on_exception = stop;
48        self
49    }
50
51    pub fn timeout(mut self, duration: Duration) -> Self {
52        self.timeout = Some(duration);
53        self
54    }
55
56    pub fn aggregation(mut self, strategy: MulticastStrategy) -> Self {
57        self.aggregation = strategy;
58        self
59    }
60}
61
62impl Default for MulticastConfig {
63    fn default() -> Self {
64        Self::new()
65    }
66}
67
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// `new()` starts with both flags off, no limit, no timeout and `LastWins`.
    #[test]
    fn test_multicast_config_defaults() {
        let config = MulticastConfig::new();
        assert!(!config.parallel);
        assert!(config.parallel_limit.is_none());
        assert!(!config.stop_on_exception);
        assert!(config.timeout.is_none());
        assert!(matches!(config.aggregation, MulticastStrategy::LastWins));
    }

    /// Each chained setter writes its own field.
    #[test]
    fn test_multicast_config_builder() {
        let config = MulticastConfig::new()
            .parallel(true)
            .parallel_limit(4)
            .stop_on_exception(true)
            .timeout(Duration::from_millis(500));

        assert!(config.parallel);
        assert_eq!(config.parallel_limit, Some(4));
        assert!(config.stop_on_exception);
        assert_eq!(config.timeout, Some(Duration::from_millis(500)));
    }

    /// `aggregation()` replaces the strategy and leaves the other fields alone.
    #[test]
    fn test_multicast_config_aggregation() {
        let config = MulticastConfig::new().aggregation(MulticastStrategy::CollectAll);

        assert!(matches!(config.aggregation, MulticastStrategy::CollectAll));
        assert!(!config.parallel);
        assert!(config.parallel_limit.is_none());
        assert!(!config.stop_on_exception);
        assert!(config.timeout.is_none());
    }

    /// The `Default` impl yields the same values as `new()`.
    #[test]
    fn test_multicast_config_default_matches_new() {
        let config = MulticastConfig::default();

        assert!(!config.parallel);
        assert!(config.parallel_limit.is_none());
        assert!(!config.stop_on_exception);
        assert!(config.timeout.is_none());
        assert!(matches!(config.aggregation, MulticastStrategy::LastWins));
    }
}