Skip to main content

camel_master/
config.rs

1use camel_api::CamelError;
2use camel_component_api::NetworkRetryPolicy;
3
/// Parsed form of a `master:<lockname>:<delegate-uri>` endpoint URI.
///
/// Values are normally produced by [`MasterUriConfig::parse`]. The type is
/// comparable (`PartialEq`/`Eq`) so parsed results can be checked with
/// `==` / `assert_eq!`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MasterUriConfig {
    /// Lock name: the URI segment between the first and second `:`.
    pub lock_name: String,
    /// Delegate URI: everything after the second `:`, kept verbatim.
    pub delegate_uri: String,
}
9
10impl MasterUriConfig {
11    pub fn parse(uri: &str) -> Result<Self, CamelError> {
12        let mut parts = uri.splitn(3, ':');
13        let scheme = parts.next().unwrap_or_default();
14        let lock_name = parts.next().unwrap_or_default();
15        let delegate_uri = parts.next().unwrap_or_default();
16        if scheme != "master" || lock_name.is_empty() || delegate_uri.is_empty() {
17            return Err(CamelError::InvalidUri(format!(
18                "{uri}: expected master:<lockname>:<delegate-uri>"
19            )));
20        }
21        Ok(Self {
22            lock_name: lock_name.to_string(),
23            delegate_uri: delegate_uri.to_string(),
24        })
25    }
26}
27
28/// Per-component reconnect default: unlimited retries (max_attempts=0),
29/// preserving the previous `None = unlimited` behavior. Operators can opt
30/// into bounded retry via TOML `[components.master.reconnect]`.
31fn master_reconnect_default() -> NetworkRetryPolicy {
32    NetworkRetryPolicy {
33        max_attempts: 0, // unlimited
34        ..NetworkRetryPolicy::default()
35    }
36}
37
38/// Configuration for the master/leader-election component.
39///
40/// Controls drain timeout for graceful delegate shutdown and reconnection policy.
41///
42/// ## Backward compatibility
43///
44/// The `delegate_retry_max_attempts` field is retained as a backward-compat alias.
45/// When set (not `None`), it bridges into `reconnect.max_attempts` during construction
46/// in `MasterComponent::new()`. If `reconnect` is also explicitly configured, the
47/// explicit `reconnect` value wins.
48#[derive(Debug, Clone)]
49pub struct MasterComponentConfig {
50    /// Timeout in milliseconds for draining a delegate consumer on leadership loss.
51    pub drain_timeout_ms: u64,
52    /// Structured reconnection policy, replacing the flat `delegate_retry_max_attempts`
53    /// field for new configs. Default: unlimited (`max_attempts=0`).
54    pub reconnect: NetworkRetryPolicy,
55    /// Backward-compat alias for `reconnect.max_attempts`. `None` means unlimited.
56    /// Bridged into `reconnect` during `MasterComponent::new()`.
57    pub delegate_retry_max_attempts: Option<u32>,
58}
59
60impl MasterComponentConfig {
61    /// Create a new config with the given drain timeout and retry limit.
62    pub fn new(drain_timeout_ms: u64, delegate_retry_max_attempts: Option<u32>) -> Self {
63        Self {
64            drain_timeout_ms,
65            reconnect: master_reconnect_default(),
66            delegate_retry_max_attempts,
67        }
68    }
69}
70
71impl Default for MasterComponentConfig {
72    fn default() -> Self {
73        Self {
74            drain_timeout_ms: 5000,
75            reconnect: master_reconnect_default(),
76            delegate_retry_max_attempts: None,
77        }
78    }
79}
80
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn master_config_has_reconnect_policy() {
        let cfg = MasterComponentConfig::default();
        // Default should be unlimited (max_attempts=0) to preserve old None behavior
        assert_eq!(cfg.reconnect.max_attempts, 0);
        assert!(cfg.reconnect.enabled);
        // Backward-compat field defaults to None
        assert_eq!(cfg.delegate_retry_max_attempts, None);
    }

    #[test]
    fn master_config_default_drain_timeout() {
        let cfg = MasterComponentConfig::default();
        assert_eq!(cfg.drain_timeout_ms, 5000);
    }

    #[test]
    fn master_config_default_reconnect_is_unlimited() {
        let policy = master_reconnect_default();
        assert_eq!(policy.max_attempts, 0);
        assert!(policy.enabled);
        // Unlimited means should_retry always returns true
        assert!(policy.should_retry(0));
        assert!(policy.should_retry(100));
        assert!(policy.should_retry(10_000));
    }

    #[test]
    fn master_config_new_preserves_drain_timeout() {
        let cfg = MasterComponentConfig::new(10_000, Some(5));
        assert_eq!(cfg.drain_timeout_ms, 10_000);
    }

    #[test]
    fn master_config_new_stores_retry_limit_without_bridging() {
        let cfg = MasterComponentConfig::new(10_000, Some(5));
        // The legacy value is kept as given...
        assert_eq!(cfg.delegate_retry_max_attempts, Some(5));
        // ...and `new` itself leaves the reconnect policy at its unlimited default.
        assert_eq!(cfg.reconnect.max_attempts, 0);
    }

    #[test]
    fn parse_splits_lock_name_and_delegate() {
        let cfg = MasterUriConfig::parse("master:mylock:direct").expect("valid master URI");
        assert_eq!(cfg.lock_name, "mylock");
        assert_eq!(cfg.delegate_uri, "direct");
    }

    #[test]
    fn parse_keeps_colons_in_delegate_uri() {
        // Only the first two ':' are separators; the rest belongs to the delegate.
        let cfg = MasterUriConfig::parse("master:mylock:timer:tick?period=1000")
            .expect("valid master URI");
        assert_eq!(cfg.lock_name, "mylock");
        assert_eq!(cfg.delegate_uri, "timer:tick?period=1000");
    }

    #[test]
    fn parse_rejects_malformed_uris() {
        let rejected = [
            "",
            "master",
            "master:",
            "master:lock",
            "master:lock:",
            "master::timer:tick",
            "other:lock:timer:tick",
        ];
        for bad in rejected.iter() {
            assert!(
                matches!(
                    MasterUriConfig::parse(bad),
                    Err(CamelError::InvalidUri(_))
                ),
                "{bad:?} should be rejected"
            );
        }
    }
}