1pub mod bundle;
2mod config;
3
4pub use bundle::MasterBundle;
5
6use std::sync::Arc;
7use std::time::Duration;
8
9use async_trait::async_trait;
10use camel_api::{CamelError, LeaderElector, MetricsCollector};
11use camel_component_api::{
12 BoxProcessor, Component, ComponentContext, Consumer, ConsumerContext, Endpoint,
13 ExchangeEnvelope, ProducerContext, parse_uri,
14};
15use camel_language_api::Language;
16use tokio::task::JoinHandle;
17use tokio::time::timeout;
18use tokio_util::sync::CancellationToken;
19use tracing::{info, warn};
20
21use crate::config::{MasterComponentConfig, MasterUriConfig};
22
/// Component registered under the `master` scheme.
///
/// Stores the drain timeout that is handed to every endpoint it creates.
pub struct MasterComponent {
    /// Drain timeout in milliseconds, taken from `MasterComponentConfig::drain_timeout_ms`.
    drain_timeout_ms: u64,
}
26
27impl MasterComponent {
28 pub fn new(config: MasterComponentConfig) -> Self {
29 Self {
30 drain_timeout_ms: config.drain_timeout_ms,
31 }
32 }
33}
34
35impl Default for MasterComponent {
36 fn default() -> Self {
37 Self::new(MasterComponentConfig::default())
38 }
39}
40
41impl Component for MasterComponent {
42 fn scheme(&self) -> &str {
43 "master"
44 }
45
46 fn create_endpoint(
47 &self,
48 uri: &str,
49 ctx: &dyn ComponentContext,
50 ) -> Result<Box<dyn Endpoint>, CamelError> {
51 let parsed = MasterUriConfig::parse(uri)?;
52 let delegate_parts = parse_uri(&parsed.delegate_uri)?;
53 let delegate_scheme = delegate_parts.scheme;
54 let delegate_component = ctx
55 .resolve_component(&delegate_scheme)
56 .ok_or_else(|| CamelError::ComponentNotFound(delegate_scheme.clone()))?;
57
58 Ok(Box::new(MasterEndpoint {
59 uri: uri.to_string(),
60 lock_name: parsed.lock_name,
61 delegate_uri: parsed.delegate_uri,
62 delegate_component,
63 metrics: ctx.metrics(),
64 leader_elector: ctx.leader_elector(),
65 drain_timeout: Duration::from_millis(self.drain_timeout_ms),
66 }))
67 }
68}
69
/// Endpoint produced by [`MasterComponent`]: wraps a delegate endpoint URI together
/// with everything needed to run it under leader election.
struct MasterEndpoint {
    /// The full master URI exactly as it was passed to `create_endpoint`.
    uri: String,
    /// Lock name parsed from the master URI; used as the leader-election identity.
    lock_name: String,
    /// URI of the wrapped (delegate) endpoint, parsed from the master URI.
    delegate_uri: String,
    /// Component resolved for the delegate URI's scheme.
    delegate_component: Arc<dyn Component>,
    /// Metrics collector obtained from the component context.
    metrics: Arc<dyn MetricsCollector>,
    /// Leader elector obtained from the component context.
    leader_elector: Arc<dyn LeaderElector>,
    /// Maximum time to wait for background tasks to finish when stopping.
    drain_timeout: Duration,
}
79
80impl Endpoint for MasterEndpoint {
81 fn uri(&self) -> &str {
82 &self.uri
83 }
84
85 fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
86 Ok(Box::new(MasterConsumer::new(
87 self.lock_name.clone(),
88 self.delegate_uri.clone(),
89 Arc::clone(&self.delegate_component),
90 Arc::clone(&self.metrics),
91 Arc::clone(&self.leader_elector),
92 self.drain_timeout,
93 )))
94 }
95
96 fn create_producer(&self, ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
97 let delegate_ctx = MasterDelegateContext {
98 delegate_component: Arc::clone(&self.delegate_component),
99 metrics: Arc::clone(&self.metrics),
100 leader_elector: Arc::clone(&self.leader_elector),
101 };
102
103 self.delegate_component
104 .create_endpoint(&self.delegate_uri, &delegate_ctx)?
105 .create_producer(ctx)
106 }
107}
108
/// Minimal component context handed to the delegate component when the master
/// endpoint or consumer asks it to create the delegate endpoint.
struct MasterDelegateContext {
    /// The delegate component; the only component this context can resolve.
    delegate_component: Arc<dyn Component>,
    /// Metrics collector forwarded to the delegate.
    metrics: Arc<dyn MetricsCollector>,
    /// Leader elector forwarded to the delegate.
    leader_elector: Arc<dyn LeaderElector>,
}
114
115impl ComponentContext for MasterDelegateContext {
116 fn resolve_component(&self, scheme: &str) -> Option<Arc<dyn Component>> {
117 if self.delegate_component.scheme() == scheme {
118 Some(Arc::clone(&self.delegate_component))
119 } else {
120 None
121 }
122 }
123
124 fn resolve_language(&self, _name: &str) -> Option<Arc<dyn Language>> {
125 None
126 }
127
128 fn metrics(&self) -> Arc<dyn MetricsCollector> {
129 Arc::clone(&self.metrics)
130 }
131
132 fn leader_elector(&self) -> Arc<dyn LeaderElector> {
133 Arc::clone(&self.leader_elector)
134 }
135}
136
/// Consumer that participates in leader election and runs the delegate consumer
/// only while leadership is held.
struct MasterConsumer {
    /// Lock name used as the leader-election identity.
    lock_name: String,
    /// URI of the delegate endpoint whose consumer is started on leadership.
    delegate_uri: String,
    /// Component used to create the delegate endpoint.
    delegate_component: Arc<dyn Component>,
    /// Metrics collector forwarded to the delegate context.
    metrics: Arc<dyn MetricsCollector>,
    /// Elector used to join the election in `start`.
    leader_elector: Arc<dyn LeaderElector>,
    /// Maximum time to wait for the delegate / leadership loop / step-down when stopping.
    drain_timeout: Duration,
    /// Election handle returned by the elector; `Some` between `start` and `stop`.
    leadership_handle: Option<camel_api::LeadershipHandle>,
    /// Background task reacting to leadership events; `Some` between `start` and `stop`.
    leadership_task: Option<JoinHandle<()>>,
    /// Token cancelled by `stop` to end the leadership task.
    stop_token: Option<CancellationToken>,
}
148
149impl MasterConsumer {
150 fn new(
151 lock_name: String,
152 delegate_uri: String,
153 delegate_component: Arc<dyn Component>,
154 metrics: Arc<dyn MetricsCollector>,
155 leader_elector: Arc<dyn LeaderElector>,
156 drain_timeout: Duration,
157 ) -> Self {
158 Self {
159 lock_name,
160 delegate_uri,
161 delegate_component,
162 metrics,
163 leader_elector,
164 drain_timeout,
165 leadership_handle: None,
166 leadership_task: None,
167 stop_token: None,
168 }
169 }
170}
171
/// Whether a delegate consumer task is currently running.
enum DelegateState {
    /// No delegate consumer task is running.
    Inactive,
    /// A delegate consumer task has been spawned.
    Active {
        /// Token whose cancellation asks the delegate consumer to stop.
        run_token: CancellationToken,
        /// Join handle of the spawned delegate consumer task.
        handle: JoinHandle<()>,
    },
}
179
180async fn stop_delegate(state: &mut DelegateState, drain_timeout: Duration) {
181 if let DelegateState::Active {
182 run_token,
183 mut handle,
184 } = std::mem::replace(state, DelegateState::Inactive)
185 {
186 run_token.cancel();
187 match timeout(drain_timeout, &mut handle).await {
188 Ok(_) => {}
189 Err(_) => {
190 warn!("master delegate shutdown timed out, aborting");
191 handle.abort();
192 }
193 }
194 }
195}
196
197async fn reconcile_event(
198 event: camel_api::LeadershipEvent,
199 state: &mut DelegateState,
200 lock_name: &str,
201 delegate_component: &Arc<dyn Component>,
202 delegate_uri: &str,
203 sender: &tokio::sync::mpsc::Sender<ExchangeEnvelope>,
204 parent_cancel: &CancellationToken,
205 drain_timeout: Duration,
206 metrics: &Arc<dyn MetricsCollector>,
207 leader_elector: &Arc<dyn LeaderElector>,
208) {
209 match event {
210 camel_api::LeadershipEvent::StartedLeading => {
211 info!(lock = %lock_name, "master leadership acquired");
212 stop_delegate(state, drain_timeout).await;
213
214 let delegate_ctx = MasterDelegateContext {
215 delegate_component: Arc::clone(delegate_component),
216 metrics: Arc::clone(metrics),
217 leader_elector: Arc::clone(leader_elector),
218 };
219
220 let endpoint = match delegate_component.create_endpoint(delegate_uri, &delegate_ctx) {
221 Ok(endpoint) => endpoint,
222 Err(err) => {
223 warn!(lock = %lock_name, "failed to create delegate endpoint: {err}");
224 return;
225 }
226 };
227
228 let mut consumer = match endpoint.create_consumer() {
229 Ok(consumer) => consumer,
230 Err(err) => {
231 warn!(lock = %lock_name, "failed to create delegate consumer: {err}");
232 return;
233 }
234 };
235
236 let run_token = parent_cancel.child_token();
237 let delegate_ctx = ConsumerContext::new(sender.clone(), run_token.clone());
238 let handle = tokio::spawn(async move {
239 let _ = consumer.start(delegate_ctx).await;
240 let _ = consumer.stop().await;
241 });
242
243 *state = DelegateState::Active { run_token, handle };
244 }
245 camel_api::LeadershipEvent::StoppedLeading => {
246 info!(lock = %lock_name, "master leadership lost");
247 stop_delegate(state, drain_timeout).await;
248 }
249 }
250}
251
252#[async_trait]
253impl Consumer for MasterConsumer {
254 async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
255 if self.leadership_task.is_some() {
256 return Ok(());
257 }
258
259 let handle = self
260 .leader_elector
261 .start(camel_api::PlatformIdentity::local(&self.lock_name))
262 .await
263 .map_err(|e| {
264 CamelError::EndpointCreationFailed(format!("failed to start leader election: {e}"))
265 })?;
266
267 let lock_name = self.lock_name.clone();
268 let delegate_uri = self.delegate_uri.clone();
269 let delegate_component = Arc::clone(&self.delegate_component);
270 let metrics = Arc::clone(&self.metrics);
271 let leader_elector = Arc::clone(&self.leader_elector);
272 let sender = context.sender();
273 let parent_cancel = context.cancel_token();
274 let drain_timeout = self.drain_timeout;
275 let mut events = handle.events.clone();
276
277 let stop_token = CancellationToken::new();
278 let stop_token_loop = stop_token.clone();
279
280 let task = tokio::spawn(async move {
281 let mut state = DelegateState::Inactive;
282
283 let initial_event = { events.borrow().clone() };
284 if let Some(initial_event) = initial_event {
285 reconcile_event(
286 initial_event,
287 &mut state,
288 &lock_name,
289 &delegate_component,
290 &delegate_uri,
291 &sender,
292 &parent_cancel,
293 drain_timeout,
294 &metrics,
295 &leader_elector,
296 )
297 .await;
298 }
299
300 loop {
301 tokio::select! {
302 _ = stop_token_loop.cancelled() => {
303 break;
304 }
305 _ = context.cancelled() => {
306 break;
307 }
308 changed = events.changed() => {
309 if changed.is_err() {
310 break;
311 }
312 let event = { events.borrow().clone() };
313 if let Some(event) = event {
314 reconcile_event(
315 event,
316 &mut state,
317 &lock_name,
318 &delegate_component,
319 &delegate_uri,
320 &sender,
321 &parent_cancel,
322 drain_timeout,
323 &metrics,
324 &leader_elector,
325 )
326 .await;
327 }
328 }
329 }
330 }
331
332 stop_delegate(&mut state, drain_timeout).await;
333 });
334
335 self.leadership_handle = Some(handle);
336 self.stop_token = Some(stop_token);
337 self.leadership_task = Some(task);
338
339 Ok(())
340 }
341
342 async fn stop(&mut self) -> Result<(), CamelError> {
343 if let Some(token) = self.stop_token.take() {
344 token.cancel();
345 }
346
347 if let Some(task) = self.leadership_task.take()
348 && timeout(self.drain_timeout, task).await.is_err()
349 {
350 warn!("master leadership loop shutdown timed out");
351 }
352
353 if let Some(handle) = self.leadership_handle.take() {
354 let _ = timeout(self.drain_timeout, handle.step_down()).await;
355 }
356
357 Ok(())
358 }
359}
360
/// Unit tests for URI parsing, endpoint creation, producer passthrough and the
/// leadership-driven consumer lifecycle.
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::sync::Mutex;
    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};

    use camel_api::{
        BoxProcessorExt, Exchange, LeadershipEvent, LeadershipHandle, Message, NoOpMetrics,
        NoopLeaderElector, PlatformError, PlatformIdentity,
    };
    use camel_component_api::NoOpComponentContext;
    use tokio::sync::{oneshot, watch};
    use tokio::time::{sleep, timeout};
    use tokio_util::sync::CancellationToken;
    use tower::ServiceExt;

    use super::*;

    /// A well-formed master URI splits into the lock name and the delegate URI.
    #[test]
    fn parse_master_uri_valid() {
        let cfg = MasterUriConfig::parse("master:mylock:timer:tick?period=250").unwrap();
        assert_eq!(cfg.lock_name, "mylock");
        assert_eq!(cfg.delegate_uri, "timer:tick?period=250");
    }

    /// An empty lock name is rejected as an invalid URI.
    #[test]
    fn parse_master_uri_missing_lockname() {
        let err = MasterUriConfig::parse("master::timer:tick").unwrap_err();
        assert!(matches!(err, CamelError::InvalidUri(_)));
    }

    /// An empty delegate URI is rejected as an invalid URI.
    #[test]
    fn parse_master_uri_missing_delegate() {
        let err = MasterUriConfig::parse("master:mylock:").unwrap_err();
        assert!(matches!(err, CamelError::InvalidUri(_)));
    }

    /// Endpoint creation fails with `ComponentNotFound` when the context cannot
    /// resolve the delegate scheme.
    #[test]
    fn endpoint_fails_when_delegate_component_missing() {
        let master = MasterComponent::default();
        let result =
            master.create_endpoint("master:lock-1:missing:delegate", &NoOpComponentContext);
        assert!(matches!(result, Err(CamelError::ComponentNotFound(_))));
    }

    /// The component is resolved using the scheme of the delegate URI (`mock`),
    /// and the endpoint keeps the full master URI.
    #[test]
    fn delegate_scheme_is_parsed_from_delegate_uri() {
        let seen_scheme = Arc::new(AtomicBool::new(false));

        // Context that records whether the `mock` scheme was requested.
        struct SchemeAwareContext {
            delegate: Arc<dyn Component>,
            seen_scheme: Arc<AtomicBool>,
        }

        impl ComponentContext for SchemeAwareContext {
            fn resolve_component(&self, scheme: &str) -> Option<Arc<dyn Component>> {
                if scheme == "mock" {
                    self.seen_scheme.store(true, Ordering::SeqCst);
                    Some(Arc::clone(&self.delegate))
                } else {
                    None
                }
            }

            fn resolve_language(&self, _name: &str) -> Option<Arc<dyn Language>> {
                None
            }

            fn metrics(&self) -> Arc<dyn MetricsCollector> {
                Arc::new(NoOpMetrics)
            }

            fn leader_elector(&self) -> Arc<dyn LeaderElector> {
                Arc::new(NoopLeaderElector)
            }
        }

        // Delegate component whose endpoint is never actually used by this test.
        struct MockDelegateComponent;

        impl Component for MockDelegateComponent {
            fn scheme(&self) -> &str {
                "mock"
            }

            fn create_endpoint(
                &self,
                _uri: &str,
                _ctx: &dyn ComponentContext,
            ) -> Result<Box<dyn Endpoint>, CamelError> {
                Ok(Box::new(MockDelegateEndpoint))
            }
        }

        struct MockDelegateEndpoint;

        impl Endpoint for MockDelegateEndpoint {
            fn uri(&self) -> &str {
                "mock:delegate"
            }

            fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
                Err(CamelError::EndpointCreationFailed("not used".to_string()))
            }

            fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
                Err(CamelError::EndpointCreationFailed("not used".to_string()))
            }
        }

        let delegate = Arc::new(MockDelegateComponent);
        let ctx = SchemeAwareContext {
            delegate,
            seen_scheme: Arc::clone(&seen_scheme),
        };

        let master = MasterComponent::default();
        let endpoint = master
            .create_endpoint("master:mylock:mock:delegate?x=1", &ctx)
            .unwrap();

        assert_eq!(endpoint.uri(), "master:mylock:mock:delegate?x=1");
        assert!(seen_scheme.load(Ordering::SeqCst));
    }

    /// Test context that resolves only the scheme of its single delegate component.
    struct MockDelegateContext {
        delegate: Arc<dyn Component>,
    }

    impl ComponentContext for MockDelegateContext {
        fn resolve_component(&self, scheme: &str) -> Option<Arc<dyn Component>> {
            if self.delegate.scheme() == scheme {
                Some(Arc::clone(&self.delegate))
            } else {
                None
            }
        }

        fn resolve_language(&self, _name: &str) -> Option<Arc<dyn Language>> {
            None
        }

        fn metrics(&self) -> Arc<dyn MetricsCollector> {
            Arc::new(NoOpMetrics)
        }

        fn leader_elector(&self) -> Arc<dyn LeaderElector> {
            Arc::new(NoopLeaderElector)
        }
    }

    /// Delegate component that counts endpoint/producer creations and can be told
    /// to make producer creation fail.
    struct MockProducerDelegateComponent {
        create_endpoint_calls: Arc<AtomicUsize>,
        create_producer_calls: Arc<AtomicUsize>,
        fail_producer: bool,
    }

    impl Component for MockProducerDelegateComponent {
        fn scheme(&self) -> &str {
            "mock"
        }

        fn create_endpoint(
            &self,
            _uri: &str,
            _ctx: &dyn ComponentContext,
        ) -> Result<Box<dyn Endpoint>, CamelError> {
            self.create_endpoint_calls.fetch_add(1, Ordering::SeqCst);
            Ok(Box::new(MockProducerDelegateEndpoint {
                create_producer_calls: Arc::clone(&self.create_producer_calls),
                fail_producer: self.fail_producer,
            }))
        }
    }

    /// Endpoint created by [`MockProducerDelegateComponent`]; only its producer side is used.
    struct MockProducerDelegateEndpoint {
        create_producer_calls: Arc<AtomicUsize>,
        fail_producer: bool,
    }

    impl Endpoint for MockProducerDelegateEndpoint {
        fn uri(&self) -> &str {
            "mock:delegate"
        }

        fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
            Err(CamelError::EndpointCreationFailed(
                "not used in test".to_string(),
            ))
        }

        fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
            self.create_producer_calls.fetch_add(1, Ordering::SeqCst);
            if self.fail_producer {
                return Err(CamelError::ProcessorError(
                    "delegate producer failed".to_string(),
                ));
            }
            // Identity processor: returns the exchange unchanged.
            Ok(BoxProcessor::from_fn(
                |exchange| async move { Ok(exchange) },
            ))
        }
    }

    /// The master producer is the delegate's producer: one delegate endpoint and
    /// one producer are created, and the exchange passes through unchanged.
    #[tokio::test]
    async fn producer_passthrough_delegates_and_produces() {
        let endpoint_calls = Arc::new(AtomicUsize::new(0));
        let producer_calls = Arc::new(AtomicUsize::new(0));
        let delegate = Arc::new(MockProducerDelegateComponent {
            create_endpoint_calls: Arc::clone(&endpoint_calls),
            create_producer_calls: Arc::clone(&producer_calls),
            fail_producer: false,
        });

        let ctx = MockDelegateContext {
            delegate: delegate.clone(),
        };

        let master = MasterComponent::default();
        let endpoint = master
            .create_endpoint("master:lock-1:mock:delegate", &ctx)
            .unwrap();
        let producer_ctx = ProducerContext::new();
        let producer = endpoint.create_producer(&producer_ctx).unwrap();

        let exchange = Exchange::new(Message::new("ok"));
        let result = producer.oneshot(exchange).await.unwrap();

        assert_eq!(result.input.body.as_text(), Some("ok"));
        assert_eq!(endpoint_calls.load(Ordering::SeqCst), 1);
        assert_eq!(producer_calls.load(Ordering::SeqCst), 1);
    }

    /// A failure while creating the delegate producer is returned to the caller as-is.
    #[test]
    fn producer_passthrough_bubbles_delegate_errors() {
        let endpoint_calls = Arc::new(AtomicUsize::new(0));
        let producer_calls = Arc::new(AtomicUsize::new(0));
        let delegate = Arc::new(MockProducerDelegateComponent {
            create_endpoint_calls: Arc::clone(&endpoint_calls),
            create_producer_calls: Arc::clone(&producer_calls),
            fail_producer: true,
        });

        let ctx = MockDelegateContext {
            delegate: delegate.clone(),
        };

        let master = MasterComponent::default();
        let endpoint = master
            .create_endpoint("master:lock-1:mock:delegate", &ctx)
            .unwrap();
        let producer_ctx = ProducerContext::new();
        let err = endpoint.create_producer(&producer_ctx).unwrap_err();

        assert!(matches!(err, CamelError::ProcessorError(_)));
        assert_eq!(endpoint_calls.load(Ordering::SeqCst), 1);
        assert_eq!(producer_calls.load(Ordering::SeqCst), 1);
    }

    /// Leader elector whose leadership events are driven manually by the test.
    struct FakeLeaderElector {
        /// Sender of the watch channel created in `start`; `None` until then.
        tx: Mutex<Option<watch::Sender<Option<LeadershipEvent>>>>,
        /// Leadership flag shared with the handle returned by `start`.
        is_leader: Arc<AtomicBool>,
        /// Initial value of the watch channel.
        initial: Option<LeadershipEvent>,
    }

    impl FakeLeaderElector {
        /// Creates an elector whose event channel starts with `initial`.
        fn new(initial: Option<LeadershipEvent>) -> Self {
            let starts_as_leader = matches!(initial, Some(LeadershipEvent::StartedLeading));
            Self {
                tx: Mutex::new(None),
                is_leader: Arc::new(AtomicBool::new(starts_as_leader)),
                initial,
            }
        }

        /// Updates the leadership flag and publishes `event` on the watch channel.
        /// The event is dropped if `start` has not been called yet.
        async fn emit(&self, event: LeadershipEvent) {
            self.is_leader.store(
                matches!(event, LeadershipEvent::StartedLeading),
                Ordering::Release,
            );
            if let Some(tx) = self
                .tx
                .lock()
                .expect("mutex poisoned: fake elector sender")
                .as_ref()
            {
                let _ = tx.send(Some(event));
            }
        }
    }

    #[async_trait]
    impl LeaderElector for FakeLeaderElector {
        /// Creates the watch channel, keeps its sender for `emit`, and returns a
        /// handle built from the receiver, the leadership flag, a cancel token and
        /// a oneshot that fires once that token is cancelled.
        async fn start(
            &self,
            _identity: PlatformIdentity,
        ) -> Result<LeadershipHandle, PlatformError> {
            let (tx, rx) = watch::channel(self.initial.clone());
            *self.tx.lock().expect("mutex poisoned: fake elector sender") = Some(tx);

            let cancel = CancellationToken::new();
            let cancel_wait = cancel.clone();
            let (term_tx, term_rx) = oneshot::channel();
            tokio::spawn(async move {
                cancel_wait.cancelled().await;
                let _ = term_tx.send(());
            });

            Ok(LeadershipHandle::new(
                rx,
                Arc::clone(&self.is_leader),
                cancel,
                term_rx,
            ))
        }
    }

    /// Delegate component (scheme `fake`) that counts consumer creations and starts.
    struct FakeDelegateComponent {
        create_consumer_calls: Arc<AtomicUsize>,
        start_calls: Arc<AtomicUsize>,
    }

    impl Component for FakeDelegateComponent {
        fn scheme(&self) -> &str {
            "fake"
        }

        fn create_endpoint(
            &self,
            _uri: &str,
            _ctx: &dyn ComponentContext,
        ) -> Result<Box<dyn Endpoint>, CamelError> {
            Ok(Box::new(FakeDelegateEndpoint {
                create_consumer_calls: Arc::clone(&self.create_consumer_calls),
                start_calls: Arc::clone(&self.start_calls),
            }))
        }
    }

    /// Endpoint created by [`FakeDelegateComponent`]; only its consumer side is used.
    struct FakeDelegateEndpoint {
        create_consumer_calls: Arc<AtomicUsize>,
        start_calls: Arc<AtomicUsize>,
    }

    impl Endpoint for FakeDelegateEndpoint {
        fn uri(&self) -> &str {
            "fake:delegate"
        }

        fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
            // The epoch is the 1-based number of consumers created so far.
            let epoch = self.create_consumer_calls.fetch_add(1, Ordering::SeqCst) + 1;
            Ok(Box::new(FakeDelegateConsumer {
                epoch,
                start_calls: Arc::clone(&self.start_calls),
            }))
        }

        fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
            Err(CamelError::EndpointCreationFailed("not used".to_string()))
        }
    }

    /// Consumer that emits `epoch-<n>` messages until its context is cancelled.
    struct FakeDelegateConsumer {
        epoch: usize,
        start_calls: Arc<AtomicUsize>,
    }

    #[async_trait]
    impl Consumer for FakeDelegateConsumer {
        /// Sends one message immediately, then one every 20 ms until cancelled.
        async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
            self.start_calls.fetch_add(1, Ordering::SeqCst);
            context
                .send(Exchange::new(Message::new(format!("epoch-{}", self.epoch))))
                .await?;

            loop {
                tokio::select! {
                    _ = context.cancelled() => {
                        break;
                    }
                    _ = sleep(Duration::from_millis(20)) => {
                        context
                            .send(Exchange::new(Message::new(format!("epoch-{}", self.epoch))))
                            .await?;
                    }
                }
            }

            Ok(())
        }

        async fn stop(&mut self) -> Result<(), CamelError> {
            Ok(())
        }
    }

    /// Builds a `MasterConsumer` for lock `lock-a` wrapping a [`FakeDelegateComponent`],
    /// with a 500 ms drain timeout.
    fn build_master_consumer(
        elector: Arc<dyn LeaderElector>,
        create_consumer_calls: Arc<AtomicUsize>,
        start_calls: Arc<AtomicUsize>,
    ) -> MasterConsumer {
        MasterConsumer::new(
            "lock-a".to_string(),
            "fake:delegate".to_string(),
            Arc::new(FakeDelegateComponent {
                create_consumer_calls,
                start_calls,
            }),
            Arc::new(NoOpMetrics),
            elector,
            Duration::from_millis(500),
        )
    }

    /// No delegate is created before leadership; after `StartedLeading` exactly
    /// one delegate consumer is created and started.
    #[tokio::test]
    async fn starts_delegate_only_after_started_leading() {
        let elector = Arc::new(FakeLeaderElector::new(None));
        let create_consumer_calls = Arc::new(AtomicUsize::new(0));
        let start_calls = Arc::new(AtomicUsize::new(0));
        let mut master = build_master_consumer(
            elector.clone(),
            Arc::clone(&create_consumer_calls),
            Arc::clone(&start_calls),
        );

        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
        let cancel = CancellationToken::new();
        let ctx = ConsumerContext::new(tx, cancel.clone());

        master.start(ctx).await.unwrap();

        // Without leadership nothing may be produced or created.
        sleep(Duration::from_millis(80)).await;
        assert!(rx.try_recv().is_err());
        assert_eq!(create_consumer_calls.load(Ordering::SeqCst), 0);

        elector.emit(LeadershipEvent::StartedLeading).await;

        let first = timeout(Duration::from_millis(500), rx.recv())
            .await
            .unwrap()
            .unwrap();
        assert_eq!(first.exchange.input.body.as_text(), Some("epoch-1"));
        assert_eq!(create_consumer_calls.load(Ordering::SeqCst), 1);
        assert_eq!(start_calls.load(Ordering::SeqCst), 1);

        cancel.cancel();
        master.stop().await.unwrap();
    }

    /// After `StoppedLeading` the delegate stops producing messages.
    #[tokio::test]
    async fn stops_delegate_on_stopped_leading() {
        let elector = Arc::new(FakeLeaderElector::new(None));
        let create_consumer_calls = Arc::new(AtomicUsize::new(0));
        let start_calls = Arc::new(AtomicUsize::new(0));
        let mut master = build_master_consumer(
            elector.clone(),
            Arc::clone(&create_consumer_calls),
            Arc::clone(&start_calls),
        );

        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
        let cancel = CancellationToken::new();
        let ctx = ConsumerContext::new(tx, cancel.clone());

        master.start(ctx).await.unwrap();
        elector.emit(LeadershipEvent::StartedLeading).await;
        let _ = timeout(Duration::from_millis(500), rx.recv())
            .await
            .unwrap()
            .unwrap();

        elector.emit(LeadershipEvent::StoppedLeading).await;
        sleep(Duration::from_millis(100)).await;
        // Drain messages that were queued before the delegate stopped.
        while rx.try_recv().is_ok() {}
        assert!(
            timeout(Duration::from_millis(120), rx.recv())
                .await
                .is_err()
        );

        cancel.cancel();
        master.stop().await.unwrap();
    }

    /// Regaining leadership creates and starts a brand-new delegate consumer
    /// (second epoch) instead of reusing the first one.
    #[tokio::test]
    async fn recreates_delegate_on_new_leadership_epoch() {
        let elector = Arc::new(FakeLeaderElector::new(None));
        let create_consumer_calls = Arc::new(AtomicUsize::new(0));
        let start_calls = Arc::new(AtomicUsize::new(0));
        let mut master = build_master_consumer(
            elector.clone(),
            Arc::clone(&create_consumer_calls),
            Arc::clone(&start_calls),
        );

        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
        let cancel = CancellationToken::new();
        let ctx = ConsumerContext::new(tx, cancel.clone());

        master.start(ctx).await.unwrap();

        elector.emit(LeadershipEvent::StartedLeading).await;
        let first = timeout(Duration::from_millis(500), rx.recv())
            .await
            .unwrap()
            .unwrap();
        assert_eq!(first.exchange.input.body.as_text(), Some("epoch-1"));

        elector.emit(LeadershipEvent::StoppedLeading).await;
        sleep(Duration::from_millis(120)).await;

        elector.emit(LeadershipEvent::StartedLeading).await;
        let second = timeout(Duration::from_millis(500), rx.recv())
            .await
            .unwrap()
            .unwrap();
        assert_eq!(second.exchange.input.body.as_text(), Some("epoch-2"));

        assert_eq!(create_consumer_calls.load(Ordering::SeqCst), 2);
        assert_eq!(start_calls.load(Ordering::SeqCst), 2);

        cancel.cancel();
        master.stop().await.unwrap();
    }
}