1use std::sync::Arc;
2
3use camel_component_api::NetworkRetryPolicy;
4use camel_component_api::{CamelError, ComponentBundle, ComponentRegistrar};
5use serde::Deserialize;
6
7use crate::{MasterComponent, config::MasterComponentConfig};
8
9pub struct MasterBundle {
10 config: MasterComponentConfig,
11}
12
13impl ComponentBundle for MasterBundle {
14 fn config_key() -> &'static str {
15 "master"
16 }
17
18 fn from_toml(value: toml::Value) -> Result<Self, CamelError> {
19 let drain_timeout_ms = value
20 .get("drain_timeout_ms")
21 .and_then(|v| v.as_integer())
22 .unwrap_or(5000) as u64;
23
24 let delegate_retry_max_attempts = value
25 .get("delegate_retry_max_attempts")
26 .and_then(|v| v.as_integer())
27 .and_then(|v| u32::try_from(v).ok())
28 .map(|v| if v == 0 { None } else { Some(v) })
29 .unwrap_or(None);
30
31 let reconnect = value
33 .get("reconnect")
34 .cloned()
35 .and_then(|v| NetworkRetryPolicy::deserialize(v).ok())
36 .unwrap_or_else(|| {
37 NetworkRetryPolicy {
38 max_attempts: 0, ..NetworkRetryPolicy::default()
40 }
41 });
42
43 Ok(Self {
44 config: MasterComponentConfig {
45 drain_timeout_ms,
46 reconnect,
47 delegate_retry_max_attempts,
48 },
49 })
50 }
51
52 fn register_all(self, ctx: &mut dyn ComponentRegistrar) {
53 ctx.register_component_dyn(Arc::new(MasterComponent::new(self.config)));
54 }
55}
56
57#[cfg(test)]
58mod tests {
59 use super::*;
60
61 #[test]
62 fn from_toml_uses_defaults() {
63 let bundle = MasterBundle::from_toml(toml::Value::Table(toml::map::Map::new())).unwrap();
64 assert_eq!(bundle.config.drain_timeout_ms, 5000);
65 assert_eq!(bundle.config.reconnect.max_attempts, 0);
66 assert!(bundle.config.reconnect.enabled);
67 assert_eq!(bundle.config.delegate_retry_max_attempts, None);
68 }
69
70 #[test]
71 fn from_toml_parses_delegate_retry_max_attempts() {
72 let mut table = toml::map::Map::new();
73 table.insert(
74 "delegate_retry_max_attempts".to_string(),
75 toml::Value::Integer(7),
76 );
77 let bundle = MasterBundle::from_toml(toml::Value::Table(table)).unwrap();
78 assert_eq!(bundle.config.delegate_retry_max_attempts, Some(7));
81 assert_eq!(bundle.config.reconnect.max_attempts, 0);
82 }
83
84 #[test]
85 fn from_toml_zero_means_unlimited_retries() {
86 let mut table = toml::map::Map::new();
87 table.insert(
88 "delegate_retry_max_attempts".to_string(),
89 toml::Value::Integer(0),
90 );
91 let bundle = MasterBundle::from_toml(toml::Value::Table(table)).unwrap();
92 assert_eq!(bundle.config.delegate_retry_max_attempts, None);
93 }
94
95 #[test]
96 fn from_toml_parses_reconnect_section() {
97 let mut reconnect_table = toml::map::Map::new();
98 reconnect_table.insert("max_attempts".to_string(), toml::Value::Integer(10));
99 reconnect_table.insert("enabled".to_string(), toml::Value::Boolean(false));
100
101 let mut table = toml::map::Map::new();
102 table.insert("reconnect".to_string(), toml::Value::Table(reconnect_table));
103
104 let bundle = MasterBundle::from_toml(toml::Value::Table(table)).unwrap();
105 assert_eq!(bundle.config.reconnect.max_attempts, 10);
106 assert!(!bundle.config.reconnect.enabled);
107 }
108
109 #[test]
110 fn from_toml_reconnect_wins_over_delegate_retry() {
111 let mut reconnect_table = toml::map::Map::new();
114 reconnect_table.insert("max_attempts".to_string(), toml::Value::Integer(5));
115
116 let mut table = toml::map::Map::new();
117 table.insert("reconnect".to_string(), toml::Value::Table(reconnect_table));
118 table.insert(
119 "delegate_retry_max_attempts".to_string(),
120 toml::Value::Integer(20),
121 );
122
123 let bundle = MasterBundle::from_toml(toml::Value::Table(table)).unwrap();
124 assert_eq!(bundle.config.reconnect.max_attempts, 5); assert_eq!(bundle.config.delegate_retry_max_attempts, Some(20));
126 }
127}