1use crate::error::CamelError;
2use crate::exchange::Exchange;
3use std::sync::Arc;
4use std::time::Duration;
5
6pub type MulticastAggregationFn = Arc<dyn Fn(Exchange, Exchange) -> Exchange + Send + Sync>;
7
8#[derive(Clone, Default)]
9pub enum MulticastStrategy {
10 #[default]
11 LastWins,
12 CollectAll,
13 Original,
14 Custom(MulticastAggregationFn),
15}
16
17#[derive(Clone)]
18pub struct MulticastConfig {
19 pub parallel: bool,
20 pub parallel_limit: Option<usize>,
21 pub stop_on_exception: bool,
22 pub timeout: Option<Duration>,
23 pub aggregation: MulticastStrategy,
24}
25
26impl MulticastConfig {
27 pub fn new() -> Self {
28 Self {
29 parallel: false,
30 parallel_limit: None,
31 stop_on_exception: false,
32 timeout: None,
33 aggregation: MulticastStrategy::default(),
34 }
35 }
36
37 pub fn parallel(mut self, parallel: bool) -> Self {
38 self.parallel = parallel;
39 self
40 }
41
42 pub fn parallel_limit(mut self, limit: usize) -> Self {
43 self.parallel_limit = Some(limit);
44 self
45 }
46
47 pub fn stop_on_exception(mut self, stop: bool) -> Self {
48 self.stop_on_exception = stop;
49 self
50 }
51
52 pub fn timeout(mut self, duration: Duration) -> Self {
53 self.timeout = Some(duration);
54 self
55 }
56
57 pub fn aggregation(mut self, strategy: MulticastStrategy) -> Self {
58 self.aggregation = strategy;
59 self
60 }
61
62 pub fn validate(&self) -> Result<(), CamelError> {
67 if self.parallel && self.parallel_limit == Some(0) {
68 return Err(CamelError::Config(
69 "multicast parallel_limit must be > 0".to_string(),
70 ));
71 }
72 Ok(())
73 }
74}
75
76impl Default for MulticastConfig {
77 fn default() -> Self {
78 Self::new()
79 }
80}
81
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// `new()` leaves every option unset and selects `LastWins`.
    #[test]
    fn test_multicast_config_defaults() {
        let config = MulticastConfig::new();
        assert!(!config.parallel);
        assert!(config.parallel_limit.is_none());
        assert!(!config.stop_on_exception);
        assert!(config.timeout.is_none());
        assert!(matches!(config.aggregation, MulticastStrategy::LastWins));
    }

    /// `Default::default()` produces the same values as `new()`.
    #[test]
    fn test_multicast_config_default_matches_new() {
        let from_default = MulticastConfig::default();
        let from_new = MulticastConfig::new();
        assert_eq!(from_default.parallel, from_new.parallel);
        assert_eq!(from_default.parallel_limit, from_new.parallel_limit);
        assert_eq!(from_default.stop_on_exception, from_new.stop_on_exception);
        assert_eq!(from_default.timeout, from_new.timeout);
        assert!(matches!(
            from_default.aggregation,
            MulticastStrategy::LastWins
        ));
    }

    /// Each builder method stores the value it was given.
    #[test]
    fn test_multicast_config_builder() {
        let config = MulticastConfig::new()
            .parallel(true)
            .parallel_limit(4)
            .stop_on_exception(true)
            .timeout(Duration::from_millis(500))
            .aggregation(MulticastStrategy::CollectAll);

        assert!(config.parallel);
        assert_eq!(config.parallel_limit, Some(4));
        assert!(config.stop_on_exception);
        assert_eq!(config.timeout, Some(Duration::from_millis(500)));
        assert!(matches!(config.aggregation, MulticastStrategy::CollectAll));
    }

    /// A zero limit combined with `parallel = true` is a configuration error.
    #[test]
    fn test_multicast_config_validate_rejects_zero_parallel_limit() {
        let config = MulticastConfig::new().parallel(true).parallel_limit(0);
        assert!(matches!(config.validate(), Err(CamelError::Config(_))));
    }

    /// Everything else passes validation, including a zero limit while
    /// `parallel` is disabled (the limit is only checked when parallel is on).
    #[test]
    fn test_multicast_config_validate_accepts_valid_configs() {
        assert!(MulticastConfig::new().validate().is_ok());
        assert!(MulticastConfig::new().parallel(true).validate().is_ok());
        assert!(MulticastConfig::new()
            .parallel(true)
            .parallel_limit(1)
            .validate()
            .is_ok());
        assert!(MulticastConfig::new().parallel_limit(0).validate().is_ok());
    }
}