1pub mod bundle;
7pub mod config;
8
9pub use bundle::MasterBundle;
10
11use std::sync::Arc;
12use std::time::Duration;
13
14use async_trait::async_trait;
15use camel_api::{CamelError, MetricsCollector, PlatformService};
16use camel_component_api::{
17 BoxProcessor, Component, ComponentContext, Consumer, ConsumerContext, Endpoint,
18 ExchangeEnvelope, ProducerContext, parse_uri,
19};
20use camel_language_api::Language;
21use tokio::task::JoinHandle;
22use tokio::time::{interval, timeout};
23use tokio_util::sync::CancellationToken;
24use tracing::{error, info, warn};
25
26use crate::config::{MasterComponentConfig, MasterUriConfig};
27
/// Period of the ticker used by the leadership loop in `MasterConsumer::start`.
/// On each tick a finished delegate task is reaped and, while leading with no
/// active delegate, another delegate start is attempted.
const DELEGATE_RETRY_INTERVAL: Duration = Duration::from_millis(200);
29
/// Component registered under the `master` scheme. It wraps a delegate
/// endpoint and carries the settings copied into every endpoint it creates.
pub struct MasterComponent {
    /// Drain timeout in milliseconds; converted to a `Duration` per endpoint.
    drain_timeout_ms: u64,
    /// Optional cap on delegate start attempts, passed through to endpoints.
    delegate_retry_max_attempts: Option<u32>,
}
34
35impl MasterComponent {
36 pub fn new(config: MasterComponentConfig) -> Self {
37 Self {
38 drain_timeout_ms: config.drain_timeout_ms,
39 delegate_retry_max_attempts: config.delegate_retry_max_attempts,
40 }
41 }
42}
43
44impl Default for MasterComponent {
45 fn default() -> Self {
46 Self::new(MasterComponentConfig::default())
47 }
48}
49
50impl Component for MasterComponent {
51 fn scheme(&self) -> &str {
52 "master"
53 }
54
55 fn create_endpoint(
56 &self,
57 uri: &str,
58 ctx: &dyn ComponentContext,
59 ) -> Result<Box<dyn Endpoint>, CamelError> {
60 let parsed = MasterUriConfig::parse(uri)?;
61 let delegate_parts = parse_uri(&parsed.delegate_uri)?;
62 let delegate_scheme = delegate_parts.scheme;
63 let delegate_component = ctx
64 .resolve_component(&delegate_scheme)
65 .ok_or_else(|| CamelError::ComponentNotFound(delegate_scheme.clone()))?;
66
67 Ok(Box::new(MasterEndpoint {
68 uri: uri.to_string(),
69 lock_name: parsed.lock_name,
70 delegate_uri: parsed.delegate_uri,
71 delegate_component,
72 metrics: ctx.metrics(),
73 platform_service: ctx.platform_service(),
74 drain_timeout: Duration::from_millis(self.drain_timeout_ms),
75 delegate_retry_max_attempts: self.delegate_retry_max_attempts,
76 }))
77 }
78}
79
/// Endpoint for one `master:` URI. Holds what is needed to build either the
/// leader-gated consumer or a pass-through producer for the delegate.
struct MasterEndpoint {
    /// Full URI this endpoint was created from.
    uri: String,
    /// Lock name parsed from the master URI.
    lock_name: String,
    /// Delegate URI parsed from the master URI.
    delegate_uri: String,
    /// Component resolved for the delegate URI's scheme.
    delegate_component: Arc<dyn Component>,
    /// Metrics collector obtained from the creating component context.
    metrics: Arc<dyn MetricsCollector>,
    /// Platform service obtained from the creating component context.
    platform_service: Arc<dyn PlatformService>,
    /// Timeout handed to consumers for draining/joining tasks.
    drain_timeout: Duration,
    /// Optional cap on delegate start attempts handed to consumers.
    delegate_retry_max_attempts: Option<u32>,
}
93
94impl Endpoint for MasterEndpoint {
95 fn uri(&self) -> &str {
96 &self.uri
97 }
98
99 fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
100 Ok(Box::new(MasterConsumer::new(
101 self.lock_name.clone(),
102 self.delegate_uri.clone(),
103 Arc::clone(&self.delegate_component),
104 Arc::clone(&self.metrics),
105 Arc::clone(&self.platform_service),
106 self.drain_timeout,
107 self.delegate_retry_max_attempts,
108 )))
109 }
110
111 fn create_producer(&self, ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
112 let delegate_ctx = MasterDelegateContext {
113 delegate_component: Arc::clone(&self.delegate_component),
114 metrics: Arc::clone(&self.metrics),
115 platform_service: Arc::clone(&self.platform_service),
116 };
117
118 self.delegate_component
119 .create_endpoint(&self.delegate_uri, &delegate_ctx)?
120 .create_producer(ctx)
121 }
122}
123
/// Minimal `ComponentContext` handed to the delegate component when its
/// endpoint is created; it only knows the delegate component itself plus the
/// shared metrics and platform services.
struct MasterDelegateContext {
    /// The single component this context can resolve.
    delegate_component: Arc<dyn Component>,
    /// Metrics collector returned by `metrics()`.
    metrics: Arc<dyn MetricsCollector>,
    /// Platform service returned by `platform_service()`.
    platform_service: Arc<dyn PlatformService>,
}
129
130impl ComponentContext for MasterDelegateContext {
131 fn resolve_component(&self, scheme: &str) -> Option<Arc<dyn Component>> {
132 if self.delegate_component.scheme() == scheme {
133 Some(Arc::clone(&self.delegate_component))
134 } else {
135 None
136 }
137 }
138
139 fn resolve_language(&self, _name: &str) -> Option<Arc<dyn Language>> {
140 None
141 }
142
143 fn metrics(&self) -> Arc<dyn MetricsCollector> {
144 Arc::clone(&self.metrics)
145 }
146
147 fn platform_service(&self) -> Arc<dyn PlatformService> {
148 Arc::clone(&self.platform_service)
149 }
150}
151
/// Consumer that runs a delegate consumer only while this instance holds
/// leadership for `lock_name`.
struct MasterConsumer {
    /// Name of the lock passed to the leadership service on start.
    lock_name: String,
    /// URI used to create the delegate endpoint.
    delegate_uri: String,
    /// Component that creates the delegate endpoint.
    delegate_component: Arc<dyn Component>,
    /// Metrics collector forwarded to the delegate's component context.
    metrics: Arc<dyn MetricsCollector>,
    /// Source of the leadership service; also forwarded to the delegate context.
    platform_service: Arc<dyn PlatformService>,
    /// Upper bound used when waiting for delegate/leadership tasks to finish.
    drain_timeout: Duration,
    /// Optional cap on delegate start attempts made by the retry ticker.
    delegate_retry_max_attempts: Option<u32>,
    /// Handle of the background leadership loop; `None` until `start` runs.
    leadership_task: Option<JoinHandle<Result<(), CamelError>>>,
    /// Token cancelled by `stop` to end the leadership loop.
    stop_token: Option<CancellationToken>,
}
165
166impl MasterConsumer {
167 fn new(
168 lock_name: String,
169 delegate_uri: String,
170 delegate_component: Arc<dyn Component>,
171 metrics: Arc<dyn MetricsCollector>,
172 platform_service: Arc<dyn PlatformService>,
173 drain_timeout: Duration,
174 delegate_retry_max_attempts: Option<u32>,
175 ) -> Self {
176 Self {
177 lock_name,
178 delegate_uri,
179 delegate_component,
180 metrics,
181 platform_service,
182 drain_timeout,
183 delegate_retry_max_attempts,
184 leadership_task: None,
185 stop_token: None,
186 }
187 }
188}
189
/// Lifecycle state of the delegate consumer managed by the leadership loop.
enum DelegateState {
    /// No delegate task is currently tracked.
    Inactive,
    /// A delegate task has been spawned.
    Active {
        /// Token cancelled to ask the delegate to stop.
        run_token: CancellationToken,
        /// Join handle of the spawned delegate task.
        handle: JoinHandle<Result<(), CamelError>>,
    },
}
197
198async fn stop_delegate(
199 state: &mut DelegateState,
200 drain_timeout: Duration,
201) -> Result<(), CamelError> {
202 if let DelegateState::Active {
203 run_token,
204 mut handle,
205 } = std::mem::replace(state, DelegateState::Inactive)
206 {
207 run_token.cancel();
208 match timeout(drain_timeout, &mut handle).await {
209 Ok(Ok(Ok(()))) => {}
210 Ok(Ok(Err(err))) => {
211 return Err(err);
212 }
213 Ok(Err(e)) if e.is_panic() => {
214 error!(error = %e, "master delegate task panicked");
215 return Err(CamelError::ProcessorError(format!(
216 "master delegate task panicked: {e}"
217 )));
218 }
219 Ok(Err(e)) => {
220 warn!(error = %e, "master delegate task cancelled");
221 return Err(CamelError::ProcessorError(format!(
222 "master delegate task cancelled: {e}"
223 )));
224 }
225 Err(_) => {
226 warn!("master delegate shutdown timed out, aborting");
227 handle.abort();
228 }
229 }
230 }
231 Ok(())
232}
233
/// Borrowed bundle of everything `reconcile_event` needs to start or stop the
/// delegate consumer.
struct ReconcileContext<'a> {
    /// Lock name, used as a field in log events.
    lock_name: &'a str,
    /// Component that creates the delegate endpoint.
    delegate_component: &'a Arc<dyn Component>,
    /// URI of the delegate endpoint.
    delegate_uri: &'a str,
    /// Channel sender cloned into each delegate's `ConsumerContext`.
    sender: &'a tokio::sync::mpsc::Sender<ExchangeEnvelope>,
    /// Parent token; each delegate runs under a child of it.
    parent_cancel: &'a CancellationToken,
    /// Maximum time to wait for a delegate to stop.
    drain_timeout: Duration,
    /// Metrics collector forwarded to the delegate's component context.
    metrics: &'a Arc<dyn MetricsCollector>,
    /// Platform service forwarded to the delegate's component context.
    platform_service: &'a Arc<dyn PlatformService>,
}
244
245async fn reconcile_event(
246 event: camel_api::LeadershipEvent,
247 state: &mut DelegateState,
248 ctx: &ReconcileContext<'_>,
249) -> Result<(), CamelError> {
250 match event {
251 camel_api::LeadershipEvent::StartedLeading => {
252 info!(lock = %ctx.lock_name, "master leadership acquired");
253 tracing::info!(lock = %ctx.lock_name, "metrics emission placeholder: leadership acquired");
255 stop_delegate(state, ctx.drain_timeout).await?;
256
257 let delegate_ctx = MasterDelegateContext {
258 delegate_component: Arc::clone(ctx.delegate_component),
259 metrics: Arc::clone(ctx.metrics),
260 platform_service: Arc::clone(ctx.platform_service),
261 };
262
263 let endpoint = match ctx
264 .delegate_component
265 .create_endpoint(ctx.delegate_uri, &delegate_ctx)
266 {
267 Ok(endpoint) => endpoint,
268 Err(err) => {
269 warn!(lock = %ctx.lock_name, "failed to create delegate endpoint: {err}");
270 return Ok(());
271 }
272 };
273
274 let mut consumer = match endpoint.create_consumer() {
275 Ok(consumer) => consumer,
276 Err(err) => {
277 warn!(lock = %ctx.lock_name, "failed to create delegate consumer: {err}");
278 return Ok(());
279 }
280 };
281
282 let run_token = ctx.parent_cancel.child_token();
283 let delegate_ctx = ConsumerContext::new(ctx.sender.clone(), run_token.clone());
284 let handle = tokio::spawn(async move {
285 consumer.start(delegate_ctx).await?;
286 consumer.stop().await?;
287 Ok::<(), CamelError>(())
288 });
289
290 *state = DelegateState::Active { run_token, handle };
291 }
292 camel_api::LeadershipEvent::StoppedLeading => {
293 info!(lock = %ctx.lock_name, "master leadership lost");
294 tracing::info!(lock = %ctx.lock_name, "metrics emission placeholder: leadership lost");
296 stop_delegate(state, ctx.drain_timeout).await?;
297 }
298 }
299 Ok(())
300}
301
#[async_trait]
impl Consumer for MasterConsumer {
    /// Starts leader election for `lock_name` and spawns the background
    /// leadership loop that starts and stops the delegate consumer as
    /// leadership events arrive.
    ///
    /// Calling `start` while a leadership task is already stored is a no-op.
    ///
    /// # Errors
    /// Returns `CamelError::EndpointCreationFailed` when the leadership
    /// service fails to start the election.
    async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
        // Already started: keep the existing leadership task.
        if self.leadership_task.is_some() {
            return Ok(());
        }

        let handle = self
            .platform_service
            .leadership()
            .start(&self.lock_name)
            .await
            .map_err(|e| {
                CamelError::EndpointCreationFailed(format!("failed to start leader election: {e}"))
            })?;

        // Owned copies of everything the spawned task needs.
        let lock_name = self.lock_name.clone();
        let delegate_uri = self.delegate_uri.clone();
        let delegate_component = Arc::clone(&self.delegate_component);
        let metrics = Arc::clone(&self.metrics);
        let platform_service = Arc::clone(&self.platform_service);
        let sender = context.sender();
        let parent_cancel = context.cancel_token();
        let drain_timeout = self.drain_timeout;
        let delegate_retry_max_attempts = self.delegate_retry_max_attempts;
        let mut events = handle.events.clone();

        // Dedicated token so `stop()` can end the loop independently of the
        // cancellation carried by `context`.
        let stop_token = CancellationToken::new();
        let stop_token_loop = stop_token.clone();
        let leadership_handle = handle;

        let task = tokio::spawn(async move {
            let mut state = DelegateState::Inactive;
            let mut is_leading = false;
            // Delegate start attempts made by the retry ticker since leadership
            // was last acquired.
            let mut delegate_attempts = 0u32;
            let mut retry_tick = interval(DELEGATE_RETRY_INTERVAL);

            let rctx = ReconcileContext {
                lock_name: &lock_name,
                delegate_component: &delegate_component,
                delegate_uri: &delegate_uri,
                sender: &sender,
                parent_cancel: &parent_cancel,
                drain_timeout,
                metrics: &metrics,
                platform_service: &platform_service,
            };

            // Apply whatever event the channel already holds before waiting
            // for changes.
            let initial_event = { events.borrow().clone() };
            if let Some(initial_event) = initial_event {
                is_leading = matches!(&initial_event, camel_api::LeadershipEvent::StartedLeading);
                if is_leading {
                    delegate_attempts = 0;
                }
                if let Err(err) = reconcile_event(initial_event, &mut state, &rctx).await {
                    error!(lock = %lock_name, "master delegate error: {err}");
                    return Err(err);
                }
            }

            loop {
                tokio::select! {
                    // `stop()` was called.
                    _ = stop_token_loop.cancelled() => {
                        break;
                    }
                    // The consumer context was cancelled.
                    _ = context.cancelled() => {
                        break;
                    }
                    // A new leadership event was published.
                    changed = events.changed() => {
                        // An error here ends the loop.
                        if changed.is_err() {
                            break;
                        }
                        let event = { events.borrow().clone() };
                        if let Some(event) = event {
                            let was_leading = is_leading;
                            is_leading = matches!(&event, camel_api::LeadershipEvent::StartedLeading);
                            // Only a fresh acquisition resets the attempt counter.
                            if !was_leading && is_leading {
                                delegate_attempts = 0;
                            }
                            if let Err(err) = reconcile_event(event, &mut state, &rctx).await {
                                error!(lock = %lock_name, "master delegate error: {err}");
                                return Err(err);
                            }
                        }
                    }
                    // Periodic health check / retry.
                    _ = retry_tick.tick() => {
                        // Reap a delegate task that ended on its own; an error
                        // from it terminates the leadership loop.
                        if matches!(&state, DelegateState::Active { handle, .. } if handle.is_finished())
                            && let Err(err) = stop_delegate(&mut state, drain_timeout).await
                        {
                            error!(lock = %lock_name, "master delegate task failed: {err}");
                            return Err(err);
                        }

                        // Leading but nothing running: try to (re)start the delegate,
                        // bounded by the optional attempt cap.
                        if is_leading && matches!(state, DelegateState::Inactive) {
                            if let Some(max) = delegate_retry_max_attempts {
                                delegate_attempts = delegate_attempts.saturating_add(1);
                                if delegate_attempts > max {
                                    warn!(
                                        lock = %lock_name,
                                        attempts = max,
                                        "delegate start exceeded max attempts, stopping consumer"
                                    );
                                    break;
                                }
                            }
                            if let Err(err) = reconcile_event(
                                camel_api::LeadershipEvent::StartedLeading,
                                &mut state,
                                &rctx,
                            )
                            .await {
                                error!(lock = %lock_name, "master delegate retry error: {err}");
                                return Err(err);
                            }
                        }
                    }
                }
            }

            // Loop exit epilogue: stop the delegate, then step down, waiting at
            // most `drain_timeout` for the step-down; its outcome is ignored.
            stop_delegate(&mut state, drain_timeout).await?;
            let _ = timeout(drain_timeout, leadership_handle.step_down()).await;
            Ok::<(), CamelError>(())
        });

        self.stop_token = Some(stop_token);
        self.leadership_task = Some(task);

        Ok(())
    }

    /// Stops the leadership loop.
    ///
    /// The stop token is cancelled first. If the leadership task has already
    /// finished, its result is returned (join failures and join timeouts
    /// become `CamelError::ProcessorError`). Otherwise the task is aborted and
    /// awaited for at most `drain_timeout`; panics, cancellation and timeouts
    /// on that path are only logged.
    async fn stop(&mut self) -> Result<(), CamelError> {
        if let Some(token) = self.stop_token.take() {
            token.cancel();
        }

        if let Some(handle) = self.leadership_task.take() {
            // The task ended by itself (e.g. with a delegate error): report
            // its outcome to the caller.
            if handle.is_finished() {
                match timeout(self.drain_timeout, handle).await {
                    Ok(Ok(Ok(()))) => {}
                    Ok(Ok(Err(err))) => return Err(err),
                    Ok(Err(e)) => {
                        return Err(CamelError::ProcessorError(format!(
                            "leadership task join failed: {e}"
                        )));
                    }
                    Err(_) => {
                        return Err(CamelError::ProcessorError(
                            "leadership task join timed out".to_string(),
                        ));
                    }
                }
                return Ok(());
            }

            // NOTE(review): aborting a still-running task skips its own
            // `stop_delegate` / `step_down` epilogue; presumably intentional so
            // that `stop()` returns quickly — confirm the delegate task and the
            // leadership handle are cleaned up elsewhere.
            handle.abort();
            match timeout(self.drain_timeout, handle).await {
                Ok(Ok(Ok(()))) => {}
                Ok(Ok(Err(err))) => return Err(err),
                Ok(Err(e)) if e.is_panic() => {
                    error!(lock = %self.lock_name, error = %e, "leadership task panicked");
                }
                Ok(Err(e)) => {
                    warn!(lock = %self.lock_name, error = %e, "leadership task cancelled");
                }
                Err(_) => {
                    warn!("master leadership loop shutdown timed out after abort");
                }
            }
        }

        Ok(())
    }
}
477
478#[cfg(test)]
479mod tests {
480 use std::sync::Arc;
481 use std::sync::Mutex;
482 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
483
484 use camel_api::{
485 BoxProcessorExt, Exchange, LeadershipEvent, LeadershipHandle, LeadershipService, Message,
486 NoOpMetrics, NoopPlatformService, NoopReadinessGate, PlatformError, PlatformIdentity,
487 PlatformService, ReadinessGate,
488 };
489 use camel_component_api::NoOpComponentContext;
490 use std::time::Instant;
491 use tokio::sync::{oneshot, watch};
492 use tokio::time::{sleep, timeout};
493 use tokio_util::sync::CancellationToken;
494 use tower::ServiceExt;
495
496 use super::*;
497
498 #[test]
499 fn parse_master_uri_valid() {
500 let cfg = MasterUriConfig::parse("master:mylock:timer:tick?period=250").unwrap();
501 assert_eq!(cfg.lock_name, "mylock");
502 assert_eq!(cfg.delegate_uri, "timer:tick?period=250");
503 }
504
505 #[test]
506 fn parse_master_uri_missing_lockname() {
507 let err = MasterUriConfig::parse("master::timer:tick").unwrap_err();
508 assert!(matches!(err, CamelError::InvalidUri(_)));
509 }
510
511 #[test]
512 fn parse_master_uri_missing_delegate() {
513 let err = MasterUriConfig::parse("master:mylock:").unwrap_err();
514 assert!(matches!(err, CamelError::InvalidUri(_)));
515 }
516
517 #[test]
518 fn endpoint_fails_when_delegate_component_missing() {
519 let master = MasterComponent::default();
520 let result =
521 master.create_endpoint("master:lock-1:missing:delegate", &NoOpComponentContext);
522 assert!(matches!(result, Err(CamelError::ComponentNotFound(_))));
523 }
524
525 #[test]
526 fn delegate_scheme_is_parsed_from_delegate_uri() {
527 let seen_scheme = Arc::new(AtomicBool::new(false));
528
529 struct SchemeAwareContext {
530 delegate: Arc<dyn Component>,
531 seen_scheme: Arc<AtomicBool>,
532 }
533
534 impl ComponentContext for SchemeAwareContext {
535 fn resolve_component(&self, scheme: &str) -> Option<Arc<dyn Component>> {
536 if scheme == "mock" {
537 self.seen_scheme.store(true, Ordering::SeqCst);
538 Some(Arc::clone(&self.delegate))
539 } else {
540 None
541 }
542 }
543
544 fn resolve_language(&self, _name: &str) -> Option<Arc<dyn Language>> {
545 None
546 }
547
548 fn metrics(&self) -> Arc<dyn MetricsCollector> {
549 Arc::new(NoOpMetrics)
550 }
551
552 fn platform_service(&self) -> Arc<dyn PlatformService> {
553 Arc::new(NoopPlatformService::default())
554 }
555 }
556
557 struct MockDelegateComponent;
558
559 impl Component for MockDelegateComponent {
560 fn scheme(&self) -> &str {
561 "mock"
562 }
563
564 fn create_endpoint(
565 &self,
566 _uri: &str,
567 _ctx: &dyn ComponentContext,
568 ) -> Result<Box<dyn Endpoint>, CamelError> {
569 Ok(Box::new(MockDelegateEndpoint))
570 }
571 }
572
573 struct MockDelegateEndpoint;
574
575 impl Endpoint for MockDelegateEndpoint {
576 fn uri(&self) -> &str {
577 "mock:delegate"
578 }
579
580 fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
581 Err(CamelError::EndpointCreationFailed("not used".to_string()))
582 }
583
584 fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
585 Err(CamelError::EndpointCreationFailed("not used".to_string()))
586 }
587 }
588
589 let delegate = Arc::new(MockDelegateComponent);
590 let ctx = SchemeAwareContext {
591 delegate,
592 seen_scheme: Arc::clone(&seen_scheme),
593 };
594
595 let master = MasterComponent::default();
596 let endpoint = master
597 .create_endpoint("master:mylock:mock:delegate?x=1", &ctx)
598 .unwrap();
599
600 assert_eq!(endpoint.uri(), "master:mylock:mock:delegate?x=1");
601 assert!(seen_scheme.load(Ordering::SeqCst));
602 }
603
    /// Test context that can resolve exactly one delegate component.
    struct MockDelegateContext {
        /// The only component this context resolves.
        delegate: Arc<dyn Component>,
    }
607
608 impl ComponentContext for MockDelegateContext {
609 fn resolve_component(&self, scheme: &str) -> Option<Arc<dyn Component>> {
610 if self.delegate.scheme() == scheme {
611 Some(Arc::clone(&self.delegate))
612 } else {
613 None
614 }
615 }
616
617 fn resolve_language(&self, _name: &str) -> Option<Arc<dyn Language>> {
618 None
619 }
620
621 fn metrics(&self) -> Arc<dyn MetricsCollector> {
622 Arc::new(NoOpMetrics)
623 }
624
625 fn platform_service(&self) -> Arc<dyn PlatformService> {
626 Arc::new(NoopPlatformService::default())
627 }
628 }
629
    /// Delegate component for producer tests; counts endpoint and producer
    /// creations and can make producer creation fail.
    struct MockProducerDelegateComponent {
        /// Incremented on every `create_endpoint` call.
        create_endpoint_calls: Arc<AtomicUsize>,
        /// Shared with created endpoints, which increment it in `create_producer`.
        create_producer_calls: Arc<AtomicUsize>,
        /// When `true`, created endpoints fail in `create_producer`.
        fail_producer: bool,
    }
635
636 impl Component for MockProducerDelegateComponent {
637 fn scheme(&self) -> &str {
638 "mock"
639 }
640
641 fn create_endpoint(
642 &self,
643 _uri: &str,
644 _ctx: &dyn ComponentContext,
645 ) -> Result<Box<dyn Endpoint>, CamelError> {
646 self.create_endpoint_calls.fetch_add(1, Ordering::SeqCst);
647 Ok(Box::new(MockProducerDelegateEndpoint {
648 create_producer_calls: Arc::clone(&self.create_producer_calls),
649 fail_producer: self.fail_producer,
650 }))
651 }
652 }
653
    /// Endpoint produced by `MockProducerDelegateComponent`.
    struct MockProducerDelegateEndpoint {
        /// Incremented on every `create_producer` call.
        create_producer_calls: Arc<AtomicUsize>,
        /// When `true`, `create_producer` returns an error.
        fail_producer: bool,
    }
658
659 impl Endpoint for MockProducerDelegateEndpoint {
660 fn uri(&self) -> &str {
661 "mock:delegate"
662 }
663
664 fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
665 Err(CamelError::EndpointCreationFailed(
666 "not used in test".to_string(),
667 ))
668 }
669
670 fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
671 self.create_producer_calls.fetch_add(1, Ordering::SeqCst);
672 if self.fail_producer {
673 return Err(CamelError::ProcessorError(
674 "delegate producer failed".to_string(),
675 ));
676 }
677 Ok(BoxProcessor::from_fn(
678 |exchange| async move { Ok(exchange) },
679 ))
680 }
681 }
682
683 #[tokio::test]
684 async fn producer_passthrough_delegates_and_produces() {
685 let endpoint_calls = Arc::new(AtomicUsize::new(0));
686 let producer_calls = Arc::new(AtomicUsize::new(0));
687 let delegate = Arc::new(MockProducerDelegateComponent {
688 create_endpoint_calls: Arc::clone(&endpoint_calls),
689 create_producer_calls: Arc::clone(&producer_calls),
690 fail_producer: false,
691 });
692
693 let ctx = MockDelegateContext {
694 delegate: delegate.clone(),
695 };
696
697 let master = MasterComponent::default();
698 let endpoint = master
699 .create_endpoint("master:lock-1:mock:delegate", &ctx)
700 .unwrap();
701 let producer_ctx = ProducerContext::new();
702 let producer = endpoint.create_producer(&producer_ctx).unwrap();
703
704 let exchange = Exchange::new(Message::new("ok"));
705 let result = producer.oneshot(exchange).await.unwrap();
706
707 assert_eq!(result.input.body.as_text(), Some("ok"));
708 assert_eq!(endpoint_calls.load(Ordering::SeqCst), 1);
709 assert_eq!(producer_calls.load(Ordering::SeqCst), 1);
710 }
711
712 #[test]
713 fn producer_passthrough_bubbles_delegate_errors() {
714 let endpoint_calls = Arc::new(AtomicUsize::new(0));
715 let producer_calls = Arc::new(AtomicUsize::new(0));
716 let delegate = Arc::new(MockProducerDelegateComponent {
717 create_endpoint_calls: Arc::clone(&endpoint_calls),
718 create_producer_calls: Arc::clone(&producer_calls),
719 fail_producer: true,
720 });
721
722 let ctx = MockDelegateContext {
723 delegate: delegate.clone(),
724 };
725
726 let master = MasterComponent::default();
727 let endpoint = master
728 .create_endpoint("master:lock-1:mock:delegate", &ctx)
729 .unwrap();
730 let producer_ctx = ProducerContext::new();
731 let err = endpoint.create_producer(&producer_ctx).unwrap_err();
732
733 assert!(matches!(err, CamelError::ProcessorError(_)));
734 assert_eq!(endpoint_calls.load(Ordering::SeqCst), 1);
735 assert_eq!(producer_calls.load(Ordering::SeqCst), 1);
736 }
737
    /// Leadership service whose events are driven manually from tests.
    struct FakeLeadershipService {
        /// Sender of the watch channel created by `start`; `None` before that.
        tx: Mutex<Option<watch::Sender<Option<LeadershipEvent>>>>,
        /// Leader flag shared with the handle returned by `start`.
        is_leader: Arc<AtomicBool>,
        /// Initial value placed in the watch channel on `start`.
        initial: Option<LeadershipEvent>,
    }
743
744 impl FakeLeadershipService {
745 fn new(initial: Option<LeadershipEvent>) -> Self {
746 let starts_as_leader = matches!(initial, Some(LeadershipEvent::StartedLeading));
747 Self {
748 tx: Mutex::new(None),
749 is_leader: Arc::new(AtomicBool::new(starts_as_leader)),
750 initial,
751 }
752 }
753
754 async fn emit(&self, event: LeadershipEvent) {
755 self.is_leader.store(
756 matches!(event, LeadershipEvent::StartedLeading),
757 Ordering::Release,
758 );
759 if let Some(tx) = self
760 .tx
761 .lock()
762 .expect("mutex poisoned: fake elector sender")
763 .as_ref()
764 {
765 let _ = tx.send(Some(event));
766 }
767 }
768 }
769
770 #[async_trait]
771 impl LeadershipService for FakeLeadershipService {
772 async fn start(&self, _lock_name: &str) -> Result<LeadershipHandle, PlatformError> {
773 let (tx, rx) = watch::channel(self.initial.clone());
774 *self.tx.lock().expect("mutex poisoned: fake elector sender") = Some(tx);
775
776 let cancel = CancellationToken::new();
777 let cancel_wait = cancel.clone();
778 let (term_tx, term_rx) = oneshot::channel();
779 tokio::spawn(async move {
780 cancel_wait.cancelled().await;
781 let _ = term_tx.send(());
782 });
783
784 Ok(LeadershipHandle::new(
785 rx,
786 Arc::clone(&self.is_leader),
787 cancel,
788 term_rx,
789 ))
790 }
791 }
792
    /// Platform service for tests; only the leadership service is injectable.
    struct FakePlatformService {
        /// Identity returned by `identity()`.
        identity: PlatformIdentity,
        /// Gate returned by `readiness_gate()`.
        readiness_gate: Arc<dyn ReadinessGate>,
        /// Leadership service returned by `leadership()`.
        leadership: Arc<dyn LeadershipService>,
    }
798
799 impl FakePlatformService {
800 fn new(leadership: Arc<dyn LeadershipService>) -> Self {
801 Self {
802 identity: PlatformIdentity::local("master-tests"),
803 readiness_gate: Arc::new(NoopReadinessGate),
804 leadership,
805 }
806 }
807 }
808
809 impl PlatformService for FakePlatformService {
810 fn identity(&self) -> PlatformIdentity {
811 self.identity.clone()
812 }
813
814 fn readiness_gate(&self) -> Arc<dyn ReadinessGate> {
815 Arc::clone(&self.readiness_gate)
816 }
817
818 fn leadership(&self) -> Arc<dyn LeadershipService> {
819 Arc::clone(&self.leadership)
820 }
821 }
822
    /// Delegate component whose endpoints create counting consumers.
    struct FakeDelegateComponent {
        /// Shared with endpoints; counts `create_consumer` calls.
        create_consumer_calls: Arc<AtomicUsize>,
        /// Shared with consumers; counts `start` calls.
        start_calls: Arc<AtomicUsize>,
    }
827
828 impl Component for FakeDelegateComponent {
829 fn scheme(&self) -> &str {
830 "fake"
831 }
832
833 fn create_endpoint(
834 &self,
835 _uri: &str,
836 _ctx: &dyn ComponentContext,
837 ) -> Result<Box<dyn Endpoint>, CamelError> {
838 Ok(Box::new(FakeDelegateEndpoint {
839 create_consumer_calls: Arc::clone(&self.create_consumer_calls),
840 start_calls: Arc::clone(&self.start_calls),
841 }))
842 }
843 }
844
    /// Endpoint produced by `FakeDelegateComponent`.
    struct FakeDelegateEndpoint {
        /// Counts `create_consumer` calls; also the source of consumer epochs.
        create_consumer_calls: Arc<AtomicUsize>,
        /// Shared with consumers; counts `start` calls.
        start_calls: Arc<AtomicUsize>,
    }
849
850 impl Endpoint for FakeDelegateEndpoint {
851 fn uri(&self) -> &str {
852 "fake:delegate"
853 }
854
855 fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
856 let epoch = self.create_consumer_calls.fetch_add(1, Ordering::SeqCst) + 1;
857 Ok(Box::new(FakeDelegateConsumer {
858 epoch,
859 start_calls: Arc::clone(&self.start_calls),
860 }))
861 }
862
863 fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
864 Err(CamelError::EndpointCreationFailed("not used".to_string()))
865 }
866 }
867
    /// Consumer that emits `epoch-<n>` messages until cancelled.
    struct FakeDelegateConsumer {
        /// 1-based creation index, embedded in every message body.
        epoch: usize,
        /// Incremented each time `start` is called.
        start_calls: Arc<AtomicUsize>,
    }
872
    /// Delegate component whose `create_endpoint` always fails.
    struct FailingDelegateComponent {
        /// Counts `create_endpoint` calls.
        create_endpoint_calls: Arc<AtomicUsize>,
    }
876
877 impl Component for FailingDelegateComponent {
878 fn scheme(&self) -> &str {
879 "failing"
880 }
881
882 fn create_endpoint(
883 &self,
884 _uri: &str,
885 _ctx: &dyn ComponentContext,
886 ) -> Result<Box<dyn Endpoint>, CamelError> {
887 self.create_endpoint_calls.fetch_add(1, Ordering::SeqCst);
888 Err(CamelError::EndpointCreationFailed(
889 "delegate endpoint creation failed".to_string(),
890 ))
891 }
892 }
893
894 #[async_trait]
895 impl Consumer for FakeDelegateConsumer {
896 async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
897 self.start_calls.fetch_add(1, Ordering::SeqCst);
898 context
899 .send(Exchange::new(Message::new(format!("epoch-{}", self.epoch))))
900 .await?;
901
902 loop {
903 tokio::select! {
904 _ = context.cancelled() => {
905 break;
906 }
907 _ = sleep(Duration::from_millis(20)) => {
908 context
909 .send(Exchange::new(Message::new(format!("epoch-{}", self.epoch))))
910 .await?;
911 }
912 }
913 }
914
915 Ok(())
916 }
917
918 async fn stop(&mut self) -> Result<(), CamelError> {
919 Ok(())
920 }
921 }
922
923 fn build_master_consumer(
924 platform_service: Arc<dyn PlatformService>,
925 create_consumer_calls: Arc<AtomicUsize>,
926 start_calls: Arc<AtomicUsize>,
927 delegate_retry_max_attempts: Option<u32>,
928 ) -> MasterConsumer {
929 MasterConsumer::new(
930 "lock-a".to_string(),
931 "fake:delegate".to_string(),
932 Arc::new(FakeDelegateComponent {
933 create_consumer_calls,
934 start_calls,
935 }),
936 Arc::new(NoOpMetrics),
937 platform_service,
938 Duration::from_millis(500),
939 delegate_retry_max_attempts,
940 )
941 }
942
943 #[tokio::test]
944 async fn starts_delegate_only_after_started_leading() {
945 let leadership = Arc::new(FakeLeadershipService::new(None));
946 let platform_service = Arc::new(FakePlatformService::new(leadership.clone()));
947 let create_consumer_calls = Arc::new(AtomicUsize::new(0));
948 let start_calls = Arc::new(AtomicUsize::new(0));
949 let mut master = build_master_consumer(
950 platform_service,
951 Arc::clone(&create_consumer_calls),
952 Arc::clone(&start_calls),
953 Some(30),
954 );
955
956 let (tx, mut rx) = tokio::sync::mpsc::channel(16);
957 let cancel = CancellationToken::new();
958 let ctx = ConsumerContext::new(tx, cancel.clone());
959
960 master.start(ctx).await.unwrap();
961
962 sleep(Duration::from_millis(80)).await;
963 assert!(rx.try_recv().is_err());
964 assert_eq!(create_consumer_calls.load(Ordering::SeqCst), 0);
965
966 leadership.emit(LeadershipEvent::StartedLeading).await;
967
968 let first = timeout(Duration::from_millis(500), rx.recv())
969 .await
970 .unwrap()
971 .unwrap();
972 assert_eq!(first.exchange.input.body.as_text(), Some("epoch-1"));
973 assert_eq!(create_consumer_calls.load(Ordering::SeqCst), 1);
974 assert_eq!(start_calls.load(Ordering::SeqCst), 1);
975
976 cancel.cancel();
977 master.stop().await.unwrap();
978 }
979
980 #[tokio::test]
981 async fn stops_delegate_on_stopped_leading() {
982 let leadership = Arc::new(FakeLeadershipService::new(None));
983 let platform_service = Arc::new(FakePlatformService::new(leadership.clone()));
984 let create_consumer_calls = Arc::new(AtomicUsize::new(0));
985 let start_calls = Arc::new(AtomicUsize::new(0));
986 let mut master = build_master_consumer(
987 platform_service,
988 Arc::clone(&create_consumer_calls),
989 Arc::clone(&start_calls),
990 Some(30),
991 );
992
993 let (tx, mut rx) = tokio::sync::mpsc::channel(16);
994 let cancel = CancellationToken::new();
995 let ctx = ConsumerContext::new(tx, cancel.clone());
996
997 master.start(ctx).await.unwrap();
998 leadership.emit(LeadershipEvent::StartedLeading).await;
999 let _ = timeout(Duration::from_millis(500), rx.recv())
1000 .await
1001 .unwrap()
1002 .unwrap();
1003
1004 leadership.emit(LeadershipEvent::StoppedLeading).await;
1005 sleep(Duration::from_millis(100)).await;
1006 while rx.try_recv().is_ok() {}
1007 assert!(
1008 timeout(Duration::from_millis(120), rx.recv())
1009 .await
1010 .is_err()
1011 );
1012
1013 cancel.cancel();
1014 master.stop().await.unwrap();
1015 }
1016
1017 #[tokio::test]
1018 async fn recreates_delegate_on_new_leadership_epoch() {
1019 let leadership = Arc::new(FakeLeadershipService::new(None));
1020 let platform_service = Arc::new(FakePlatformService::new(leadership.clone()));
1021 let create_consumer_calls = Arc::new(AtomicUsize::new(0));
1022 let start_calls = Arc::new(AtomicUsize::new(0));
1023 let mut master = build_master_consumer(
1024 platform_service,
1025 Arc::clone(&create_consumer_calls),
1026 Arc::clone(&start_calls),
1027 Some(30),
1028 );
1029
1030 let (tx, mut rx) = tokio::sync::mpsc::channel(16);
1031 let cancel = CancellationToken::new();
1032 let ctx = ConsumerContext::new(tx, cancel.clone());
1033
1034 master.start(ctx).await.unwrap();
1035
1036 leadership.emit(LeadershipEvent::StartedLeading).await;
1037 let first = timeout(Duration::from_millis(500), rx.recv())
1038 .await
1039 .unwrap()
1040 .unwrap();
1041 assert_eq!(first.exchange.input.body.as_text(), Some("epoch-1"));
1042
1043 leadership.emit(LeadershipEvent::StoppedLeading).await;
1044 sleep(Duration::from_millis(120)).await;
1045
1046 leadership.emit(LeadershipEvent::StartedLeading).await;
1047 let second = timeout(Duration::from_millis(500), rx.recv())
1048 .await
1049 .unwrap()
1050 .unwrap();
1051 assert_eq!(second.exchange.input.body.as_text(), Some("epoch-2"));
1052
1053 assert_eq!(create_consumer_calls.load(Ordering::SeqCst), 2);
1054 assert_eq!(start_calls.load(Ordering::SeqCst), 2);
1055
1056 cancel.cancel();
1057 master.stop().await.unwrap();
1058 }
1059
1060 #[tokio::test]
1061 async fn stops_retrying_delegate_start_after_max_attempts() {
1062 let leadership = Arc::new(FakeLeadershipService::new(Some(
1063 LeadershipEvent::StartedLeading,
1064 )));
1065 let platform_service = Arc::new(FakePlatformService::new(leadership));
1066 let create_endpoint_calls = Arc::new(AtomicUsize::new(0));
1067
1068 let mut master = MasterConsumer::new(
1069 "lock-a".to_string(),
1070 "failing:delegate".to_string(),
1071 Arc::new(FailingDelegateComponent {
1072 create_endpoint_calls: Arc::clone(&create_endpoint_calls),
1073 }),
1074 Arc::new(NoOpMetrics),
1075 platform_service,
1076 Duration::from_millis(500),
1077 Some(1),
1078 );
1079
1080 let (tx, _rx) = tokio::sync::mpsc::channel(16);
1081 let cancel = CancellationToken::new();
1082 let ctx = ConsumerContext::new(tx, cancel.clone());
1083
1084 master.start(ctx).await.unwrap();
1085 sleep(Duration::from_millis(750)).await;
1086
1087 assert_eq!(create_endpoint_calls.load(Ordering::SeqCst), 2);
1088
1089 cancel.cancel();
1090 master.stop().await.unwrap();
1091 }
1092
1093 #[tokio::test]
1099 async fn stop_completes_quickly_when_leadership_task_is_slow() {
1100 struct SlowStoppingConsumer;
1102
1103 #[async_trait]
1104 impl Consumer for SlowStoppingConsumer {
1105 async fn start(&mut self, ctx: ConsumerContext) -> Result<(), CamelError> {
1106 ctx.send(Exchange::new(Message::new("slow-start")))
1107 .await
1108 .ok();
1109 sleep(Duration::from_secs(60)).await;
1111 Ok(())
1112 }
1113
1114 async fn stop(&mut self) -> Result<(), CamelError> {
1115 Ok(())
1116 }
1117 }
1118
1119 struct SlowStoppingComponent;
1120
1121 impl Component for SlowStoppingComponent {
1122 fn scheme(&self) -> &str {
1123 "slow"
1124 }
1125
1126 fn create_endpoint(
1127 &self,
1128 _uri: &str,
1129 _ctx: &dyn ComponentContext,
1130 ) -> Result<Box<dyn Endpoint>, CamelError> {
1131 Ok(Box::new(SlowStoppingEndpoint))
1132 }
1133 }
1134
1135 struct SlowStoppingEndpoint;
1136
1137 impl Endpoint for SlowStoppingEndpoint {
1138 fn uri(&self) -> &str {
1139 "slow:delegate"
1140 }
1141
1142 fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
1143 Ok(Box::new(SlowStoppingConsumer))
1144 }
1145
1146 fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
1147 Err(CamelError::EndpointCreationFailed("not used".into()))
1148 }
1149 }
1150
1151 let leadership = Arc::new(FakeLeadershipService::new(Some(
1152 LeadershipEvent::StartedLeading,
1153 )));
1154 let platform_service = Arc::new(FakePlatformService::new(leadership));
1155
1156 let mut master = MasterConsumer::new(
1157 "lock-slow".into(),
1158 "slow:delegate".into(),
1159 Arc::new(SlowStoppingComponent),
1160 Arc::new(NoOpMetrics),
1161 platform_service,
1162 Duration::from_millis(500), Some(30),
1164 );
1165
1166 let (tx, mut rx) = tokio::sync::mpsc::channel(16);
1167 let cancel = CancellationToken::new();
1168 let ctx = ConsumerContext::new(tx, cancel.clone());
1169
1170 master.start(ctx).await.unwrap();
1171
1172 let msg = timeout(Duration::from_secs(2), rx.recv())
1174 .await
1175 .unwrap()
1176 .unwrap();
1177 assert_eq!(msg.exchange.input.body.as_text(), Some("slow-start"));
1178
1179 let start = Instant::now();
1182 master.stop().await.unwrap();
1183 let elapsed = start.elapsed();
1184
1185 assert!(
1188 elapsed < Duration::from_millis(250),
1189 "stop() took {:?}, expected < 250 ms (abort should be near-instant)",
1190 elapsed,
1191 );
1192
1193 cancel.cancel();
1194 }
1195
1196 #[tokio::test]
1197 async fn stop_propagates_delegate_start_error() {
1198 struct FailingStartConsumer;
1199
1200 #[async_trait]
1201 impl Consumer for FailingStartConsumer {
1202 async fn start(&mut self, _ctx: ConsumerContext) -> Result<(), CamelError> {
1203 Err(CamelError::ProcessorError(
1204 "delegate start failed".to_string(),
1205 ))
1206 }
1207
1208 async fn stop(&mut self) -> Result<(), CamelError> {
1209 Ok(())
1210 }
1211 }
1212
1213 struct FailingStartComponent;
1214
1215 impl Component for FailingStartComponent {
1216 fn scheme(&self) -> &str {
1217 "failstart"
1218 }
1219
1220 fn create_endpoint(
1221 &self,
1222 _uri: &str,
1223 _ctx: &dyn ComponentContext,
1224 ) -> Result<Box<dyn Endpoint>, CamelError> {
1225 Ok(Box::new(FailingStartEndpoint))
1226 }
1227 }
1228
1229 struct FailingStartEndpoint;
1230
1231 impl Endpoint for FailingStartEndpoint {
1232 fn uri(&self) -> &str {
1233 "failstart:delegate"
1234 }
1235
1236 fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
1237 Ok(Box::new(FailingStartConsumer))
1238 }
1239
1240 fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
1241 Err(CamelError::EndpointCreationFailed("not used".into()))
1242 }
1243 }
1244
1245 let leadership = Arc::new(FakeLeadershipService::new(Some(
1246 LeadershipEvent::StartedLeading,
1247 )));
1248 let platform_service = Arc::new(FakePlatformService::new(leadership));
1249
1250 let mut master = MasterConsumer::new(
1251 "lock-error".into(),
1252 "failstart:delegate".into(),
1253 Arc::new(FailingStartComponent),
1254 Arc::new(NoOpMetrics),
1255 platform_service,
1256 Duration::from_millis(500),
1257 Some(30),
1258 );
1259
1260 let (tx, _rx) = tokio::sync::mpsc::channel(16);
1261 let cancel = CancellationToken::new();
1262 let ctx = ConsumerContext::new(tx, cancel.clone());
1263
1264 master.start(ctx).await.unwrap();
1265 sleep(Duration::from_millis(250)).await;
1266 assert!(
1267 master
1268 .leadership_task
1269 .as_ref()
1270 .is_some_and(tokio::task::JoinHandle::is_finished),
1271 "leadership task should finish after delegate error"
1272 );
1273 let err = master.stop().await.expect_err("expected delegate error");
1274 assert!(err.to_string().contains("delegate start failed"));
1275
1276 cancel.cancel();
1277 }
1278}