Skip to main content

camel_master/
component.rs

1use std::time::Duration;
2
3use camel_api::CamelError;
4use camel_component_api::{Component, ComponentContext, Endpoint, NetworkRetryPolicy, parse_uri};
5
6use crate::config::{MasterComponentConfig, MasterUriConfig};
7use crate::endpoint::MasterEndpoint;
8
9pub struct MasterComponent {
10    drain_timeout_ms: u64,
11    /// Structured reconnection policy for delegate retry.
12    reconnect: NetworkRetryPolicy,
13}
14
15impl MasterComponent {
16    pub fn new(config: MasterComponentConfig) -> Self {
17        // Bridge backward-compat field: if delegate_retry_max_attempts is set
18        // and the explicit reconnect is still at its default (max_attempts=0),
19        // override reconnect.max_attempts from the legacy field.
20        let mut reconnect = config.reconnect;
21        if let Some(max) = config.delegate_retry_max_attempts
22            && reconnect.max_attempts == 0
23        {
24            reconnect.max_attempts = max;
25        }
26        Self {
27            drain_timeout_ms: config.drain_timeout_ms,
28            reconnect,
29        }
30    }
31}
32
33impl Default for MasterComponent {
34    fn default() -> Self {
35        Self::new(MasterComponentConfig::default())
36    }
37}
38
39impl Component for MasterComponent {
40    fn scheme(&self) -> &str {
41        "master"
42    }
43
44    fn create_endpoint(
45        &self,
46        uri: &str,
47        ctx: &dyn ComponentContext,
48    ) -> Result<Box<dyn Endpoint>, CamelError> {
49        let parsed = MasterUriConfig::parse(uri)?;
50        let delegate_parts = parse_uri(&parsed.delegate_uri)?;
51        let delegate_scheme = delegate_parts.scheme;
52        let delegate_component = ctx
53            .resolve_component(&delegate_scheme)
54            .ok_or_else(|| CamelError::ComponentNotFound(delegate_scheme.clone()))?;
55
56        Ok(Box::new(MasterEndpoint {
57            uri: uri.to_string(),
58            lock_name: parsed.lock_name,
59            delegate_uri: parsed.delegate_uri,
60            delegate_component,
61            metrics: ctx.metrics(),
62            platform_service: ctx.platform_service(),
63            drain_timeout: Duration::from_millis(self.drain_timeout_ms),
64            reconnect: self.reconnect.clone(),
65        }))
66    }
67}