1pub mod bundle;
7mod config;
8
9pub use bundle::MasterBundle;
10
11use std::sync::Arc;
12use std::time::Duration;
13
14use async_trait::async_trait;
15use camel_api::{CamelError, MetricsCollector, PlatformService};
16use camel_component_api::{
17 BoxProcessor, Component, ComponentContext, Consumer, ConsumerContext, Endpoint,
18 ExchangeEnvelope, ProducerContext, parse_uri,
19};
20use camel_language_api::Language;
21use tokio::task::JoinHandle;
22use tokio::time::{interval, timeout};
23use tokio_util::sync::CancellationToken;
24use tracing::{error, info, warn};
25
26use crate::config::{MasterComponentConfig, MasterUriConfig};
27
/// Period of the ticker that drives delegate start retries in the leadership loop.
const DELEGATE_RETRY_INTERVAL: Duration = Duration::from_millis(200);
29
/// Component registered under the `master` scheme.
///
/// Holds the component-wide settings that every endpoint it creates inherits.
#[derive(Debug, Clone)]
pub struct MasterComponent {
    /// Drain timeout in milliseconds; converted to a `Duration` when an endpoint is created.
    drain_timeout_ms: u64,
    /// Optional cap on delegate start retries, forwarded to every endpoint.
    delegate_retry_max_attempts: Option<u32>,
}
34
35impl MasterComponent {
36 pub fn new(config: MasterComponentConfig) -> Self {
37 Self {
38 drain_timeout_ms: config.drain_timeout_ms,
39 delegate_retry_max_attempts: config.delegate_retry_max_attempts,
40 }
41 }
42}
43
44impl Default for MasterComponent {
45 fn default() -> Self {
46 Self::new(MasterComponentConfig::default())
47 }
48}
49
50impl Component for MasterComponent {
51 fn scheme(&self) -> &str {
52 "master"
53 }
54
55 fn create_endpoint(
56 &self,
57 uri: &str,
58 ctx: &dyn ComponentContext,
59 ) -> Result<Box<dyn Endpoint>, CamelError> {
60 let parsed = MasterUriConfig::parse(uri)?;
61 let delegate_parts = parse_uri(&parsed.delegate_uri)?;
62 let delegate_scheme = delegate_parts.scheme;
63 let delegate_component = ctx
64 .resolve_component(&delegate_scheme)
65 .ok_or_else(|| CamelError::ComponentNotFound(delegate_scheme.clone()))?;
66
67 Ok(Box::new(MasterEndpoint {
68 uri: uri.to_string(),
69 lock_name: parsed.lock_name,
70 delegate_uri: parsed.delegate_uri,
71 delegate_component,
72 metrics: ctx.metrics(),
73 platform_service: ctx.platform_service(),
74 drain_timeout: Duration::from_millis(self.drain_timeout_ms),
75 delegate_retry_max_attempts: self.delegate_retry_max_attempts,
76 }))
77 }
78}
79
/// Endpoint for a single `master:` URI, pairing a lock name with the delegate it wraps.
struct MasterEndpoint {
    /// Full URI this endpoint was created from; returned by `uri()`.
    uri: String,
    /// Lock name passed to the leadership service when a consumer starts.
    lock_name: String,
    /// URI handed to the delegate component to build the wrapped endpoint.
    delegate_uri: String,
    /// Component resolved from the delegate URI's scheme.
    delegate_component: Arc<dyn Component>,
    /// Metrics collector taken from the creating component context.
    metrics: Arc<dyn MetricsCollector>,
    /// Platform service taken from the creating component context.
    platform_service: Arc<dyn PlatformService>,
    /// Upper bound used when waiting for delegate or leadership-task shutdown.
    drain_timeout: Duration,
    /// Optional cap on delegate start retries, forwarded to consumers.
    delegate_retry_max_attempts: Option<u32>,
}
90
91impl Endpoint for MasterEndpoint {
92 fn uri(&self) -> &str {
93 &self.uri
94 }
95
96 fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
97 Ok(Box::new(MasterConsumer::new(
98 self.lock_name.clone(),
99 self.delegate_uri.clone(),
100 Arc::clone(&self.delegate_component),
101 Arc::clone(&self.metrics),
102 Arc::clone(&self.platform_service),
103 self.drain_timeout,
104 self.delegate_retry_max_attempts,
105 )))
106 }
107
108 fn create_producer(&self, ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
109 let delegate_ctx = MasterDelegateContext {
110 delegate_component: Arc::clone(&self.delegate_component),
111 metrics: Arc::clone(&self.metrics),
112 platform_service: Arc::clone(&self.platform_service),
113 };
114
115 self.delegate_component
116 .create_endpoint(&self.delegate_uri, &delegate_ctx)?
117 .create_producer(ctx)
118 }
119}
120
/// Minimal `ComponentContext` handed to the delegate component when the master
/// component builds the delegate endpoint.
struct MasterDelegateContext {
    /// The only component this context can resolve.
    delegate_component: Arc<dyn Component>,
    /// Metrics collector returned by `metrics()`.
    metrics: Arc<dyn MetricsCollector>,
    /// Platform service returned by `platform_service()`.
    platform_service: Arc<dyn PlatformService>,
}
126
127impl ComponentContext for MasterDelegateContext {
128 fn resolve_component(&self, scheme: &str) -> Option<Arc<dyn Component>> {
129 if self.delegate_component.scheme() == scheme {
130 Some(Arc::clone(&self.delegate_component))
131 } else {
132 None
133 }
134 }
135
136 fn resolve_language(&self, _name: &str) -> Option<Arc<dyn Language>> {
137 None
138 }
139
140 fn metrics(&self) -> Arc<dyn MetricsCollector> {
141 Arc::clone(&self.metrics)
142 }
143
144 fn platform_service(&self) -> Arc<dyn PlatformService> {
145 Arc::clone(&self.platform_service)
146 }
147}
148
/// Consumer that runs a delegate consumer only while leadership of `lock_name` is held.
struct MasterConsumer {
    /// Lock name passed to the leadership service.
    lock_name: String,
    /// URI used to create the delegate endpoint each time leadership is acquired.
    delegate_uri: String,
    /// Component that builds the delegate endpoint.
    delegate_component: Arc<dyn Component>,
    /// Metrics collector forwarded to the delegate context.
    metrics: Arc<dyn MetricsCollector>,
    /// Platform service; supplies the leadership service and is forwarded to the delegate context.
    platform_service: Arc<dyn PlatformService>,
    /// Upper bound used when waiting for the delegate, step-down, and the leadership task.
    drain_timeout: Duration,
    /// Optional cap on delegate start retries; with `None` no cap is applied.
    delegate_retry_max_attempts: Option<u32>,
    /// Handle of the spawned leadership loop; `None` until `start` and after `stop`.
    leadership_task: Option<JoinHandle<()>>,
    /// Token cancelled by `stop` to end the leadership loop.
    stop_token: Option<CancellationToken>,
}
160
161impl MasterConsumer {
162 fn new(
163 lock_name: String,
164 delegate_uri: String,
165 delegate_component: Arc<dyn Component>,
166 metrics: Arc<dyn MetricsCollector>,
167 platform_service: Arc<dyn PlatformService>,
168 drain_timeout: Duration,
169 delegate_retry_max_attempts: Option<u32>,
170 ) -> Self {
171 Self {
172 lock_name,
173 delegate_uri,
174 delegate_component,
175 metrics,
176 platform_service,
177 drain_timeout,
178 delegate_retry_max_attempts,
179 leadership_task: None,
180 stop_token: None,
181 }
182 }
183}
184
/// Whether a delegate consumer task is currently tracked by the leadership loop.
enum DelegateState {
    /// No delegate task is tracked.
    Inactive,
    /// A delegate task has been spawned.
    Active {
        /// Token cancelled to ask the delegate to stop.
        run_token: CancellationToken,
        /// Join handle of the spawned delegate task.
        handle: JoinHandle<()>,
    },
}
192
193async fn stop_delegate(state: &mut DelegateState, drain_timeout: Duration) {
194 if let DelegateState::Active {
195 run_token,
196 mut handle,
197 } = std::mem::replace(state, DelegateState::Inactive)
198 {
199 run_token.cancel();
200 match timeout(drain_timeout, &mut handle).await {
201 Ok(Ok(())) => {}
202 Ok(Err(e)) if e.is_panic() => {
203 error!(error = %e, "master delegate task panicked");
204 }
205 Ok(Err(e)) => {
206 warn!(error = %e, "master delegate task cancelled");
207 }
208 Err(_) => {
209 warn!("master delegate shutdown timed out, aborting");
210 handle.abort();
211 }
212 }
213 }
214}
215
/// Borrowed bundle of everything `reconcile_event` needs to start or stop a delegate.
struct ReconcileContext<'a> {
    /// Lock name, used as the `lock` field in log events.
    lock_name: &'a str,
    /// Component that builds the delegate endpoint.
    delegate_component: &'a Arc<dyn Component>,
    /// URI passed to the delegate component.
    delegate_uri: &'a str,
    /// Channel sender cloned into each delegate's `ConsumerContext`.
    sender: &'a tokio::sync::mpsc::Sender<ExchangeEnvelope>,
    /// Parent token from which each delegate's run token is derived as a child.
    parent_cancel: &'a CancellationToken,
    /// Upper bound for waiting on a delegate to stop.
    drain_timeout: Duration,
    /// Metrics collector forwarded to the delegate context.
    metrics: &'a Arc<dyn MetricsCollector>,
    /// Platform service forwarded to the delegate context.
    platform_service: &'a Arc<dyn PlatformService>,
}
226
227async fn reconcile_event(
228 event: camel_api::LeadershipEvent,
229 state: &mut DelegateState,
230 ctx: &ReconcileContext<'_>,
231) {
232 match event {
233 camel_api::LeadershipEvent::StartedLeading => {
234 info!(lock = %ctx.lock_name, "master leadership acquired");
235 stop_delegate(state, ctx.drain_timeout).await;
236
237 let delegate_ctx = MasterDelegateContext {
238 delegate_component: Arc::clone(ctx.delegate_component),
239 metrics: Arc::clone(ctx.metrics),
240 platform_service: Arc::clone(ctx.platform_service),
241 };
242
243 let endpoint = match ctx
244 .delegate_component
245 .create_endpoint(ctx.delegate_uri, &delegate_ctx)
246 {
247 Ok(endpoint) => endpoint,
248 Err(err) => {
249 warn!(lock = %ctx.lock_name, "failed to create delegate endpoint: {err}");
250 return;
251 }
252 };
253
254 let mut consumer = match endpoint.create_consumer() {
255 Ok(consumer) => consumer,
256 Err(err) => {
257 warn!(lock = %ctx.lock_name, "failed to create delegate consumer: {err}");
258 return;
259 }
260 };
261
262 let run_token = ctx.parent_cancel.child_token();
263 let delegate_ctx = ConsumerContext::new(ctx.sender.clone(), run_token.clone());
264 let handle = tokio::spawn(async move {
265 let _ = consumer.start(delegate_ctx).await;
266 let _ = consumer.stop().await;
267 });
268
269 *state = DelegateState::Active { run_token, handle };
270 }
271 camel_api::LeadershipEvent::StoppedLeading => {
272 info!(lock = %ctx.lock_name, "master leadership lost");
273 stop_delegate(state, ctx.drain_timeout).await;
274 }
275 }
276}
277
278#[async_trait]
279impl Consumer for MasterConsumer {
280 async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
281 if self.leadership_task.is_some() {
282 return Ok(());
283 }
284
285 let handle = self
286 .platform_service
287 .leadership()
288 .start(&self.lock_name)
289 .await
290 .map_err(|e| {
291 CamelError::EndpointCreationFailed(format!("failed to start leader election: {e}"))
292 })?;
293
294 let lock_name = self.lock_name.clone();
295 let delegate_uri = self.delegate_uri.clone();
296 let delegate_component = Arc::clone(&self.delegate_component);
297 let metrics = Arc::clone(&self.metrics);
298 let platform_service = Arc::clone(&self.platform_service);
299 let sender = context.sender();
300 let parent_cancel = context.cancel_token();
301 let drain_timeout = self.drain_timeout;
302 let delegate_retry_max_attempts = self.delegate_retry_max_attempts;
303 let mut events = handle.events.clone();
304
305 let stop_token = CancellationToken::new();
306 let stop_token_loop = stop_token.clone();
307 let leadership_handle = handle;
308
309 let task = tokio::spawn(async move {
310 let mut state = DelegateState::Inactive;
311 let mut is_leading = false;
312 let mut delegate_attempts = 0u32;
313 let mut retry_tick = interval(DELEGATE_RETRY_INTERVAL);
314
315 let rctx = ReconcileContext {
316 lock_name: &lock_name,
317 delegate_component: &delegate_component,
318 delegate_uri: &delegate_uri,
319 sender: &sender,
320 parent_cancel: &parent_cancel,
321 drain_timeout,
322 metrics: &metrics,
323 platform_service: &platform_service,
324 };
325
326 let initial_event = { events.borrow().clone() };
327 if let Some(initial_event) = initial_event {
328 is_leading = matches!(&initial_event, camel_api::LeadershipEvent::StartedLeading);
329 if is_leading {
330 delegate_attempts = 0;
331 }
332 reconcile_event(initial_event, &mut state, &rctx).await;
333 }
334
335 loop {
336 tokio::select! {
337 _ = stop_token_loop.cancelled() => {
338 break;
339 }
340 _ = context.cancelled() => {
341 break;
342 }
343 changed = events.changed() => {
344 if changed.is_err() {
345 break;
346 }
347 let event = { events.borrow().clone() };
348 if let Some(event) = event {
349 let was_leading = is_leading;
350 is_leading = matches!(&event, camel_api::LeadershipEvent::StartedLeading);
351 if !was_leading && is_leading {
352 delegate_attempts = 0;
353 }
354 reconcile_event(event, &mut state, &rctx).await;
355 }
356 }
357 _ = retry_tick.tick() => {
358 if is_leading && matches!(state, DelegateState::Inactive) {
359 if let Some(max) = delegate_retry_max_attempts {
360 delegate_attempts = delegate_attempts.saturating_add(1);
361 if delegate_attempts > max {
362 warn!(
363 lock = %lock_name,
364 attempts = max,
365 "delegate start exceeded max attempts, stopping consumer"
366 );
367 break;
368 }
369 }
370 reconcile_event(
371 camel_api::LeadershipEvent::StartedLeading,
372 &mut state,
373 &rctx,
374 )
375 .await;
376 }
377 }
378 }
379 }
380
381 stop_delegate(&mut state, drain_timeout).await;
382 let _ = timeout(drain_timeout, leadership_handle.step_down()).await;
383 });
384
385 self.stop_token = Some(stop_token);
386 self.leadership_task = Some(task);
387
388 Ok(())
389 }
390
391 async fn stop(&mut self) -> Result<(), CamelError> {
392 if let Some(token) = self.stop_token.take() {
393 token.cancel();
394 }
395
396 if let Some(task) = self.leadership_task.take() {
397 match timeout(self.drain_timeout, task).await {
398 Ok(Ok(())) => {}
399 Ok(Err(e)) if e.is_panic() => {
400 error!(lock = %self.lock_name, error = %e, "leadership task panicked");
401 }
402 Ok(Err(e)) => {
403 warn!(lock = %self.lock_name, error = %e, "leadership task cancelled");
404 }
405 Err(_) => {
406 warn!("master leadership loop shutdown timed out");
407 }
408 }
409 }
410
411 Ok(())
412 }
413}
414
415#[cfg(test)]
416mod tests {
417 use std::sync::Arc;
418 use std::sync::Mutex;
419 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
420
421 use camel_api::{
422 BoxProcessorExt, Exchange, LeadershipEvent, LeadershipHandle, LeadershipService, Message,
423 NoOpMetrics, NoopPlatformService, NoopReadinessGate, PlatformError, PlatformIdentity,
424 PlatformService, ReadinessGate,
425 };
426 use camel_component_api::NoOpComponentContext;
427 use tokio::sync::{oneshot, watch};
428 use tokio::time::{sleep, timeout};
429 use tokio_util::sync::CancellationToken;
430 use tower::ServiceExt;
431
432 use super::*;
433
434 #[test]
435 fn parse_master_uri_valid() {
436 let cfg = MasterUriConfig::parse("master:mylock:timer:tick?period=250").unwrap();
437 assert_eq!(cfg.lock_name, "mylock");
438 assert_eq!(cfg.delegate_uri, "timer:tick?period=250");
439 }
440
441 #[test]
442 fn parse_master_uri_missing_lockname() {
443 let err = MasterUriConfig::parse("master::timer:tick").unwrap_err();
444 assert!(matches!(err, CamelError::InvalidUri(_)));
445 }
446
447 #[test]
448 fn parse_master_uri_missing_delegate() {
449 let err = MasterUriConfig::parse("master:mylock:").unwrap_err();
450 assert!(matches!(err, CamelError::InvalidUri(_)));
451 }
452
453 #[test]
454 fn endpoint_fails_when_delegate_component_missing() {
455 let master = MasterComponent::default();
456 let result =
457 master.create_endpoint("master:lock-1:missing:delegate", &NoOpComponentContext);
458 assert!(matches!(result, Err(CamelError::ComponentNotFound(_))));
459 }
460
461 #[test]
462 fn delegate_scheme_is_parsed_from_delegate_uri() {
463 let seen_scheme = Arc::new(AtomicBool::new(false));
464
465 struct SchemeAwareContext {
466 delegate: Arc<dyn Component>,
467 seen_scheme: Arc<AtomicBool>,
468 }
469
470 impl ComponentContext for SchemeAwareContext {
471 fn resolve_component(&self, scheme: &str) -> Option<Arc<dyn Component>> {
472 if scheme == "mock" {
473 self.seen_scheme.store(true, Ordering::SeqCst);
474 Some(Arc::clone(&self.delegate))
475 } else {
476 None
477 }
478 }
479
480 fn resolve_language(&self, _name: &str) -> Option<Arc<dyn Language>> {
481 None
482 }
483
484 fn metrics(&self) -> Arc<dyn MetricsCollector> {
485 Arc::new(NoOpMetrics)
486 }
487
488 fn platform_service(&self) -> Arc<dyn PlatformService> {
489 Arc::new(NoopPlatformService::default())
490 }
491 }
492
493 struct MockDelegateComponent;
494
495 impl Component for MockDelegateComponent {
496 fn scheme(&self) -> &str {
497 "mock"
498 }
499
500 fn create_endpoint(
501 &self,
502 _uri: &str,
503 _ctx: &dyn ComponentContext,
504 ) -> Result<Box<dyn Endpoint>, CamelError> {
505 Ok(Box::new(MockDelegateEndpoint))
506 }
507 }
508
509 struct MockDelegateEndpoint;
510
511 impl Endpoint for MockDelegateEndpoint {
512 fn uri(&self) -> &str {
513 "mock:delegate"
514 }
515
516 fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
517 Err(CamelError::EndpointCreationFailed("not used".to_string()))
518 }
519
520 fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
521 Err(CamelError::EndpointCreationFailed("not used".to_string()))
522 }
523 }
524
525 let delegate = Arc::new(MockDelegateComponent);
526 let ctx = SchemeAwareContext {
527 delegate,
528 seen_scheme: Arc::clone(&seen_scheme),
529 };
530
531 let master = MasterComponent::default();
532 let endpoint = master
533 .create_endpoint("master:mylock:mock:delegate?x=1", &ctx)
534 .unwrap();
535
536 assert_eq!(endpoint.uri(), "master:mylock:mock:delegate?x=1");
537 assert!(seen_scheme.load(Ordering::SeqCst));
538 }
539
    /// Test context that can resolve exactly one delegate component.
    struct MockDelegateContext {
        /// Component returned when its own scheme is requested.
        delegate: Arc<dyn Component>,
    }
543
544 impl ComponentContext for MockDelegateContext {
545 fn resolve_component(&self, scheme: &str) -> Option<Arc<dyn Component>> {
546 if self.delegate.scheme() == scheme {
547 Some(Arc::clone(&self.delegate))
548 } else {
549 None
550 }
551 }
552
553 fn resolve_language(&self, _name: &str) -> Option<Arc<dyn Language>> {
554 None
555 }
556
557 fn metrics(&self) -> Arc<dyn MetricsCollector> {
558 Arc::new(NoOpMetrics)
559 }
560
561 fn platform_service(&self) -> Arc<dyn PlatformService> {
562 Arc::new(NoopPlatformService::default())
563 }
564 }
565
    /// Delegate component for producer tests; counts factory calls.
    struct MockProducerDelegateComponent {
        /// Incremented on every `create_endpoint` call.
        create_endpoint_calls: Arc<AtomicUsize>,
        /// Shared with created endpoints, which increment it in `create_producer`.
        create_producer_calls: Arc<AtomicUsize>,
        /// When `true`, created endpoints fail in `create_producer`.
        fail_producer: bool,
    }
571
572 impl Component for MockProducerDelegateComponent {
573 fn scheme(&self) -> &str {
574 "mock"
575 }
576
577 fn create_endpoint(
578 &self,
579 _uri: &str,
580 _ctx: &dyn ComponentContext,
581 ) -> Result<Box<dyn Endpoint>, CamelError> {
582 self.create_endpoint_calls.fetch_add(1, Ordering::SeqCst);
583 Ok(Box::new(MockProducerDelegateEndpoint {
584 create_producer_calls: Arc::clone(&self.create_producer_calls),
585 fail_producer: self.fail_producer,
586 }))
587 }
588 }
589
    /// Endpoint produced by `MockProducerDelegateComponent`.
    struct MockProducerDelegateEndpoint {
        /// Incremented on every `create_producer` call.
        create_producer_calls: Arc<AtomicUsize>,
        /// When `true`, `create_producer` returns an error.
        fail_producer: bool,
    }
594
595 impl Endpoint for MockProducerDelegateEndpoint {
596 fn uri(&self) -> &str {
597 "mock:delegate"
598 }
599
600 fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
601 Err(CamelError::EndpointCreationFailed(
602 "not used in test".to_string(),
603 ))
604 }
605
606 fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
607 self.create_producer_calls.fetch_add(1, Ordering::SeqCst);
608 if self.fail_producer {
609 return Err(CamelError::ProcessorError(
610 "delegate producer failed".to_string(),
611 ));
612 }
613 Ok(BoxProcessor::from_fn(
614 |exchange| async move { Ok(exchange) },
615 ))
616 }
617 }
618
619 #[tokio::test]
620 async fn producer_passthrough_delegates_and_produces() {
621 let endpoint_calls = Arc::new(AtomicUsize::new(0));
622 let producer_calls = Arc::new(AtomicUsize::new(0));
623 let delegate = Arc::new(MockProducerDelegateComponent {
624 create_endpoint_calls: Arc::clone(&endpoint_calls),
625 create_producer_calls: Arc::clone(&producer_calls),
626 fail_producer: false,
627 });
628
629 let ctx = MockDelegateContext {
630 delegate: delegate.clone(),
631 };
632
633 let master = MasterComponent::default();
634 let endpoint = master
635 .create_endpoint("master:lock-1:mock:delegate", &ctx)
636 .unwrap();
637 let producer_ctx = ProducerContext::new();
638 let producer = endpoint.create_producer(&producer_ctx).unwrap();
639
640 let exchange = Exchange::new(Message::new("ok"));
641 let result = producer.oneshot(exchange).await.unwrap();
642
643 assert_eq!(result.input.body.as_text(), Some("ok"));
644 assert_eq!(endpoint_calls.load(Ordering::SeqCst), 1);
645 assert_eq!(producer_calls.load(Ordering::SeqCst), 1);
646 }
647
648 #[test]
649 fn producer_passthrough_bubbles_delegate_errors() {
650 let endpoint_calls = Arc::new(AtomicUsize::new(0));
651 let producer_calls = Arc::new(AtomicUsize::new(0));
652 let delegate = Arc::new(MockProducerDelegateComponent {
653 create_endpoint_calls: Arc::clone(&endpoint_calls),
654 create_producer_calls: Arc::clone(&producer_calls),
655 fail_producer: true,
656 });
657
658 let ctx = MockDelegateContext {
659 delegate: delegate.clone(),
660 };
661
662 let master = MasterComponent::default();
663 let endpoint = master
664 .create_endpoint("master:lock-1:mock:delegate", &ctx)
665 .unwrap();
666 let producer_ctx = ProducerContext::new();
667 let err = endpoint.create_producer(&producer_ctx).unwrap_err();
668
669 assert!(matches!(err, CamelError::ProcessorError(_)));
670 assert_eq!(endpoint_calls.load(Ordering::SeqCst), 1);
671 assert_eq!(producer_calls.load(Ordering::SeqCst), 1);
672 }
673
    /// Leadership service whose events are injected by the test via `emit`.
    struct FakeLeadershipService {
        /// Sender of the watch channel created by `start`; `None` before `start` is called.
        tx: Mutex<Option<watch::Sender<Option<LeadershipEvent>>>>,
        /// Leader flag shared with the handle returned by `start`.
        is_leader: Arc<AtomicBool>,
        /// Initial value of the watch channel created by `start`.
        initial: Option<LeadershipEvent>,
    }
679
680 impl FakeLeadershipService {
681 fn new(initial: Option<LeadershipEvent>) -> Self {
682 let starts_as_leader = matches!(initial, Some(LeadershipEvent::StartedLeading));
683 Self {
684 tx: Mutex::new(None),
685 is_leader: Arc::new(AtomicBool::new(starts_as_leader)),
686 initial,
687 }
688 }
689
690 async fn emit(&self, event: LeadershipEvent) {
691 self.is_leader.store(
692 matches!(event, LeadershipEvent::StartedLeading),
693 Ordering::Release,
694 );
695 if let Some(tx) = self
696 .tx
697 .lock()
698 .expect("mutex poisoned: fake elector sender")
699 .as_ref()
700 {
701 let _ = tx.send(Some(event));
702 }
703 }
704 }
705
706 #[async_trait]
707 impl LeadershipService for FakeLeadershipService {
708 async fn start(&self, _lock_name: &str) -> Result<LeadershipHandle, PlatformError> {
709 let (tx, rx) = watch::channel(self.initial.clone());
710 *self.tx.lock().expect("mutex poisoned: fake elector sender") = Some(tx);
711
712 let cancel = CancellationToken::new();
713 let cancel_wait = cancel.clone();
714 let (term_tx, term_rx) = oneshot::channel();
715 tokio::spawn(async move {
716 cancel_wait.cancelled().await;
717 let _ = term_tx.send(());
718 });
719
720 Ok(LeadershipHandle::new(
721 rx,
722 Arc::clone(&self.is_leader),
723 cancel,
724 term_rx,
725 ))
726 }
727 }
728
    /// Platform service for tests with an injectable leadership service.
    struct FakePlatformService {
        /// Identity returned by `identity()`.
        identity: PlatformIdentity,
        /// Readiness gate returned by `readiness_gate()`.
        readiness_gate: Arc<dyn ReadinessGate>,
        /// Leadership service returned by `leadership()`.
        leadership: Arc<dyn LeadershipService>,
    }
734
735 impl FakePlatformService {
736 fn new(leadership: Arc<dyn LeadershipService>) -> Self {
737 Self {
738 identity: PlatformIdentity::local("master-tests"),
739 readiness_gate: Arc::new(NoopReadinessGate),
740 leadership,
741 }
742 }
743 }
744
745 impl PlatformService for FakePlatformService {
746 fn identity(&self) -> PlatformIdentity {
747 self.identity.clone()
748 }
749
750 fn readiness_gate(&self) -> Arc<dyn ReadinessGate> {
751 Arc::clone(&self.readiness_gate)
752 }
753
754 fn leadership(&self) -> Arc<dyn LeadershipService> {
755 Arc::clone(&self.leadership)
756 }
757 }
758
    /// Delegate component for consumer tests; its counters are shared with the
    /// endpoints it creates.
    struct FakeDelegateComponent {
        /// Incremented by created endpoints on every `create_consumer` call.
        create_consumer_calls: Arc<AtomicUsize>,
        /// Incremented by created consumers on every `start` call.
        start_calls: Arc<AtomicUsize>,
    }
763
764 impl Component for FakeDelegateComponent {
765 fn scheme(&self) -> &str {
766 "fake"
767 }
768
769 fn create_endpoint(
770 &self,
771 _uri: &str,
772 _ctx: &dyn ComponentContext,
773 ) -> Result<Box<dyn Endpoint>, CamelError> {
774 Ok(Box::new(FakeDelegateEndpoint {
775 create_consumer_calls: Arc::clone(&self.create_consumer_calls),
776 start_calls: Arc::clone(&self.start_calls),
777 }))
778 }
779 }
780
    /// Endpoint produced by `FakeDelegateComponent`.
    struct FakeDelegateEndpoint {
        /// Incremented on every `create_consumer` call; also the source of consumer epochs.
        create_consumer_calls: Arc<AtomicUsize>,
        /// Passed on to created consumers, which increment it in `start`.
        start_calls: Arc<AtomicUsize>,
    }
785
786 impl Endpoint for FakeDelegateEndpoint {
787 fn uri(&self) -> &str {
788 "fake:delegate"
789 }
790
791 fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
792 let epoch = self.create_consumer_calls.fetch_add(1, Ordering::SeqCst) + 1;
793 Ok(Box::new(FakeDelegateConsumer {
794 epoch,
795 start_calls: Arc::clone(&self.start_calls),
796 }))
797 }
798
799 fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
800 Err(CamelError::EndpointCreationFailed("not used".to_string()))
801 }
802 }
803
    /// Consumer that emits `epoch-N` messages until its context is cancelled.
    struct FakeDelegateConsumer {
        /// Number embedded in every message body this consumer sends.
        epoch: usize,
        /// Incremented on every `start` call.
        start_calls: Arc<AtomicUsize>,
    }
808
    /// Delegate component whose `create_endpoint` always fails.
    struct FailingDelegateComponent {
        /// Incremented on every `create_endpoint` call.
        create_endpoint_calls: Arc<AtomicUsize>,
    }
812
813 impl Component for FailingDelegateComponent {
814 fn scheme(&self) -> &str {
815 "failing"
816 }
817
818 fn create_endpoint(
819 &self,
820 _uri: &str,
821 _ctx: &dyn ComponentContext,
822 ) -> Result<Box<dyn Endpoint>, CamelError> {
823 self.create_endpoint_calls.fetch_add(1, Ordering::SeqCst);
824 Err(CamelError::EndpointCreationFailed(
825 "delegate endpoint creation failed".to_string(),
826 ))
827 }
828 }
829
830 #[async_trait]
831 impl Consumer for FakeDelegateConsumer {
832 async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
833 self.start_calls.fetch_add(1, Ordering::SeqCst);
834 context
835 .send(Exchange::new(Message::new(format!("epoch-{}", self.epoch))))
836 .await?;
837
838 loop {
839 tokio::select! {
840 _ = context.cancelled() => {
841 break;
842 }
843 _ = sleep(Duration::from_millis(20)) => {
844 context
845 .send(Exchange::new(Message::new(format!("epoch-{}", self.epoch))))
846 .await?;
847 }
848 }
849 }
850
851 Ok(())
852 }
853
854 async fn stop(&mut self) -> Result<(), CamelError> {
855 Ok(())
856 }
857 }
858
859 fn build_master_consumer(
860 platform_service: Arc<dyn PlatformService>,
861 create_consumer_calls: Arc<AtomicUsize>,
862 start_calls: Arc<AtomicUsize>,
863 delegate_retry_max_attempts: Option<u32>,
864 ) -> MasterConsumer {
865 MasterConsumer::new(
866 "lock-a".to_string(),
867 "fake:delegate".to_string(),
868 Arc::new(FakeDelegateComponent {
869 create_consumer_calls,
870 start_calls,
871 }),
872 Arc::new(NoOpMetrics),
873 platform_service,
874 Duration::from_millis(500),
875 delegate_retry_max_attempts,
876 )
877 }
878
879 #[tokio::test]
880 async fn starts_delegate_only_after_started_leading() {
881 let leadership = Arc::new(FakeLeadershipService::new(None));
882 let platform_service = Arc::new(FakePlatformService::new(leadership.clone()));
883 let create_consumer_calls = Arc::new(AtomicUsize::new(0));
884 let start_calls = Arc::new(AtomicUsize::new(0));
885 let mut master = build_master_consumer(
886 platform_service,
887 Arc::clone(&create_consumer_calls),
888 Arc::clone(&start_calls),
889 Some(30),
890 );
891
892 let (tx, mut rx) = tokio::sync::mpsc::channel(16);
893 let cancel = CancellationToken::new();
894 let ctx = ConsumerContext::new(tx, cancel.clone());
895
896 master.start(ctx).await.unwrap();
897
898 sleep(Duration::from_millis(80)).await;
899 assert!(rx.try_recv().is_err());
900 assert_eq!(create_consumer_calls.load(Ordering::SeqCst), 0);
901
902 leadership.emit(LeadershipEvent::StartedLeading).await;
903
904 let first = timeout(Duration::from_millis(500), rx.recv())
905 .await
906 .unwrap()
907 .unwrap();
908 assert_eq!(first.exchange.input.body.as_text(), Some("epoch-1"));
909 assert_eq!(create_consumer_calls.load(Ordering::SeqCst), 1);
910 assert_eq!(start_calls.load(Ordering::SeqCst), 1);
911
912 cancel.cancel();
913 master.stop().await.unwrap();
914 }
915
916 #[tokio::test]
917 async fn stops_delegate_on_stopped_leading() {
918 let leadership = Arc::new(FakeLeadershipService::new(None));
919 let platform_service = Arc::new(FakePlatformService::new(leadership.clone()));
920 let create_consumer_calls = Arc::new(AtomicUsize::new(0));
921 let start_calls = Arc::new(AtomicUsize::new(0));
922 let mut master = build_master_consumer(
923 platform_service,
924 Arc::clone(&create_consumer_calls),
925 Arc::clone(&start_calls),
926 Some(30),
927 );
928
929 let (tx, mut rx) = tokio::sync::mpsc::channel(16);
930 let cancel = CancellationToken::new();
931 let ctx = ConsumerContext::new(tx, cancel.clone());
932
933 master.start(ctx).await.unwrap();
934 leadership.emit(LeadershipEvent::StartedLeading).await;
935 let _ = timeout(Duration::from_millis(500), rx.recv())
936 .await
937 .unwrap()
938 .unwrap();
939
940 leadership.emit(LeadershipEvent::StoppedLeading).await;
941 sleep(Duration::from_millis(100)).await;
942 while rx.try_recv().is_ok() {}
943 assert!(
944 timeout(Duration::from_millis(120), rx.recv())
945 .await
946 .is_err()
947 );
948
949 cancel.cancel();
950 master.stop().await.unwrap();
951 }
952
953 #[tokio::test]
954 async fn recreates_delegate_on_new_leadership_epoch() {
955 let leadership = Arc::new(FakeLeadershipService::new(None));
956 let platform_service = Arc::new(FakePlatformService::new(leadership.clone()));
957 let create_consumer_calls = Arc::new(AtomicUsize::new(0));
958 let start_calls = Arc::new(AtomicUsize::new(0));
959 let mut master = build_master_consumer(
960 platform_service,
961 Arc::clone(&create_consumer_calls),
962 Arc::clone(&start_calls),
963 Some(30),
964 );
965
966 let (tx, mut rx) = tokio::sync::mpsc::channel(16);
967 let cancel = CancellationToken::new();
968 let ctx = ConsumerContext::new(tx, cancel.clone());
969
970 master.start(ctx).await.unwrap();
971
972 leadership.emit(LeadershipEvent::StartedLeading).await;
973 let first = timeout(Duration::from_millis(500), rx.recv())
974 .await
975 .unwrap()
976 .unwrap();
977 assert_eq!(first.exchange.input.body.as_text(), Some("epoch-1"));
978
979 leadership.emit(LeadershipEvent::StoppedLeading).await;
980 sleep(Duration::from_millis(120)).await;
981
982 leadership.emit(LeadershipEvent::StartedLeading).await;
983 let second = timeout(Duration::from_millis(500), rx.recv())
984 .await
985 .unwrap()
986 .unwrap();
987 assert_eq!(second.exchange.input.body.as_text(), Some("epoch-2"));
988
989 assert_eq!(create_consumer_calls.load(Ordering::SeqCst), 2);
990 assert_eq!(start_calls.load(Ordering::SeqCst), 2);
991
992 cancel.cancel();
993 master.stop().await.unwrap();
994 }
995
996 #[tokio::test]
997 async fn stops_retrying_delegate_start_after_max_attempts() {
998 let leadership = Arc::new(FakeLeadershipService::new(Some(
999 LeadershipEvent::StartedLeading,
1000 )));
1001 let platform_service = Arc::new(FakePlatformService::new(leadership));
1002 let create_endpoint_calls = Arc::new(AtomicUsize::new(0));
1003
1004 let mut master = MasterConsumer::new(
1005 "lock-a".to_string(),
1006 "failing:delegate".to_string(),
1007 Arc::new(FailingDelegateComponent {
1008 create_endpoint_calls: Arc::clone(&create_endpoint_calls),
1009 }),
1010 Arc::new(NoOpMetrics),
1011 platform_service,
1012 Duration::from_millis(500),
1013 Some(1),
1014 );
1015
1016 let (tx, _rx) = tokio::sync::mpsc::channel(16);
1017 let cancel = CancellationToken::new();
1018 let ctx = ConsumerContext::new(tx, cancel.clone());
1019
1020 master.start(ctx).await.unwrap();
1021 sleep(Duration::from_millis(750)).await;
1022
1023 assert_eq!(create_endpoint_calls.load(Ordering::SeqCst), 2);
1024
1025 cancel.cancel();
1026 master.stop().await.unwrap();
1027 }
1028}