1use std::collections::{HashMap, HashSet};
8use std::sync::Arc;
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::time::Instant;
11
12use tokio::sync::Mutex;
13use tracing::{error, info, warn};
14
15use camel_api::error_handler::ErrorHandlerConfig;
16use camel_api::{
17 CamelError, MetricsCollector, RouteController, RuntimeCommand, RuntimeHandle, RuntimeQuery,
18 RuntimeQueryResult, SupervisionConfig,
19};
20
21use crate::lifecycle::adapters::route_controller::{
22 CrashNotification, DefaultRouteController, RouteControllerInternal, SharedLanguageRegistry,
23};
24use crate::lifecycle::application::route_definition::RouteDefinition;
25use crate::shared::components::domain::Registry;
26
/// A [`RouteController`] decorator that adds crash supervision: consumer
/// crashes reported by the inner controller are fed to a background
/// supervision loop, which restarts the route according to the configured
/// backoff/max-attempt policy.
pub struct SupervisingRouteController {
    // The undecorated controller that performs the actual route operations.
    inner: DefaultRouteController,
    // Restart policy (max attempts, initial delay, backoff multiplier, cap).
    config: SupervisionConfig,
    // Sender installed into `inner` so it can report consumer crashes.
    crash_tx: tokio::sync::mpsc::Sender<CrashNotification>,
    // Receiver half of the crash channel; moved into the spawned supervision
    // loop on first start, so `None` means the loop is already running.
    crash_rx: Option<tokio::sync::mpsc::Receiver<CrashNotification>>,
    // Optional metrics collector forwarded to the supervision loop.
    metrics: Option<Arc<dyn MetricsCollector>>,
}
43
/// Process-wide monotonic counter making supervision-issued command ids unique.
static SUPERVISION_COMMAND_SEQ: AtomicU64 = AtomicU64::new(0);

/// Builds a unique, traceable command id of the form
/// `supervision:<op>:<route_id>:<seq>`.
fn next_supervision_command_id(op: &str, route_id: &str) -> String {
    // Relaxed ordering suffices: the counter only needs uniqueness, not any
    // happens-before relationship with other memory operations.
    let sequence = SUPERVISION_COMMAND_SEQ.fetch_add(1, Ordering::Relaxed);
    format!("supervision:{}:{}:{}", op, route_id, sequence)
}
50
51impl SupervisingRouteController {
52 pub fn new(registry: Arc<std::sync::Mutex<Registry>>, config: SupervisionConfig) -> Self {
54 Self::with_languages(
55 registry,
56 config,
57 Arc::new(std::sync::Mutex::new(HashMap::new())),
58 )
59 }
60
61 pub fn with_languages(
63 registry: Arc<std::sync::Mutex<Registry>>,
64 config: SupervisionConfig,
65 languages: SharedLanguageRegistry,
66 ) -> Self {
67 let (crash_tx, crash_rx) = tokio::sync::mpsc::channel(64);
68 Self {
69 inner: DefaultRouteController::with_languages(registry, languages),
70 config,
71 crash_tx,
72 crash_rx: Some(crash_rx),
73 metrics: None,
74 }
75 }
76
77 pub fn with_metrics(mut self, metrics: Arc<dyn MetricsCollector>) -> Self {
79 self.metrics = Some(metrics);
80 self
81 }
82
83 fn ensure_supervision_loop_started(&mut self) {
84 self.inner.set_crash_notifier(self.crash_tx.clone());
85
86 if self.crash_rx.is_none() {
87 return;
88 }
89
90 let rx = self
91 .crash_rx
92 .take()
93 .expect("crash_rx checked as Some above");
94 let config = self.config.clone();
95 let metrics = self.metrics.clone();
96 let runtime = self.inner.runtime_handle_for_supervision();
97 tokio::spawn(async move {
98 supervision_loop(rx, runtime, config, metrics).await;
99 });
100 }
101}
102
#[async_trait::async_trait]
impl RouteController for SupervisingRouteController {
    /// Starts one route; lazily spawns the supervision loop first so crashes
    /// of the started route are observed.
    async fn start_route(&mut self, route_id: &str) -> Result<(), CamelError> {
        self.ensure_supervision_loop_started();
        self.inner.start_route(route_id).await
    }

    /// Delegates to the inner controller.
    async fn stop_route(&mut self, route_id: &str) -> Result<(), CamelError> {
        self.inner.stop_route(route_id).await
    }

    /// Delegates to the inner controller.
    async fn restart_route(&mut self, route_id: &str) -> Result<(), CamelError> {
        self.inner.restart_route(route_id).await
    }

    /// Delegates to the inner controller.
    async fn suspend_route(&mut self, route_id: &str) -> Result<(), CamelError> {
        self.inner.suspend_route(route_id).await
    }

    /// Delegates to the inner controller.
    async fn resume_route(&mut self, route_id: &str) -> Result<(), CamelError> {
        self.inner.resume_route(route_id).await
    }

    /// Starts all routes; lazily spawns the supervision loop first.
    async fn start_all_routes(&mut self) -> Result<(), CamelError> {
        self.ensure_supervision_loop_started();
        self.inner.start_all_routes().await
    }

    /// Delegates to the inner controller.
    async fn stop_all_routes(&mut self) -> Result<(), CamelError> {
        self.inner.stop_all_routes().await
    }
}
135
// Internal controller surface: every method is a straight delegation to the
// wrapped DefaultRouteController, except the reload hooks at the bottom which
// also ensure the supervision loop is running.
#[async_trait::async_trait]
impl RouteControllerInternal for SupervisingRouteController {
    fn add_route(&mut self, def: RouteDefinition) -> Result<(), CamelError> {
        self.inner.add_route(def)
    }

    fn swap_pipeline(
        &self,
        route_id: &str,
        pipeline: camel_api::BoxProcessor,
    ) -> Result<(), CamelError> {
        self.inner.swap_pipeline(route_id, pipeline)
    }

    fn route_from_uri(&self, route_id: &str) -> Option<String> {
        self.inner.route_from_uri(route_id)
    }

    fn set_error_handler(&mut self, config: ErrorHandlerConfig) {
        self.inner.set_error_handler(config)
    }

    fn set_self_ref(&mut self, self_ref: Arc<Mutex<dyn RouteController>>) {
        self.inner.set_self_ref(self_ref)
    }

    fn set_runtime_handle(&mut self, runtime: Arc<dyn RuntimeHandle>) {
        self.inner.set_runtime_handle(runtime)
    }

    fn route_count(&self) -> usize {
        self.inner.route_count()
    }

    fn route_ids(&self) -> Vec<String> {
        self.inner.route_ids()
    }

    fn in_flight_count(&self, route_id: &str) -> Option<u64> {
        self.inner.in_flight_count(route_id)
    }

    fn route_exists(&self, route_id: &str) -> bool {
        self.inner.route_exists(route_id)
    }

    fn auto_startup_route_ids(&self) -> Vec<String> {
        self.inner.auto_startup_route_ids()
    }

    fn shutdown_route_ids(&self) -> Vec<String> {
        self.inner.shutdown_route_ids()
    }

    fn set_tracer_config(&mut self, config: &crate::shared::observability::domain::TracerConfig) {
        self.inner.set_tracer_config(config)
    }

    fn compile_route_definition(
        &self,
        def: RouteDefinition,
    ) -> Result<camel_api::BoxProcessor, camel_api::CamelError> {
        self.inner.compile_route_definition(def)
    }

    fn remove_route(&mut self, route_id: &str) -> Result<(), camel_api::CamelError> {
        self.inner.remove_route(route_id)
    }

    /// Reload-path start: ensures the supervision loop is running, then
    /// delegates to the inner controller's plain `start_route` (not a
    /// reload-specific variant).
    async fn start_route_reload(&mut self, route_id: &str) -> Result<(), camel_api::CamelError> {
        self.ensure_supervision_loop_started();
        self.inner.start_route(route_id).await
    }

    /// Reload-path stop: delegates to the inner controller's plain
    /// `stop_route`.
    async fn stop_route_reload(&mut self, route_id: &str) -> Result<(), camel_api::CamelError> {
        self.inner.stop_route(route_id).await
    }
}
214
/// Background task that restarts crashed routes according to `config`.
///
/// Receives [`CrashNotification`]s from the controller, applies the
/// backoff/max-attempt policy, verifies the route's state in the runtime
/// projection, then issues `FailRoute` followed by `ReloadRoute` to restart
/// it. The loop ends when every sender for `rx` has been dropped.
///
/// Restarts are processed inline: while this task sleeps through a backoff
/// delay, further crash notifications simply queue in the channel.
async fn supervision_loop(
    mut rx: tokio::sync::mpsc::Receiver<CrashNotification>,
    runtime: Option<Arc<dyn RuntimeHandle>>,
    config: SupervisionConfig,
    _metrics: Option<Arc<dyn MetricsCollector>>,
) {
    // Per-route restart attempt counters since the route last "recovered".
    let mut attempts: HashMap<String, u32> = HashMap::new();
    // When each route was last restarted successfully; used to reset attempts.
    let mut last_restart_time: HashMap<String, Instant> = HashMap::new();
    // Routes with a restart currently being handled (dedupe guard).
    let mut currently_restarting: HashSet<String> = HashSet::new();

    info!("Supervision loop started");

    while let Some(notification) = rx.recv().await {
        let route_id = notification.route_id.clone();
        let error = &notification.error;

        // NOTE(review): since restarts below are awaited inline, the set is
        // always emptied again before the next recv(); this guard is
        // defensive rather than load-bearing today.
        if currently_restarting.contains(&route_id) {
            continue;
        }

        info!(
            route_id = %route_id,
            error = %error,
            "Route crashed, checking restart policy"
        );

        // Recovery heuristic: if the route stayed up for at least
        // `initial_delay` since its last restart, treat it as having
        // recovered and reset its attempt counter.
        if let Some(last_time) = last_restart_time.get(&route_id)
            && last_time.elapsed() >= config.initial_delay
        {
            attempts.insert(route_id.clone(), 0);
        }

        let current_attempt = attempts.entry(route_id.clone()).or_insert(0);
        *current_attempt += 1;

        // `max_attempts == None` means unlimited restarts.
        if config
            .max_attempts
            .is_some_and(|max| *current_attempt > max)
        {
            error!(
                route_id = %route_id,
                attempts = current_attempt,
                max = config.max_attempts.unwrap(),
                "Route exceeded max restart attempts, giving up"
            );
            continue;
        }

        let delay = config.next_delay(*current_attempt);
        info!(
            route_id = %route_id,
            attempt = current_attempt,
            delay_ms = delay.as_millis(),
            "Scheduling route restart"
        );

        currently_restarting.insert(route_id.clone());

        // Backoff delay before attempting the restart (blocks the loop).
        tokio::time::sleep(delay).await;

        let Some(runtime) = &runtime else {
            warn!(
                route_id = %route_id,
                "Runtime handle unavailable, supervision restart skipped"
            );
            currently_restarting.remove(&route_id);
            continue;
        };

        // First status check: skip the restart if the runtime can't be
        // queried or returns something unexpected.
        let pre_status = match runtime
            .ask(RuntimeQuery::GetRouteStatus {
                route_id: route_id.clone(),
            })
            .await
        {
            Ok(RuntimeQueryResult::RouteStatus { status, .. }) => status,
            Ok(other) => {
                warn!(
                    route_id = %route_id,
                    ?other,
                    "Unexpected runtime query result, skipping supervision restart"
                );
                currently_restarting.remove(&route_id);
                continue;
            }
            Err(err) => {
                warn!(
                    route_id = %route_id,
                    error = %err,
                    "Runtime status query failed, skipping supervision restart"
                );
                currently_restarting.remove(&route_id);
                continue;
            }
        };

        // A route deliberately stopped (or never started) must not be
        // resurrected; also clear its attempt history.
        if matches!(pre_status.as_str(), "Registered" | "Stopped") {
            warn!(
                route_id = %route_id,
                status = %pre_status,
                "Runtime lifecycle is non-running; supervision restart skipped"
            );
            attempts.remove(&route_id);
            currently_restarting.remove(&route_id);
            continue;
        }

        // Record the crash in the runtime's state machine. A failure here is
        // logged but not fatal: the follow-up status query decides whether to
        // proceed.
        if let Err(err) = runtime
            .execute(RuntimeCommand::FailRoute {
                route_id: route_id.clone(),
                error: error.clone(),
                command_id: next_supervision_command_id("fail", &route_id),
                causation_id: None,
            })
            .await
        {
            warn!(
                route_id = %route_id,
                error = %err,
                "Failed to persist crash state in runtime before restart check"
            );
        }

        // Second status check: only restart if the runtime projection now
        // agrees the route is "Failed".
        let should_restart = match runtime
            .ask(RuntimeQuery::GetRouteStatus {
                route_id: route_id.clone(),
            })
            .await
        {
            Ok(RuntimeQueryResult::RouteStatus { status, .. }) if status == "Failed" => true,
            Ok(RuntimeQueryResult::RouteStatus { status, .. }) => {
                warn!(
                    route_id = %route_id,
                    status = %status,
                    "Route no longer failed in runtime projection, skipping supervision restart"
                );
                attempts.remove(&route_id);
                false
            }
            Ok(other) => {
                warn!(
                    route_id = %route_id,
                    ?other,
                    "Unexpected runtime query result, skipping supervision restart"
                );
                false
            }
            Err(err) => {
                warn!(
                    route_id = %route_id,
                    error = %err,
                    "Runtime status query failed, skipping supervision restart"
                );
                false
            }
        };

        if should_restart {
            let restart_result = runtime
                .execute(RuntimeCommand::ReloadRoute {
                    route_id: route_id.clone(),
                    command_id: next_supervision_command_id("reload", &route_id),
                    causation_id: None,
                })
                .await
                .map(|_| ());

            match restart_result {
                Ok(()) => {
                    info!(route_id = %route_id, "Route restarted successfully");
                    // Timestamp used by the recovery heuristic above.
                    last_restart_time.insert(route_id.clone(), Instant::now());
                }
                Err(e) => {
                    error!(route_id = %route_id, error = %e, "Failed to restart route");
                }
            }
        }

        currently_restarting.remove(&route_id);
    }

    info!("Supervision loop ended");
}
418
#[cfg(test)]
mod tests {
    use super::*;
    use crate::lifecycle::adapters::{InMemoryRuntimeStore, RuntimeExecutionAdapter};
    use crate::lifecycle::application::runtime_bus::RuntimeBus;
    use crate::lifecycle::ports::RouteRegistrationPort as InternalRuntimeCommandBus;
    use async_trait::async_trait;
    use camel_api::RuntimeQueryBus;
    use camel_component::{Component, ConcurrencyModel, Consumer, ConsumerContext, Endpoint};
    use std::sync::Arc as StdArc;
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::time::Duration;

    // Builds an in-memory runtime bus wired to `controller` (for command
    // execution) and installs it as the controller's runtime handle.
    async fn attach_runtime_bus(
        controller: &StdArc<Mutex<dyn RouteControllerInternal>>,
    ) -> StdArc<RuntimeBus> {
        let store = InMemoryRuntimeStore::default();
        let runtime = StdArc::new(
            RuntimeBus::new(
                StdArc::new(store.clone()),
                StdArc::new(store.clone()),
                StdArc::new(store.clone()),
                StdArc::new(store.clone()),
            )
            .with_uow(StdArc::new(store))
            .with_execution(StdArc::new(RuntimeExecutionAdapter::new(StdArc::clone(
                controller,
            )))),
        );
        let runtime_handle: StdArc<dyn RuntimeHandle> = runtime.clone();
        controller.lock().await.set_runtime_handle(runtime_handle);
        runtime
    }

    #[test]
    fn supervision_command_id_is_unique_and_well_formed() {
        let id1 = next_supervision_command_id("start", "route-a");
        let id2 = next_supervision_command_id("start", "route-a");
        assert_ne!(id1, id2);
        assert!(id1.starts_with("supervision:start:route-a:"));
    }

    #[tokio::test]
    async fn supervision_loop_exits_cleanly_without_runtime_handle() {
        let (tx, rx) = tokio::sync::mpsc::channel(8);
        let config = SupervisionConfig {
            max_attempts: Some(1),
            initial_delay: Duration::from_millis(5),
            backoff_multiplier: 1.0,
            max_delay: Duration::from_millis(5),
        };
        // No runtime handle: the loop must skip the restart and still
        // terminate once the sender side is dropped.
        let handle = tokio::spawn(supervision_loop(rx, None, config, None));

        tx.send(CrashNotification {
            route_id: "r-no-runtime".into(),
            error: "boom".into(),
        })
        .await
        .unwrap();
        drop(tx);

        let join = tokio::time::timeout(Duration::from_secs(1), handle)
            .await
            .expect("supervision loop should terminate");
        join.expect("supervision task should not panic");
    }

    #[test]
    fn with_metrics_stores_collector() {
        let registry = StdArc::new(std::sync::Mutex::new(Registry::new()));
        let controller = SupervisingRouteController::new(registry, SupervisionConfig::default())
            .with_metrics(StdArc::new(camel_api::NoOpMetrics));
        assert!(controller.metrics.is_some());
    }

    #[tokio::test]
    async fn ensure_supervision_loop_started_is_idempotent() {
        let registry = StdArc::new(std::sync::Mutex::new(Registry::new()));
        let mut controller =
            SupervisingRouteController::new(registry, SupervisionConfig::default());

        // First call consumes the receiver and spawns the loop.
        assert!(controller.crash_rx.is_some());
        controller.ensure_supervision_loop_started();
        assert!(controller.crash_rx.is_none());

        // Second call must be a no-op, not a panic or a second spawn.
        controller.ensure_supervision_loop_started();
        assert!(controller.crash_rx.is_none());
    }

    // Consumer that crashes on its first start and then blocks until
    // cancelled — simulates a route that recovers after one restart.
    struct CrashThenBlockConsumer {
        call_count: StdArc<AtomicU32>,
    }

    #[async_trait]
    impl Consumer for CrashThenBlockConsumer {
        async fn start(&mut self, ctx: ConsumerContext) -> Result<(), CamelError> {
            let count = self.call_count.fetch_add(1, Ordering::SeqCst);

            if count == 0 {
                return Err(CamelError::RouteError("simulated crash".into()));
            }

            ctx.cancelled().await;
            Ok(())
        }

        async fn stop(&mut self) -> Result<(), CamelError> {
            Ok(())
        }

        fn concurrency_model(&self) -> ConcurrencyModel {
            ConcurrencyModel::Sequential
        }
    }

    struct CrashThenBlockEndpoint {
        call_count: StdArc<AtomicU32>,
    }

    impl Endpoint for CrashThenBlockEndpoint {
        fn uri(&self) -> &str {
            "crash-then-block:test"
        }

        fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
            Ok(Box::new(CrashThenBlockConsumer {
                call_count: StdArc::clone(&self.call_count),
            }))
        }

        fn create_producer(
            &self,
            _ctx: &camel_api::ProducerContext,
        ) -> Result<camel_api::BoxProcessor, CamelError> {
            Err(CamelError::RouteError("no producer".into()))
        }
    }

    struct CrashThenBlockComponent {
        call_count: StdArc<AtomicU32>,
    }

    impl Component for CrashThenBlockComponent {
        fn scheme(&self) -> &str {
            "crash-then-block"
        }

        fn create_endpoint(&self, _uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
            Ok(Box::new(CrashThenBlockEndpoint {
                call_count: StdArc::clone(&self.call_count),
            }))
        }
    }

    #[tokio::test]
    async fn test_supervising_controller_restarts_crashed_route() {
        let registry = StdArc::new(std::sync::Mutex::new(Registry::new()));
        let call_count = StdArc::new(AtomicU32::new(0));
        registry.lock().unwrap().register(CrashThenBlockComponent {
            call_count: StdArc::clone(&call_count),
        });

        let config = SupervisionConfig {
            max_attempts: Some(5),
            initial_delay: Duration::from_millis(50),
            backoff_multiplier: 1.0, max_delay: Duration::from_secs(60),
        };

        let controller: StdArc<Mutex<dyn RouteControllerInternal>> = StdArc::new(Mutex::new(
            SupervisingRouteController::new(StdArc::clone(&registry), config),
        ));

        controller
            .try_lock()
            .unwrap()
            .set_self_ref(StdArc::clone(&controller) as StdArc<Mutex<dyn RouteController>>);
        let runtime = attach_runtime_bus(&controller).await;

        let runtime_def = crate::route::RouteDefinition::new("crash-then-block:test", vec![])
            .with_route_id("crash-route");
        runtime.register_route(runtime_def).await.unwrap();

        controller.lock().await.start_all_routes().await.unwrap();

        // Give the supervision loop time to observe the crash and restart.
        tokio::time::sleep(Duration::from_millis(500)).await;

        let count = call_count.load(Ordering::SeqCst);
        assert!(
            count >= 2,
            "expected at least 2 consumer calls (crash + restart), got {}",
            count
        );

        let status = match runtime
            .ask(RuntimeQuery::GetRouteStatus {
                route_id: "crash-route".into(),
            })
            .await
            .unwrap()
        {
            RuntimeQueryResult::RouteStatus { status, .. } => status,
            other => panic!("unexpected query result: {other:?}"),
        };
        assert_eq!(status, "Started");
    }

    #[tokio::test]
    async fn test_supervising_controller_respects_max_attempts() {
        // Consumer/endpoint/component trio that fails on every start.
        struct AlwaysCrashConsumer;
        #[async_trait]
        impl Consumer for AlwaysCrashConsumer {
            async fn start(&mut self, _ctx: ConsumerContext) -> Result<(), CamelError> {
                Err(CamelError::RouteError("always crashes".into()))
            }
            async fn stop(&mut self) -> Result<(), CamelError> {
                Ok(())
            }
            fn concurrency_model(&self) -> ConcurrencyModel {
                ConcurrencyModel::Sequential
            }
        }
        struct AlwaysCrashEndpoint;
        impl Endpoint for AlwaysCrashEndpoint {
            fn uri(&self) -> &str {
                "always-crash:test"
            }
            fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
                Ok(Box::new(AlwaysCrashConsumer))
            }
            fn create_producer(
                &self,
                _ctx: &camel_api::ProducerContext,
            ) -> Result<camel_api::BoxProcessor, CamelError> {
                Err(CamelError::RouteError("no producer".into()))
            }
        }
        struct AlwaysCrashComponent;
        impl Component for AlwaysCrashComponent {
            fn scheme(&self) -> &str {
                "always-crash"
            }
            fn create_endpoint(&self, _uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
                Ok(Box::new(AlwaysCrashEndpoint))
            }
        }

        let registry = StdArc::new(std::sync::Mutex::new(Registry::new()));
        registry.lock().unwrap().register(AlwaysCrashComponent);

        let config = SupervisionConfig {
            max_attempts: Some(2),
            initial_delay: Duration::from_millis(10),
            backoff_multiplier: 1.0,
            max_delay: Duration::from_secs(1),
        };

        let controller: StdArc<Mutex<dyn RouteControllerInternal>> = StdArc::new(Mutex::new(
            SupervisingRouteController::new(StdArc::clone(&registry), config),
        ));

        controller
            .try_lock()
            .unwrap()
            .set_self_ref(StdArc::clone(&controller) as StdArc<Mutex<dyn RouteController>>);
        let runtime = attach_runtime_bus(&controller).await;

        let runtime_def = crate::route::RouteDefinition::new("always-crash:test", vec![])
            .with_route_id("always-crash-route");
        runtime.register_route(runtime_def).await.unwrap();

        controller.lock().await.start_all_routes().await.unwrap();

        tokio::time::sleep(Duration::from_millis(200)).await;

        // After exhausting max_attempts the route must stay Failed.
        let status = match runtime
            .ask(RuntimeQuery::GetRouteStatus {
                route_id: "always-crash-route".into(),
            })
            .await
            .unwrap()
        {
            RuntimeQueryResult::RouteStatus { status, .. } => status,
            other => panic!("unexpected query result: {other:?}"),
        };
        assert_eq!(status, "Failed");
    }

    #[tokio::test]
    async fn test_supervising_controller_delegates_to_inner() {
        let registry = StdArc::new(std::sync::Mutex::new(Registry::new()));
        let config = SupervisionConfig::default();
        let mut controller = SupervisingRouteController::new(StdArc::clone(&registry), config);

        let self_ref: StdArc<Mutex<dyn RouteController>> = StdArc::new(Mutex::new(
            SupervisingRouteController::new(registry, SupervisionConfig::default()),
        ));
        controller.set_self_ref(self_ref);

        assert_eq!(controller.route_count(), 0);
        assert_eq!(controller.route_ids(), Vec::<String>::new());
    }

    // Like AlwaysCrashConsumer above, but counts how many times it was
    // started so tests can assert the exact number of restart attempts.
    struct AlwaysCrashWithCountConsumer {
        call_count: StdArc<AtomicU32>,
    }

    #[async_trait]
    impl Consumer for AlwaysCrashWithCountConsumer {
        async fn start(&mut self, _ctx: ConsumerContext) -> Result<(), CamelError> {
            self.call_count.fetch_add(1, Ordering::SeqCst);
            Err(CamelError::RouteError("always crashes".into()))
        }

        async fn stop(&mut self) -> Result<(), CamelError> {
            Ok(())
        }

        fn concurrency_model(&self) -> ConcurrencyModel {
            ConcurrencyModel::Sequential
        }
    }

    struct AlwaysCrashWithCountEndpoint {
        call_count: StdArc<AtomicU32>,
    }

    impl Endpoint for AlwaysCrashWithCountEndpoint {
        fn uri(&self) -> &str {
            "always-crash-count:test"
        }

        fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
            Ok(Box::new(AlwaysCrashWithCountConsumer {
                call_count: StdArc::clone(&self.call_count),
            }))
        }

        fn create_producer(
            &self,
            _ctx: &camel_api::ProducerContext,
        ) -> Result<camel_api::BoxProcessor, CamelError> {
            Err(CamelError::RouteError("no producer".into()))
        }
    }

    struct AlwaysCrashWithCountComponent {
        call_count: StdArc<AtomicU32>,
    }

    impl Component for AlwaysCrashWithCountComponent {
        fn scheme(&self) -> &str {
            "always-crash-count"
        }

        fn create_endpoint(&self, _uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
            Ok(Box::new(AlwaysCrashWithCountEndpoint {
                call_count: StdArc::clone(&self.call_count),
            }))
        }
    }

    #[tokio::test]
    async fn test_supervision_gives_up_after_max_attempts() {
        let registry = StdArc::new(std::sync::Mutex::new(Registry::new()));
        let call_count = StdArc::new(AtomicU32::new(0));
        registry
            .lock()
            .unwrap()
            .register(AlwaysCrashWithCountComponent {
                call_count: StdArc::clone(&call_count),
            });

        let config = SupervisionConfig {
            max_attempts: Some(2),
            initial_delay: Duration::from_millis(50),
            backoff_multiplier: 1.0,
            max_delay: Duration::from_secs(60),
        };

        let controller: StdArc<Mutex<dyn RouteControllerInternal>> = StdArc::new(Mutex::new(
            SupervisingRouteController::new(StdArc::clone(&registry), config),
        ));

        controller
            .try_lock()
            .unwrap()
            .set_self_ref(StdArc::clone(&controller) as StdArc<Mutex<dyn RouteController>>);
        let runtime = attach_runtime_bus(&controller).await;

        let runtime_def = crate::route::RouteDefinition::new("always-crash-count:test", vec![])
            .with_route_id("give-up-route");
        runtime.register_route(runtime_def).await.unwrap();

        controller.lock().await.start_all_routes().await.unwrap();

        tokio::time::sleep(Duration::from_millis(800)).await;

        // Initial start plus exactly max_attempts (2) restarts.
        let count = call_count.load(Ordering::SeqCst);
        assert_eq!(
            count, 3,
            "expected exactly 3 consumer calls (initial + 2 restarts), got {}",
            count
        );

        let status = match runtime
            .ask(RuntimeQuery::GetRouteStatus {
                route_id: "give-up-route".into(),
            })
            .await
            .unwrap()
        {
            RuntimeQueryResult::RouteStatus { status, .. } => status,
            other => panic!("unexpected query result: {other:?}"),
        };
        assert_eq!(status, "Failed");
    }

    // Consumer that alternates: crashes immediately on even-numbered calls,
    // and on odd-numbered calls stays up ~100ms (longer than the test's
    // initial_delay) before crashing again — exercising the attempt-reset
    // heuristic in the supervision loop.
    struct CrashOnOddBlockOnEvenConsumer {
        call_count: StdArc<AtomicU32>,
    }

    #[async_trait]
    impl Consumer for CrashOnOddBlockOnEvenConsumer {
        async fn start(&mut self, ctx: ConsumerContext) -> Result<(), CamelError> {
            let count = self.call_count.fetch_add(1, Ordering::SeqCst);
            if count.is_multiple_of(2) {
                return Err(CamelError::RouteError("odd call crash".into()));
            }

            tokio::select! {
                _ = ctx.cancelled() => {
                    return Ok(());
                }
                _ = tokio::time::sleep(Duration::from_millis(100)) => {
                    return Err(CamelError::RouteError("even call crash after uptime".into()));
                }
            }
        }

        async fn stop(&mut self) -> Result<(), CamelError> {
            Ok(())
        }

        fn concurrency_model(&self) -> ConcurrencyModel {
            ConcurrencyModel::Sequential
        }
    }

    struct CrashOnOddBlockOnEvenEndpoint {
        call_count: StdArc<AtomicU32>,
    }

    impl Endpoint for CrashOnOddBlockOnEvenEndpoint {
        fn uri(&self) -> &str {
            "crash-odd-block-even:test"
        }

        fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
            Ok(Box::new(CrashOnOddBlockOnEvenConsumer {
                call_count: StdArc::clone(&self.call_count),
            }))
        }

        fn create_producer(
            &self,
            _ctx: &camel_api::ProducerContext,
        ) -> Result<camel_api::BoxProcessor, CamelError> {
            Err(CamelError::RouteError("no producer".into()))
        }
    }

    struct CrashOnOddBlockOnEvenComponent {
        call_count: StdArc<AtomicU32>,
    }

    impl Component for CrashOnOddBlockOnEvenComponent {
        fn scheme(&self) -> &str {
            "crash-odd-block-even"
        }

        fn create_endpoint(&self, _uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
            Ok(Box::new(CrashOnOddBlockOnEvenEndpoint {
                call_count: StdArc::clone(&self.call_count),
            }))
        }
    }

    #[tokio::test]
    async fn test_supervision_resets_attempt_count_on_success() {
        let registry = StdArc::new(std::sync::Mutex::new(Registry::new()));
        let call_count = StdArc::new(AtomicU32::new(0));
        registry
            .lock()
            .unwrap()
            .register(CrashOnOddBlockOnEvenComponent {
                call_count: StdArc::clone(&call_count),
            });

        let config = SupervisionConfig {
            max_attempts: Some(2),
            initial_delay: Duration::from_millis(50),
            backoff_multiplier: 1.0,
            max_delay: Duration::from_secs(60),
        };

        let controller: StdArc<Mutex<dyn RouteControllerInternal>> = StdArc::new(Mutex::new(
            SupervisingRouteController::new(StdArc::clone(&registry), config),
        ));

        controller
            .try_lock()
            .unwrap()
            .set_self_ref(StdArc::clone(&controller) as StdArc<Mutex<dyn RouteController>>);
        let runtime = attach_runtime_bus(&controller).await;

        let runtime_def = crate::route::RouteDefinition::new("crash-odd-block-even:test", vec![])
            .with_route_id("reset-attempt-route");
        runtime.register_route(runtime_def).await.unwrap();

        controller.lock().await.start_all_routes().await.unwrap();

        tokio::time::sleep(Duration::from_millis(1000)).await;

        // With max_attempts = 2, exceeding 4 calls proves the attempt counter
        // was reset after a "successful" (long-enough) run.
        let count = call_count.load(Ordering::SeqCst);
        assert!(
            count >= 4,
            "expected at least 4 consumer calls (proving attempt reset), got {}",
            count
        );

        // Poll until the route leaves Failed; staying Failed for 2s would
        // mean supervision gave up (i.e. the counter was never reset).
        let deadline = tokio::time::Instant::now() + Duration::from_secs(2);
        loop {
            let status = match runtime
                .ask(RuntimeQuery::GetRouteStatus {
                    route_id: "reset-attempt-route".into(),
                })
                .await
                .unwrap()
            {
                RuntimeQueryResult::RouteStatus { status, .. } => status,
                other => panic!("unexpected query result: {other:?}"),
            };
            if status != "Failed" {
                break;
            }
            assert!(
                tokio::time::Instant::now() < deadline,
                "route remained in Failed state for 2s — supervision likely gave up"
            );
            tokio::time::sleep(Duration::from_millis(50)).await;
        }
    }
}