Skip to main content

camel_master/
lib.rs

1//! Master/leader-election component — wraps a delegate consumer with distributed lock-based leadership coordination.
2//!
3//! Main types: `MasterComponent`, `MasterBundle`.
4//! Main modules: `bundle`, `config`.
5
6pub mod bundle;
7pub mod config;
8
9pub use bundle::MasterBundle;
10
11use std::sync::Arc;
12use std::time::Duration;
13
14use async_trait::async_trait;
15use camel_api::{CamelError, MetricsCollector, PlatformService};
16use camel_component_api::{
17    BoxProcessor, Component, ComponentContext, Consumer, ConsumerContext, Endpoint,
18    ExchangeEnvelope, NetworkRetryPolicy, ProducerContext, is_retryable_camel_error, parse_uri,
19};
20use camel_language_api::Language;
21use tokio::task::JoinHandle;
22use tokio::time::{interval, timeout};
23use tokio_util::sync::CancellationToken;
24use tracing::{error, info, warn};
25
26use crate::config::{MasterComponentConfig, MasterUriConfig};
27
28const DELEGATE_RETRY_INTERVAL: Duration = Duration::from_millis(200);
29
/// Component registered under the `master` scheme.
///
/// Holds the settings that every endpoint created by this component inherits:
/// the delegate drain timeout and the delegate retry policy.
pub struct MasterComponent {
    /// Drain timeout in milliseconds; converted to a `Duration` when an endpoint is created.
    drain_timeout_ms: u64,
    /// Structured reconnection policy for delegate retry.
    reconnect: NetworkRetryPolicy,
}
35
36impl MasterComponent {
37    pub fn new(config: MasterComponentConfig) -> Self {
38        // Bridge backward-compat field: if delegate_retry_max_attempts is set
39        // and the explicit reconnect is still at its default (max_attempts=0),
40        // override reconnect.max_attempts from the legacy field.
41        let mut reconnect = config.reconnect;
42        if let Some(max) = config.delegate_retry_max_attempts
43            && reconnect.max_attempts == 0
44        {
45            reconnect.max_attempts = max;
46        }
47        Self {
48            drain_timeout_ms: config.drain_timeout_ms,
49            reconnect,
50        }
51    }
52}
53
54impl Default for MasterComponent {
55    fn default() -> Self {
56        Self::new(MasterComponentConfig::default())
57    }
58}
59
60impl Component for MasterComponent {
61    fn scheme(&self) -> &str {
62        "master"
63    }
64
65    fn create_endpoint(
66        &self,
67        uri: &str,
68        ctx: &dyn ComponentContext,
69    ) -> Result<Box<dyn Endpoint>, CamelError> {
70        let parsed = MasterUriConfig::parse(uri)?;
71        let delegate_parts = parse_uri(&parsed.delegate_uri)?;
72        let delegate_scheme = delegate_parts.scheme;
73        let delegate_component = ctx
74            .resolve_component(&delegate_scheme)
75            .ok_or_else(|| CamelError::ComponentNotFound(delegate_scheme.clone()))?;
76
77        Ok(Box::new(MasterEndpoint {
78            uri: uri.to_string(),
79            lock_name: parsed.lock_name,
80            delegate_uri: parsed.delegate_uri,
81            delegate_component,
82            metrics: ctx.metrics(),
83            platform_service: ctx.platform_service(),
84            drain_timeout: Duration::from_millis(self.drain_timeout_ms),
85            reconnect: self.reconnect.clone(),
86        }))
87    }
88}
89
/// Endpoint produced by `MasterComponent::create_endpoint`.
///
/// Carries everything needed to build a leader-gated consumer or a
/// pass-through producer for the delegate URI.
struct MasterEndpoint {
    /// The full master URI this endpoint was created from.
    uri: String,
    /// Lock name parsed from the master URI; passed to the leadership service on start.
    lock_name: String,
    /// URI of the wrapped delegate endpoint.
    delegate_uri: String,
    /// Component resolved for the delegate URI's scheme.
    delegate_component: Arc<dyn Component>,
    // TODO(MST-001): MetricsCollector is wired through from ComponentContext but never called.
    // Should emit metrics on leader acquisition (increment_exchanges), leader loss
    // (increment_errors), and delegate start/stop events (record_circuit_breaker_change).
    metrics: Arc<dyn MetricsCollector>,
    /// Platform service taken from the component context; supplies leader election.
    platform_service: Arc<dyn PlatformService>,
    /// Maximum time to wait for the delegate consumer to drain when it is stopped.
    drain_timeout: Duration,
    /// Retry policy applied when (re)starting the delegate consumer.
    reconnect: NetworkRetryPolicy,
}
103
104impl Endpoint for MasterEndpoint {
105    fn uri(&self) -> &str {
106        &self.uri
107    }
108
109    fn create_consumer(
110        &self,
111        rt: Arc<dyn camel_component_api::RuntimeObservability>,
112    ) -> Result<Box<dyn Consumer>, CamelError> {
113        Ok(Box::new(MasterConsumer::new(
114            self.lock_name.clone(),
115            self.delegate_uri.clone(),
116            Arc::clone(&self.delegate_component),
117            Arc::clone(&self.metrics),
118            Arc::clone(&self.platform_service),
119            self.drain_timeout,
120            self.reconnect.clone(),
121            rt,
122        )))
123    }
124
125    fn create_producer(
126        &self,
127        rt: Arc<dyn camel_component_api::RuntimeObservability>,
128        ctx: &ProducerContext,
129    ) -> Result<BoxProcessor, CamelError> {
130        let delegate_ctx = MasterDelegateContext {
131            delegate_component: Arc::clone(&self.delegate_component),
132            metrics: Arc::clone(&self.metrics),
133            platform_service: Arc::clone(&self.platform_service),
134        };
135
136        self.delegate_component
137            .create_endpoint(&self.delegate_uri, &delegate_ctx)?
138            .create_producer(rt, ctx)
139    }
140}
141
/// Minimal `ComponentContext` handed to the delegate component when the
/// master endpoint creates delegate endpoints on its own.
struct MasterDelegateContext {
    /// The only component this context can resolve.
    delegate_component: Arc<dyn Component>,
    /// Metrics collector returned from `metrics()`.
    metrics: Arc<dyn MetricsCollector>,
    /// Platform service returned from `platform_service()`.
    platform_service: Arc<dyn PlatformService>,
}
147
148impl ComponentContext for MasterDelegateContext {
149    fn resolve_component(&self, scheme: &str) -> Option<Arc<dyn Component>> {
150        if self.delegate_component.scheme() == scheme {
151            Some(Arc::clone(&self.delegate_component))
152        } else {
153            None
154        }
155    }
156
157    fn resolve_language(&self, _name: &str) -> Option<Arc<dyn Language>> {
158        None
159    }
160
161    fn metrics(&self) -> Arc<dyn MetricsCollector> {
162        Arc::clone(&self.metrics)
163    }
164
165    fn platform_service(&self) -> Arc<dyn PlatformService> {
166        Arc::clone(&self.platform_service)
167    }
168
169    fn register_route_health_check(
170        &self,
171        _route_id: &str,
172        _check: Arc<dyn camel_api::AsyncHealthCheck>,
173    ) {
174    }
175
176    fn unregister_route_health_check(&self, _route_id: &str) {}
177}
178
/// Consumer that runs the delegate consumer only while this node holds
/// leadership for `lock_name`.
struct MasterConsumer {
    /// Lock name passed to the leadership service.
    lock_name: String,
    /// URI of the delegate endpoint to create whenever leadership is acquired.
    delegate_uri: String,
    /// Component used to create the delegate endpoint.
    delegate_component: Arc<dyn Component>,
    // TODO(MST-001): MetricsCollector is stored here but never used for emission.
    // Wire into reconcile_event to record leadership transitions and delegate lifecycle.
    metrics: Arc<dyn MetricsCollector>,
    /// Source of the leadership service.
    platform_service: Arc<dyn PlatformService>,
    /// Upper bound for waiting on delegate shutdown and on `step_down`.
    drain_timeout: Duration,
    /// Retry policy consulted before each delegate (re)start attempt.
    reconnect: NetworkRetryPolicy,
    /// Handle of the spawned leadership loop; `None` until `start` has run
    /// or after the handle has been taken.
    leadership_task: Option<JoinHandle<Result<(), CamelError>>>,
    /// Token cancelled by `stop` to ask the leadership loop to exit.
    stop_token: Option<CancellationToken>,
    /// Runtime observability handle forwarded to the delegate's `create_consumer`.
    runtime: Arc<dyn camel_component_api::RuntimeObservability>,
}
193
194impl MasterConsumer {
195    #[allow(clippy::too_many_arguments)]
196    fn new(
197        lock_name: String,
198        delegate_uri: String,
199        delegate_component: Arc<dyn Component>,
200        metrics: Arc<dyn MetricsCollector>,
201        platform_service: Arc<dyn PlatformService>,
202        drain_timeout: Duration,
203        reconnect: NetworkRetryPolicy,
204        runtime: Arc<dyn camel_component_api::RuntimeObservability>,
205    ) -> Self {
206        Self {
207            lock_name,
208            delegate_uri,
209            delegate_component,
210            metrics,
211            platform_service,
212            drain_timeout,
213            reconnect,
214            leadership_task: None,
215            stop_token: None,
216            runtime,
217        }
218    }
219}
220
/// Lifecycle state of the delegate consumer managed by the leadership loop.
enum DelegateState {
    /// No delegate task is currently tracked.
    Inactive,
    /// A delegate consumer task has been spawned.
    Active {
        /// Token cancelled by `stop_delegate` to ask the delegate to shut down.
        run_token: CancellationToken,
        /// Join handle of the spawned delegate task.
        handle: JoinHandle<Result<(), CamelError>>,
    },
}
228
229async fn stop_delegate(
230    state: &mut DelegateState,
231    drain_timeout: Duration,
232) -> Result<(), CamelError> {
233    if let DelegateState::Active {
234        run_token,
235        mut handle,
236    } = std::mem::replace(state, DelegateState::Inactive)
237    {
238        run_token.cancel();
239        match timeout(drain_timeout, &mut handle).await {
240            Ok(Ok(Ok(()))) => {}
241            Ok(Ok(Err(err))) => {
242                return Err(err);
243            }
244            Ok(Err(e)) if e.is_panic() => {
245                // log-policy: system-broken
246                error!(error = %e, "master delegate task panicked");
247                return Err(CamelError::ProcessorError(format!(
248                    "master delegate task panicked: {e}"
249                )));
250            }
251            Ok(Err(e)) => {
252                warn!(error = %e, "master delegate task cancelled");
253                return Err(CamelError::ProcessorError(format!(
254                    "master delegate task cancelled: {e}"
255                )));
256            }
257            Err(_) => {
258                warn!("master delegate shutdown timed out, aborting");
259                handle.abort();
260            }
261        }
262    }
263    Ok(())
264}
265
/// Borrowed bundle of everything `reconcile_event` needs to start or stop
/// the delegate consumer.
struct ReconcileContext<'a> {
    /// Lock name, used as the `lock` field in log events.
    lock_name: &'a str,
    /// Component used to create the delegate endpoint.
    delegate_component: &'a Arc<dyn Component>,
    /// URI passed to the delegate component's `create_endpoint`.
    delegate_uri: &'a str,
    /// Exchange sender cloned into the delegate's `ConsumerContext`.
    sender: &'a tokio::sync::mpsc::Sender<ExchangeEnvelope>,
    /// Parent cancellation token; each delegate run gets a child of it.
    parent_cancel: &'a CancellationToken,
    /// Maximum time to wait for a running delegate to stop.
    drain_timeout: Duration,
    /// Metrics collector forwarded to the delegate's component context.
    metrics: &'a Arc<dyn MetricsCollector>,
    /// Platform service forwarded to the delegate's component context.
    platform_service: &'a Arc<dyn PlatformService>,
    /// Runtime observability handle passed to the delegate's `create_consumer`.
    runtime: Arc<dyn camel_component_api::RuntimeObservability>,
}
277
278async fn reconcile_event(
279    event: camel_api::LeadershipEvent,
280    state: &mut DelegateState,
281    ctx: &ReconcileContext<'_>,
282) -> Result<(), CamelError> {
283    match event {
284        camel_api::LeadershipEvent::StartedLeading => {
285            info!(lock = %ctx.lock_name, "master leadership acquired");
286            // TODO(MST-001): emit metrics here — MetricsCollector is wired but never called
287            tracing::info!(lock = %ctx.lock_name, "metrics emission placeholder: leadership acquired");
288            stop_delegate(state, ctx.drain_timeout).await?;
289
290            let delegate_ctx = MasterDelegateContext {
291                delegate_component: Arc::clone(ctx.delegate_component),
292                metrics: Arc::clone(ctx.metrics),
293                platform_service: Arc::clone(ctx.platform_service),
294            };
295
296            let endpoint = match ctx
297                .delegate_component
298                .create_endpoint(ctx.delegate_uri, &delegate_ctx)
299            {
300                Ok(endpoint) => endpoint,
301                Err(err) => {
302                    if is_retryable_camel_error(&err) {
303                        warn!(lock = %ctx.lock_name, error = %err, "transient delegate endpoint error (will retry)");
304                        return Ok(()); // swallow transient — retry via next tick
305                    }
306                    return Err(err); // permanent — propagate to retry loop for fail-fast
307                }
308            };
309
310            let mut consumer = match endpoint.create_consumer(Arc::clone(&ctx.runtime)) {
311                Ok(consumer) => consumer,
312                Err(err) => {
313                    if is_retryable_camel_error(&err) {
314                        warn!(lock = %ctx.lock_name, error = %err, "transient delegate consumer error (will retry)");
315                        return Ok(()); // swallow transient — retry via next tick
316                    }
317                    return Err(err); // permanent — propagate to retry loop for fail-fast
318                }
319            };
320
321            let run_token = ctx.parent_cancel.child_token();
322            let delegate_ctx = ConsumerContext::new(ctx.sender.clone(), run_token.clone());
323            let handle = tokio::spawn(async move {
324                consumer.start(delegate_ctx).await?;
325                consumer.stop().await?;
326                Ok::<(), CamelError>(())
327            });
328
329            *state = DelegateState::Active { run_token, handle };
330        }
331        camel_api::LeadershipEvent::StoppedLeading => {
332            info!(lock = %ctx.lock_name, "master leadership lost");
333            // TODO(MST-001): emit metrics here — MetricsCollector is wired but never called
334            tracing::info!(lock = %ctx.lock_name, "metrics emission placeholder: leadership lost");
335            stop_delegate(state, ctx.drain_timeout).await?;
336        }
337    }
338    Ok(())
339}
340
341// Uses camel_component_api::is_retryable_camel_error for transient/permanent
342// classification (rc-7ct consolidation, was master-local in rc-i1z).
343
344#[async_trait]
345impl Consumer for MasterConsumer {
346    async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
347        if self.leadership_task.is_some() {
348            return Ok(());
349        }
350
351        let handle = self
352            .platform_service
353            .leadership()
354            .start(&self.lock_name)
355            .await
356            .map_err(|e| {
357                CamelError::EndpointCreationFailed(format!("failed to start leader election: {e}"))
358            })?;
359
360        let lock_name = self.lock_name.clone();
361        let delegate_uri = self.delegate_uri.clone();
362        let delegate_component = Arc::clone(&self.delegate_component);
363        let metrics = Arc::clone(&self.metrics);
364        let platform_service = Arc::clone(&self.platform_service);
365        let sender = context.sender();
366        let parent_cancel = context.cancel_token();
367        let drain_timeout = self.drain_timeout;
368        let reconnect = self.reconnect.clone();
369        let runtime = Arc::clone(&self.runtime);
370        let mut events = handle.events.clone();
371
372        let stop_token = CancellationToken::new();
373        let stop_token_loop = stop_token.clone();
374        let leadership_handle = handle;
375
376        let task = tokio::spawn(async move {
377            let mut state = DelegateState::Inactive;
378            let mut is_leading = false;
379            let mut delegate_attempts = 0u32;
380            let mut retry_tick = interval(DELEGATE_RETRY_INTERVAL);
381
382            let rctx = ReconcileContext {
383                lock_name: &lock_name,
384                delegate_component: &delegate_component,
385                delegate_uri: &delegate_uri,
386                sender: &sender,
387                parent_cancel: &parent_cancel,
388                drain_timeout,
389                metrics: &metrics,
390                platform_service: &platform_service,
391                runtime: Arc::clone(&runtime),
392            };
393
394            let initial_event = { events.borrow().clone() };
395            if let Some(initial_event) = initial_event {
396                is_leading = matches!(&initial_event, camel_api::LeadershipEvent::StartedLeading);
397                if is_leading {
398                    delegate_attempts = 0;
399                }
400                if let Err(err) = reconcile_event(initial_event, &mut state, &rctx).await {
401                    // log-policy: system-broken
402                    error!(lock = %lock_name, "master delegate error: {err}");
403                    return Err(err);
404                }
405            }
406
407            loop {
408                tokio::select! {
409                    _ = stop_token_loop.cancelled() => {
410                        break;
411                    }
412                    _ = context.cancelled() => {
413                        break;
414                    }
415                    changed = events.changed() => {
416                        if changed.is_err() {
417                            break;
418                        }
419                        let event = { events.borrow().clone() };
420                        if let Some(event) = event {
421                            let was_leading = is_leading;
422                            is_leading = matches!(&event, camel_api::LeadershipEvent::StartedLeading);
423                            if !was_leading && is_leading {
424                                delegate_attempts = 0;
425                            }
426                            if let Err(err) = reconcile_event(event, &mut state, &rctx).await {
427                                // log-policy: system-broken
428                                error!(lock = %lock_name, "master delegate error: {err}");
429                                return Err(err);
430                            }
431                        }
432                    }
433                    _ = retry_tick.tick() => {
434                        if matches!(&state, DelegateState::Active { handle, .. } if handle.is_finished())
435                            && let Err(err) = stop_delegate(&mut state, drain_timeout).await
436                        {
437                            // log-policy: system-broken
438                            error!(lock = %lock_name, "master delegate task failed: {err}");
439                            return Err(err);
440                        }
441
442                        if is_leading && matches!(state, DelegateState::Inactive) {
443                            // Manual retry loop (not retry_async) because:
444                            // - The retry logic is embedded inside a periodic
445                            //   retry_tick.tick() handler; the outer select! runs
446                            //   every DELEGATE_RETRY_INTERVAL regardless, so the
447                            //   delay is applied as an additive sleep on top of
448                            //   the tick interval, not as a replacement for it.
449                            // - reconcile_event() requires &mut state, and the
450                            //   inter-attempt logic checks handle.is_finished()
451                            //   before retrying — both require state access
452                            //   between iterations that retry_async cannot provide.
453                            // - Classifies errors (rc-i1z): permanent → fail-fast,
454                            //   transient → retry with backoff.
455                            // Use NetworkRetryPolicy for bounded retries.
456                            // delegate_attempts tracks the next zero-based attempt index.
457                            if !reconnect.should_retry(delegate_attempts) {
458                                warn!(
459                                    lock = %lock_name,
460                                    attempts = delegate_attempts,
461                                    "delegate start exceeded max attempts, stopping consumer"
462                                );
463                                break;
464                            }
465                            // Apply backoff delay for retries (skip first attempt).
466                            if delegate_attempts > 0 {
467                                let delay = reconnect.delay_for(delegate_attempts - 1);
468                                if delay > DELEGATE_RETRY_INTERVAL {
469                                    tokio::select! {
470                                        _ = stop_token_loop.cancelled() => break,
471                                        _ = tokio::time::sleep(delay.saturating_sub(DELEGATE_RETRY_INTERVAL)) => {}
472                                    }
473                                }
474                            }
475                            delegate_attempts = delegate_attempts.saturating_add(1);
476                            if let Err(err) = reconcile_event(
477                                camel_api::LeadershipEvent::StartedLeading,
478                                &mut state,
479                                &rctx,
480                            )
481                            .await {
482                                if is_retryable_camel_error(&err) {
483                                    // log-policy: system-broken
484                                    error!(
485                                        lock = %lock_name,
486                                        error = %err,
487                                        attempt = delegate_attempts,
488                                        "master delegate transient error, will retry"
489                                    );
490                                    // Don't return — let the next tick attempt retry.
491                                } else {
492                                    // log-policy: system-broken
493                                    error!(
494                                        lock = %lock_name,
495                                        error = %err,
496                                        "master delegate permanent error, terminating"
497                                    );
498                                    return Err(err);
499                                }
500                            }
501                        }
502                    }
503                }
504            }
505
506            stop_delegate(&mut state, drain_timeout).await?;
507            let _ = timeout(drain_timeout, leadership_handle.step_down()).await;
508            Ok::<(), CamelError>(())
509        });
510
511        self.stop_token = Some(stop_token);
512        self.leadership_task = Some(task);
513
514        Ok(())
515    }
516
517    async fn stop(&mut self) -> Result<(), CamelError> {
518        if let Some(token) = self.stop_token.take() {
519            token.cancel();
520        }
521
522        if let Some(handle) = self.leadership_task.take() {
523            if handle.is_finished() {
524                match timeout(self.drain_timeout, handle).await {
525                    Ok(Ok(Ok(()))) => {}
526                    Ok(Ok(Err(err))) => return Err(err),
527                    Ok(Err(e)) => {
528                        return Err(CamelError::ProcessorError(format!(
529                            "leadership task join failed: {e}"
530                        )));
531                    }
532                    Err(_) => {
533                        return Err(CamelError::ProcessorError(
534                            "leadership task join timed out".to_string(),
535                        ));
536                    }
537                }
538                return Ok(());
539            }
540
541            // Abort first so the task is guaranteed to stop; then await with
542            // a timeout as a safety-net in case abort takes a moment to land.
543            handle.abort();
544            match timeout(self.drain_timeout, handle).await {
545                Ok(Ok(Ok(()))) => {}
546                Ok(Ok(Err(err))) => return Err(err),
547                Ok(Err(e)) if e.is_panic() => {
548                    // log-policy: system-broken
549                    error!(lock = %self.lock_name, error = %e, "leadership task panicked");
550                }
551                Ok(Err(e)) => {
552                    warn!(lock = %self.lock_name, error = %e, "leadership task cancelled");
553                }
554                Err(_) => {
555                    warn!("master leadership loop shutdown timed out after abort");
556                }
557            }
558        }
559
560        Ok(())
561    }
562
563    fn background_task_handle(
564        &mut self,
565    ) -> Option<tokio::task::JoinHandle<Result<(), CamelError>>> {
566        self.leadership_task.take()
567    }
568}
569
570#[cfg(test)]
571mod tests {
572    use std::sync::Arc;
573    use std::sync::Mutex;
574    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
575
576    use camel_api::{
577        BoxProcessorExt, Exchange, LeadershipEvent, LeadershipHandle, LeadershipService, Message,
578        NoOpMetrics, NoopPlatformService, NoopReadinessGate, PlatformError, PlatformIdentity,
579        PlatformService, ReadinessGate,
580    };
581    use camel_component_api::NoOpComponentContext;
582    use camel_component_api::test_support::PanicRuntimeObservability;
583    use std::time::Instant;
584    use tokio::sync::{oneshot, watch};
585    use tokio::time::{sleep, timeout};
586    use tokio_util::sync::CancellationToken;
587    use tower::ServiceExt;
588
589    use super::*;
590
591    #[test]
592    fn parse_master_uri_valid() {
593        let cfg = MasterUriConfig::parse("master:mylock:timer:tick?period=250").unwrap();
594        assert_eq!(cfg.lock_name, "mylock");
595        assert_eq!(cfg.delegate_uri, "timer:tick?period=250");
596    }
597
598    #[test]
599    fn parse_master_uri_missing_lockname() {
600        let err = MasterUriConfig::parse("master::timer:tick").unwrap_err();
601        assert!(matches!(err, CamelError::InvalidUri(_)));
602    }
603
604    #[test]
605    fn parse_master_uri_missing_delegate() {
606        let err = MasterUriConfig::parse("master:mylock:").unwrap_err();
607        assert!(matches!(err, CamelError::InvalidUri(_)));
608    }
609
610    #[test]
611    fn endpoint_fails_when_delegate_component_missing() {
612        let master = MasterComponent::default();
613        let result =
614            master.create_endpoint("master:lock-1:missing:delegate", &NoOpComponentContext);
615        assert!(matches!(result, Err(CamelError::ComponentNotFound(_))));
616    }
617
618    #[test]
619    fn delegate_scheme_is_parsed_from_delegate_uri() {
620        let seen_scheme = Arc::new(AtomicBool::new(false));
621
622        struct SchemeAwareContext {
623            delegate: Arc<dyn Component>,
624            seen_scheme: Arc<AtomicBool>,
625        }
626
627        impl ComponentContext for SchemeAwareContext {
628            fn resolve_component(&self, scheme: &str) -> Option<Arc<dyn Component>> {
629                if scheme == "mock" {
630                    self.seen_scheme.store(true, Ordering::SeqCst);
631                    Some(Arc::clone(&self.delegate))
632                } else {
633                    None
634                }
635            }
636
637            fn resolve_language(&self, _name: &str) -> Option<Arc<dyn Language>> {
638                None
639            }
640
641            fn metrics(&self) -> Arc<dyn MetricsCollector> {
642                Arc::new(NoOpMetrics)
643            }
644
645            fn platform_service(&self) -> Arc<dyn PlatformService> {
646                Arc::new(NoopPlatformService::default())
647            }
648
649            fn register_route_health_check(
650                &self,
651                _route_id: &str,
652                _check: Arc<dyn camel_api::AsyncHealthCheck>,
653            ) {
654            }
655
656            fn unregister_route_health_check(&self, _route_id: &str) {}
657        }
658
659        struct MockDelegateComponent;
660
661        impl Component for MockDelegateComponent {
662            fn scheme(&self) -> &str {
663                "mock"
664            }
665
666            fn create_endpoint(
667                &self,
668                _uri: &str,
669                _ctx: &dyn ComponentContext,
670            ) -> Result<Box<dyn Endpoint>, CamelError> {
671                Ok(Box::new(MockDelegateEndpoint))
672            }
673        }
674
675        struct MockDelegateEndpoint;
676
677        impl Endpoint for MockDelegateEndpoint {
678            fn uri(&self) -> &str {
679                "mock:delegate"
680            }
681
682            fn create_consumer(
683                &self,
684                _rt: std::sync::Arc<dyn camel_component_api::RuntimeObservability>,
685            ) -> Result<Box<dyn Consumer>, CamelError> {
686                Err(CamelError::EndpointCreationFailed("not used".to_string()))
687            }
688
689            fn create_producer(
690                &self,
691                _rt: std::sync::Arc<dyn camel_component_api::RuntimeObservability>,
692                _ctx: &ProducerContext,
693            ) -> Result<BoxProcessor, CamelError> {
694                Err(CamelError::EndpointCreationFailed("not used".to_string()))
695            }
696        }
697
698        let delegate = Arc::new(MockDelegateComponent);
699        let ctx = SchemeAwareContext {
700            delegate,
701            seen_scheme: Arc::clone(&seen_scheme),
702        };
703
704        let master = MasterComponent::default();
705        let endpoint = master
706            .create_endpoint("master:mylock:mock:delegate?x=1", &ctx)
707            .unwrap();
708
709        assert_eq!(endpoint.uri(), "master:mylock:mock:delegate?x=1");
710        assert!(seen_scheme.load(Ordering::SeqCst));
711    }
712
713    struct MockDelegateContext {
714        delegate: Arc<dyn Component>,
715    }
716
717    impl ComponentContext for MockDelegateContext {
718        fn resolve_component(&self, scheme: &str) -> Option<Arc<dyn Component>> {
719            if self.delegate.scheme() == scheme {
720                Some(Arc::clone(&self.delegate))
721            } else {
722                None
723            }
724        }
725
726        fn resolve_language(&self, _name: &str) -> Option<Arc<dyn Language>> {
727            None
728        }
729
730        fn metrics(&self) -> Arc<dyn MetricsCollector> {
731            Arc::new(NoOpMetrics)
732        }
733
734        fn platform_service(&self) -> Arc<dyn PlatformService> {
735            Arc::new(NoopPlatformService::default())
736        }
737
738        fn register_route_health_check(
739            &self,
740            _route_id: &str,
741            _check: Arc<dyn camel_api::AsyncHealthCheck>,
742        ) {
743        }
744
745        fn unregister_route_health_check(&self, _route_id: &str) {}
746    }
747
748    struct MockProducerDelegateComponent {
749        create_endpoint_calls: Arc<AtomicUsize>,
750        create_producer_calls: Arc<AtomicUsize>,
751        fail_producer: bool,
752    }
753
754    impl Component for MockProducerDelegateComponent {
755        fn scheme(&self) -> &str {
756            "mock"
757        }
758
759        fn create_endpoint(
760            &self,
761            _uri: &str,
762            _ctx: &dyn ComponentContext,
763        ) -> Result<Box<dyn Endpoint>, CamelError> {
764            self.create_endpoint_calls.fetch_add(1, Ordering::SeqCst);
765            Ok(Box::new(MockProducerDelegateEndpoint {
766                create_producer_calls: Arc::clone(&self.create_producer_calls),
767                fail_producer: self.fail_producer,
768            }))
769        }
770    }
771
772    struct MockProducerDelegateEndpoint {
773        create_producer_calls: Arc<AtomicUsize>,
774        fail_producer: bool,
775    }
776
777    impl Endpoint for MockProducerDelegateEndpoint {
778        fn uri(&self) -> &str {
779            "mock:delegate"
780        }
781
782        fn create_consumer(
783            &self,
784            _rt: std::sync::Arc<dyn camel_component_api::RuntimeObservability>,
785        ) -> Result<Box<dyn Consumer>, CamelError> {
786            Err(CamelError::EndpointCreationFailed(
787                "not used in test".to_string(),
788            ))
789        }
790
791        fn create_producer(
792            &self,
793            _rt: std::sync::Arc<dyn camel_component_api::RuntimeObservability>,
794            _ctx: &ProducerContext,
795        ) -> Result<BoxProcessor, CamelError> {
796            self.create_producer_calls.fetch_add(1, Ordering::SeqCst);
797            if self.fail_producer {
798                return Err(CamelError::ProcessorError(
799                    "delegate producer failed".to_string(),
800                ));
801            }
802            Ok(BoxProcessor::from_fn(
803                |exchange| async move { Ok(exchange) },
804            ))
805        }
806    }
807
808    #[tokio::test]
809    async fn producer_passthrough_delegates_and_produces() {
810        let endpoint_calls = Arc::new(AtomicUsize::new(0));
811        let producer_calls = Arc::new(AtomicUsize::new(0));
812        let delegate = Arc::new(MockProducerDelegateComponent {
813            create_endpoint_calls: Arc::clone(&endpoint_calls),
814            create_producer_calls: Arc::clone(&producer_calls),
815            fail_producer: false,
816        });
817
818        let ctx = MockDelegateContext {
819            delegate: delegate.clone(),
820        };
821
822        let master = MasterComponent::default();
823        let endpoint = master
824            .create_endpoint("master:lock-1:mock:delegate", &ctx)
825            .unwrap();
826        let producer_ctx = ProducerContext::new();
827        let producer = endpoint
828            .create_producer(
829                Arc::new(PanicRuntimeObservability)
830                    as Arc<dyn camel_component_api::RuntimeObservability>,
831                &producer_ctx,
832            )
833            .unwrap();
834
835        let exchange = Exchange::new(Message::new("ok"));
836        let result = producer.oneshot(exchange).await.unwrap();
837
838        assert_eq!(result.input.body.as_text(), Some("ok"));
839        assert_eq!(endpoint_calls.load(Ordering::SeqCst), 1);
840        assert_eq!(producer_calls.load(Ordering::SeqCst), 1);
841    }
842
843    #[test]
844    fn producer_passthrough_bubbles_delegate_errors() {
845        let endpoint_calls = Arc::new(AtomicUsize::new(0));
846        let producer_calls = Arc::new(AtomicUsize::new(0));
847        let delegate = Arc::new(MockProducerDelegateComponent {
848            create_endpoint_calls: Arc::clone(&endpoint_calls),
849            create_producer_calls: Arc::clone(&producer_calls),
850            fail_producer: true,
851        });
852
853        let ctx = MockDelegateContext {
854            delegate: delegate.clone(),
855        };
856
857        let master = MasterComponent::default();
858        let endpoint = master
859            .create_endpoint("master:lock-1:mock:delegate", &ctx)
860            .unwrap();
861        let producer_ctx = ProducerContext::new();
862        let err = endpoint
863            .create_producer(
864                Arc::new(PanicRuntimeObservability)
865                    as Arc<dyn camel_component_api::RuntimeObservability>,
866                &producer_ctx,
867            )
868            .unwrap_err();
869
870        assert!(matches!(err, CamelError::ProcessorError(_)));
871        assert_eq!(endpoint_calls.load(Ordering::SeqCst), 1);
872        assert_eq!(producer_calls.load(Ordering::SeqCst), 1);
873    }
874
    /// `LeadershipService` test double whose leadership events are pushed
    /// manually through `emit`.
    struct FakeLeadershipService {
        /// Sender half of the event watch channel; `None` until `start` has run.
        tx: Mutex<Option<watch::Sender<Option<LeadershipEvent>>>>,
        /// Leader flag shared with the `LeadershipHandle` returned by `start`.
        is_leader: Arc<AtomicBool>,
        /// Value the watch channel is seeded with when `start` is called.
        initial: Option<LeadershipEvent>,
    }
880
881    impl FakeLeadershipService {
882        fn new(initial: Option<LeadershipEvent>) -> Self {
883            let starts_as_leader = matches!(initial, Some(LeadershipEvent::StartedLeading));
884            Self {
885                tx: Mutex::new(None),
886                is_leader: Arc::new(AtomicBool::new(starts_as_leader)),
887                initial,
888            }
889        }
890
891        async fn emit(&self, event: LeadershipEvent) {
892            self.is_leader.store(
893                matches!(event, LeadershipEvent::StartedLeading),
894                Ordering::Release,
895            );
896            if let Some(tx) = self
897                .tx
898                .lock()
899                .expect("mutex poisoned: fake elector sender")
900                .as_ref()
901            {
902                let _ = tx.send(Some(event));
903            }
904        }
905    }
906
907    #[async_trait]
908    impl LeadershipService for FakeLeadershipService {
909        async fn start(&self, _lock_name: &str) -> Result<LeadershipHandle, PlatformError> {
910            let (tx, rx) = watch::channel(self.initial.clone());
911            *self.tx.lock().expect("mutex poisoned: fake elector sender") = Some(tx);
912
913            let cancel = CancellationToken::new();
914            let cancel_wait = cancel.clone();
915            let (term_tx, term_rx) = oneshot::channel();
916            tokio::spawn(async move {
917                cancel_wait.cancelled().await;
918                let _ = term_tx.send(());
919            });
920
921            Ok(LeadershipHandle::new(
922                rx,
923                Arc::clone(&self.is_leader),
924                cancel,
925                term_rx,
926            ))
927        }
928    }
929
    /// `PlatformService` test double that returns pre-built collaborators.
    struct FakePlatformService {
        /// Identity returned (cloned) by `identity()`.
        identity: PlatformIdentity,
        /// Readiness gate returned by `readiness_gate()`.
        readiness_gate: Arc<dyn ReadinessGate>,
        /// Leadership service returned by `leadership()`.
        leadership: Arc<dyn LeadershipService>,
    }
935
936    impl FakePlatformService {
937        fn new(leadership: Arc<dyn LeadershipService>) -> Self {
938            Self {
939                identity: PlatformIdentity::local("master-tests"),
940                readiness_gate: Arc::new(NoopReadinessGate),
941                leadership,
942            }
943        }
944    }
945
946    impl PlatformService for FakePlatformService {
947        fn identity(&self) -> PlatformIdentity {
948            self.identity.clone()
949        }
950
951        fn readiness_gate(&self) -> Arc<dyn ReadinessGate> {
952            Arc::clone(&self.readiness_gate)
953        }
954
955        fn leadership(&self) -> Arc<dyn LeadershipService> {
956            Arc::clone(&self.leadership)
957        }
958    }
959
    /// Delegate component test double; its endpoints share these counters.
    struct FakeDelegateComponent {
        /// Counter handed to each endpoint created by this component.
        create_consumer_calls: Arc<AtomicUsize>,
        /// Counter handed to each endpoint created by this component.
        start_calls: Arc<AtomicUsize>,
    }
964
965    impl Component for FakeDelegateComponent {
966        fn scheme(&self) -> &str {
967            "fake"
968        }
969
970        fn create_endpoint(
971            &self,
972            _uri: &str,
973            _ctx: &dyn ComponentContext,
974        ) -> Result<Box<dyn Endpoint>, CamelError> {
975            Ok(Box::new(FakeDelegateEndpoint {
976                create_consumer_calls: Arc::clone(&self.create_consumer_calls),
977                start_calls: Arc::clone(&self.start_calls),
978            }))
979        }
980    }
981
    /// Endpoint test double that hands out `FakeDelegateConsumer`s.
    struct FakeDelegateEndpoint {
        /// Incremented once per `create_consumer` call; also the source of the
        /// consumer's epoch number.
        create_consumer_calls: Arc<AtomicUsize>,
        /// Counter passed on to every consumer this endpoint creates.
        start_calls: Arc<AtomicUsize>,
    }
986
987    impl Endpoint for FakeDelegateEndpoint {
988        fn uri(&self) -> &str {
989            "fake:delegate"
990        }
991
992        fn create_consumer(
993            &self,
994            _rt: std::sync::Arc<dyn camel_component_api::RuntimeObservability>,
995        ) -> Result<Box<dyn Consumer>, CamelError> {
996            let epoch = self.create_consumer_calls.fetch_add(1, Ordering::SeqCst) + 1;
997            Ok(Box::new(FakeDelegateConsumer {
998                epoch,
999                start_calls: Arc::clone(&self.start_calls),
1000            }))
1001        }
1002
1003        fn create_producer(
1004            &self,
1005            _rt: std::sync::Arc<dyn camel_component_api::RuntimeObservability>,
1006            _ctx: &ProducerContext,
1007        ) -> Result<BoxProcessor, CamelError> {
1008            Err(CamelError::EndpointCreationFailed("not used".to_string()))
1009        }
1010    }
1011
    /// Consumer test double that emits `"epoch-N"` exchanges while running.
    struct FakeDelegateConsumer {
        /// Number embedded in every emitted message body.
        epoch: usize,
        /// Incremented once each time `start` is invoked.
        start_calls: Arc<AtomicUsize>,
    }
1016
    /// Delegate component test double whose `create_endpoint` always fails.
    struct FailingDelegateComponent {
        /// Incremented once per `create_endpoint` attempt.
        create_endpoint_calls: Arc<AtomicUsize>,
    }
1020
1021    impl Component for FailingDelegateComponent {
1022        fn scheme(&self) -> &str {
1023            "failing"
1024        }
1025
1026        fn create_endpoint(
1027            &self,
1028            _uri: &str,
1029            _ctx: &dyn ComponentContext,
1030        ) -> Result<Box<dyn Endpoint>, CamelError> {
1031            self.create_endpoint_calls.fetch_add(1, Ordering::SeqCst);
1032            Err(CamelError::EndpointCreationFailed(
1033                "delegate endpoint creation failed".to_string(),
1034            ))
1035        }
1036    }
1037
1038    #[async_trait]
1039    impl Consumer for FakeDelegateConsumer {
1040        async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
1041            self.start_calls.fetch_add(1, Ordering::SeqCst);
1042            context
1043                .send(Exchange::new(Message::new(format!("epoch-{}", self.epoch))))
1044                .await?;
1045
1046            loop {
1047                tokio::select! {
1048                    _ = context.cancelled() => {
1049                        break;
1050                    }
1051                    _ = sleep(Duration::from_millis(20)) => {
1052                        context
1053                            .send(Exchange::new(Message::new(format!("epoch-{}", self.epoch))))
1054                            .await?;
1055                    }
1056                }
1057            }
1058
1059            Ok(())
1060        }
1061
1062        async fn stop(&mut self) -> Result<(), CamelError> {
1063            Ok(())
1064        }
1065    }
1066
1067    fn build_master_consumer(
1068        platform_service: Arc<dyn PlatformService>,
1069        create_consumer_calls: Arc<AtomicUsize>,
1070        start_calls: Arc<AtomicUsize>,
1071        delegate_retry_max_attempts: Option<u32>,
1072    ) -> MasterConsumer {
1073        let reconnect = match delegate_retry_max_attempts {
1074            Some(max) => NetworkRetryPolicy {
1075                max_attempts: max,
1076                ..NetworkRetryPolicy::default()
1077            },
1078            None => NetworkRetryPolicy {
1079                max_attempts: 0,
1080                ..NetworkRetryPolicy::default()
1081            },
1082        };
1083        MasterConsumer::new(
1084            "lock-a".to_string(),
1085            "fake:delegate".to_string(),
1086            Arc::new(FakeDelegateComponent {
1087                create_consumer_calls,
1088                start_calls,
1089            }),
1090            Arc::new(NoOpMetrics),
1091            platform_service,
1092            Duration::from_millis(500),
1093            reconnect,
1094            Arc::new(PanicRuntimeObservability)
1095                as Arc<dyn camel_component_api::RuntimeObservability>,
1096        )
1097    }
1098
1099    #[tokio::test]
1100    async fn starts_delegate_only_after_started_leading() {
1101        let leadership = Arc::new(FakeLeadershipService::new(None));
1102        let platform_service = Arc::new(FakePlatformService::new(leadership.clone()));
1103        let create_consumer_calls = Arc::new(AtomicUsize::new(0));
1104        let start_calls = Arc::new(AtomicUsize::new(0));
1105        let mut master = build_master_consumer(
1106            platform_service,
1107            Arc::clone(&create_consumer_calls),
1108            Arc::clone(&start_calls),
1109            Some(30),
1110        );
1111
1112        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
1113        let cancel = CancellationToken::new();
1114        let ctx = ConsumerContext::new(tx, cancel.clone());
1115
1116        master.start(ctx).await.unwrap();
1117
1118        sleep(Duration::from_millis(80)).await;
1119        assert!(rx.try_recv().is_err());
1120        assert_eq!(create_consumer_calls.load(Ordering::SeqCst), 0);
1121
1122        leadership.emit(LeadershipEvent::StartedLeading).await;
1123
1124        let first = timeout(Duration::from_millis(500), rx.recv())
1125            .await
1126            .unwrap()
1127            .unwrap();
1128        assert_eq!(first.exchange.input.body.as_text(), Some("epoch-1"));
1129        assert_eq!(create_consumer_calls.load(Ordering::SeqCst), 1);
1130        assert_eq!(start_calls.load(Ordering::SeqCst), 1);
1131
1132        cancel.cancel();
1133        master.stop().await.unwrap();
1134    }
1135
1136    #[tokio::test]
1137    async fn stops_delegate_on_stopped_leading() {
1138        let leadership = Arc::new(FakeLeadershipService::new(None));
1139        let platform_service = Arc::new(FakePlatformService::new(leadership.clone()));
1140        let create_consumer_calls = Arc::new(AtomicUsize::new(0));
1141        let start_calls = Arc::new(AtomicUsize::new(0));
1142        let mut master = build_master_consumer(
1143            platform_service,
1144            Arc::clone(&create_consumer_calls),
1145            Arc::clone(&start_calls),
1146            Some(30),
1147        );
1148
1149        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
1150        let cancel = CancellationToken::new();
1151        let ctx = ConsumerContext::new(tx, cancel.clone());
1152
1153        master.start(ctx).await.unwrap();
1154        leadership.emit(LeadershipEvent::StartedLeading).await;
1155        let _ = timeout(Duration::from_millis(500), rx.recv())
1156            .await
1157            .unwrap()
1158            .unwrap();
1159
1160        leadership.emit(LeadershipEvent::StoppedLeading).await;
1161        sleep(Duration::from_millis(100)).await;
1162        while rx.try_recv().is_ok() {}
1163        assert!(
1164            timeout(Duration::from_millis(120), rx.recv())
1165                .await
1166                .is_err()
1167        );
1168
1169        cancel.cancel();
1170        master.stop().await.unwrap();
1171    }
1172
1173    #[tokio::test]
1174    async fn recreates_delegate_on_new_leadership_epoch() {
1175        let leadership = Arc::new(FakeLeadershipService::new(None));
1176        let platform_service = Arc::new(FakePlatformService::new(leadership.clone()));
1177        let create_consumer_calls = Arc::new(AtomicUsize::new(0));
1178        let start_calls = Arc::new(AtomicUsize::new(0));
1179        let mut master = build_master_consumer(
1180            platform_service,
1181            Arc::clone(&create_consumer_calls),
1182            Arc::clone(&start_calls),
1183            Some(30),
1184        );
1185
1186        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
1187        let cancel = CancellationToken::new();
1188        let ctx = ConsumerContext::new(tx, cancel.clone());
1189
1190        master.start(ctx).await.unwrap();
1191
1192        leadership.emit(LeadershipEvent::StartedLeading).await;
1193        let first = timeout(Duration::from_millis(500), rx.recv())
1194            .await
1195            .unwrap()
1196            .unwrap();
1197        assert_eq!(first.exchange.input.body.as_text(), Some("epoch-1"));
1198
1199        leadership.emit(LeadershipEvent::StoppedLeading).await;
1200        sleep(Duration::from_millis(120)).await;
1201
1202        leadership.emit(LeadershipEvent::StartedLeading).await;
1203        let second = timeout(Duration::from_millis(500), rx.recv())
1204            .await
1205            .unwrap()
1206            .unwrap();
1207        assert_eq!(second.exchange.input.body.as_text(), Some("epoch-2"));
1208
1209        assert_eq!(create_consumer_calls.load(Ordering::SeqCst), 2);
1210        assert_eq!(start_calls.load(Ordering::SeqCst), 2);
1211
1212        cancel.cancel();
1213        master.stop().await.unwrap();
1214    }
1215
1216    // ── rc-i1z test infrastructure ──────────────────────────────────────
1217
    /// Delegate component that can fail at `create_endpoint` or at
    /// `create_consumer`.
    /// Configurable: which error each stage returns, and how many initial
    /// `create_consumer` calls fail before they start succeeding.
    struct ErrorDelegateComponent {
        /// Incremented once per `create_endpoint` call.
        create_endpoint_calls: Arc<AtomicUsize>,
        /// Shared with created endpoints; incremented once per `create_consumer` call.
        create_consumer_calls: Arc<AtomicUsize>,
        /// When `Some`, every `create_endpoint` call returns a clone of this error.
        endpoint_error: Option<CamelError>,
        consumer_error_after: usize, // fail the first N create_consumer calls, then succeed
        /// Error returned by failing `create_consumer` calls; when `None`, a
        /// `ProcessorError("default error")` is used instead.
        consumer_error: Option<CamelError>,
    }
1228
1229    impl Component for ErrorDelegateComponent {
1230        fn scheme(&self) -> &str {
1231            "errdelegate"
1232        }
1233
1234        fn create_endpoint(
1235            &self,
1236            _uri: &str,
1237            _ctx: &dyn ComponentContext,
1238        ) -> Result<Box<dyn Endpoint>, CamelError> {
1239            self.create_endpoint_calls.fetch_add(1, Ordering::SeqCst);
1240            if let Some(ref err) = self.endpoint_error {
1241                return Err(err.clone());
1242            }
1243            Ok(Box::new(ErrorDelegateEndpoint {
1244                create_consumer_calls: Arc::clone(&self.create_consumer_calls),
1245                consumer_error_after: self.consumer_error_after,
1246                consumer_error: self.consumer_error.clone(),
1247            }))
1248        }
1249    }
1250
    /// Endpoint test double whose first `consumer_error_after`
    /// `create_consumer` calls fail.
    struct ErrorDelegateEndpoint {
        /// Incremented once per `create_consumer` call; its value decides
        /// whether the call fails.
        create_consumer_calls: Arc<AtomicUsize>,
        /// Calls numbered 1..=this value fail; later calls succeed.
        consumer_error_after: usize,
        /// Error returned by failing calls; `None` falls back to
        /// `ProcessorError("default error")`.
        consumer_error: Option<CamelError>,
    }
1256
1257    impl Endpoint for ErrorDelegateEndpoint {
1258        fn uri(&self) -> &str {
1259            "errdelegate:delegate"
1260        }
1261
1262        fn create_consumer(
1263            &self,
1264            _rt: std::sync::Arc<dyn camel_component_api::RuntimeObservability>,
1265        ) -> Result<Box<dyn Consumer>, CamelError> {
1266            let call_idx = self.create_consumer_calls.fetch_add(1, Ordering::SeqCst) + 1;
1267            if call_idx <= self.consumer_error_after {
1268                return Err(self
1269                    .consumer_error
1270                    .clone()
1271                    .unwrap_or_else(|| CamelError::ProcessorError("default error".to_string())));
1272            }
1273            Ok(Box::new(SuccessDelegateConsumer))
1274        }
1275
1276        fn create_producer(
1277            &self,
1278            _rt: std::sync::Arc<dyn camel_component_api::RuntimeObservability>,
1279            _ctx: &ProducerContext,
1280        ) -> Result<BoxProcessor, CamelError> {
1281            Err(CamelError::EndpointCreationFailed("not used".to_string()))
1282        }
1283    }
1284
    /// A delegate consumer that sends a single `"ok"` message on start and
    /// then waits until its context is cancelled.
    struct SuccessDelegateConsumer;
1287
1288    #[async_trait]
1289    impl Consumer for SuccessDelegateConsumer {
1290        async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
1291            context.send(Exchange::new(Message::new("ok"))).await?;
1292            context.cancelled().await;
1293            Ok(())
1294        }
1295
1296        async fn stop(&mut self) -> Result<(), CamelError> {
1297            Ok(())
1298        }
1299    }
1300
1301    fn build_error_delegate_master(
1302        platform_service: Arc<dyn PlatformService>,
1303        create_endpoint_calls: Arc<AtomicUsize>,
1304        create_consumer_calls: Arc<AtomicUsize>,
1305        endpoint_error: Option<CamelError>,
1306        consumer_error_after: usize,
1307        consumer_error: Option<CamelError>,
1308        max_attempts: u32,
1309    ) -> MasterConsumer {
1310        let reconnect = NetworkRetryPolicy {
1311            max_attempts,
1312            initial_delay: Duration::from_millis(1),
1313            max_delay: Duration::from_millis(5),
1314            multiplier: 1.0,
1315            ..NetworkRetryPolicy::default()
1316        };
1317        MasterConsumer::new(
1318            "lock-err".to_string(),
1319            "errdelegate:delegate".to_string(),
1320            Arc::new(ErrorDelegateComponent {
1321                create_endpoint_calls,
1322                create_consumer_calls,
1323                endpoint_error,
1324                consumer_error_after,
1325                consumer_error,
1326            }),
1327            Arc::new(NoOpMetrics),
1328            platform_service,
1329            Duration::from_millis(500),
1330            reconnect,
1331            Arc::new(PanicRuntimeObservability)
1332                as Arc<dyn camel_component_api::RuntimeObservability>,
1333        )
1334    }
1335
    /// A permanent (non-retryable) delegate error must end the leadership task
    /// after a single `create_endpoint` attempt, even with an unlimited retry
    /// budget.
    #[tokio::test]
    async fn delegate_permanent_error_terminates_master_without_retry() {
        let leadership = Arc::new(FakeLeadershipService::new(Some(
            LeadershipEvent::StartedLeading,
        )));
        let platform_service = Arc::new(FakePlatformService::new(leadership));
        let create_endpoint_calls = Arc::new(AtomicUsize::new(0));
        let create_consumer_calls = Arc::new(AtomicUsize::new(0));

        // Delegate that fails create_endpoint with a permanent error.
        // Use max_attempts=0 (unlimited) — without classification, this
        // would hang forever. With classification, the task must terminate
        // in milliseconds via fail-fast.
        let mut master = build_error_delegate_master(
            platform_service,
            Arc::clone(&create_endpoint_calls),
            Arc::clone(&create_consumer_calls),
            Some(CamelError::Config("permanent delegate error".to_string())),
            0, // consumer_error_after: irrelevant — create_endpoint fails before create_consumer is reached
            None,
            0, // max_attempts=0 → unlimited — classification is the terminator
        );

        let (tx, _rx) = tokio::sync::mpsc::channel(16);
        let cancel = CancellationToken::new();
        let ctx = ConsumerContext::new(tx, cancel.clone());

        master.start(ctx).await.unwrap();

        // Poll for task completion with a short timeout. A permanent error
        // must terminate the task in milliseconds via fail-fast classification,
        // NOT via retry-budget exhaustion.
        let task_finished = timeout(Duration::from_millis(500), async {
            loop {
                // `leadership_task` holds the spawned task's JoinHandle;
                // `is_finished` reports completion without awaiting it.
                if master
                    .leadership_task
                    .as_ref()
                    .is_some_and(tokio::task::JoinHandle::is_finished)
                {
                    break;
                }
                sleep(Duration::from_millis(5)).await;
            }
        })
        .await;

        assert!(
            task_finished.is_ok(),
            "master should terminate within 500ms via fail-fast classification"
        );

        // Verify single invocation (true fail-fast, not budget exhaustion).
        assert_eq!(
            create_endpoint_calls.load(Ordering::SeqCst),
            1,
            "permanent error must terminate master after exactly 1 invocation"
        );

        // stop() propagates the delegate error; that's correct behavior
        let _ = master.stop().await;

        cancel.cancel();
    }
1399
1400    #[tokio::test]
1401    async fn delegate_transient_error_retries_and_eventually_succeeds() {
1402        let leadership = Arc::new(FakeLeadershipService::new(Some(
1403            LeadershipEvent::StartedLeading,
1404        )));
1405        let platform_service = Arc::new(FakePlatformService::new(leadership));
1406        let create_endpoint_calls = Arc::new(AtomicUsize::new(0));
1407        let create_consumer_calls = Arc::new(AtomicUsize::new(0));
1408
1409        // Delegate that fails create_consumer with transient error for
1410        // the first 2 attempts, then succeeds on the 3rd.
1411        let mut master = build_error_delegate_master(
1412            platform_service,
1413            Arc::clone(&create_endpoint_calls),
1414            Arc::clone(&create_consumer_calls),
1415            None, // endpoint always succeeds
1416            2,    // fail first 2 create_consumer calls
1417            Some(CamelError::Io("connection refused".to_string())),
1418            5, // max_attempts
1419        );
1420
1421        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
1422        let cancel = CancellationToken::new();
1423        let ctx = ConsumerContext::new(tx, cancel.clone());
1424
1425        master.start(ctx).await.unwrap();
1426
1427        // Wait for the delegate to eventually succeed.
1428        let msg = timeout(Duration::from_secs(2), rx.recv())
1429            .await
1430            .unwrap()
1431            .unwrap();
1432        assert_eq!(msg.exchange.input.body.as_text(), Some("ok"));
1433
1434        // Endpoint created 3 times (initial event + 2 retry ticks), consumer
1435        // created 3 times (2 failures + 1 success).
1436        assert_eq!(create_endpoint_calls.load(Ordering::SeqCst), 3);
1437        assert_eq!(create_consumer_calls.load(Ordering::SeqCst), 3);
1438
1439        cancel.cancel();
1440        master.stop().await.unwrap();
1441    }
1442
1443    // ── Existing regression tests (rc-f9k) ──────────────────────────────
1444
1445    #[tokio::test]
1446    async fn stops_retrying_delegate_start_after_max_attempts() {
1447        let leadership = Arc::new(FakeLeadershipService::new(Some(
1448            LeadershipEvent::StartedLeading,
1449        )));
1450        let platform_service = Arc::new(FakePlatformService::new(leadership));
1451        let create_endpoint_calls = Arc::new(AtomicUsize::new(0));
1452
1453        let mut master = MasterConsumer::new(
1454            "lock-a".to_string(),
1455            "failing:delegate".to_string(),
1456            Arc::new(FailingDelegateComponent {
1457                create_endpoint_calls: Arc::clone(&create_endpoint_calls),
1458            }),
1459            Arc::new(NoOpMetrics),
1460            platform_service,
1461            Duration::from_millis(500),
1462            NetworkRetryPolicy {
1463                max_attempts: 1,
1464                ..NetworkRetryPolicy::default()
1465            },
1466            Arc::new(PanicRuntimeObservability)
1467                as Arc<dyn camel_component_api::RuntimeObservability>,
1468        );
1469
1470        let (tx, _rx) = tokio::sync::mpsc::channel(16);
1471        let cancel = CancellationToken::new();
1472        let ctx = ConsumerContext::new(tx, cancel.clone());
1473
1474        master.start(ctx).await.unwrap();
1475        sleep(Duration::from_millis(750)).await;
1476
1477        // With error classification (rc-i1z), EndpointCreationFailed is
1478        // permanent → fail-fast after exactly 1 invocation. Previously
1479        // (pre-rc-i1z) this would have been 2 calls (initial + retry via
1480        // budget exhaustion).
1481        assert_eq!(create_endpoint_calls.load(Ordering::SeqCst), 1);
1482
1483        cancel.cancel();
1484        let _ = master.stop().await;
1485    }
1486
    /// Regression test for MST-002: stop() must abort the leadership JoinHandle
    /// instead of just dropping it when the task is slow to drain.
    /// Without the fix, stop() blocks for the full drain_timeout (~500 ms)
    /// because the leadership task is stuck in stop_delegate awaiting a
    /// slow delegate. With abort-first, stop() returns almost instantly.
    #[tokio::test]
    async fn stop_completes_quickly_when_leadership_task_is_slow() {
        // Delegate consumer that ignores its cancellation token and blocks.
        struct SlowStoppingConsumer;

        #[async_trait]
        impl Consumer for SlowStoppingConsumer {
            async fn start(&mut self, ctx: ConsumerContext) -> Result<(), CamelError> {
                // Announce that the delegate is running; a failed send is ignored.
                ctx.send(Exchange::new(Message::new("slow-start")))
                    .await
                    .ok();
                // Ignore cancellation — sleep far beyond the drain timeout.
                sleep(Duration::from_secs(60)).await;
                Ok(())
            }

            async fn stop(&mut self) -> Result<(), CamelError> {
                Ok(())
            }
        }

        // Component that always yields a `SlowStoppingEndpoint`.
        struct SlowStoppingComponent;

        impl Component for SlowStoppingComponent {
            fn scheme(&self) -> &str {
                "slow"
            }

            fn create_endpoint(
                &self,
                _uri: &str,
                _ctx: &dyn ComponentContext,
            ) -> Result<Box<dyn Endpoint>, CamelError> {
                Ok(Box::new(SlowStoppingEndpoint))
            }
        }

        // Endpoint that always yields a `SlowStoppingConsumer`.
        struct SlowStoppingEndpoint;

        impl Endpoint for SlowStoppingEndpoint {
            fn uri(&self) -> &str {
                "slow:delegate"
            }

            fn create_consumer(
                &self,
                _rt: std::sync::Arc<dyn camel_component_api::RuntimeObservability>,
            ) -> Result<Box<dyn Consumer>, CamelError> {
                Ok(Box::new(SlowStoppingConsumer))
            }

            // Producers are not exercised by this test.
            fn create_producer(
                &self,
                _rt: std::sync::Arc<dyn camel_component_api::RuntimeObservability>,
                _ctx: &ProducerContext,
            ) -> Result<BoxProcessor, CamelError> {
                Err(CamelError::EndpointCreationFailed("not used".into()))
            }
        }

        // Start out as leader so the delegate is launched right away.
        let leadership = Arc::new(FakeLeadershipService::new(Some(
            LeadershipEvent::StartedLeading,
        )));
        let platform_service = Arc::new(FakePlatformService::new(leadership));

        let mut master = MasterConsumer::new(
            "lock-slow".into(),
            "slow:delegate".into(),
            Arc::new(SlowStoppingComponent),
            Arc::new(NoOpMetrics),
            platform_service,
            Duration::from_millis(500), // drain_timeout
            NetworkRetryPolicy {
                max_attempts: 30,
                ..NetworkRetryPolicy::default()
            },
            Arc::new(PanicRuntimeObservability)
                as Arc<dyn camel_component_api::RuntimeObservability>,
        );

        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
        let cancel = CancellationToken::new();
        let ctx = ConsumerContext::new(tx, cancel.clone());

        master.start(ctx).await.unwrap();

        // Wait for the delegate to actually start.
        let msg = timeout(Duration::from_secs(2), rx.recv())
            .await
            .unwrap()
            .unwrap();
        assert_eq!(msg.exchange.input.body.as_text(), Some("slow-start"));

        // stop() must complete quickly because the leadership task is aborted,
        // not just timed-out and leaked.
        let start = Instant::now();
        master.stop().await.unwrap();
        let elapsed = start.elapsed();

        // With abort-first: ~0 ms. Without the fix: ~drain_timeout (500 ms).
        // Assert < 250 ms to reliably distinguish the two behaviours.
        assert!(
            elapsed < Duration::from_millis(250),
            "stop() took {:?}, expected < 250 ms (abort should be near-instant)",
            elapsed,
        );

        cancel.cancel();
    }
1601
1602    #[tokio::test]
1603    async fn stop_propagates_delegate_start_error() {
1604        struct FailingStartConsumer;
1605
1606        #[async_trait]
1607        impl Consumer for FailingStartConsumer {
1608            async fn start(&mut self, _ctx: ConsumerContext) -> Result<(), CamelError> {
1609                Err(CamelError::ProcessorError(
1610                    "delegate start failed".to_string(),
1611                ))
1612            }
1613
1614            async fn stop(&mut self) -> Result<(), CamelError> {
1615                Ok(())
1616            }
1617        }
1618
1619        struct FailingStartComponent;
1620
1621        impl Component for FailingStartComponent {
1622            fn scheme(&self) -> &str {
1623                "failstart"
1624            }
1625
1626            fn create_endpoint(
1627                &self,
1628                _uri: &str,
1629                _ctx: &dyn ComponentContext,
1630            ) -> Result<Box<dyn Endpoint>, CamelError> {
1631                Ok(Box::new(FailingStartEndpoint))
1632            }
1633        }
1634
1635        struct FailingStartEndpoint;
1636
1637        impl Endpoint for FailingStartEndpoint {
1638            fn uri(&self) -> &str {
1639                "failstart:delegate"
1640            }
1641
1642            fn create_consumer(
1643                &self,
1644                _rt: std::sync::Arc<dyn camel_component_api::RuntimeObservability>,
1645            ) -> Result<Box<dyn Consumer>, CamelError> {
1646                Ok(Box::new(FailingStartConsumer))
1647            }
1648
1649            fn create_producer(
1650                &self,
1651                _rt: std::sync::Arc<dyn camel_component_api::RuntimeObservability>,
1652                _ctx: &ProducerContext,
1653            ) -> Result<BoxProcessor, CamelError> {
1654                Err(CamelError::EndpointCreationFailed("not used".into()))
1655            }
1656        }
1657
1658        let leadership = Arc::new(FakeLeadershipService::new(Some(
1659            LeadershipEvent::StartedLeading,
1660        )));
1661        let platform_service = Arc::new(FakePlatformService::new(leadership));
1662
1663        let mut master = MasterConsumer::new(
1664            "lock-error".into(),
1665            "failstart:delegate".into(),
1666            Arc::new(FailingStartComponent),
1667            Arc::new(NoOpMetrics),
1668            platform_service,
1669            Duration::from_millis(500),
1670            NetworkRetryPolicy {
1671                max_attempts: 30,
1672                ..NetworkRetryPolicy::default()
1673            },
1674            Arc::new(PanicRuntimeObservability)
1675                as Arc<dyn camel_component_api::RuntimeObservability>,
1676        );
1677
1678        let (tx, _rx) = tokio::sync::mpsc::channel(16);
1679        let cancel = CancellationToken::new();
1680        let ctx = ConsumerContext::new(tx, cancel.clone());
1681
1682        master.start(ctx).await.unwrap();
1683        sleep(Duration::from_millis(250)).await;
1684        assert!(
1685            master
1686                .leadership_task
1687                .as_ref()
1688                .is_some_and(tokio::task::JoinHandle::is_finished),
1689            "leadership task should finish after delegate error"
1690        );
1691        let err = master.stop().await.expect_err("expected delegate error");
1692        assert!(err.to_string().contains("delegate start failed"));
1693
1694        cancel.cancel();
1695    }
1696}