Skip to main content

camel_master/
lib.rs

1pub mod bundle;
2mod config;
3
4pub use bundle::MasterBundle;
5
6use std::sync::Arc;
7use std::time::Duration;
8
9use async_trait::async_trait;
10use camel_api::{CamelError, LeaderElector, MetricsCollector};
11use camel_component_api::{
12    BoxProcessor, Component, ComponentContext, Consumer, ConsumerContext, Endpoint,
13    ExchangeEnvelope, ProducerContext, parse_uri,
14};
15use camel_language_api::Language;
16use tokio::task::JoinHandle;
17use tokio::time::timeout;
18use tokio_util::sync::CancellationToken;
19use tracing::{info, warn};
20
21use crate::config::{MasterComponentConfig, MasterUriConfig};
22
23pub struct MasterComponent {
24    drain_timeout_ms: u64,
25}
26
27impl MasterComponent {
28    pub fn new(config: MasterComponentConfig) -> Self {
29        Self {
30            drain_timeout_ms: config.drain_timeout_ms,
31        }
32    }
33}
34
35impl Default for MasterComponent {
36    fn default() -> Self {
37        Self::new(MasterComponentConfig::default())
38    }
39}
40
41impl Component for MasterComponent {
42    fn scheme(&self) -> &str {
43        "master"
44    }
45
46    fn create_endpoint(
47        &self,
48        uri: &str,
49        ctx: &dyn ComponentContext,
50    ) -> Result<Box<dyn Endpoint>, CamelError> {
51        let parsed = MasterUriConfig::parse(uri)?;
52        let delegate_parts = parse_uri(&parsed.delegate_uri)?;
53        let delegate_scheme = delegate_parts.scheme;
54        let delegate_component = ctx
55            .resolve_component(&delegate_scheme)
56            .ok_or_else(|| CamelError::ComponentNotFound(delegate_scheme.clone()))?;
57
58        Ok(Box::new(MasterEndpoint {
59            uri: uri.to_string(),
60            lock_name: parsed.lock_name,
61            delegate_uri: parsed.delegate_uri,
62            delegate_component,
63            metrics: ctx.metrics(),
64            leader_elector: ctx.leader_elector(),
65            drain_timeout: Duration::from_millis(self.drain_timeout_ms),
66        }))
67    }
68}
69
70struct MasterEndpoint {
71    uri: String,
72    lock_name: String,
73    delegate_uri: String,
74    delegate_component: Arc<dyn Component>,
75    metrics: Arc<dyn MetricsCollector>,
76    leader_elector: Arc<dyn LeaderElector>,
77    drain_timeout: Duration,
78}
79
80impl Endpoint for MasterEndpoint {
81    fn uri(&self) -> &str {
82        &self.uri
83    }
84
85    fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
86        Ok(Box::new(MasterConsumer::new(
87            self.lock_name.clone(),
88            self.delegate_uri.clone(),
89            Arc::clone(&self.delegate_component),
90            Arc::clone(&self.metrics),
91            Arc::clone(&self.leader_elector),
92            self.drain_timeout,
93        )))
94    }
95
96    fn create_producer(&self, ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
97        let delegate_ctx = MasterDelegateContext {
98            delegate_component: Arc::clone(&self.delegate_component),
99            metrics: Arc::clone(&self.metrics),
100            leader_elector: Arc::clone(&self.leader_elector),
101        };
102
103        self.delegate_component
104            .create_endpoint(&self.delegate_uri, &delegate_ctx)?
105            .create_producer(ctx)
106    }
107}
108
109struct MasterDelegateContext {
110    delegate_component: Arc<dyn Component>,
111    metrics: Arc<dyn MetricsCollector>,
112    leader_elector: Arc<dyn LeaderElector>,
113}
114
115impl ComponentContext for MasterDelegateContext {
116    fn resolve_component(&self, scheme: &str) -> Option<Arc<dyn Component>> {
117        if self.delegate_component.scheme() == scheme {
118            Some(Arc::clone(&self.delegate_component))
119        } else {
120            None
121        }
122    }
123
124    fn resolve_language(&self, _name: &str) -> Option<Arc<dyn Language>> {
125        None
126    }
127
128    fn metrics(&self) -> Arc<dyn MetricsCollector> {
129        Arc::clone(&self.metrics)
130    }
131
132    fn leader_elector(&self) -> Arc<dyn LeaderElector> {
133        Arc::clone(&self.leader_elector)
134    }
135}
136
137struct MasterConsumer {
138    lock_name: String,
139    delegate_uri: String,
140    delegate_component: Arc<dyn Component>,
141    metrics: Arc<dyn MetricsCollector>,
142    leader_elector: Arc<dyn LeaderElector>,
143    drain_timeout: Duration,
144    leadership_handle: Option<camel_api::LeadershipHandle>,
145    leadership_task: Option<JoinHandle<()>>,
146    stop_token: Option<CancellationToken>,
147}
148
149impl MasterConsumer {
150    fn new(
151        lock_name: String,
152        delegate_uri: String,
153        delegate_component: Arc<dyn Component>,
154        metrics: Arc<dyn MetricsCollector>,
155        leader_elector: Arc<dyn LeaderElector>,
156        drain_timeout: Duration,
157    ) -> Self {
158        Self {
159            lock_name,
160            delegate_uri,
161            delegate_component,
162            metrics,
163            leader_elector,
164            drain_timeout,
165            leadership_handle: None,
166            leadership_task: None,
167            stop_token: None,
168        }
169    }
170}
171
172enum DelegateState {
173    Inactive,
174    Active {
175        run_token: CancellationToken,
176        handle: JoinHandle<()>,
177    },
178}
179
180async fn stop_delegate(state: &mut DelegateState, drain_timeout: Duration) {
181    if let DelegateState::Active {
182        run_token,
183        mut handle,
184    } = std::mem::replace(state, DelegateState::Inactive)
185    {
186        run_token.cancel();
187        match timeout(drain_timeout, &mut handle).await {
188            Ok(_) => {}
189            Err(_) => {
190                warn!("master delegate shutdown timed out, aborting");
191                handle.abort();
192            }
193        }
194    }
195}
196
197async fn reconcile_event(
198    event: camel_api::LeadershipEvent,
199    state: &mut DelegateState,
200    lock_name: &str,
201    delegate_component: &Arc<dyn Component>,
202    delegate_uri: &str,
203    sender: &tokio::sync::mpsc::Sender<ExchangeEnvelope>,
204    parent_cancel: &CancellationToken,
205    drain_timeout: Duration,
206    metrics: &Arc<dyn MetricsCollector>,
207    leader_elector: &Arc<dyn LeaderElector>,
208) {
209    match event {
210        camel_api::LeadershipEvent::StartedLeading => {
211            info!(lock = %lock_name, "master leadership acquired");
212            stop_delegate(state, drain_timeout).await;
213
214            let delegate_ctx = MasterDelegateContext {
215                delegate_component: Arc::clone(delegate_component),
216                metrics: Arc::clone(metrics),
217                leader_elector: Arc::clone(leader_elector),
218            };
219
220            let endpoint = match delegate_component.create_endpoint(delegate_uri, &delegate_ctx) {
221                Ok(endpoint) => endpoint,
222                Err(err) => {
223                    warn!(lock = %lock_name, "failed to create delegate endpoint: {err}");
224                    return;
225                }
226            };
227
228            let mut consumer = match endpoint.create_consumer() {
229                Ok(consumer) => consumer,
230                Err(err) => {
231                    warn!(lock = %lock_name, "failed to create delegate consumer: {err}");
232                    return;
233                }
234            };
235
236            let run_token = parent_cancel.child_token();
237            let delegate_ctx = ConsumerContext::new(sender.clone(), run_token.clone());
238            let handle = tokio::spawn(async move {
239                let _ = consumer.start(delegate_ctx).await;
240                let _ = consumer.stop().await;
241            });
242
243            *state = DelegateState::Active { run_token, handle };
244        }
245        camel_api::LeadershipEvent::StoppedLeading => {
246            info!(lock = %lock_name, "master leadership lost");
247            stop_delegate(state, drain_timeout).await;
248        }
249    }
250}
251
252#[async_trait]
253impl Consumer for MasterConsumer {
254    async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
255        if self.leadership_task.is_some() {
256            return Ok(());
257        }
258
259        let handle = self
260            .leader_elector
261            .start(camel_api::PlatformIdentity::local(&self.lock_name))
262            .await
263            .map_err(|e| {
264                CamelError::EndpointCreationFailed(format!("failed to start leader election: {e}"))
265            })?;
266
267        let lock_name = self.lock_name.clone();
268        let delegate_uri = self.delegate_uri.clone();
269        let delegate_component = Arc::clone(&self.delegate_component);
270        let metrics = Arc::clone(&self.metrics);
271        let leader_elector = Arc::clone(&self.leader_elector);
272        let sender = context.sender();
273        let parent_cancel = context.cancel_token();
274        let drain_timeout = self.drain_timeout;
275        let mut events = handle.events.clone();
276
277        let stop_token = CancellationToken::new();
278        let stop_token_loop = stop_token.clone();
279
280        let task = tokio::spawn(async move {
281            let mut state = DelegateState::Inactive;
282
283            let initial_event = { events.borrow().clone() };
284            if let Some(initial_event) = initial_event {
285                reconcile_event(
286                    initial_event,
287                    &mut state,
288                    &lock_name,
289                    &delegate_component,
290                    &delegate_uri,
291                    &sender,
292                    &parent_cancel,
293                    drain_timeout,
294                    &metrics,
295                    &leader_elector,
296                )
297                .await;
298            }
299
300            loop {
301                tokio::select! {
302                    _ = stop_token_loop.cancelled() => {
303                        break;
304                    }
305                    _ = context.cancelled() => {
306                        break;
307                    }
308                    changed = events.changed() => {
309                        if changed.is_err() {
310                            break;
311                        }
312                        let event = { events.borrow().clone() };
313                        if let Some(event) = event {
314                            reconcile_event(
315                                event,
316                                &mut state,
317                                &lock_name,
318                                &delegate_component,
319                                &delegate_uri,
320                                &sender,
321                                &parent_cancel,
322                                drain_timeout,
323                                &metrics,
324                                &leader_elector,
325                            )
326                            .await;
327                        }
328                    }
329                }
330            }
331
332            stop_delegate(&mut state, drain_timeout).await;
333        });
334
335        self.leadership_handle = Some(handle);
336        self.stop_token = Some(stop_token);
337        self.leadership_task = Some(task);
338
339        Ok(())
340    }
341
342    async fn stop(&mut self) -> Result<(), CamelError> {
343        if let Some(token) = self.stop_token.take() {
344            token.cancel();
345        }
346
347        if let Some(task) = self.leadership_task.take()
348            && timeout(self.drain_timeout, task).await.is_err()
349        {
350            warn!("master leadership loop shutdown timed out");
351        }
352
353        if let Some(handle) = self.leadership_handle.take() {
354            let _ = timeout(self.drain_timeout, handle.step_down()).await;
355        }
356
357        Ok(())
358    }
359}
360
361#[cfg(test)]
362mod tests {
363    use std::sync::Arc;
364    use std::sync::Mutex;
365    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
366
367    use camel_api::{
368        BoxProcessorExt, Exchange, LeadershipEvent, LeadershipHandle, Message, NoOpMetrics,
369        NoopLeaderElector, PlatformError, PlatformIdentity,
370    };
371    use camel_component_api::NoOpComponentContext;
372    use tokio::sync::{oneshot, watch};
373    use tokio::time::{sleep, timeout};
374    use tokio_util::sync::CancellationToken;
375    use tower::ServiceExt;
376
377    use super::*;
378
379    #[test]
380    fn parse_master_uri_valid() {
381        let cfg = MasterUriConfig::parse("master:mylock:timer:tick?period=250").unwrap();
382        assert_eq!(cfg.lock_name, "mylock");
383        assert_eq!(cfg.delegate_uri, "timer:tick?period=250");
384    }
385
386    #[test]
387    fn parse_master_uri_missing_lockname() {
388        let err = MasterUriConfig::parse("master::timer:tick").unwrap_err();
389        assert!(matches!(err, CamelError::InvalidUri(_)));
390    }
391
392    #[test]
393    fn parse_master_uri_missing_delegate() {
394        let err = MasterUriConfig::parse("master:mylock:").unwrap_err();
395        assert!(matches!(err, CamelError::InvalidUri(_)));
396    }
397
398    #[test]
399    fn endpoint_fails_when_delegate_component_missing() {
400        let master = MasterComponent::default();
401        let result =
402            master.create_endpoint("master:lock-1:missing:delegate", &NoOpComponentContext);
403        assert!(matches!(result, Err(CamelError::ComponentNotFound(_))));
404    }
405
406    #[test]
407    fn delegate_scheme_is_parsed_from_delegate_uri() {
408        let seen_scheme = Arc::new(AtomicBool::new(false));
409
410        struct SchemeAwareContext {
411            delegate: Arc<dyn Component>,
412            seen_scheme: Arc<AtomicBool>,
413        }
414
415        impl ComponentContext for SchemeAwareContext {
416            fn resolve_component(&self, scheme: &str) -> Option<Arc<dyn Component>> {
417                if scheme == "mock" {
418                    self.seen_scheme.store(true, Ordering::SeqCst);
419                    Some(Arc::clone(&self.delegate))
420                } else {
421                    None
422                }
423            }
424
425            fn resolve_language(&self, _name: &str) -> Option<Arc<dyn Language>> {
426                None
427            }
428
429            fn metrics(&self) -> Arc<dyn MetricsCollector> {
430                Arc::new(NoOpMetrics)
431            }
432
433            fn leader_elector(&self) -> Arc<dyn LeaderElector> {
434                Arc::new(NoopLeaderElector)
435            }
436        }
437
438        struct MockDelegateComponent;
439
440        impl Component for MockDelegateComponent {
441            fn scheme(&self) -> &str {
442                "mock"
443            }
444
445            fn create_endpoint(
446                &self,
447                _uri: &str,
448                _ctx: &dyn ComponentContext,
449            ) -> Result<Box<dyn Endpoint>, CamelError> {
450                Ok(Box::new(MockDelegateEndpoint))
451            }
452        }
453
454        struct MockDelegateEndpoint;
455
456        impl Endpoint for MockDelegateEndpoint {
457            fn uri(&self) -> &str {
458                "mock:delegate"
459            }
460
461            fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
462                Err(CamelError::EndpointCreationFailed("not used".to_string()))
463            }
464
465            fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
466                Err(CamelError::EndpointCreationFailed("not used".to_string()))
467            }
468        }
469
470        let delegate = Arc::new(MockDelegateComponent);
471        let ctx = SchemeAwareContext {
472            delegate,
473            seen_scheme: Arc::clone(&seen_scheme),
474        };
475
476        let master = MasterComponent::default();
477        let endpoint = master
478            .create_endpoint("master:mylock:mock:delegate?x=1", &ctx)
479            .unwrap();
480
481        assert_eq!(endpoint.uri(), "master:mylock:mock:delegate?x=1");
482        assert!(seen_scheme.load(Ordering::SeqCst));
483    }
484
485    struct MockDelegateContext {
486        delegate: Arc<dyn Component>,
487    }
488
489    impl ComponentContext for MockDelegateContext {
490        fn resolve_component(&self, scheme: &str) -> Option<Arc<dyn Component>> {
491            if self.delegate.scheme() == scheme {
492                Some(Arc::clone(&self.delegate))
493            } else {
494                None
495            }
496        }
497
498        fn resolve_language(&self, _name: &str) -> Option<Arc<dyn Language>> {
499            None
500        }
501
502        fn metrics(&self) -> Arc<dyn MetricsCollector> {
503            Arc::new(NoOpMetrics)
504        }
505
506        fn leader_elector(&self) -> Arc<dyn LeaderElector> {
507            Arc::new(NoopLeaderElector)
508        }
509    }
510
511    struct MockProducerDelegateComponent {
512        create_endpoint_calls: Arc<AtomicUsize>,
513        create_producer_calls: Arc<AtomicUsize>,
514        fail_producer: bool,
515    }
516
517    impl Component for MockProducerDelegateComponent {
518        fn scheme(&self) -> &str {
519            "mock"
520        }
521
522        fn create_endpoint(
523            &self,
524            _uri: &str,
525            _ctx: &dyn ComponentContext,
526        ) -> Result<Box<dyn Endpoint>, CamelError> {
527            self.create_endpoint_calls.fetch_add(1, Ordering::SeqCst);
528            Ok(Box::new(MockProducerDelegateEndpoint {
529                create_producer_calls: Arc::clone(&self.create_producer_calls),
530                fail_producer: self.fail_producer,
531            }))
532        }
533    }
534
535    struct MockProducerDelegateEndpoint {
536        create_producer_calls: Arc<AtomicUsize>,
537        fail_producer: bool,
538    }
539
540    impl Endpoint for MockProducerDelegateEndpoint {
541        fn uri(&self) -> &str {
542            "mock:delegate"
543        }
544
545        fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
546            Err(CamelError::EndpointCreationFailed(
547                "not used in test".to_string(),
548            ))
549        }
550
551        fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
552            self.create_producer_calls.fetch_add(1, Ordering::SeqCst);
553            if self.fail_producer {
554                return Err(CamelError::ProcessorError(
555                    "delegate producer failed".to_string(),
556                ));
557            }
558            Ok(BoxProcessor::from_fn(
559                |exchange| async move { Ok(exchange) },
560            ))
561        }
562    }
563
564    #[tokio::test]
565    async fn producer_passthrough_delegates_and_produces() {
566        let endpoint_calls = Arc::new(AtomicUsize::new(0));
567        let producer_calls = Arc::new(AtomicUsize::new(0));
568        let delegate = Arc::new(MockProducerDelegateComponent {
569            create_endpoint_calls: Arc::clone(&endpoint_calls),
570            create_producer_calls: Arc::clone(&producer_calls),
571            fail_producer: false,
572        });
573
574        let ctx = MockDelegateContext {
575            delegate: delegate.clone(),
576        };
577
578        let master = MasterComponent::default();
579        let endpoint = master
580            .create_endpoint("master:lock-1:mock:delegate", &ctx)
581            .unwrap();
582        let producer_ctx = ProducerContext::new();
583        let producer = endpoint.create_producer(&producer_ctx).unwrap();
584
585        let exchange = Exchange::new(Message::new("ok"));
586        let result = producer.oneshot(exchange).await.unwrap();
587
588        assert_eq!(result.input.body.as_text(), Some("ok"));
589        assert_eq!(endpoint_calls.load(Ordering::SeqCst), 1);
590        assert_eq!(producer_calls.load(Ordering::SeqCst), 1);
591    }
592
593    #[test]
594    fn producer_passthrough_bubbles_delegate_errors() {
595        let endpoint_calls = Arc::new(AtomicUsize::new(0));
596        let producer_calls = Arc::new(AtomicUsize::new(0));
597        let delegate = Arc::new(MockProducerDelegateComponent {
598            create_endpoint_calls: Arc::clone(&endpoint_calls),
599            create_producer_calls: Arc::clone(&producer_calls),
600            fail_producer: true,
601        });
602
603        let ctx = MockDelegateContext {
604            delegate: delegate.clone(),
605        };
606
607        let master = MasterComponent::default();
608        let endpoint = master
609            .create_endpoint("master:lock-1:mock:delegate", &ctx)
610            .unwrap();
611        let producer_ctx = ProducerContext::new();
612        let err = endpoint.create_producer(&producer_ctx).unwrap_err();
613
614        assert!(matches!(err, CamelError::ProcessorError(_)));
615        assert_eq!(endpoint_calls.load(Ordering::SeqCst), 1);
616        assert_eq!(producer_calls.load(Ordering::SeqCst), 1);
617    }
618
619    struct FakeLeaderElector {
620        tx: Mutex<Option<watch::Sender<Option<LeadershipEvent>>>>,
621        is_leader: Arc<AtomicBool>,
622        initial: Option<LeadershipEvent>,
623    }
624
625    impl FakeLeaderElector {
626        fn new(initial: Option<LeadershipEvent>) -> Self {
627            let starts_as_leader = matches!(initial, Some(LeadershipEvent::StartedLeading));
628            Self {
629                tx: Mutex::new(None),
630                is_leader: Arc::new(AtomicBool::new(starts_as_leader)),
631                initial,
632            }
633        }
634
635        async fn emit(&self, event: LeadershipEvent) {
636            self.is_leader.store(
637                matches!(event, LeadershipEvent::StartedLeading),
638                Ordering::Release,
639            );
640            if let Some(tx) = self
641                .tx
642                .lock()
643                .expect("mutex poisoned: fake elector sender")
644                .as_ref()
645            {
646                let _ = tx.send(Some(event));
647            }
648        }
649    }
650
651    #[async_trait]
652    impl LeaderElector for FakeLeaderElector {
653        async fn start(
654            &self,
655            _identity: PlatformIdentity,
656        ) -> Result<LeadershipHandle, PlatformError> {
657            let (tx, rx) = watch::channel(self.initial.clone());
658            *self.tx.lock().expect("mutex poisoned: fake elector sender") = Some(tx);
659
660            let cancel = CancellationToken::new();
661            let cancel_wait = cancel.clone();
662            let (term_tx, term_rx) = oneshot::channel();
663            tokio::spawn(async move {
664                cancel_wait.cancelled().await;
665                let _ = term_tx.send(());
666            });
667
668            Ok(LeadershipHandle::new(
669                rx,
670                Arc::clone(&self.is_leader),
671                cancel,
672                term_rx,
673            ))
674        }
675    }
676
677    struct FakeDelegateComponent {
678        create_consumer_calls: Arc<AtomicUsize>,
679        start_calls: Arc<AtomicUsize>,
680    }
681
682    impl Component for FakeDelegateComponent {
683        fn scheme(&self) -> &str {
684            "fake"
685        }
686
687        fn create_endpoint(
688            &self,
689            _uri: &str,
690            _ctx: &dyn ComponentContext,
691        ) -> Result<Box<dyn Endpoint>, CamelError> {
692            Ok(Box::new(FakeDelegateEndpoint {
693                create_consumer_calls: Arc::clone(&self.create_consumer_calls),
694                start_calls: Arc::clone(&self.start_calls),
695            }))
696        }
697    }
698
699    struct FakeDelegateEndpoint {
700        create_consumer_calls: Arc<AtomicUsize>,
701        start_calls: Arc<AtomicUsize>,
702    }
703
704    impl Endpoint for FakeDelegateEndpoint {
705        fn uri(&self) -> &str {
706            "fake:delegate"
707        }
708
709        fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
710            let epoch = self.create_consumer_calls.fetch_add(1, Ordering::SeqCst) + 1;
711            Ok(Box::new(FakeDelegateConsumer {
712                epoch,
713                start_calls: Arc::clone(&self.start_calls),
714            }))
715        }
716
717        fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
718            Err(CamelError::EndpointCreationFailed("not used".to_string()))
719        }
720    }
721
722    struct FakeDelegateConsumer {
723        epoch: usize,
724        start_calls: Arc<AtomicUsize>,
725    }
726
727    #[async_trait]
728    impl Consumer for FakeDelegateConsumer {
729        async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
730            self.start_calls.fetch_add(1, Ordering::SeqCst);
731            context
732                .send(Exchange::new(Message::new(format!("epoch-{}", self.epoch))))
733                .await?;
734
735            loop {
736                tokio::select! {
737                    _ = context.cancelled() => {
738                        break;
739                    }
740                    _ = sleep(Duration::from_millis(20)) => {
741                        context
742                            .send(Exchange::new(Message::new(format!("epoch-{}", self.epoch))))
743                            .await?;
744                    }
745                }
746            }
747
748            Ok(())
749        }
750
751        async fn stop(&mut self) -> Result<(), CamelError> {
752            Ok(())
753        }
754    }
755
756    fn build_master_consumer(
757        elector: Arc<dyn LeaderElector>,
758        create_consumer_calls: Arc<AtomicUsize>,
759        start_calls: Arc<AtomicUsize>,
760    ) -> MasterConsumer {
761        MasterConsumer::new(
762            "lock-a".to_string(),
763            "fake:delegate".to_string(),
764            Arc::new(FakeDelegateComponent {
765                create_consumer_calls,
766                start_calls,
767            }),
768            Arc::new(NoOpMetrics),
769            elector,
770            Duration::from_millis(500),
771        )
772    }
773
774    #[tokio::test]
775    async fn starts_delegate_only_after_started_leading() {
776        let elector = Arc::new(FakeLeaderElector::new(None));
777        let create_consumer_calls = Arc::new(AtomicUsize::new(0));
778        let start_calls = Arc::new(AtomicUsize::new(0));
779        let mut master = build_master_consumer(
780            elector.clone(),
781            Arc::clone(&create_consumer_calls),
782            Arc::clone(&start_calls),
783        );
784
785        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
786        let cancel = CancellationToken::new();
787        let ctx = ConsumerContext::new(tx, cancel.clone());
788
789        master.start(ctx).await.unwrap();
790
791        sleep(Duration::from_millis(80)).await;
792        assert!(rx.try_recv().is_err());
793        assert_eq!(create_consumer_calls.load(Ordering::SeqCst), 0);
794
795        elector.emit(LeadershipEvent::StartedLeading).await;
796
797        let first = timeout(Duration::from_millis(500), rx.recv())
798            .await
799            .unwrap()
800            .unwrap();
801        assert_eq!(first.exchange.input.body.as_text(), Some("epoch-1"));
802        assert_eq!(create_consumer_calls.load(Ordering::SeqCst), 1);
803        assert_eq!(start_calls.load(Ordering::SeqCst), 1);
804
805        cancel.cancel();
806        master.stop().await.unwrap();
807    }
808
809    #[tokio::test]
810    async fn stops_delegate_on_stopped_leading() {
811        let elector = Arc::new(FakeLeaderElector::new(None));
812        let create_consumer_calls = Arc::new(AtomicUsize::new(0));
813        let start_calls = Arc::new(AtomicUsize::new(0));
814        let mut master = build_master_consumer(
815            elector.clone(),
816            Arc::clone(&create_consumer_calls),
817            Arc::clone(&start_calls),
818        );
819
820        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
821        let cancel = CancellationToken::new();
822        let ctx = ConsumerContext::new(tx, cancel.clone());
823
824        master.start(ctx).await.unwrap();
825        elector.emit(LeadershipEvent::StartedLeading).await;
826        let _ = timeout(Duration::from_millis(500), rx.recv())
827            .await
828            .unwrap()
829            .unwrap();
830
831        elector.emit(LeadershipEvent::StoppedLeading).await;
832        sleep(Duration::from_millis(100)).await;
833        while rx.try_recv().is_ok() {}
834        assert!(
835            timeout(Duration::from_millis(120), rx.recv())
836                .await
837                .is_err()
838        );
839
840        cancel.cancel();
841        master.stop().await.unwrap();
842    }
843
844    #[tokio::test]
845    async fn recreates_delegate_on_new_leadership_epoch() {
846        let elector = Arc::new(FakeLeaderElector::new(None));
847        let create_consumer_calls = Arc::new(AtomicUsize::new(0));
848        let start_calls = Arc::new(AtomicUsize::new(0));
849        let mut master = build_master_consumer(
850            elector.clone(),
851            Arc::clone(&create_consumer_calls),
852            Arc::clone(&start_calls),
853        );
854
855        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
856        let cancel = CancellationToken::new();
857        let ctx = ConsumerContext::new(tx, cancel.clone());
858
859        master.start(ctx).await.unwrap();
860
861        elector.emit(LeadershipEvent::StartedLeading).await;
862        let first = timeout(Duration::from_millis(500), rx.recv())
863            .await
864            .unwrap()
865            .unwrap();
866        assert_eq!(first.exchange.input.body.as_text(), Some("epoch-1"));
867
868        elector.emit(LeadershipEvent::StoppedLeading).await;
869        sleep(Duration::from_millis(120)).await;
870
871        elector.emit(LeadershipEvent::StartedLeading).await;
872        let second = timeout(Duration::from_millis(500), rx.recv())
873            .await
874            .unwrap()
875            .unwrap();
876        assert_eq!(second.exchange.input.body.as_text(), Some("epoch-2"));
877
878        assert_eq!(create_consumer_calls.load(Ordering::SeqCst), 2);
879        assert_eq!(start_calls.load(Ordering::SeqCst), 2);
880
881        cancel.cancel();
882        master.stop().await.unwrap();
883    }
884}