Skip to main content

camel_master/
lib.rs

1//! Master/leader-election component — wraps a delegate consumer with distributed lock-based leadership coordination.
2//!
3//! Main types: `MasterComponent`, `MasterBundle`.
4//! Main modules: `bundle`, `config`.
5
6pub mod bundle;
7mod config;
8
9pub use bundle::MasterBundle;
10
11use std::sync::Arc;
12use std::time::Duration;
13
14use async_trait::async_trait;
15use camel_api::{CamelError, MetricsCollector, PlatformService};
16use camel_component_api::{
17    BoxProcessor, Component, ComponentContext, Consumer, ConsumerContext, Endpoint,
18    ExchangeEnvelope, ProducerContext, parse_uri,
19};
20use camel_language_api::Language;
21use tokio::task::JoinHandle;
22use tokio::time::{interval, timeout};
23use tokio_util::sync::CancellationToken;
24use tracing::{error, info, warn};
25
26use crate::config::{MasterComponentConfig, MasterUriConfig};
27
/// Period of the retry ticker in the leadership loop: while this node is leading and
/// no delegate is active, a delegate start is re-attempted on each tick.
const DELEGATE_RETRY_INTERVAL: Duration = Duration::from_millis(200);
29
/// Component for the `master` scheme: wraps a delegate endpoint so that its consumer
/// only runs while this node holds leadership for a named lock.
#[derive(Debug, Clone)]
pub struct MasterComponent {
    /// Upper bound, in milliseconds, on how long shutdown waits for the delegate and
    /// the leadership loop to finish.
    drain_timeout_ms: u64,
    /// Maximum number of delegate start retries while leading; `None` means no limit.
    delegate_retry_max_attempts: Option<u32>,
}
34
35impl MasterComponent {
36    pub fn new(config: MasterComponentConfig) -> Self {
37        Self {
38            drain_timeout_ms: config.drain_timeout_ms,
39            delegate_retry_max_attempts: config.delegate_retry_max_attempts,
40        }
41    }
42}
43
44impl Default for MasterComponent {
45    fn default() -> Self {
46        Self::new(MasterComponentConfig::default())
47    }
48}
49
50impl Component for MasterComponent {
51    fn scheme(&self) -> &str {
52        "master"
53    }
54
55    fn create_endpoint(
56        &self,
57        uri: &str,
58        ctx: &dyn ComponentContext,
59    ) -> Result<Box<dyn Endpoint>, CamelError> {
60        let parsed = MasterUriConfig::parse(uri)?;
61        let delegate_parts = parse_uri(&parsed.delegate_uri)?;
62        let delegate_scheme = delegate_parts.scheme;
63        let delegate_component = ctx
64            .resolve_component(&delegate_scheme)
65            .ok_or_else(|| CamelError::ComponentNotFound(delegate_scheme.clone()))?;
66
67        Ok(Box::new(MasterEndpoint {
68            uri: uri.to_string(),
69            lock_name: parsed.lock_name,
70            delegate_uri: parsed.delegate_uri,
71            delegate_component,
72            metrics: ctx.metrics(),
73            platform_service: ctx.platform_service(),
74            drain_timeout: Duration::from_millis(self.drain_timeout_ms),
75            delegate_retry_max_attempts: self.delegate_retry_max_attempts,
76        }))
77    }
78}
79
/// Endpoint produced by `MasterComponent`; pairs a lock name with the delegate
/// endpoint that should only consume while leadership is held.
struct MasterEndpoint {
    /// Full master URI this endpoint was created from (returned by `uri()`).
    uri: String,
    /// Lock name parsed from the master URI.
    lock_name: String,
    /// URI of the wrapped delegate endpoint.
    delegate_uri: String,
    /// Component resolved for the delegate URI's scheme.
    delegate_component: Arc<dyn Component>,
    /// Metrics collector taken from the creating component context.
    metrics: Arc<dyn MetricsCollector>,
    /// Platform service taken from the creating component context.
    platform_service: Arc<dyn PlatformService>,
    /// Shutdown wait bound forwarded to consumers created by this endpoint.
    drain_timeout: Duration,
    /// Delegate retry limit forwarded to consumers created by this endpoint.
    delegate_retry_max_attempts: Option<u32>,
}
90
91impl Endpoint for MasterEndpoint {
92    fn uri(&self) -> &str {
93        &self.uri
94    }
95
96    fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
97        Ok(Box::new(MasterConsumer::new(
98            self.lock_name.clone(),
99            self.delegate_uri.clone(),
100            Arc::clone(&self.delegate_component),
101            Arc::clone(&self.metrics),
102            Arc::clone(&self.platform_service),
103            self.drain_timeout,
104            self.delegate_retry_max_attempts,
105        )))
106    }
107
108    fn create_producer(&self, ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
109        let delegate_ctx = MasterDelegateContext {
110            delegate_component: Arc::clone(&self.delegate_component),
111            metrics: Arc::clone(&self.metrics),
112            platform_service: Arc::clone(&self.platform_service),
113        };
114
115        self.delegate_component
116            .create_endpoint(&self.delegate_uri, &delegate_ctx)?
117            .create_producer(ctx)
118    }
119}
120
/// `ComponentContext` handed to the delegate component when the master endpoint asks
/// it to create the wrapped endpoint.
struct MasterDelegateContext {
    /// The only component this context can resolve.
    delegate_component: Arc<dyn Component>,
    /// Metrics collector exposed through `metrics()`.
    metrics: Arc<dyn MetricsCollector>,
    /// Platform service exposed through `platform_service()`.
    platform_service: Arc<dyn PlatformService>,
}
126
127impl ComponentContext for MasterDelegateContext {
128    fn resolve_component(&self, scheme: &str) -> Option<Arc<dyn Component>> {
129        if self.delegate_component.scheme() == scheme {
130            Some(Arc::clone(&self.delegate_component))
131        } else {
132            None
133        }
134    }
135
136    fn resolve_language(&self, _name: &str) -> Option<Arc<dyn Language>> {
137        None
138    }
139
140    fn metrics(&self) -> Arc<dyn MetricsCollector> {
141        Arc::clone(&self.metrics)
142    }
143
144    fn platform_service(&self) -> Arc<dyn PlatformService> {
145        Arc::clone(&self.platform_service)
146    }
147}
148
/// Consumer that takes part in leader election for `lock_name` and runs the delegate
/// consumer only while leadership is held.
struct MasterConsumer {
    /// Name of the lock passed to the leadership service.
    lock_name: String,
    /// URI used to create the delegate endpoint each time leadership is gained.
    delegate_uri: String,
    /// Component that creates the delegate endpoint.
    delegate_component: Arc<dyn Component>,
    /// Metrics collector passed on to the delegate's component context.
    metrics: Arc<dyn MetricsCollector>,
    /// Source of the leadership service; also passed on to the delegate's context.
    platform_service: Arc<dyn PlatformService>,
    /// Upper bound on each shutdown wait (delegate stop, step-down, loop join).
    drain_timeout: Duration,
    /// Maximum number of delegate start retries while leading; `None` means no limit.
    delegate_retry_max_attempts: Option<u32>,
    /// Handle of the spawned leadership loop; `Some` once `start` has succeeded.
    leadership_task: Option<JoinHandle<()>>,
    /// Token cancelled by `stop` to end the leadership loop.
    stop_token: Option<CancellationToken>,
}
160
161impl MasterConsumer {
162    fn new(
163        lock_name: String,
164        delegate_uri: String,
165        delegate_component: Arc<dyn Component>,
166        metrics: Arc<dyn MetricsCollector>,
167        platform_service: Arc<dyn PlatformService>,
168        drain_timeout: Duration,
169        delegate_retry_max_attempts: Option<u32>,
170    ) -> Self {
171        Self {
172            lock_name,
173            delegate_uri,
174            delegate_component,
175            metrics,
176            platform_service,
177            drain_timeout,
178            delegate_retry_max_attempts,
179            leadership_task: None,
180            stop_token: None,
181        }
182    }
183}
184
/// Whether a delegate consumer task is currently registered with the leadership loop.
enum DelegateState {
    /// No delegate task is registered.
    Inactive,
    /// A delegate task has been spawned.
    Active {
        /// Token that asks the delegate consumer to shut down when cancelled.
        run_token: CancellationToken,
        /// Join handle of the task running the delegate consumer.
        handle: JoinHandle<()>,
    },
}
192
193async fn stop_delegate(state: &mut DelegateState, drain_timeout: Duration) {
194    if let DelegateState::Active {
195        run_token,
196        mut handle,
197    } = std::mem::replace(state, DelegateState::Inactive)
198    {
199        run_token.cancel();
200        match timeout(drain_timeout, &mut handle).await {
201            Ok(Ok(())) => {}
202            Ok(Err(e)) if e.is_panic() => {
203                error!(error = %e, "master delegate task panicked");
204            }
205            Ok(Err(e)) => {
206                warn!(error = %e, "master delegate task cancelled");
207            }
208            Err(_) => {
209                warn!("master delegate shutdown timed out, aborting");
210                handle.abort();
211            }
212        }
213    }
214}
215
/// Borrowed inputs that `reconcile_event` needs to start or stop the delegate.
struct ReconcileContext<'a> {
    /// Lock name, used as a log field.
    lock_name: &'a str,
    /// Component that creates the delegate endpoint.
    delegate_component: &'a Arc<dyn Component>,
    /// URI the delegate endpoint is created from.
    delegate_uri: &'a str,
    /// Exchange channel cloned into the delegate's consumer context.
    sender: &'a tokio::sync::mpsc::Sender<ExchangeEnvelope>,
    /// Parent token from which each delegate run derives a child token.
    parent_cancel: &'a CancellationToken,
    /// Upper bound on the wait when stopping a running delegate.
    drain_timeout: Duration,
    /// Metrics collector passed to the delegate's component context.
    metrics: &'a Arc<dyn MetricsCollector>,
    /// Platform service passed to the delegate's component context.
    platform_service: &'a Arc<dyn PlatformService>,
}
226
227async fn reconcile_event(
228    event: camel_api::LeadershipEvent,
229    state: &mut DelegateState,
230    ctx: &ReconcileContext<'_>,
231) {
232    match event {
233        camel_api::LeadershipEvent::StartedLeading => {
234            info!(lock = %ctx.lock_name, "master leadership acquired");
235            stop_delegate(state, ctx.drain_timeout).await;
236
237            let delegate_ctx = MasterDelegateContext {
238                delegate_component: Arc::clone(ctx.delegate_component),
239                metrics: Arc::clone(ctx.metrics),
240                platform_service: Arc::clone(ctx.platform_service),
241            };
242
243            let endpoint = match ctx
244                .delegate_component
245                .create_endpoint(ctx.delegate_uri, &delegate_ctx)
246            {
247                Ok(endpoint) => endpoint,
248                Err(err) => {
249                    warn!(lock = %ctx.lock_name, "failed to create delegate endpoint: {err}");
250                    return;
251                }
252            };
253
254            let mut consumer = match endpoint.create_consumer() {
255                Ok(consumer) => consumer,
256                Err(err) => {
257                    warn!(lock = %ctx.lock_name, "failed to create delegate consumer: {err}");
258                    return;
259                }
260            };
261
262            let run_token = ctx.parent_cancel.child_token();
263            let delegate_ctx = ConsumerContext::new(ctx.sender.clone(), run_token.clone());
264            let handle = tokio::spawn(async move {
265                let _ = consumer.start(delegate_ctx).await;
266                let _ = consumer.stop().await;
267            });
268
269            *state = DelegateState::Active { run_token, handle };
270        }
271        camel_api::LeadershipEvent::StoppedLeading => {
272            info!(lock = %ctx.lock_name, "master leadership lost");
273            stop_delegate(state, ctx.drain_timeout).await;
274        }
275    }
276}
277
278#[async_trait]
279impl Consumer for MasterConsumer {
280    async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
281        if self.leadership_task.is_some() {
282            return Ok(());
283        }
284
285        let handle = self
286            .platform_service
287            .leadership()
288            .start(&self.lock_name)
289            .await
290            .map_err(|e| {
291                CamelError::EndpointCreationFailed(format!("failed to start leader election: {e}"))
292            })?;
293
294        let lock_name = self.lock_name.clone();
295        let delegate_uri = self.delegate_uri.clone();
296        let delegate_component = Arc::clone(&self.delegate_component);
297        let metrics = Arc::clone(&self.metrics);
298        let platform_service = Arc::clone(&self.platform_service);
299        let sender = context.sender();
300        let parent_cancel = context.cancel_token();
301        let drain_timeout = self.drain_timeout;
302        let delegate_retry_max_attempts = self.delegate_retry_max_attempts;
303        let mut events = handle.events.clone();
304
305        let stop_token = CancellationToken::new();
306        let stop_token_loop = stop_token.clone();
307        let leadership_handle = handle;
308
309        let task = tokio::spawn(async move {
310            let mut state = DelegateState::Inactive;
311            let mut is_leading = false;
312            let mut delegate_attempts = 0u32;
313            let mut retry_tick = interval(DELEGATE_RETRY_INTERVAL);
314
315            let rctx = ReconcileContext {
316                lock_name: &lock_name,
317                delegate_component: &delegate_component,
318                delegate_uri: &delegate_uri,
319                sender: &sender,
320                parent_cancel: &parent_cancel,
321                drain_timeout,
322                metrics: &metrics,
323                platform_service: &platform_service,
324            };
325
326            let initial_event = { events.borrow().clone() };
327            if let Some(initial_event) = initial_event {
328                is_leading = matches!(&initial_event, camel_api::LeadershipEvent::StartedLeading);
329                if is_leading {
330                    delegate_attempts = 0;
331                }
332                reconcile_event(initial_event, &mut state, &rctx).await;
333            }
334
335            loop {
336                tokio::select! {
337                    _ = stop_token_loop.cancelled() => {
338                        break;
339                    }
340                    _ = context.cancelled() => {
341                        break;
342                    }
343                    changed = events.changed() => {
344                        if changed.is_err() {
345                            break;
346                        }
347                        let event = { events.borrow().clone() };
348                        if let Some(event) = event {
349                            let was_leading = is_leading;
350                            is_leading = matches!(&event, camel_api::LeadershipEvent::StartedLeading);
351                            if !was_leading && is_leading {
352                                delegate_attempts = 0;
353                            }
354                            reconcile_event(event, &mut state, &rctx).await;
355                        }
356                    }
357                    _ = retry_tick.tick() => {
358                        if is_leading && matches!(state, DelegateState::Inactive) {
359                            if let Some(max) = delegate_retry_max_attempts {
360                                delegate_attempts = delegate_attempts.saturating_add(1);
361                                if delegate_attempts > max {
362                                    warn!(
363                                        lock = %lock_name,
364                                        attempts = max,
365                                        "delegate start exceeded max attempts, stopping consumer"
366                                    );
367                                    break;
368                                }
369                            }
370                            reconcile_event(
371                                camel_api::LeadershipEvent::StartedLeading,
372                                &mut state,
373                                &rctx,
374                            )
375                            .await;
376                        }
377                    }
378                }
379            }
380
381            stop_delegate(&mut state, drain_timeout).await;
382            let _ = timeout(drain_timeout, leadership_handle.step_down()).await;
383        });
384
385        self.stop_token = Some(stop_token);
386        self.leadership_task = Some(task);
387
388        Ok(())
389    }
390
391    async fn stop(&mut self) -> Result<(), CamelError> {
392        if let Some(token) = self.stop_token.take() {
393            token.cancel();
394        }
395
396        if let Some(task) = self.leadership_task.take() {
397            match timeout(self.drain_timeout, task).await {
398                Ok(Ok(())) => {}
399                Ok(Err(e)) if e.is_panic() => {
400                    error!(lock = %self.lock_name, error = %e, "leadership task panicked");
401                }
402                Ok(Err(e)) => {
403                    warn!(lock = %self.lock_name, error = %e, "leadership task cancelled");
404                }
405                Err(_) => {
406                    warn!("master leadership loop shutdown timed out");
407                }
408            }
409        }
410
411        Ok(())
412    }
413}
414
415#[cfg(test)]
416mod tests {
417    use std::sync::Arc;
418    use std::sync::Mutex;
419    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
420
421    use camel_api::{
422        BoxProcessorExt, Exchange, LeadershipEvent, LeadershipHandle, LeadershipService, Message,
423        NoOpMetrics, NoopPlatformService, NoopReadinessGate, PlatformError, PlatformIdentity,
424        PlatformService, ReadinessGate,
425    };
426    use camel_component_api::NoOpComponentContext;
427    use tokio::sync::{oneshot, watch};
428    use tokio::time::{sleep, timeout};
429    use tokio_util::sync::CancellationToken;
430    use tower::ServiceExt;
431
432    use super::*;
433
434    #[test]
435    fn parse_master_uri_valid() {
436        let cfg = MasterUriConfig::parse("master:mylock:timer:tick?period=250").unwrap();
437        assert_eq!(cfg.lock_name, "mylock");
438        assert_eq!(cfg.delegate_uri, "timer:tick?period=250");
439    }
440
441    #[test]
442    fn parse_master_uri_missing_lockname() {
443        let err = MasterUriConfig::parse("master::timer:tick").unwrap_err();
444        assert!(matches!(err, CamelError::InvalidUri(_)));
445    }
446
447    #[test]
448    fn parse_master_uri_missing_delegate() {
449        let err = MasterUriConfig::parse("master:mylock:").unwrap_err();
450        assert!(matches!(err, CamelError::InvalidUri(_)));
451    }
452
453    #[test]
454    fn endpoint_fails_when_delegate_component_missing() {
455        let master = MasterComponent::default();
456        let result =
457            master.create_endpoint("master:lock-1:missing:delegate", &NoOpComponentContext);
458        assert!(matches!(result, Err(CamelError::ComponentNotFound(_))));
459    }
460
461    #[test]
462    fn delegate_scheme_is_parsed_from_delegate_uri() {
463        let seen_scheme = Arc::new(AtomicBool::new(false));
464
465        struct SchemeAwareContext {
466            delegate: Arc<dyn Component>,
467            seen_scheme: Arc<AtomicBool>,
468        }
469
470        impl ComponentContext for SchemeAwareContext {
471            fn resolve_component(&self, scheme: &str) -> Option<Arc<dyn Component>> {
472                if scheme == "mock" {
473                    self.seen_scheme.store(true, Ordering::SeqCst);
474                    Some(Arc::clone(&self.delegate))
475                } else {
476                    None
477                }
478            }
479
480            fn resolve_language(&self, _name: &str) -> Option<Arc<dyn Language>> {
481                None
482            }
483
484            fn metrics(&self) -> Arc<dyn MetricsCollector> {
485                Arc::new(NoOpMetrics)
486            }
487
488            fn platform_service(&self) -> Arc<dyn PlatformService> {
489                Arc::new(NoopPlatformService::default())
490            }
491        }
492
493        struct MockDelegateComponent;
494
495        impl Component for MockDelegateComponent {
496            fn scheme(&self) -> &str {
497                "mock"
498            }
499
500            fn create_endpoint(
501                &self,
502                _uri: &str,
503                _ctx: &dyn ComponentContext,
504            ) -> Result<Box<dyn Endpoint>, CamelError> {
505                Ok(Box::new(MockDelegateEndpoint))
506            }
507        }
508
509        struct MockDelegateEndpoint;
510
511        impl Endpoint for MockDelegateEndpoint {
512            fn uri(&self) -> &str {
513                "mock:delegate"
514            }
515
516            fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
517                Err(CamelError::EndpointCreationFailed("not used".to_string()))
518            }
519
520            fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
521                Err(CamelError::EndpointCreationFailed("not used".to_string()))
522            }
523        }
524
525        let delegate = Arc::new(MockDelegateComponent);
526        let ctx = SchemeAwareContext {
527            delegate,
528            seen_scheme: Arc::clone(&seen_scheme),
529        };
530
531        let master = MasterComponent::default();
532        let endpoint = master
533            .create_endpoint("master:mylock:mock:delegate?x=1", &ctx)
534            .unwrap();
535
536        assert_eq!(endpoint.uri(), "master:mylock:mock:delegate?x=1");
537        assert!(seen_scheme.load(Ordering::SeqCst));
538    }
539
    /// Test `ComponentContext` that can resolve exactly one delegate component.
    struct MockDelegateContext {
        /// Component returned when its own scheme is requested.
        delegate: Arc<dyn Component>,
    }
543
544    impl ComponentContext for MockDelegateContext {
545        fn resolve_component(&self, scheme: &str) -> Option<Arc<dyn Component>> {
546            if self.delegate.scheme() == scheme {
547                Some(Arc::clone(&self.delegate))
548            } else {
549                None
550            }
551        }
552
553        fn resolve_language(&self, _name: &str) -> Option<Arc<dyn Language>> {
554            None
555        }
556
557        fn metrics(&self) -> Arc<dyn MetricsCollector> {
558            Arc::new(NoOpMetrics)
559        }
560
561        fn platform_service(&self) -> Arc<dyn PlatformService> {
562            Arc::new(NoopPlatformService::default())
563        }
564    }
565
    /// Delegate component stub that counts endpoint and producer creations.
    struct MockProducerDelegateComponent {
        /// Incremented on every `create_endpoint` call.
        create_endpoint_calls: Arc<AtomicUsize>,
        /// Shared with created endpoints, which increment it in `create_producer`.
        create_producer_calls: Arc<AtomicUsize>,
        /// When true, endpoints created by this component fail in `create_producer`.
        fail_producer: bool,
    }
571
572    impl Component for MockProducerDelegateComponent {
573        fn scheme(&self) -> &str {
574            "mock"
575        }
576
577        fn create_endpoint(
578            &self,
579            _uri: &str,
580            _ctx: &dyn ComponentContext,
581        ) -> Result<Box<dyn Endpoint>, CamelError> {
582            self.create_endpoint_calls.fetch_add(1, Ordering::SeqCst);
583            Ok(Box::new(MockProducerDelegateEndpoint {
584                create_producer_calls: Arc::clone(&self.create_producer_calls),
585                fail_producer: self.fail_producer,
586            }))
587        }
588    }
589
    /// Endpoint stub created by `MockProducerDelegateComponent`.
    struct MockProducerDelegateEndpoint {
        /// Incremented on every `create_producer` call.
        create_producer_calls: Arc<AtomicUsize>,
        /// When true, `create_producer` returns an error.
        fail_producer: bool,
    }
594
595    impl Endpoint for MockProducerDelegateEndpoint {
596        fn uri(&self) -> &str {
597            "mock:delegate"
598        }
599
600        fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
601            Err(CamelError::EndpointCreationFailed(
602                "not used in test".to_string(),
603            ))
604        }
605
606        fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
607            self.create_producer_calls.fetch_add(1, Ordering::SeqCst);
608            if self.fail_producer {
609                return Err(CamelError::ProcessorError(
610                    "delegate producer failed".to_string(),
611                ));
612            }
613            Ok(BoxProcessor::from_fn(
614                |exchange| async move { Ok(exchange) },
615            ))
616        }
617    }
618
619    #[tokio::test]
620    async fn producer_passthrough_delegates_and_produces() {
621        let endpoint_calls = Arc::new(AtomicUsize::new(0));
622        let producer_calls = Arc::new(AtomicUsize::new(0));
623        let delegate = Arc::new(MockProducerDelegateComponent {
624            create_endpoint_calls: Arc::clone(&endpoint_calls),
625            create_producer_calls: Arc::clone(&producer_calls),
626            fail_producer: false,
627        });
628
629        let ctx = MockDelegateContext {
630            delegate: delegate.clone(),
631        };
632
633        let master = MasterComponent::default();
634        let endpoint = master
635            .create_endpoint("master:lock-1:mock:delegate", &ctx)
636            .unwrap();
637        let producer_ctx = ProducerContext::new();
638        let producer = endpoint.create_producer(&producer_ctx).unwrap();
639
640        let exchange = Exchange::new(Message::new("ok"));
641        let result = producer.oneshot(exchange).await.unwrap();
642
643        assert_eq!(result.input.body.as_text(), Some("ok"));
644        assert_eq!(endpoint_calls.load(Ordering::SeqCst), 1);
645        assert_eq!(producer_calls.load(Ordering::SeqCst), 1);
646    }
647
648    #[test]
649    fn producer_passthrough_bubbles_delegate_errors() {
650        let endpoint_calls = Arc::new(AtomicUsize::new(0));
651        let producer_calls = Arc::new(AtomicUsize::new(0));
652        let delegate = Arc::new(MockProducerDelegateComponent {
653            create_endpoint_calls: Arc::clone(&endpoint_calls),
654            create_producer_calls: Arc::clone(&producer_calls),
655            fail_producer: true,
656        });
657
658        let ctx = MockDelegateContext {
659            delegate: delegate.clone(),
660        };
661
662        let master = MasterComponent::default();
663        let endpoint = master
664            .create_endpoint("master:lock-1:mock:delegate", &ctx)
665            .unwrap();
666        let producer_ctx = ProducerContext::new();
667        let err = endpoint.create_producer(&producer_ctx).unwrap_err();
668
669        assert!(matches!(err, CamelError::ProcessorError(_)));
670        assert_eq!(endpoint_calls.load(Ordering::SeqCst), 1);
671        assert_eq!(producer_calls.load(Ordering::SeqCst), 1);
672    }
673
    /// Leadership service stub whose events are driven manually by the test via `emit`.
    struct FakeLeadershipService {
        /// Sender half of the event channel; set when `start` is called.
        tx: Mutex<Option<watch::Sender<Option<LeadershipEvent>>>>,
        /// Leader flag shared with the handles returned from `start`.
        is_leader: Arc<AtomicBool>,
        /// Initial value of the event channel created in `start`.
        initial: Option<LeadershipEvent>,
    }
679
680    impl FakeLeadershipService {
681        fn new(initial: Option<LeadershipEvent>) -> Self {
682            let starts_as_leader = matches!(initial, Some(LeadershipEvent::StartedLeading));
683            Self {
684                tx: Mutex::new(None),
685                is_leader: Arc::new(AtomicBool::new(starts_as_leader)),
686                initial,
687            }
688        }
689
690        async fn emit(&self, event: LeadershipEvent) {
691            self.is_leader.store(
692                matches!(event, LeadershipEvent::StartedLeading),
693                Ordering::Release,
694            );
695            if let Some(tx) = self
696                .tx
697                .lock()
698                .expect("mutex poisoned: fake elector sender")
699                .as_ref()
700            {
701                let _ = tx.send(Some(event));
702            }
703        }
704    }
705
706    #[async_trait]
707    impl LeadershipService for FakeLeadershipService {
708        async fn start(&self, _lock_name: &str) -> Result<LeadershipHandle, PlatformError> {
709            let (tx, rx) = watch::channel(self.initial.clone());
710            *self.tx.lock().expect("mutex poisoned: fake elector sender") = Some(tx);
711
712            let cancel = CancellationToken::new();
713            let cancel_wait = cancel.clone();
714            let (term_tx, term_rx) = oneshot::channel();
715            tokio::spawn(async move {
716                cancel_wait.cancelled().await;
717                let _ = term_tx.send(());
718            });
719
720            Ok(LeadershipHandle::new(
721                rx,
722                Arc::clone(&self.is_leader),
723                cancel,
724                term_rx,
725            ))
726        }
727    }
728
    /// Platform service stub that exposes an injected leadership service.
    struct FakePlatformService {
        /// Identity returned by `identity()`.
        identity: PlatformIdentity,
        /// Readiness gate returned by `readiness_gate()`.
        readiness_gate: Arc<dyn ReadinessGate>,
        /// Leadership service returned by `leadership()`.
        leadership: Arc<dyn LeadershipService>,
    }
734
735    impl FakePlatformService {
736        fn new(leadership: Arc<dyn LeadershipService>) -> Self {
737            Self {
738                identity: PlatformIdentity::local("master-tests"),
739                readiness_gate: Arc::new(NoopReadinessGate),
740                leadership,
741            }
742        }
743    }
744
745    impl PlatformService for FakePlatformService {
746        fn identity(&self) -> PlatformIdentity {
747            self.identity.clone()
748        }
749
750        fn readiness_gate(&self) -> Arc<dyn ReadinessGate> {
751            Arc::clone(&self.readiness_gate)
752        }
753
754        fn leadership(&self) -> Arc<dyn LeadershipService> {
755            Arc::clone(&self.leadership)
756        }
757    }
758
    /// Delegate component stub whose endpoints create counting `FakeDelegateConsumer`s.
    struct FakeDelegateComponent {
        /// Shared with created endpoints, which increment it in `create_consumer`.
        create_consumer_calls: Arc<AtomicUsize>,
        /// Shared with created consumers, which increment it in `start`.
        start_calls: Arc<AtomicUsize>,
    }
763
764    impl Component for FakeDelegateComponent {
765        fn scheme(&self) -> &str {
766            "fake"
767        }
768
769        fn create_endpoint(
770            &self,
771            _uri: &str,
772            _ctx: &dyn ComponentContext,
773        ) -> Result<Box<dyn Endpoint>, CamelError> {
774            Ok(Box::new(FakeDelegateEndpoint {
775                create_consumer_calls: Arc::clone(&self.create_consumer_calls),
776                start_calls: Arc::clone(&self.start_calls),
777            }))
778        }
779    }
780
    /// Endpoint stub created by `FakeDelegateComponent`.
    struct FakeDelegateEndpoint {
        /// Incremented on every `create_consumer` call; also yields the consumer's epoch.
        create_consumer_calls: Arc<AtomicUsize>,
        /// Passed on to created consumers.
        start_calls: Arc<AtomicUsize>,
    }
785
786    impl Endpoint for FakeDelegateEndpoint {
787        fn uri(&self) -> &str {
788            "fake:delegate"
789        }
790
791        fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
792            let epoch = self.create_consumer_calls.fetch_add(1, Ordering::SeqCst) + 1;
793            Ok(Box::new(FakeDelegateConsumer {
794                epoch,
795                start_calls: Arc::clone(&self.start_calls),
796            }))
797        }
798
799        fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
800            Err(CamelError::EndpointCreationFailed("not used".to_string()))
801        }
802    }
803
    /// Consumer stub that emits `epoch-N` messages until cancelled.
    struct FakeDelegateConsumer {
        /// 1-based creation index, embedded in every emitted message body.
        epoch: usize,
        /// Incremented each time `start` is called.
        start_calls: Arc<AtomicUsize>,
    }
808
    /// Delegate component stub whose `create_endpoint` always fails.
    struct FailingDelegateComponent {
        /// Incremented on every `create_endpoint` call.
        create_endpoint_calls: Arc<AtomicUsize>,
    }
812
813    impl Component for FailingDelegateComponent {
814        fn scheme(&self) -> &str {
815            "failing"
816        }
817
818        fn create_endpoint(
819            &self,
820            _uri: &str,
821            _ctx: &dyn ComponentContext,
822        ) -> Result<Box<dyn Endpoint>, CamelError> {
823            self.create_endpoint_calls.fetch_add(1, Ordering::SeqCst);
824            Err(CamelError::EndpointCreationFailed(
825                "delegate endpoint creation failed".to_string(),
826            ))
827        }
828    }
829
830    #[async_trait]
831    impl Consumer for FakeDelegateConsumer {
832        async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
833            self.start_calls.fetch_add(1, Ordering::SeqCst);
834            context
835                .send(Exchange::new(Message::new(format!("epoch-{}", self.epoch))))
836                .await?;
837
838            loop {
839                tokio::select! {
840                    _ = context.cancelled() => {
841                        break;
842                    }
843                    _ = sleep(Duration::from_millis(20)) => {
844                        context
845                            .send(Exchange::new(Message::new(format!("epoch-{}", self.epoch))))
846                            .await?;
847                    }
848                }
849            }
850
851            Ok(())
852        }
853
854        async fn stop(&mut self) -> Result<(), CamelError> {
855            Ok(())
856        }
857    }
858
859    fn build_master_consumer(
860        platform_service: Arc<dyn PlatformService>,
861        create_consumer_calls: Arc<AtomicUsize>,
862        start_calls: Arc<AtomicUsize>,
863        delegate_retry_max_attempts: Option<u32>,
864    ) -> MasterConsumer {
865        MasterConsumer::new(
866            "lock-a".to_string(),
867            "fake:delegate".to_string(),
868            Arc::new(FakeDelegateComponent {
869                create_consumer_calls,
870                start_calls,
871            }),
872            Arc::new(NoOpMetrics),
873            platform_service,
874            Duration::from_millis(500),
875            delegate_retry_max_attempts,
876        )
877    }
878
879    #[tokio::test]
880    async fn starts_delegate_only_after_started_leading() {
881        let leadership = Arc::new(FakeLeadershipService::new(None));
882        let platform_service = Arc::new(FakePlatformService::new(leadership.clone()));
883        let create_consumer_calls = Arc::new(AtomicUsize::new(0));
884        let start_calls = Arc::new(AtomicUsize::new(0));
885        let mut master = build_master_consumer(
886            platform_service,
887            Arc::clone(&create_consumer_calls),
888            Arc::clone(&start_calls),
889            Some(30),
890        );
891
892        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
893        let cancel = CancellationToken::new();
894        let ctx = ConsumerContext::new(tx, cancel.clone());
895
896        master.start(ctx).await.unwrap();
897
898        sleep(Duration::from_millis(80)).await;
899        assert!(rx.try_recv().is_err());
900        assert_eq!(create_consumer_calls.load(Ordering::SeqCst), 0);
901
902        leadership.emit(LeadershipEvent::StartedLeading).await;
903
904        let first = timeout(Duration::from_millis(500), rx.recv())
905            .await
906            .unwrap()
907            .unwrap();
908        assert_eq!(first.exchange.input.body.as_text(), Some("epoch-1"));
909        assert_eq!(create_consumer_calls.load(Ordering::SeqCst), 1);
910        assert_eq!(start_calls.load(Ordering::SeqCst), 1);
911
912        cancel.cancel();
913        master.stop().await.unwrap();
914    }
915
916    #[tokio::test]
917    async fn stops_delegate_on_stopped_leading() {
918        let leadership = Arc::new(FakeLeadershipService::new(None));
919        let platform_service = Arc::new(FakePlatformService::new(leadership.clone()));
920        let create_consumer_calls = Arc::new(AtomicUsize::new(0));
921        let start_calls = Arc::new(AtomicUsize::new(0));
922        let mut master = build_master_consumer(
923            platform_service,
924            Arc::clone(&create_consumer_calls),
925            Arc::clone(&start_calls),
926            Some(30),
927        );
928
929        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
930        let cancel = CancellationToken::new();
931        let ctx = ConsumerContext::new(tx, cancel.clone());
932
933        master.start(ctx).await.unwrap();
934        leadership.emit(LeadershipEvent::StartedLeading).await;
935        let _ = timeout(Duration::from_millis(500), rx.recv())
936            .await
937            .unwrap()
938            .unwrap();
939
940        leadership.emit(LeadershipEvent::StoppedLeading).await;
941        sleep(Duration::from_millis(100)).await;
942        while rx.try_recv().is_ok() {}
943        assert!(
944            timeout(Duration::from_millis(120), rx.recv())
945                .await
946                .is_err()
947        );
948
949        cancel.cancel();
950        master.stop().await.unwrap();
951    }
952
953    #[tokio::test]
954    async fn recreates_delegate_on_new_leadership_epoch() {
955        let leadership = Arc::new(FakeLeadershipService::new(None));
956        let platform_service = Arc::new(FakePlatformService::new(leadership.clone()));
957        let create_consumer_calls = Arc::new(AtomicUsize::new(0));
958        let start_calls = Arc::new(AtomicUsize::new(0));
959        let mut master = build_master_consumer(
960            platform_service,
961            Arc::clone(&create_consumer_calls),
962            Arc::clone(&start_calls),
963            Some(30),
964        );
965
966        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
967        let cancel = CancellationToken::new();
968        let ctx = ConsumerContext::new(tx, cancel.clone());
969
970        master.start(ctx).await.unwrap();
971
972        leadership.emit(LeadershipEvent::StartedLeading).await;
973        let first = timeout(Duration::from_millis(500), rx.recv())
974            .await
975            .unwrap()
976            .unwrap();
977        assert_eq!(first.exchange.input.body.as_text(), Some("epoch-1"));
978
979        leadership.emit(LeadershipEvent::StoppedLeading).await;
980        sleep(Duration::from_millis(120)).await;
981
982        leadership.emit(LeadershipEvent::StartedLeading).await;
983        let second = timeout(Duration::from_millis(500), rx.recv())
984            .await
985            .unwrap()
986            .unwrap();
987        assert_eq!(second.exchange.input.body.as_text(), Some("epoch-2"));
988
989        assert_eq!(create_consumer_calls.load(Ordering::SeqCst), 2);
990        assert_eq!(start_calls.load(Ordering::SeqCst), 2);
991
992        cancel.cancel();
993        master.stop().await.unwrap();
994    }
995
996    #[tokio::test]
997    async fn stops_retrying_delegate_start_after_max_attempts() {
998        let leadership = Arc::new(FakeLeadershipService::new(Some(
999            LeadershipEvent::StartedLeading,
1000        )));
1001        let platform_service = Arc::new(FakePlatformService::new(leadership));
1002        let create_endpoint_calls = Arc::new(AtomicUsize::new(0));
1003
1004        let mut master = MasterConsumer::new(
1005            "lock-a".to_string(),
1006            "failing:delegate".to_string(),
1007            Arc::new(FailingDelegateComponent {
1008                create_endpoint_calls: Arc::clone(&create_endpoint_calls),
1009            }),
1010            Arc::new(NoOpMetrics),
1011            platform_service,
1012            Duration::from_millis(500),
1013            Some(1),
1014        );
1015
1016        let (tx, _rx) = tokio::sync::mpsc::channel(16);
1017        let cancel = CancellationToken::new();
1018        let ctx = ConsumerContext::new(tx, cancel.clone());
1019
1020        master.start(ctx).await.unwrap();
1021        sleep(Duration::from_millis(750)).await;
1022
1023        assert_eq!(create_endpoint_calls.load(Ordering::SeqCst), 2);
1024
1025        cancel.cancel();
1026        master.stop().await.unwrap();
1027    }
1028}