Skip to main content

camel_master/
lib.rs

1pub mod bundle;
2mod config;
3
4pub use bundle::MasterBundle;
5
6use std::sync::Arc;
7use std::time::Duration;
8
9use async_trait::async_trait;
10use camel_api::{CamelError, MetricsCollector, PlatformService};
11use camel_component_api::{
12    BoxProcessor, Component, ComponentContext, Consumer, ConsumerContext, Endpoint,
13    ExchangeEnvelope, ProducerContext, parse_uri,
14};
15use camel_language_api::Language;
16use tokio::task::JoinHandle;
17use tokio::time::{interval, timeout};
18use tokio_util::sync::CancellationToken;
19use tracing::{info, warn};
20
21use crate::config::{MasterComponentConfig, MasterUriConfig};
22
23const DELEGATE_RETRY_INTERVAL: Duration = Duration::from_millis(200);
24
25pub struct MasterComponent {
26    drain_timeout_ms: u64,
27    delegate_retry_max_attempts: Option<u32>,
28}
29
30impl MasterComponent {
31    pub fn new(config: MasterComponentConfig) -> Self {
32        Self {
33            drain_timeout_ms: config.drain_timeout_ms,
34            delegate_retry_max_attempts: config.delegate_retry_max_attempts,
35        }
36    }
37}
38
39impl Default for MasterComponent {
40    fn default() -> Self {
41        Self::new(MasterComponentConfig::default())
42    }
43}
44
45impl Component for MasterComponent {
46    fn scheme(&self) -> &str {
47        "master"
48    }
49
50    fn create_endpoint(
51        &self,
52        uri: &str,
53        ctx: &dyn ComponentContext,
54    ) -> Result<Box<dyn Endpoint>, CamelError> {
55        let parsed = MasterUriConfig::parse(uri)?;
56        let delegate_parts = parse_uri(&parsed.delegate_uri)?;
57        let delegate_scheme = delegate_parts.scheme;
58        let delegate_component = ctx
59            .resolve_component(&delegate_scheme)
60            .ok_or_else(|| CamelError::ComponentNotFound(delegate_scheme.clone()))?;
61
62        Ok(Box::new(MasterEndpoint {
63            uri: uri.to_string(),
64            lock_name: parsed.lock_name,
65            delegate_uri: parsed.delegate_uri,
66            delegate_component,
67            metrics: ctx.metrics(),
68            platform_service: ctx.platform_service(),
69            drain_timeout: Duration::from_millis(self.drain_timeout_ms),
70            delegate_retry_max_attempts: self.delegate_retry_max_attempts,
71        }))
72    }
73}
74
75struct MasterEndpoint {
76    uri: String,
77    lock_name: String,
78    delegate_uri: String,
79    delegate_component: Arc<dyn Component>,
80    metrics: Arc<dyn MetricsCollector>,
81    platform_service: Arc<dyn PlatformService>,
82    drain_timeout: Duration,
83    delegate_retry_max_attempts: Option<u32>,
84}
85
86impl Endpoint for MasterEndpoint {
87    fn uri(&self) -> &str {
88        &self.uri
89    }
90
91    fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
92        Ok(Box::new(MasterConsumer::new(
93            self.lock_name.clone(),
94            self.delegate_uri.clone(),
95            Arc::clone(&self.delegate_component),
96            Arc::clone(&self.metrics),
97            Arc::clone(&self.platform_service),
98            self.drain_timeout,
99            self.delegate_retry_max_attempts,
100        )))
101    }
102
103    fn create_producer(&self, ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
104        let delegate_ctx = MasterDelegateContext {
105            delegate_component: Arc::clone(&self.delegate_component),
106            metrics: Arc::clone(&self.metrics),
107            platform_service: Arc::clone(&self.platform_service),
108        };
109
110        self.delegate_component
111            .create_endpoint(&self.delegate_uri, &delegate_ctx)?
112            .create_producer(ctx)
113    }
114}
115
116struct MasterDelegateContext {
117    delegate_component: Arc<dyn Component>,
118    metrics: Arc<dyn MetricsCollector>,
119    platform_service: Arc<dyn PlatformService>,
120}
121
122impl ComponentContext for MasterDelegateContext {
123    fn resolve_component(&self, scheme: &str) -> Option<Arc<dyn Component>> {
124        if self.delegate_component.scheme() == scheme {
125            Some(Arc::clone(&self.delegate_component))
126        } else {
127            None
128        }
129    }
130
131    fn resolve_language(&self, _name: &str) -> Option<Arc<dyn Language>> {
132        None
133    }
134
135    fn metrics(&self) -> Arc<dyn MetricsCollector> {
136        Arc::clone(&self.metrics)
137    }
138
139    fn platform_service(&self) -> Arc<dyn PlatformService> {
140        Arc::clone(&self.platform_service)
141    }
142}
143
144struct MasterConsumer {
145    lock_name: String,
146    delegate_uri: String,
147    delegate_component: Arc<dyn Component>,
148    metrics: Arc<dyn MetricsCollector>,
149    platform_service: Arc<dyn PlatformService>,
150    drain_timeout: Duration,
151    delegate_retry_max_attempts: Option<u32>,
152    leadership_handle: Option<camel_api::LeadershipHandle>,
153    leadership_task: Option<JoinHandle<()>>,
154    stop_token: Option<CancellationToken>,
155}
156
157impl MasterConsumer {
158    fn new(
159        lock_name: String,
160        delegate_uri: String,
161        delegate_component: Arc<dyn Component>,
162        metrics: Arc<dyn MetricsCollector>,
163        platform_service: Arc<dyn PlatformService>,
164        drain_timeout: Duration,
165        delegate_retry_max_attempts: Option<u32>,
166    ) -> Self {
167        Self {
168            lock_name,
169            delegate_uri,
170            delegate_component,
171            metrics,
172            platform_service,
173            drain_timeout,
174            delegate_retry_max_attempts,
175            leadership_handle: None,
176            leadership_task: None,
177            stop_token: None,
178        }
179    }
180}
181
182enum DelegateState {
183    Inactive,
184    Active {
185        run_token: CancellationToken,
186        handle: JoinHandle<()>,
187    },
188}
189
190async fn stop_delegate(state: &mut DelegateState, drain_timeout: Duration) {
191    if let DelegateState::Active {
192        run_token,
193        mut handle,
194    } = std::mem::replace(state, DelegateState::Inactive)
195    {
196        run_token.cancel();
197        match timeout(drain_timeout, &mut handle).await {
198            Ok(_) => {}
199            Err(_) => {
200                warn!("master delegate shutdown timed out, aborting");
201                handle.abort();
202            }
203        }
204    }
205}
206
207async fn reconcile_event(
208    event: camel_api::LeadershipEvent,
209    state: &mut DelegateState,
210    lock_name: &str,
211    delegate_component: &Arc<dyn Component>,
212    delegate_uri: &str,
213    sender: &tokio::sync::mpsc::Sender<ExchangeEnvelope>,
214    parent_cancel: &CancellationToken,
215    drain_timeout: Duration,
216    metrics: &Arc<dyn MetricsCollector>,
217    platform_service: &Arc<dyn PlatformService>,
218) {
219    match event {
220        camel_api::LeadershipEvent::StartedLeading => {
221            info!(lock = %lock_name, "master leadership acquired");
222            stop_delegate(state, drain_timeout).await;
223
224            let delegate_ctx = MasterDelegateContext {
225                delegate_component: Arc::clone(delegate_component),
226                metrics: Arc::clone(metrics),
227                platform_service: Arc::clone(platform_service),
228            };
229
230            let endpoint = match delegate_component.create_endpoint(delegate_uri, &delegate_ctx) {
231                Ok(endpoint) => endpoint,
232                Err(err) => {
233                    warn!(lock = %lock_name, "failed to create delegate endpoint: {err}");
234                    return;
235                }
236            };
237
238            let mut consumer = match endpoint.create_consumer() {
239                Ok(consumer) => consumer,
240                Err(err) => {
241                    warn!(lock = %lock_name, "failed to create delegate consumer: {err}");
242                    return;
243                }
244            };
245
246            let run_token = parent_cancel.child_token();
247            let delegate_ctx = ConsumerContext::new(sender.clone(), run_token.clone());
248            let handle = tokio::spawn(async move {
249                let _ = consumer.start(delegate_ctx).await;
250                let _ = consumer.stop().await;
251            });
252
253            *state = DelegateState::Active { run_token, handle };
254        }
255        camel_api::LeadershipEvent::StoppedLeading => {
256            info!(lock = %lock_name, "master leadership lost");
257            stop_delegate(state, drain_timeout).await;
258        }
259    }
260}
261
262#[async_trait]
263impl Consumer for MasterConsumer {
264    async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
265        if self.leadership_task.is_some() {
266            return Ok(());
267        }
268
269        let handle = self
270            .platform_service
271            .leadership()
272            .start(&self.lock_name)
273            .await
274            .map_err(|e| {
275                CamelError::EndpointCreationFailed(format!("failed to start leader election: {e}"))
276            })?;
277
278        let lock_name = self.lock_name.clone();
279        let delegate_uri = self.delegate_uri.clone();
280        let delegate_component = Arc::clone(&self.delegate_component);
281        let metrics = Arc::clone(&self.metrics);
282        let platform_service = Arc::clone(&self.platform_service);
283        let sender = context.sender();
284        let parent_cancel = context.cancel_token();
285        let drain_timeout = self.drain_timeout;
286        let delegate_retry_max_attempts = self.delegate_retry_max_attempts;
287        let mut events = handle.events.clone();
288
289        let stop_token = CancellationToken::new();
290        let stop_token_loop = stop_token.clone();
291
292        let task = tokio::spawn(async move {
293            let mut state = DelegateState::Inactive;
294            let mut is_leading = false;
295            let mut delegate_attempts = 0u32;
296            let mut retry_tick = interval(DELEGATE_RETRY_INTERVAL);
297
298            let initial_event = { events.borrow().clone() };
299            if let Some(initial_event) = initial_event {
300                is_leading = matches!(&initial_event, camel_api::LeadershipEvent::StartedLeading);
301                if is_leading {
302                    delegate_attempts = 0;
303                }
304                reconcile_event(
305                    initial_event,
306                    &mut state,
307                    &lock_name,
308                    &delegate_component,
309                    &delegate_uri,
310                    &sender,
311                    &parent_cancel,
312                    drain_timeout,
313                    &metrics,
314                    &platform_service,
315                )
316                .await;
317            }
318
319            loop {
320                tokio::select! {
321                    _ = stop_token_loop.cancelled() => {
322                        break;
323                    }
324                    _ = context.cancelled() => {
325                        break;
326                    }
327                    changed = events.changed() => {
328                        if changed.is_err() {
329                            break;
330                        }
331                        let event = { events.borrow().clone() };
332                        if let Some(event) = event {
333                            let was_leading = is_leading;
334                            is_leading = matches!(&event, camel_api::LeadershipEvent::StartedLeading);
335                            if !was_leading && is_leading {
336                                delegate_attempts = 0;
337                            }
338                            reconcile_event(
339                                event,
340                                &mut state,
341                                &lock_name,
342                                &delegate_component,
343                                &delegate_uri,
344                                &sender,
345                                &parent_cancel,
346                                drain_timeout,
347                                &metrics,
348                                &platform_service,
349                            )
350                            .await;
351                        }
352                    }
353                    _ = retry_tick.tick() => {
354                        if is_leading && matches!(state, DelegateState::Inactive) {
355                            if let Some(max) = delegate_retry_max_attempts {
356                                delegate_attempts = delegate_attempts.saturating_add(1);
357                                if delegate_attempts > max {
358                                    warn!(
359                                        lock = %lock_name,
360                                        attempts = max,
361                                        "delegate start exceeded max attempts, stopping consumer"
362                                    );
363                                    break;
364                                }
365                            }
366                            reconcile_event(
367                                camel_api::LeadershipEvent::StartedLeading,
368                                &mut state,
369                                &lock_name,
370                                &delegate_component,
371                                &delegate_uri,
372                                &sender,
373                                &parent_cancel,
374                                drain_timeout,
375                                &metrics,
376                                &platform_service,
377                            )
378                            .await;
379                        }
380                    }
381                }
382            }
383
384            stop_delegate(&mut state, drain_timeout).await;
385        });
386
387        self.leadership_handle = Some(handle);
388        self.stop_token = Some(stop_token);
389        self.leadership_task = Some(task);
390
391        Ok(())
392    }
393
394    async fn stop(&mut self) -> Result<(), CamelError> {
395        if let Some(token) = self.stop_token.take() {
396            token.cancel();
397        }
398
399        if let Some(task) = self.leadership_task.take()
400            && timeout(self.drain_timeout, task).await.is_err()
401        {
402            warn!("master leadership loop shutdown timed out");
403        }
404
405        if let Some(handle) = self.leadership_handle.take() {
406            let _ = timeout(self.drain_timeout, handle.step_down()).await;
407        }
408
409        Ok(())
410    }
411}
412
413#[cfg(test)]
414mod tests {
415    use std::sync::Arc;
416    use std::sync::Mutex;
417    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
418
419    use camel_api::{
420        BoxProcessorExt, Exchange, LeadershipEvent, LeadershipHandle, LeadershipService, Message,
421        NoOpMetrics, NoopPlatformService, NoopReadinessGate, PlatformError, PlatformIdentity,
422        PlatformService, ReadinessGate,
423    };
424    use camel_component_api::NoOpComponentContext;
425    use tokio::sync::{oneshot, watch};
426    use tokio::time::{sleep, timeout};
427    use tokio_util::sync::CancellationToken;
428    use tower::ServiceExt;
429
430    use super::*;
431
/// A well-formed master URI yields the lock name and the complete delegate URI.
#[test]
fn parse_master_uri_valid() {
    let parsed = MasterUriConfig::parse("master:mylock:timer:tick?period=250").unwrap();

    assert_eq!(
        (parsed.lock_name.as_str(), parsed.delegate_uri.as_str()),
        ("mylock", "timer:tick?period=250")
    );
}
438
/// An empty lock name segment is rejected as an invalid URI.
#[test]
fn parse_master_uri_missing_lockname() {
    let outcome = MasterUriConfig::parse("master::timer:tick");
    assert!(matches!(outcome, Err(CamelError::InvalidUri(_))));
}
444
/// An empty delegate URI is rejected as an invalid URI.
#[test]
fn parse_master_uri_missing_delegate() {
    let outcome = MasterUriConfig::parse("master:mylock:");
    assert!(matches!(outcome, Err(CamelError::InvalidUri(_))));
}
450
/// Endpoint creation fails with `ComponentNotFound` when the context cannot
/// resolve the delegate scheme.
#[test]
fn endpoint_fails_when_delegate_component_missing() {
    let outcome = MasterComponent::default()
        .create_endpoint("master:lock-1:missing:delegate", &NoOpComponentContext);

    assert!(matches!(outcome, Err(CamelError::ComponentNotFound(_))));
}
458
/// The scheme looked up in the context is the delegate's (`mock`), and the
/// endpoint keeps the full master URI.
#[test]
fn delegate_scheme_is_parsed_from_delegate_uri() {
    /// Delegate endpoint whose consumer and producer are never expected to be built.
    struct MockDelegateEndpoint;

    impl Endpoint for MockDelegateEndpoint {
        fn uri(&self) -> &str {
            "mock:delegate"
        }

        fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
            Err(CamelError::EndpointCreationFailed("not used".to_string()))
        }

        fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
            Err(CamelError::EndpointCreationFailed("not used".to_string()))
        }
    }

    /// Delegate component answering to the `mock` scheme.
    struct MockDelegateComponent;

    impl Component for MockDelegateComponent {
        fn scheme(&self) -> &str {
            "mock"
        }

        fn create_endpoint(
            &self,
            _uri: &str,
            _ctx: &dyn ComponentContext,
        ) -> Result<Box<dyn Endpoint>, CamelError> {
            Ok(Box::new(MockDelegateEndpoint))
        }
    }

    /// Context that records whether the `mock` scheme was requested.
    struct SchemeAwareContext {
        delegate: Arc<dyn Component>,
        seen_scheme: Arc<AtomicBool>,
    }

    impl ComponentContext for SchemeAwareContext {
        fn resolve_component(&self, scheme: &str) -> Option<Arc<dyn Component>> {
            if scheme != "mock" {
                return None;
            }
            self.seen_scheme.store(true, Ordering::SeqCst);
            Some(Arc::clone(&self.delegate))
        }

        fn resolve_language(&self, _name: &str) -> Option<Arc<dyn Language>> {
            None
        }

        fn metrics(&self) -> Arc<dyn MetricsCollector> {
            Arc::new(NoOpMetrics)
        }

        fn platform_service(&self) -> Arc<dyn PlatformService> {
            Arc::new(NoopPlatformService::default())
        }
    }

    let seen_scheme = Arc::new(AtomicBool::new(false));
    let ctx = SchemeAwareContext {
        delegate: Arc::new(MockDelegateComponent),
        seen_scheme: Arc::clone(&seen_scheme),
    };

    let endpoint = MasterComponent::default()
        .create_endpoint("master:mylock:mock:delegate?x=1", &ctx)
        .unwrap();

    assert_eq!(endpoint.uri(), "master:mylock:mock:delegate?x=1");
    assert!(seen_scheme.load(Ordering::SeqCst));
}
537
538    struct MockDelegateContext {
539        delegate: Arc<dyn Component>,
540    }
541
542    impl ComponentContext for MockDelegateContext {
543        fn resolve_component(&self, scheme: &str) -> Option<Arc<dyn Component>> {
544            if self.delegate.scheme() == scheme {
545                Some(Arc::clone(&self.delegate))
546            } else {
547                None
548            }
549        }
550
551        fn resolve_language(&self, _name: &str) -> Option<Arc<dyn Language>> {
552            None
553        }
554
555        fn metrics(&self) -> Arc<dyn MetricsCollector> {
556            Arc::new(NoOpMetrics)
557        }
558
559        fn platform_service(&self) -> Arc<dyn PlatformService> {
560            Arc::new(NoopPlatformService::default())
561        }
562    }
563
564    struct MockProducerDelegateComponent {
565        create_endpoint_calls: Arc<AtomicUsize>,
566        create_producer_calls: Arc<AtomicUsize>,
567        fail_producer: bool,
568    }
569
570    impl Component for MockProducerDelegateComponent {
571        fn scheme(&self) -> &str {
572            "mock"
573        }
574
575        fn create_endpoint(
576            &self,
577            _uri: &str,
578            _ctx: &dyn ComponentContext,
579        ) -> Result<Box<dyn Endpoint>, CamelError> {
580            self.create_endpoint_calls.fetch_add(1, Ordering::SeqCst);
581            Ok(Box::new(MockProducerDelegateEndpoint {
582                create_producer_calls: Arc::clone(&self.create_producer_calls),
583                fail_producer: self.fail_producer,
584            }))
585        }
586    }
587
588    struct MockProducerDelegateEndpoint {
589        create_producer_calls: Arc<AtomicUsize>,
590        fail_producer: bool,
591    }
592
593    impl Endpoint for MockProducerDelegateEndpoint {
594        fn uri(&self) -> &str {
595            "mock:delegate"
596        }
597
598        fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
599            Err(CamelError::EndpointCreationFailed(
600                "not used in test".to_string(),
601            ))
602        }
603
604        fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
605            self.create_producer_calls.fetch_add(1, Ordering::SeqCst);
606            if self.fail_producer {
607                return Err(CamelError::ProcessorError(
608                    "delegate producer failed".to_string(),
609                ));
610            }
611            Ok(BoxProcessor::from_fn(
612                |exchange| async move { Ok(exchange) },
613            ))
614        }
615    }
616
617    #[tokio::test]
618    async fn producer_passthrough_delegates_and_produces() {
619        let endpoint_calls = Arc::new(AtomicUsize::new(0));
620        let producer_calls = Arc::new(AtomicUsize::new(0));
621        let delegate = Arc::new(MockProducerDelegateComponent {
622            create_endpoint_calls: Arc::clone(&endpoint_calls),
623            create_producer_calls: Arc::clone(&producer_calls),
624            fail_producer: false,
625        });
626
627        let ctx = MockDelegateContext {
628            delegate: delegate.clone(),
629        };
630
631        let master = MasterComponent::default();
632        let endpoint = master
633            .create_endpoint("master:lock-1:mock:delegate", &ctx)
634            .unwrap();
635        let producer_ctx = ProducerContext::new();
636        let producer = endpoint.create_producer(&producer_ctx).unwrap();
637
638        let exchange = Exchange::new(Message::new("ok"));
639        let result = producer.oneshot(exchange).await.unwrap();
640
641        assert_eq!(result.input.body.as_text(), Some("ok"));
642        assert_eq!(endpoint_calls.load(Ordering::SeqCst), 1);
643        assert_eq!(producer_calls.load(Ordering::SeqCst), 1);
644    }
645
/// A failure while creating the delegate producer surfaces unchanged from the
/// master endpoint, after exactly one endpoint and one producer attempt.
#[test]
fn producer_passthrough_bubbles_delegate_errors() {
    let endpoint_calls = Arc::new(AtomicUsize::new(0));
    let producer_calls = Arc::new(AtomicUsize::new(0));

    let delegate = Arc::new(MockProducerDelegateComponent {
        create_endpoint_calls: Arc::clone(&endpoint_calls),
        create_producer_calls: Arc::clone(&producer_calls),
        fail_producer: true,
    });
    let ctx = MockDelegateContext {
        delegate: delegate.clone(),
    };

    let endpoint = MasterComponent::default()
        .create_endpoint("master:lock-1:mock:delegate", &ctx)
        .unwrap();
    let outcome = endpoint.create_producer(&ProducerContext::new());

    assert!(matches!(outcome, Err(CamelError::ProcessorError(_))));
    assert_eq!(endpoint_calls.load(Ordering::SeqCst), 1);
    assert_eq!(producer_calls.load(Ordering::SeqCst), 1);
}
671
672    struct FakeLeadershipService {
673        tx: Mutex<Option<watch::Sender<Option<LeadershipEvent>>>>,
674        is_leader: Arc<AtomicBool>,
675        initial: Option<LeadershipEvent>,
676    }
677
678    impl FakeLeadershipService {
679        fn new(initial: Option<LeadershipEvent>) -> Self {
680            let starts_as_leader = matches!(initial, Some(LeadershipEvent::StartedLeading));
681            Self {
682                tx: Mutex::new(None),
683                is_leader: Arc::new(AtomicBool::new(starts_as_leader)),
684                initial,
685            }
686        }
687
688        async fn emit(&self, event: LeadershipEvent) {
689            self.is_leader.store(
690                matches!(event, LeadershipEvent::StartedLeading),
691                Ordering::Release,
692            );
693            if let Some(tx) = self
694                .tx
695                .lock()
696                .expect("mutex poisoned: fake elector sender")
697                .as_ref()
698            {
699                let _ = tx.send(Some(event));
700            }
701        }
702    }
703
704    #[async_trait]
705    impl LeadershipService for FakeLeadershipService {
706        async fn start(&self, _lock_name: &str) -> Result<LeadershipHandle, PlatformError> {
707            let (tx, rx) = watch::channel(self.initial.clone());
708            *self.tx.lock().expect("mutex poisoned: fake elector sender") = Some(tx);
709
710            let cancel = CancellationToken::new();
711            let cancel_wait = cancel.clone();
712            let (term_tx, term_rx) = oneshot::channel();
713            tokio::spawn(async move {
714                cancel_wait.cancelled().await;
715                let _ = term_tx.send(());
716            });
717
718            Ok(LeadershipHandle::new(
719                rx,
720                Arc::clone(&self.is_leader),
721                cancel,
722                term_rx,
723            ))
724        }
725    }
726
727    struct FakePlatformService {
728        identity: PlatformIdentity,
729        readiness_gate: Arc<dyn ReadinessGate>,
730        leadership: Arc<dyn LeadershipService>,
731    }
732
733    impl FakePlatformService {
734        fn new(leadership: Arc<dyn LeadershipService>) -> Self {
735            Self {
736                identity: PlatformIdentity::local("master-tests"),
737                readiness_gate: Arc::new(NoopReadinessGate),
738                leadership,
739            }
740        }
741    }
742
743    impl PlatformService for FakePlatformService {
744        fn identity(&self) -> PlatformIdentity {
745            self.identity.clone()
746        }
747
748        fn readiness_gate(&self) -> Arc<dyn ReadinessGate> {
749            Arc::clone(&self.readiness_gate)
750        }
751
752        fn leadership(&self) -> Arc<dyn LeadershipService> {
753            Arc::clone(&self.leadership)
754        }
755    }
756
757    struct FakeDelegateComponent {
758        create_consumer_calls: Arc<AtomicUsize>,
759        start_calls: Arc<AtomicUsize>,
760    }
761
762    impl Component for FakeDelegateComponent {
763        fn scheme(&self) -> &str {
764            "fake"
765        }
766
767        fn create_endpoint(
768            &self,
769            _uri: &str,
770            _ctx: &dyn ComponentContext,
771        ) -> Result<Box<dyn Endpoint>, CamelError> {
772            Ok(Box::new(FakeDelegateEndpoint {
773                create_consumer_calls: Arc::clone(&self.create_consumer_calls),
774                start_calls: Arc::clone(&self.start_calls),
775            }))
776        }
777    }
778
779    struct FakeDelegateEndpoint {
780        create_consumer_calls: Arc<AtomicUsize>,
781        start_calls: Arc<AtomicUsize>,
782    }
783
784    impl Endpoint for FakeDelegateEndpoint {
785        fn uri(&self) -> &str {
786            "fake:delegate"
787        }
788
789        fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
790            let epoch = self.create_consumer_calls.fetch_add(1, Ordering::SeqCst) + 1;
791            Ok(Box::new(FakeDelegateConsumer {
792                epoch,
793                start_calls: Arc::clone(&self.start_calls),
794            }))
795        }
796
797        fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
798            Err(CamelError::EndpointCreationFailed("not used".to_string()))
799        }
800    }
801
/// Consumer double that tags every exchange it sends with its epoch.
struct FakeDelegateConsumer {
    /// Creation-order number assigned by the endpoint; appears in each message
    /// body as `epoch-<n>`.
    epoch: usize,
    /// Incremented once per `start` call.
    start_calls: Arc<AtomicUsize>,
}
806
807    struct FailingDelegateComponent {
808        create_endpoint_calls: Arc<AtomicUsize>,
809    }
810
811    impl Component for FailingDelegateComponent {
812        fn scheme(&self) -> &str {
813            "failing"
814        }
815
816        fn create_endpoint(
817            &self,
818            _uri: &str,
819            _ctx: &dyn ComponentContext,
820        ) -> Result<Box<dyn Endpoint>, CamelError> {
821            self.create_endpoint_calls.fetch_add(1, Ordering::SeqCst);
822            Err(CamelError::EndpointCreationFailed(
823                "delegate endpoint creation failed".to_string(),
824            ))
825        }
826    }
827
828    #[async_trait]
829    impl Consumer for FakeDelegateConsumer {
830        async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
831            self.start_calls.fetch_add(1, Ordering::SeqCst);
832            context
833                .send(Exchange::new(Message::new(format!("epoch-{}", self.epoch))))
834                .await?;
835
836            loop {
837                tokio::select! {
838                    _ = context.cancelled() => {
839                        break;
840                    }
841                    _ = sleep(Duration::from_millis(20)) => {
842                        context
843                            .send(Exchange::new(Message::new(format!("epoch-{}", self.epoch))))
844                            .await?;
845                    }
846                }
847            }
848
849            Ok(())
850        }
851
852        async fn stop(&mut self) -> Result<(), CamelError> {
853            Ok(())
854        }
855    }
856
857    fn build_master_consumer(
858        platform_service: Arc<dyn PlatformService>,
859        create_consumer_calls: Arc<AtomicUsize>,
860        start_calls: Arc<AtomicUsize>,
861        delegate_retry_max_attempts: Option<u32>,
862    ) -> MasterConsumer {
863        MasterConsumer::new(
864            "lock-a".to_string(),
865            "fake:delegate".to_string(),
866            Arc::new(FakeDelegateComponent {
867                create_consumer_calls,
868                start_calls,
869            }),
870            Arc::new(NoOpMetrics),
871            platform_service,
872            Duration::from_millis(500),
873            delegate_retry_max_attempts,
874        )
875    }
876
877    #[tokio::test]
878    async fn starts_delegate_only_after_started_leading() {
879        let leadership = Arc::new(FakeLeadershipService::new(None));
880        let platform_service = Arc::new(FakePlatformService::new(leadership.clone()));
881        let create_consumer_calls = Arc::new(AtomicUsize::new(0));
882        let start_calls = Arc::new(AtomicUsize::new(0));
883        let mut master = build_master_consumer(
884            platform_service,
885            Arc::clone(&create_consumer_calls),
886            Arc::clone(&start_calls),
887            Some(30),
888        );
889
890        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
891        let cancel = CancellationToken::new();
892        let ctx = ConsumerContext::new(tx, cancel.clone());
893
894        master.start(ctx).await.unwrap();
895
896        sleep(Duration::from_millis(80)).await;
897        assert!(rx.try_recv().is_err());
898        assert_eq!(create_consumer_calls.load(Ordering::SeqCst), 0);
899
900        leadership.emit(LeadershipEvent::StartedLeading).await;
901
902        let first = timeout(Duration::from_millis(500), rx.recv())
903            .await
904            .unwrap()
905            .unwrap();
906        assert_eq!(first.exchange.input.body.as_text(), Some("epoch-1"));
907        assert_eq!(create_consumer_calls.load(Ordering::SeqCst), 1);
908        assert_eq!(start_calls.load(Ordering::SeqCst), 1);
909
910        cancel.cancel();
911        master.stop().await.unwrap();
912    }
913
914    #[tokio::test]
915    async fn stops_delegate_on_stopped_leading() {
916        let leadership = Arc::new(FakeLeadershipService::new(None));
917        let platform_service = Arc::new(FakePlatformService::new(leadership.clone()));
918        let create_consumer_calls = Arc::new(AtomicUsize::new(0));
919        let start_calls = Arc::new(AtomicUsize::new(0));
920        let mut master = build_master_consumer(
921            platform_service,
922            Arc::clone(&create_consumer_calls),
923            Arc::clone(&start_calls),
924            Some(30),
925        );
926
927        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
928        let cancel = CancellationToken::new();
929        let ctx = ConsumerContext::new(tx, cancel.clone());
930
931        master.start(ctx).await.unwrap();
932        leadership.emit(LeadershipEvent::StartedLeading).await;
933        let _ = timeout(Duration::from_millis(500), rx.recv())
934            .await
935            .unwrap()
936            .unwrap();
937
938        leadership.emit(LeadershipEvent::StoppedLeading).await;
939        sleep(Duration::from_millis(100)).await;
940        while rx.try_recv().is_ok() {}
941        assert!(
942            timeout(Duration::from_millis(120), rx.recv())
943                .await
944                .is_err()
945        );
946
947        cancel.cancel();
948        master.stop().await.unwrap();
949    }
950
951    #[tokio::test]
952    async fn recreates_delegate_on_new_leadership_epoch() {
953        let leadership = Arc::new(FakeLeadershipService::new(None));
954        let platform_service = Arc::new(FakePlatformService::new(leadership.clone()));
955        let create_consumer_calls = Arc::new(AtomicUsize::new(0));
956        let start_calls = Arc::new(AtomicUsize::new(0));
957        let mut master = build_master_consumer(
958            platform_service,
959            Arc::clone(&create_consumer_calls),
960            Arc::clone(&start_calls),
961            Some(30),
962        );
963
964        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
965        let cancel = CancellationToken::new();
966        let ctx = ConsumerContext::new(tx, cancel.clone());
967
968        master.start(ctx).await.unwrap();
969
970        leadership.emit(LeadershipEvent::StartedLeading).await;
971        let first = timeout(Duration::from_millis(500), rx.recv())
972            .await
973            .unwrap()
974            .unwrap();
975        assert_eq!(first.exchange.input.body.as_text(), Some("epoch-1"));
976
977        leadership.emit(LeadershipEvent::StoppedLeading).await;
978        sleep(Duration::from_millis(120)).await;
979
980        leadership.emit(LeadershipEvent::StartedLeading).await;
981        let second = timeout(Duration::from_millis(500), rx.recv())
982            .await
983            .unwrap()
984            .unwrap();
985        assert_eq!(second.exchange.input.body.as_text(), Some("epoch-2"));
986
987        assert_eq!(create_consumer_calls.load(Ordering::SeqCst), 2);
988        assert_eq!(start_calls.load(Ordering::SeqCst), 2);
989
990        cancel.cancel();
991        master.stop().await.unwrap();
992    }
993
994    #[tokio::test]
995    async fn stops_retrying_delegate_start_after_max_attempts() {
996        let leadership = Arc::new(FakeLeadershipService::new(Some(
997            LeadershipEvent::StartedLeading,
998        )));
999        let platform_service = Arc::new(FakePlatformService::new(leadership));
1000        let create_endpoint_calls = Arc::new(AtomicUsize::new(0));
1001
1002        let mut master = MasterConsumer::new(
1003            "lock-a".to_string(),
1004            "failing:delegate".to_string(),
1005            Arc::new(FailingDelegateComponent {
1006                create_endpoint_calls: Arc::clone(&create_endpoint_calls),
1007            }),
1008            Arc::new(NoOpMetrics),
1009            platform_service,
1010            Duration::from_millis(500),
1011            Some(1),
1012        );
1013
1014        let (tx, _rx) = tokio::sync::mpsc::channel(16);
1015        let cancel = CancellationToken::new();
1016        let ctx = ConsumerContext::new(tx, cancel.clone());
1017
1018        master.start(ctx).await.unwrap();
1019        sleep(Duration::from_millis(750)).await;
1020
1021        assert_eq!(create_endpoint_calls.load(Ordering::SeqCst), 2);
1022
1023        cancel.cancel();
1024        master.stop().await.unwrap();
1025    }
1026}