1use std::collections::{HashMap, HashSet};
8use std::sync::Arc;
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::time::Instant;
11
12use tokio::sync::Mutex;
13use tracing::{error, info, warn};
14
15use camel_api::error_handler::ErrorHandlerConfig;
16use camel_api::{
17 CamelError, MetricsCollector, RouteController, RuntimeCommand, RuntimeHandle, RuntimeQuery,
18 RuntimeQueryResult, SupervisionConfig,
19};
20
21use crate::lifecycle::adapters::route_controller::{
22 CrashNotification, DefaultRouteController, RouteControllerInternal, SharedLanguageRegistry,
23};
24use crate::lifecycle::application::route_definition::RouteDefinition;
25use crate::shared::components::domain::Registry;
26
/// A [`RouteController`] decorator that adds crash supervision: consumer
/// crashes reported by the inner controller are fed through an mpsc channel
/// to a background loop that restarts routes per a [`SupervisionConfig`]
/// policy (bounded attempts, delay between restarts).
pub struct SupervisingRouteController {
    // Wrapped controller that performs the actual route lifecycle work.
    inner: DefaultRouteController,
    // Restart policy (max attempts, initial delay, backoff, max delay —
    // see the test fixtures for the visible fields).
    config: SupervisionConfig,
    // Sender installed on `inner` as the crash notifier so crashes reach
    // the supervision loop.
    crash_tx: tokio::sync::mpsc::Sender<CrashNotification>,
    // Receiver half; `Some` until the supervision loop is spawned, at which
    // point it is taken — this doubles as the "loop already started" flag.
    crash_rx: Option<tokio::sync::mpsc::Receiver<CrashNotification>>,
    // Optional metrics sink handed to the supervision loop.
    metrics: Option<Arc<dyn MetricsCollector>>,
}
43
/// Process-wide monotonic counter making supervision-issued command ids unique.
static SUPERVISION_COMMAND_SEQ: AtomicU64 = AtomicU64::new(0);

/// Builds a unique command id of the form `supervision:<op>:<route_id>:<seq>`
/// for commands the supervision loop sends to the runtime.
fn next_supervision_command_id(op: &str, route_id: &str) -> String {
    // Relaxed is sufficient: we only need uniqueness, not ordering between ids.
    let sequence = SUPERVISION_COMMAND_SEQ.fetch_add(1, Ordering::Relaxed);
    format!("supervision:{}:{}:{}", op, route_id, sequence)
}
50
51impl SupervisingRouteController {
52 pub fn new(registry: Arc<std::sync::Mutex<Registry>>, config: SupervisionConfig) -> Self {
54 Self::with_languages(
55 registry,
56 config,
57 Arc::new(std::sync::Mutex::new(HashMap::new())),
58 )
59 }
60
61 pub fn with_languages(
63 registry: Arc<std::sync::Mutex<Registry>>,
64 config: SupervisionConfig,
65 languages: SharedLanguageRegistry,
66 ) -> Self {
67 let (crash_tx, crash_rx) = tokio::sync::mpsc::channel(64);
68 Self {
69 inner: DefaultRouteController::with_languages(registry, languages),
70 config,
71 crash_tx,
72 crash_rx: Some(crash_rx),
73 metrics: None,
74 }
75 }
76
77 pub fn with_metrics(mut self, metrics: Arc<dyn MetricsCollector>) -> Self {
79 self.metrics = Some(metrics);
80 self
81 }
82
83 fn ensure_supervision_loop_started(&mut self) {
84 self.inner.set_crash_notifier(self.crash_tx.clone());
85
86 if self.crash_rx.is_none() {
87 return;
88 }
89
90 let rx = self
91 .crash_rx
92 .take()
93 .expect("crash_rx checked as Some above");
94 let config = self.config.clone();
95 let metrics = self.metrics.clone();
96 let runtime = self.inner.runtime_handle_for_supervision();
97 tokio::spawn(async move {
98 supervision_loop(rx, runtime, config, metrics).await;
99 });
100 }
101}
102
#[async_trait::async_trait]
impl RouteController for SupervisingRouteController {
    /// Starts one route; lazily spawns the supervision loop first so a crash
    /// of the newly started route is observed.
    async fn start_route(&mut self, route_id: &str) -> Result<(), CamelError> {
        self.ensure_supervision_loop_started();
        self.inner.start_route(route_id).await
    }

    // All non-starting lifecycle operations delegate straight to the inner
    // controller: only entry points that start routes need to make sure the
    // supervision loop is running.
    async fn stop_route(&mut self, route_id: &str) -> Result<(), CamelError> {
        self.inner.stop_route(route_id).await
    }

    async fn restart_route(&mut self, route_id: &str) -> Result<(), CamelError> {
        self.inner.restart_route(route_id).await
    }

    async fn suspend_route(&mut self, route_id: &str) -> Result<(), CamelError> {
        self.inner.suspend_route(route_id).await
    }

    async fn resume_route(&mut self, route_id: &str) -> Result<(), CamelError> {
        self.inner.resume_route(route_id).await
    }

    /// Starts every route; spawns the supervision loop first (see
    /// `start_route`).
    async fn start_all_routes(&mut self) -> Result<(), CamelError> {
        self.ensure_supervision_loop_started();
        self.inner.start_all_routes().await
    }

    async fn stop_all_routes(&mut self) -> Result<(), CamelError> {
        self.inner.stop_all_routes().await
    }
}
135
// Internal controller surface: everything delegates to `inner`; the
// supervisor only intercepts the route-starting reload path so that routes
// started through hot-reload are also supervised.
#[async_trait::async_trait]
impl RouteControllerInternal for SupervisingRouteController {
    fn add_route(&mut self, def: RouteDefinition) -> Result<(), CamelError> {
        self.inner.add_route(def)
    }

    fn swap_pipeline(
        &self,
        route_id: &str,
        pipeline: camel_api::BoxProcessor,
    ) -> Result<(), CamelError> {
        self.inner.swap_pipeline(route_id, pipeline)
    }

    fn route_from_uri(&self, route_id: &str) -> Option<String> {
        self.inner.route_from_uri(route_id)
    }

    fn set_error_handler(&mut self, config: ErrorHandlerConfig) {
        self.inner.set_error_handler(config)
    }

    fn set_self_ref(&mut self, self_ref: Arc<Mutex<dyn RouteController>>) {
        self.inner.set_self_ref(self_ref)
    }

    fn set_runtime_handle(&mut self, runtime: Arc<dyn RuntimeHandle>) {
        self.inner.set_runtime_handle(runtime)
    }

    fn route_count(&self) -> usize {
        self.inner.route_count()
    }

    fn route_ids(&self) -> Vec<String> {
        self.inner.route_ids()
    }

    fn auto_startup_route_ids(&self) -> Vec<String> {
        self.inner.auto_startup_route_ids()
    }

    fn shutdown_route_ids(&self) -> Vec<String> {
        self.inner.shutdown_route_ids()
    }

    fn set_tracer_config(&mut self, config: &crate::shared::observability::domain::TracerConfig) {
        self.inner.set_tracer_config(config)
    }

    fn compile_route_definition(
        &self,
        def: RouteDefinition,
    ) -> Result<camel_api::BoxProcessor, camel_api::CamelError> {
        self.inner.compile_route_definition(def)
    }

    fn remove_route(&mut self, route_id: &str) -> Result<(), camel_api::CamelError> {
        self.inner.remove_route(route_id)
    }

    /// Reload-path start: same as `RouteController::start_route`, including
    /// ensuring the supervision loop is running first.
    async fn start_route_reload(&mut self, route_id: &str) -> Result<(), camel_api::CamelError> {
        self.ensure_supervision_loop_started();
        self.inner.start_route(route_id).await
    }

    async fn stop_route_reload(&mut self, route_id: &str) -> Result<(), camel_api::CamelError> {
        self.inner.stop_route(route_id).await
    }
}
206
207async fn supervision_loop(
212 mut rx: tokio::sync::mpsc::Receiver<CrashNotification>,
213 runtime: Option<Arc<dyn RuntimeHandle>>,
214 config: SupervisionConfig,
215 _metrics: Option<Arc<dyn MetricsCollector>>,
216) {
217 let mut attempts: HashMap<String, u32> = HashMap::new();
218 let mut last_restart_time: HashMap<String, Instant> = HashMap::new();
219 let mut currently_restarting: HashSet<String> = HashSet::new();
220
221 info!("Supervision loop started");
222
223 while let Some(notification) = rx.recv().await {
224 let route_id = notification.route_id.clone();
225 let error = ¬ification.error;
226
227 if currently_restarting.contains(&route_id) {
229 continue;
230 }
231
232 info!(
233 route_id = %route_id,
234 error = %error,
235 "Route crashed, checking restart policy"
236 );
237
238 if let Some(last_time) = last_restart_time.get(&route_id)
241 && last_time.elapsed() >= config.initial_delay
242 {
243 attempts.insert(route_id.clone(), 0);
244 }
245
246 let current_attempt = attempts.entry(route_id.clone()).or_insert(0);
248 *current_attempt += 1;
249
250 if config
252 .max_attempts
253 .is_some_and(|max| *current_attempt > max)
254 {
255 error!(
256 route_id = %route_id,
257 attempts = current_attempt,
258 max = config.max_attempts.unwrap(),
259 "Route exceeded max restart attempts, giving up"
260 );
261 continue;
262 }
263
264 let delay = config.next_delay(*current_attempt);
266 info!(
267 route_id = %route_id,
268 attempt = current_attempt,
269 delay_ms = delay.as_millis(),
270 "Scheduling route restart"
271 );
272
273 currently_restarting.insert(route_id.clone());
275
276 tokio::time::sleep(delay).await;
278
279 let Some(runtime) = &runtime else {
280 warn!(
281 route_id = %route_id,
282 "Runtime handle unavailable, supervision restart skipped"
283 );
284 currently_restarting.remove(&route_id);
285 continue;
286 };
287
288 let pre_status = match runtime
291 .ask(RuntimeQuery::GetRouteStatus {
292 route_id: route_id.clone(),
293 })
294 .await
295 {
296 Ok(RuntimeQueryResult::RouteStatus { status, .. }) => status,
297 Ok(other) => {
298 warn!(
299 route_id = %route_id,
300 ?other,
301 "Unexpected runtime query result, skipping supervision restart"
302 );
303 currently_restarting.remove(&route_id);
304 continue;
305 }
306 Err(err) => {
307 warn!(
308 route_id = %route_id,
309 error = %err,
310 "Runtime status query failed, skipping supervision restart"
311 );
312 currently_restarting.remove(&route_id);
313 continue;
314 }
315 };
316
317 if matches!(pre_status.as_str(), "Registered" | "Stopped") {
318 warn!(
319 route_id = %route_id,
320 status = %pre_status,
321 "Runtime lifecycle is non-running; supervision restart skipped"
322 );
323 attempts.remove(&route_id);
324 currently_restarting.remove(&route_id);
325 continue;
326 }
327
328 if let Err(err) = runtime
331 .execute(RuntimeCommand::FailRoute {
332 route_id: route_id.clone(),
333 error: error.clone(),
334 command_id: next_supervision_command_id("fail", &route_id),
335 causation_id: None,
336 })
337 .await
338 {
339 warn!(
340 route_id = %route_id,
341 error = %err,
342 "Failed to persist crash state in runtime before restart check"
343 );
344 }
345
346 let should_restart = match runtime
348 .ask(RuntimeQuery::GetRouteStatus {
349 route_id: route_id.clone(),
350 })
351 .await
352 {
353 Ok(RuntimeQueryResult::RouteStatus { status, .. }) if status == "Failed" => true,
354 Ok(RuntimeQueryResult::RouteStatus { status, .. }) => {
355 warn!(
356 route_id = %route_id,
357 status = %status,
358 "Route no longer failed in runtime projection, skipping supervision restart"
359 );
360 attempts.remove(&route_id);
361 false
362 }
363 Ok(other) => {
364 warn!(
365 route_id = %route_id,
366 ?other,
367 "Unexpected runtime query result, skipping supervision restart"
368 );
369 false
370 }
371 Err(err) => {
372 warn!(
373 route_id = %route_id,
374 error = %err,
375 "Runtime status query failed, skipping supervision restart"
376 );
377 false
378 }
379 };
380
381 if should_restart {
382 let restart_result = runtime
383 .execute(RuntimeCommand::ReloadRoute {
384 route_id: route_id.clone(),
385 command_id: next_supervision_command_id("reload", &route_id),
386 causation_id: None,
387 })
388 .await
389 .map(|_| ());
390
391 match restart_result {
392 Ok(()) => {
393 info!(route_id = %route_id, "Route restarted successfully");
394 last_restart_time.insert(route_id.clone(), Instant::now());
397 }
398 Err(e) => {
399 error!(route_id = %route_id, error = %e, "Failed to restart route");
400 }
401 }
402 }
403
404 currently_restarting.remove(&route_id);
406 }
407
408 info!("Supervision loop ended");
409}
410
#[cfg(test)]
mod tests {
    use super::*;
    use crate::lifecycle::adapters::{InMemoryRuntimeStore, RuntimeExecutionAdapter};
    use crate::lifecycle::application::runtime_bus::RuntimeBus;
    use crate::lifecycle::ports::RouteRegistrationPort as InternalRuntimeCommandBus;
    use async_trait::async_trait;
    use camel_api::RuntimeQueryBus;
    use camel_component::{Component, ConcurrencyModel, Consumer, ConsumerContext, Endpoint};
    use std::sync::Arc as StdArc;
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::time::Duration;

    /// Builds an in-memory [`RuntimeBus`], wires its execution path through
    /// `controller`, and installs the bus as the controller's runtime handle.
    async fn attach_runtime_bus(
        controller: &StdArc<Mutex<dyn RouteControllerInternal>>,
    ) -> StdArc<RuntimeBus> {
        let store = InMemoryRuntimeStore::default();
        let runtime = StdArc::new(
            RuntimeBus::new(
                StdArc::new(store.clone()),
                StdArc::new(store.clone()),
                StdArc::new(store.clone()),
                StdArc::new(store.clone()),
            )
            .with_uow(StdArc::new(store))
            .with_execution(StdArc::new(RuntimeExecutionAdapter::new(StdArc::clone(
                controller,
            )))),
        );
        let runtime_handle: StdArc<dyn RuntimeHandle> = runtime.clone();
        controller.lock().await.set_runtime_handle(runtime_handle);
        runtime
    }

    /// Consumer that crashes on its first start, then blocks until cancelled:
    /// a route that recovers after one supervised restart.
    struct CrashThenBlockConsumer {
        call_count: StdArc<AtomicU32>,
    }

    #[async_trait]
    impl Consumer for CrashThenBlockConsumer {
        async fn start(&mut self, ctx: ConsumerContext) -> Result<(), CamelError> {
            let count = self.call_count.fetch_add(1, Ordering::SeqCst);

            // First start: simulate a crash so supervision kicks in.
            if count == 0 {
                return Err(CamelError::RouteError("simulated crash".into()));
            }

            // Subsequent starts: stay "running" until the route is cancelled.
            ctx.cancelled().await;
            Ok(())
        }

        async fn stop(&mut self) -> Result<(), CamelError> {
            Ok(())
        }

        fn concurrency_model(&self) -> ConcurrencyModel {
            ConcurrencyModel::Sequential
        }
    }

    struct CrashThenBlockEndpoint {
        call_count: StdArc<AtomicU32>,
    }

    impl Endpoint for CrashThenBlockEndpoint {
        fn uri(&self) -> &str {
            "crash-then-block:test"
        }

        fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
            Ok(Box::new(CrashThenBlockConsumer {
                call_count: StdArc::clone(&self.call_count),
            }))
        }

        fn create_producer(
            &self,
            _ctx: &camel_api::ProducerContext,
        ) -> Result<camel_api::BoxProcessor, CamelError> {
            Err(CamelError::RouteError("no producer".into()))
        }
    }

    struct CrashThenBlockComponent {
        call_count: StdArc<AtomicU32>,
    }

    impl Component for CrashThenBlockComponent {
        fn scheme(&self) -> &str {
            "crash-then-block"
        }

        fn create_endpoint(&self, _uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
            Ok(Box::new(CrashThenBlockEndpoint {
                call_count: StdArc::clone(&self.call_count),
            }))
        }
    }

    #[tokio::test]
    async fn test_supervising_controller_restarts_crashed_route() {
        let registry = StdArc::new(std::sync::Mutex::new(Registry::new()));
        let call_count = StdArc::new(AtomicU32::new(0));
        registry.lock().unwrap().register(CrashThenBlockComponent {
            call_count: StdArc::clone(&call_count),
        });

        let config = SupervisionConfig {
            max_attempts: Some(5),
            initial_delay: Duration::from_millis(50),
            backoff_multiplier: 1.0,
            max_delay: Duration::from_secs(60),
        };

        let controller: StdArc<Mutex<dyn RouteControllerInternal>> = StdArc::new(Mutex::new(
            SupervisingRouteController::new(StdArc::clone(&registry), config),
        ));

        controller
            .try_lock()
            .unwrap()
            .set_self_ref(StdArc::clone(&controller) as StdArc<Mutex<dyn RouteController>>);
        let runtime = attach_runtime_bus(&controller).await;

        let runtime_def = crate::route::RouteDefinition::new("crash-then-block:test", vec![])
            .with_route_id("crash-route");
        runtime.register_route(runtime_def).await.unwrap();

        controller.lock().await.start_all_routes().await.unwrap();

        // Give the route time to crash once and be restarted.
        tokio::time::sleep(Duration::from_millis(500)).await;

        let count = call_count.load(Ordering::SeqCst);
        assert!(
            count >= 2,
            "expected at least 2 consumer calls (crash + restart), got {}",
            count
        );

        let status = match runtime
            .ask(RuntimeQuery::GetRouteStatus {
                route_id: "crash-route".into(),
            })
            .await
            .unwrap()
        {
            RuntimeQueryResult::RouteStatus { status, .. } => status,
            other => panic!("unexpected query result: {other:?}"),
        };
        assert_eq!(status, "Started");
    }

    #[tokio::test]
    async fn test_supervising_controller_respects_max_attempts() {
        // Route that crashes on every start: supervision must eventually give
        // up and leave the route in the "Failed" state.
        struct AlwaysCrashConsumer;
        #[async_trait]
        impl Consumer for AlwaysCrashConsumer {
            async fn start(&mut self, _ctx: ConsumerContext) -> Result<(), CamelError> {
                Err(CamelError::RouteError("always crashes".into()))
            }
            async fn stop(&mut self) -> Result<(), CamelError> {
                Ok(())
            }
            fn concurrency_model(&self) -> ConcurrencyModel {
                ConcurrencyModel::Sequential
            }
        }
        struct AlwaysCrashEndpoint;
        impl Endpoint for AlwaysCrashEndpoint {
            fn uri(&self) -> &str {
                "always-crash:test"
            }
            fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
                Ok(Box::new(AlwaysCrashConsumer))
            }
            fn create_producer(
                &self,
                _ctx: &camel_api::ProducerContext,
            ) -> Result<camel_api::BoxProcessor, CamelError> {
                Err(CamelError::RouteError("no producer".into()))
            }
        }
        struct AlwaysCrashComponent;
        impl Component for AlwaysCrashComponent {
            fn scheme(&self) -> &str {
                "always-crash"
            }
            fn create_endpoint(&self, _uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
                Ok(Box::new(AlwaysCrashEndpoint))
            }
        }

        let registry = StdArc::new(std::sync::Mutex::new(Registry::new()));
        registry.lock().unwrap().register(AlwaysCrashComponent);

        let config = SupervisionConfig {
            max_attempts: Some(2),
            initial_delay: Duration::from_millis(10),
            backoff_multiplier: 1.0,
            max_delay: Duration::from_secs(1),
        };

        let controller: StdArc<Mutex<dyn RouteControllerInternal>> = StdArc::new(Mutex::new(
            SupervisingRouteController::new(StdArc::clone(&registry), config),
        ));

        controller
            .try_lock()
            .unwrap()
            .set_self_ref(StdArc::clone(&controller) as StdArc<Mutex<dyn RouteController>>);
        let runtime = attach_runtime_bus(&controller).await;

        let runtime_def = crate::route::RouteDefinition::new("always-crash:test", vec![])
            .with_route_id("always-crash-route");
        runtime.register_route(runtime_def).await.unwrap();

        controller.lock().await.start_all_routes().await.unwrap();

        tokio::time::sleep(Duration::from_millis(200)).await;

        let status = match runtime
            .ask(RuntimeQuery::GetRouteStatus {
                route_id: "always-crash-route".into(),
            })
            .await
            .unwrap()
        {
            RuntimeQueryResult::RouteStatus { status, .. } => status,
            other => panic!("unexpected query result: {other:?}"),
        };
        assert_eq!(status, "Failed");
    }

    #[tokio::test]
    async fn test_supervising_controller_delegates_to_inner() {
        let registry = StdArc::new(std::sync::Mutex::new(Registry::new()));
        let config = SupervisionConfig::default();
        let mut controller = SupervisingRouteController::new(StdArc::clone(&registry), config);

        let self_ref: StdArc<Mutex<dyn RouteController>> = StdArc::new(Mutex::new(
            SupervisingRouteController::new(registry, SupervisionConfig::default()),
        ));
        controller.set_self_ref(self_ref);

        // Read-only queries pass straight through to the inner controller.
        assert_eq!(controller.route_count(), 0);
        assert_eq!(controller.route_ids(), Vec::<String>::new());
    }

    /// Always-crashing consumer that counts how many times it was started.
    struct AlwaysCrashWithCountConsumer {
        call_count: StdArc<AtomicU32>,
    }

    #[async_trait]
    impl Consumer for AlwaysCrashWithCountConsumer {
        async fn start(&mut self, _ctx: ConsumerContext) -> Result<(), CamelError> {
            self.call_count.fetch_add(1, Ordering::SeqCst);
            Err(CamelError::RouteError("always crashes".into()))
        }

        async fn stop(&mut self) -> Result<(), CamelError> {
            Ok(())
        }

        fn concurrency_model(&self) -> ConcurrencyModel {
            ConcurrencyModel::Sequential
        }
    }

    struct AlwaysCrashWithCountEndpoint {
        call_count: StdArc<AtomicU32>,
    }

    impl Endpoint for AlwaysCrashWithCountEndpoint {
        fn uri(&self) -> &str {
            "always-crash-count:test"
        }

        fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
            Ok(Box::new(AlwaysCrashWithCountConsumer {
                call_count: StdArc::clone(&self.call_count),
            }))
        }

        fn create_producer(
            &self,
            _ctx: &camel_api::ProducerContext,
        ) -> Result<camel_api::BoxProcessor, CamelError> {
            Err(CamelError::RouteError("no producer".into()))
        }
    }

    struct AlwaysCrashWithCountComponent {
        call_count: StdArc<AtomicU32>,
    }

    impl Component for AlwaysCrashWithCountComponent {
        fn scheme(&self) -> &str {
            "always-crash-count"
        }

        fn create_endpoint(&self, _uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
            Ok(Box::new(AlwaysCrashWithCountEndpoint {
                call_count: StdArc::clone(&self.call_count),
            }))
        }
    }

    #[tokio::test]
    async fn test_supervision_gives_up_after_max_attempts() {
        let registry = StdArc::new(std::sync::Mutex::new(Registry::new()));
        let call_count = StdArc::new(AtomicU32::new(0));
        registry
            .lock()
            .unwrap()
            .register(AlwaysCrashWithCountComponent {
                call_count: StdArc::clone(&call_count),
            });

        let config = SupervisionConfig {
            max_attempts: Some(2),
            initial_delay: Duration::from_millis(50),
            backoff_multiplier: 1.0,
            max_delay: Duration::from_secs(60),
        };

        let controller: StdArc<Mutex<dyn RouteControllerInternal>> = StdArc::new(Mutex::new(
            SupervisingRouteController::new(StdArc::clone(&registry), config),
        ));

        controller
            .try_lock()
            .unwrap()
            .set_self_ref(StdArc::clone(&controller) as StdArc<Mutex<dyn RouteController>>);
        let runtime = attach_runtime_bus(&controller).await;

        let runtime_def = crate::route::RouteDefinition::new("always-crash-count:test", vec![])
            .with_route_id("give-up-route");
        runtime.register_route(runtime_def).await.unwrap();

        controller.lock().await.start_all_routes().await.unwrap();

        // Long enough for the initial start plus both permitted restarts.
        tokio::time::sleep(Duration::from_millis(800)).await;

        let count = call_count.load(Ordering::SeqCst);
        assert_eq!(
            count, 3,
            "expected exactly 3 consumer calls (initial + 2 restarts), got {}",
            count
        );

        let status = match runtime
            .ask(RuntimeQuery::GetRouteStatus {
                route_id: "give-up-route".into(),
            })
            .await
            .unwrap()
        {
            RuntimeQueryResult::RouteStatus { status, .. } => status,
            other => panic!("unexpected query result: {other:?}"),
        };
        assert_eq!(status, "Failed");
    }

    /// Consumer that crashes immediately on even-numbered calls (0-based) and
    /// otherwise runs ~100ms before crashing, so successful uptime alternates
    /// with instant failures and the attempt counter gets a chance to reset.
    struct CrashOnOddBlockOnEvenConsumer {
        call_count: StdArc<AtomicU32>,
    }

    #[async_trait]
    impl Consumer for CrashOnOddBlockOnEvenConsumer {
        async fn start(&mut self, ctx: ConsumerContext) -> Result<(), CamelError> {
            let count = self.call_count.fetch_add(1, Ordering::SeqCst);
            if count.is_multiple_of(2) {
                return Err(CamelError::RouteError("odd call crash".into()));
            }

            tokio::select! {
                _ = ctx.cancelled() => {
                    return Ok(());
                }
                _ = tokio::time::sleep(Duration::from_millis(100)) => {
                    return Err(CamelError::RouteError("even call crash after uptime".into()));
                }
            }
        }

        async fn stop(&mut self) -> Result<(), CamelError> {
            Ok(())
        }

        fn concurrency_model(&self) -> ConcurrencyModel {
            ConcurrencyModel::Sequential
        }
    }

    struct CrashOnOddBlockOnEvenEndpoint {
        call_count: StdArc<AtomicU32>,
    }

    impl Endpoint for CrashOnOddBlockOnEvenEndpoint {
        fn uri(&self) -> &str {
            "crash-odd-block-even:test"
        }

        fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
            Ok(Box::new(CrashOnOddBlockOnEvenConsumer {
                call_count: StdArc::clone(&self.call_count),
            }))
        }

        fn create_producer(
            &self,
            _ctx: &camel_api::ProducerContext,
        ) -> Result<camel_api::BoxProcessor, CamelError> {
            Err(CamelError::RouteError("no producer".into()))
        }
    }

    struct CrashOnOddBlockOnEvenComponent {
        call_count: StdArc<AtomicU32>,
    }

    impl Component for CrashOnOddBlockOnEvenComponent {
        fn scheme(&self) -> &str {
            "crash-odd-block-even"
        }

        fn create_endpoint(&self, _uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
            Ok(Box::new(CrashOnOddBlockOnEvenEndpoint {
                call_count: StdArc::clone(&self.call_count),
            }))
        }
    }

    #[tokio::test]
    async fn test_supervision_resets_attempt_count_on_success() {
        let registry = StdArc::new(std::sync::Mutex::new(Registry::new()));
        let call_count = StdArc::new(AtomicU32::new(0));
        registry
            .lock()
            .unwrap()
            .register(CrashOnOddBlockOnEvenComponent {
                call_count: StdArc::clone(&call_count),
            });

        // 100ms of uptime exceeds initial_delay (50ms), so each "good" run
        // should reset the attempt counter before the next crash.
        let config = SupervisionConfig {
            max_attempts: Some(2),
            initial_delay: Duration::from_millis(50),
            backoff_multiplier: 1.0,
            max_delay: Duration::from_secs(60),
        };

        let controller: StdArc<Mutex<dyn RouteControllerInternal>> = StdArc::new(Mutex::new(
            SupervisingRouteController::new(StdArc::clone(&registry), config),
        ));

        controller
            .try_lock()
            .unwrap()
            .set_self_ref(StdArc::clone(&controller) as StdArc<Mutex<dyn RouteController>>);
        let runtime = attach_runtime_bus(&controller).await;

        let runtime_def = crate::route::RouteDefinition::new("crash-odd-block-even:test", vec![])
            .with_route_id("reset-attempt-route");
        runtime.register_route(runtime_def).await.unwrap();

        controller.lock().await.start_all_routes().await.unwrap();

        tokio::time::sleep(Duration::from_millis(1000)).await;

        // With max_attempts = 2 and no reset, the route would be abandoned
        // after 3 calls; more calls prove the counter was reset.
        let count = call_count.load(Ordering::SeqCst);
        assert!(
            count >= 4,
            "expected at least 4 consumer calls (proving attempt reset), got {}",
            count
        );

        // The route may legitimately be mid-crash; only a *persistent* Failed
        // state means supervision gave up.
        let deadline = tokio::time::Instant::now() + Duration::from_secs(2);
        loop {
            let status = match runtime
                .ask(RuntimeQuery::GetRouteStatus {
                    route_id: "reset-attempt-route".into(),
                })
                .await
                .unwrap()
            {
                RuntimeQueryResult::RouteStatus { status, .. } => status,
                other => panic!("unexpected query result: {other:?}"),
            };
            if status != "Failed" {
                break;
            }
            assert!(
                tokio::time::Instant::now() < deadline,
                "route remained in Failed state for 2s — supervision likely gave up"
            );
            tokio::time::sleep(Duration::from_millis(50)).await;
        }
    }
}