Skip to main content

camel_master/
lib.rs

1//! Master/leader-election component — wraps a delegate consumer with distributed lock-based leadership coordination.
2//!
3//! Main types: `MasterComponent`, `MasterBundle`.
4//! Main modules: `bundle`, `config`.
5
6pub mod bundle;
7pub mod config;
8
9pub use bundle::MasterBundle;
10
11use std::sync::Arc;
12use std::time::Duration;
13
14use async_trait::async_trait;
15use camel_api::{CamelError, MetricsCollector, PlatformService};
16use camel_component_api::{
17    BoxProcessor, Component, ComponentContext, Consumer, ConsumerContext, Endpoint,
18    ExchangeEnvelope, ProducerContext, parse_uri,
19};
20use camel_language_api::Language;
21use tokio::task::JoinHandle;
22use tokio::time::{interval, timeout};
23use tokio_util::sync::CancellationToken;
24use tracing::{error, info, warn};
25
26use crate::config::{MasterComponentConfig, MasterUriConfig};
27
28const DELEGATE_RETRY_INTERVAL: Duration = Duration::from_millis(200);
29
30pub struct MasterComponent {
31    drain_timeout_ms: u64,
32    delegate_retry_max_attempts: Option<u32>,
33}
34
35impl MasterComponent {
36    pub fn new(config: MasterComponentConfig) -> Self {
37        Self {
38            drain_timeout_ms: config.drain_timeout_ms,
39            delegate_retry_max_attempts: config.delegate_retry_max_attempts,
40        }
41    }
42}
43
44impl Default for MasterComponent {
45    fn default() -> Self {
46        Self::new(MasterComponentConfig::default())
47    }
48}
49
50impl Component for MasterComponent {
51    fn scheme(&self) -> &str {
52        "master"
53    }
54
55    fn create_endpoint(
56        &self,
57        uri: &str,
58        ctx: &dyn ComponentContext,
59    ) -> Result<Box<dyn Endpoint>, CamelError> {
60        let parsed = MasterUriConfig::parse(uri)?;
61        let delegate_parts = parse_uri(&parsed.delegate_uri)?;
62        let delegate_scheme = delegate_parts.scheme;
63        let delegate_component = ctx
64            .resolve_component(&delegate_scheme)
65            .ok_or_else(|| CamelError::ComponentNotFound(delegate_scheme.clone()))?;
66
67        Ok(Box::new(MasterEndpoint {
68            uri: uri.to_string(),
69            lock_name: parsed.lock_name,
70            delegate_uri: parsed.delegate_uri,
71            delegate_component,
72            metrics: ctx.metrics(),
73            platform_service: ctx.platform_service(),
74            drain_timeout: Duration::from_millis(self.drain_timeout_ms),
75            delegate_retry_max_attempts: self.delegate_retry_max_attempts,
76        }))
77    }
78}
79
80struct MasterEndpoint {
81    uri: String,
82    lock_name: String,
83    delegate_uri: String,
84    delegate_component: Arc<dyn Component>,
85    // TODO(MST-001): MetricsCollector is wired through from ComponentContext but never called.
86    // Should emit metrics on leader acquisition (increment_exchanges), leader loss
87    // (increment_errors), and delegate start/stop events (record_circuit_breaker_change).
88    metrics: Arc<dyn MetricsCollector>,
89    platform_service: Arc<dyn PlatformService>,
90    drain_timeout: Duration,
91    delegate_retry_max_attempts: Option<u32>,
92}
93
94impl Endpoint for MasterEndpoint {
95    fn uri(&self) -> &str {
96        &self.uri
97    }
98
99    fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
100        Ok(Box::new(MasterConsumer::new(
101            self.lock_name.clone(),
102            self.delegate_uri.clone(),
103            Arc::clone(&self.delegate_component),
104            Arc::clone(&self.metrics),
105            Arc::clone(&self.platform_service),
106            self.drain_timeout,
107            self.delegate_retry_max_attempts,
108        )))
109    }
110
111    fn create_producer(&self, ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
112        let delegate_ctx = MasterDelegateContext {
113            delegate_component: Arc::clone(&self.delegate_component),
114            metrics: Arc::clone(&self.metrics),
115            platform_service: Arc::clone(&self.platform_service),
116        };
117
118        self.delegate_component
119            .create_endpoint(&self.delegate_uri, &delegate_ctx)?
120            .create_producer(ctx)
121    }
122}
123
124struct MasterDelegateContext {
125    delegate_component: Arc<dyn Component>,
126    metrics: Arc<dyn MetricsCollector>,
127    platform_service: Arc<dyn PlatformService>,
128}
129
130impl ComponentContext for MasterDelegateContext {
131    fn resolve_component(&self, scheme: &str) -> Option<Arc<dyn Component>> {
132        if self.delegate_component.scheme() == scheme {
133            Some(Arc::clone(&self.delegate_component))
134        } else {
135            None
136        }
137    }
138
139    fn resolve_language(&self, _name: &str) -> Option<Arc<dyn Language>> {
140        None
141    }
142
143    fn metrics(&self) -> Arc<dyn MetricsCollector> {
144        Arc::clone(&self.metrics)
145    }
146
147    fn platform_service(&self) -> Arc<dyn PlatformService> {
148        Arc::clone(&self.platform_service)
149    }
150}
151
152struct MasterConsumer {
153    lock_name: String,
154    delegate_uri: String,
155    delegate_component: Arc<dyn Component>,
156    // TODO(MST-001): MetricsCollector is stored here but never used for emission.
157    // Wire into reconcile_event to record leadership transitions and delegate lifecycle.
158    metrics: Arc<dyn MetricsCollector>,
159    platform_service: Arc<dyn PlatformService>,
160    drain_timeout: Duration,
161    delegate_retry_max_attempts: Option<u32>,
162    leadership_task: Option<JoinHandle<Result<(), CamelError>>>,
163    stop_token: Option<CancellationToken>,
164}
165
166impl MasterConsumer {
167    fn new(
168        lock_name: String,
169        delegate_uri: String,
170        delegate_component: Arc<dyn Component>,
171        metrics: Arc<dyn MetricsCollector>,
172        platform_service: Arc<dyn PlatformService>,
173        drain_timeout: Duration,
174        delegate_retry_max_attempts: Option<u32>,
175    ) -> Self {
176        Self {
177            lock_name,
178            delegate_uri,
179            delegate_component,
180            metrics,
181            platform_service,
182            drain_timeout,
183            delegate_retry_max_attempts,
184            leadership_task: None,
185            stop_token: None,
186        }
187    }
188}
189
190enum DelegateState {
191    Inactive,
192    Active {
193        run_token: CancellationToken,
194        handle: JoinHandle<Result<(), CamelError>>,
195    },
196}
197
198async fn stop_delegate(
199    state: &mut DelegateState,
200    drain_timeout: Duration,
201) -> Result<(), CamelError> {
202    if let DelegateState::Active {
203        run_token,
204        mut handle,
205    } = std::mem::replace(state, DelegateState::Inactive)
206    {
207        run_token.cancel();
208        match timeout(drain_timeout, &mut handle).await {
209            Ok(Ok(Ok(()))) => {}
210            Ok(Ok(Err(err))) => {
211                return Err(err);
212            }
213            Ok(Err(e)) if e.is_panic() => {
214                error!(error = %e, "master delegate task panicked");
215                return Err(CamelError::ProcessorError(format!(
216                    "master delegate task panicked: {e}"
217                )));
218            }
219            Ok(Err(e)) => {
220                warn!(error = %e, "master delegate task cancelled");
221                return Err(CamelError::ProcessorError(format!(
222                    "master delegate task cancelled: {e}"
223                )));
224            }
225            Err(_) => {
226                warn!("master delegate shutdown timed out, aborting");
227                handle.abort();
228            }
229        }
230    }
231    Ok(())
232}
233
234struct ReconcileContext<'a> {
235    lock_name: &'a str,
236    delegate_component: &'a Arc<dyn Component>,
237    delegate_uri: &'a str,
238    sender: &'a tokio::sync::mpsc::Sender<ExchangeEnvelope>,
239    parent_cancel: &'a CancellationToken,
240    drain_timeout: Duration,
241    metrics: &'a Arc<dyn MetricsCollector>,
242    platform_service: &'a Arc<dyn PlatformService>,
243}
244
245async fn reconcile_event(
246    event: camel_api::LeadershipEvent,
247    state: &mut DelegateState,
248    ctx: &ReconcileContext<'_>,
249) -> Result<(), CamelError> {
250    match event {
251        camel_api::LeadershipEvent::StartedLeading => {
252            info!(lock = %ctx.lock_name, "master leadership acquired");
253            // TODO(MST-001): emit metrics here — MetricsCollector is wired but never called
254            tracing::info!(lock = %ctx.lock_name, "metrics emission placeholder: leadership acquired");
255            stop_delegate(state, ctx.drain_timeout).await?;
256
257            let delegate_ctx = MasterDelegateContext {
258                delegate_component: Arc::clone(ctx.delegate_component),
259                metrics: Arc::clone(ctx.metrics),
260                platform_service: Arc::clone(ctx.platform_service),
261            };
262
263            let endpoint = match ctx
264                .delegate_component
265                .create_endpoint(ctx.delegate_uri, &delegate_ctx)
266            {
267                Ok(endpoint) => endpoint,
268                Err(err) => {
269                    warn!(lock = %ctx.lock_name, "failed to create delegate endpoint: {err}");
270                    return Ok(());
271                }
272            };
273
274            let mut consumer = match endpoint.create_consumer() {
275                Ok(consumer) => consumer,
276                Err(err) => {
277                    warn!(lock = %ctx.lock_name, "failed to create delegate consumer: {err}");
278                    return Ok(());
279                }
280            };
281
282            let run_token = ctx.parent_cancel.child_token();
283            let delegate_ctx = ConsumerContext::new(ctx.sender.clone(), run_token.clone());
284            let handle = tokio::spawn(async move {
285                consumer.start(delegate_ctx).await?;
286                consumer.stop().await?;
287                Ok::<(), CamelError>(())
288            });
289
290            *state = DelegateState::Active { run_token, handle };
291        }
292        camel_api::LeadershipEvent::StoppedLeading => {
293            info!(lock = %ctx.lock_name, "master leadership lost");
294            // TODO(MST-001): emit metrics here — MetricsCollector is wired but never called
295            tracing::info!(lock = %ctx.lock_name, "metrics emission placeholder: leadership lost");
296            stop_delegate(state, ctx.drain_timeout).await?;
297        }
298    }
299    Ok(())
300}
301
302#[async_trait]
303impl Consumer for MasterConsumer {
304    async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
305        if self.leadership_task.is_some() {
306            return Ok(());
307        }
308
309        let handle = self
310            .platform_service
311            .leadership()
312            .start(&self.lock_name)
313            .await
314            .map_err(|e| {
315                CamelError::EndpointCreationFailed(format!("failed to start leader election: {e}"))
316            })?;
317
318        let lock_name = self.lock_name.clone();
319        let delegate_uri = self.delegate_uri.clone();
320        let delegate_component = Arc::clone(&self.delegate_component);
321        let metrics = Arc::clone(&self.metrics);
322        let platform_service = Arc::clone(&self.platform_service);
323        let sender = context.sender();
324        let parent_cancel = context.cancel_token();
325        let drain_timeout = self.drain_timeout;
326        let delegate_retry_max_attempts = self.delegate_retry_max_attempts;
327        let mut events = handle.events.clone();
328
329        let stop_token = CancellationToken::new();
330        let stop_token_loop = stop_token.clone();
331        let leadership_handle = handle;
332
333        let task = tokio::spawn(async move {
334            let mut state = DelegateState::Inactive;
335            let mut is_leading = false;
336            let mut delegate_attempts = 0u32;
337            let mut retry_tick = interval(DELEGATE_RETRY_INTERVAL);
338
339            let rctx = ReconcileContext {
340                lock_name: &lock_name,
341                delegate_component: &delegate_component,
342                delegate_uri: &delegate_uri,
343                sender: &sender,
344                parent_cancel: &parent_cancel,
345                drain_timeout,
346                metrics: &metrics,
347                platform_service: &platform_service,
348            };
349
350            let initial_event = { events.borrow().clone() };
351            if let Some(initial_event) = initial_event {
352                is_leading = matches!(&initial_event, camel_api::LeadershipEvent::StartedLeading);
353                if is_leading {
354                    delegate_attempts = 0;
355                }
356                if let Err(err) = reconcile_event(initial_event, &mut state, &rctx).await {
357                    error!(lock = %lock_name, "master delegate error: {err}");
358                    return Err(err);
359                }
360            }
361
362            loop {
363                tokio::select! {
364                    _ = stop_token_loop.cancelled() => {
365                        break;
366                    }
367                    _ = context.cancelled() => {
368                        break;
369                    }
370                    changed = events.changed() => {
371                        if changed.is_err() {
372                            break;
373                        }
374                        let event = { events.borrow().clone() };
375                        if let Some(event) = event {
376                            let was_leading = is_leading;
377                            is_leading = matches!(&event, camel_api::LeadershipEvent::StartedLeading);
378                            if !was_leading && is_leading {
379                                delegate_attempts = 0;
380                            }
381                            if let Err(err) = reconcile_event(event, &mut state, &rctx).await {
382                                error!(lock = %lock_name, "master delegate error: {err}");
383                                return Err(err);
384                            }
385                        }
386                    }
387                    _ = retry_tick.tick() => {
388                        if matches!(&state, DelegateState::Active { handle, .. } if handle.is_finished())
389                            && let Err(err) = stop_delegate(&mut state, drain_timeout).await
390                        {
391                            error!(lock = %lock_name, "master delegate task failed: {err}");
392                            return Err(err);
393                        }
394
395                        if is_leading && matches!(state, DelegateState::Inactive) {
396                            if let Some(max) = delegate_retry_max_attempts {
397                                delegate_attempts = delegate_attempts.saturating_add(1);
398                                if delegate_attempts > max {
399                                    warn!(
400                                        lock = %lock_name,
401                                        attempts = max,
402                                        "delegate start exceeded max attempts, stopping consumer"
403                                    );
404                                    break;
405                                }
406                            }
407                            if let Err(err) = reconcile_event(
408                                camel_api::LeadershipEvent::StartedLeading,
409                                &mut state,
410                                &rctx,
411                            )
412                            .await {
413                                error!(lock = %lock_name, "master delegate retry error: {err}");
414                                return Err(err);
415                            }
416                        }
417                    }
418                }
419            }
420
421            stop_delegate(&mut state, drain_timeout).await?;
422            let _ = timeout(drain_timeout, leadership_handle.step_down()).await;
423            Ok::<(), CamelError>(())
424        });
425
426        self.stop_token = Some(stop_token);
427        self.leadership_task = Some(task);
428
429        Ok(())
430    }
431
432    async fn stop(&mut self) -> Result<(), CamelError> {
433        if let Some(token) = self.stop_token.take() {
434            token.cancel();
435        }
436
437        if let Some(handle) = self.leadership_task.take() {
438            if handle.is_finished() {
439                match timeout(self.drain_timeout, handle).await {
440                    Ok(Ok(Ok(()))) => {}
441                    Ok(Ok(Err(err))) => return Err(err),
442                    Ok(Err(e)) => {
443                        return Err(CamelError::ProcessorError(format!(
444                            "leadership task join failed: {e}"
445                        )));
446                    }
447                    Err(_) => {
448                        return Err(CamelError::ProcessorError(
449                            "leadership task join timed out".to_string(),
450                        ));
451                    }
452                }
453                return Ok(());
454            }
455
456            // Abort first so the task is guaranteed to stop; then await with
457            // a timeout as a safety-net in case abort takes a moment to land.
458            handle.abort();
459            match timeout(self.drain_timeout, handle).await {
460                Ok(Ok(Ok(()))) => {}
461                Ok(Ok(Err(err))) => return Err(err),
462                Ok(Err(e)) if e.is_panic() => {
463                    error!(lock = %self.lock_name, error = %e, "leadership task panicked");
464                }
465                Ok(Err(e)) => {
466                    warn!(lock = %self.lock_name, error = %e, "leadership task cancelled");
467                }
468                Err(_) => {
469                    warn!("master leadership loop shutdown timed out after abort");
470                }
471            }
472        }
473
474        Ok(())
475    }
476}
477
478#[cfg(test)]
479mod tests {
480    use std::sync::Arc;
481    use std::sync::Mutex;
482    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
483
484    use camel_api::{
485        BoxProcessorExt, Exchange, LeadershipEvent, LeadershipHandle, LeadershipService, Message,
486        NoOpMetrics, NoopPlatformService, NoopReadinessGate, PlatformError, PlatformIdentity,
487        PlatformService, ReadinessGate,
488    };
489    use camel_component_api::NoOpComponentContext;
490    use std::time::Instant;
491    use tokio::sync::{oneshot, watch};
492    use tokio::time::{sleep, timeout};
493    use tokio_util::sync::CancellationToken;
494    use tower::ServiceExt;
495
496    use super::*;
497
498    #[test]
499    fn parse_master_uri_valid() {
500        let cfg = MasterUriConfig::parse("master:mylock:timer:tick?period=250").unwrap();
501        assert_eq!(cfg.lock_name, "mylock");
502        assert_eq!(cfg.delegate_uri, "timer:tick?period=250");
503    }
504
505    #[test]
506    fn parse_master_uri_missing_lockname() {
507        let err = MasterUriConfig::parse("master::timer:tick").unwrap_err();
508        assert!(matches!(err, CamelError::InvalidUri(_)));
509    }
510
511    #[test]
512    fn parse_master_uri_missing_delegate() {
513        let err = MasterUriConfig::parse("master:mylock:").unwrap_err();
514        assert!(matches!(err, CamelError::InvalidUri(_)));
515    }
516
517    #[test]
518    fn endpoint_fails_when_delegate_component_missing() {
519        let master = MasterComponent::default();
520        let result =
521            master.create_endpoint("master:lock-1:missing:delegate", &NoOpComponentContext);
522        assert!(matches!(result, Err(CamelError::ComponentNotFound(_))));
523    }
524
525    #[test]
526    fn delegate_scheme_is_parsed_from_delegate_uri() {
527        let seen_scheme = Arc::new(AtomicBool::new(false));
528
529        struct SchemeAwareContext {
530            delegate: Arc<dyn Component>,
531            seen_scheme: Arc<AtomicBool>,
532        }
533
534        impl ComponentContext for SchemeAwareContext {
535            fn resolve_component(&self, scheme: &str) -> Option<Arc<dyn Component>> {
536                if scheme == "mock" {
537                    self.seen_scheme.store(true, Ordering::SeqCst);
538                    Some(Arc::clone(&self.delegate))
539                } else {
540                    None
541                }
542            }
543
544            fn resolve_language(&self, _name: &str) -> Option<Arc<dyn Language>> {
545                None
546            }
547
548            fn metrics(&self) -> Arc<dyn MetricsCollector> {
549                Arc::new(NoOpMetrics)
550            }
551
552            fn platform_service(&self) -> Arc<dyn PlatformService> {
553                Arc::new(NoopPlatformService::default())
554            }
555        }
556
557        struct MockDelegateComponent;
558
559        impl Component for MockDelegateComponent {
560            fn scheme(&self) -> &str {
561                "mock"
562            }
563
564            fn create_endpoint(
565                &self,
566                _uri: &str,
567                _ctx: &dyn ComponentContext,
568            ) -> Result<Box<dyn Endpoint>, CamelError> {
569                Ok(Box::new(MockDelegateEndpoint))
570            }
571        }
572
573        struct MockDelegateEndpoint;
574
575        impl Endpoint for MockDelegateEndpoint {
576            fn uri(&self) -> &str {
577                "mock:delegate"
578            }
579
580            fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
581                Err(CamelError::EndpointCreationFailed("not used".to_string()))
582            }
583
584            fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
585                Err(CamelError::EndpointCreationFailed("not used".to_string()))
586            }
587        }
588
589        let delegate = Arc::new(MockDelegateComponent);
590        let ctx = SchemeAwareContext {
591            delegate,
592            seen_scheme: Arc::clone(&seen_scheme),
593        };
594
595        let master = MasterComponent::default();
596        let endpoint = master
597            .create_endpoint("master:mylock:mock:delegate?x=1", &ctx)
598            .unwrap();
599
600        assert_eq!(endpoint.uri(), "master:mylock:mock:delegate?x=1");
601        assert!(seen_scheme.load(Ordering::SeqCst));
602    }
603
604    struct MockDelegateContext {
605        delegate: Arc<dyn Component>,
606    }
607
608    impl ComponentContext for MockDelegateContext {
609        fn resolve_component(&self, scheme: &str) -> Option<Arc<dyn Component>> {
610            if self.delegate.scheme() == scheme {
611                Some(Arc::clone(&self.delegate))
612            } else {
613                None
614            }
615        }
616
617        fn resolve_language(&self, _name: &str) -> Option<Arc<dyn Language>> {
618            None
619        }
620
621        fn metrics(&self) -> Arc<dyn MetricsCollector> {
622            Arc::new(NoOpMetrics)
623        }
624
625        fn platform_service(&self) -> Arc<dyn PlatformService> {
626            Arc::new(NoopPlatformService::default())
627        }
628    }
629
630    struct MockProducerDelegateComponent {
631        create_endpoint_calls: Arc<AtomicUsize>,
632        create_producer_calls: Arc<AtomicUsize>,
633        fail_producer: bool,
634    }
635
636    impl Component for MockProducerDelegateComponent {
637        fn scheme(&self) -> &str {
638            "mock"
639        }
640
641        fn create_endpoint(
642            &self,
643            _uri: &str,
644            _ctx: &dyn ComponentContext,
645        ) -> Result<Box<dyn Endpoint>, CamelError> {
646            self.create_endpoint_calls.fetch_add(1, Ordering::SeqCst);
647            Ok(Box::new(MockProducerDelegateEndpoint {
648                create_producer_calls: Arc::clone(&self.create_producer_calls),
649                fail_producer: self.fail_producer,
650            }))
651        }
652    }
653
654    struct MockProducerDelegateEndpoint {
655        create_producer_calls: Arc<AtomicUsize>,
656        fail_producer: bool,
657    }
658
659    impl Endpoint for MockProducerDelegateEndpoint {
660        fn uri(&self) -> &str {
661            "mock:delegate"
662        }
663
664        fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
665            Err(CamelError::EndpointCreationFailed(
666                "not used in test".to_string(),
667            ))
668        }
669
670        fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
671            self.create_producer_calls.fetch_add(1, Ordering::SeqCst);
672            if self.fail_producer {
673                return Err(CamelError::ProcessorError(
674                    "delegate producer failed".to_string(),
675                ));
676            }
677            Ok(BoxProcessor::from_fn(
678                |exchange| async move { Ok(exchange) },
679            ))
680        }
681    }
682
683    #[tokio::test]
684    async fn producer_passthrough_delegates_and_produces() {
685        let endpoint_calls = Arc::new(AtomicUsize::new(0));
686        let producer_calls = Arc::new(AtomicUsize::new(0));
687        let delegate = Arc::new(MockProducerDelegateComponent {
688            create_endpoint_calls: Arc::clone(&endpoint_calls),
689            create_producer_calls: Arc::clone(&producer_calls),
690            fail_producer: false,
691        });
692
693        let ctx = MockDelegateContext {
694            delegate: delegate.clone(),
695        };
696
697        let master = MasterComponent::default();
698        let endpoint = master
699            .create_endpoint("master:lock-1:mock:delegate", &ctx)
700            .unwrap();
701        let producer_ctx = ProducerContext::new();
702        let producer = endpoint.create_producer(&producer_ctx).unwrap();
703
704        let exchange = Exchange::new(Message::new("ok"));
705        let result = producer.oneshot(exchange).await.unwrap();
706
707        assert_eq!(result.input.body.as_text(), Some("ok"));
708        assert_eq!(endpoint_calls.load(Ordering::SeqCst), 1);
709        assert_eq!(producer_calls.load(Ordering::SeqCst), 1);
710    }
711
712    #[test]
713    fn producer_passthrough_bubbles_delegate_errors() {
714        let endpoint_calls = Arc::new(AtomicUsize::new(0));
715        let producer_calls = Arc::new(AtomicUsize::new(0));
716        let delegate = Arc::new(MockProducerDelegateComponent {
717            create_endpoint_calls: Arc::clone(&endpoint_calls),
718            create_producer_calls: Arc::clone(&producer_calls),
719            fail_producer: true,
720        });
721
722        let ctx = MockDelegateContext {
723            delegate: delegate.clone(),
724        };
725
726        let master = MasterComponent::default();
727        let endpoint = master
728            .create_endpoint("master:lock-1:mock:delegate", &ctx)
729            .unwrap();
730        let producer_ctx = ProducerContext::new();
731        let err = endpoint.create_producer(&producer_ctx).unwrap_err();
732
733        assert!(matches!(err, CamelError::ProcessorError(_)));
734        assert_eq!(endpoint_calls.load(Ordering::SeqCst), 1);
735        assert_eq!(producer_calls.load(Ordering::SeqCst), 1);
736    }
737
738    struct FakeLeadershipService {
739        tx: Mutex<Option<watch::Sender<Option<LeadershipEvent>>>>,
740        is_leader: Arc<AtomicBool>,
741        initial: Option<LeadershipEvent>,
742    }
743
744    impl FakeLeadershipService {
745        fn new(initial: Option<LeadershipEvent>) -> Self {
746            let starts_as_leader = matches!(initial, Some(LeadershipEvent::StartedLeading));
747            Self {
748                tx: Mutex::new(None),
749                is_leader: Arc::new(AtomicBool::new(starts_as_leader)),
750                initial,
751            }
752        }
753
754        async fn emit(&self, event: LeadershipEvent) {
755            self.is_leader.store(
756                matches!(event, LeadershipEvent::StartedLeading),
757                Ordering::Release,
758            );
759            if let Some(tx) = self
760                .tx
761                .lock()
762                .expect("mutex poisoned: fake elector sender")
763                .as_ref()
764            {
765                let _ = tx.send(Some(event));
766            }
767        }
768    }
769
770    #[async_trait]
771    impl LeadershipService for FakeLeadershipService {
772        async fn start(&self, _lock_name: &str) -> Result<LeadershipHandle, PlatformError> {
773            let (tx, rx) = watch::channel(self.initial.clone());
774            *self.tx.lock().expect("mutex poisoned: fake elector sender") = Some(tx);
775
776            let cancel = CancellationToken::new();
777            let cancel_wait = cancel.clone();
778            let (term_tx, term_rx) = oneshot::channel();
779            tokio::spawn(async move {
780                cancel_wait.cancelled().await;
781                let _ = term_tx.send(());
782            });
783
784            Ok(LeadershipHandle::new(
785                rx,
786                Arc::clone(&self.is_leader),
787                cancel,
788                term_rx,
789            ))
790        }
791    }
792
793    struct FakePlatformService {
794        identity: PlatformIdentity,
795        readiness_gate: Arc<dyn ReadinessGate>,
796        leadership: Arc<dyn LeadershipService>,
797    }
798
799    impl FakePlatformService {
800        fn new(leadership: Arc<dyn LeadershipService>) -> Self {
801            Self {
802                identity: PlatformIdentity::local("master-tests"),
803                readiness_gate: Arc::new(NoopReadinessGate),
804                leadership,
805            }
806        }
807    }
808
809    impl PlatformService for FakePlatformService {
810        fn identity(&self) -> PlatformIdentity {
811            self.identity.clone()
812        }
813
814        fn readiness_gate(&self) -> Arc<dyn ReadinessGate> {
815            Arc::clone(&self.readiness_gate)
816        }
817
818        fn leadership(&self) -> Arc<dyn LeadershipService> {
819            Arc::clone(&self.leadership)
820        }
821    }
822
823    struct FakeDelegateComponent {
824        create_consumer_calls: Arc<AtomicUsize>,
825        start_calls: Arc<AtomicUsize>,
826    }
827
828    impl Component for FakeDelegateComponent {
829        fn scheme(&self) -> &str {
830            "fake"
831        }
832
833        fn create_endpoint(
834            &self,
835            _uri: &str,
836            _ctx: &dyn ComponentContext,
837        ) -> Result<Box<dyn Endpoint>, CamelError> {
838            Ok(Box::new(FakeDelegateEndpoint {
839                create_consumer_calls: Arc::clone(&self.create_consumer_calls),
840                start_calls: Arc::clone(&self.start_calls),
841            }))
842        }
843    }
844
845    struct FakeDelegateEndpoint {
846        create_consumer_calls: Arc<AtomicUsize>,
847        start_calls: Arc<AtomicUsize>,
848    }
849
850    impl Endpoint for FakeDelegateEndpoint {
851        fn uri(&self) -> &str {
852            "fake:delegate"
853        }
854
855        fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
856            let epoch = self.create_consumer_calls.fetch_add(1, Ordering::SeqCst) + 1;
857            Ok(Box::new(FakeDelegateConsumer {
858                epoch,
859                start_calls: Arc::clone(&self.start_calls),
860            }))
861        }
862
863        fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
864            Err(CamelError::EndpointCreationFailed("not used".to_string()))
865        }
866    }
867
868    struct FakeDelegateConsumer {
869        epoch: usize,
870        start_calls: Arc<AtomicUsize>,
871    }
872
873    struct FailingDelegateComponent {
874        create_endpoint_calls: Arc<AtomicUsize>,
875    }
876
877    impl Component for FailingDelegateComponent {
878        fn scheme(&self) -> &str {
879            "failing"
880        }
881
882        fn create_endpoint(
883            &self,
884            _uri: &str,
885            _ctx: &dyn ComponentContext,
886        ) -> Result<Box<dyn Endpoint>, CamelError> {
887            self.create_endpoint_calls.fetch_add(1, Ordering::SeqCst);
888            Err(CamelError::EndpointCreationFailed(
889                "delegate endpoint creation failed".to_string(),
890            ))
891        }
892    }
893
894    #[async_trait]
895    impl Consumer for FakeDelegateConsumer {
896        async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
897            self.start_calls.fetch_add(1, Ordering::SeqCst);
898            context
899                .send(Exchange::new(Message::new(format!("epoch-{}", self.epoch))))
900                .await?;
901
902            loop {
903                tokio::select! {
904                    _ = context.cancelled() => {
905                        break;
906                    }
907                    _ = sleep(Duration::from_millis(20)) => {
908                        context
909                            .send(Exchange::new(Message::new(format!("epoch-{}", self.epoch))))
910                            .await?;
911                    }
912                }
913            }
914
915            Ok(())
916        }
917
918        async fn stop(&mut self) -> Result<(), CamelError> {
919            Ok(())
920        }
921    }
922
923    fn build_master_consumer(
924        platform_service: Arc<dyn PlatformService>,
925        create_consumer_calls: Arc<AtomicUsize>,
926        start_calls: Arc<AtomicUsize>,
927        delegate_retry_max_attempts: Option<u32>,
928    ) -> MasterConsumer {
929        MasterConsumer::new(
930            "lock-a".to_string(),
931            "fake:delegate".to_string(),
932            Arc::new(FakeDelegateComponent {
933                create_consumer_calls,
934                start_calls,
935            }),
936            Arc::new(NoOpMetrics),
937            platform_service,
938            Duration::from_millis(500),
939            delegate_retry_max_attempts,
940        )
941    }
942
943    #[tokio::test]
944    async fn starts_delegate_only_after_started_leading() {
945        let leadership = Arc::new(FakeLeadershipService::new(None));
946        let platform_service = Arc::new(FakePlatformService::new(leadership.clone()));
947        let create_consumer_calls = Arc::new(AtomicUsize::new(0));
948        let start_calls = Arc::new(AtomicUsize::new(0));
949        let mut master = build_master_consumer(
950            platform_service,
951            Arc::clone(&create_consumer_calls),
952            Arc::clone(&start_calls),
953            Some(30),
954        );
955
956        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
957        let cancel = CancellationToken::new();
958        let ctx = ConsumerContext::new(tx, cancel.clone());
959
960        master.start(ctx).await.unwrap();
961
962        sleep(Duration::from_millis(80)).await;
963        assert!(rx.try_recv().is_err());
964        assert_eq!(create_consumer_calls.load(Ordering::SeqCst), 0);
965
966        leadership.emit(LeadershipEvent::StartedLeading).await;
967
968        let first = timeout(Duration::from_millis(500), rx.recv())
969            .await
970            .unwrap()
971            .unwrap();
972        assert_eq!(first.exchange.input.body.as_text(), Some("epoch-1"));
973        assert_eq!(create_consumer_calls.load(Ordering::SeqCst), 1);
974        assert_eq!(start_calls.load(Ordering::SeqCst), 1);
975
976        cancel.cancel();
977        master.stop().await.unwrap();
978    }
979
980    #[tokio::test]
981    async fn stops_delegate_on_stopped_leading() {
982        let leadership = Arc::new(FakeLeadershipService::new(None));
983        let platform_service = Arc::new(FakePlatformService::new(leadership.clone()));
984        let create_consumer_calls = Arc::new(AtomicUsize::new(0));
985        let start_calls = Arc::new(AtomicUsize::new(0));
986        let mut master = build_master_consumer(
987            platform_service,
988            Arc::clone(&create_consumer_calls),
989            Arc::clone(&start_calls),
990            Some(30),
991        );
992
993        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
994        let cancel = CancellationToken::new();
995        let ctx = ConsumerContext::new(tx, cancel.clone());
996
997        master.start(ctx).await.unwrap();
998        leadership.emit(LeadershipEvent::StartedLeading).await;
999        let _ = timeout(Duration::from_millis(500), rx.recv())
1000            .await
1001            .unwrap()
1002            .unwrap();
1003
1004        leadership.emit(LeadershipEvent::StoppedLeading).await;
1005        sleep(Duration::from_millis(100)).await;
1006        while rx.try_recv().is_ok() {}
1007        assert!(
1008            timeout(Duration::from_millis(120), rx.recv())
1009                .await
1010                .is_err()
1011        );
1012
1013        cancel.cancel();
1014        master.stop().await.unwrap();
1015    }
1016
1017    #[tokio::test]
1018    async fn recreates_delegate_on_new_leadership_epoch() {
1019        let leadership = Arc::new(FakeLeadershipService::new(None));
1020        let platform_service = Arc::new(FakePlatformService::new(leadership.clone()));
1021        let create_consumer_calls = Arc::new(AtomicUsize::new(0));
1022        let start_calls = Arc::new(AtomicUsize::new(0));
1023        let mut master = build_master_consumer(
1024            platform_service,
1025            Arc::clone(&create_consumer_calls),
1026            Arc::clone(&start_calls),
1027            Some(30),
1028        );
1029
1030        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
1031        let cancel = CancellationToken::new();
1032        let ctx = ConsumerContext::new(tx, cancel.clone());
1033
1034        master.start(ctx).await.unwrap();
1035
1036        leadership.emit(LeadershipEvent::StartedLeading).await;
1037        let first = timeout(Duration::from_millis(500), rx.recv())
1038            .await
1039            .unwrap()
1040            .unwrap();
1041        assert_eq!(first.exchange.input.body.as_text(), Some("epoch-1"));
1042
1043        leadership.emit(LeadershipEvent::StoppedLeading).await;
1044        sleep(Duration::from_millis(120)).await;
1045
1046        leadership.emit(LeadershipEvent::StartedLeading).await;
1047        let second = timeout(Duration::from_millis(500), rx.recv())
1048            .await
1049            .unwrap()
1050            .unwrap();
1051        assert_eq!(second.exchange.input.body.as_text(), Some("epoch-2"));
1052
1053        assert_eq!(create_consumer_calls.load(Ordering::SeqCst), 2);
1054        assert_eq!(start_calls.load(Ordering::SeqCst), 2);
1055
1056        cancel.cancel();
1057        master.stop().await.unwrap();
1058    }
1059
1060    #[tokio::test]
1061    async fn stops_retrying_delegate_start_after_max_attempts() {
1062        let leadership = Arc::new(FakeLeadershipService::new(Some(
1063            LeadershipEvent::StartedLeading,
1064        )));
1065        let platform_service = Arc::new(FakePlatformService::new(leadership));
1066        let create_endpoint_calls = Arc::new(AtomicUsize::new(0));
1067
1068        let mut master = MasterConsumer::new(
1069            "lock-a".to_string(),
1070            "failing:delegate".to_string(),
1071            Arc::new(FailingDelegateComponent {
1072                create_endpoint_calls: Arc::clone(&create_endpoint_calls),
1073            }),
1074            Arc::new(NoOpMetrics),
1075            platform_service,
1076            Duration::from_millis(500),
1077            Some(1),
1078        );
1079
1080        let (tx, _rx) = tokio::sync::mpsc::channel(16);
1081        let cancel = CancellationToken::new();
1082        let ctx = ConsumerContext::new(tx, cancel.clone());
1083
1084        master.start(ctx).await.unwrap();
1085        sleep(Duration::from_millis(750)).await;
1086
1087        assert_eq!(create_endpoint_calls.load(Ordering::SeqCst), 2);
1088
1089        cancel.cancel();
1090        master.stop().await.unwrap();
1091    }
1092
1093    /// Regression test for MST-002: stop() must abort the leadership JoinHandle
1094    /// instead of just dropping it when the task is slow to drain.
1095    /// Without the fix, stop() blocks for the full drain_timeout (~500 ms)
1096    /// because the leadership task is stuck in stop_delegate awaiting a
1097    /// slow delegate. With abort-first, stop() returns almost instantly.
1098    #[tokio::test]
1099    async fn stop_completes_quickly_when_leadership_task_is_slow() {
1100        // Delegate consumer that ignores its cancellation token and blocks.
1101        struct SlowStoppingConsumer;
1102
1103        #[async_trait]
1104        impl Consumer for SlowStoppingConsumer {
1105            async fn start(&mut self, ctx: ConsumerContext) -> Result<(), CamelError> {
1106                ctx.send(Exchange::new(Message::new("slow-start")))
1107                    .await
1108                    .ok();
1109                // Ignore cancellation — sleep far beyond the drain timeout.
1110                sleep(Duration::from_secs(60)).await;
1111                Ok(())
1112            }
1113
1114            async fn stop(&mut self) -> Result<(), CamelError> {
1115                Ok(())
1116            }
1117        }
1118
1119        struct SlowStoppingComponent;
1120
1121        impl Component for SlowStoppingComponent {
1122            fn scheme(&self) -> &str {
1123                "slow"
1124            }
1125
1126            fn create_endpoint(
1127                &self,
1128                _uri: &str,
1129                _ctx: &dyn ComponentContext,
1130            ) -> Result<Box<dyn Endpoint>, CamelError> {
1131                Ok(Box::new(SlowStoppingEndpoint))
1132            }
1133        }
1134
1135        struct SlowStoppingEndpoint;
1136
1137        impl Endpoint for SlowStoppingEndpoint {
1138            fn uri(&self) -> &str {
1139                "slow:delegate"
1140            }
1141
1142            fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
1143                Ok(Box::new(SlowStoppingConsumer))
1144            }
1145
1146            fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
1147                Err(CamelError::EndpointCreationFailed("not used".into()))
1148            }
1149        }
1150
1151        let leadership = Arc::new(FakeLeadershipService::new(Some(
1152            LeadershipEvent::StartedLeading,
1153        )));
1154        let platform_service = Arc::new(FakePlatformService::new(leadership));
1155
1156        let mut master = MasterConsumer::new(
1157            "lock-slow".into(),
1158            "slow:delegate".into(),
1159            Arc::new(SlowStoppingComponent),
1160            Arc::new(NoOpMetrics),
1161            platform_service,
1162            Duration::from_millis(500), // drain_timeout
1163            Some(30),
1164        );
1165
1166        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
1167        let cancel = CancellationToken::new();
1168        let ctx = ConsumerContext::new(tx, cancel.clone());
1169
1170        master.start(ctx).await.unwrap();
1171
1172        // Wait for the delegate to actually start.
1173        let msg = timeout(Duration::from_secs(2), rx.recv())
1174            .await
1175            .unwrap()
1176            .unwrap();
1177        assert_eq!(msg.exchange.input.body.as_text(), Some("slow-start"));
1178
1179        // stop() must complete quickly because the leadership task is aborted,
1180        // not just timed-out and leaked.
1181        let start = Instant::now();
1182        master.stop().await.unwrap();
1183        let elapsed = start.elapsed();
1184
1185        // With abort-first: ~0 ms. Without the fix: ~drain_timeout (500 ms).
1186        // Assert < 250 ms to reliably distinguish the two behaviours.
1187        assert!(
1188            elapsed < Duration::from_millis(250),
1189            "stop() took {:?}, expected < 250 ms (abort should be near-instant)",
1190            elapsed,
1191        );
1192
1193        cancel.cancel();
1194    }
1195
1196    #[tokio::test]
1197    async fn stop_propagates_delegate_start_error() {
1198        struct FailingStartConsumer;
1199
1200        #[async_trait]
1201        impl Consumer for FailingStartConsumer {
1202            async fn start(&mut self, _ctx: ConsumerContext) -> Result<(), CamelError> {
1203                Err(CamelError::ProcessorError(
1204                    "delegate start failed".to_string(),
1205                ))
1206            }
1207
1208            async fn stop(&mut self) -> Result<(), CamelError> {
1209                Ok(())
1210            }
1211        }
1212
1213        struct FailingStartComponent;
1214
1215        impl Component for FailingStartComponent {
1216            fn scheme(&self) -> &str {
1217                "failstart"
1218            }
1219
1220            fn create_endpoint(
1221                &self,
1222                _uri: &str,
1223                _ctx: &dyn ComponentContext,
1224            ) -> Result<Box<dyn Endpoint>, CamelError> {
1225                Ok(Box::new(FailingStartEndpoint))
1226            }
1227        }
1228
1229        struct FailingStartEndpoint;
1230
1231        impl Endpoint for FailingStartEndpoint {
1232            fn uri(&self) -> &str {
1233                "failstart:delegate"
1234            }
1235
1236            fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
1237                Ok(Box::new(FailingStartConsumer))
1238            }
1239
1240            fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
1241                Err(CamelError::EndpointCreationFailed("not used".into()))
1242            }
1243        }
1244
1245        let leadership = Arc::new(FakeLeadershipService::new(Some(
1246            LeadershipEvent::StartedLeading,
1247        )));
1248        let platform_service = Arc::new(FakePlatformService::new(leadership));
1249
1250        let mut master = MasterConsumer::new(
1251            "lock-error".into(),
1252            "failstart:delegate".into(),
1253            Arc::new(FailingStartComponent),
1254            Arc::new(NoOpMetrics),
1255            platform_service,
1256            Duration::from_millis(500),
1257            Some(30),
1258        );
1259
1260        let (tx, _rx) = tokio::sync::mpsc::channel(16);
1261        let cancel = CancellationToken::new();
1262        let ctx = ConsumerContext::new(tx, cancel.clone());
1263
1264        master.start(ctx).await.unwrap();
1265        sleep(Duration::from_millis(250)).await;
1266        assert!(
1267            master
1268                .leadership_task
1269                .as_ref()
1270                .is_some_and(tokio::task::JoinHandle::is_finished),
1271            "leadership task should finish after delegate error"
1272        );
1273        let err = master.stop().await.expect_err("expected delegate error");
1274        assert!(err.to_string().contains("delegate start failed"));
1275
1276        cancel.cancel();
1277    }
1278}