// camel_core/lifecycle/application/supervision_service.rs
1//! Supervising route controller with automatic crash recovery.
2//!
3//! This module provides [`SupervisingRouteController`], which wraps a
4//! [`DefaultRouteController`] and monitors crashed routes, restarting them
5//! with exponential backoff based on [`SupervisionConfig`].
6
7use std::collections::{HashMap, HashSet};
8use std::sync::Arc;
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::time::Instant;
11
12use tokio::sync::Mutex;
13use tracing::{error, info, warn};
14
15use camel_api::error_handler::ErrorHandlerConfig;
16use camel_api::{
17    CamelError, MetricsCollector, RouteController, RuntimeCommand, RuntimeHandle, RuntimeQuery,
18    RuntimeQueryResult, SupervisionConfig,
19};
20
21use crate::lifecycle::adapters::route_controller::{
22    CrashNotification, DefaultRouteController, RouteControllerInternal, SharedLanguageRegistry,
23};
24use crate::lifecycle::application::route_definition::RouteDefinition;
25use crate::shared::components::domain::Registry;
26
/// A route controller that automatically restarts crashed routes.
///
/// Wraps a [`DefaultRouteController`] and spawns a supervision loop that
/// receives crash notifications and restarts routes with exponential backoff.
pub struct SupervisingRouteController {
    /// The inner controller that manages actual routes.
    inner: DefaultRouteController,
    /// Supervision configuration (backoff delays, max restart attempts).
    config: SupervisionConfig,
    /// Sender for crash notifications; a clone is installed on the inner
    /// controller so crashing routes can reach the supervision loop.
    crash_tx: tokio::sync::mpsc::Sender<CrashNotification>,
    /// Receiver for crash notifications. `Some` until the supervision loop
    /// is spawned, at which point it is taken exactly once.
    crash_rx: Option<tokio::sync::mpsc::Receiver<CrashNotification>>,
    /// Optional metrics collector handed to the supervision loop.
    metrics: Option<Arc<dyn MetricsCollector>>,
}
43
/// Process-wide monotonic sequence making supervision-issued command ids unique.
static SUPERVISION_COMMAND_SEQ: AtomicU64 = AtomicU64::new(0);

/// Build a unique command id of the form `supervision:<op>:<route_id>:<seq>`.
///
/// `Relaxed` ordering suffices: the counter only needs uniqueness, not any
/// synchronization with other memory operations.
fn next_supervision_command_id(op: &str, route_id: &str) -> String {
    let sequence = SUPERVISION_COMMAND_SEQ.fetch_add(1, Ordering::Relaxed);
    format!("supervision:{}:{}:{}", op, route_id, sequence)
}
50
51impl SupervisingRouteController {
52    /// Create a new supervising controller.
53    pub fn new(registry: Arc<std::sync::Mutex<Registry>>, config: SupervisionConfig) -> Self {
54        Self::with_languages(
55            registry,
56            config,
57            Arc::new(std::sync::Mutex::new(HashMap::new())),
58        )
59    }
60
61    /// Create a new supervising controller with shared language registry.
62    pub fn with_languages(
63        registry: Arc<std::sync::Mutex<Registry>>,
64        config: SupervisionConfig,
65        languages: SharedLanguageRegistry,
66    ) -> Self {
67        let (crash_tx, crash_rx) = tokio::sync::mpsc::channel(64);
68        Self {
69            inner: DefaultRouteController::with_languages(registry, languages),
70            config,
71            crash_tx,
72            crash_rx: Some(crash_rx),
73            metrics: None,
74        }
75    }
76
77    /// Set a metrics collector for the supervision loop.
78    pub fn with_metrics(mut self, metrics: Arc<dyn MetricsCollector>) -> Self {
79        self.metrics = Some(metrics);
80        self
81    }
82
83    fn ensure_supervision_loop_started(&mut self) {
84        self.inner.set_crash_notifier(self.crash_tx.clone());
85
86        if self.crash_rx.is_none() {
87            return;
88        }
89
90        let rx = self
91            .crash_rx
92            .take()
93            .expect("crash_rx checked as Some above");
94        let config = self.config.clone();
95        let metrics = self.metrics.clone();
96        let runtime = self.inner.runtime_handle_for_supervision();
97        tokio::spawn(async move {
98            supervision_loop(rx, runtime, config, metrics).await;
99        });
100    }
101}
102
#[async_trait::async_trait]
impl RouteController for SupervisingRouteController {
    // Lifecycle operations delegate to the inner controller. The start
    // variants first arm the supervision loop so a crash during or after
    // startup is observed.

    async fn start_route(&mut self, route_id: &str) -> Result<(), CamelError> {
        self.ensure_supervision_loop_started();
        self.inner.start_route(route_id).await
    }

    async fn stop_route(&mut self, route_id: &str) -> Result<(), CamelError> {
        self.inner.stop_route(route_id).await
    }

    async fn restart_route(&mut self, route_id: &str) -> Result<(), CamelError> {
        self.inner.restart_route(route_id).await
    }

    async fn suspend_route(&mut self, route_id: &str) -> Result<(), CamelError> {
        self.inner.suspend_route(route_id).await
    }

    async fn resume_route(&mut self, route_id: &str) -> Result<(), CamelError> {
        self.inner.resume_route(route_id).await
    }

    async fn start_all_routes(&mut self) -> Result<(), CamelError> {
        self.ensure_supervision_loop_started();
        self.inner.start_all_routes().await
    }

    async fn stop_all_routes(&mut self) -> Result<(), CamelError> {
        self.inner.stop_all_routes().await
    }
}
135
#[async_trait::async_trait]
impl RouteControllerInternal for SupervisingRouteController {
    // All internal-contract operations are straight delegation to the inner
    // controller; only the reload-start path additionally (re)arms the
    // supervision loop, mirroring `RouteController::start_route`.

    fn add_route(&mut self, def: RouteDefinition) -> Result<(), CamelError> {
        self.inner.add_route(def)
    }

    fn swap_pipeline(
        &self,
        route_id: &str,
        pipeline: camel_api::BoxProcessor,
    ) -> Result<(), CamelError> {
        self.inner.swap_pipeline(route_id, pipeline)
    }

    fn route_from_uri(&self, route_id: &str) -> Option<String> {
        self.inner.route_from_uri(route_id)
    }

    fn set_error_handler(&mut self, config: ErrorHandlerConfig) {
        self.inner.set_error_handler(config)
    }

    fn set_self_ref(&mut self, self_ref: Arc<Mutex<dyn RouteController>>) {
        self.inner.set_self_ref(self_ref)
    }

    fn set_runtime_handle(&mut self, runtime: Arc<dyn RuntimeHandle>) {
        self.inner.set_runtime_handle(runtime)
    }

    fn route_count(&self) -> usize {
        self.inner.route_count()
    }

    fn route_ids(&self) -> Vec<String> {
        self.inner.route_ids()
    }

    fn auto_startup_route_ids(&self) -> Vec<String> {
        self.inner.auto_startup_route_ids()
    }

    fn shutdown_route_ids(&self) -> Vec<String> {
        self.inner.shutdown_route_ids()
    }

    fn set_tracer_config(&mut self, config: &crate::shared::observability::domain::TracerConfig) {
        self.inner.set_tracer_config(config)
    }

    fn compile_route_definition(
        &self,
        def: RouteDefinition,
    ) -> Result<camel_api::BoxProcessor, camel_api::CamelError> {
        self.inner.compile_route_definition(def)
    }

    fn remove_route(&mut self, route_id: &str) -> Result<(), camel_api::CamelError> {
        self.inner.remove_route(route_id)
    }

    // Reload-start must also ensure supervision is running, since a reloaded
    // route can crash just like a freshly started one.
    async fn start_route_reload(&mut self, route_id: &str) -> Result<(), camel_api::CamelError> {
        self.ensure_supervision_loop_started();
        self.inner.start_route(route_id).await
    }

    async fn stop_route_reload(&mut self, route_id: &str) -> Result<(), camel_api::CamelError> {
        self.inner.stop_route(route_id).await
    }
}
206
207/// Supervision loop that restarts crashed routes.
208///
209/// Receives crash notifications and restarts routes with exponential backoff.
210/// Tracks attempt counts per route and respects `max_attempts` from config.
211async fn supervision_loop(
212    mut rx: tokio::sync::mpsc::Receiver<CrashNotification>,
213    runtime: Option<Arc<dyn RuntimeHandle>>,
214    config: SupervisionConfig,
215    _metrics: Option<Arc<dyn MetricsCollector>>,
216) {
217    let mut attempts: HashMap<String, u32> = HashMap::new();
218    let mut last_restart_time: HashMap<String, Instant> = HashMap::new();
219    let mut currently_restarting: HashSet<String> = HashSet::new();
220
221    info!("Supervision loop started");
222
223    while let Some(notification) = rx.recv().await {
224        let route_id = notification.route_id.clone();
225        let error = &notification.error;
226
227        // Skip if already processing a restart for this route
228        if currently_restarting.contains(&route_id) {
229            continue;
230        }
231
232        info!(
233            route_id = %route_id,
234            error = %error,
235            "Route crashed, checking restart policy"
236        );
237
238        // Reset attempt counter if route ran long enough before crashing
239        // A route that runs for >= initial_delay is considered a "successful run"
240        if let Some(last_time) = last_restart_time.get(&route_id)
241            && last_time.elapsed() >= config.initial_delay
242        {
243            attempts.insert(route_id.clone(), 0);
244        }
245
246        // Increment attempt counter
247        let current_attempt = attempts.entry(route_id.clone()).or_insert(0);
248        *current_attempt += 1;
249
250        // Check max attempts (collapse nested if-let)
251        if config
252            .max_attempts
253            .is_some_and(|max| *current_attempt > max)
254        {
255            error!(
256                route_id = %route_id,
257                attempts = current_attempt,
258                max = config.max_attempts.unwrap(),
259                "Route exceeded max restart attempts, giving up"
260            );
261            continue;
262        }
263
264        // Compute delay with exponential backoff
265        let delay = config.next_delay(*current_attempt);
266        info!(
267            route_id = %route_id,
268            attempt = current_attempt,
269            delay_ms = delay.as_millis(),
270            "Scheduling route restart"
271        );
272
273        // Mark as currently being processed
274        currently_restarting.insert(route_id.clone());
275
276        // Sleep before restart
277        tokio::time::sleep(delay).await;
278
279        let Some(runtime) = &runtime else {
280            warn!(
281                route_id = %route_id,
282                "Runtime handle unavailable, supervision restart skipped"
283            );
284            currently_restarting.remove(&route_id);
285            continue;
286        };
287
288        // If runtime lifecycle has already been intentionally transitioned to a
289        // non-running state, supervision must not revive it.
290        let pre_status = match runtime
291            .ask(RuntimeQuery::GetRouteStatus {
292                route_id: route_id.clone(),
293            })
294            .await
295        {
296            Ok(RuntimeQueryResult::RouteStatus { status, .. }) => status,
297            Ok(other) => {
298                warn!(
299                    route_id = %route_id,
300                    ?other,
301                    "Unexpected runtime query result, skipping supervision restart"
302                );
303                currently_restarting.remove(&route_id);
304                continue;
305            }
306            Err(err) => {
307                warn!(
308                    route_id = %route_id,
309                    error = %err,
310                    "Runtime status query failed, skipping supervision restart"
311                );
312                currently_restarting.remove(&route_id);
313                continue;
314            }
315        };
316
317        if matches!(pre_status.as_str(), "Registered" | "Stopped") {
318            warn!(
319                route_id = %route_id,
320                status = %pre_status,
321                "Runtime lifecycle is non-running; supervision restart skipped"
322            );
323            attempts.remove(&route_id);
324            currently_restarting.remove(&route_id);
325            continue;
326        }
327
328        // Record crash in runtime state first so the read-model remains authoritative
329        // for subsequent restart decisions.
330        if let Err(err) = runtime
331            .execute(RuntimeCommand::FailRoute {
332                route_id: route_id.clone(),
333                error: error.clone(),
334                command_id: next_supervision_command_id("fail", &route_id),
335                causation_id: None,
336            })
337            .await
338        {
339            warn!(
340                route_id = %route_id,
341                error = %err,
342                "Failed to persist crash state in runtime before restart check"
343            );
344        }
345
346        // Check current status before restarting using runtime read-model only.
347        let should_restart = match runtime
348            .ask(RuntimeQuery::GetRouteStatus {
349                route_id: route_id.clone(),
350            })
351            .await
352        {
353            Ok(RuntimeQueryResult::RouteStatus { status, .. }) if status == "Failed" => true,
354            Ok(RuntimeQueryResult::RouteStatus { status, .. }) => {
355                warn!(
356                    route_id = %route_id,
357                    status = %status,
358                    "Route no longer failed in runtime projection, skipping supervision restart"
359                );
360                attempts.remove(&route_id);
361                false
362            }
363            Ok(other) => {
364                warn!(
365                    route_id = %route_id,
366                    ?other,
367                    "Unexpected runtime query result, skipping supervision restart"
368                );
369                false
370            }
371            Err(err) => {
372                warn!(
373                    route_id = %route_id,
374                    error = %err,
375                    "Runtime status query failed, skipping supervision restart"
376                );
377                false
378            }
379        };
380
381        if should_restart {
382            let restart_result = runtime
383                .execute(RuntimeCommand::ReloadRoute {
384                    route_id: route_id.clone(),
385                    command_id: next_supervision_command_id("reload", &route_id),
386                    causation_id: None,
387                })
388                .await
389                .map(|_| ());
390
391            match restart_result {
392                Ok(()) => {
393                    info!(route_id = %route_id, "Route restarted successfully");
394                    // Record restart time instead of resetting attempts.
395                    // The counter will be reset on next crash if route ran long enough.
396                    last_restart_time.insert(route_id.clone(), Instant::now());
397                }
398                Err(e) => {
399                    error!(route_id = %route_id, error = %e, "Failed to restart route");
400                }
401            }
402        }
403
404        // No longer processing this route
405        currently_restarting.remove(&route_id);
406    }
407
408    info!("Supervision loop ended");
409}
410
411#[cfg(test)]
412mod tests {
413    use super::*;
414    use crate::lifecycle::adapters::{InMemoryRuntimeStore, RuntimeExecutionAdapter};
415    use crate::lifecycle::application::runtime_bus::RuntimeBus;
416    use crate::lifecycle::ports::RouteRegistrationPort as InternalRuntimeCommandBus;
417    use async_trait::async_trait;
418    use camel_api::RuntimeQueryBus;
419    use camel_component::{Component, ConcurrencyModel, Consumer, ConsumerContext, Endpoint};
420    use std::sync::Arc as StdArc;
421    use std::sync::atomic::{AtomicU32, Ordering};
422    use std::time::Duration;
423
    /// Wire a fresh in-memory [`RuntimeBus`] to `controller` and return the bus.
    ///
    /// NOTE(review): each bus port gets a `store.clone()` — this assumes
    /// `InMemoryRuntimeStore::clone` shares underlying state across clones
    /// (the tests rely on the ports seeing one another's writes); confirm.
    async fn attach_runtime_bus(
        controller: &StdArc<Mutex<dyn RouteControllerInternal>>,
    ) -> StdArc<RuntimeBus> {
        let store = InMemoryRuntimeStore::default();
        let runtime = StdArc::new(
            RuntimeBus::new(
                StdArc::new(store.clone()),
                StdArc::new(store.clone()),
                StdArc::new(store.clone()),
                StdArc::new(store.clone()),
            )
            .with_uow(StdArc::new(store))
            // Route the bus's execution side back into the controller under test.
            .with_execution(StdArc::new(RuntimeExecutionAdapter::new(StdArc::clone(
                controller,
            )))),
        );
        // Hand the bus to the controller so supervision can query/command it.
        let runtime_handle: StdArc<dyn RuntimeHandle> = runtime.clone();
        controller.lock().await.set_runtime_handle(runtime_handle);
        runtime
    }
444
445    /// A consumer that crashes on first call, then blocks indefinitely.
446    struct CrashThenBlockConsumer {
447        call_count: StdArc<AtomicU32>,
448    }
449
450    #[async_trait]
451    impl Consumer for CrashThenBlockConsumer {
452        async fn start(&mut self, ctx: ConsumerContext) -> Result<(), CamelError> {
453            let count = self.call_count.fetch_add(1, Ordering::SeqCst);
454
455            if count == 0 {
456                // First call: crash immediately
457                return Err(CamelError::RouteError("simulated crash".into()));
458            }
459
460            // Second+ call: block until cancelled
461            ctx.cancelled().await;
462            Ok(())
463        }
464
465        async fn stop(&mut self) -> Result<(), CamelError> {
466            Ok(())
467        }
468
469        fn concurrency_model(&self) -> ConcurrencyModel {
470            ConcurrencyModel::Sequential
471        }
472    }
473
    /// Endpoint that hands out [`CrashThenBlockConsumer`]s sharing one counter.
    struct CrashThenBlockEndpoint {
        call_count: StdArc<AtomicU32>,
    }

    impl Endpoint for CrashThenBlockEndpoint {
        fn uri(&self) -> &str {
            "crash-then-block:test"
        }

        fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
            Ok(Box::new(CrashThenBlockConsumer {
                call_count: StdArc::clone(&self.call_count),
            }))
        }

        // Consumer-only test endpoint: producing is an error by design.
        fn create_producer(
            &self,
            _ctx: &camel_api::ProducerContext,
        ) -> Result<camel_api::BoxProcessor, CamelError> {
            Err(CamelError::RouteError("no producer".into()))
        }
    }
496
    /// Component registering the `crash-then-block` scheme for tests.
    struct CrashThenBlockComponent {
        call_count: StdArc<AtomicU32>,
    }

    impl Component for CrashThenBlockComponent {
        fn scheme(&self) -> &str {
            "crash-then-block"
        }

        fn create_endpoint(&self, _uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
            Ok(Box::new(CrashThenBlockEndpoint {
                call_count: StdArc::clone(&self.call_count),
            }))
        }
    }
512
    #[tokio::test]
    async fn test_supervising_controller_restarts_crashed_route() {
        // Set up registry with crash-then-block component
        let registry = StdArc::new(std::sync::Mutex::new(Registry::new()));
        let call_count = StdArc::new(AtomicU32::new(0));
        registry.lock().unwrap().register(CrashThenBlockComponent {
            call_count: StdArc::clone(&call_count),
        });

        // Configure supervision with fast delays for testing
        let config = SupervisionConfig {
            max_attempts: Some(5),
            initial_delay: Duration::from_millis(50),
            backoff_multiplier: 1.0, // no growth
            max_delay: Duration::from_secs(60),
        };

        // Create supervising controller
        let controller: StdArc<Mutex<dyn RouteControllerInternal>> = StdArc::new(Mutex::new(
            SupervisingRouteController::new(StdArc::clone(&registry), config),
        ));

        // Set self-ref (try_lock is safe: no other task holds the lock yet)
        controller
            .try_lock()
            .unwrap()
            .set_self_ref(StdArc::clone(&controller) as StdArc<Mutex<dyn RouteController>>);
        let runtime = attach_runtime_bus(&controller).await;

        // Add a route
        let runtime_def = crate::route::RouteDefinition::new("crash-then-block:test", vec![])
            .with_route_id("crash-route");
        runtime.register_route(runtime_def).await.unwrap();

        // Start all routes
        controller.lock().await.start_all_routes().await.unwrap();

        // Wait for crash + restart (50ms backoff + processing; 500ms is generous
        // but keeps the test timing-tolerant on slow CI)
        tokio::time::sleep(Duration::from_millis(500)).await;

        // Verify consumer was called at least twice (crash + restart)
        let count = call_count.load(Ordering::SeqCst);
        assert!(
            count >= 2,
            "expected at least 2 consumer calls (crash + restart), got {}",
            count
        );

        // Verify runtime projection reports Started.
        let status = match runtime
            .ask(RuntimeQuery::GetRouteStatus {
                route_id: "crash-route".into(),
            })
            .await
            .unwrap()
        {
            RuntimeQueryResult::RouteStatus { status, .. } => status,
            other => panic!("unexpected query result: {other:?}"),
        };
        assert_eq!(status, "Started");
    }
574
    #[tokio::test]
    async fn test_supervising_controller_respects_max_attempts() {
        // Set up registry with always-crash component (local to this test:
        // a consumer that fails on every start, with no call counting)
        struct AlwaysCrashConsumer;
        #[async_trait]
        impl Consumer for AlwaysCrashConsumer {
            async fn start(&mut self, _ctx: ConsumerContext) -> Result<(), CamelError> {
                Err(CamelError::RouteError("always crashes".into()))
            }
            async fn stop(&mut self) -> Result<(), CamelError> {
                Ok(())
            }
            fn concurrency_model(&self) -> ConcurrencyModel {
                ConcurrencyModel::Sequential
            }
        }
        struct AlwaysCrashEndpoint;
        impl Endpoint for AlwaysCrashEndpoint {
            fn uri(&self) -> &str {
                "always-crash:test"
            }
            fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
                Ok(Box::new(AlwaysCrashConsumer))
            }
            fn create_producer(
                &self,
                _ctx: &camel_api::ProducerContext,
            ) -> Result<camel_api::BoxProcessor, CamelError> {
                Err(CamelError::RouteError("no producer".into()))
            }
        }
        struct AlwaysCrashComponent;
        impl Component for AlwaysCrashComponent {
            fn scheme(&self) -> &str {
                "always-crash"
            }
            fn create_endpoint(&self, _uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
                Ok(Box::new(AlwaysCrashEndpoint))
            }
        }

        let registry = StdArc::new(std::sync::Mutex::new(Registry::new()));
        registry.lock().unwrap().register(AlwaysCrashComponent);

        // Configure with max 2 attempts
        let config = SupervisionConfig {
            max_attempts: Some(2),
            initial_delay: Duration::from_millis(10),
            backoff_multiplier: 1.0,
            max_delay: Duration::from_secs(1),
        };

        let controller: StdArc<Mutex<dyn RouteControllerInternal>> = StdArc::new(Mutex::new(
            SupervisingRouteController::new(StdArc::clone(&registry), config),
        ));

        controller
            .try_lock()
            .unwrap()
            .set_self_ref(StdArc::clone(&controller) as StdArc<Mutex<dyn RouteController>>);
        let runtime = attach_runtime_bus(&controller).await;

        let runtime_def = crate::route::RouteDefinition::new("always-crash:test", vec![])
            .with_route_id("always-crash-route");
        runtime.register_route(runtime_def).await.unwrap();

        controller.lock().await.start_all_routes().await.unwrap();

        // Wait for all attempts (2 x 10ms backoff plus processing slack)
        tokio::time::sleep(Duration::from_millis(200)).await;

        // Runtime projection should be in Failed state (not restarted after max attempts).
        let status = match runtime
            .ask(RuntimeQuery::GetRouteStatus {
                route_id: "always-crash-route".into(),
            })
            .await
            .unwrap()
        {
            RuntimeQueryResult::RouteStatus { status, .. } => status,
            other => panic!("unexpected query result: {other:?}"),
        };
        assert_eq!(status, "Failed");
    }
659
660    #[tokio::test]
661    async fn test_supervising_controller_delegates_to_inner() {
662        let registry = StdArc::new(std::sync::Mutex::new(Registry::new()));
663        let config = SupervisionConfig::default();
664        let mut controller = SupervisingRouteController::new(StdArc::clone(&registry), config);
665
666        // Set self-ref
667        let self_ref: StdArc<Mutex<dyn RouteController>> = StdArc::new(Mutex::new(
668            SupervisingRouteController::new(registry, SupervisionConfig::default()),
669        ));
670        controller.set_self_ref(self_ref);
671
672        // Test route_count and route_ids (delegation)
673        assert_eq!(controller.route_count(), 0);
674        assert_eq!(controller.route_ids(), Vec::<String>::new());
675    }
676
    /// Consumer that always crashes immediately, tracking call count.
    struct AlwaysCrashWithCountConsumer {
        call_count: StdArc<AtomicU32>,
    }

    #[async_trait]
    impl Consumer for AlwaysCrashWithCountConsumer {
        async fn start(&mut self, _ctx: ConsumerContext) -> Result<(), CamelError> {
            // Record the attempt, then fail unconditionally.
            self.call_count.fetch_add(1, Ordering::SeqCst);
            Err(CamelError::RouteError("always crashes".into()))
        }

        async fn stop(&mut self) -> Result<(), CamelError> {
            Ok(())
        }

        fn concurrency_model(&self) -> ConcurrencyModel {
            ConcurrencyModel::Sequential
        }
    }
697
    /// Endpoint that hands out [`AlwaysCrashWithCountConsumer`]s sharing one counter.
    struct AlwaysCrashWithCountEndpoint {
        call_count: StdArc<AtomicU32>,
    }

    impl Endpoint for AlwaysCrashWithCountEndpoint {
        fn uri(&self) -> &str {
            "always-crash-count:test"
        }

        fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
            Ok(Box::new(AlwaysCrashWithCountConsumer {
                call_count: StdArc::clone(&self.call_count),
            }))
        }

        // Consumer-only test endpoint: producing is an error by design.
        fn create_producer(
            &self,
            _ctx: &camel_api::ProducerContext,
        ) -> Result<camel_api::BoxProcessor, CamelError> {
            Err(CamelError::RouteError("no producer".into()))
        }
    }
720
    /// Component registering the `always-crash-count` scheme for tests.
    struct AlwaysCrashWithCountComponent {
        call_count: StdArc<AtomicU32>,
    }

    impl Component for AlwaysCrashWithCountComponent {
        fn scheme(&self) -> &str {
            "always-crash-count"
        }

        fn create_endpoint(&self, _uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
            Ok(Box::new(AlwaysCrashWithCountEndpoint {
                call_count: StdArc::clone(&self.call_count),
            }))
        }
    }
736
    #[tokio::test]
    async fn test_supervision_gives_up_after_max_attempts() {
        // Set up registry with always-crash component that tracks call count
        let registry = StdArc::new(std::sync::Mutex::new(Registry::new()));
        let call_count = StdArc::new(AtomicU32::new(0));
        registry
            .lock()
            .unwrap()
            .register(AlwaysCrashWithCountComponent {
                call_count: StdArc::clone(&call_count),
            });

        // Configure supervision with max_attempts=2 and fast delays
        let config = SupervisionConfig {
            max_attempts: Some(2),
            initial_delay: Duration::from_millis(50),
            backoff_multiplier: 1.0,
            max_delay: Duration::from_secs(60),
        };

        let controller: StdArc<Mutex<dyn RouteControllerInternal>> = StdArc::new(Mutex::new(
            SupervisingRouteController::new(StdArc::clone(&registry), config),
        ));

        controller
            .try_lock()
            .unwrap()
            .set_self_ref(StdArc::clone(&controller) as StdArc<Mutex<dyn RouteController>>);
        let runtime = attach_runtime_bus(&controller).await;

        let runtime_def = crate::route::RouteDefinition::new("always-crash-count:test", vec![])
            .with_route_id("give-up-route");
        runtime.register_route(runtime_def).await.unwrap();

        controller.lock().await.start_all_routes().await.unwrap();

        // Wait enough time for: initial start + 2 restart attempts
        // With 50ms initial delay and no backoff: 50ms + 50ms = 100ms minimum
        // Wait 800ms to be safe
        tokio::time::sleep(Duration::from_millis(800)).await;

        // Verify consumer was called exactly max_attempts + 1 times
        // (1 initial start + 2 restart attempts = 3 total)
        let count = call_count.load(Ordering::SeqCst);
        assert_eq!(
            count, 3,
            "expected exactly 3 consumer calls (initial + 2 restarts), got {}",
            count
        );

        // Verify runtime projection is in Failed state (supervision gave up).
        let status = match runtime
            .ask(RuntimeQuery::GetRouteStatus {
                route_id: "give-up-route".into(),
            })
            .await
            .unwrap()
        {
            RuntimeQueryResult::RouteStatus { status, .. } => status,
            other => panic!("unexpected query result: {other:?}"),
        };
        assert_eq!(status, "Failed");
    }
800
    /// Consumer that crashes on odd calls, blocks briefly then crashes on even calls.
    /// This simulates a route that sometimes runs successfully before crashing.
    struct CrashOnOddBlockOnEvenConsumer {
        call_count: StdArc<AtomicU32>,
    }

    #[async_trait]
    impl Consumer for CrashOnOddBlockOnEvenConsumer {
        async fn start(&mut self, ctx: ConsumerContext) -> Result<(), CamelError> {
            let count = self.call_count.fetch_add(1, Ordering::SeqCst);
            // `count` is 0-indexed, so even counts (0, 2, 4, ...) are the
            // odd-NUMBERED calls (1st, 3rd, 5th) and vice versa:
            //   - odd-numbered calls crash immediately
            //   - even-numbered calls block for 100ms, then crash

            if count.is_multiple_of(2) {
                // Odd-numbered call (1st, 3rd, 5th, ...): crash immediately
                return Err(CamelError::RouteError("odd call crash".into()));
            }

            // Even-numbered call (2nd, 4th, 6th, ...): block briefly then crash.
            // The 100ms "uptime" exceeds the supervision initial_delay used by
            // the test, so it counts as a successful run and resets attempts.
            tokio::select! {
                _ = ctx.cancelled() => {
                    // Cancelled externally
                    return Ok(());
                }
                _ = tokio::time::sleep(Duration::from_millis(100)) => {
                    // Simulated "uptime" before crash
                    return Err(CamelError::RouteError("even call crash after uptime".into()));
                }
            }
        }

        async fn stop(&mut self) -> Result<(), CamelError> {
            Ok(())
        }

        fn concurrency_model(&self) -> ConcurrencyModel {
            ConcurrencyModel::Sequential
        }
    }
842
    /// Test endpoint whose consumers all share one call counter with the test.
    struct CrashOnOddBlockOnEvenEndpoint {
        // Counter cloned into every consumer created by this endpoint.
        call_count: StdArc<AtomicU32>,
    }
846
847    impl Endpoint for CrashOnOddBlockOnEvenEndpoint {
848        fn uri(&self) -> &str {
849            "crash-odd-block-even:test"
850        }
851
852        fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
853            Ok(Box::new(CrashOnOddBlockOnEvenConsumer {
854                call_count: StdArc::clone(&self.call_count),
855            }))
856        }
857
858        fn create_producer(
859            &self,
860            _ctx: &camel_api::ProducerContext,
861        ) -> Result<camel_api::BoxProcessor, CamelError> {
862            Err(CamelError::RouteError("no producer".into()))
863        }
864    }
865
    /// Test component that creates [`CrashOnOddBlockOnEvenEndpoint`]s, all
    /// sharing one call counter with the test.
    struct CrashOnOddBlockOnEvenComponent {
        // Counter cloned into every endpoint created by this component.
        call_count: StdArc<AtomicU32>,
    }
869
870    impl Component for CrashOnOddBlockOnEvenComponent {
871        fn scheme(&self) -> &str {
872            "crash-odd-block-even"
873        }
874
875        fn create_endpoint(&self, _uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
876            Ok(Box::new(CrashOnOddBlockOnEvenEndpoint {
877                call_count: StdArc::clone(&self.call_count),
878            }))
879        }
880    }
881
    /// Verifies that supervision resets its restart-attempt counter after a
    /// "successful" run, so a route that alternates between instant crashes and
    /// ~100ms of uptime keeps getting restarted even with `max_attempts: Some(2)`.
    /// NOTE(review): relies on real wall-clock sleeps (not paused tokio time),
    /// so the thresholds below are generous to avoid flakiness.
    #[tokio::test]
    async fn test_supervision_resets_attempt_count_on_success() {
        // Set up registry with crash-on-odd, block-on-even component
        let registry = StdArc::new(std::sync::Mutex::new(Registry::new()));
        let call_count = StdArc::new(AtomicU32::new(0));
        registry
            .lock()
            .unwrap()
            .register(CrashOnOddBlockOnEvenComponent {
                call_count: StdArc::clone(&call_count),
            });

        // Configure with max_attempts=2 - allows continued restarts only if
        // successful runs reset the counter.
        // Without reset: supervision would give up after 2 restarts (3 total calls).
        // With reset: the 100ms "uptime" runs count as successful (presumably any
        // run lasting at least initial_delay=50ms — TODO confirm the exact
        // threshold in SupervisingRouteController) and reset the counter,
        // allowing restarts to continue indefinitely.
        let config = SupervisionConfig {
            max_attempts: Some(2),
            initial_delay: Duration::from_millis(50),
            backoff_multiplier: 1.0,
            max_delay: Duration::from_secs(60),
        };

        let controller: StdArc<Mutex<dyn RouteControllerInternal>> = StdArc::new(Mutex::new(
            SupervisingRouteController::new(StdArc::clone(&registry), config),
        ));

        // try_lock is safe here: the controller was just created and nothing
        // else holds the lock yet.
        controller
            .try_lock()
            .unwrap()
            .set_self_ref(StdArc::clone(&controller) as StdArc<Mutex<dyn RouteController>>);
        let runtime = attach_runtime_bus(&controller).await;

        let runtime_def = crate::route::RouteDefinition::new("crash-odd-block-even:test", vec![])
            .with_route_id("reset-attempt-route");
        runtime.register_route(runtime_def).await.unwrap();

        controller.lock().await.start_all_routes().await.unwrap();

        // Wait long enough for multiple crash-restart cycles:
        // - Call 1: crashes immediately → restart (consumes attempt 1)
        // - Call 2: blocks 100ms, crashes → counted as a successful run,
        //   resetting the attempt counter back to 0
        // - Calls 3, 4, ...: the pattern repeats, so the counter never
        //   reaches max_attempts=2 between resets
        // With 50ms initial delay and 100ms block time, each full odd+even cycle
        // takes ~150ms (crash + 50ms delay + restart + 100ms uptime).
        // Wait 1s to ensure we get at least 4 calls.
        tokio::time::sleep(Duration::from_millis(1000)).await;

        // Verify consumer was called at least 4 times. Without the reset,
        // max_attempts=2 would have capped this at 3 total calls, so >= 4
        // proves the attempt count was reset by the successful runs.
        let count = call_count.load(Ordering::SeqCst);
        assert!(
            count >= 4,
            "expected at least 4 consumer calls (proving attempt reset), got {}",
            count
        );

        // Verify runtime projection is NOT Failed (polling, since route crashes
        // in a loop and status flaps between restarts).
        let deadline = tokio::time::Instant::now() + Duration::from_secs(2);
        loop {
            let status = match runtime
                .ask(RuntimeQuery::GetRouteStatus {
                    route_id: "reset-attempt-route".into(),
                })
                .await
                .unwrap()
            {
                RuntimeQueryResult::RouteStatus { status, .. } => status,
                other => panic!("unexpected query result: {other:?}"),
            };
            // Any non-Failed status means supervision is still managing the
            // route; only a persistent "Failed" (past the deadline) is a bug.
            if status != "Failed" {
                break;
            }
            assert!(
                tokio::time::Instant::now() < deadline,
                "route remained in Failed state for 2s — supervision likely gave up"
            );
            tokio::time::sleep(Duration::from_millis(50)).await;
        }
    }
961}