1pub mod bundle;
2mod config;
3
4pub use bundle::MasterBundle;
5
6use std::sync::Arc;
7use std::time::Duration;
8
9use async_trait::async_trait;
10use camel_api::{CamelError, MetricsCollector, PlatformService};
11use camel_component_api::{
12 BoxProcessor, Component, ComponentContext, Consumer, ConsumerContext, Endpoint,
13 ExchangeEnvelope, ProducerContext, parse_uri,
14};
15use camel_language_api::Language;
16use tokio::task::JoinHandle;
17use tokio::time::{interval, timeout};
18use tokio_util::sync::CancellationToken;
19use tracing::{info, warn};
20
21use crate::config::{MasterComponentConfig, MasterUriConfig};
22
23const DELEGATE_RETRY_INTERVAL: Duration = Duration::from_millis(200);
24
25pub struct MasterComponent {
26 drain_timeout_ms: u64,
27 delegate_retry_max_attempts: Option<u32>,
28}
29
30impl MasterComponent {
31 pub fn new(config: MasterComponentConfig) -> Self {
32 Self {
33 drain_timeout_ms: config.drain_timeout_ms,
34 delegate_retry_max_attempts: config.delegate_retry_max_attempts,
35 }
36 }
37}
38
39impl Default for MasterComponent {
40 fn default() -> Self {
41 Self::new(MasterComponentConfig::default())
42 }
43}
44
45impl Component for MasterComponent {
46 fn scheme(&self) -> &str {
47 "master"
48 }
49
50 fn create_endpoint(
51 &self,
52 uri: &str,
53 ctx: &dyn ComponentContext,
54 ) -> Result<Box<dyn Endpoint>, CamelError> {
55 let parsed = MasterUriConfig::parse(uri)?;
56 let delegate_parts = parse_uri(&parsed.delegate_uri)?;
57 let delegate_scheme = delegate_parts.scheme;
58 let delegate_component = ctx
59 .resolve_component(&delegate_scheme)
60 .ok_or_else(|| CamelError::ComponentNotFound(delegate_scheme.clone()))?;
61
62 Ok(Box::new(MasterEndpoint {
63 uri: uri.to_string(),
64 lock_name: parsed.lock_name,
65 delegate_uri: parsed.delegate_uri,
66 delegate_component,
67 metrics: ctx.metrics(),
68 platform_service: ctx.platform_service(),
69 drain_timeout: Duration::from_millis(self.drain_timeout_ms),
70 delegate_retry_max_attempts: self.delegate_retry_max_attempts,
71 }))
72 }
73}
74
75struct MasterEndpoint {
76 uri: String,
77 lock_name: String,
78 delegate_uri: String,
79 delegate_component: Arc<dyn Component>,
80 metrics: Arc<dyn MetricsCollector>,
81 platform_service: Arc<dyn PlatformService>,
82 drain_timeout: Duration,
83 delegate_retry_max_attempts: Option<u32>,
84}
85
86impl Endpoint for MasterEndpoint {
87 fn uri(&self) -> &str {
88 &self.uri
89 }
90
91 fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
92 Ok(Box::new(MasterConsumer::new(
93 self.lock_name.clone(),
94 self.delegate_uri.clone(),
95 Arc::clone(&self.delegate_component),
96 Arc::clone(&self.metrics),
97 Arc::clone(&self.platform_service),
98 self.drain_timeout,
99 self.delegate_retry_max_attempts,
100 )))
101 }
102
103 fn create_producer(&self, ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
104 let delegate_ctx = MasterDelegateContext {
105 delegate_component: Arc::clone(&self.delegate_component),
106 metrics: Arc::clone(&self.metrics),
107 platform_service: Arc::clone(&self.platform_service),
108 };
109
110 self.delegate_component
111 .create_endpoint(&self.delegate_uri, &delegate_ctx)?
112 .create_producer(ctx)
113 }
114}
115
116struct MasterDelegateContext {
117 delegate_component: Arc<dyn Component>,
118 metrics: Arc<dyn MetricsCollector>,
119 platform_service: Arc<dyn PlatformService>,
120}
121
122impl ComponentContext for MasterDelegateContext {
123 fn resolve_component(&self, scheme: &str) -> Option<Arc<dyn Component>> {
124 if self.delegate_component.scheme() == scheme {
125 Some(Arc::clone(&self.delegate_component))
126 } else {
127 None
128 }
129 }
130
131 fn resolve_language(&self, _name: &str) -> Option<Arc<dyn Language>> {
132 None
133 }
134
135 fn metrics(&self) -> Arc<dyn MetricsCollector> {
136 Arc::clone(&self.metrics)
137 }
138
139 fn platform_service(&self) -> Arc<dyn PlatformService> {
140 Arc::clone(&self.platform_service)
141 }
142}
143
144struct MasterConsumer {
145 lock_name: String,
146 delegate_uri: String,
147 delegate_component: Arc<dyn Component>,
148 metrics: Arc<dyn MetricsCollector>,
149 platform_service: Arc<dyn PlatformService>,
150 drain_timeout: Duration,
151 delegate_retry_max_attempts: Option<u32>,
152 leadership_handle: Option<camel_api::LeadershipHandle>,
153 leadership_task: Option<JoinHandle<()>>,
154 stop_token: Option<CancellationToken>,
155}
156
157impl MasterConsumer {
158 fn new(
159 lock_name: String,
160 delegate_uri: String,
161 delegate_component: Arc<dyn Component>,
162 metrics: Arc<dyn MetricsCollector>,
163 platform_service: Arc<dyn PlatformService>,
164 drain_timeout: Duration,
165 delegate_retry_max_attempts: Option<u32>,
166 ) -> Self {
167 Self {
168 lock_name,
169 delegate_uri,
170 delegate_component,
171 metrics,
172 platform_service,
173 drain_timeout,
174 delegate_retry_max_attempts,
175 leadership_handle: None,
176 leadership_task: None,
177 stop_token: None,
178 }
179 }
180}
181
182enum DelegateState {
183 Inactive,
184 Active {
185 run_token: CancellationToken,
186 handle: JoinHandle<()>,
187 },
188}
189
190async fn stop_delegate(state: &mut DelegateState, drain_timeout: Duration) {
191 if let DelegateState::Active {
192 run_token,
193 mut handle,
194 } = std::mem::replace(state, DelegateState::Inactive)
195 {
196 run_token.cancel();
197 match timeout(drain_timeout, &mut handle).await {
198 Ok(_) => {}
199 Err(_) => {
200 warn!("master delegate shutdown timed out, aborting");
201 handle.abort();
202 }
203 }
204 }
205}
206
207async fn reconcile_event(
208 event: camel_api::LeadershipEvent,
209 state: &mut DelegateState,
210 lock_name: &str,
211 delegate_component: &Arc<dyn Component>,
212 delegate_uri: &str,
213 sender: &tokio::sync::mpsc::Sender<ExchangeEnvelope>,
214 parent_cancel: &CancellationToken,
215 drain_timeout: Duration,
216 metrics: &Arc<dyn MetricsCollector>,
217 platform_service: &Arc<dyn PlatformService>,
218) {
219 match event {
220 camel_api::LeadershipEvent::StartedLeading => {
221 info!(lock = %lock_name, "master leadership acquired");
222 stop_delegate(state, drain_timeout).await;
223
224 let delegate_ctx = MasterDelegateContext {
225 delegate_component: Arc::clone(delegate_component),
226 metrics: Arc::clone(metrics),
227 platform_service: Arc::clone(platform_service),
228 };
229
230 let endpoint = match delegate_component.create_endpoint(delegate_uri, &delegate_ctx) {
231 Ok(endpoint) => endpoint,
232 Err(err) => {
233 warn!(lock = %lock_name, "failed to create delegate endpoint: {err}");
234 return;
235 }
236 };
237
238 let mut consumer = match endpoint.create_consumer() {
239 Ok(consumer) => consumer,
240 Err(err) => {
241 warn!(lock = %lock_name, "failed to create delegate consumer: {err}");
242 return;
243 }
244 };
245
246 let run_token = parent_cancel.child_token();
247 let delegate_ctx = ConsumerContext::new(sender.clone(), run_token.clone());
248 let handle = tokio::spawn(async move {
249 let _ = consumer.start(delegate_ctx).await;
250 let _ = consumer.stop().await;
251 });
252
253 *state = DelegateState::Active { run_token, handle };
254 }
255 camel_api::LeadershipEvent::StoppedLeading => {
256 info!(lock = %lock_name, "master leadership lost");
257 stop_delegate(state, drain_timeout).await;
258 }
259 }
260}
261
262#[async_trait]
263impl Consumer for MasterConsumer {
264 async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
265 if self.leadership_task.is_some() {
266 return Ok(());
267 }
268
269 let handle = self
270 .platform_service
271 .leadership()
272 .start(&self.lock_name)
273 .await
274 .map_err(|e| {
275 CamelError::EndpointCreationFailed(format!("failed to start leader election: {e}"))
276 })?;
277
278 let lock_name = self.lock_name.clone();
279 let delegate_uri = self.delegate_uri.clone();
280 let delegate_component = Arc::clone(&self.delegate_component);
281 let metrics = Arc::clone(&self.metrics);
282 let platform_service = Arc::clone(&self.platform_service);
283 let sender = context.sender();
284 let parent_cancel = context.cancel_token();
285 let drain_timeout = self.drain_timeout;
286 let delegate_retry_max_attempts = self.delegate_retry_max_attempts;
287 let mut events = handle.events.clone();
288
289 let stop_token = CancellationToken::new();
290 let stop_token_loop = stop_token.clone();
291
292 let task = tokio::spawn(async move {
293 let mut state = DelegateState::Inactive;
294 let mut is_leading = false;
295 let mut delegate_attempts = 0u32;
296 let mut retry_tick = interval(DELEGATE_RETRY_INTERVAL);
297
298 let initial_event = { events.borrow().clone() };
299 if let Some(initial_event) = initial_event {
300 is_leading = matches!(&initial_event, camel_api::LeadershipEvent::StartedLeading);
301 if is_leading {
302 delegate_attempts = 0;
303 }
304 reconcile_event(
305 initial_event,
306 &mut state,
307 &lock_name,
308 &delegate_component,
309 &delegate_uri,
310 &sender,
311 &parent_cancel,
312 drain_timeout,
313 &metrics,
314 &platform_service,
315 )
316 .await;
317 }
318
319 loop {
320 tokio::select! {
321 _ = stop_token_loop.cancelled() => {
322 break;
323 }
324 _ = context.cancelled() => {
325 break;
326 }
327 changed = events.changed() => {
328 if changed.is_err() {
329 break;
330 }
331 let event = { events.borrow().clone() };
332 if let Some(event) = event {
333 let was_leading = is_leading;
334 is_leading = matches!(&event, camel_api::LeadershipEvent::StartedLeading);
335 if !was_leading && is_leading {
336 delegate_attempts = 0;
337 }
338 reconcile_event(
339 event,
340 &mut state,
341 &lock_name,
342 &delegate_component,
343 &delegate_uri,
344 &sender,
345 &parent_cancel,
346 drain_timeout,
347 &metrics,
348 &platform_service,
349 )
350 .await;
351 }
352 }
353 _ = retry_tick.tick() => {
354 if is_leading && matches!(state, DelegateState::Inactive) {
355 if let Some(max) = delegate_retry_max_attempts {
356 delegate_attempts = delegate_attempts.saturating_add(1);
357 if delegate_attempts > max {
358 warn!(
359 lock = %lock_name,
360 attempts = max,
361 "delegate start exceeded max attempts, stopping consumer"
362 );
363 break;
364 }
365 }
366 reconcile_event(
367 camel_api::LeadershipEvent::StartedLeading,
368 &mut state,
369 &lock_name,
370 &delegate_component,
371 &delegate_uri,
372 &sender,
373 &parent_cancel,
374 drain_timeout,
375 &metrics,
376 &platform_service,
377 )
378 .await;
379 }
380 }
381 }
382 }
383
384 stop_delegate(&mut state, drain_timeout).await;
385 });
386
387 self.leadership_handle = Some(handle);
388 self.stop_token = Some(stop_token);
389 self.leadership_task = Some(task);
390
391 Ok(())
392 }
393
394 async fn stop(&mut self) -> Result<(), CamelError> {
395 if let Some(token) = self.stop_token.take() {
396 token.cancel();
397 }
398
399 if let Some(task) = self.leadership_task.take()
400 && timeout(self.drain_timeout, task).await.is_err()
401 {
402 warn!("master leadership loop shutdown timed out");
403 }
404
405 if let Some(handle) = self.leadership_handle.take() {
406 let _ = timeout(self.drain_timeout, handle.step_down()).await;
407 }
408
409 Ok(())
410 }
411}
412
413#[cfg(test)]
414mod tests {
415 use std::sync::Arc;
416 use std::sync::Mutex;
417 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
418
419 use camel_api::{
420 BoxProcessorExt, Exchange, LeadershipEvent, LeadershipHandle, LeadershipService, Message,
421 NoOpMetrics, NoopPlatformService, NoopReadinessGate, PlatformError, PlatformIdentity,
422 PlatformService, ReadinessGate,
423 };
424 use camel_component_api::NoOpComponentContext;
425 use tokio::sync::{oneshot, watch};
426 use tokio::time::{sleep, timeout};
427 use tokio_util::sync::CancellationToken;
428 use tower::ServiceExt;
429
430 use super::*;
431
432 #[test]
433 fn parse_master_uri_valid() {
434 let cfg = MasterUriConfig::parse("master:mylock:timer:tick?period=250").unwrap();
435 assert_eq!(cfg.lock_name, "mylock");
436 assert_eq!(cfg.delegate_uri, "timer:tick?period=250");
437 }
438
439 #[test]
440 fn parse_master_uri_missing_lockname() {
441 let err = MasterUriConfig::parse("master::timer:tick").unwrap_err();
442 assert!(matches!(err, CamelError::InvalidUri(_)));
443 }
444
445 #[test]
446 fn parse_master_uri_missing_delegate() {
447 let err = MasterUriConfig::parse("master:mylock:").unwrap_err();
448 assert!(matches!(err, CamelError::InvalidUri(_)));
449 }
450
451 #[test]
452 fn endpoint_fails_when_delegate_component_missing() {
453 let master = MasterComponent::default();
454 let result =
455 master.create_endpoint("master:lock-1:missing:delegate", &NoOpComponentContext);
456 assert!(matches!(result, Err(CamelError::ComponentNotFound(_))));
457 }
458
459 #[test]
460 fn delegate_scheme_is_parsed_from_delegate_uri() {
461 let seen_scheme = Arc::new(AtomicBool::new(false));
462
463 struct SchemeAwareContext {
464 delegate: Arc<dyn Component>,
465 seen_scheme: Arc<AtomicBool>,
466 }
467
468 impl ComponentContext for SchemeAwareContext {
469 fn resolve_component(&self, scheme: &str) -> Option<Arc<dyn Component>> {
470 if scheme == "mock" {
471 self.seen_scheme.store(true, Ordering::SeqCst);
472 Some(Arc::clone(&self.delegate))
473 } else {
474 None
475 }
476 }
477
478 fn resolve_language(&self, _name: &str) -> Option<Arc<dyn Language>> {
479 None
480 }
481
482 fn metrics(&self) -> Arc<dyn MetricsCollector> {
483 Arc::new(NoOpMetrics)
484 }
485
486 fn platform_service(&self) -> Arc<dyn PlatformService> {
487 Arc::new(NoopPlatformService::default())
488 }
489 }
490
491 struct MockDelegateComponent;
492
493 impl Component for MockDelegateComponent {
494 fn scheme(&self) -> &str {
495 "mock"
496 }
497
498 fn create_endpoint(
499 &self,
500 _uri: &str,
501 _ctx: &dyn ComponentContext,
502 ) -> Result<Box<dyn Endpoint>, CamelError> {
503 Ok(Box::new(MockDelegateEndpoint))
504 }
505 }
506
507 struct MockDelegateEndpoint;
508
509 impl Endpoint for MockDelegateEndpoint {
510 fn uri(&self) -> &str {
511 "mock:delegate"
512 }
513
514 fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
515 Err(CamelError::EndpointCreationFailed("not used".to_string()))
516 }
517
518 fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
519 Err(CamelError::EndpointCreationFailed("not used".to_string()))
520 }
521 }
522
523 let delegate = Arc::new(MockDelegateComponent);
524 let ctx = SchemeAwareContext {
525 delegate,
526 seen_scheme: Arc::clone(&seen_scheme),
527 };
528
529 let master = MasterComponent::default();
530 let endpoint = master
531 .create_endpoint("master:mylock:mock:delegate?x=1", &ctx)
532 .unwrap();
533
534 assert_eq!(endpoint.uri(), "master:mylock:mock:delegate?x=1");
535 assert!(seen_scheme.load(Ordering::SeqCst));
536 }
537
538 struct MockDelegateContext {
539 delegate: Arc<dyn Component>,
540 }
541
542 impl ComponentContext for MockDelegateContext {
543 fn resolve_component(&self, scheme: &str) -> Option<Arc<dyn Component>> {
544 if self.delegate.scheme() == scheme {
545 Some(Arc::clone(&self.delegate))
546 } else {
547 None
548 }
549 }
550
551 fn resolve_language(&self, _name: &str) -> Option<Arc<dyn Language>> {
552 None
553 }
554
555 fn metrics(&self) -> Arc<dyn MetricsCollector> {
556 Arc::new(NoOpMetrics)
557 }
558
559 fn platform_service(&self) -> Arc<dyn PlatformService> {
560 Arc::new(NoopPlatformService::default())
561 }
562 }
563
564 struct MockProducerDelegateComponent {
565 create_endpoint_calls: Arc<AtomicUsize>,
566 create_producer_calls: Arc<AtomicUsize>,
567 fail_producer: bool,
568 }
569
570 impl Component for MockProducerDelegateComponent {
571 fn scheme(&self) -> &str {
572 "mock"
573 }
574
575 fn create_endpoint(
576 &self,
577 _uri: &str,
578 _ctx: &dyn ComponentContext,
579 ) -> Result<Box<dyn Endpoint>, CamelError> {
580 self.create_endpoint_calls.fetch_add(1, Ordering::SeqCst);
581 Ok(Box::new(MockProducerDelegateEndpoint {
582 create_producer_calls: Arc::clone(&self.create_producer_calls),
583 fail_producer: self.fail_producer,
584 }))
585 }
586 }
587
588 struct MockProducerDelegateEndpoint {
589 create_producer_calls: Arc<AtomicUsize>,
590 fail_producer: bool,
591 }
592
593 impl Endpoint for MockProducerDelegateEndpoint {
594 fn uri(&self) -> &str {
595 "mock:delegate"
596 }
597
598 fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
599 Err(CamelError::EndpointCreationFailed(
600 "not used in test".to_string(),
601 ))
602 }
603
604 fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
605 self.create_producer_calls.fetch_add(1, Ordering::SeqCst);
606 if self.fail_producer {
607 return Err(CamelError::ProcessorError(
608 "delegate producer failed".to_string(),
609 ));
610 }
611 Ok(BoxProcessor::from_fn(
612 |exchange| async move { Ok(exchange) },
613 ))
614 }
615 }
616
617 #[tokio::test]
618 async fn producer_passthrough_delegates_and_produces() {
619 let endpoint_calls = Arc::new(AtomicUsize::new(0));
620 let producer_calls = Arc::new(AtomicUsize::new(0));
621 let delegate = Arc::new(MockProducerDelegateComponent {
622 create_endpoint_calls: Arc::clone(&endpoint_calls),
623 create_producer_calls: Arc::clone(&producer_calls),
624 fail_producer: false,
625 });
626
627 let ctx = MockDelegateContext {
628 delegate: delegate.clone(),
629 };
630
631 let master = MasterComponent::default();
632 let endpoint = master
633 .create_endpoint("master:lock-1:mock:delegate", &ctx)
634 .unwrap();
635 let producer_ctx = ProducerContext::new();
636 let producer = endpoint.create_producer(&producer_ctx).unwrap();
637
638 let exchange = Exchange::new(Message::new("ok"));
639 let result = producer.oneshot(exchange).await.unwrap();
640
641 assert_eq!(result.input.body.as_text(), Some("ok"));
642 assert_eq!(endpoint_calls.load(Ordering::SeqCst), 1);
643 assert_eq!(producer_calls.load(Ordering::SeqCst), 1);
644 }
645
646 #[test]
647 fn producer_passthrough_bubbles_delegate_errors() {
648 let endpoint_calls = Arc::new(AtomicUsize::new(0));
649 let producer_calls = Arc::new(AtomicUsize::new(0));
650 let delegate = Arc::new(MockProducerDelegateComponent {
651 create_endpoint_calls: Arc::clone(&endpoint_calls),
652 create_producer_calls: Arc::clone(&producer_calls),
653 fail_producer: true,
654 });
655
656 let ctx = MockDelegateContext {
657 delegate: delegate.clone(),
658 };
659
660 let master = MasterComponent::default();
661 let endpoint = master
662 .create_endpoint("master:lock-1:mock:delegate", &ctx)
663 .unwrap();
664 let producer_ctx = ProducerContext::new();
665 let err = endpoint.create_producer(&producer_ctx).unwrap_err();
666
667 assert!(matches!(err, CamelError::ProcessorError(_)));
668 assert_eq!(endpoint_calls.load(Ordering::SeqCst), 1);
669 assert_eq!(producer_calls.load(Ordering::SeqCst), 1);
670 }
671
672 struct FakeLeadershipService {
673 tx: Mutex<Option<watch::Sender<Option<LeadershipEvent>>>>,
674 is_leader: Arc<AtomicBool>,
675 initial: Option<LeadershipEvent>,
676 }
677
678 impl FakeLeadershipService {
679 fn new(initial: Option<LeadershipEvent>) -> Self {
680 let starts_as_leader = matches!(initial, Some(LeadershipEvent::StartedLeading));
681 Self {
682 tx: Mutex::new(None),
683 is_leader: Arc::new(AtomicBool::new(starts_as_leader)),
684 initial,
685 }
686 }
687
688 async fn emit(&self, event: LeadershipEvent) {
689 self.is_leader.store(
690 matches!(event, LeadershipEvent::StartedLeading),
691 Ordering::Release,
692 );
693 if let Some(tx) = self
694 .tx
695 .lock()
696 .expect("mutex poisoned: fake elector sender")
697 .as_ref()
698 {
699 let _ = tx.send(Some(event));
700 }
701 }
702 }
703
704 #[async_trait]
705 impl LeadershipService for FakeLeadershipService {
706 async fn start(&self, _lock_name: &str) -> Result<LeadershipHandle, PlatformError> {
707 let (tx, rx) = watch::channel(self.initial.clone());
708 *self.tx.lock().expect("mutex poisoned: fake elector sender") = Some(tx);
709
710 let cancel = CancellationToken::new();
711 let cancel_wait = cancel.clone();
712 let (term_tx, term_rx) = oneshot::channel();
713 tokio::spawn(async move {
714 cancel_wait.cancelled().await;
715 let _ = term_tx.send(());
716 });
717
718 Ok(LeadershipHandle::new(
719 rx,
720 Arc::clone(&self.is_leader),
721 cancel,
722 term_rx,
723 ))
724 }
725 }
726
727 struct FakePlatformService {
728 identity: PlatformIdentity,
729 readiness_gate: Arc<dyn ReadinessGate>,
730 leadership: Arc<dyn LeadershipService>,
731 }
732
733 impl FakePlatformService {
734 fn new(leadership: Arc<dyn LeadershipService>) -> Self {
735 Self {
736 identity: PlatformIdentity::local("master-tests"),
737 readiness_gate: Arc::new(NoopReadinessGate),
738 leadership,
739 }
740 }
741 }
742
743 impl PlatformService for FakePlatformService {
744 fn identity(&self) -> PlatformIdentity {
745 self.identity.clone()
746 }
747
748 fn readiness_gate(&self) -> Arc<dyn ReadinessGate> {
749 Arc::clone(&self.readiness_gate)
750 }
751
752 fn leadership(&self) -> Arc<dyn LeadershipService> {
753 Arc::clone(&self.leadership)
754 }
755 }
756
757 struct FakeDelegateComponent {
758 create_consumer_calls: Arc<AtomicUsize>,
759 start_calls: Arc<AtomicUsize>,
760 }
761
762 impl Component for FakeDelegateComponent {
763 fn scheme(&self) -> &str {
764 "fake"
765 }
766
767 fn create_endpoint(
768 &self,
769 _uri: &str,
770 _ctx: &dyn ComponentContext,
771 ) -> Result<Box<dyn Endpoint>, CamelError> {
772 Ok(Box::new(FakeDelegateEndpoint {
773 create_consumer_calls: Arc::clone(&self.create_consumer_calls),
774 start_calls: Arc::clone(&self.start_calls),
775 }))
776 }
777 }
778
779 struct FakeDelegateEndpoint {
780 create_consumer_calls: Arc<AtomicUsize>,
781 start_calls: Arc<AtomicUsize>,
782 }
783
784 impl Endpoint for FakeDelegateEndpoint {
785 fn uri(&self) -> &str {
786 "fake:delegate"
787 }
788
789 fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
790 let epoch = self.create_consumer_calls.fetch_add(1, Ordering::SeqCst) + 1;
791 Ok(Box::new(FakeDelegateConsumer {
792 epoch,
793 start_calls: Arc::clone(&self.start_calls),
794 }))
795 }
796
797 fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
798 Err(CamelError::EndpointCreationFailed("not used".to_string()))
799 }
800 }
801
802 struct FakeDelegateConsumer {
803 epoch: usize,
804 start_calls: Arc<AtomicUsize>,
805 }
806
807 struct FailingDelegateComponent {
808 create_endpoint_calls: Arc<AtomicUsize>,
809 }
810
811 impl Component for FailingDelegateComponent {
812 fn scheme(&self) -> &str {
813 "failing"
814 }
815
816 fn create_endpoint(
817 &self,
818 _uri: &str,
819 _ctx: &dyn ComponentContext,
820 ) -> Result<Box<dyn Endpoint>, CamelError> {
821 self.create_endpoint_calls.fetch_add(1, Ordering::SeqCst);
822 Err(CamelError::EndpointCreationFailed(
823 "delegate endpoint creation failed".to_string(),
824 ))
825 }
826 }
827
828 #[async_trait]
829 impl Consumer for FakeDelegateConsumer {
830 async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
831 self.start_calls.fetch_add(1, Ordering::SeqCst);
832 context
833 .send(Exchange::new(Message::new(format!("epoch-{}", self.epoch))))
834 .await?;
835
836 loop {
837 tokio::select! {
838 _ = context.cancelled() => {
839 break;
840 }
841 _ = sleep(Duration::from_millis(20)) => {
842 context
843 .send(Exchange::new(Message::new(format!("epoch-{}", self.epoch))))
844 .await?;
845 }
846 }
847 }
848
849 Ok(())
850 }
851
852 async fn stop(&mut self) -> Result<(), CamelError> {
853 Ok(())
854 }
855 }
856
857 fn build_master_consumer(
858 platform_service: Arc<dyn PlatformService>,
859 create_consumer_calls: Arc<AtomicUsize>,
860 start_calls: Arc<AtomicUsize>,
861 delegate_retry_max_attempts: Option<u32>,
862 ) -> MasterConsumer {
863 MasterConsumer::new(
864 "lock-a".to_string(),
865 "fake:delegate".to_string(),
866 Arc::new(FakeDelegateComponent {
867 create_consumer_calls,
868 start_calls,
869 }),
870 Arc::new(NoOpMetrics),
871 platform_service,
872 Duration::from_millis(500),
873 delegate_retry_max_attempts,
874 )
875 }
876
877 #[tokio::test]
878 async fn starts_delegate_only_after_started_leading() {
879 let leadership = Arc::new(FakeLeadershipService::new(None));
880 let platform_service = Arc::new(FakePlatformService::new(leadership.clone()));
881 let create_consumer_calls = Arc::new(AtomicUsize::new(0));
882 let start_calls = Arc::new(AtomicUsize::new(0));
883 let mut master = build_master_consumer(
884 platform_service,
885 Arc::clone(&create_consumer_calls),
886 Arc::clone(&start_calls),
887 Some(30),
888 );
889
890 let (tx, mut rx) = tokio::sync::mpsc::channel(16);
891 let cancel = CancellationToken::new();
892 let ctx = ConsumerContext::new(tx, cancel.clone());
893
894 master.start(ctx).await.unwrap();
895
896 sleep(Duration::from_millis(80)).await;
897 assert!(rx.try_recv().is_err());
898 assert_eq!(create_consumer_calls.load(Ordering::SeqCst), 0);
899
900 leadership.emit(LeadershipEvent::StartedLeading).await;
901
902 let first = timeout(Duration::from_millis(500), rx.recv())
903 .await
904 .unwrap()
905 .unwrap();
906 assert_eq!(first.exchange.input.body.as_text(), Some("epoch-1"));
907 assert_eq!(create_consumer_calls.load(Ordering::SeqCst), 1);
908 assert_eq!(start_calls.load(Ordering::SeqCst), 1);
909
910 cancel.cancel();
911 master.stop().await.unwrap();
912 }
913
914 #[tokio::test]
915 async fn stops_delegate_on_stopped_leading() {
916 let leadership = Arc::new(FakeLeadershipService::new(None));
917 let platform_service = Arc::new(FakePlatformService::new(leadership.clone()));
918 let create_consumer_calls = Arc::new(AtomicUsize::new(0));
919 let start_calls = Arc::new(AtomicUsize::new(0));
920 let mut master = build_master_consumer(
921 platform_service,
922 Arc::clone(&create_consumer_calls),
923 Arc::clone(&start_calls),
924 Some(30),
925 );
926
927 let (tx, mut rx) = tokio::sync::mpsc::channel(16);
928 let cancel = CancellationToken::new();
929 let ctx = ConsumerContext::new(tx, cancel.clone());
930
931 master.start(ctx).await.unwrap();
932 leadership.emit(LeadershipEvent::StartedLeading).await;
933 let _ = timeout(Duration::from_millis(500), rx.recv())
934 .await
935 .unwrap()
936 .unwrap();
937
938 leadership.emit(LeadershipEvent::StoppedLeading).await;
939 sleep(Duration::from_millis(100)).await;
940 while rx.try_recv().is_ok() {}
941 assert!(
942 timeout(Duration::from_millis(120), rx.recv())
943 .await
944 .is_err()
945 );
946
947 cancel.cancel();
948 master.stop().await.unwrap();
949 }
950
951 #[tokio::test]
952 async fn recreates_delegate_on_new_leadership_epoch() {
953 let leadership = Arc::new(FakeLeadershipService::new(None));
954 let platform_service = Arc::new(FakePlatformService::new(leadership.clone()));
955 let create_consumer_calls = Arc::new(AtomicUsize::new(0));
956 let start_calls = Arc::new(AtomicUsize::new(0));
957 let mut master = build_master_consumer(
958 platform_service,
959 Arc::clone(&create_consumer_calls),
960 Arc::clone(&start_calls),
961 Some(30),
962 );
963
964 let (tx, mut rx) = tokio::sync::mpsc::channel(16);
965 let cancel = CancellationToken::new();
966 let ctx = ConsumerContext::new(tx, cancel.clone());
967
968 master.start(ctx).await.unwrap();
969
970 leadership.emit(LeadershipEvent::StartedLeading).await;
971 let first = timeout(Duration::from_millis(500), rx.recv())
972 .await
973 .unwrap()
974 .unwrap();
975 assert_eq!(first.exchange.input.body.as_text(), Some("epoch-1"));
976
977 leadership.emit(LeadershipEvent::StoppedLeading).await;
978 sleep(Duration::from_millis(120)).await;
979
980 leadership.emit(LeadershipEvent::StartedLeading).await;
981 let second = timeout(Duration::from_millis(500), rx.recv())
982 .await
983 .unwrap()
984 .unwrap();
985 assert_eq!(second.exchange.input.body.as_text(), Some("epoch-2"));
986
987 assert_eq!(create_consumer_calls.load(Ordering::SeqCst), 2);
988 assert_eq!(start_calls.load(Ordering::SeqCst), 2);
989
990 cancel.cancel();
991 master.stop().await.unwrap();
992 }
993
994 #[tokio::test]
995 async fn stops_retrying_delegate_start_after_max_attempts() {
996 let leadership = Arc::new(FakeLeadershipService::new(Some(
997 LeadershipEvent::StartedLeading,
998 )));
999 let platform_service = Arc::new(FakePlatformService::new(leadership));
1000 let create_endpoint_calls = Arc::new(AtomicUsize::new(0));
1001
1002 let mut master = MasterConsumer::new(
1003 "lock-a".to_string(),
1004 "failing:delegate".to_string(),
1005 Arc::new(FailingDelegateComponent {
1006 create_endpoint_calls: Arc::clone(&create_endpoint_calls),
1007 }),
1008 Arc::new(NoOpMetrics),
1009 platform_service,
1010 Duration::from_millis(500),
1011 Some(1),
1012 );
1013
1014 let (tx, _rx) = tokio::sync::mpsc::channel(16);
1015 let cancel = CancellationToken::new();
1016 let ctx = ConsumerContext::new(tx, cancel.clone());
1017
1018 master.start(ctx).await.unwrap();
1019 sleep(Duration::from_millis(750)).await;
1020
1021 assert_eq!(create_endpoint_calls.load(Ordering::SeqCst), 2);
1022
1023 cancel.cancel();
1024 master.stop().await.unwrap();
1025 }
1026}