camel_master/
component.rs1use std::time::Duration;
2
3use camel_api::CamelError;
4use camel_component_api::{Component, ComponentContext, Endpoint, NetworkRetryPolicy, parse_uri};
5
6use crate::config::{MasterComponentConfig, MasterUriConfig};
7use crate::endpoint::MasterEndpoint;
8
9pub struct MasterComponent {
10 drain_timeout_ms: u64,
11 reconnect: NetworkRetryPolicy,
13}
14
15impl MasterComponent {
16 pub fn new(config: MasterComponentConfig) -> Self {
17 let mut reconnect = config.reconnect;
21 if let Some(max) = config.delegate_retry_max_attempts
22 && reconnect.max_attempts == 0
23 {
24 reconnect.max_attempts = max;
25 }
26 Self {
27 drain_timeout_ms: config.drain_timeout_ms,
28 reconnect,
29 }
30 }
31}
32
33impl Default for MasterComponent {
34 fn default() -> Self {
35 Self::new(MasterComponentConfig::default())
36 }
37}
38
39impl Component for MasterComponent {
40 fn scheme(&self) -> &str {
41 "master"
42 }
43
44 fn create_endpoint(
45 &self,
46 uri: &str,
47 ctx: &dyn ComponentContext,
48 ) -> Result<Box<dyn Endpoint>, CamelError> {
49 let parsed = MasterUriConfig::parse(uri)?;
50 let delegate_parts = parse_uri(&parsed.delegate_uri)?;
51 let delegate_scheme = delegate_parts.scheme;
52 let delegate_component = ctx
53 .resolve_component(&delegate_scheme)
54 .ok_or_else(|| CamelError::ComponentNotFound(delegate_scheme.clone()))?;
55
56 Ok(Box::new(MasterEndpoint {
57 uri: uri.to_string(),
58 lock_name: parsed.lock_name,
59 delegate_uri: parsed.delegate_uri,
60 delegate_component,
61 metrics: ctx.metrics(),
62 platform_service: ctx.platform_service(),
63 drain_timeout: Duration::from_millis(self.drain_timeout_ms),
64 reconnect: self.reconnect.clone(),
65 }))
66 }
67}