1pub mod bundle;
2mod config;
3
4pub use bundle::MasterBundle;
5
6use std::sync::Arc;
7use std::time::Duration;
8
9use async_trait::async_trait;
10use camel_api::{CamelError, MetricsCollector, PlatformService};
11use camel_component_api::{
12 BoxProcessor, Component, ComponentContext, Consumer, ConsumerContext, Endpoint,
13 ExchangeEnvelope, ProducerContext, parse_uri,
14};
15use camel_language_api::Language;
16use tokio::task::JoinHandle;
17use tokio::time::{interval, timeout};
18use tokio_util::sync::CancellationToken;
19use tracing::{info, warn};
20
21use crate::config::{MasterComponentConfig, MasterUriConfig};
22
23const DELEGATE_RETRY_INTERVAL: Duration = Duration::from_millis(200);
24
25pub struct MasterComponent {
26 drain_timeout_ms: u64,
27 delegate_retry_max_attempts: Option<u32>,
28}
29
30impl MasterComponent {
31 pub fn new(config: MasterComponentConfig) -> Self {
32 Self {
33 drain_timeout_ms: config.drain_timeout_ms,
34 delegate_retry_max_attempts: config.delegate_retry_max_attempts,
35 }
36 }
37}
38
39impl Default for MasterComponent {
40 fn default() -> Self {
41 Self::new(MasterComponentConfig::default())
42 }
43}
44
45impl Component for MasterComponent {
46 fn scheme(&self) -> &str {
47 "master"
48 }
49
50 fn create_endpoint(
51 &self,
52 uri: &str,
53 ctx: &dyn ComponentContext,
54 ) -> Result<Box<dyn Endpoint>, CamelError> {
55 let parsed = MasterUriConfig::parse(uri)?;
56 let delegate_parts = parse_uri(&parsed.delegate_uri)?;
57 let delegate_scheme = delegate_parts.scheme;
58 let delegate_component = ctx
59 .resolve_component(&delegate_scheme)
60 .ok_or_else(|| CamelError::ComponentNotFound(delegate_scheme.clone()))?;
61
62 Ok(Box::new(MasterEndpoint {
63 uri: uri.to_string(),
64 lock_name: parsed.lock_name,
65 delegate_uri: parsed.delegate_uri,
66 delegate_component,
67 metrics: ctx.metrics(),
68 platform_service: ctx.platform_service(),
69 drain_timeout: Duration::from_millis(self.drain_timeout_ms),
70 delegate_retry_max_attempts: self.delegate_retry_max_attempts,
71 }))
72 }
73}
74
75struct MasterEndpoint {
76 uri: String,
77 lock_name: String,
78 delegate_uri: String,
79 delegate_component: Arc<dyn Component>,
80 metrics: Arc<dyn MetricsCollector>,
81 platform_service: Arc<dyn PlatformService>,
82 drain_timeout: Duration,
83 delegate_retry_max_attempts: Option<u32>,
84}
85
86impl Endpoint for MasterEndpoint {
87 fn uri(&self) -> &str {
88 &self.uri
89 }
90
91 fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
92 Ok(Box::new(MasterConsumer::new(
93 self.lock_name.clone(),
94 self.delegate_uri.clone(),
95 Arc::clone(&self.delegate_component),
96 Arc::clone(&self.metrics),
97 Arc::clone(&self.platform_service),
98 self.drain_timeout,
99 self.delegate_retry_max_attempts,
100 )))
101 }
102
103 fn create_producer(&self, ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
104 let delegate_ctx = MasterDelegateContext {
105 delegate_component: Arc::clone(&self.delegate_component),
106 metrics: Arc::clone(&self.metrics),
107 platform_service: Arc::clone(&self.platform_service),
108 };
109
110 self.delegate_component
111 .create_endpoint(&self.delegate_uri, &delegate_ctx)?
112 .create_producer(ctx)
113 }
114}
115
116struct MasterDelegateContext {
117 delegate_component: Arc<dyn Component>,
118 metrics: Arc<dyn MetricsCollector>,
119 platform_service: Arc<dyn PlatformService>,
120}
121
122impl ComponentContext for MasterDelegateContext {
123 fn resolve_component(&self, scheme: &str) -> Option<Arc<dyn Component>> {
124 if self.delegate_component.scheme() == scheme {
125 Some(Arc::clone(&self.delegate_component))
126 } else {
127 None
128 }
129 }
130
131 fn resolve_language(&self, _name: &str) -> Option<Arc<dyn Language>> {
132 None
133 }
134
135 fn metrics(&self) -> Arc<dyn MetricsCollector> {
136 Arc::clone(&self.metrics)
137 }
138
139 fn platform_service(&self) -> Arc<dyn PlatformService> {
140 Arc::clone(&self.platform_service)
141 }
142}
143
144struct MasterConsumer {
145 lock_name: String,
146 delegate_uri: String,
147 delegate_component: Arc<dyn Component>,
148 metrics: Arc<dyn MetricsCollector>,
149 platform_service: Arc<dyn PlatformService>,
150 drain_timeout: Duration,
151 delegate_retry_max_attempts: Option<u32>,
152 leadership_task: Option<JoinHandle<()>>,
153 stop_token: Option<CancellationToken>,
154}
155
156impl MasterConsumer {
157 fn new(
158 lock_name: String,
159 delegate_uri: String,
160 delegate_component: Arc<dyn Component>,
161 metrics: Arc<dyn MetricsCollector>,
162 platform_service: Arc<dyn PlatformService>,
163 drain_timeout: Duration,
164 delegate_retry_max_attempts: Option<u32>,
165 ) -> Self {
166 Self {
167 lock_name,
168 delegate_uri,
169 delegate_component,
170 metrics,
171 platform_service,
172 drain_timeout,
173 delegate_retry_max_attempts,
174 leadership_task: None,
175 stop_token: None,
176 }
177 }
178}
179
180enum DelegateState {
181 Inactive,
182 Active {
183 run_token: CancellationToken,
184 handle: JoinHandle<()>,
185 },
186}
187
188async fn stop_delegate(state: &mut DelegateState, drain_timeout: Duration) {
189 if let DelegateState::Active {
190 run_token,
191 mut handle,
192 } = std::mem::replace(state, DelegateState::Inactive)
193 {
194 run_token.cancel();
195 match timeout(drain_timeout, &mut handle).await {
196 Ok(_) => {}
197 Err(_) => {
198 warn!("master delegate shutdown timed out, aborting");
199 handle.abort();
200 }
201 }
202 }
203}
204
205struct ReconcileContext<'a> {
206 lock_name: &'a str,
207 delegate_component: &'a Arc<dyn Component>,
208 delegate_uri: &'a str,
209 sender: &'a tokio::sync::mpsc::Sender<ExchangeEnvelope>,
210 parent_cancel: &'a CancellationToken,
211 drain_timeout: Duration,
212 metrics: &'a Arc<dyn MetricsCollector>,
213 platform_service: &'a Arc<dyn PlatformService>,
214}
215
216async fn reconcile_event(
217 event: camel_api::LeadershipEvent,
218 state: &mut DelegateState,
219 ctx: &ReconcileContext<'_>,
220) {
221 match event {
222 camel_api::LeadershipEvent::StartedLeading => {
223 info!(lock = %ctx.lock_name, "master leadership acquired");
224 stop_delegate(state, ctx.drain_timeout).await;
225
226 let delegate_ctx = MasterDelegateContext {
227 delegate_component: Arc::clone(ctx.delegate_component),
228 metrics: Arc::clone(ctx.metrics),
229 platform_service: Arc::clone(ctx.platform_service),
230 };
231
232 let endpoint = match ctx
233 .delegate_component
234 .create_endpoint(ctx.delegate_uri, &delegate_ctx)
235 {
236 Ok(endpoint) => endpoint,
237 Err(err) => {
238 warn!(lock = %ctx.lock_name, "failed to create delegate endpoint: {err}");
239 return;
240 }
241 };
242
243 let mut consumer = match endpoint.create_consumer() {
244 Ok(consumer) => consumer,
245 Err(err) => {
246 warn!(lock = %ctx.lock_name, "failed to create delegate consumer: {err}");
247 return;
248 }
249 };
250
251 let run_token = ctx.parent_cancel.child_token();
252 let delegate_ctx = ConsumerContext::new(ctx.sender.clone(), run_token.clone());
253 let handle = tokio::spawn(async move {
254 let _ = consumer.start(delegate_ctx).await;
255 let _ = consumer.stop().await;
256 });
257
258 *state = DelegateState::Active { run_token, handle };
259 }
260 camel_api::LeadershipEvent::StoppedLeading => {
261 info!(lock = %ctx.lock_name, "master leadership lost");
262 stop_delegate(state, ctx.drain_timeout).await;
263 }
264 }
265}
266
267#[async_trait]
268impl Consumer for MasterConsumer {
269 async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
270 if self.leadership_task.is_some() {
271 return Ok(());
272 }
273
274 let handle = self
275 .platform_service
276 .leadership()
277 .start(&self.lock_name)
278 .await
279 .map_err(|e| {
280 CamelError::EndpointCreationFailed(format!("failed to start leader election: {e}"))
281 })?;
282
283 let lock_name = self.lock_name.clone();
284 let delegate_uri = self.delegate_uri.clone();
285 let delegate_component = Arc::clone(&self.delegate_component);
286 let metrics = Arc::clone(&self.metrics);
287 let platform_service = Arc::clone(&self.platform_service);
288 let sender = context.sender();
289 let parent_cancel = context.cancel_token();
290 let drain_timeout = self.drain_timeout;
291 let delegate_retry_max_attempts = self.delegate_retry_max_attempts;
292 let mut events = handle.events.clone();
293
294 let stop_token = CancellationToken::new();
295 let stop_token_loop = stop_token.clone();
296 let leadership_handle = handle;
297
298 let task = tokio::spawn(async move {
299 let mut state = DelegateState::Inactive;
300 let mut is_leading = false;
301 let mut delegate_attempts = 0u32;
302 let mut retry_tick = interval(DELEGATE_RETRY_INTERVAL);
303
304 let rctx = ReconcileContext {
305 lock_name: &lock_name,
306 delegate_component: &delegate_component,
307 delegate_uri: &delegate_uri,
308 sender: &sender,
309 parent_cancel: &parent_cancel,
310 drain_timeout,
311 metrics: &metrics,
312 platform_service: &platform_service,
313 };
314
315 let initial_event = { events.borrow().clone() };
316 if let Some(initial_event) = initial_event {
317 is_leading = matches!(&initial_event, camel_api::LeadershipEvent::StartedLeading);
318 if is_leading {
319 delegate_attempts = 0;
320 }
321 reconcile_event(initial_event, &mut state, &rctx).await;
322 }
323
324 loop {
325 tokio::select! {
326 _ = stop_token_loop.cancelled() => {
327 break;
328 }
329 _ = context.cancelled() => {
330 break;
331 }
332 changed = events.changed() => {
333 if changed.is_err() {
334 break;
335 }
336 let event = { events.borrow().clone() };
337 if let Some(event) = event {
338 let was_leading = is_leading;
339 is_leading = matches!(&event, camel_api::LeadershipEvent::StartedLeading);
340 if !was_leading && is_leading {
341 delegate_attempts = 0;
342 }
343 reconcile_event(event, &mut state, &rctx).await;
344 }
345 }
346 _ = retry_tick.tick() => {
347 if is_leading && matches!(state, DelegateState::Inactive) {
348 if let Some(max) = delegate_retry_max_attempts {
349 delegate_attempts = delegate_attempts.saturating_add(1);
350 if delegate_attempts > max {
351 warn!(
352 lock = %lock_name,
353 attempts = max,
354 "delegate start exceeded max attempts, stopping consumer"
355 );
356 break;
357 }
358 }
359 reconcile_event(
360 camel_api::LeadershipEvent::StartedLeading,
361 &mut state,
362 &rctx,
363 )
364 .await;
365 }
366 }
367 }
368 }
369
370 stop_delegate(&mut state, drain_timeout).await;
371 let _ = timeout(drain_timeout, leadership_handle.step_down()).await;
372 });
373
374 self.stop_token = Some(stop_token);
375 self.leadership_task = Some(task);
376
377 Ok(())
378 }
379
380 async fn stop(&mut self) -> Result<(), CamelError> {
381 if let Some(token) = self.stop_token.take() {
382 token.cancel();
383 }
384
385 if let Some(task) = self.leadership_task.take()
386 && timeout(self.drain_timeout, task).await.is_err()
387 {
388 warn!("master leadership loop shutdown timed out");
389 }
390
391 Ok(())
392 }
393}
394
395#[cfg(test)]
396mod tests {
397 use std::sync::Arc;
398 use std::sync::Mutex;
399 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
400
401 use camel_api::{
402 BoxProcessorExt, Exchange, LeadershipEvent, LeadershipHandle, LeadershipService, Message,
403 NoOpMetrics, NoopPlatformService, NoopReadinessGate, PlatformError, PlatformIdentity,
404 PlatformService, ReadinessGate,
405 };
406 use camel_component_api::NoOpComponentContext;
407 use tokio::sync::{oneshot, watch};
408 use tokio::time::{sleep, timeout};
409 use tokio_util::sync::CancellationToken;
410 use tower::ServiceExt;
411
412 use super::*;
413
414 #[test]
415 fn parse_master_uri_valid() {
416 let cfg = MasterUriConfig::parse("master:mylock:timer:tick?period=250").unwrap();
417 assert_eq!(cfg.lock_name, "mylock");
418 assert_eq!(cfg.delegate_uri, "timer:tick?period=250");
419 }
420
421 #[test]
422 fn parse_master_uri_missing_lockname() {
423 let err = MasterUriConfig::parse("master::timer:tick").unwrap_err();
424 assert!(matches!(err, CamelError::InvalidUri(_)));
425 }
426
427 #[test]
428 fn parse_master_uri_missing_delegate() {
429 let err = MasterUriConfig::parse("master:mylock:").unwrap_err();
430 assert!(matches!(err, CamelError::InvalidUri(_)));
431 }
432
433 #[test]
434 fn endpoint_fails_when_delegate_component_missing() {
435 let master = MasterComponent::default();
436 let result =
437 master.create_endpoint("master:lock-1:missing:delegate", &NoOpComponentContext);
438 assert!(matches!(result, Err(CamelError::ComponentNotFound(_))));
439 }
440
441 #[test]
442 fn delegate_scheme_is_parsed_from_delegate_uri() {
443 let seen_scheme = Arc::new(AtomicBool::new(false));
444
445 struct SchemeAwareContext {
446 delegate: Arc<dyn Component>,
447 seen_scheme: Arc<AtomicBool>,
448 }
449
450 impl ComponentContext for SchemeAwareContext {
451 fn resolve_component(&self, scheme: &str) -> Option<Arc<dyn Component>> {
452 if scheme == "mock" {
453 self.seen_scheme.store(true, Ordering::SeqCst);
454 Some(Arc::clone(&self.delegate))
455 } else {
456 None
457 }
458 }
459
460 fn resolve_language(&self, _name: &str) -> Option<Arc<dyn Language>> {
461 None
462 }
463
464 fn metrics(&self) -> Arc<dyn MetricsCollector> {
465 Arc::new(NoOpMetrics)
466 }
467
468 fn platform_service(&self) -> Arc<dyn PlatformService> {
469 Arc::new(NoopPlatformService::default())
470 }
471 }
472
473 struct MockDelegateComponent;
474
475 impl Component for MockDelegateComponent {
476 fn scheme(&self) -> &str {
477 "mock"
478 }
479
480 fn create_endpoint(
481 &self,
482 _uri: &str,
483 _ctx: &dyn ComponentContext,
484 ) -> Result<Box<dyn Endpoint>, CamelError> {
485 Ok(Box::new(MockDelegateEndpoint))
486 }
487 }
488
489 struct MockDelegateEndpoint;
490
491 impl Endpoint for MockDelegateEndpoint {
492 fn uri(&self) -> &str {
493 "mock:delegate"
494 }
495
496 fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
497 Err(CamelError::EndpointCreationFailed("not used".to_string()))
498 }
499
500 fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
501 Err(CamelError::EndpointCreationFailed("not used".to_string()))
502 }
503 }
504
505 let delegate = Arc::new(MockDelegateComponent);
506 let ctx = SchemeAwareContext {
507 delegate,
508 seen_scheme: Arc::clone(&seen_scheme),
509 };
510
511 let master = MasterComponent::default();
512 let endpoint = master
513 .create_endpoint("master:mylock:mock:delegate?x=1", &ctx)
514 .unwrap();
515
516 assert_eq!(endpoint.uri(), "master:mylock:mock:delegate?x=1");
517 assert!(seen_scheme.load(Ordering::SeqCst));
518 }
519
520 struct MockDelegateContext {
521 delegate: Arc<dyn Component>,
522 }
523
524 impl ComponentContext for MockDelegateContext {
525 fn resolve_component(&self, scheme: &str) -> Option<Arc<dyn Component>> {
526 if self.delegate.scheme() == scheme {
527 Some(Arc::clone(&self.delegate))
528 } else {
529 None
530 }
531 }
532
533 fn resolve_language(&self, _name: &str) -> Option<Arc<dyn Language>> {
534 None
535 }
536
537 fn metrics(&self) -> Arc<dyn MetricsCollector> {
538 Arc::new(NoOpMetrics)
539 }
540
541 fn platform_service(&self) -> Arc<dyn PlatformService> {
542 Arc::new(NoopPlatformService::default())
543 }
544 }
545
546 struct MockProducerDelegateComponent {
547 create_endpoint_calls: Arc<AtomicUsize>,
548 create_producer_calls: Arc<AtomicUsize>,
549 fail_producer: bool,
550 }
551
552 impl Component for MockProducerDelegateComponent {
553 fn scheme(&self) -> &str {
554 "mock"
555 }
556
557 fn create_endpoint(
558 &self,
559 _uri: &str,
560 _ctx: &dyn ComponentContext,
561 ) -> Result<Box<dyn Endpoint>, CamelError> {
562 self.create_endpoint_calls.fetch_add(1, Ordering::SeqCst);
563 Ok(Box::new(MockProducerDelegateEndpoint {
564 create_producer_calls: Arc::clone(&self.create_producer_calls),
565 fail_producer: self.fail_producer,
566 }))
567 }
568 }
569
570 struct MockProducerDelegateEndpoint {
571 create_producer_calls: Arc<AtomicUsize>,
572 fail_producer: bool,
573 }
574
575 impl Endpoint for MockProducerDelegateEndpoint {
576 fn uri(&self) -> &str {
577 "mock:delegate"
578 }
579
580 fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
581 Err(CamelError::EndpointCreationFailed(
582 "not used in test".to_string(),
583 ))
584 }
585
586 fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
587 self.create_producer_calls.fetch_add(1, Ordering::SeqCst);
588 if self.fail_producer {
589 return Err(CamelError::ProcessorError(
590 "delegate producer failed".to_string(),
591 ));
592 }
593 Ok(BoxProcessor::from_fn(
594 |exchange| async move { Ok(exchange) },
595 ))
596 }
597 }
598
599 #[tokio::test]
600 async fn producer_passthrough_delegates_and_produces() {
601 let endpoint_calls = Arc::new(AtomicUsize::new(0));
602 let producer_calls = Arc::new(AtomicUsize::new(0));
603 let delegate = Arc::new(MockProducerDelegateComponent {
604 create_endpoint_calls: Arc::clone(&endpoint_calls),
605 create_producer_calls: Arc::clone(&producer_calls),
606 fail_producer: false,
607 });
608
609 let ctx = MockDelegateContext {
610 delegate: delegate.clone(),
611 };
612
613 let master = MasterComponent::default();
614 let endpoint = master
615 .create_endpoint("master:lock-1:mock:delegate", &ctx)
616 .unwrap();
617 let producer_ctx = ProducerContext::new();
618 let producer = endpoint.create_producer(&producer_ctx).unwrap();
619
620 let exchange = Exchange::new(Message::new("ok"));
621 let result = producer.oneshot(exchange).await.unwrap();
622
623 assert_eq!(result.input.body.as_text(), Some("ok"));
624 assert_eq!(endpoint_calls.load(Ordering::SeqCst), 1);
625 assert_eq!(producer_calls.load(Ordering::SeqCst), 1);
626 }
627
628 #[test]
629 fn producer_passthrough_bubbles_delegate_errors() {
630 let endpoint_calls = Arc::new(AtomicUsize::new(0));
631 let producer_calls = Arc::new(AtomicUsize::new(0));
632 let delegate = Arc::new(MockProducerDelegateComponent {
633 create_endpoint_calls: Arc::clone(&endpoint_calls),
634 create_producer_calls: Arc::clone(&producer_calls),
635 fail_producer: true,
636 });
637
638 let ctx = MockDelegateContext {
639 delegate: delegate.clone(),
640 };
641
642 let master = MasterComponent::default();
643 let endpoint = master
644 .create_endpoint("master:lock-1:mock:delegate", &ctx)
645 .unwrap();
646 let producer_ctx = ProducerContext::new();
647 let err = endpoint.create_producer(&producer_ctx).unwrap_err();
648
649 assert!(matches!(err, CamelError::ProcessorError(_)));
650 assert_eq!(endpoint_calls.load(Ordering::SeqCst), 1);
651 assert_eq!(producer_calls.load(Ordering::SeqCst), 1);
652 }
653
654 struct FakeLeadershipService {
655 tx: Mutex<Option<watch::Sender<Option<LeadershipEvent>>>>,
656 is_leader: Arc<AtomicBool>,
657 initial: Option<LeadershipEvent>,
658 }
659
660 impl FakeLeadershipService {
661 fn new(initial: Option<LeadershipEvent>) -> Self {
662 let starts_as_leader = matches!(initial, Some(LeadershipEvent::StartedLeading));
663 Self {
664 tx: Mutex::new(None),
665 is_leader: Arc::new(AtomicBool::new(starts_as_leader)),
666 initial,
667 }
668 }
669
670 async fn emit(&self, event: LeadershipEvent) {
671 self.is_leader.store(
672 matches!(event, LeadershipEvent::StartedLeading),
673 Ordering::Release,
674 );
675 if let Some(tx) = self
676 .tx
677 .lock()
678 .expect("mutex poisoned: fake elector sender")
679 .as_ref()
680 {
681 let _ = tx.send(Some(event));
682 }
683 }
684 }
685
686 #[async_trait]
687 impl LeadershipService for FakeLeadershipService {
688 async fn start(&self, _lock_name: &str) -> Result<LeadershipHandle, PlatformError> {
689 let (tx, rx) = watch::channel(self.initial.clone());
690 *self.tx.lock().expect("mutex poisoned: fake elector sender") = Some(tx);
691
692 let cancel = CancellationToken::new();
693 let cancel_wait = cancel.clone();
694 let (term_tx, term_rx) = oneshot::channel();
695 tokio::spawn(async move {
696 cancel_wait.cancelled().await;
697 let _ = term_tx.send(());
698 });
699
700 Ok(LeadershipHandle::new(
701 rx,
702 Arc::clone(&self.is_leader),
703 cancel,
704 term_rx,
705 ))
706 }
707 }
708
709 struct FakePlatformService {
710 identity: PlatformIdentity,
711 readiness_gate: Arc<dyn ReadinessGate>,
712 leadership: Arc<dyn LeadershipService>,
713 }
714
715 impl FakePlatformService {
716 fn new(leadership: Arc<dyn LeadershipService>) -> Self {
717 Self {
718 identity: PlatformIdentity::local("master-tests"),
719 readiness_gate: Arc::new(NoopReadinessGate),
720 leadership,
721 }
722 }
723 }
724
725 impl PlatformService for FakePlatformService {
726 fn identity(&self) -> PlatformIdentity {
727 self.identity.clone()
728 }
729
730 fn readiness_gate(&self) -> Arc<dyn ReadinessGate> {
731 Arc::clone(&self.readiness_gate)
732 }
733
734 fn leadership(&self) -> Arc<dyn LeadershipService> {
735 Arc::clone(&self.leadership)
736 }
737 }
738
739 struct FakeDelegateComponent {
740 create_consumer_calls: Arc<AtomicUsize>,
741 start_calls: Arc<AtomicUsize>,
742 }
743
744 impl Component for FakeDelegateComponent {
745 fn scheme(&self) -> &str {
746 "fake"
747 }
748
749 fn create_endpoint(
750 &self,
751 _uri: &str,
752 _ctx: &dyn ComponentContext,
753 ) -> Result<Box<dyn Endpoint>, CamelError> {
754 Ok(Box::new(FakeDelegateEndpoint {
755 create_consumer_calls: Arc::clone(&self.create_consumer_calls),
756 start_calls: Arc::clone(&self.start_calls),
757 }))
758 }
759 }
760
761 struct FakeDelegateEndpoint {
762 create_consumer_calls: Arc<AtomicUsize>,
763 start_calls: Arc<AtomicUsize>,
764 }
765
766 impl Endpoint for FakeDelegateEndpoint {
767 fn uri(&self) -> &str {
768 "fake:delegate"
769 }
770
771 fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
772 let epoch = self.create_consumer_calls.fetch_add(1, Ordering::SeqCst) + 1;
773 Ok(Box::new(FakeDelegateConsumer {
774 epoch,
775 start_calls: Arc::clone(&self.start_calls),
776 }))
777 }
778
779 fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
780 Err(CamelError::EndpointCreationFailed("not used".to_string()))
781 }
782 }
783
784 struct FakeDelegateConsumer {
785 epoch: usize,
786 start_calls: Arc<AtomicUsize>,
787 }
788
789 struct FailingDelegateComponent {
790 create_endpoint_calls: Arc<AtomicUsize>,
791 }
792
793 impl Component for FailingDelegateComponent {
794 fn scheme(&self) -> &str {
795 "failing"
796 }
797
798 fn create_endpoint(
799 &self,
800 _uri: &str,
801 _ctx: &dyn ComponentContext,
802 ) -> Result<Box<dyn Endpoint>, CamelError> {
803 self.create_endpoint_calls.fetch_add(1, Ordering::SeqCst);
804 Err(CamelError::EndpointCreationFailed(
805 "delegate endpoint creation failed".to_string(),
806 ))
807 }
808 }
809
810 #[async_trait]
811 impl Consumer for FakeDelegateConsumer {
812 async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
813 self.start_calls.fetch_add(1, Ordering::SeqCst);
814 context
815 .send(Exchange::new(Message::new(format!("epoch-{}", self.epoch))))
816 .await?;
817
818 loop {
819 tokio::select! {
820 _ = context.cancelled() => {
821 break;
822 }
823 _ = sleep(Duration::from_millis(20)) => {
824 context
825 .send(Exchange::new(Message::new(format!("epoch-{}", self.epoch))))
826 .await?;
827 }
828 }
829 }
830
831 Ok(())
832 }
833
834 async fn stop(&mut self) -> Result<(), CamelError> {
835 Ok(())
836 }
837 }
838
839 fn build_master_consumer(
840 platform_service: Arc<dyn PlatformService>,
841 create_consumer_calls: Arc<AtomicUsize>,
842 start_calls: Arc<AtomicUsize>,
843 delegate_retry_max_attempts: Option<u32>,
844 ) -> MasterConsumer {
845 MasterConsumer::new(
846 "lock-a".to_string(),
847 "fake:delegate".to_string(),
848 Arc::new(FakeDelegateComponent {
849 create_consumer_calls,
850 start_calls,
851 }),
852 Arc::new(NoOpMetrics),
853 platform_service,
854 Duration::from_millis(500),
855 delegate_retry_max_attempts,
856 )
857 }
858
859 #[tokio::test]
860 async fn starts_delegate_only_after_started_leading() {
861 let leadership = Arc::new(FakeLeadershipService::new(None));
862 let platform_service = Arc::new(FakePlatformService::new(leadership.clone()));
863 let create_consumer_calls = Arc::new(AtomicUsize::new(0));
864 let start_calls = Arc::new(AtomicUsize::new(0));
865 let mut master = build_master_consumer(
866 platform_service,
867 Arc::clone(&create_consumer_calls),
868 Arc::clone(&start_calls),
869 Some(30),
870 );
871
872 let (tx, mut rx) = tokio::sync::mpsc::channel(16);
873 let cancel = CancellationToken::new();
874 let ctx = ConsumerContext::new(tx, cancel.clone());
875
876 master.start(ctx).await.unwrap();
877
878 sleep(Duration::from_millis(80)).await;
879 assert!(rx.try_recv().is_err());
880 assert_eq!(create_consumer_calls.load(Ordering::SeqCst), 0);
881
882 leadership.emit(LeadershipEvent::StartedLeading).await;
883
884 let first = timeout(Duration::from_millis(500), rx.recv())
885 .await
886 .unwrap()
887 .unwrap();
888 assert_eq!(first.exchange.input.body.as_text(), Some("epoch-1"));
889 assert_eq!(create_consumer_calls.load(Ordering::SeqCst), 1);
890 assert_eq!(start_calls.load(Ordering::SeqCst), 1);
891
892 cancel.cancel();
893 master.stop().await.unwrap();
894 }
895
896 #[tokio::test]
897 async fn stops_delegate_on_stopped_leading() {
898 let leadership = Arc::new(FakeLeadershipService::new(None));
899 let platform_service = Arc::new(FakePlatformService::new(leadership.clone()));
900 let create_consumer_calls = Arc::new(AtomicUsize::new(0));
901 let start_calls = Arc::new(AtomicUsize::new(0));
902 let mut master = build_master_consumer(
903 platform_service,
904 Arc::clone(&create_consumer_calls),
905 Arc::clone(&start_calls),
906 Some(30),
907 );
908
909 let (tx, mut rx) = tokio::sync::mpsc::channel(16);
910 let cancel = CancellationToken::new();
911 let ctx = ConsumerContext::new(tx, cancel.clone());
912
913 master.start(ctx).await.unwrap();
914 leadership.emit(LeadershipEvent::StartedLeading).await;
915 let _ = timeout(Duration::from_millis(500), rx.recv())
916 .await
917 .unwrap()
918 .unwrap();
919
920 leadership.emit(LeadershipEvent::StoppedLeading).await;
921 sleep(Duration::from_millis(100)).await;
922 while rx.try_recv().is_ok() {}
923 assert!(
924 timeout(Duration::from_millis(120), rx.recv())
925 .await
926 .is_err()
927 );
928
929 cancel.cancel();
930 master.stop().await.unwrap();
931 }
932
933 #[tokio::test]
934 async fn recreates_delegate_on_new_leadership_epoch() {
935 let leadership = Arc::new(FakeLeadershipService::new(None));
936 let platform_service = Arc::new(FakePlatformService::new(leadership.clone()));
937 let create_consumer_calls = Arc::new(AtomicUsize::new(0));
938 let start_calls = Arc::new(AtomicUsize::new(0));
939 let mut master = build_master_consumer(
940 platform_service,
941 Arc::clone(&create_consumer_calls),
942 Arc::clone(&start_calls),
943 Some(30),
944 );
945
946 let (tx, mut rx) = tokio::sync::mpsc::channel(16);
947 let cancel = CancellationToken::new();
948 let ctx = ConsumerContext::new(tx, cancel.clone());
949
950 master.start(ctx).await.unwrap();
951
952 leadership.emit(LeadershipEvent::StartedLeading).await;
953 let first = timeout(Duration::from_millis(500), rx.recv())
954 .await
955 .unwrap()
956 .unwrap();
957 assert_eq!(first.exchange.input.body.as_text(), Some("epoch-1"));
958
959 leadership.emit(LeadershipEvent::StoppedLeading).await;
960 sleep(Duration::from_millis(120)).await;
961
962 leadership.emit(LeadershipEvent::StartedLeading).await;
963 let second = timeout(Duration::from_millis(500), rx.recv())
964 .await
965 .unwrap()
966 .unwrap();
967 assert_eq!(second.exchange.input.body.as_text(), Some("epoch-2"));
968
969 assert_eq!(create_consumer_calls.load(Ordering::SeqCst), 2);
970 assert_eq!(start_calls.load(Ordering::SeqCst), 2);
971
972 cancel.cancel();
973 master.stop().await.unwrap();
974 }
975
976 #[tokio::test]
977 async fn stops_retrying_delegate_start_after_max_attempts() {
978 let leadership = Arc::new(FakeLeadershipService::new(Some(
979 LeadershipEvent::StartedLeading,
980 )));
981 let platform_service = Arc::new(FakePlatformService::new(leadership));
982 let create_endpoint_calls = Arc::new(AtomicUsize::new(0));
983
984 let mut master = MasterConsumer::new(
985 "lock-a".to_string(),
986 "failing:delegate".to_string(),
987 Arc::new(FailingDelegateComponent {
988 create_endpoint_calls: Arc::clone(&create_endpoint_calls),
989 }),
990 Arc::new(NoOpMetrics),
991 platform_service,
992 Duration::from_millis(500),
993 Some(1),
994 );
995
996 let (tx, _rx) = tokio::sync::mpsc::channel(16);
997 let cancel = CancellationToken::new();
998 let ctx = ConsumerContext::new(tx, cancel.clone());
999
1000 master.start(ctx).await.unwrap();
1001 sleep(Duration::from_millis(750)).await;
1002
1003 assert_eq!(create_endpoint_calls.load(Ordering::SeqCst), 2);
1004
1005 cancel.cancel();
1006 master.stop().await.unwrap();
1007 }
1008}