1use crate::exchange::Exchange;
2use std::sync::Arc;
3use std::time::Duration;
4
5pub type MulticastAggregationFn = Arc<dyn Fn(Exchange, Exchange) -> Exchange + Send + Sync>;
6
7#[derive(Clone, Default)]
8pub enum MulticastStrategy {
9 #[default]
10 LastWins,
11 CollectAll,
12 Original,
13 Custom(MulticastAggregationFn),
14}
15
16#[derive(Clone)]
17pub struct MulticastConfig {
18 pub parallel: bool,
19 pub parallel_limit: Option<usize>,
20 pub stop_on_exception: bool,
21 pub timeout: Option<Duration>,
22 pub aggregation: MulticastStrategy,
23}
24
25impl MulticastConfig {
26 pub fn new() -> Self {
27 Self {
28 parallel: false,
29 parallel_limit: None,
30 stop_on_exception: false,
31 timeout: None,
32 aggregation: MulticastStrategy::default(),
33 }
34 }
35
36 pub fn parallel(mut self, parallel: bool) -> Self {
37 self.parallel = parallel;
38 self
39 }
40
41 pub fn parallel_limit(mut self, limit: usize) -> Self {
42 self.parallel_limit = Some(limit);
43 self
44 }
45
46 pub fn stop_on_exception(mut self, stop: bool) -> Self {
47 self.stop_on_exception = stop;
48 self
49 }
50
51 pub fn timeout(mut self, duration: Duration) -> Self {
52 self.timeout = Some(duration);
53 self
54 }
55
56 pub fn aggregation(mut self, strategy: MulticastStrategy) -> Self {
57 self.aggregation = strategy;
58 self
59 }
60}
61
62impl Default for MulticastConfig {
63 fn default() -> Self {
64 Self::new()
65 }
66}
67
68#[cfg(test)]
69mod tests {
70 use super::*;
71 use std::time::Duration;
72
73 #[test]
74 fn test_multicast_config_defaults() {
75 let config = MulticastConfig::new();
76 assert!(!config.parallel);
77 assert!(config.parallel_limit.is_none());
78 assert!(!config.stop_on_exception);
79 assert!(config.timeout.is_none());
80 assert!(matches!(config.aggregation, MulticastStrategy::LastWins));
81 }
82
83 #[test]
84 fn test_multicast_config_builder() {
85 let config = MulticastConfig::new()
86 .parallel(true)
87 .parallel_limit(4)
88 .stop_on_exception(true)
89 .timeout(Duration::from_millis(500));
90
91 assert!(config.parallel);
92 assert_eq!(config.parallel_limit, Some(4));
93 assert!(config.stop_on_exception);
94 assert_eq!(config.timeout, Some(Duration::from_millis(500)));
95 }
96}