Skip to main content

camel_master/
lib.rs

1//! Master/leader-election component — wraps a delegate consumer with distributed lock-based leadership coordination.
2//!
3//! Main types: `MasterComponent`, `MasterBundle`.
4//! Main modules: `bundle`, `config`.
5
6pub mod bundle;
7pub mod config;
8
9pub use bundle::MasterBundle;
10
11use std::sync::Arc;
12use std::time::Duration;
13
14use async_trait::async_trait;
15use camel_api::{CamelError, MetricsCollector, PlatformService};
16use camel_component_api::{
17    BoxProcessor, Component, ComponentContext, Consumer, ConsumerContext, Endpoint,
18    ExchangeEnvelope, ProducerContext, parse_uri,
19};
20use camel_language_api::Language;
21use tokio::task::JoinHandle;
22use tokio::time::{interval, timeout};
23use tokio_util::sync::CancellationToken;
24use tracing::{error, info, warn};
25
26use crate::config::{MasterComponentConfig, MasterUriConfig};
27
28const DELEGATE_RETRY_INTERVAL: Duration = Duration::from_millis(200);
29
30pub struct MasterComponent {
31    drain_timeout_ms: u64,
32    delegate_retry_max_attempts: Option<u32>,
33}
34
35impl MasterComponent {
36    pub fn new(config: MasterComponentConfig) -> Self {
37        Self {
38            drain_timeout_ms: config.drain_timeout_ms,
39            delegate_retry_max_attempts: config.delegate_retry_max_attempts,
40        }
41    }
42}
43
44impl Default for MasterComponent {
45    fn default() -> Self {
46        Self::new(MasterComponentConfig::default())
47    }
48}
49
50impl Component for MasterComponent {
51    fn scheme(&self) -> &str {
52        "master"
53    }
54
55    fn create_endpoint(
56        &self,
57        uri: &str,
58        ctx: &dyn ComponentContext,
59    ) -> Result<Box<dyn Endpoint>, CamelError> {
60        let parsed = MasterUriConfig::parse(uri)?;
61        let delegate_parts = parse_uri(&parsed.delegate_uri)?;
62        let delegate_scheme = delegate_parts.scheme;
63        let delegate_component = ctx
64            .resolve_component(&delegate_scheme)
65            .ok_or_else(|| CamelError::ComponentNotFound(delegate_scheme.clone()))?;
66
67        Ok(Box::new(MasterEndpoint {
68            uri: uri.to_string(),
69            lock_name: parsed.lock_name,
70            delegate_uri: parsed.delegate_uri,
71            delegate_component,
72            metrics: ctx.metrics(),
73            platform_service: ctx.platform_service(),
74            drain_timeout: Duration::from_millis(self.drain_timeout_ms),
75            delegate_retry_max_attempts: self.delegate_retry_max_attempts,
76        }))
77    }
78}
79
80struct MasterEndpoint {
81    uri: String,
82    lock_name: String,
83    delegate_uri: String,
84    delegate_component: Arc<dyn Component>,
85    // TODO(MST-001): MetricsCollector is wired through from ComponentContext but never called.
86    // Should emit metrics on leader acquisition (increment_exchanges), leader loss
87    // (increment_errors), and delegate start/stop events (record_circuit_breaker_change).
88    metrics: Arc<dyn MetricsCollector>,
89    platform_service: Arc<dyn PlatformService>,
90    drain_timeout: Duration,
91    delegate_retry_max_attempts: Option<u32>,
92}
93
94impl Endpoint for MasterEndpoint {
95    fn uri(&self) -> &str {
96        &self.uri
97    }
98
99    fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
100        Ok(Box::new(MasterConsumer::new(
101            self.lock_name.clone(),
102            self.delegate_uri.clone(),
103            Arc::clone(&self.delegate_component),
104            Arc::clone(&self.metrics),
105            Arc::clone(&self.platform_service),
106            self.drain_timeout,
107            self.delegate_retry_max_attempts,
108        )))
109    }
110
111    fn create_producer(&self, ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
112        let delegate_ctx = MasterDelegateContext {
113            delegate_component: Arc::clone(&self.delegate_component),
114            metrics: Arc::clone(&self.metrics),
115            platform_service: Arc::clone(&self.platform_service),
116        };
117
118        self.delegate_component
119            .create_endpoint(&self.delegate_uri, &delegate_ctx)?
120            .create_producer(ctx)
121    }
122}
123
124struct MasterDelegateContext {
125    delegate_component: Arc<dyn Component>,
126    metrics: Arc<dyn MetricsCollector>,
127    platform_service: Arc<dyn PlatformService>,
128}
129
130impl ComponentContext for MasterDelegateContext {
131    fn resolve_component(&self, scheme: &str) -> Option<Arc<dyn Component>> {
132        if self.delegate_component.scheme() == scheme {
133            Some(Arc::clone(&self.delegate_component))
134        } else {
135            None
136        }
137    }
138
139    fn resolve_language(&self, _name: &str) -> Option<Arc<dyn Language>> {
140        None
141    }
142
143    fn metrics(&self) -> Arc<dyn MetricsCollector> {
144        Arc::clone(&self.metrics)
145    }
146
147    fn platform_service(&self) -> Arc<dyn PlatformService> {
148        Arc::clone(&self.platform_service)
149    }
150
151    fn register_route_health_check(
152        &self,
153        _route_id: &str,
154        _check: Arc<dyn camel_api::AsyncHealthCheck>,
155    ) {
156    }
157
158    fn unregister_route_health_check(&self, _route_id: &str) {}
159}
160
161struct MasterConsumer {
162    lock_name: String,
163    delegate_uri: String,
164    delegate_component: Arc<dyn Component>,
165    // TODO(MST-001): MetricsCollector is stored here but never used for emission.
166    // Wire into reconcile_event to record leadership transitions and delegate lifecycle.
167    metrics: Arc<dyn MetricsCollector>,
168    platform_service: Arc<dyn PlatformService>,
169    drain_timeout: Duration,
170    delegate_retry_max_attempts: Option<u32>,
171    leadership_task: Option<JoinHandle<Result<(), CamelError>>>,
172    stop_token: Option<CancellationToken>,
173}
174
175impl MasterConsumer {
176    fn new(
177        lock_name: String,
178        delegate_uri: String,
179        delegate_component: Arc<dyn Component>,
180        metrics: Arc<dyn MetricsCollector>,
181        platform_service: Arc<dyn PlatformService>,
182        drain_timeout: Duration,
183        delegate_retry_max_attempts: Option<u32>,
184    ) -> Self {
185        Self {
186            lock_name,
187            delegate_uri,
188            delegate_component,
189            metrics,
190            platform_service,
191            drain_timeout,
192            delegate_retry_max_attempts,
193            leadership_task: None,
194            stop_token: None,
195        }
196    }
197}
198
199enum DelegateState {
200    Inactive,
201    Active {
202        run_token: CancellationToken,
203        handle: JoinHandle<Result<(), CamelError>>,
204    },
205}
206
207async fn stop_delegate(
208    state: &mut DelegateState,
209    drain_timeout: Duration,
210) -> Result<(), CamelError> {
211    if let DelegateState::Active {
212        run_token,
213        mut handle,
214    } = std::mem::replace(state, DelegateState::Inactive)
215    {
216        run_token.cancel();
217        match timeout(drain_timeout, &mut handle).await {
218            Ok(Ok(Ok(()))) => {}
219            Ok(Ok(Err(err))) => {
220                return Err(err);
221            }
222            Ok(Err(e)) if e.is_panic() => {
223                error!(error = %e, "master delegate task panicked");
224                return Err(CamelError::ProcessorError(format!(
225                    "master delegate task panicked: {e}"
226                )));
227            }
228            Ok(Err(e)) => {
229                warn!(error = %e, "master delegate task cancelled");
230                return Err(CamelError::ProcessorError(format!(
231                    "master delegate task cancelled: {e}"
232                )));
233            }
234            Err(_) => {
235                warn!("master delegate shutdown timed out, aborting");
236                handle.abort();
237            }
238        }
239    }
240    Ok(())
241}
242
243struct ReconcileContext<'a> {
244    lock_name: &'a str,
245    delegate_component: &'a Arc<dyn Component>,
246    delegate_uri: &'a str,
247    sender: &'a tokio::sync::mpsc::Sender<ExchangeEnvelope>,
248    parent_cancel: &'a CancellationToken,
249    drain_timeout: Duration,
250    metrics: &'a Arc<dyn MetricsCollector>,
251    platform_service: &'a Arc<dyn PlatformService>,
252}
253
254async fn reconcile_event(
255    event: camel_api::LeadershipEvent,
256    state: &mut DelegateState,
257    ctx: &ReconcileContext<'_>,
258) -> Result<(), CamelError> {
259    match event {
260        camel_api::LeadershipEvent::StartedLeading => {
261            info!(lock = %ctx.lock_name, "master leadership acquired");
262            // TODO(MST-001): emit metrics here — MetricsCollector is wired but never called
263            tracing::info!(lock = %ctx.lock_name, "metrics emission placeholder: leadership acquired");
264            stop_delegate(state, ctx.drain_timeout).await?;
265
266            let delegate_ctx = MasterDelegateContext {
267                delegate_component: Arc::clone(ctx.delegate_component),
268                metrics: Arc::clone(ctx.metrics),
269                platform_service: Arc::clone(ctx.platform_service),
270            };
271
272            let endpoint = match ctx
273                .delegate_component
274                .create_endpoint(ctx.delegate_uri, &delegate_ctx)
275            {
276                Ok(endpoint) => endpoint,
277                Err(err) => {
278                    warn!(lock = %ctx.lock_name, "failed to create delegate endpoint: {err}");
279                    return Ok(());
280                }
281            };
282
283            let mut consumer = match endpoint.create_consumer() {
284                Ok(consumer) => consumer,
285                Err(err) => {
286                    warn!(lock = %ctx.lock_name, "failed to create delegate consumer: {err}");
287                    return Ok(());
288                }
289            };
290
291            let run_token = ctx.parent_cancel.child_token();
292            let delegate_ctx = ConsumerContext::new(ctx.sender.clone(), run_token.clone());
293            let handle = tokio::spawn(async move {
294                consumer.start(delegate_ctx).await?;
295                consumer.stop().await?;
296                Ok::<(), CamelError>(())
297            });
298
299            *state = DelegateState::Active { run_token, handle };
300        }
301        camel_api::LeadershipEvent::StoppedLeading => {
302            info!(lock = %ctx.lock_name, "master leadership lost");
303            // TODO(MST-001): emit metrics here — MetricsCollector is wired but never called
304            tracing::info!(lock = %ctx.lock_name, "metrics emission placeholder: leadership lost");
305            stop_delegate(state, ctx.drain_timeout).await?;
306        }
307    }
308    Ok(())
309}
310
311#[async_trait]
312impl Consumer for MasterConsumer {
313    async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
314        if self.leadership_task.is_some() {
315            return Ok(());
316        }
317
318        let handle = self
319            .platform_service
320            .leadership()
321            .start(&self.lock_name)
322            .await
323            .map_err(|e| {
324                CamelError::EndpointCreationFailed(format!("failed to start leader election: {e}"))
325            })?;
326
327        let lock_name = self.lock_name.clone();
328        let delegate_uri = self.delegate_uri.clone();
329        let delegate_component = Arc::clone(&self.delegate_component);
330        let metrics = Arc::clone(&self.metrics);
331        let platform_service = Arc::clone(&self.platform_service);
332        let sender = context.sender();
333        let parent_cancel = context.cancel_token();
334        let drain_timeout = self.drain_timeout;
335        let delegate_retry_max_attempts = self.delegate_retry_max_attempts;
336        let mut events = handle.events.clone();
337
338        let stop_token = CancellationToken::new();
339        let stop_token_loop = stop_token.clone();
340        let leadership_handle = handle;
341
342        let task = tokio::spawn(async move {
343            let mut state = DelegateState::Inactive;
344            let mut is_leading = false;
345            let mut delegate_attempts = 0u32;
346            let mut retry_tick = interval(DELEGATE_RETRY_INTERVAL);
347
348            let rctx = ReconcileContext {
349                lock_name: &lock_name,
350                delegate_component: &delegate_component,
351                delegate_uri: &delegate_uri,
352                sender: &sender,
353                parent_cancel: &parent_cancel,
354                drain_timeout,
355                metrics: &metrics,
356                platform_service: &platform_service,
357            };
358
359            let initial_event = { events.borrow().clone() };
360            if let Some(initial_event) = initial_event {
361                is_leading = matches!(&initial_event, camel_api::LeadershipEvent::StartedLeading);
362                if is_leading {
363                    delegate_attempts = 0;
364                }
365                if let Err(err) = reconcile_event(initial_event, &mut state, &rctx).await {
366                    error!(lock = %lock_name, "master delegate error: {err}");
367                    return Err(err);
368                }
369            }
370
371            loop {
372                tokio::select! {
373                    _ = stop_token_loop.cancelled() => {
374                        break;
375                    }
376                    _ = context.cancelled() => {
377                        break;
378                    }
379                    changed = events.changed() => {
380                        if changed.is_err() {
381                            break;
382                        }
383                        let event = { events.borrow().clone() };
384                        if let Some(event) = event {
385                            let was_leading = is_leading;
386                            is_leading = matches!(&event, camel_api::LeadershipEvent::StartedLeading);
387                            if !was_leading && is_leading {
388                                delegate_attempts = 0;
389                            }
390                            if let Err(err) = reconcile_event(event, &mut state, &rctx).await {
391                                error!(lock = %lock_name, "master delegate error: {err}");
392                                return Err(err);
393                            }
394                        }
395                    }
396                    _ = retry_tick.tick() => {
397                        if matches!(&state, DelegateState::Active { handle, .. } if handle.is_finished())
398                            && let Err(err) = stop_delegate(&mut state, drain_timeout).await
399                        {
400                            error!(lock = %lock_name, "master delegate task failed: {err}");
401                            return Err(err);
402                        }
403
404                        if is_leading && matches!(state, DelegateState::Inactive) {
405                            if let Some(max) = delegate_retry_max_attempts {
406                                delegate_attempts = delegate_attempts.saturating_add(1);
407                                if delegate_attempts > max {
408                                    warn!(
409                                        lock = %lock_name,
410                                        attempts = max,
411                                        "delegate start exceeded max attempts, stopping consumer"
412                                    );
413                                    break;
414                                }
415                            }
416                            if let Err(err) = reconcile_event(
417                                camel_api::LeadershipEvent::StartedLeading,
418                                &mut state,
419                                &rctx,
420                            )
421                            .await {
422                                error!(lock = %lock_name, "master delegate retry error: {err}");
423                                return Err(err);
424                            }
425                        }
426                    }
427                }
428            }
429
430            stop_delegate(&mut state, drain_timeout).await?;
431            let _ = timeout(drain_timeout, leadership_handle.step_down()).await;
432            Ok::<(), CamelError>(())
433        });
434
435        self.stop_token = Some(stop_token);
436        self.leadership_task = Some(task);
437
438        Ok(())
439    }
440
441    async fn stop(&mut self) -> Result<(), CamelError> {
442        if let Some(token) = self.stop_token.take() {
443            token.cancel();
444        }
445
446        if let Some(handle) = self.leadership_task.take() {
447            if handle.is_finished() {
448                match timeout(self.drain_timeout, handle).await {
449                    Ok(Ok(Ok(()))) => {}
450                    Ok(Ok(Err(err))) => return Err(err),
451                    Ok(Err(e)) => {
452                        return Err(CamelError::ProcessorError(format!(
453                            "leadership task join failed: {e}"
454                        )));
455                    }
456                    Err(_) => {
457                        return Err(CamelError::ProcessorError(
458                            "leadership task join timed out".to_string(),
459                        ));
460                    }
461                }
462                return Ok(());
463            }
464
465            // Abort first so the task is guaranteed to stop; then await with
466            // a timeout as a safety-net in case abort takes a moment to land.
467            handle.abort();
468            match timeout(self.drain_timeout, handle).await {
469                Ok(Ok(Ok(()))) => {}
470                Ok(Ok(Err(err))) => return Err(err),
471                Ok(Err(e)) if e.is_panic() => {
472                    error!(lock = %self.lock_name, error = %e, "leadership task panicked");
473                }
474                Ok(Err(e)) => {
475                    warn!(lock = %self.lock_name, error = %e, "leadership task cancelled");
476                }
477                Err(_) => {
478                    warn!("master leadership loop shutdown timed out after abort");
479                }
480            }
481        }
482
483        Ok(())
484    }
485
486    fn background_task_handle(
487        &mut self,
488    ) -> Option<tokio::task::JoinHandle<Result<(), CamelError>>> {
489        self.leadership_task.take()
490    }
491}
492
493#[cfg(test)]
494mod tests {
495    use std::sync::Arc;
496    use std::sync::Mutex;
497    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
498
499    use camel_api::{
500        BoxProcessorExt, Exchange, LeadershipEvent, LeadershipHandle, LeadershipService, Message,
501        NoOpMetrics, NoopPlatformService, NoopReadinessGate, PlatformError, PlatformIdentity,
502        PlatformService, ReadinessGate,
503    };
504    use camel_component_api::NoOpComponentContext;
505    use std::time::Instant;
506    use tokio::sync::{oneshot, watch};
507    use tokio::time::{sleep, timeout};
508    use tokio_util::sync::CancellationToken;
509    use tower::ServiceExt;
510
511    use super::*;
512
513    #[test]
514    fn parse_master_uri_valid() {
515        let cfg = MasterUriConfig::parse("master:mylock:timer:tick?period=250").unwrap();
516        assert_eq!(cfg.lock_name, "mylock");
517        assert_eq!(cfg.delegate_uri, "timer:tick?period=250");
518    }
519
520    #[test]
521    fn parse_master_uri_missing_lockname() {
522        let err = MasterUriConfig::parse("master::timer:tick").unwrap_err();
523        assert!(matches!(err, CamelError::InvalidUri(_)));
524    }
525
526    #[test]
527    fn parse_master_uri_missing_delegate() {
528        let err = MasterUriConfig::parse("master:mylock:").unwrap_err();
529        assert!(matches!(err, CamelError::InvalidUri(_)));
530    }
531
532    #[test]
533    fn endpoint_fails_when_delegate_component_missing() {
534        let master = MasterComponent::default();
535        let result =
536            master.create_endpoint("master:lock-1:missing:delegate", &NoOpComponentContext);
537        assert!(matches!(result, Err(CamelError::ComponentNotFound(_))));
538    }
539
540    #[test]
541    fn delegate_scheme_is_parsed_from_delegate_uri() {
542        let seen_scheme = Arc::new(AtomicBool::new(false));
543
544        struct SchemeAwareContext {
545            delegate: Arc<dyn Component>,
546            seen_scheme: Arc<AtomicBool>,
547        }
548
549        impl ComponentContext for SchemeAwareContext {
550            fn resolve_component(&self, scheme: &str) -> Option<Arc<dyn Component>> {
551                if scheme == "mock" {
552                    self.seen_scheme.store(true, Ordering::SeqCst);
553                    Some(Arc::clone(&self.delegate))
554                } else {
555                    None
556                }
557            }
558
559            fn resolve_language(&self, _name: &str) -> Option<Arc<dyn Language>> {
560                None
561            }
562
563            fn metrics(&self) -> Arc<dyn MetricsCollector> {
564                Arc::new(NoOpMetrics)
565            }
566
567            fn platform_service(&self) -> Arc<dyn PlatformService> {
568                Arc::new(NoopPlatformService::default())
569            }
570
571            fn register_route_health_check(
572                &self,
573                _route_id: &str,
574                _check: Arc<dyn camel_api::AsyncHealthCheck>,
575            ) {
576            }
577
578            fn unregister_route_health_check(&self, _route_id: &str) {}
579        }
580
581        struct MockDelegateComponent;
582
583        impl Component for MockDelegateComponent {
584            fn scheme(&self) -> &str {
585                "mock"
586            }
587
588            fn create_endpoint(
589                &self,
590                _uri: &str,
591                _ctx: &dyn ComponentContext,
592            ) -> Result<Box<dyn Endpoint>, CamelError> {
593                Ok(Box::new(MockDelegateEndpoint))
594            }
595        }
596
597        struct MockDelegateEndpoint;
598
599        impl Endpoint for MockDelegateEndpoint {
600            fn uri(&self) -> &str {
601                "mock:delegate"
602            }
603
604            fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
605                Err(CamelError::EndpointCreationFailed("not used".to_string()))
606            }
607
608            fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
609                Err(CamelError::EndpointCreationFailed("not used".to_string()))
610            }
611        }
612
613        let delegate = Arc::new(MockDelegateComponent);
614        let ctx = SchemeAwareContext {
615            delegate,
616            seen_scheme: Arc::clone(&seen_scheme),
617        };
618
619        let master = MasterComponent::default();
620        let endpoint = master
621            .create_endpoint("master:mylock:mock:delegate?x=1", &ctx)
622            .unwrap();
623
624        assert_eq!(endpoint.uri(), "master:mylock:mock:delegate?x=1");
625        assert!(seen_scheme.load(Ordering::SeqCst));
626    }
627
628    struct MockDelegateContext {
629        delegate: Arc<dyn Component>,
630    }
631
632    impl ComponentContext for MockDelegateContext {
633        fn resolve_component(&self, scheme: &str) -> Option<Arc<dyn Component>> {
634            if self.delegate.scheme() == scheme {
635                Some(Arc::clone(&self.delegate))
636            } else {
637                None
638            }
639        }
640
641        fn resolve_language(&self, _name: &str) -> Option<Arc<dyn Language>> {
642            None
643        }
644
645        fn metrics(&self) -> Arc<dyn MetricsCollector> {
646            Arc::new(NoOpMetrics)
647        }
648
649        fn platform_service(&self) -> Arc<dyn PlatformService> {
650            Arc::new(NoopPlatformService::default())
651        }
652
653        fn register_route_health_check(
654            &self,
655            _route_id: &str,
656            _check: Arc<dyn camel_api::AsyncHealthCheck>,
657        ) {
658        }
659
660        fn unregister_route_health_check(&self, _route_id: &str) {}
661    }
662
663    struct MockProducerDelegateComponent {
664        create_endpoint_calls: Arc<AtomicUsize>,
665        create_producer_calls: Arc<AtomicUsize>,
666        fail_producer: bool,
667    }
668
669    impl Component for MockProducerDelegateComponent {
670        fn scheme(&self) -> &str {
671            "mock"
672        }
673
674        fn create_endpoint(
675            &self,
676            _uri: &str,
677            _ctx: &dyn ComponentContext,
678        ) -> Result<Box<dyn Endpoint>, CamelError> {
679            self.create_endpoint_calls.fetch_add(1, Ordering::SeqCst);
680            Ok(Box::new(MockProducerDelegateEndpoint {
681                create_producer_calls: Arc::clone(&self.create_producer_calls),
682                fail_producer: self.fail_producer,
683            }))
684        }
685    }
686
687    struct MockProducerDelegateEndpoint {
688        create_producer_calls: Arc<AtomicUsize>,
689        fail_producer: bool,
690    }
691
692    impl Endpoint for MockProducerDelegateEndpoint {
693        fn uri(&self) -> &str {
694            "mock:delegate"
695        }
696
697        fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
698            Err(CamelError::EndpointCreationFailed(
699                "not used in test".to_string(),
700            ))
701        }
702
703        fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
704            self.create_producer_calls.fetch_add(1, Ordering::SeqCst);
705            if self.fail_producer {
706                return Err(CamelError::ProcessorError(
707                    "delegate producer failed".to_string(),
708                ));
709            }
710            Ok(BoxProcessor::from_fn(
711                |exchange| async move { Ok(exchange) },
712            ))
713        }
714    }
715
716    #[tokio::test]
717    async fn producer_passthrough_delegates_and_produces() {
718        let endpoint_calls = Arc::new(AtomicUsize::new(0));
719        let producer_calls = Arc::new(AtomicUsize::new(0));
720        let delegate = Arc::new(MockProducerDelegateComponent {
721            create_endpoint_calls: Arc::clone(&endpoint_calls),
722            create_producer_calls: Arc::clone(&producer_calls),
723            fail_producer: false,
724        });
725
726        let ctx = MockDelegateContext {
727            delegate: delegate.clone(),
728        };
729
730        let master = MasterComponent::default();
731        let endpoint = master
732            .create_endpoint("master:lock-1:mock:delegate", &ctx)
733            .unwrap();
734        let producer_ctx = ProducerContext::new();
735        let producer = endpoint.create_producer(&producer_ctx).unwrap();
736
737        let exchange = Exchange::new(Message::new("ok"));
738        let result = producer.oneshot(exchange).await.unwrap();
739
740        assert_eq!(result.input.body.as_text(), Some("ok"));
741        assert_eq!(endpoint_calls.load(Ordering::SeqCst), 1);
742        assert_eq!(producer_calls.load(Ordering::SeqCst), 1);
743    }
744
745    #[test]
746    fn producer_passthrough_bubbles_delegate_errors() {
747        let endpoint_calls = Arc::new(AtomicUsize::new(0));
748        let producer_calls = Arc::new(AtomicUsize::new(0));
749        let delegate = Arc::new(MockProducerDelegateComponent {
750            create_endpoint_calls: Arc::clone(&endpoint_calls),
751            create_producer_calls: Arc::clone(&producer_calls),
752            fail_producer: true,
753        });
754
755        let ctx = MockDelegateContext {
756            delegate: delegate.clone(),
757        };
758
759        let master = MasterComponent::default();
760        let endpoint = master
761            .create_endpoint("master:lock-1:mock:delegate", &ctx)
762            .unwrap();
763        let producer_ctx = ProducerContext::new();
764        let err = endpoint.create_producer(&producer_ctx).unwrap_err();
765
766        assert!(matches!(err, CamelError::ProcessorError(_)));
767        assert_eq!(endpoint_calls.load(Ordering::SeqCst), 1);
768        assert_eq!(producer_calls.load(Ordering::SeqCst), 1);
769    }
770
771    struct FakeLeadershipService {
772        tx: Mutex<Option<watch::Sender<Option<LeadershipEvent>>>>,
773        is_leader: Arc<AtomicBool>,
774        initial: Option<LeadershipEvent>,
775    }
776
777    impl FakeLeadershipService {
778        fn new(initial: Option<LeadershipEvent>) -> Self {
779            let starts_as_leader = matches!(initial, Some(LeadershipEvent::StartedLeading));
780            Self {
781                tx: Mutex::new(None),
782                is_leader: Arc::new(AtomicBool::new(starts_as_leader)),
783                initial,
784            }
785        }
786
787        async fn emit(&self, event: LeadershipEvent) {
788            self.is_leader.store(
789                matches!(event, LeadershipEvent::StartedLeading),
790                Ordering::Release,
791            );
792            if let Some(tx) = self
793                .tx
794                .lock()
795                .expect("mutex poisoned: fake elector sender")
796                .as_ref()
797            {
798                let _ = tx.send(Some(event));
799            }
800        }
801    }
802
803    #[async_trait]
804    impl LeadershipService for FakeLeadershipService {
805        async fn start(&self, _lock_name: &str) -> Result<LeadershipHandle, PlatformError> {
806            let (tx, rx) = watch::channel(self.initial.clone());
807            *self.tx.lock().expect("mutex poisoned: fake elector sender") = Some(tx);
808
809            let cancel = CancellationToken::new();
810            let cancel_wait = cancel.clone();
811            let (term_tx, term_rx) = oneshot::channel();
812            tokio::spawn(async move {
813                cancel_wait.cancelled().await;
814                let _ = term_tx.send(());
815            });
816
817            Ok(LeadershipHandle::new(
818                rx,
819                Arc::clone(&self.is_leader),
820                cancel,
821                term_rx,
822            ))
823        }
824    }
825
826    struct FakePlatformService {
827        identity: PlatformIdentity,
828        readiness_gate: Arc<dyn ReadinessGate>,
829        leadership: Arc<dyn LeadershipService>,
830    }
831
832    impl FakePlatformService {
833        fn new(leadership: Arc<dyn LeadershipService>) -> Self {
834            Self {
835                identity: PlatformIdentity::local("master-tests"),
836                readiness_gate: Arc::new(NoopReadinessGate),
837                leadership,
838            }
839        }
840    }
841
842    impl PlatformService for FakePlatformService {
843        fn identity(&self) -> PlatformIdentity {
844            self.identity.clone()
845        }
846
847        fn readiness_gate(&self) -> Arc<dyn ReadinessGate> {
848            Arc::clone(&self.readiness_gate)
849        }
850
851        fn leadership(&self) -> Arc<dyn LeadershipService> {
852            Arc::clone(&self.leadership)
853        }
854    }
855
856    struct FakeDelegateComponent {
857        create_consumer_calls: Arc<AtomicUsize>,
858        start_calls: Arc<AtomicUsize>,
859    }
860
861    impl Component for FakeDelegateComponent {
862        fn scheme(&self) -> &str {
863            "fake"
864        }
865
866        fn create_endpoint(
867            &self,
868            _uri: &str,
869            _ctx: &dyn ComponentContext,
870        ) -> Result<Box<dyn Endpoint>, CamelError> {
871            Ok(Box::new(FakeDelegateEndpoint {
872                create_consumer_calls: Arc::clone(&self.create_consumer_calls),
873                start_calls: Arc::clone(&self.start_calls),
874            }))
875        }
876    }
877
878    struct FakeDelegateEndpoint {
879        create_consumer_calls: Arc<AtomicUsize>,
880        start_calls: Arc<AtomicUsize>,
881    }
882
883    impl Endpoint for FakeDelegateEndpoint {
884        fn uri(&self) -> &str {
885            "fake:delegate"
886        }
887
888        fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
889            let epoch = self.create_consumer_calls.fetch_add(1, Ordering::SeqCst) + 1;
890            Ok(Box::new(FakeDelegateConsumer {
891                epoch,
892                start_calls: Arc::clone(&self.start_calls),
893            }))
894        }
895
896        fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
897            Err(CamelError::EndpointCreationFailed("not used".to_string()))
898        }
899    }
900
901    struct FakeDelegateConsumer {
902        epoch: usize,
903        start_calls: Arc<AtomicUsize>,
904    }
905
906    struct FailingDelegateComponent {
907        create_endpoint_calls: Arc<AtomicUsize>,
908    }
909
910    impl Component for FailingDelegateComponent {
911        fn scheme(&self) -> &str {
912            "failing"
913        }
914
915        fn create_endpoint(
916            &self,
917            _uri: &str,
918            _ctx: &dyn ComponentContext,
919        ) -> Result<Box<dyn Endpoint>, CamelError> {
920            self.create_endpoint_calls.fetch_add(1, Ordering::SeqCst);
921            Err(CamelError::EndpointCreationFailed(
922                "delegate endpoint creation failed".to_string(),
923            ))
924        }
925    }
926
927    #[async_trait]
928    impl Consumer for FakeDelegateConsumer {
929        async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
930            self.start_calls.fetch_add(1, Ordering::SeqCst);
931            context
932                .send(Exchange::new(Message::new(format!("epoch-{}", self.epoch))))
933                .await?;
934
935            loop {
936                tokio::select! {
937                    _ = context.cancelled() => {
938                        break;
939                    }
940                    _ = sleep(Duration::from_millis(20)) => {
941                        context
942                            .send(Exchange::new(Message::new(format!("epoch-{}", self.epoch))))
943                            .await?;
944                    }
945                }
946            }
947
948            Ok(())
949        }
950
951        async fn stop(&mut self) -> Result<(), CamelError> {
952            Ok(())
953        }
954    }
955
956    fn build_master_consumer(
957        platform_service: Arc<dyn PlatformService>,
958        create_consumer_calls: Arc<AtomicUsize>,
959        start_calls: Arc<AtomicUsize>,
960        delegate_retry_max_attempts: Option<u32>,
961    ) -> MasterConsumer {
962        MasterConsumer::new(
963            "lock-a".to_string(),
964            "fake:delegate".to_string(),
965            Arc::new(FakeDelegateComponent {
966                create_consumer_calls,
967                start_calls,
968            }),
969            Arc::new(NoOpMetrics),
970            platform_service,
971            Duration::from_millis(500),
972            delegate_retry_max_attempts,
973        )
974    }
975
    /// Verifies that the delegate is neither created nor started until the
    /// leadership service reports `StartedLeading`.
    ///
    /// Once leadership arrives, the first exchange must come from consumer epoch 1.
    #[tokio::test]
    async fn starts_delegate_only_after_started_leading() {
        // No initial leadership event is published on the watch channel.
        let leadership = Arc::new(FakeLeadershipService::new(None));
        let platform_service = Arc::new(FakePlatformService::new(leadership.clone()));
        let create_consumer_calls = Arc::new(AtomicUsize::new(0));
        let start_calls = Arc::new(AtomicUsize::new(0));
        let mut master = build_master_consumer(
            platform_service,
            Arc::clone(&create_consumer_calls),
            Arc::clone(&start_calls),
            Some(30),
        );

        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
        let cancel = CancellationToken::new();
        let ctx = ConsumerContext::new(tx, cancel.clone());

        // start() is expected to return without waiting for leadership.
        master.start(ctx).await.unwrap();

        // Without leadership, nothing must be produced and no consumer created.
        sleep(Duration::from_millis(80)).await;
        assert!(rx.try_recv().is_err());
        assert_eq!(create_consumer_calls.load(Ordering::SeqCst), 0);

        leadership.emit(LeadershipEvent::StartedLeading).await;

        // The fake delegate sends its first `epoch-N` exchange as soon as it starts.
        let first = timeout(Duration::from_millis(500), rx.recv())
            .await
            .unwrap()
            .unwrap();
        assert_eq!(first.exchange.input.body.as_text(), Some("epoch-1"));
        assert_eq!(create_consumer_calls.load(Ordering::SeqCst), 1);
        assert_eq!(start_calls.load(Ordering::SeqCst), 1);

        cancel.cancel();
        master.stop().await.unwrap();
    }
1012
    /// Verifies that a `StoppedLeading` event shuts the running delegate down.
    ///
    /// After exchanges sent before the stop are discarded, nothing further may arrive.
    #[tokio::test]
    async fn stops_delegate_on_stopped_leading() {
        let leadership = Arc::new(FakeLeadershipService::new(None));
        let platform_service = Arc::new(FakePlatformService::new(leadership.clone()));
        let create_consumer_calls = Arc::new(AtomicUsize::new(0));
        let start_calls = Arc::new(AtomicUsize::new(0));
        let mut master = build_master_consumer(
            platform_service,
            Arc::clone(&create_consumer_calls),
            Arc::clone(&start_calls),
            Some(30),
        );

        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
        let cancel = CancellationToken::new();
        let ctx = ConsumerContext::new(tx, cancel.clone());

        master.start(ctx).await.unwrap();
        leadership.emit(LeadershipEvent::StartedLeading).await;
        // Wait for the delegate's first exchange so we know it is running.
        let _ = timeout(Duration::from_millis(500), rx.recv())
            .await
            .unwrap()
            .unwrap();

        leadership.emit(LeadershipEvent::StoppedLeading).await;
        // Let the stop take effect, then discard anything sent before it did.
        sleep(Duration::from_millis(100)).await;
        while rx.try_recv().is_ok() {}
        // A running fake delegate emits every 20 ms, so 120 ms of silence shows
        // it has been stopped.
        assert!(
            timeout(Duration::from_millis(120), rx.recv())
                .await
                .is_err()
        );

        cancel.cancel();
        master.stop().await.unwrap();
    }
1049
1050    #[tokio::test]
1051    async fn recreates_delegate_on_new_leadership_epoch() {
1052        let leadership = Arc::new(FakeLeadershipService::new(None));
1053        let platform_service = Arc::new(FakePlatformService::new(leadership.clone()));
1054        let create_consumer_calls = Arc::new(AtomicUsize::new(0));
1055        let start_calls = Arc::new(AtomicUsize::new(0));
1056        let mut master = build_master_consumer(
1057            platform_service,
1058            Arc::clone(&create_consumer_calls),
1059            Arc::clone(&start_calls),
1060            Some(30),
1061        );
1062
1063        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
1064        let cancel = CancellationToken::new();
1065        let ctx = ConsumerContext::new(tx, cancel.clone());
1066
1067        master.start(ctx).await.unwrap();
1068
1069        leadership.emit(LeadershipEvent::StartedLeading).await;
1070        let first = timeout(Duration::from_millis(500), rx.recv())
1071            .await
1072            .unwrap()
1073            .unwrap();
1074        assert_eq!(first.exchange.input.body.as_text(), Some("epoch-1"));
1075
1076        leadership.emit(LeadershipEvent::StoppedLeading).await;
1077        sleep(Duration::from_millis(120)).await;
1078
1079        leadership.emit(LeadershipEvent::StartedLeading).await;
1080        let second = timeout(Duration::from_millis(500), rx.recv())
1081            .await
1082            .unwrap()
1083            .unwrap();
1084        assert_eq!(second.exchange.input.body.as_text(), Some("epoch-2"));
1085
1086        assert_eq!(create_consumer_calls.load(Ordering::SeqCst), 2);
1087        assert_eq!(start_calls.load(Ordering::SeqCst), 2);
1088
1089        cancel.cancel();
1090        master.stop().await.unwrap();
1091    }
1092
    /// Verifies that delegate endpoint-creation failures are retried only up to
    /// `delegate_retry_max_attempts`, after which the master gives up.
    #[tokio::test]
    async fn stops_retrying_delegate_start_after_max_attempts() {
        // Leadership is the initial watch value, so the master presumably starts
        // trying to create the delegate right after `start()`.
        let leadership = Arc::new(FakeLeadershipService::new(Some(
            LeadershipEvent::StartedLeading,
        )));
        let platform_service = Arc::new(FakePlatformService::new(leadership));
        let create_endpoint_calls = Arc::new(AtomicUsize::new(0));

        let mut master = MasterConsumer::new(
            "lock-a".to_string(),
            "failing:delegate".to_string(),
            Arc::new(FailingDelegateComponent {
                create_endpoint_calls: Arc::clone(&create_endpoint_calls),
            }),
            Arc::new(NoOpMetrics),
            platform_service,
            Duration::from_millis(500),
            Some(1),
        );

        // Bind `_rx` (not `_`) so the receiver, and with it the channel, stays alive.
        let (tx, _rx) = tokio::sync::mpsc::channel(16);
        let cancel = CancellationToken::new();
        let ctx = ConsumerContext::new(tx, cancel.clone());

        master.start(ctx).await.unwrap();
        // Long enough for several retries if they are spaced by
        // DELEGATE_RETRY_INTERVAL (200 ms). TODO confirm against the retry loop.
        sleep(Duration::from_millis(750)).await;

        // With `Some(1)` exactly two attempts are expected (presumably the
        // initial attempt plus one retry). An uncapped retry loop would show more.
        assert_eq!(create_endpoint_calls.load(Ordering::SeqCst), 2);

        cancel.cancel();
        master.stop().await.unwrap();
    }
1125
    /// Regression test for MST-002: stop() must abort the leadership JoinHandle
    /// instead of just dropping it when the task is slow to drain.
    /// Without the fix, stop() blocks for the full drain_timeout (~500 ms)
    /// because the leadership task is stuck in stop_delegate awaiting a
    /// slow delegate. With abort-first, stop() returns almost instantly.
    #[tokio::test]
    async fn stop_completes_quickly_when_leadership_task_is_slow() {
        // Delegate consumer that ignores its cancellation token and blocks.
        struct SlowStoppingConsumer;

        #[async_trait]
        impl Consumer for SlowStoppingConsumer {
            async fn start(&mut self, ctx: ConsumerContext) -> Result<(), CamelError> {
                // Signal that the delegate is running; a send failure is ignored.
                ctx.send(Exchange::new(Message::new("slow-start")))
                    .await
                    .ok();
                // Ignore cancellation — sleep far beyond the drain timeout.
                sleep(Duration::from_secs(60)).await;
                Ok(())
            }

            async fn stop(&mut self) -> Result<(), CamelError> {
                Ok(())
            }
        }

        // Component/endpoint pair under the `slow` scheme that hands out
        // `SlowStoppingConsumer`s.
        struct SlowStoppingComponent;

        impl Component for SlowStoppingComponent {
            fn scheme(&self) -> &str {
                "slow"
            }

            fn create_endpoint(
                &self,
                _uri: &str,
                _ctx: &dyn ComponentContext,
            ) -> Result<Box<dyn Endpoint>, CamelError> {
                Ok(Box::new(SlowStoppingEndpoint))
            }
        }

        struct SlowStoppingEndpoint;

        impl Endpoint for SlowStoppingEndpoint {
            fn uri(&self) -> &str {
                "slow:delegate"
            }

            fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
                Ok(Box::new(SlowStoppingConsumer))
            }

            fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
                Err(CamelError::EndpointCreationFailed("not used".into()))
            }
        }

        // Leadership is the initial watch value, so the slow delegate is started
        // without any explicit emit.
        let leadership = Arc::new(FakeLeadershipService::new(Some(
            LeadershipEvent::StartedLeading,
        )));
        let platform_service = Arc::new(FakePlatformService::new(leadership));

        let mut master = MasterConsumer::new(
            "lock-slow".into(),
            "slow:delegate".into(),
            Arc::new(SlowStoppingComponent),
            Arc::new(NoOpMetrics),
            platform_service,
            Duration::from_millis(500), // drain_timeout
            Some(30),
        );

        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
        let cancel = CancellationToken::new();
        let ctx = ConsumerContext::new(tx, cancel.clone());

        master.start(ctx).await.unwrap();

        // Wait for the delegate to actually start.
        let msg = timeout(Duration::from_secs(2), rx.recv())
            .await
            .unwrap()
            .unwrap();
        assert_eq!(msg.exchange.input.body.as_text(), Some("slow-start"));

        // stop() must complete quickly because the leadership task is aborted,
        // not just timed-out and leaked.
        let start = Instant::now();
        master.stop().await.unwrap();
        let elapsed = start.elapsed();

        // With abort-first: ~0 ms. Without the fix: ~drain_timeout (500 ms).
        // Assert < 250 ms to reliably distinguish the two behaviours.
        assert!(
            elapsed < Duration::from_millis(250),
            "stop() took {:?}, expected < 250 ms (abort should be near-instant)",
            elapsed,
        );

        // Cancel the external token last; stop() has already returned by now.
        cancel.cancel();
    }
1228
    /// Verifies two things about an error returned by the delegate consumer's `start`.
    ///
    /// The error ends the leadership task, and `MasterConsumer::stop` returns it
    /// instead of swallowing it.
    #[tokio::test]
    async fn stop_propagates_delegate_start_error() {
        // Delegate consumer whose `start` fails immediately.
        struct FailingStartConsumer;

        #[async_trait]
        impl Consumer for FailingStartConsumer {
            async fn start(&mut self, _ctx: ConsumerContext) -> Result<(), CamelError> {
                Err(CamelError::ProcessorError(
                    "delegate start failed".to_string(),
                ))
            }

            async fn stop(&mut self) -> Result<(), CamelError> {
                Ok(())
            }
        }

        // Component/endpoint pair under the `failstart` scheme that hands out
        // `FailingStartConsumer`s.
        struct FailingStartComponent;

        impl Component for FailingStartComponent {
            fn scheme(&self) -> &str {
                "failstart"
            }

            fn create_endpoint(
                &self,
                _uri: &str,
                _ctx: &dyn ComponentContext,
            ) -> Result<Box<dyn Endpoint>, CamelError> {
                Ok(Box::new(FailingStartEndpoint))
            }
        }

        struct FailingStartEndpoint;

        impl Endpoint for FailingStartEndpoint {
            fn uri(&self) -> &str {
                "failstart:delegate"
            }

            fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
                Ok(Box::new(FailingStartConsumer))
            }

            fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
                Err(CamelError::EndpointCreationFailed("not used".into()))
            }
        }

        // Leadership is the initial watch value, so the delegate is started right away.
        let leadership = Arc::new(FakeLeadershipService::new(Some(
            LeadershipEvent::StartedLeading,
        )));
        let platform_service = Arc::new(FakePlatformService::new(leadership));

        // Retries are allowed (`Some(30)`), yet the test expects a delegate
        // *start* error to end the leadership task.
        let mut master = MasterConsumer::new(
            "lock-error".into(),
            "failstart:delegate".into(),
            Arc::new(FailingStartComponent),
            Arc::new(NoOpMetrics),
            platform_service,
            Duration::from_millis(500),
            Some(30),
        );

        let (tx, _rx) = tokio::sync::mpsc::channel(16);
        let cancel = CancellationToken::new();
        let ctx = ConsumerContext::new(tx, cancel.clone());

        master.start(ctx).await.unwrap();
        // Give the leadership task time to start the delegate and observe its error.
        sleep(Duration::from_millis(250)).await;
        assert!(
            master
                .leadership_task
                .as_ref()
                .is_some_and(tokio::task::JoinHandle::is_finished),
            "leadership task should finish after delegate error"
        );
        // The delegate's error must surface from stop().
        let err = master.stop().await.expect_err("expected delegate error");
        assert!(err.to_string().contains("delegate start failed"));

        cancel.cancel();
    }
1311}