Skip to main content

camel_master/
lib.rs

1pub mod bundle;
2mod config;
3
4pub use bundle::MasterBundle;
5
6use std::sync::Arc;
7use std::time::Duration;
8
9use async_trait::async_trait;
10use camel_api::{CamelError, MetricsCollector, PlatformService};
11use camel_component_api::{
12    BoxProcessor, Component, ComponentContext, Consumer, ConsumerContext, Endpoint,
13    ExchangeEnvelope, ProducerContext, parse_uri,
14};
15use camel_language_api::Language;
16use tokio::task::JoinHandle;
17use tokio::time::{interval, timeout};
18use tokio_util::sync::CancellationToken;
19use tracing::{info, warn};
20
21use crate::config::{MasterComponentConfig, MasterUriConfig};
22
/// Period of the retry ticker used by the leadership loop to re-attempt
/// delegate start-up while this node is leading but no delegate is active.
const DELEGATE_RETRY_INTERVAL: Duration = Duration::from_millis(200);
24
25pub struct MasterComponent {
26    drain_timeout_ms: u64,
27    delegate_retry_max_attempts: Option<u32>,
28}
29
30impl MasterComponent {
31    pub fn new(config: MasterComponentConfig) -> Self {
32        Self {
33            drain_timeout_ms: config.drain_timeout_ms,
34            delegate_retry_max_attempts: config.delegate_retry_max_attempts,
35        }
36    }
37}
38
39impl Default for MasterComponent {
40    fn default() -> Self {
41        Self::new(MasterComponentConfig::default())
42    }
43}
44
45impl Component for MasterComponent {
46    fn scheme(&self) -> &str {
47        "master"
48    }
49
50    fn create_endpoint(
51        &self,
52        uri: &str,
53        ctx: &dyn ComponentContext,
54    ) -> Result<Box<dyn Endpoint>, CamelError> {
55        let parsed = MasterUriConfig::parse(uri)?;
56        let delegate_parts = parse_uri(&parsed.delegate_uri)?;
57        let delegate_scheme = delegate_parts.scheme;
58        let delegate_component = ctx
59            .resolve_component(&delegate_scheme)
60            .ok_or_else(|| CamelError::ComponentNotFound(delegate_scheme.clone()))?;
61
62        Ok(Box::new(MasterEndpoint {
63            uri: uri.to_string(),
64            lock_name: parsed.lock_name,
65            delegate_uri: parsed.delegate_uri,
66            delegate_component,
67            metrics: ctx.metrics(),
68            platform_service: ctx.platform_service(),
69            drain_timeout: Duration::from_millis(self.drain_timeout_ms),
70            delegate_retry_max_attempts: self.delegate_retry_max_attempts,
71        }))
72    }
73}
74
75struct MasterEndpoint {
76    uri: String,
77    lock_name: String,
78    delegate_uri: String,
79    delegate_component: Arc<dyn Component>,
80    metrics: Arc<dyn MetricsCollector>,
81    platform_service: Arc<dyn PlatformService>,
82    drain_timeout: Duration,
83    delegate_retry_max_attempts: Option<u32>,
84}
85
86impl Endpoint for MasterEndpoint {
87    fn uri(&self) -> &str {
88        &self.uri
89    }
90
91    fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
92        Ok(Box::new(MasterConsumer::new(
93            self.lock_name.clone(),
94            self.delegate_uri.clone(),
95            Arc::clone(&self.delegate_component),
96            Arc::clone(&self.metrics),
97            Arc::clone(&self.platform_service),
98            self.drain_timeout,
99            self.delegate_retry_max_attempts,
100        )))
101    }
102
103    fn create_producer(&self, ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
104        let delegate_ctx = MasterDelegateContext {
105            delegate_component: Arc::clone(&self.delegate_component),
106            metrics: Arc::clone(&self.metrics),
107            platform_service: Arc::clone(&self.platform_service),
108        };
109
110        self.delegate_component
111            .create_endpoint(&self.delegate_uri, &delegate_ctx)?
112            .create_producer(ctx)
113    }
114}
115
116struct MasterDelegateContext {
117    delegate_component: Arc<dyn Component>,
118    metrics: Arc<dyn MetricsCollector>,
119    platform_service: Arc<dyn PlatformService>,
120}
121
122impl ComponentContext for MasterDelegateContext {
123    fn resolve_component(&self, scheme: &str) -> Option<Arc<dyn Component>> {
124        if self.delegate_component.scheme() == scheme {
125            Some(Arc::clone(&self.delegate_component))
126        } else {
127            None
128        }
129    }
130
131    fn resolve_language(&self, _name: &str) -> Option<Arc<dyn Language>> {
132        None
133    }
134
135    fn metrics(&self) -> Arc<dyn MetricsCollector> {
136        Arc::clone(&self.metrics)
137    }
138
139    fn platform_service(&self) -> Arc<dyn PlatformService> {
140        Arc::clone(&self.platform_service)
141    }
142}
143
144struct MasterConsumer {
145    lock_name: String,
146    delegate_uri: String,
147    delegate_component: Arc<dyn Component>,
148    metrics: Arc<dyn MetricsCollector>,
149    platform_service: Arc<dyn PlatformService>,
150    drain_timeout: Duration,
151    delegate_retry_max_attempts: Option<u32>,
152    leadership_task: Option<JoinHandle<()>>,
153    stop_token: Option<CancellationToken>,
154}
155
156impl MasterConsumer {
157    fn new(
158        lock_name: String,
159        delegate_uri: String,
160        delegate_component: Arc<dyn Component>,
161        metrics: Arc<dyn MetricsCollector>,
162        platform_service: Arc<dyn PlatformService>,
163        drain_timeout: Duration,
164        delegate_retry_max_attempts: Option<u32>,
165    ) -> Self {
166        Self {
167            lock_name,
168            delegate_uri,
169            delegate_component,
170            metrics,
171            platform_service,
172            drain_timeout,
173            delegate_retry_max_attempts,
174            leadership_task: None,
175            stop_token: None,
176        }
177    }
178}
179
/// Lifecycle state of the delegate consumer tracked by the leadership loop.
enum DelegateState {
    /// No delegate task is currently being tracked.
    Inactive,
    /// A delegate consumer has been spawned on its own task.
    Active {
        /// Cancelled to ask the delegate to shut down.
        run_token: CancellationToken,
        /// Join handle of the spawned delegate task.
        handle: JoinHandle<()>,
    },
}
187
188async fn stop_delegate(state: &mut DelegateState, drain_timeout: Duration) {
189    if let DelegateState::Active {
190        run_token,
191        mut handle,
192    } = std::mem::replace(state, DelegateState::Inactive)
193    {
194        run_token.cancel();
195        match timeout(drain_timeout, &mut handle).await {
196            Ok(_) => {}
197            Err(_) => {
198                warn!("master delegate shutdown timed out, aborting");
199                handle.abort();
200            }
201        }
202    }
203}
204
/// Borrowed bundle of everything `reconcile_event` needs to (re)create or
/// stop the delegate consumer.
struct ReconcileContext<'a> {
    /// Leadership lock name; used as the `lock` field in log events.
    lock_name: &'a str,
    /// Component used to create the delegate endpoint.
    delegate_component: &'a Arc<dyn Component>,
    /// URI passed to the delegate component.
    delegate_uri: &'a str,
    /// Channel sender cloned into each delegate's `ConsumerContext`.
    sender: &'a tokio::sync::mpsc::Sender<ExchangeEnvelope>,
    /// Token each delegate's run token is derived from as a child.
    parent_cancel: &'a CancellationToken,
    /// Maximum time to wait for a delegate to stop before aborting it.
    drain_timeout: Duration,
    metrics: &'a Arc<dyn MetricsCollector>,
    platform_service: &'a Arc<dyn PlatformService>,
}
215
216async fn reconcile_event(
217    event: camel_api::LeadershipEvent,
218    state: &mut DelegateState,
219    ctx: &ReconcileContext<'_>,
220) {
221    match event {
222        camel_api::LeadershipEvent::StartedLeading => {
223            info!(lock = %ctx.lock_name, "master leadership acquired");
224            stop_delegate(state, ctx.drain_timeout).await;
225
226            let delegate_ctx = MasterDelegateContext {
227                delegate_component: Arc::clone(ctx.delegate_component),
228                metrics: Arc::clone(ctx.metrics),
229                platform_service: Arc::clone(ctx.platform_service),
230            };
231
232            let endpoint = match ctx
233                .delegate_component
234                .create_endpoint(ctx.delegate_uri, &delegate_ctx)
235            {
236                Ok(endpoint) => endpoint,
237                Err(err) => {
238                    warn!(lock = %ctx.lock_name, "failed to create delegate endpoint: {err}");
239                    return;
240                }
241            };
242
243            let mut consumer = match endpoint.create_consumer() {
244                Ok(consumer) => consumer,
245                Err(err) => {
246                    warn!(lock = %ctx.lock_name, "failed to create delegate consumer: {err}");
247                    return;
248                }
249            };
250
251            let run_token = ctx.parent_cancel.child_token();
252            let delegate_ctx = ConsumerContext::new(ctx.sender.clone(), run_token.clone());
253            let handle = tokio::spawn(async move {
254                let _ = consumer.start(delegate_ctx).await;
255                let _ = consumer.stop().await;
256            });
257
258            *state = DelegateState::Active { run_token, handle };
259        }
260        camel_api::LeadershipEvent::StoppedLeading => {
261            info!(lock = %ctx.lock_name, "master leadership lost");
262            stop_delegate(state, ctx.drain_timeout).await;
263        }
264    }
265}
266
267#[async_trait]
268impl Consumer for MasterConsumer {
269    async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
270        if self.leadership_task.is_some() {
271            return Ok(());
272        }
273
274        let handle = self
275            .platform_service
276            .leadership()
277            .start(&self.lock_name)
278            .await
279            .map_err(|e| {
280                CamelError::EndpointCreationFailed(format!("failed to start leader election: {e}"))
281            })?;
282
283        let lock_name = self.lock_name.clone();
284        let delegate_uri = self.delegate_uri.clone();
285        let delegate_component = Arc::clone(&self.delegate_component);
286        let metrics = Arc::clone(&self.metrics);
287        let platform_service = Arc::clone(&self.platform_service);
288        let sender = context.sender();
289        let parent_cancel = context.cancel_token();
290        let drain_timeout = self.drain_timeout;
291        let delegate_retry_max_attempts = self.delegate_retry_max_attempts;
292        let mut events = handle.events.clone();
293
294        let stop_token = CancellationToken::new();
295        let stop_token_loop = stop_token.clone();
296        let leadership_handle = handle;
297
298        let task = tokio::spawn(async move {
299            let mut state = DelegateState::Inactive;
300            let mut is_leading = false;
301            let mut delegate_attempts = 0u32;
302            let mut retry_tick = interval(DELEGATE_RETRY_INTERVAL);
303
304            let rctx = ReconcileContext {
305                lock_name: &lock_name,
306                delegate_component: &delegate_component,
307                delegate_uri: &delegate_uri,
308                sender: &sender,
309                parent_cancel: &parent_cancel,
310                drain_timeout,
311                metrics: &metrics,
312                platform_service: &platform_service,
313            };
314
315            let initial_event = { events.borrow().clone() };
316            if let Some(initial_event) = initial_event {
317                is_leading = matches!(&initial_event, camel_api::LeadershipEvent::StartedLeading);
318                if is_leading {
319                    delegate_attempts = 0;
320                }
321                reconcile_event(initial_event, &mut state, &rctx).await;
322            }
323
324            loop {
325                tokio::select! {
326                    _ = stop_token_loop.cancelled() => {
327                        break;
328                    }
329                    _ = context.cancelled() => {
330                        break;
331                    }
332                    changed = events.changed() => {
333                        if changed.is_err() {
334                            break;
335                        }
336                        let event = { events.borrow().clone() };
337                        if let Some(event) = event {
338                            let was_leading = is_leading;
339                            is_leading = matches!(&event, camel_api::LeadershipEvent::StartedLeading);
340                            if !was_leading && is_leading {
341                                delegate_attempts = 0;
342                            }
343                            reconcile_event(event, &mut state, &rctx).await;
344                        }
345                    }
346                    _ = retry_tick.tick() => {
347                        if is_leading && matches!(state, DelegateState::Inactive) {
348                            if let Some(max) = delegate_retry_max_attempts {
349                                delegate_attempts = delegate_attempts.saturating_add(1);
350                                if delegate_attempts > max {
351                                    warn!(
352                                        lock = %lock_name,
353                                        attempts = max,
354                                        "delegate start exceeded max attempts, stopping consumer"
355                                    );
356                                    break;
357                                }
358                            }
359                            reconcile_event(
360                                camel_api::LeadershipEvent::StartedLeading,
361                                &mut state,
362                                &rctx,
363                            )
364                            .await;
365                        }
366                    }
367                }
368            }
369
370            stop_delegate(&mut state, drain_timeout).await;
371            let _ = timeout(drain_timeout, leadership_handle.step_down()).await;
372        });
373
374        self.stop_token = Some(stop_token);
375        self.leadership_task = Some(task);
376
377        Ok(())
378    }
379
380    async fn stop(&mut self) -> Result<(), CamelError> {
381        if let Some(token) = self.stop_token.take() {
382            token.cancel();
383        }
384
385        if let Some(task) = self.leadership_task.take()
386            && timeout(self.drain_timeout, task).await.is_err()
387        {
388            warn!("master leadership loop shutdown timed out");
389        }
390
391        Ok(())
392    }
393}
394
395#[cfg(test)]
396mod tests {
397    use std::sync::Arc;
398    use std::sync::Mutex;
399    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
400
401    use camel_api::{
402        BoxProcessorExt, Exchange, LeadershipEvent, LeadershipHandle, LeadershipService, Message,
403        NoOpMetrics, NoopPlatformService, NoopReadinessGate, PlatformError, PlatformIdentity,
404        PlatformService, ReadinessGate,
405    };
406    use camel_component_api::NoOpComponentContext;
407    use tokio::sync::{oneshot, watch};
408    use tokio::time::{sleep, timeout};
409    use tokio_util::sync::CancellationToken;
410    use tower::ServiceExt;
411
412    use super::*;
413
414    #[test]
415    fn parse_master_uri_valid() {
416        let cfg = MasterUriConfig::parse("master:mylock:timer:tick?period=250").unwrap();
417        assert_eq!(cfg.lock_name, "mylock");
418        assert_eq!(cfg.delegate_uri, "timer:tick?period=250");
419    }
420
421    #[test]
422    fn parse_master_uri_missing_lockname() {
423        let err = MasterUriConfig::parse("master::timer:tick").unwrap_err();
424        assert!(matches!(err, CamelError::InvalidUri(_)));
425    }
426
427    #[test]
428    fn parse_master_uri_missing_delegate() {
429        let err = MasterUriConfig::parse("master:mylock:").unwrap_err();
430        assert!(matches!(err, CamelError::InvalidUri(_)));
431    }
432
433    #[test]
434    fn endpoint_fails_when_delegate_component_missing() {
435        let master = MasterComponent::default();
436        let result =
437            master.create_endpoint("master:lock-1:missing:delegate", &NoOpComponentContext);
438        assert!(matches!(result, Err(CamelError::ComponentNotFound(_))));
439    }
440
441    #[test]
442    fn delegate_scheme_is_parsed_from_delegate_uri() {
443        let seen_scheme = Arc::new(AtomicBool::new(false));
444
445        struct SchemeAwareContext {
446            delegate: Arc<dyn Component>,
447            seen_scheme: Arc<AtomicBool>,
448        }
449
450        impl ComponentContext for SchemeAwareContext {
451            fn resolve_component(&self, scheme: &str) -> Option<Arc<dyn Component>> {
452                if scheme == "mock" {
453                    self.seen_scheme.store(true, Ordering::SeqCst);
454                    Some(Arc::clone(&self.delegate))
455                } else {
456                    None
457                }
458            }
459
460            fn resolve_language(&self, _name: &str) -> Option<Arc<dyn Language>> {
461                None
462            }
463
464            fn metrics(&self) -> Arc<dyn MetricsCollector> {
465                Arc::new(NoOpMetrics)
466            }
467
468            fn platform_service(&self) -> Arc<dyn PlatformService> {
469                Arc::new(NoopPlatformService::default())
470            }
471        }
472
473        struct MockDelegateComponent;
474
475        impl Component for MockDelegateComponent {
476            fn scheme(&self) -> &str {
477                "mock"
478            }
479
480            fn create_endpoint(
481                &self,
482                _uri: &str,
483                _ctx: &dyn ComponentContext,
484            ) -> Result<Box<dyn Endpoint>, CamelError> {
485                Ok(Box::new(MockDelegateEndpoint))
486            }
487        }
488
489        struct MockDelegateEndpoint;
490
491        impl Endpoint for MockDelegateEndpoint {
492            fn uri(&self) -> &str {
493                "mock:delegate"
494            }
495
496            fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
497                Err(CamelError::EndpointCreationFailed("not used".to_string()))
498            }
499
500            fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
501                Err(CamelError::EndpointCreationFailed("not used".to_string()))
502            }
503        }
504
505        let delegate = Arc::new(MockDelegateComponent);
506        let ctx = SchemeAwareContext {
507            delegate,
508            seen_scheme: Arc::clone(&seen_scheme),
509        };
510
511        let master = MasterComponent::default();
512        let endpoint = master
513            .create_endpoint("master:mylock:mock:delegate?x=1", &ctx)
514            .unwrap();
515
516        assert_eq!(endpoint.uri(), "master:mylock:mock:delegate?x=1");
517        assert!(seen_scheme.load(Ordering::SeqCst));
518    }
519
520    struct MockDelegateContext {
521        delegate: Arc<dyn Component>,
522    }
523
524    impl ComponentContext for MockDelegateContext {
525        fn resolve_component(&self, scheme: &str) -> Option<Arc<dyn Component>> {
526            if self.delegate.scheme() == scheme {
527                Some(Arc::clone(&self.delegate))
528            } else {
529                None
530            }
531        }
532
533        fn resolve_language(&self, _name: &str) -> Option<Arc<dyn Language>> {
534            None
535        }
536
537        fn metrics(&self) -> Arc<dyn MetricsCollector> {
538            Arc::new(NoOpMetrics)
539        }
540
541        fn platform_service(&self) -> Arc<dyn PlatformService> {
542            Arc::new(NoopPlatformService::default())
543        }
544    }
545
546    struct MockProducerDelegateComponent {
547        create_endpoint_calls: Arc<AtomicUsize>,
548        create_producer_calls: Arc<AtomicUsize>,
549        fail_producer: bool,
550    }
551
552    impl Component for MockProducerDelegateComponent {
553        fn scheme(&self) -> &str {
554            "mock"
555        }
556
557        fn create_endpoint(
558            &self,
559            _uri: &str,
560            _ctx: &dyn ComponentContext,
561        ) -> Result<Box<dyn Endpoint>, CamelError> {
562            self.create_endpoint_calls.fetch_add(1, Ordering::SeqCst);
563            Ok(Box::new(MockProducerDelegateEndpoint {
564                create_producer_calls: Arc::clone(&self.create_producer_calls),
565                fail_producer: self.fail_producer,
566            }))
567        }
568    }
569
570    struct MockProducerDelegateEndpoint {
571        create_producer_calls: Arc<AtomicUsize>,
572        fail_producer: bool,
573    }
574
575    impl Endpoint for MockProducerDelegateEndpoint {
576        fn uri(&self) -> &str {
577            "mock:delegate"
578        }
579
580        fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
581            Err(CamelError::EndpointCreationFailed(
582                "not used in test".to_string(),
583            ))
584        }
585
586        fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
587            self.create_producer_calls.fetch_add(1, Ordering::SeqCst);
588            if self.fail_producer {
589                return Err(CamelError::ProcessorError(
590                    "delegate producer failed".to_string(),
591                ));
592            }
593            Ok(BoxProcessor::from_fn(
594                |exchange| async move { Ok(exchange) },
595            ))
596        }
597    }
598
599    #[tokio::test]
600    async fn producer_passthrough_delegates_and_produces() {
601        let endpoint_calls = Arc::new(AtomicUsize::new(0));
602        let producer_calls = Arc::new(AtomicUsize::new(0));
603        let delegate = Arc::new(MockProducerDelegateComponent {
604            create_endpoint_calls: Arc::clone(&endpoint_calls),
605            create_producer_calls: Arc::clone(&producer_calls),
606            fail_producer: false,
607        });
608
609        let ctx = MockDelegateContext {
610            delegate: delegate.clone(),
611        };
612
613        let master = MasterComponent::default();
614        let endpoint = master
615            .create_endpoint("master:lock-1:mock:delegate", &ctx)
616            .unwrap();
617        let producer_ctx = ProducerContext::new();
618        let producer = endpoint.create_producer(&producer_ctx).unwrap();
619
620        let exchange = Exchange::new(Message::new("ok"));
621        let result = producer.oneshot(exchange).await.unwrap();
622
623        assert_eq!(result.input.body.as_text(), Some("ok"));
624        assert_eq!(endpoint_calls.load(Ordering::SeqCst), 1);
625        assert_eq!(producer_calls.load(Ordering::SeqCst), 1);
626    }
627
628    #[test]
629    fn producer_passthrough_bubbles_delegate_errors() {
630        let endpoint_calls = Arc::new(AtomicUsize::new(0));
631        let producer_calls = Arc::new(AtomicUsize::new(0));
632        let delegate = Arc::new(MockProducerDelegateComponent {
633            create_endpoint_calls: Arc::clone(&endpoint_calls),
634            create_producer_calls: Arc::clone(&producer_calls),
635            fail_producer: true,
636        });
637
638        let ctx = MockDelegateContext {
639            delegate: delegate.clone(),
640        };
641
642        let master = MasterComponent::default();
643        let endpoint = master
644            .create_endpoint("master:lock-1:mock:delegate", &ctx)
645            .unwrap();
646        let producer_ctx = ProducerContext::new();
647        let err = endpoint.create_producer(&producer_ctx).unwrap_err();
648
649        assert!(matches!(err, CamelError::ProcessorError(_)));
650        assert_eq!(endpoint_calls.load(Ordering::SeqCst), 1);
651        assert_eq!(producer_calls.load(Ordering::SeqCst), 1);
652    }
653
654    struct FakeLeadershipService {
655        tx: Mutex<Option<watch::Sender<Option<LeadershipEvent>>>>,
656        is_leader: Arc<AtomicBool>,
657        initial: Option<LeadershipEvent>,
658    }
659
660    impl FakeLeadershipService {
661        fn new(initial: Option<LeadershipEvent>) -> Self {
662            let starts_as_leader = matches!(initial, Some(LeadershipEvent::StartedLeading));
663            Self {
664                tx: Mutex::new(None),
665                is_leader: Arc::new(AtomicBool::new(starts_as_leader)),
666                initial,
667            }
668        }
669
670        async fn emit(&self, event: LeadershipEvent) {
671            self.is_leader.store(
672                matches!(event, LeadershipEvent::StartedLeading),
673                Ordering::Release,
674            );
675            if let Some(tx) = self
676                .tx
677                .lock()
678                .expect("mutex poisoned: fake elector sender")
679                .as_ref()
680            {
681                let _ = tx.send(Some(event));
682            }
683        }
684    }
685
686    #[async_trait]
687    impl LeadershipService for FakeLeadershipService {
688        async fn start(&self, _lock_name: &str) -> Result<LeadershipHandle, PlatformError> {
689            let (tx, rx) = watch::channel(self.initial.clone());
690            *self.tx.lock().expect("mutex poisoned: fake elector sender") = Some(tx);
691
692            let cancel = CancellationToken::new();
693            let cancel_wait = cancel.clone();
694            let (term_tx, term_rx) = oneshot::channel();
695            tokio::spawn(async move {
696                cancel_wait.cancelled().await;
697                let _ = term_tx.send(());
698            });
699
700            Ok(LeadershipHandle::new(
701                rx,
702                Arc::clone(&self.is_leader),
703                cancel,
704                term_rx,
705            ))
706        }
707    }
708
709    struct FakePlatformService {
710        identity: PlatformIdentity,
711        readiness_gate: Arc<dyn ReadinessGate>,
712        leadership: Arc<dyn LeadershipService>,
713    }
714
715    impl FakePlatformService {
716        fn new(leadership: Arc<dyn LeadershipService>) -> Self {
717            Self {
718                identity: PlatformIdentity::local("master-tests"),
719                readiness_gate: Arc::new(NoopReadinessGate),
720                leadership,
721            }
722        }
723    }
724
725    impl PlatformService for FakePlatformService {
726        fn identity(&self) -> PlatformIdentity {
727            self.identity.clone()
728        }
729
730        fn readiness_gate(&self) -> Arc<dyn ReadinessGate> {
731            Arc::clone(&self.readiness_gate)
732        }
733
734        fn leadership(&self) -> Arc<dyn LeadershipService> {
735            Arc::clone(&self.leadership)
736        }
737    }
738
739    struct FakeDelegateComponent {
740        create_consumer_calls: Arc<AtomicUsize>,
741        start_calls: Arc<AtomicUsize>,
742    }
743
744    impl Component for FakeDelegateComponent {
745        fn scheme(&self) -> &str {
746            "fake"
747        }
748
749        fn create_endpoint(
750            &self,
751            _uri: &str,
752            _ctx: &dyn ComponentContext,
753        ) -> Result<Box<dyn Endpoint>, CamelError> {
754            Ok(Box::new(FakeDelegateEndpoint {
755                create_consumer_calls: Arc::clone(&self.create_consumer_calls),
756                start_calls: Arc::clone(&self.start_calls),
757            }))
758        }
759    }
760
761    struct FakeDelegateEndpoint {
762        create_consumer_calls: Arc<AtomicUsize>,
763        start_calls: Arc<AtomicUsize>,
764    }
765
766    impl Endpoint for FakeDelegateEndpoint {
767        fn uri(&self) -> &str {
768            "fake:delegate"
769        }
770
771        fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
772            let epoch = self.create_consumer_calls.fetch_add(1, Ordering::SeqCst) + 1;
773            Ok(Box::new(FakeDelegateConsumer {
774                epoch,
775                start_calls: Arc::clone(&self.start_calls),
776            }))
777        }
778
779        fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
780            Err(CamelError::EndpointCreationFailed("not used".to_string()))
781        }
782    }
783
    /// Delegate consumer used by the leadership tests.
    struct FakeDelegateConsumer {
        /// 1-based creation index; embedded in every emitted message body.
        epoch: usize,
        /// Shared counter incremented each time `start` is called.
        start_calls: Arc<AtomicUsize>,
    }
788
789    struct FailingDelegateComponent {
790        create_endpoint_calls: Arc<AtomicUsize>,
791    }
792
793    impl Component for FailingDelegateComponent {
794        fn scheme(&self) -> &str {
795            "failing"
796        }
797
798        fn create_endpoint(
799            &self,
800            _uri: &str,
801            _ctx: &dyn ComponentContext,
802        ) -> Result<Box<dyn Endpoint>, CamelError> {
803            self.create_endpoint_calls.fetch_add(1, Ordering::SeqCst);
804            Err(CamelError::EndpointCreationFailed(
805                "delegate endpoint creation failed".to_string(),
806            ))
807        }
808    }
809
810    #[async_trait]
811    impl Consumer for FakeDelegateConsumer {
812        async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
813            self.start_calls.fetch_add(1, Ordering::SeqCst);
814            context
815                .send(Exchange::new(Message::new(format!("epoch-{}", self.epoch))))
816                .await?;
817
818            loop {
819                tokio::select! {
820                    _ = context.cancelled() => {
821                        break;
822                    }
823                    _ = sleep(Duration::from_millis(20)) => {
824                        context
825                            .send(Exchange::new(Message::new(format!("epoch-{}", self.epoch))))
826                            .await?;
827                    }
828                }
829            }
830
831            Ok(())
832        }
833
834        async fn stop(&mut self) -> Result<(), CamelError> {
835            Ok(())
836        }
837    }
838
839    fn build_master_consumer(
840        platform_service: Arc<dyn PlatformService>,
841        create_consumer_calls: Arc<AtomicUsize>,
842        start_calls: Arc<AtomicUsize>,
843        delegate_retry_max_attempts: Option<u32>,
844    ) -> MasterConsumer {
845        MasterConsumer::new(
846            "lock-a".to_string(),
847            "fake:delegate".to_string(),
848            Arc::new(FakeDelegateComponent {
849                create_consumer_calls,
850                start_calls,
851            }),
852            Arc::new(NoOpMetrics),
853            platform_service,
854            Duration::from_millis(500),
855            delegate_retry_max_attempts,
856        )
857    }
858
859    #[tokio::test]
860    async fn starts_delegate_only_after_started_leading() {
861        let leadership = Arc::new(FakeLeadershipService::new(None));
862        let platform_service = Arc::new(FakePlatformService::new(leadership.clone()));
863        let create_consumer_calls = Arc::new(AtomicUsize::new(0));
864        let start_calls = Arc::new(AtomicUsize::new(0));
865        let mut master = build_master_consumer(
866            platform_service,
867            Arc::clone(&create_consumer_calls),
868            Arc::clone(&start_calls),
869            Some(30),
870        );
871
872        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
873        let cancel = CancellationToken::new();
874        let ctx = ConsumerContext::new(tx, cancel.clone());
875
876        master.start(ctx).await.unwrap();
877
878        sleep(Duration::from_millis(80)).await;
879        assert!(rx.try_recv().is_err());
880        assert_eq!(create_consumer_calls.load(Ordering::SeqCst), 0);
881
882        leadership.emit(LeadershipEvent::StartedLeading).await;
883
884        let first = timeout(Duration::from_millis(500), rx.recv())
885            .await
886            .unwrap()
887            .unwrap();
888        assert_eq!(first.exchange.input.body.as_text(), Some("epoch-1"));
889        assert_eq!(create_consumer_calls.load(Ordering::SeqCst), 1);
890        assert_eq!(start_calls.load(Ordering::SeqCst), 1);
891
892        cancel.cancel();
893        master.stop().await.unwrap();
894    }
895
896    #[tokio::test]
897    async fn stops_delegate_on_stopped_leading() {
898        let leadership = Arc::new(FakeLeadershipService::new(None));
899        let platform_service = Arc::new(FakePlatformService::new(leadership.clone()));
900        let create_consumer_calls = Arc::new(AtomicUsize::new(0));
901        let start_calls = Arc::new(AtomicUsize::new(0));
902        let mut master = build_master_consumer(
903            platform_service,
904            Arc::clone(&create_consumer_calls),
905            Arc::clone(&start_calls),
906            Some(30),
907        );
908
909        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
910        let cancel = CancellationToken::new();
911        let ctx = ConsumerContext::new(tx, cancel.clone());
912
913        master.start(ctx).await.unwrap();
914        leadership.emit(LeadershipEvent::StartedLeading).await;
915        let _ = timeout(Duration::from_millis(500), rx.recv())
916            .await
917            .unwrap()
918            .unwrap();
919
920        leadership.emit(LeadershipEvent::StoppedLeading).await;
921        sleep(Duration::from_millis(100)).await;
922        while rx.try_recv().is_ok() {}
923        assert!(
924            timeout(Duration::from_millis(120), rx.recv())
925                .await
926                .is_err()
927        );
928
929        cancel.cancel();
930        master.stop().await.unwrap();
931    }
932
933    #[tokio::test]
934    async fn recreates_delegate_on_new_leadership_epoch() {
935        let leadership = Arc::new(FakeLeadershipService::new(None));
936        let platform_service = Arc::new(FakePlatformService::new(leadership.clone()));
937        let create_consumer_calls = Arc::new(AtomicUsize::new(0));
938        let start_calls = Arc::new(AtomicUsize::new(0));
939        let mut master = build_master_consumer(
940            platform_service,
941            Arc::clone(&create_consumer_calls),
942            Arc::clone(&start_calls),
943            Some(30),
944        );
945
946        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
947        let cancel = CancellationToken::new();
948        let ctx = ConsumerContext::new(tx, cancel.clone());
949
950        master.start(ctx).await.unwrap();
951
952        leadership.emit(LeadershipEvent::StartedLeading).await;
953        let first = timeout(Duration::from_millis(500), rx.recv())
954            .await
955            .unwrap()
956            .unwrap();
957        assert_eq!(first.exchange.input.body.as_text(), Some("epoch-1"));
958
959        leadership.emit(LeadershipEvent::StoppedLeading).await;
960        sleep(Duration::from_millis(120)).await;
961
962        leadership.emit(LeadershipEvent::StartedLeading).await;
963        let second = timeout(Duration::from_millis(500), rx.recv())
964            .await
965            .unwrap()
966            .unwrap();
967        assert_eq!(second.exchange.input.body.as_text(), Some("epoch-2"));
968
969        assert_eq!(create_consumer_calls.load(Ordering::SeqCst), 2);
970        assert_eq!(start_calls.load(Ordering::SeqCst), 2);
971
972        cancel.cancel();
973        master.stop().await.unwrap();
974    }
975
976    #[tokio::test]
977    async fn stops_retrying_delegate_start_after_max_attempts() {
978        let leadership = Arc::new(FakeLeadershipService::new(Some(
979            LeadershipEvent::StartedLeading,
980        )));
981        let platform_service = Arc::new(FakePlatformService::new(leadership));
982        let create_endpoint_calls = Arc::new(AtomicUsize::new(0));
983
984        let mut master = MasterConsumer::new(
985            "lock-a".to_string(),
986            "failing:delegate".to_string(),
987            Arc::new(FailingDelegateComponent {
988                create_endpoint_calls: Arc::clone(&create_endpoint_calls),
989            }),
990            Arc::new(NoOpMetrics),
991            platform_service,
992            Duration::from_millis(500),
993            Some(1),
994        );
995
996        let (tx, _rx) = tokio::sync::mpsc::channel(16);
997        let cancel = CancellationToken::new();
998        let ctx = ConsumerContext::new(tx, cancel.clone());
999
1000        master.start(ctx).await.unwrap();
1001        sleep(Duration::from_millis(750)).await;
1002
1003        assert_eq!(create_endpoint_calls.load(Ordering::SeqCst), 2);
1004
1005        cancel.cancel();
1006        master.stop().await.unwrap();
1007    }
1008}