1pub mod bundle;
7pub mod config;
8
9pub use bundle::MasterBundle;
10
11use std::sync::Arc;
12use std::time::Duration;
13
14use async_trait::async_trait;
15use camel_api::{CamelError, MetricsCollector, PlatformService};
16use camel_component_api::{
17 BoxProcessor, Component, ComponentContext, Consumer, ConsumerContext, Endpoint,
18 ExchangeEnvelope, NetworkRetryPolicy, ProducerContext, is_retryable_camel_error, parse_uri,
19};
20use camel_language_api::Language;
21use tokio::task::JoinHandle;
22use tokio::time::{interval, timeout};
23use tokio_util::sync::CancellationToken;
24use tracing::{error, info, warn};
25
26use crate::config::{MasterComponentConfig, MasterUriConfig};
27
/// Period of the ticker that drives delegate health checks and restart
/// attempts in the leadership loop; also the baseline subtracted from longer
/// backoff delays.
const DELEGATE_RETRY_INTERVAL: Duration = Duration::from_millis(200);
29
30pub struct MasterComponent {
31 drain_timeout_ms: u64,
32 reconnect: NetworkRetryPolicy,
34}
35
36impl MasterComponent {
37 pub fn new(config: MasterComponentConfig) -> Self {
38 let mut reconnect = config.reconnect;
42 if let Some(max) = config.delegate_retry_max_attempts
43 && reconnect.max_attempts == 0
44 {
45 reconnect.max_attempts = max;
46 }
47 Self {
48 drain_timeout_ms: config.drain_timeout_ms,
49 reconnect,
50 }
51 }
52}
53
54impl Default for MasterComponent {
55 fn default() -> Self {
56 Self::new(MasterComponentConfig::default())
57 }
58}
59
60impl Component for MasterComponent {
61 fn scheme(&self) -> &str {
62 "master"
63 }
64
65 fn create_endpoint(
66 &self,
67 uri: &str,
68 ctx: &dyn ComponentContext,
69 ) -> Result<Box<dyn Endpoint>, CamelError> {
70 let parsed = MasterUriConfig::parse(uri)?;
71 let delegate_parts = parse_uri(&parsed.delegate_uri)?;
72 let delegate_scheme = delegate_parts.scheme;
73 let delegate_component = ctx
74 .resolve_component(&delegate_scheme)
75 .ok_or_else(|| CamelError::ComponentNotFound(delegate_scheme.clone()))?;
76
77 Ok(Box::new(MasterEndpoint {
78 uri: uri.to_string(),
79 lock_name: parsed.lock_name,
80 delegate_uri: parsed.delegate_uri,
81 delegate_component,
82 metrics: ctx.metrics(),
83 platform_service: ctx.platform_service(),
84 drain_timeout: Duration::from_millis(self.drain_timeout_ms),
85 reconnect: self.reconnect.clone(),
86 }))
87 }
88}
89
/// Endpoint produced by `MasterComponent::create_endpoint`.
struct MasterEndpoint {
    /// Full master URI exactly as passed to `create_endpoint`.
    uri: String,
    /// Lock name parsed from the master URI.
    lock_name: String,
    /// Delegate URI parsed from the master URI.
    delegate_uri: String,
    /// Component resolved for the delegate URI's scheme.
    delegate_component: Arc<dyn Component>,
    /// Metrics collector obtained from the creating component context.
    metrics: Arc<dyn MetricsCollector>,
    /// Platform service obtained from the creating component context.
    platform_service: Arc<dyn PlatformService>,
    /// Maximum time to wait for the delegate to stop.
    drain_timeout: Duration,
    /// Retry policy handed to consumers created from this endpoint.
    reconnect: NetworkRetryPolicy,
}
103
104impl Endpoint for MasterEndpoint {
105 fn uri(&self) -> &str {
106 &self.uri
107 }
108
109 fn create_consumer(
110 &self,
111 rt: Arc<dyn camel_component_api::RuntimeObservability>,
112 ) -> Result<Box<dyn Consumer>, CamelError> {
113 Ok(Box::new(MasterConsumer::new(
114 self.lock_name.clone(),
115 self.delegate_uri.clone(),
116 Arc::clone(&self.delegate_component),
117 Arc::clone(&self.metrics),
118 Arc::clone(&self.platform_service),
119 self.drain_timeout,
120 self.reconnect.clone(),
121 rt,
122 )))
123 }
124
125 fn create_producer(
126 &self,
127 rt: Arc<dyn camel_component_api::RuntimeObservability>,
128 ctx: &ProducerContext,
129 ) -> Result<BoxProcessor, CamelError> {
130 let delegate_ctx = MasterDelegateContext {
131 delegate_component: Arc::clone(&self.delegate_component),
132 metrics: Arc::clone(&self.metrics),
133 platform_service: Arc::clone(&self.platform_service),
134 };
135
136 self.delegate_component
137 .create_endpoint(&self.delegate_uri, &delegate_ctx)?
138 .create_producer(rt, ctx)
139 }
140}
141
142struct MasterDelegateContext {
143 delegate_component: Arc<dyn Component>,
144 metrics: Arc<dyn MetricsCollector>,
145 platform_service: Arc<dyn PlatformService>,
146}
147
148impl ComponentContext for MasterDelegateContext {
149 fn resolve_component(&self, scheme: &str) -> Option<Arc<dyn Component>> {
150 if self.delegate_component.scheme() == scheme {
151 Some(Arc::clone(&self.delegate_component))
152 } else {
153 None
154 }
155 }
156
157 fn resolve_language(&self, _name: &str) -> Option<Arc<dyn Language>> {
158 None
159 }
160
161 fn metrics(&self) -> Arc<dyn MetricsCollector> {
162 Arc::clone(&self.metrics)
163 }
164
165 fn platform_service(&self) -> Arc<dyn PlatformService> {
166 Arc::clone(&self.platform_service)
167 }
168
169 fn register_route_health_check(
170 &self,
171 _route_id: &str,
172 _check: Arc<dyn camel_api::AsyncHealthCheck>,
173 ) {
174 }
175
176 fn unregister_route_health_check(&self, _route_id: &str) {}
177}
178
/// Consumer that runs a delegate consumer only while leadership of
/// `lock_name` is held.
struct MasterConsumer {
    /// Name of the lock passed to the leadership service.
    lock_name: String,
    /// URI used to create the delegate endpoint.
    delegate_uri: String,
    /// Component that creates the delegate endpoint.
    delegate_component: Arc<dyn Component>,
    metrics: Arc<dyn MetricsCollector>,
    /// Source of the leadership service.
    platform_service: Arc<dyn PlatformService>,
    /// Upper bound for waiting on delegate / leadership-loop shutdown.
    drain_timeout: Duration,
    /// Retry policy for delegate start attempts.
    reconnect: NetworkRetryPolicy,
    /// Handle of the spawned leadership loop; `None` until `start` has run.
    leadership_task: Option<JoinHandle<Result<(), CamelError>>>,
    /// Token cancelled by `stop` to ask the leadership loop to exit.
    stop_token: Option<CancellationToken>,
    /// Runtime observability handle passed on to the delegate consumer.
    runtime: Arc<dyn camel_component_api::RuntimeObservability>,
}

impl MasterConsumer {
    /// Creates a consumer that has not been started yet (no task, no stop token).
    #[allow(clippy::too_many_arguments)]
    fn new(
        lock_name: String,
        delegate_uri: String,
        delegate_component: Arc<dyn Component>,
        metrics: Arc<dyn MetricsCollector>,
        platform_service: Arc<dyn PlatformService>,
        drain_timeout: Duration,
        reconnect: NetworkRetryPolicy,
        runtime: Arc<dyn camel_component_api::RuntimeObservability>,
    ) -> Self {
        Self {
            lock_name,
            delegate_uri,
            delegate_component,
            metrics,
            platform_service,
            drain_timeout,
            reconnect,
            leadership_task: None,
            stop_token: None,
            runtime,
        }
    }
}
220
/// Lifecycle state of the delegate consumer task.
enum DelegateState {
    /// No delegate task is tracked.
    Inactive,
    /// A delegate task has been spawned.
    Active {
        /// Token cancelled to ask the delegate consumer to stop.
        run_token: CancellationToken,
        /// Join handle of the spawned delegate task.
        handle: JoinHandle<Result<(), CamelError>>,
    },
}
228
229async fn stop_delegate(
230 state: &mut DelegateState,
231 drain_timeout: Duration,
232) -> Result<(), CamelError> {
233 if let DelegateState::Active {
234 run_token,
235 mut handle,
236 } = std::mem::replace(state, DelegateState::Inactive)
237 {
238 run_token.cancel();
239 match timeout(drain_timeout, &mut handle).await {
240 Ok(Ok(Ok(()))) => {}
241 Ok(Ok(Err(err))) => {
242 return Err(err);
243 }
244 Ok(Err(e)) if e.is_panic() => {
245 error!(error = %e, "master delegate task panicked");
247 return Err(CamelError::ProcessorError(format!(
248 "master delegate task panicked: {e}"
249 )));
250 }
251 Ok(Err(e)) => {
252 warn!(error = %e, "master delegate task cancelled");
253 return Err(CamelError::ProcessorError(format!(
254 "master delegate task cancelled: {e}"
255 )));
256 }
257 Err(_) => {
258 warn!("master delegate shutdown timed out, aborting");
259 handle.abort();
260 }
261 }
262 }
263 Ok(())
264}
265
/// Borrowed inputs that `reconcile_event` needs to start or stop the delegate.
struct ReconcileContext<'a> {
    /// Lock name, used as a log field.
    lock_name: &'a str,
    /// Component used to create the delegate endpoint.
    delegate_component: &'a Arc<dyn Component>,
    /// URI passed to the delegate component.
    delegate_uri: &'a str,
    /// Exchange sender cloned into the delegate's consumer context.
    sender: &'a tokio::sync::mpsc::Sender<ExchangeEnvelope>,
    /// Token from which each delegate run token is derived as a child.
    parent_cancel: &'a CancellationToken,
    /// Maximum time to wait when stopping a running delegate.
    drain_timeout: Duration,
    metrics: &'a Arc<dyn MetricsCollector>,
    platform_service: &'a Arc<dyn PlatformService>,
    /// Runtime observability handle passed to the delegate consumer.
    runtime: Arc<dyn camel_component_api::RuntimeObservability>,
}
277
278async fn reconcile_event(
279 event: camel_api::LeadershipEvent,
280 state: &mut DelegateState,
281 ctx: &ReconcileContext<'_>,
282) -> Result<(), CamelError> {
283 match event {
284 camel_api::LeadershipEvent::StartedLeading => {
285 info!(lock = %ctx.lock_name, "master leadership acquired");
286 tracing::info!(lock = %ctx.lock_name, "metrics emission placeholder: leadership acquired");
288 stop_delegate(state, ctx.drain_timeout).await?;
289
290 let delegate_ctx = MasterDelegateContext {
291 delegate_component: Arc::clone(ctx.delegate_component),
292 metrics: Arc::clone(ctx.metrics),
293 platform_service: Arc::clone(ctx.platform_service),
294 };
295
296 let endpoint = match ctx
297 .delegate_component
298 .create_endpoint(ctx.delegate_uri, &delegate_ctx)
299 {
300 Ok(endpoint) => endpoint,
301 Err(err) => {
302 if is_retryable_camel_error(&err) {
303 warn!(lock = %ctx.lock_name, error = %err, "transient delegate endpoint error (will retry)");
304 return Ok(()); }
306 return Err(err); }
308 };
309
310 let mut consumer = match endpoint.create_consumer(Arc::clone(&ctx.runtime)) {
311 Ok(consumer) => consumer,
312 Err(err) => {
313 if is_retryable_camel_error(&err) {
314 warn!(lock = %ctx.lock_name, error = %err, "transient delegate consumer error (will retry)");
315 return Ok(()); }
317 return Err(err); }
319 };
320
321 let run_token = ctx.parent_cancel.child_token();
322 let delegate_ctx = ConsumerContext::new(ctx.sender.clone(), run_token.clone());
323 let handle = tokio::spawn(async move {
324 consumer.start(delegate_ctx).await?;
325 consumer.stop().await?;
326 Ok::<(), CamelError>(())
327 });
328
329 *state = DelegateState::Active { run_token, handle };
330 }
331 camel_api::LeadershipEvent::StoppedLeading => {
332 info!(lock = %ctx.lock_name, "master leadership lost");
333 tracing::info!(lock = %ctx.lock_name, "metrics emission placeholder: leadership lost");
335 stop_delegate(state, ctx.drain_timeout).await?;
336 }
337 }
338 Ok(())
339}
340
341#[async_trait]
345impl Consumer for MasterConsumer {
346 async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
347 if self.leadership_task.is_some() {
348 return Ok(());
349 }
350
351 let handle = self
352 .platform_service
353 .leadership()
354 .start(&self.lock_name)
355 .await
356 .map_err(|e| {
357 CamelError::EndpointCreationFailed(format!("failed to start leader election: {e}"))
358 })?;
359
360 let lock_name = self.lock_name.clone();
361 let delegate_uri = self.delegate_uri.clone();
362 let delegate_component = Arc::clone(&self.delegate_component);
363 let metrics = Arc::clone(&self.metrics);
364 let platform_service = Arc::clone(&self.platform_service);
365 let sender = context.sender();
366 let parent_cancel = context.cancel_token();
367 let drain_timeout = self.drain_timeout;
368 let reconnect = self.reconnect.clone();
369 let runtime = Arc::clone(&self.runtime);
370 let mut events = handle.events.clone();
371
372 let stop_token = CancellationToken::new();
373 let stop_token_loop = stop_token.clone();
374 let leadership_handle = handle;
375
376 let task = tokio::spawn(async move {
377 let mut state = DelegateState::Inactive;
378 let mut is_leading = false;
379 let mut delegate_attempts = 0u32;
380 let mut retry_tick = interval(DELEGATE_RETRY_INTERVAL);
381
382 let rctx = ReconcileContext {
383 lock_name: &lock_name,
384 delegate_component: &delegate_component,
385 delegate_uri: &delegate_uri,
386 sender: &sender,
387 parent_cancel: &parent_cancel,
388 drain_timeout,
389 metrics: &metrics,
390 platform_service: &platform_service,
391 runtime: Arc::clone(&runtime),
392 };
393
394 let initial_event = { events.borrow().clone() };
395 if let Some(initial_event) = initial_event {
396 is_leading = matches!(&initial_event, camel_api::LeadershipEvent::StartedLeading);
397 if is_leading {
398 delegate_attempts = 0;
399 }
400 if let Err(err) = reconcile_event(initial_event, &mut state, &rctx).await {
401 error!(lock = %lock_name, "master delegate error: {err}");
403 return Err(err);
404 }
405 }
406
407 loop {
408 tokio::select! {
409 _ = stop_token_loop.cancelled() => {
410 break;
411 }
412 _ = context.cancelled() => {
413 break;
414 }
415 changed = events.changed() => {
416 if changed.is_err() {
417 break;
418 }
419 let event = { events.borrow().clone() };
420 if let Some(event) = event {
421 let was_leading = is_leading;
422 is_leading = matches!(&event, camel_api::LeadershipEvent::StartedLeading);
423 if !was_leading && is_leading {
424 delegate_attempts = 0;
425 }
426 if let Err(err) = reconcile_event(event, &mut state, &rctx).await {
427 error!(lock = %lock_name, "master delegate error: {err}");
429 return Err(err);
430 }
431 }
432 }
433 _ = retry_tick.tick() => {
434 if matches!(&state, DelegateState::Active { handle, .. } if handle.is_finished())
435 && let Err(err) = stop_delegate(&mut state, drain_timeout).await
436 {
437 error!(lock = %lock_name, "master delegate task failed: {err}");
439 return Err(err);
440 }
441
442 if is_leading && matches!(state, DelegateState::Inactive) {
443 if !reconnect.should_retry(delegate_attempts) {
458 warn!(
459 lock = %lock_name,
460 attempts = delegate_attempts,
461 "delegate start exceeded max attempts, stopping consumer"
462 );
463 break;
464 }
465 if delegate_attempts > 0 {
467 let delay = reconnect.delay_for(delegate_attempts - 1);
468 if delay > DELEGATE_RETRY_INTERVAL {
469 tokio::select! {
470 _ = stop_token_loop.cancelled() => break,
471 _ = tokio::time::sleep(delay.saturating_sub(DELEGATE_RETRY_INTERVAL)) => {}
472 }
473 }
474 }
475 delegate_attempts = delegate_attempts.saturating_add(1);
476 if let Err(err) = reconcile_event(
477 camel_api::LeadershipEvent::StartedLeading,
478 &mut state,
479 &rctx,
480 )
481 .await {
482 if is_retryable_camel_error(&err) {
483 error!(
485 lock = %lock_name,
486 error = %err,
487 attempt = delegate_attempts,
488 "master delegate transient error, will retry"
489 );
490 } else {
492 error!(
494 lock = %lock_name,
495 error = %err,
496 "master delegate permanent error, terminating"
497 );
498 return Err(err);
499 }
500 }
501 }
502 }
503 }
504 }
505
506 stop_delegate(&mut state, drain_timeout).await?;
507 let _ = timeout(drain_timeout, leadership_handle.step_down()).await;
508 Ok::<(), CamelError>(())
509 });
510
511 self.stop_token = Some(stop_token);
512 self.leadership_task = Some(task);
513
514 Ok(())
515 }
516
517 async fn stop(&mut self) -> Result<(), CamelError> {
518 if let Some(token) = self.stop_token.take() {
519 token.cancel();
520 }
521
522 if let Some(handle) = self.leadership_task.take() {
523 if handle.is_finished() {
524 match timeout(self.drain_timeout, handle).await {
525 Ok(Ok(Ok(()))) => {}
526 Ok(Ok(Err(err))) => return Err(err),
527 Ok(Err(e)) => {
528 return Err(CamelError::ProcessorError(format!(
529 "leadership task join failed: {e}"
530 )));
531 }
532 Err(_) => {
533 return Err(CamelError::ProcessorError(
534 "leadership task join timed out".to_string(),
535 ));
536 }
537 }
538 return Ok(());
539 }
540
541 handle.abort();
544 match timeout(self.drain_timeout, handle).await {
545 Ok(Ok(Ok(()))) => {}
546 Ok(Ok(Err(err))) => return Err(err),
547 Ok(Err(e)) if e.is_panic() => {
548 error!(lock = %self.lock_name, error = %e, "leadership task panicked");
550 }
551 Ok(Err(e)) => {
552 warn!(lock = %self.lock_name, error = %e, "leadership task cancelled");
553 }
554 Err(_) => {
555 warn!("master leadership loop shutdown timed out after abort");
556 }
557 }
558 }
559
560 Ok(())
561 }
562
/// Hands the leadership task's join handle to the caller.
///
/// The handle is moved out, so later calls return `None` and `stop` will no
/// longer find a task to join.
fn background_task_handle(
    &mut self,
) -> Option<tokio::task::JoinHandle<Result<(), CamelError>>> {
    self.leadership_task.take()
}
568}
569
570#[cfg(test)]
571mod tests {
572 use std::sync::Arc;
573 use std::sync::Mutex;
574 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
575
576 use camel_api::{
577 BoxProcessorExt, Exchange, LeadershipEvent, LeadershipHandle, LeadershipService, Message,
578 NoOpMetrics, NoopPlatformService, NoopReadinessGate, PlatformError, PlatformIdentity,
579 PlatformService, ReadinessGate,
580 };
581 use camel_component_api::NoOpComponentContext;
582 use camel_component_api::test_support::PanicRuntimeObservability;
583 use std::time::Instant;
584 use tokio::sync::{oneshot, watch};
585 use tokio::time::{sleep, timeout};
586 use tokio_util::sync::CancellationToken;
587 use tower::ServiceExt;
588
589 use super::*;
590
/// A well-formed master URI splits into lock name and delegate URI.
#[test]
fn parse_master_uri_valid() {
    let parsed = MasterUriConfig::parse("master:mylock:timer:tick?period=250").unwrap();
    assert_eq!(
        (parsed.lock_name.as_str(), parsed.delegate_uri.as_str()),
        ("mylock", "timer:tick?period=250")
    );
}
597
/// An empty lock name is rejected as an invalid URI.
#[test]
fn parse_master_uri_missing_lockname() {
    let outcome = MasterUriConfig::parse("master::timer:tick");
    assert!(matches!(outcome, Err(CamelError::InvalidUri(_))));
}
603
/// A missing delegate URI is rejected as an invalid URI.
#[test]
fn parse_master_uri_missing_delegate() {
    let outcome = MasterUriConfig::parse("master:mylock:");
    assert!(matches!(outcome, Err(CamelError::InvalidUri(_))));
}
609
/// Endpoint creation fails when the context cannot resolve the delegate scheme.
#[test]
fn endpoint_fails_when_delegate_component_missing() {
    let component = MasterComponent::default();
    let outcome =
        component.create_endpoint("master:lock-1:missing:delegate", &NoOpComponentContext);
    assert!(matches!(outcome, Err(CamelError::ComponentNotFound(_))));
}
617
/// The scheme looked up in the component context is the delegate URI's scheme
/// (`mock`), and the endpoint keeps the full master URI.
#[test]
fn delegate_scheme_is_parsed_from_delegate_uri() {
    /// Context that resolves only `mock` and records that it was asked for it.
    struct SchemeAwareContext {
        delegate: Arc<dyn Component>,
        seen_scheme: Arc<AtomicBool>,
    }

    impl ComponentContext for SchemeAwareContext {
        fn resolve_component(&self, scheme: &str) -> Option<Arc<dyn Component>> {
            if scheme != "mock" {
                return None;
            }
            self.seen_scheme.store(true, Ordering::SeqCst);
            Some(Arc::clone(&self.delegate))
        }

        fn resolve_language(&self, _name: &str) -> Option<Arc<dyn Language>> {
            None
        }

        fn metrics(&self) -> Arc<dyn MetricsCollector> {
            Arc::new(NoOpMetrics)
        }

        fn platform_service(&self) -> Arc<dyn PlatformService> {
            Arc::new(NoopPlatformService::default())
        }

        fn register_route_health_check(
            &self,
            _route_id: &str,
            _check: Arc<dyn camel_api::AsyncHealthCheck>,
        ) {
        }

        fn unregister_route_health_check(&self, _route_id: &str) {}
    }

    /// Delegate component for the `mock` scheme.
    struct MockDelegateComponent;

    impl Component for MockDelegateComponent {
        fn scheme(&self) -> &str {
            "mock"
        }

        fn create_endpoint(
            &self,
            _uri: &str,
            _ctx: &dyn ComponentContext,
        ) -> Result<Box<dyn Endpoint>, CamelError> {
            Ok(Box::new(MockDelegateEndpoint))
        }
    }

    /// Endpoint whose consumer/producer factories always fail.
    struct MockDelegateEndpoint;

    impl Endpoint for MockDelegateEndpoint {
        fn uri(&self) -> &str {
            "mock:delegate"
        }

        fn create_consumer(
            &self,
            _rt: std::sync::Arc<dyn camel_component_api::RuntimeObservability>,
        ) -> Result<Box<dyn Consumer>, CamelError> {
            Err(CamelError::EndpointCreationFailed("not used".to_string()))
        }

        fn create_producer(
            &self,
            _rt: std::sync::Arc<dyn camel_component_api::RuntimeObservability>,
            _ctx: &ProducerContext,
        ) -> Result<BoxProcessor, CamelError> {
            Err(CamelError::EndpointCreationFailed("not used".to_string()))
        }
    }

    let scheme_was_resolved = Arc::new(AtomicBool::new(false));
    let ctx = SchemeAwareContext {
        delegate: Arc::new(MockDelegateComponent),
        seen_scheme: Arc::clone(&scheme_was_resolved),
    };

    let component = MasterComponent::default();
    let endpoint = component
        .create_endpoint("master:mylock:mock:delegate?x=1", &ctx)
        .unwrap();

    assert_eq!(endpoint.uri(), "master:mylock:mock:delegate?x=1");
    assert!(scheme_was_resolved.load(Ordering::SeqCst));
}
712
713 struct MockDelegateContext {
714 delegate: Arc<dyn Component>,
715 }
716
717 impl ComponentContext for MockDelegateContext {
718 fn resolve_component(&self, scheme: &str) -> Option<Arc<dyn Component>> {
719 if self.delegate.scheme() == scheme {
720 Some(Arc::clone(&self.delegate))
721 } else {
722 None
723 }
724 }
725
726 fn resolve_language(&self, _name: &str) -> Option<Arc<dyn Language>> {
727 None
728 }
729
730 fn metrics(&self) -> Arc<dyn MetricsCollector> {
731 Arc::new(NoOpMetrics)
732 }
733
734 fn platform_service(&self) -> Arc<dyn PlatformService> {
735 Arc::new(NoopPlatformService::default())
736 }
737
738 fn register_route_health_check(
739 &self,
740 _route_id: &str,
741 _check: Arc<dyn camel_api::AsyncHealthCheck>,
742 ) {
743 }
744
745 fn unregister_route_health_check(&self, _route_id: &str) {}
746 }
747
748 struct MockProducerDelegateComponent {
749 create_endpoint_calls: Arc<AtomicUsize>,
750 create_producer_calls: Arc<AtomicUsize>,
751 fail_producer: bool,
752 }
753
754 impl Component for MockProducerDelegateComponent {
755 fn scheme(&self) -> &str {
756 "mock"
757 }
758
759 fn create_endpoint(
760 &self,
761 _uri: &str,
762 _ctx: &dyn ComponentContext,
763 ) -> Result<Box<dyn Endpoint>, CamelError> {
764 self.create_endpoint_calls.fetch_add(1, Ordering::SeqCst);
765 Ok(Box::new(MockProducerDelegateEndpoint {
766 create_producer_calls: Arc::clone(&self.create_producer_calls),
767 fail_producer: self.fail_producer,
768 }))
769 }
770 }
771
772 struct MockProducerDelegateEndpoint {
773 create_producer_calls: Arc<AtomicUsize>,
774 fail_producer: bool,
775 }
776
777 impl Endpoint for MockProducerDelegateEndpoint {
778 fn uri(&self) -> &str {
779 "mock:delegate"
780 }
781
782 fn create_consumer(
783 &self,
784 _rt: std::sync::Arc<dyn camel_component_api::RuntimeObservability>,
785 ) -> Result<Box<dyn Consumer>, CamelError> {
786 Err(CamelError::EndpointCreationFailed(
787 "not used in test".to_string(),
788 ))
789 }
790
791 fn create_producer(
792 &self,
793 _rt: std::sync::Arc<dyn camel_component_api::RuntimeObservability>,
794 _ctx: &ProducerContext,
795 ) -> Result<BoxProcessor, CamelError> {
796 self.create_producer_calls.fetch_add(1, Ordering::SeqCst);
797 if self.fail_producer {
798 return Err(CamelError::ProcessorError(
799 "delegate producer failed".to_string(),
800 ));
801 }
802 Ok(BoxProcessor::from_fn(
803 |exchange| async move { Ok(exchange) },
804 ))
805 }
806 }
807
808 #[tokio::test]
809 async fn producer_passthrough_delegates_and_produces() {
810 let endpoint_calls = Arc::new(AtomicUsize::new(0));
811 let producer_calls = Arc::new(AtomicUsize::new(0));
812 let delegate = Arc::new(MockProducerDelegateComponent {
813 create_endpoint_calls: Arc::clone(&endpoint_calls),
814 create_producer_calls: Arc::clone(&producer_calls),
815 fail_producer: false,
816 });
817
818 let ctx = MockDelegateContext {
819 delegate: delegate.clone(),
820 };
821
822 let master = MasterComponent::default();
823 let endpoint = master
824 .create_endpoint("master:lock-1:mock:delegate", &ctx)
825 .unwrap();
826 let producer_ctx = ProducerContext::new();
827 let producer = endpoint
828 .create_producer(
829 Arc::new(PanicRuntimeObservability)
830 as Arc<dyn camel_component_api::RuntimeObservability>,
831 &producer_ctx,
832 )
833 .unwrap();
834
835 let exchange = Exchange::new(Message::new("ok"));
836 let result = producer.oneshot(exchange).await.unwrap();
837
838 assert_eq!(result.input.body.as_text(), Some("ok"));
839 assert_eq!(endpoint_calls.load(Ordering::SeqCst), 1);
840 assert_eq!(producer_calls.load(Ordering::SeqCst), 1);
841 }
842
/// A failure while creating the delegate producer is returned unchanged.
#[test]
fn producer_passthrough_bubbles_delegate_errors() {
    let endpoint_calls = Arc::new(AtomicUsize::new(0));
    let producer_calls = Arc::new(AtomicUsize::new(0));
    let delegate = Arc::new(MockProducerDelegateComponent {
        create_endpoint_calls: Arc::clone(&endpoint_calls),
        create_producer_calls: Arc::clone(&producer_calls),
        fail_producer: true,
    });
    let ctx = MockDelegateContext {
        delegate: delegate.clone(),
    };

    let master = MasterComponent::default();
    let endpoint = master
        .create_endpoint("master:lock-1:mock:delegate", &ctx)
        .unwrap();

    let runtime: Arc<dyn camel_component_api::RuntimeObservability> =
        Arc::new(PanicRuntimeObservability);
    let producer_ctx = ProducerContext::new();
    let failure = endpoint
        .create_producer(runtime, &producer_ctx)
        .unwrap_err();

    assert!(matches!(failure, CamelError::ProcessorError(_)));
    assert_eq!(endpoint_calls.load(Ordering::SeqCst), 1);
    assert_eq!(producer_calls.load(Ordering::SeqCst), 1);
}
874
875 struct FakeLeadershipService {
876 tx: Mutex<Option<watch::Sender<Option<LeadershipEvent>>>>,
877 is_leader: Arc<AtomicBool>,
878 initial: Option<LeadershipEvent>,
879 }
880
881 impl FakeLeadershipService {
882 fn new(initial: Option<LeadershipEvent>) -> Self {
883 let starts_as_leader = matches!(initial, Some(LeadershipEvent::StartedLeading));
884 Self {
885 tx: Mutex::new(None),
886 is_leader: Arc::new(AtomicBool::new(starts_as_leader)),
887 initial,
888 }
889 }
890
891 async fn emit(&self, event: LeadershipEvent) {
892 self.is_leader.store(
893 matches!(event, LeadershipEvent::StartedLeading),
894 Ordering::Release,
895 );
896 if let Some(tx) = self
897 .tx
898 .lock()
899 .expect("mutex poisoned: fake elector sender")
900 .as_ref()
901 {
902 let _ = tx.send(Some(event));
903 }
904 }
905 }
906
907 #[async_trait]
908 impl LeadershipService for FakeLeadershipService {
909 async fn start(&self, _lock_name: &str) -> Result<LeadershipHandle, PlatformError> {
910 let (tx, rx) = watch::channel(self.initial.clone());
911 *self.tx.lock().expect("mutex poisoned: fake elector sender") = Some(tx);
912
913 let cancel = CancellationToken::new();
914 let cancel_wait = cancel.clone();
915 let (term_tx, term_rx) = oneshot::channel();
916 tokio::spawn(async move {
917 cancel_wait.cancelled().await;
918 let _ = term_tx.send(());
919 });
920
921 Ok(LeadershipHandle::new(
922 rx,
923 Arc::clone(&self.is_leader),
924 cancel,
925 term_rx,
926 ))
927 }
928 }
929
/// Platform service for tests: fixed identity, no-op readiness gate, and an
/// injected leadership service.
struct FakePlatformService {
    identity: PlatformIdentity,
    readiness_gate: Arc<dyn ReadinessGate>,
    leadership: Arc<dyn LeadershipService>,
}

impl FakePlatformService {
    /// Wraps `leadership` with a local identity named `master-tests`.
    fn new(leadership: Arc<dyn LeadershipService>) -> Self {
        Self {
            identity: PlatformIdentity::local("master-tests"),
            readiness_gate: Arc::new(NoopReadinessGate),
            leadership,
        }
    }
}

impl PlatformService for FakePlatformService {
    fn identity(&self) -> PlatformIdentity {
        self.identity.clone()
    }

    fn readiness_gate(&self) -> Arc<dyn ReadinessGate> {
        Arc::clone(&self.readiness_gate)
    }

    fn leadership(&self) -> Arc<dyn LeadershipService> {
        Arc::clone(&self.leadership)
    }
}
959
/// `fake` delegate component; shares its counters with every endpoint it creates.
struct FakeDelegateComponent {
    /// Number of consumers created through endpoints of this component.
    create_consumer_calls: Arc<AtomicUsize>,
    /// Number of times a created consumer has been started.
    start_calls: Arc<AtomicUsize>,
}

impl Component for FakeDelegateComponent {
    fn scheme(&self) -> &str {
        "fake"
    }

    /// Always succeeds; the URI and context are ignored.
    fn create_endpoint(
        &self,
        _uri: &str,
        _ctx: &dyn ComponentContext,
    ) -> Result<Box<dyn Endpoint>, CamelError> {
        Ok(Box::new(FakeDelegateEndpoint {
            create_consumer_calls: Arc::clone(&self.create_consumer_calls),
            start_calls: Arc::clone(&self.start_calls),
        }))
    }
}
981
982 struct FakeDelegateEndpoint {
983 create_consumer_calls: Arc<AtomicUsize>,
984 start_calls: Arc<AtomicUsize>,
985 }
986
987 impl Endpoint for FakeDelegateEndpoint {
988 fn uri(&self) -> &str {
989 "fake:delegate"
990 }
991
992 fn create_consumer(
993 &self,
994 _rt: std::sync::Arc<dyn camel_component_api::RuntimeObservability>,
995 ) -> Result<Box<dyn Consumer>, CamelError> {
996 let epoch = self.create_consumer_calls.fetch_add(1, Ordering::SeqCst) + 1;
997 Ok(Box::new(FakeDelegateConsumer {
998 epoch,
999 start_calls: Arc::clone(&self.start_calls),
1000 }))
1001 }
1002
1003 fn create_producer(
1004 &self,
1005 _rt: std::sync::Arc<dyn camel_component_api::RuntimeObservability>,
1006 _ctx: &ProducerContext,
1007 ) -> Result<BoxProcessor, CamelError> {
1008 Err(CamelError::EndpointCreationFailed("not used".to_string()))
1009 }
1010 }
1011
/// Delegate consumer that emits `epoch-<n>` exchanges while running.
struct FakeDelegateConsumer {
    /// Sequence number assigned by the endpoint at creation time.
    epoch: usize,
    /// Shared counter incremented on every `start`.
    start_calls: Arc<AtomicUsize>,
}
1016
1017 struct FailingDelegateComponent {
1018 create_endpoint_calls: Arc<AtomicUsize>,
1019 }
1020
1021 impl Component for FailingDelegateComponent {
1022 fn scheme(&self) -> &str {
1023 "failing"
1024 }
1025
1026 fn create_endpoint(
1027 &self,
1028 _uri: &str,
1029 _ctx: &dyn ComponentContext,
1030 ) -> Result<Box<dyn Endpoint>, CamelError> {
1031 self.create_endpoint_calls.fetch_add(1, Ordering::SeqCst);
1032 Err(CamelError::EndpointCreationFailed(
1033 "delegate endpoint creation failed".to_string(),
1034 ))
1035 }
1036 }
1037
1038 #[async_trait]
1039 impl Consumer for FakeDelegateConsumer {
1040 async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
1041 self.start_calls.fetch_add(1, Ordering::SeqCst);
1042 context
1043 .send(Exchange::new(Message::new(format!("epoch-{}", self.epoch))))
1044 .await?;
1045
1046 loop {
1047 tokio::select! {
1048 _ = context.cancelled() => {
1049 break;
1050 }
1051 _ = sleep(Duration::from_millis(20)) => {
1052 context
1053 .send(Exchange::new(Message::new(format!("epoch-{}", self.epoch))))
1054 .await?;
1055 }
1056 }
1057 }
1058
1059 Ok(())
1060 }
1061
1062 async fn stop(&mut self) -> Result<(), CamelError> {
1063 Ok(())
1064 }
1065 }
1066
1067 fn build_master_consumer(
1068 platform_service: Arc<dyn PlatformService>,
1069 create_consumer_calls: Arc<AtomicUsize>,
1070 start_calls: Arc<AtomicUsize>,
1071 delegate_retry_max_attempts: Option<u32>,
1072 ) -> MasterConsumer {
1073 let reconnect = match delegate_retry_max_attempts {
1074 Some(max) => NetworkRetryPolicy {
1075 max_attempts: max,
1076 ..NetworkRetryPolicy::default()
1077 },
1078 None => NetworkRetryPolicy {
1079 max_attempts: 0,
1080 ..NetworkRetryPolicy::default()
1081 },
1082 };
1083 MasterConsumer::new(
1084 "lock-a".to_string(),
1085 "fake:delegate".to_string(),
1086 Arc::new(FakeDelegateComponent {
1087 create_consumer_calls,
1088 start_calls,
1089 }),
1090 Arc::new(NoOpMetrics),
1091 platform_service,
1092 Duration::from_millis(500),
1093 reconnect,
1094 Arc::new(PanicRuntimeObservability)
1095 as Arc<dyn camel_component_api::RuntimeObservability>,
1096 )
1097 }
1098
1099 #[tokio::test]
1100 async fn starts_delegate_only_after_started_leading() {
1101 let leadership = Arc::new(FakeLeadershipService::new(None));
1102 let platform_service = Arc::new(FakePlatformService::new(leadership.clone()));
1103 let create_consumer_calls = Arc::new(AtomicUsize::new(0));
1104 let start_calls = Arc::new(AtomicUsize::new(0));
1105 let mut master = build_master_consumer(
1106 platform_service,
1107 Arc::clone(&create_consumer_calls),
1108 Arc::clone(&start_calls),
1109 Some(30),
1110 );
1111
1112 let (tx, mut rx) = tokio::sync::mpsc::channel(16);
1113 let cancel = CancellationToken::new();
1114 let ctx = ConsumerContext::new(tx, cancel.clone());
1115
1116 master.start(ctx).await.unwrap();
1117
1118 sleep(Duration::from_millis(80)).await;
1119 assert!(rx.try_recv().is_err());
1120 assert_eq!(create_consumer_calls.load(Ordering::SeqCst), 0);
1121
1122 leadership.emit(LeadershipEvent::StartedLeading).await;
1123
1124 let first = timeout(Duration::from_millis(500), rx.recv())
1125 .await
1126 .unwrap()
1127 .unwrap();
1128 assert_eq!(first.exchange.input.body.as_text(), Some("epoch-1"));
1129 assert_eq!(create_consumer_calls.load(Ordering::SeqCst), 1);
1130 assert_eq!(start_calls.load(Ordering::SeqCst), 1);
1131
1132 cancel.cancel();
1133 master.stop().await.unwrap();
1134 }
1135
1136 #[tokio::test]
1137 async fn stops_delegate_on_stopped_leading() {
1138 let leadership = Arc::new(FakeLeadershipService::new(None));
1139 let platform_service = Arc::new(FakePlatformService::new(leadership.clone()));
1140 let create_consumer_calls = Arc::new(AtomicUsize::new(0));
1141 let start_calls = Arc::new(AtomicUsize::new(0));
1142 let mut master = build_master_consumer(
1143 platform_service,
1144 Arc::clone(&create_consumer_calls),
1145 Arc::clone(&start_calls),
1146 Some(30),
1147 );
1148
1149 let (tx, mut rx) = tokio::sync::mpsc::channel(16);
1150 let cancel = CancellationToken::new();
1151 let ctx = ConsumerContext::new(tx, cancel.clone());
1152
1153 master.start(ctx).await.unwrap();
1154 leadership.emit(LeadershipEvent::StartedLeading).await;
1155 let _ = timeout(Duration::from_millis(500), rx.recv())
1156 .await
1157 .unwrap()
1158 .unwrap();
1159
1160 leadership.emit(LeadershipEvent::StoppedLeading).await;
1161 sleep(Duration::from_millis(100)).await;
1162 while rx.try_recv().is_ok() {}
1163 assert!(
1164 timeout(Duration::from_millis(120), rx.recv())
1165 .await
1166 .is_err()
1167 );
1168
1169 cancel.cancel();
1170 master.stop().await.unwrap();
1171 }
1172
1173 #[tokio::test]
1174 async fn recreates_delegate_on_new_leadership_epoch() {
1175 let leadership = Arc::new(FakeLeadershipService::new(None));
1176 let platform_service = Arc::new(FakePlatformService::new(leadership.clone()));
1177 let create_consumer_calls = Arc::new(AtomicUsize::new(0));
1178 let start_calls = Arc::new(AtomicUsize::new(0));
1179 let mut master = build_master_consumer(
1180 platform_service,
1181 Arc::clone(&create_consumer_calls),
1182 Arc::clone(&start_calls),
1183 Some(30),
1184 );
1185
1186 let (tx, mut rx) = tokio::sync::mpsc::channel(16);
1187 let cancel = CancellationToken::new();
1188 let ctx = ConsumerContext::new(tx, cancel.clone());
1189
1190 master.start(ctx).await.unwrap();
1191
1192 leadership.emit(LeadershipEvent::StartedLeading).await;
1193 let first = timeout(Duration::from_millis(500), rx.recv())
1194 .await
1195 .unwrap()
1196 .unwrap();
1197 assert_eq!(first.exchange.input.body.as_text(), Some("epoch-1"));
1198
1199 leadership.emit(LeadershipEvent::StoppedLeading).await;
1200 sleep(Duration::from_millis(120)).await;
1201
1202 leadership.emit(LeadershipEvent::StartedLeading).await;
1203 let second = timeout(Duration::from_millis(500), rx.recv())
1204 .await
1205 .unwrap()
1206 .unwrap();
1207 assert_eq!(second.exchange.input.body.as_text(), Some("epoch-2"));
1208
1209 assert_eq!(create_consumer_calls.load(Ordering::SeqCst), 2);
1210 assert_eq!(start_calls.load(Ordering::SeqCst), 2);
1211
1212 cancel.cancel();
1213 master.stop().await.unwrap();
1214 }
1215
1216 struct ErrorDelegateComponent {
1222 create_endpoint_calls: Arc<AtomicUsize>,
1223 create_consumer_calls: Arc<AtomicUsize>,
1224 endpoint_error: Option<CamelError>,
1225 consumer_error_after: usize, consumer_error: Option<CamelError>,
1227 }
1228
1229 impl Component for ErrorDelegateComponent {
1230 fn scheme(&self) -> &str {
1231 "errdelegate"
1232 }
1233
1234 fn create_endpoint(
1235 &self,
1236 _uri: &str,
1237 _ctx: &dyn ComponentContext,
1238 ) -> Result<Box<dyn Endpoint>, CamelError> {
1239 self.create_endpoint_calls.fetch_add(1, Ordering::SeqCst);
1240 if let Some(ref err) = self.endpoint_error {
1241 return Err(err.clone());
1242 }
1243 Ok(Box::new(ErrorDelegateEndpoint {
1244 create_consumer_calls: Arc::clone(&self.create_consumer_calls),
1245 consumer_error_after: self.consumer_error_after,
1246 consumer_error: self.consumer_error.clone(),
1247 }))
1248 }
1249 }
1250
1251 struct ErrorDelegateEndpoint {
1252 create_consumer_calls: Arc<AtomicUsize>,
1253 consumer_error_after: usize,
1254 consumer_error: Option<CamelError>,
1255 }
1256
1257 impl Endpoint for ErrorDelegateEndpoint {
1258 fn uri(&self) -> &str {
1259 "errdelegate:delegate"
1260 }
1261
1262 fn create_consumer(
1263 &self,
1264 _rt: std::sync::Arc<dyn camel_component_api::RuntimeObservability>,
1265 ) -> Result<Box<dyn Consumer>, CamelError> {
1266 let call_idx = self.create_consumer_calls.fetch_add(1, Ordering::SeqCst) + 1;
1267 if call_idx <= self.consumer_error_after {
1268 return Err(self
1269 .consumer_error
1270 .clone()
1271 .unwrap_or_else(|| CamelError::ProcessorError("default error".to_string())));
1272 }
1273 Ok(Box::new(SuccessDelegateConsumer))
1274 }
1275
1276 fn create_producer(
1277 &self,
1278 _rt: std::sync::Arc<dyn camel_component_api::RuntimeObservability>,
1279 _ctx: &ProducerContext,
1280 ) -> Result<BoxProcessor, CamelError> {
1281 Err(CamelError::EndpointCreationFailed("not used".to_string()))
1282 }
1283 }
1284
1285 struct SuccessDelegateConsumer;
1287
1288 #[async_trait]
1289 impl Consumer for SuccessDelegateConsumer {
1290 async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
1291 context.send(Exchange::new(Message::new("ok"))).await?;
1292 context.cancelled().await;
1293 Ok(())
1294 }
1295
1296 async fn stop(&mut self) -> Result<(), CamelError> {
1297 Ok(())
1298 }
1299 }
1300
1301 fn build_error_delegate_master(
1302 platform_service: Arc<dyn PlatformService>,
1303 create_endpoint_calls: Arc<AtomicUsize>,
1304 create_consumer_calls: Arc<AtomicUsize>,
1305 endpoint_error: Option<CamelError>,
1306 consumer_error_after: usize,
1307 consumer_error: Option<CamelError>,
1308 max_attempts: u32,
1309 ) -> MasterConsumer {
1310 let reconnect = NetworkRetryPolicy {
1311 max_attempts,
1312 initial_delay: Duration::from_millis(1),
1313 max_delay: Duration::from_millis(5),
1314 multiplier: 1.0,
1315 ..NetworkRetryPolicy::default()
1316 };
1317 MasterConsumer::new(
1318 "lock-err".to_string(),
1319 "errdelegate:delegate".to_string(),
1320 Arc::new(ErrorDelegateComponent {
1321 create_endpoint_calls,
1322 create_consumer_calls,
1323 endpoint_error,
1324 consumer_error_after,
1325 consumer_error,
1326 }),
1327 Arc::new(NoOpMetrics),
1328 platform_service,
1329 Duration::from_millis(500),
1330 reconnect,
1331 Arc::new(PanicRuntimeObservability)
1332 as Arc<dyn camel_component_api::RuntimeObservability>,
1333 )
1334 }
1335
1336 #[tokio::test]
1337 async fn delegate_permanent_error_terminates_master_without_retry() {
1338 let leadership = Arc::new(FakeLeadershipService::new(Some(
1339 LeadershipEvent::StartedLeading,
1340 )));
1341 let platform_service = Arc::new(FakePlatformService::new(leadership));
1342 let create_endpoint_calls = Arc::new(AtomicUsize::new(0));
1343 let create_consumer_calls = Arc::new(AtomicUsize::new(0));
1344
1345 let mut master = build_error_delegate_master(
1350 platform_service,
1351 Arc::clone(&create_endpoint_calls),
1352 Arc::clone(&create_consumer_calls),
1353 Some(CamelError::Config("permanent delegate error".to_string())),
1354 0, None,
1356 0, );
1358
1359 let (tx, _rx) = tokio::sync::mpsc::channel(16);
1360 let cancel = CancellationToken::new();
1361 let ctx = ConsumerContext::new(tx, cancel.clone());
1362
1363 master.start(ctx).await.unwrap();
1364
1365 let task_finished = timeout(Duration::from_millis(500), async {
1369 loop {
1370 if master
1371 .leadership_task
1372 .as_ref()
1373 .is_some_and(tokio::task::JoinHandle::is_finished)
1374 {
1375 break;
1376 }
1377 sleep(Duration::from_millis(5)).await;
1378 }
1379 })
1380 .await;
1381
1382 assert!(
1383 task_finished.is_ok(),
1384 "master should terminate within 500ms via fail-fast classification"
1385 );
1386
1387 assert_eq!(
1389 create_endpoint_calls.load(Ordering::SeqCst),
1390 1,
1391 "permanent error must terminate master after exactly 1 invocation"
1392 );
1393
1394 let _ = master.stop().await;
1396
1397 cancel.cancel();
1398 }
1399
1400 #[tokio::test]
1401 async fn delegate_transient_error_retries_and_eventually_succeeds() {
1402 let leadership = Arc::new(FakeLeadershipService::new(Some(
1403 LeadershipEvent::StartedLeading,
1404 )));
1405 let platform_service = Arc::new(FakePlatformService::new(leadership));
1406 let create_endpoint_calls = Arc::new(AtomicUsize::new(0));
1407 let create_consumer_calls = Arc::new(AtomicUsize::new(0));
1408
1409 let mut master = build_error_delegate_master(
1412 platform_service,
1413 Arc::clone(&create_endpoint_calls),
1414 Arc::clone(&create_consumer_calls),
1415 None, 2, Some(CamelError::Io("connection refused".to_string())),
1418 5, );
1420
1421 let (tx, mut rx) = tokio::sync::mpsc::channel(16);
1422 let cancel = CancellationToken::new();
1423 let ctx = ConsumerContext::new(tx, cancel.clone());
1424
1425 master.start(ctx).await.unwrap();
1426
1427 let msg = timeout(Duration::from_secs(2), rx.recv())
1429 .await
1430 .unwrap()
1431 .unwrap();
1432 assert_eq!(msg.exchange.input.body.as_text(), Some("ok"));
1433
1434 assert_eq!(create_endpoint_calls.load(Ordering::SeqCst), 3);
1437 assert_eq!(create_consumer_calls.load(Ordering::SeqCst), 3);
1438
1439 cancel.cancel();
1440 master.stop().await.unwrap();
1441 }
1442
1443 #[tokio::test]
1446 async fn stops_retrying_delegate_start_after_max_attempts() {
1447 let leadership = Arc::new(FakeLeadershipService::new(Some(
1448 LeadershipEvent::StartedLeading,
1449 )));
1450 let platform_service = Arc::new(FakePlatformService::new(leadership));
1451 let create_endpoint_calls = Arc::new(AtomicUsize::new(0));
1452
1453 let mut master = MasterConsumer::new(
1454 "lock-a".to_string(),
1455 "failing:delegate".to_string(),
1456 Arc::new(FailingDelegateComponent {
1457 create_endpoint_calls: Arc::clone(&create_endpoint_calls),
1458 }),
1459 Arc::new(NoOpMetrics),
1460 platform_service,
1461 Duration::from_millis(500),
1462 NetworkRetryPolicy {
1463 max_attempts: 1,
1464 ..NetworkRetryPolicy::default()
1465 },
1466 Arc::new(PanicRuntimeObservability)
1467 as Arc<dyn camel_component_api::RuntimeObservability>,
1468 );
1469
1470 let (tx, _rx) = tokio::sync::mpsc::channel(16);
1471 let cancel = CancellationToken::new();
1472 let ctx = ConsumerContext::new(tx, cancel.clone());
1473
1474 master.start(ctx).await.unwrap();
1475 sleep(Duration::from_millis(750)).await;
1476
1477 assert_eq!(create_endpoint_calls.load(Ordering::SeqCst), 1);
1482
1483 cancel.cancel();
1484 let _ = master.stop().await;
1485 }
1486
1487 #[tokio::test]
1493 async fn stop_completes_quickly_when_leadership_task_is_slow() {
1494 struct SlowStoppingConsumer;
1496
1497 #[async_trait]
1498 impl Consumer for SlowStoppingConsumer {
1499 async fn start(&mut self, ctx: ConsumerContext) -> Result<(), CamelError> {
1500 ctx.send(Exchange::new(Message::new("slow-start")))
1501 .await
1502 .ok();
1503 sleep(Duration::from_secs(60)).await;
1505 Ok(())
1506 }
1507
1508 async fn stop(&mut self) -> Result<(), CamelError> {
1509 Ok(())
1510 }
1511 }
1512
1513 struct SlowStoppingComponent;
1514
1515 impl Component for SlowStoppingComponent {
1516 fn scheme(&self) -> &str {
1517 "slow"
1518 }
1519
1520 fn create_endpoint(
1521 &self,
1522 _uri: &str,
1523 _ctx: &dyn ComponentContext,
1524 ) -> Result<Box<dyn Endpoint>, CamelError> {
1525 Ok(Box::new(SlowStoppingEndpoint))
1526 }
1527 }
1528
1529 struct SlowStoppingEndpoint;
1530
1531 impl Endpoint for SlowStoppingEndpoint {
1532 fn uri(&self) -> &str {
1533 "slow:delegate"
1534 }
1535
1536 fn create_consumer(
1537 &self,
1538 _rt: std::sync::Arc<dyn camel_component_api::RuntimeObservability>,
1539 ) -> Result<Box<dyn Consumer>, CamelError> {
1540 Ok(Box::new(SlowStoppingConsumer))
1541 }
1542
1543 fn create_producer(
1544 &self,
1545 _rt: std::sync::Arc<dyn camel_component_api::RuntimeObservability>,
1546 _ctx: &ProducerContext,
1547 ) -> Result<BoxProcessor, CamelError> {
1548 Err(CamelError::EndpointCreationFailed("not used".into()))
1549 }
1550 }
1551
1552 let leadership = Arc::new(FakeLeadershipService::new(Some(
1553 LeadershipEvent::StartedLeading,
1554 )));
1555 let platform_service = Arc::new(FakePlatformService::new(leadership));
1556
1557 let mut master = MasterConsumer::new(
1558 "lock-slow".into(),
1559 "slow:delegate".into(),
1560 Arc::new(SlowStoppingComponent),
1561 Arc::new(NoOpMetrics),
1562 platform_service,
1563 Duration::from_millis(500), NetworkRetryPolicy {
1565 max_attempts: 30,
1566 ..NetworkRetryPolicy::default()
1567 },
1568 Arc::new(PanicRuntimeObservability)
1569 as Arc<dyn camel_component_api::RuntimeObservability>,
1570 );
1571
1572 let (tx, mut rx) = tokio::sync::mpsc::channel(16);
1573 let cancel = CancellationToken::new();
1574 let ctx = ConsumerContext::new(tx, cancel.clone());
1575
1576 master.start(ctx).await.unwrap();
1577
1578 let msg = timeout(Duration::from_secs(2), rx.recv())
1580 .await
1581 .unwrap()
1582 .unwrap();
1583 assert_eq!(msg.exchange.input.body.as_text(), Some("slow-start"));
1584
1585 let start = Instant::now();
1588 master.stop().await.unwrap();
1589 let elapsed = start.elapsed();
1590
1591 assert!(
1594 elapsed < Duration::from_millis(250),
1595 "stop() took {:?}, expected < 250 ms (abort should be near-instant)",
1596 elapsed,
1597 );
1598
1599 cancel.cancel();
1600 }
1601
1602 #[tokio::test]
1603 async fn stop_propagates_delegate_start_error() {
1604 struct FailingStartConsumer;
1605
1606 #[async_trait]
1607 impl Consumer for FailingStartConsumer {
1608 async fn start(&mut self, _ctx: ConsumerContext) -> Result<(), CamelError> {
1609 Err(CamelError::ProcessorError(
1610 "delegate start failed".to_string(),
1611 ))
1612 }
1613
1614 async fn stop(&mut self) -> Result<(), CamelError> {
1615 Ok(())
1616 }
1617 }
1618
1619 struct FailingStartComponent;
1620
1621 impl Component for FailingStartComponent {
1622 fn scheme(&self) -> &str {
1623 "failstart"
1624 }
1625
1626 fn create_endpoint(
1627 &self,
1628 _uri: &str,
1629 _ctx: &dyn ComponentContext,
1630 ) -> Result<Box<dyn Endpoint>, CamelError> {
1631 Ok(Box::new(FailingStartEndpoint))
1632 }
1633 }
1634
1635 struct FailingStartEndpoint;
1636
1637 impl Endpoint for FailingStartEndpoint {
1638 fn uri(&self) -> &str {
1639 "failstart:delegate"
1640 }
1641
1642 fn create_consumer(
1643 &self,
1644 _rt: std::sync::Arc<dyn camel_component_api::RuntimeObservability>,
1645 ) -> Result<Box<dyn Consumer>, CamelError> {
1646 Ok(Box::new(FailingStartConsumer))
1647 }
1648
1649 fn create_producer(
1650 &self,
1651 _rt: std::sync::Arc<dyn camel_component_api::RuntimeObservability>,
1652 _ctx: &ProducerContext,
1653 ) -> Result<BoxProcessor, CamelError> {
1654 Err(CamelError::EndpointCreationFailed("not used".into()))
1655 }
1656 }
1657
1658 let leadership = Arc::new(FakeLeadershipService::new(Some(
1659 LeadershipEvent::StartedLeading,
1660 )));
1661 let platform_service = Arc::new(FakePlatformService::new(leadership));
1662
1663 let mut master = MasterConsumer::new(
1664 "lock-error".into(),
1665 "failstart:delegate".into(),
1666 Arc::new(FailingStartComponent),
1667 Arc::new(NoOpMetrics),
1668 platform_service,
1669 Duration::from_millis(500),
1670 NetworkRetryPolicy {
1671 max_attempts: 30,
1672 ..NetworkRetryPolicy::default()
1673 },
1674 Arc::new(PanicRuntimeObservability)
1675 as Arc<dyn camel_component_api::RuntimeObservability>,
1676 );
1677
1678 let (tx, _rx) = tokio::sync::mpsc::channel(16);
1679 let cancel = CancellationToken::new();
1680 let ctx = ConsumerContext::new(tx, cancel.clone());
1681
1682 master.start(ctx).await.unwrap();
1683 sleep(Duration::from_millis(250)).await;
1684 assert!(
1685 master
1686 .leadership_task
1687 .as_ref()
1688 .is_some_and(tokio::task::JoinHandle::is_finished),
1689 "leadership task should finish after delegate error"
1690 );
1691 let err = master.stop().await.expect_err("expected delegate error");
1692 assert!(err.to_string().contains("delegate start failed"));
1693
1694 cancel.cancel();
1695 }
1696}