1pub mod bundle;
7pub mod config;
8
9pub use bundle::MasterBundle;
10
11use std::sync::Arc;
12use std::time::Duration;
13
14use async_trait::async_trait;
15use camel_api::{CamelError, MetricsCollector, PlatformService};
16use camel_component_api::{
17 BoxProcessor, Component, ComponentContext, Consumer, ConsumerContext, Endpoint,
18 ExchangeEnvelope, ProducerContext, parse_uri,
19};
20use camel_language_api::Language;
21use tokio::task::JoinHandle;
22use tokio::time::{interval, timeout};
23use tokio_util::sync::CancellationToken;
24use tracing::{error, info, warn};
25
26use crate::config::{MasterComponentConfig, MasterUriConfig};
27
/// Period of the retry ticker used by the leadership loop. On every tick the
/// loop reaps a delegate task that has already finished and, while this node
/// is leading with no active delegate, tries to start the delegate again.
const DELEGATE_RETRY_INTERVAL: Duration = Duration::from_millis(200);
29
/// Component registered under the `master` scheme.
///
/// It wraps a delegate endpoint; the settings stored here are copied into
/// every endpoint the component creates.
#[derive(Debug, Clone)]
pub struct MasterComponent {
    /// Drain timeout in milliseconds; converted to a `Duration` when an
    /// endpoint is created.
    drain_timeout_ms: u64,
    /// Cap on delegate start retries handed to each endpoint; `None` leaves
    /// the retry count uncapped.
    delegate_retry_max_attempts: Option<u32>,
}
34
35impl MasterComponent {
36 pub fn new(config: MasterComponentConfig) -> Self {
37 Self {
38 drain_timeout_ms: config.drain_timeout_ms,
39 delegate_retry_max_attempts: config.delegate_retry_max_attempts,
40 }
41 }
42}
43
44impl Default for MasterComponent {
45 fn default() -> Self {
46 Self::new(MasterComponentConfig::default())
47 }
48}
49
50impl Component for MasterComponent {
51 fn scheme(&self) -> &str {
52 "master"
53 }
54
55 fn create_endpoint(
56 &self,
57 uri: &str,
58 ctx: &dyn ComponentContext,
59 ) -> Result<Box<dyn Endpoint>, CamelError> {
60 let parsed = MasterUriConfig::parse(uri)?;
61 let delegate_parts = parse_uri(&parsed.delegate_uri)?;
62 let delegate_scheme = delegate_parts.scheme;
63 let delegate_component = ctx
64 .resolve_component(&delegate_scheme)
65 .ok_or_else(|| CamelError::ComponentNotFound(delegate_scheme.clone()))?;
66
67 Ok(Box::new(MasterEndpoint {
68 uri: uri.to_string(),
69 lock_name: parsed.lock_name,
70 delegate_uri: parsed.delegate_uri,
71 delegate_component,
72 metrics: ctx.metrics(),
73 platform_service: ctx.platform_service(),
74 drain_timeout: Duration::from_millis(self.drain_timeout_ms),
75 delegate_retry_max_attempts: self.delegate_retry_max_attempts,
76 }))
77 }
78}
79
/// Endpoint produced by `MasterComponent::create_endpoint`; it carries
/// everything needed to build the leader-gated consumer or a pass-through
/// producer for the delegate URI.
struct MasterEndpoint {
    /// Full master URI exactly as it was passed to `create_endpoint`.
    uri: String,
    /// Lock name parsed from the master URI; handed to the consumer.
    lock_name: String,
    /// URI of the wrapped delegate endpoint, parsed from the master URI.
    delegate_uri: String,
    /// Component resolved for the delegate URI's scheme.
    delegate_component: Arc<dyn Component>,
    /// Metrics collector obtained from the component context.
    metrics: Arc<dyn MetricsCollector>,
    /// Platform service obtained from the component context.
    platform_service: Arc<dyn PlatformService>,
    /// Drain timeout forwarded to consumers created by this endpoint.
    drain_timeout: Duration,
    /// Retry cap forwarded to consumers created by this endpoint.
    delegate_retry_max_attempts: Option<u32>,
}
93
94impl Endpoint for MasterEndpoint {
95 fn uri(&self) -> &str {
96 &self.uri
97 }
98
99 fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
100 Ok(Box::new(MasterConsumer::new(
101 self.lock_name.clone(),
102 self.delegate_uri.clone(),
103 Arc::clone(&self.delegate_component),
104 Arc::clone(&self.metrics),
105 Arc::clone(&self.platform_service),
106 self.drain_timeout,
107 self.delegate_retry_max_attempts,
108 )))
109 }
110
111 fn create_producer(&self, ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
112 let delegate_ctx = MasterDelegateContext {
113 delegate_component: Arc::clone(&self.delegate_component),
114 metrics: Arc::clone(&self.metrics),
115 platform_service: Arc::clone(&self.platform_service),
116 };
117
118 self.delegate_component
119 .create_endpoint(&self.delegate_uri, &delegate_ctx)?
120 .create_producer(ctx)
121 }
122}
123
/// Minimal `ComponentContext` handed to the delegate component when the
/// master endpoint creates delegate endpoints on its behalf.
struct MasterDelegateContext {
    /// The only component this context can resolve.
    delegate_component: Arc<dyn Component>,
    /// Metrics collector returned from `metrics()`.
    metrics: Arc<dyn MetricsCollector>,
    /// Platform service returned from `platform_service()`.
    platform_service: Arc<dyn PlatformService>,
}
129
130impl ComponentContext for MasterDelegateContext {
131 fn resolve_component(&self, scheme: &str) -> Option<Arc<dyn Component>> {
132 if self.delegate_component.scheme() == scheme {
133 Some(Arc::clone(&self.delegate_component))
134 } else {
135 None
136 }
137 }
138
139 fn resolve_language(&self, _name: &str) -> Option<Arc<dyn Language>> {
140 None
141 }
142
143 fn metrics(&self) -> Arc<dyn MetricsCollector> {
144 Arc::clone(&self.metrics)
145 }
146
147 fn platform_service(&self) -> Arc<dyn PlatformService> {
148 Arc::clone(&self.platform_service)
149 }
150
151 fn register_route_health_check(
152 &self,
153 _route_id: &str,
154 _check: Arc<dyn camel_api::AsyncHealthCheck>,
155 ) {
156 }
157
158 fn unregister_route_health_check(&self, _route_id: &str) {}
159}
160
/// Consumer that runs a delegate consumer only while this node holds
/// leadership for `lock_name`.
struct MasterConsumer {
    /// Name passed to the leadership service when election is started.
    lock_name: String,
    /// URI used to create the delegate endpoint each time leadership starts.
    delegate_uri: String,
    /// Component that creates the delegate endpoint.
    delegate_component: Arc<dyn Component>,
    /// Metrics collector forwarded to the delegate's component context.
    metrics: Arc<dyn MetricsCollector>,
    /// Source of the leadership service; also forwarded to the delegate's
    /// component context.
    platform_service: Arc<dyn PlatformService>,
    /// Upper bound used when waiting for the delegate task, for `step_down`,
    /// and for joining the leadership task in `stop`.
    drain_timeout: Duration,
    /// Maximum number of retry-tick start attempts; `None` disables the cap.
    delegate_retry_max_attempts: Option<u32>,
    /// Handle of the spawned leadership loop; `None` until `start` runs and
    /// again after `stop` or `background_task_handle` takes it.
    leadership_task: Option<JoinHandle<Result<(), CamelError>>>,
    /// Token cancelled by `stop` to ask the leadership loop to exit.
    stop_token: Option<CancellationToken>,
}
174
175impl MasterConsumer {
176 fn new(
177 lock_name: String,
178 delegate_uri: String,
179 delegate_component: Arc<dyn Component>,
180 metrics: Arc<dyn MetricsCollector>,
181 platform_service: Arc<dyn PlatformService>,
182 drain_timeout: Duration,
183 delegate_retry_max_attempts: Option<u32>,
184 ) -> Self {
185 Self {
186 lock_name,
187 delegate_uri,
188 delegate_component,
189 metrics,
190 platform_service,
191 drain_timeout,
192 delegate_retry_max_attempts,
193 leadership_task: None,
194 stop_token: None,
195 }
196 }
197}
198
/// Whether a delegate consumer task is currently tracked by the leadership
/// loop.
enum DelegateState {
    /// No delegate task is tracked.
    Inactive,
    /// A delegate task was spawned and has not been reaped yet.
    Active {
        /// Child of the parent cancel token; cancelled by `stop_delegate`.
        run_token: CancellationToken,
        /// Join handle of the spawned delegate task.
        handle: JoinHandle<Result<(), CamelError>>,
    },
}
206
207async fn stop_delegate(
208 state: &mut DelegateState,
209 drain_timeout: Duration,
210) -> Result<(), CamelError> {
211 if let DelegateState::Active {
212 run_token,
213 mut handle,
214 } = std::mem::replace(state, DelegateState::Inactive)
215 {
216 run_token.cancel();
217 match timeout(drain_timeout, &mut handle).await {
218 Ok(Ok(Ok(()))) => {}
219 Ok(Ok(Err(err))) => {
220 return Err(err);
221 }
222 Ok(Err(e)) if e.is_panic() => {
223 error!(error = %e, "master delegate task panicked");
224 return Err(CamelError::ProcessorError(format!(
225 "master delegate task panicked: {e}"
226 )));
227 }
228 Ok(Err(e)) => {
229 warn!(error = %e, "master delegate task cancelled");
230 return Err(CamelError::ProcessorError(format!(
231 "master delegate task cancelled: {e}"
232 )));
233 }
234 Err(_) => {
235 warn!("master delegate shutdown timed out, aborting");
236 handle.abort();
237 }
238 }
239 }
240 Ok(())
241}
242
/// Borrowed inputs shared by every `reconcile_event` call made from one
/// leadership loop.
struct ReconcileContext<'a> {
    /// Lock name, used as a field in log events.
    lock_name: &'a str,
    /// Component that creates the delegate endpoint.
    delegate_component: &'a Arc<dyn Component>,
    /// URI passed to the delegate component's `create_endpoint`.
    delegate_uri: &'a str,
    /// Channel sender cloned into each delegate's `ConsumerContext`.
    sender: &'a tokio::sync::mpsc::Sender<ExchangeEnvelope>,
    /// Token from which each delegate's run token is derived as a child.
    parent_cancel: &'a CancellationToken,
    /// Maximum time to wait for a delegate to stop.
    drain_timeout: Duration,
    /// Metrics collector forwarded to the delegate's component context.
    metrics: &'a Arc<dyn MetricsCollector>,
    /// Platform service forwarded to the delegate's component context.
    platform_service: &'a Arc<dyn PlatformService>,
}
253
254async fn reconcile_event(
255 event: camel_api::LeadershipEvent,
256 state: &mut DelegateState,
257 ctx: &ReconcileContext<'_>,
258) -> Result<(), CamelError> {
259 match event {
260 camel_api::LeadershipEvent::StartedLeading => {
261 info!(lock = %ctx.lock_name, "master leadership acquired");
262 tracing::info!(lock = %ctx.lock_name, "metrics emission placeholder: leadership acquired");
264 stop_delegate(state, ctx.drain_timeout).await?;
265
266 let delegate_ctx = MasterDelegateContext {
267 delegate_component: Arc::clone(ctx.delegate_component),
268 metrics: Arc::clone(ctx.metrics),
269 platform_service: Arc::clone(ctx.platform_service),
270 };
271
272 let endpoint = match ctx
273 .delegate_component
274 .create_endpoint(ctx.delegate_uri, &delegate_ctx)
275 {
276 Ok(endpoint) => endpoint,
277 Err(err) => {
278 warn!(lock = %ctx.lock_name, "failed to create delegate endpoint: {err}");
279 return Ok(());
280 }
281 };
282
283 let mut consumer = match endpoint.create_consumer() {
284 Ok(consumer) => consumer,
285 Err(err) => {
286 warn!(lock = %ctx.lock_name, "failed to create delegate consumer: {err}");
287 return Ok(());
288 }
289 };
290
291 let run_token = ctx.parent_cancel.child_token();
292 let delegate_ctx = ConsumerContext::new(ctx.sender.clone(), run_token.clone());
293 let handle = tokio::spawn(async move {
294 consumer.start(delegate_ctx).await?;
295 consumer.stop().await?;
296 Ok::<(), CamelError>(())
297 });
298
299 *state = DelegateState::Active { run_token, handle };
300 }
301 camel_api::LeadershipEvent::StoppedLeading => {
302 info!(lock = %ctx.lock_name, "master leadership lost");
303 tracing::info!(lock = %ctx.lock_name, "metrics emission placeholder: leadership lost");
305 stop_delegate(state, ctx.drain_timeout).await?;
306 }
307 }
308 Ok(())
309}
310
// Consumer lifecycle for the master component: `start` launches leader
// election plus a background loop that starts and stops the delegate consumer
// as leadership changes; `stop` shuts that loop down.
#[async_trait]
impl Consumer for MasterConsumer {
    /// Starts leader election for `lock_name` and spawns the leadership loop.
    ///
    /// Calling `start` while a leadership task is already stored is a no-op.
    ///
    /// # Errors
    ///
    /// Returns `CamelError::EndpointCreationFailed` when the leadership
    /// service fails to start. Errors raised later inside the spawned loop
    /// are returned through the task's join handle, not from this call.
    async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
        if self.leadership_task.is_some() {
            return Ok(());
        }

        let handle = self
            .platform_service
            .leadership()
            .start(&self.lock_name)
            .await
            .map_err(|e| {
                CamelError::EndpointCreationFailed(format!("failed to start leader election: {e}"))
            })?;

        // Owned copies of everything the spawned task needs.
        let lock_name = self.lock_name.clone();
        let delegate_uri = self.delegate_uri.clone();
        let delegate_component = Arc::clone(&self.delegate_component);
        let metrics = Arc::clone(&self.metrics);
        let platform_service = Arc::clone(&self.platform_service);
        let sender = context.sender();
        let parent_cancel = context.cancel_token();
        let drain_timeout = self.drain_timeout;
        let delegate_retry_max_attempts = self.delegate_retry_max_attempts;
        let mut events = handle.events.clone();

        // `stop_token` is separate from the context's cancellation so that
        // `stop` can end the loop on its own.
        let stop_token = CancellationToken::new();
        let stop_token_loop = stop_token.clone();
        let leadership_handle = handle;

        let task = tokio::spawn(async move {
            let mut state = DelegateState::Inactive;
            let mut is_leading = false;
            let mut delegate_attempts = 0u32;
            let mut retry_tick = interval(DELEGATE_RETRY_INTERVAL);

            let rctx = ReconcileContext {
                lock_name: &lock_name,
                delegate_component: &delegate_component,
                delegate_uri: &delegate_uri,
                sender: &sender,
                parent_cancel: &parent_cancel,
                drain_timeout,
                metrics: &metrics,
                platform_service: &platform_service,
            };

            // Apply whatever event the channel already holds before waiting
            // for changes.
            let initial_event = { events.borrow().clone() };
            if let Some(initial_event) = initial_event {
                is_leading = matches!(&initial_event, camel_api::LeadershipEvent::StartedLeading);
                if is_leading {
                    delegate_attempts = 0;
                }
                if let Err(err) = reconcile_event(initial_event, &mut state, &rctx).await {
                    error!(lock = %lock_name, "master delegate error: {err}");
                    return Err(err);
                }
            }

            loop {
                tokio::select! {
                    // Explicit `stop()` request.
                    _ = stop_token_loop.cancelled() => {
                        break;
                    }
                    // Cancellation of the consumer context.
                    _ = context.cancelled() => {
                        break;
                    }
                    changed = events.changed() => {
                        // An error from `changed()` ends the loop.
                        if changed.is_err() {
                            break;
                        }
                        let event = { events.borrow().clone() };
                        if let Some(event) = event {
                            let was_leading = is_leading;
                            is_leading = matches!(&event, camel_api::LeadershipEvent::StartedLeading);
                            // The retry budget restarts on each transition
                            // into leadership.
                            if !was_leading && is_leading {
                                delegate_attempts = 0;
                            }
                            if let Err(err) = reconcile_event(event, &mut state, &rctx).await {
                                error!(lock = %lock_name, "master delegate error: {err}");
                                return Err(err);
                            }
                        }
                    }
                    _ = retry_tick.tick() => {
                        // Reap a delegate task that has already finished; an
                        // error from it ends the whole leadership task.
                        if matches!(&state, DelegateState::Active { handle, .. } if handle.is_finished())
                            && let Err(err) = stop_delegate(&mut state, drain_timeout).await
                        {
                            error!(lock = %lock_name, "master delegate task failed: {err}");
                            return Err(err);
                        }

                        // Leading with no delegate: try to start it again,
                        // subject to the optional attempt cap.
                        if is_leading && matches!(state, DelegateState::Inactive) {
                            if let Some(max) = delegate_retry_max_attempts {
                                delegate_attempts = delegate_attempts.saturating_add(1);
                                if delegate_attempts > max {
                                    warn!(
                                        lock = %lock_name,
                                        attempts = max,
                                        "delegate start exceeded max attempts, stopping consumer"
                                    );
                                    break;
                                }
                            }
                            if let Err(err) = reconcile_event(
                                camel_api::LeadershipEvent::StartedLeading,
                                &mut state,
                                &rctx,
                            )
                            .await {
                                error!(lock = %lock_name, "master delegate retry error: {err}");
                                return Err(err);
                            }
                        }
                    }
                }
            }

            // Cleanup on a normal loop exit: stop the delegate, then call
            // `step_down`, bounded by the drain timeout. The step-down result
            // is ignored.
            stop_delegate(&mut state, drain_timeout).await?;
            let _ = timeout(drain_timeout, leadership_handle.step_down()).await;
            Ok::<(), CamelError>(())
        });

        self.stop_token = Some(stop_token);
        self.leadership_task = Some(task);

        Ok(())
    }

    /// Cancels the stop token and shuts the leadership task down.
    ///
    /// A task that has already finished is joined and its result propagated,
    /// including join failures. A task that is still running is aborted and
    /// then awaited for at most `drain_timeout`; on that path panics,
    /// cancellation and timeouts are only logged.
    ///
    /// # Errors
    ///
    /// Returns the leadership task's own error, or a
    /// `CamelError::ProcessorError` when joining an already finished task
    /// fails or times out.
    async fn stop(&mut self) -> Result<(), CamelError> {
        if let Some(token) = self.stop_token.take() {
            token.cancel();
        }

        if let Some(handle) = self.leadership_task.take() {
            if handle.is_finished() {
                match timeout(self.drain_timeout, handle).await {
                    Ok(Ok(Ok(()))) => {}
                    Ok(Ok(Err(err))) => return Err(err),
                    Ok(Err(e)) => {
                        return Err(CamelError::ProcessorError(format!(
                            "leadership task join failed: {e}"
                        )));
                    }
                    Err(_) => {
                        return Err(CamelError::ProcessorError(
                            "leadership task join timed out".to_string(),
                        ));
                    }
                }
                return Ok(());
            }

            // NOTE(review): the abort follows the cancel immediately, so the
            // loop's post-loop cleanup (`stop_delegate` + `step_down`) may
            // never run on this path. Presumably deliberate so `stop` returns
            // quickly — confirm the delegate task and the leadership lock are
            // released some other way.
            handle.abort();
            match timeout(self.drain_timeout, handle).await {
                Ok(Ok(Ok(()))) => {}
                Ok(Ok(Err(err))) => return Err(err),
                Ok(Err(e)) if e.is_panic() => {
                    error!(lock = %self.lock_name, error = %e, "leadership task panicked");
                }
                Ok(Err(e)) => {
                    warn!(lock = %self.lock_name, error = %e, "leadership task cancelled");
                }
                Err(_) => {
                    warn!("master leadership loop shutdown timed out after abort");
                }
            }
        }

        Ok(())
    }

    /// Hands the leadership task's join handle to the caller, leaving `None`
    /// behind; a later `stop` then only cancels the stop token because there
    /// is no task left to join.
    fn background_task_handle(
        &mut self,
    ) -> Option<tokio::task::JoinHandle<Result<(), CamelError>>> {
        self.leadership_task.take()
    }
}
492
493#[cfg(test)]
494mod tests {
495 use std::sync::Arc;
496 use std::sync::Mutex;
497 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
498
499 use camel_api::{
500 BoxProcessorExt, Exchange, LeadershipEvent, LeadershipHandle, LeadershipService, Message,
501 NoOpMetrics, NoopPlatformService, NoopReadinessGate, PlatformError, PlatformIdentity,
502 PlatformService, ReadinessGate,
503 };
504 use camel_component_api::NoOpComponentContext;
505 use std::time::Instant;
506 use tokio::sync::{oneshot, watch};
507 use tokio::time::{sleep, timeout};
508 use tokio_util::sync::CancellationToken;
509 use tower::ServiceExt;
510
511 use super::*;
512
513 #[test]
514 fn parse_master_uri_valid() {
515 let cfg = MasterUriConfig::parse("master:mylock:timer:tick?period=250").unwrap();
516 assert_eq!(cfg.lock_name, "mylock");
517 assert_eq!(cfg.delegate_uri, "timer:tick?period=250");
518 }
519
520 #[test]
521 fn parse_master_uri_missing_lockname() {
522 let err = MasterUriConfig::parse("master::timer:tick").unwrap_err();
523 assert!(matches!(err, CamelError::InvalidUri(_)));
524 }
525
526 #[test]
527 fn parse_master_uri_missing_delegate() {
528 let err = MasterUriConfig::parse("master:mylock:").unwrap_err();
529 assert!(matches!(err, CamelError::InvalidUri(_)));
530 }
531
532 #[test]
533 fn endpoint_fails_when_delegate_component_missing() {
534 let master = MasterComponent::default();
535 let result =
536 master.create_endpoint("master:lock-1:missing:delegate", &NoOpComponentContext);
537 assert!(matches!(result, Err(CamelError::ComponentNotFound(_))));
538 }
539
540 #[test]
541 fn delegate_scheme_is_parsed_from_delegate_uri() {
542 let seen_scheme = Arc::new(AtomicBool::new(false));
543
544 struct SchemeAwareContext {
545 delegate: Arc<dyn Component>,
546 seen_scheme: Arc<AtomicBool>,
547 }
548
549 impl ComponentContext for SchemeAwareContext {
550 fn resolve_component(&self, scheme: &str) -> Option<Arc<dyn Component>> {
551 if scheme == "mock" {
552 self.seen_scheme.store(true, Ordering::SeqCst);
553 Some(Arc::clone(&self.delegate))
554 } else {
555 None
556 }
557 }
558
559 fn resolve_language(&self, _name: &str) -> Option<Arc<dyn Language>> {
560 None
561 }
562
563 fn metrics(&self) -> Arc<dyn MetricsCollector> {
564 Arc::new(NoOpMetrics)
565 }
566
567 fn platform_service(&self) -> Arc<dyn PlatformService> {
568 Arc::new(NoopPlatformService::default())
569 }
570
571 fn register_route_health_check(
572 &self,
573 _route_id: &str,
574 _check: Arc<dyn camel_api::AsyncHealthCheck>,
575 ) {
576 }
577
578 fn unregister_route_health_check(&self, _route_id: &str) {}
579 }
580
581 struct MockDelegateComponent;
582
583 impl Component for MockDelegateComponent {
584 fn scheme(&self) -> &str {
585 "mock"
586 }
587
588 fn create_endpoint(
589 &self,
590 _uri: &str,
591 _ctx: &dyn ComponentContext,
592 ) -> Result<Box<dyn Endpoint>, CamelError> {
593 Ok(Box::new(MockDelegateEndpoint))
594 }
595 }
596
597 struct MockDelegateEndpoint;
598
599 impl Endpoint for MockDelegateEndpoint {
600 fn uri(&self) -> &str {
601 "mock:delegate"
602 }
603
604 fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
605 Err(CamelError::EndpointCreationFailed("not used".to_string()))
606 }
607
608 fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
609 Err(CamelError::EndpointCreationFailed("not used".to_string()))
610 }
611 }
612
613 let delegate = Arc::new(MockDelegateComponent);
614 let ctx = SchemeAwareContext {
615 delegate,
616 seen_scheme: Arc::clone(&seen_scheme),
617 };
618
619 let master = MasterComponent::default();
620 let endpoint = master
621 .create_endpoint("master:mylock:mock:delegate?x=1", &ctx)
622 .unwrap();
623
624 assert_eq!(endpoint.uri(), "master:mylock:mock:delegate?x=1");
625 assert!(seen_scheme.load(Ordering::SeqCst));
626 }
627
    /// Test context that resolves exactly one delegate component.
    struct MockDelegateContext {
        /// Component returned when its own scheme is requested.
        delegate: Arc<dyn Component>,
    }
631
632 impl ComponentContext for MockDelegateContext {
633 fn resolve_component(&self, scheme: &str) -> Option<Arc<dyn Component>> {
634 if self.delegate.scheme() == scheme {
635 Some(Arc::clone(&self.delegate))
636 } else {
637 None
638 }
639 }
640
641 fn resolve_language(&self, _name: &str) -> Option<Arc<dyn Language>> {
642 None
643 }
644
645 fn metrics(&self) -> Arc<dyn MetricsCollector> {
646 Arc::new(NoOpMetrics)
647 }
648
649 fn platform_service(&self) -> Arc<dyn PlatformService> {
650 Arc::new(NoopPlatformService::default())
651 }
652
653 fn register_route_health_check(
654 &self,
655 _route_id: &str,
656 _check: Arc<dyn camel_api::AsyncHealthCheck>,
657 ) {
658 }
659
660 fn unregister_route_health_check(&self, _route_id: &str) {}
661 }
662
    /// Delegate component used by the producer pass-through tests; it counts
    /// calls and can be told to fail producer creation.
    struct MockProducerDelegateComponent {
        /// Incremented on every `create_endpoint` call.
        create_endpoint_calls: Arc<AtomicUsize>,
        /// Shared with created endpoints, which increment it on every
        /// `create_producer` call.
        create_producer_calls: Arc<AtomicUsize>,
        /// When `true`, created endpoints fail `create_producer`.
        fail_producer: bool,
    }
668
669 impl Component for MockProducerDelegateComponent {
670 fn scheme(&self) -> &str {
671 "mock"
672 }
673
674 fn create_endpoint(
675 &self,
676 _uri: &str,
677 _ctx: &dyn ComponentContext,
678 ) -> Result<Box<dyn Endpoint>, CamelError> {
679 self.create_endpoint_calls.fetch_add(1, Ordering::SeqCst);
680 Ok(Box::new(MockProducerDelegateEndpoint {
681 create_producer_calls: Arc::clone(&self.create_producer_calls),
682 fail_producer: self.fail_producer,
683 }))
684 }
685 }
686
    /// Endpoint created by `MockProducerDelegateComponent`.
    struct MockProducerDelegateEndpoint {
        /// Incremented on every `create_producer` call.
        create_producer_calls: Arc<AtomicUsize>,
        /// When `true`, `create_producer` returns an error.
        fail_producer: bool,
    }
691
692 impl Endpoint for MockProducerDelegateEndpoint {
693 fn uri(&self) -> &str {
694 "mock:delegate"
695 }
696
697 fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
698 Err(CamelError::EndpointCreationFailed(
699 "not used in test".to_string(),
700 ))
701 }
702
703 fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
704 self.create_producer_calls.fetch_add(1, Ordering::SeqCst);
705 if self.fail_producer {
706 return Err(CamelError::ProcessorError(
707 "delegate producer failed".to_string(),
708 ));
709 }
710 Ok(BoxProcessor::from_fn(
711 |exchange| async move { Ok(exchange) },
712 ))
713 }
714 }
715
716 #[tokio::test]
717 async fn producer_passthrough_delegates_and_produces() {
718 let endpoint_calls = Arc::new(AtomicUsize::new(0));
719 let producer_calls = Arc::new(AtomicUsize::new(0));
720 let delegate = Arc::new(MockProducerDelegateComponent {
721 create_endpoint_calls: Arc::clone(&endpoint_calls),
722 create_producer_calls: Arc::clone(&producer_calls),
723 fail_producer: false,
724 });
725
726 let ctx = MockDelegateContext {
727 delegate: delegate.clone(),
728 };
729
730 let master = MasterComponent::default();
731 let endpoint = master
732 .create_endpoint("master:lock-1:mock:delegate", &ctx)
733 .unwrap();
734 let producer_ctx = ProducerContext::new();
735 let producer = endpoint.create_producer(&producer_ctx).unwrap();
736
737 let exchange = Exchange::new(Message::new("ok"));
738 let result = producer.oneshot(exchange).await.unwrap();
739
740 assert_eq!(result.input.body.as_text(), Some("ok"));
741 assert_eq!(endpoint_calls.load(Ordering::SeqCst), 1);
742 assert_eq!(producer_calls.load(Ordering::SeqCst), 1);
743 }
744
745 #[test]
746 fn producer_passthrough_bubbles_delegate_errors() {
747 let endpoint_calls = Arc::new(AtomicUsize::new(0));
748 let producer_calls = Arc::new(AtomicUsize::new(0));
749 let delegate = Arc::new(MockProducerDelegateComponent {
750 create_endpoint_calls: Arc::clone(&endpoint_calls),
751 create_producer_calls: Arc::clone(&producer_calls),
752 fail_producer: true,
753 });
754
755 let ctx = MockDelegateContext {
756 delegate: delegate.clone(),
757 };
758
759 let master = MasterComponent::default();
760 let endpoint = master
761 .create_endpoint("master:lock-1:mock:delegate", &ctx)
762 .unwrap();
763 let producer_ctx = ProducerContext::new();
764 let err = endpoint.create_producer(&producer_ctx).unwrap_err();
765
766 assert!(matches!(err, CamelError::ProcessorError(_)));
767 assert_eq!(endpoint_calls.load(Ordering::SeqCst), 1);
768 assert_eq!(producer_calls.load(Ordering::SeqCst), 1);
769 }
770
    /// Leadership service whose events are driven manually by the tests.
    struct FakeLeadershipService {
        /// Sender half of the event channel; `None` until `start` is called.
        tx: Mutex<Option<watch::Sender<Option<LeadershipEvent>>>>,
        /// Flag shared with the handles returned from `start`.
        is_leader: Arc<AtomicBool>,
        /// Value the event channel is seeded with when `start` is called.
        initial: Option<LeadershipEvent>,
    }
776
777 impl FakeLeadershipService {
778 fn new(initial: Option<LeadershipEvent>) -> Self {
779 let starts_as_leader = matches!(initial, Some(LeadershipEvent::StartedLeading));
780 Self {
781 tx: Mutex::new(None),
782 is_leader: Arc::new(AtomicBool::new(starts_as_leader)),
783 initial,
784 }
785 }
786
787 async fn emit(&self, event: LeadershipEvent) {
788 self.is_leader.store(
789 matches!(event, LeadershipEvent::StartedLeading),
790 Ordering::Release,
791 );
792 if let Some(tx) = self
793 .tx
794 .lock()
795 .expect("mutex poisoned: fake elector sender")
796 .as_ref()
797 {
798 let _ = tx.send(Some(event));
799 }
800 }
801 }
802
803 #[async_trait]
804 impl LeadershipService for FakeLeadershipService {
805 async fn start(&self, _lock_name: &str) -> Result<LeadershipHandle, PlatformError> {
806 let (tx, rx) = watch::channel(self.initial.clone());
807 *self.tx.lock().expect("mutex poisoned: fake elector sender") = Some(tx);
808
809 let cancel = CancellationToken::new();
810 let cancel_wait = cancel.clone();
811 let (term_tx, term_rx) = oneshot::channel();
812 tokio::spawn(async move {
813 cancel_wait.cancelled().await;
814 let _ = term_tx.send(());
815 });
816
817 Ok(LeadershipHandle::new(
818 rx,
819 Arc::clone(&self.is_leader),
820 cancel,
821 term_rx,
822 ))
823 }
824 }
825
    /// Platform service stub that exposes a caller-supplied leadership
    /// service.
    struct FakePlatformService {
        /// Identity returned from `identity()`.
        identity: PlatformIdentity,
        /// Readiness gate returned from `readiness_gate()`.
        readiness_gate: Arc<dyn ReadinessGate>,
        /// Leadership service returned from `leadership()`.
        leadership: Arc<dyn LeadershipService>,
    }
831
832 impl FakePlatformService {
833 fn new(leadership: Arc<dyn LeadershipService>) -> Self {
834 Self {
835 identity: PlatformIdentity::local("master-tests"),
836 readiness_gate: Arc::new(NoopReadinessGate),
837 leadership,
838 }
839 }
840 }
841
842 impl PlatformService for FakePlatformService {
843 fn identity(&self) -> PlatformIdentity {
844 self.identity.clone()
845 }
846
847 fn readiness_gate(&self) -> Arc<dyn ReadinessGate> {
848 Arc::clone(&self.readiness_gate)
849 }
850
851 fn leadership(&self) -> Arc<dyn LeadershipService> {
852 Arc::clone(&self.leadership)
853 }
854 }
855
    /// Delegate component for the leadership tests; its counters are shared
    /// with the endpoints and consumers it creates.
    struct FakeDelegateComponent {
        /// Incremented by endpoints each time a consumer is created.
        create_consumer_calls: Arc<AtomicUsize>,
        /// Incremented by consumers each time they are started.
        start_calls: Arc<AtomicUsize>,
    }
860
861 impl Component for FakeDelegateComponent {
862 fn scheme(&self) -> &str {
863 "fake"
864 }
865
866 fn create_endpoint(
867 &self,
868 _uri: &str,
869 _ctx: &dyn ComponentContext,
870 ) -> Result<Box<dyn Endpoint>, CamelError> {
871 Ok(Box::new(FakeDelegateEndpoint {
872 create_consumer_calls: Arc::clone(&self.create_consumer_calls),
873 start_calls: Arc::clone(&self.start_calls),
874 }))
875 }
876 }
877
    /// Endpoint created by `FakeDelegateComponent`.
    struct FakeDelegateEndpoint {
        /// Incremented on every `create_consumer` call; the new value becomes
        /// the consumer's epoch.
        create_consumer_calls: Arc<AtomicUsize>,
        /// Passed on to created consumers.
        start_calls: Arc<AtomicUsize>,
    }
882
883 impl Endpoint for FakeDelegateEndpoint {
884 fn uri(&self) -> &str {
885 "fake:delegate"
886 }
887
888 fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
889 let epoch = self.create_consumer_calls.fetch_add(1, Ordering::SeqCst) + 1;
890 Ok(Box::new(FakeDelegateConsumer {
891 epoch,
892 start_calls: Arc::clone(&self.start_calls),
893 }))
894 }
895
896 fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
897 Err(CamelError::EndpointCreationFailed("not used".to_string()))
898 }
899 }
900
    /// Consumer that keeps sending `epoch-N` messages until it is cancelled.
    struct FakeDelegateConsumer {
        /// 1-based creation index, embedded in every message body.
        epoch: usize,
        /// Incremented each time `start` is called.
        start_calls: Arc<AtomicUsize>,
    }
905
    /// Delegate component whose endpoint creation always fails.
    struct FailingDelegateComponent {
        /// Incremented on every `create_endpoint` call.
        create_endpoint_calls: Arc<AtomicUsize>,
    }
909
910 impl Component for FailingDelegateComponent {
911 fn scheme(&self) -> &str {
912 "failing"
913 }
914
915 fn create_endpoint(
916 &self,
917 _uri: &str,
918 _ctx: &dyn ComponentContext,
919 ) -> Result<Box<dyn Endpoint>, CamelError> {
920 self.create_endpoint_calls.fetch_add(1, Ordering::SeqCst);
921 Err(CamelError::EndpointCreationFailed(
922 "delegate endpoint creation failed".to_string(),
923 ))
924 }
925 }
926
927 #[async_trait]
928 impl Consumer for FakeDelegateConsumer {
929 async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
930 self.start_calls.fetch_add(1, Ordering::SeqCst);
931 context
932 .send(Exchange::new(Message::new(format!("epoch-{}", self.epoch))))
933 .await?;
934
935 loop {
936 tokio::select! {
937 _ = context.cancelled() => {
938 break;
939 }
940 _ = sleep(Duration::from_millis(20)) => {
941 context
942 .send(Exchange::new(Message::new(format!("epoch-{}", self.epoch))))
943 .await?;
944 }
945 }
946 }
947
948 Ok(())
949 }
950
951 async fn stop(&mut self) -> Result<(), CamelError> {
952 Ok(())
953 }
954 }
955
956 fn build_master_consumer(
957 platform_service: Arc<dyn PlatformService>,
958 create_consumer_calls: Arc<AtomicUsize>,
959 start_calls: Arc<AtomicUsize>,
960 delegate_retry_max_attempts: Option<u32>,
961 ) -> MasterConsumer {
962 MasterConsumer::new(
963 "lock-a".to_string(),
964 "fake:delegate".to_string(),
965 Arc::new(FakeDelegateComponent {
966 create_consumer_calls,
967 start_calls,
968 }),
969 Arc::new(NoOpMetrics),
970 platform_service,
971 Duration::from_millis(500),
972 delegate_retry_max_attempts,
973 )
974 }
975
    /// With no initial leadership event the delegate stays idle; it is
    /// created and started once `StartedLeading` is emitted.
    #[tokio::test]
    async fn starts_delegate_only_after_started_leading() {
        let leadership = Arc::new(FakeLeadershipService::new(None));
        let platform_service = Arc::new(FakePlatformService::new(leadership.clone()));
        let create_consumer_calls = Arc::new(AtomicUsize::new(0));
        let start_calls = Arc::new(AtomicUsize::new(0));
        let mut master = build_master_consumer(
            platform_service,
            Arc::clone(&create_consumer_calls),
            Arc::clone(&start_calls),
            Some(30),
        );

        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
        let cancel = CancellationToken::new();
        let ctx = ConsumerContext::new(tx, cancel.clone());

        master.start(ctx).await.unwrap();

        // Not leading yet: nothing may be produced and no consumer created.
        sleep(Duration::from_millis(80)).await;
        assert!(rx.try_recv().is_err());
        assert_eq!(create_consumer_calls.load(Ordering::SeqCst), 0);

        leadership.emit(LeadershipEvent::StartedLeading).await;

        // The first delegate instance reports epoch 1.
        let first = timeout(Duration::from_millis(500), rx.recv())
            .await
            .unwrap()
            .unwrap();
        assert_eq!(first.exchange.input.body.as_text(), Some("epoch-1"));
        assert_eq!(create_consumer_calls.load(Ordering::SeqCst), 1);
        assert_eq!(start_calls.load(Ordering::SeqCst), 1);

        cancel.cancel();
        master.stop().await.unwrap();
    }
1012
    /// After `StoppedLeading` the delegate stops producing messages.
    #[tokio::test]
    async fn stops_delegate_on_stopped_leading() {
        let leadership = Arc::new(FakeLeadershipService::new(None));
        let platform_service = Arc::new(FakePlatformService::new(leadership.clone()));
        let create_consumer_calls = Arc::new(AtomicUsize::new(0));
        let start_calls = Arc::new(AtomicUsize::new(0));
        let mut master = build_master_consumer(
            platform_service,
            Arc::clone(&create_consumer_calls),
            Arc::clone(&start_calls),
            Some(30),
        );

        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
        let cancel = CancellationToken::new();
        let ctx = ConsumerContext::new(tx, cancel.clone());

        master.start(ctx).await.unwrap();
        leadership.emit(LeadershipEvent::StartedLeading).await;
        // Wait until the delegate is demonstrably running.
        let _ = timeout(Duration::from_millis(500), rx.recv())
            .await
            .unwrap()
            .unwrap();

        leadership.emit(LeadershipEvent::StoppedLeading).await;
        sleep(Duration::from_millis(100)).await;
        // Discard messages queued before the delegate stopped.
        while rx.try_recv().is_ok() {}
        // Nothing new may arrive in a window several send periods long.
        assert!(
            timeout(Duration::from_millis(120), rx.recv())
                .await
                .is_err()
        );

        cancel.cancel();
        master.stop().await.unwrap();
    }
1049
    /// Regaining leadership creates and starts a brand-new delegate consumer
    /// instead of reusing the previous one.
    #[tokio::test]
    async fn recreates_delegate_on_new_leadership_epoch() {
        let leadership = Arc::new(FakeLeadershipService::new(None));
        let platform_service = Arc::new(FakePlatformService::new(leadership.clone()));
        let create_consumer_calls = Arc::new(AtomicUsize::new(0));
        let start_calls = Arc::new(AtomicUsize::new(0));
        let mut master = build_master_consumer(
            platform_service,
            Arc::clone(&create_consumer_calls),
            Arc::clone(&start_calls),
            Some(30),
        );

        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
        let cancel = CancellationToken::new();
        let ctx = ConsumerContext::new(tx, cancel.clone());

        master.start(ctx).await.unwrap();

        leadership.emit(LeadershipEvent::StartedLeading).await;
        let first = timeout(Duration::from_millis(500), rx.recv())
            .await
            .unwrap()
            .unwrap();
        assert_eq!(first.exchange.input.body.as_text(), Some("epoch-1"));

        leadership.emit(LeadershipEvent::StoppedLeading).await;
        sleep(Duration::from_millis(120)).await;

        leadership.emit(LeadershipEvent::StartedLeading).await;
        // NOTE(review): this takes the next queued message; it can only be
        // `epoch-2` if no `epoch-1` message is still buffered from the first
        // term — confirm this cannot flake.
        let second = timeout(Duration::from_millis(500), rx.recv())
            .await
            .unwrap()
            .unwrap();
        assert_eq!(second.exchange.input.body.as_text(), Some("epoch-2"));

        assert_eq!(create_consumer_calls.load(Ordering::SeqCst), 2);
        assert_eq!(start_calls.load(Ordering::SeqCst), 2);

        cancel.cancel();
        master.stop().await.unwrap();
    }
1092
    /// With a retry cap of 1 and a delegate whose endpoint creation always
    /// fails, endpoint creation is attempted exactly twice: once for the
    /// initial `StartedLeading` event and once on a retry tick.
    #[tokio::test]
    async fn stops_retrying_delegate_start_after_max_attempts() {
        let leadership = Arc::new(FakeLeadershipService::new(Some(
            LeadershipEvent::StartedLeading,
        )));
        let platform_service = Arc::new(FakePlatformService::new(leadership));
        let create_endpoint_calls = Arc::new(AtomicUsize::new(0));

        let mut master = MasterConsumer::new(
            "lock-a".to_string(),
            "failing:delegate".to_string(),
            Arc::new(FailingDelegateComponent {
                create_endpoint_calls: Arc::clone(&create_endpoint_calls),
            }),
            Arc::new(NoOpMetrics),
            platform_service,
            Duration::from_millis(500),
            Some(1),
        );

        let (tx, _rx) = tokio::sync::mpsc::channel(16);
        let cancel = CancellationToken::new();
        let ctx = ConsumerContext::new(tx, cancel.clone());

        master.start(ctx).await.unwrap();
        // Long enough for several retry ticks to elapse.
        sleep(Duration::from_millis(750)).await;

        assert_eq!(create_endpoint_calls.load(Ordering::SeqCst), 2);

        cancel.cancel();
        master.stop().await.unwrap();
    }
1125
1126 #[tokio::test]
1132 async fn stop_completes_quickly_when_leadership_task_is_slow() {
1133 struct SlowStoppingConsumer;
1135
1136 #[async_trait]
1137 impl Consumer for SlowStoppingConsumer {
1138 async fn start(&mut self, ctx: ConsumerContext) -> Result<(), CamelError> {
1139 ctx.send(Exchange::new(Message::new("slow-start")))
1140 .await
1141 .ok();
1142 sleep(Duration::from_secs(60)).await;
1144 Ok(())
1145 }
1146
1147 async fn stop(&mut self) -> Result<(), CamelError> {
1148 Ok(())
1149 }
1150 }
1151
1152 struct SlowStoppingComponent;
1153
1154 impl Component for SlowStoppingComponent {
1155 fn scheme(&self) -> &str {
1156 "slow"
1157 }
1158
1159 fn create_endpoint(
1160 &self,
1161 _uri: &str,
1162 _ctx: &dyn ComponentContext,
1163 ) -> Result<Box<dyn Endpoint>, CamelError> {
1164 Ok(Box::new(SlowStoppingEndpoint))
1165 }
1166 }
1167
1168 struct SlowStoppingEndpoint;
1169
1170 impl Endpoint for SlowStoppingEndpoint {
1171 fn uri(&self) -> &str {
1172 "slow:delegate"
1173 }
1174
1175 fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
1176 Ok(Box::new(SlowStoppingConsumer))
1177 }
1178
1179 fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
1180 Err(CamelError::EndpointCreationFailed("not used".into()))
1181 }
1182 }
1183
1184 let leadership = Arc::new(FakeLeadershipService::new(Some(
1185 LeadershipEvent::StartedLeading,
1186 )));
1187 let platform_service = Arc::new(FakePlatformService::new(leadership));
1188
1189 let mut master = MasterConsumer::new(
1190 "lock-slow".into(),
1191 "slow:delegate".into(),
1192 Arc::new(SlowStoppingComponent),
1193 Arc::new(NoOpMetrics),
1194 platform_service,
1195 Duration::from_millis(500), Some(30),
1197 );
1198
1199 let (tx, mut rx) = tokio::sync::mpsc::channel(16);
1200 let cancel = CancellationToken::new();
1201 let ctx = ConsumerContext::new(tx, cancel.clone());
1202
1203 master.start(ctx).await.unwrap();
1204
1205 let msg = timeout(Duration::from_secs(2), rx.recv())
1207 .await
1208 .unwrap()
1209 .unwrap();
1210 assert_eq!(msg.exchange.input.body.as_text(), Some("slow-start"));
1211
1212 let start = Instant::now();
1215 master.stop().await.unwrap();
1216 let elapsed = start.elapsed();
1217
1218 assert!(
1221 elapsed < Duration::from_millis(250),
1222 "stop() took {:?}, expected < 250 ms (abort should be near-instant)",
1223 elapsed,
1224 );
1225
1226 cancel.cancel();
1227 }
1228
1229 #[tokio::test]
1230 async fn stop_propagates_delegate_start_error() {
1231 struct FailingStartConsumer;
1232
1233 #[async_trait]
1234 impl Consumer for FailingStartConsumer {
1235 async fn start(&mut self, _ctx: ConsumerContext) -> Result<(), CamelError> {
1236 Err(CamelError::ProcessorError(
1237 "delegate start failed".to_string(),
1238 ))
1239 }
1240
1241 async fn stop(&mut self) -> Result<(), CamelError> {
1242 Ok(())
1243 }
1244 }
1245
1246 struct FailingStartComponent;
1247
1248 impl Component for FailingStartComponent {
1249 fn scheme(&self) -> &str {
1250 "failstart"
1251 }
1252
1253 fn create_endpoint(
1254 &self,
1255 _uri: &str,
1256 _ctx: &dyn ComponentContext,
1257 ) -> Result<Box<dyn Endpoint>, CamelError> {
1258 Ok(Box::new(FailingStartEndpoint))
1259 }
1260 }
1261
1262 struct FailingStartEndpoint;
1263
1264 impl Endpoint for FailingStartEndpoint {
1265 fn uri(&self) -> &str {
1266 "failstart:delegate"
1267 }
1268
1269 fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
1270 Ok(Box::new(FailingStartConsumer))
1271 }
1272
1273 fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
1274 Err(CamelError::EndpointCreationFailed("not used".into()))
1275 }
1276 }
1277
1278 let leadership = Arc::new(FakeLeadershipService::new(Some(
1279 LeadershipEvent::StartedLeading,
1280 )));
1281 let platform_service = Arc::new(FakePlatformService::new(leadership));
1282
1283 let mut master = MasterConsumer::new(
1284 "lock-error".into(),
1285 "failstart:delegate".into(),
1286 Arc::new(FailingStartComponent),
1287 Arc::new(NoOpMetrics),
1288 platform_service,
1289 Duration::from_millis(500),
1290 Some(30),
1291 );
1292
1293 let (tx, _rx) = tokio::sync::mpsc::channel(16);
1294 let cancel = CancellationToken::new();
1295 let ctx = ConsumerContext::new(tx, cancel.clone());
1296
1297 master.start(ctx).await.unwrap();
1298 sleep(Duration::from_millis(250)).await;
1299 assert!(
1300 master
1301 .leadership_task
1302 .as_ref()
1303 .is_some_and(tokio::task::JoinHandle::is_finished),
1304 "leadership task should finish after delegate error"
1305 );
1306 let err = master.stop().await.expect_err("expected delegate error");
1307 assert!(err.to_string().contains("delegate start failed"));
1308
1309 cancel.cancel();
1310 }
1311}