// camel_core/lifecycle/application/supervision_service.rs
//! Supervising route controller with automatic crash recovery.
//!
//! This module provides [`SupervisingRouteController`], which wraps a
//! [`DefaultRouteController`] and monitors crashed routes, restarting them
//! with exponential backoff based on [`SupervisionConfig`].

use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Instant;

use tokio::sync::Mutex;
use tracing::{error, info, warn};

use camel_api::error_handler::ErrorHandlerConfig;
use camel_api::{
    CamelError, MetricsCollector, RouteController, RuntimeCommand, RuntimeHandle, RuntimeQuery,
    RuntimeQueryResult, SupervisionConfig,
};

use crate::lifecycle::adapters::route_controller::{
    CrashNotification, DefaultRouteController, RouteControllerInternal, SharedLanguageRegistry,
};
use crate::lifecycle::application::route_definition::RouteDefinition;
use crate::shared::components::domain::Registry;
26
/// A route controller that automatically restarts crashed routes.
///
/// Wraps a [`DefaultRouteController`] and spawns a supervision loop that
/// receives crash notifications and restarts routes with exponential backoff.
pub struct SupervisingRouteController {
    /// The inner controller that manages actual routes.
    inner: DefaultRouteController,
    /// Supervision configuration (backoff delays, attempt budget).
    config: SupervisionConfig,
    /// Sender for crash notifications; a clone is handed to the inner
    /// controller so crashing routes can signal the supervision loop.
    crash_tx: tokio::sync::mpsc::Sender<CrashNotification>,
    /// Receiver for crash notifications; `take()`n exactly once when the
    /// supervision loop is spawned, `None` thereafter.
    crash_rx: Option<tokio::sync::mpsc::Receiver<CrashNotification>>,
    /// Optional metrics collector forwarded to the supervision loop.
    metrics: Option<Arc<dyn MetricsCollector>>,
}
43
/// Process-wide monotonic counter that makes supervision command ids unique.
static SUPERVISION_COMMAND_SEQ: AtomicU64 = AtomicU64::new(0);

/// Build a unique command id of the form `supervision:<op>:<route_id>:<seq>`.
///
/// The trailing sequence number comes from a global atomic counter, so two
/// ids generated for the same operation and route never collide.
fn next_supervision_command_id(op: &str, route_id: &str) -> String {
    let sequence = SUPERVISION_COMMAND_SEQ.fetch_add(1, Ordering::Relaxed);
    format!("supervision:{}:{}:{}", op, route_id, sequence)
}
50
51impl SupervisingRouteController {
52    /// Create a new supervising controller.
53    pub fn new(registry: Arc<std::sync::Mutex<Registry>>, config: SupervisionConfig) -> Self {
54        Self::with_languages(
55            registry,
56            config,
57            Arc::new(std::sync::Mutex::new(HashMap::new())),
58        )
59    }
60
61    /// Create a new supervising controller with shared language registry.
62    pub fn with_languages(
63        registry: Arc<std::sync::Mutex<Registry>>,
64        config: SupervisionConfig,
65        languages: SharedLanguageRegistry,
66    ) -> Self {
67        let (crash_tx, crash_rx) = tokio::sync::mpsc::channel(64);
68        Self {
69            inner: DefaultRouteController::with_languages(registry, languages),
70            config,
71            crash_tx,
72            crash_rx: Some(crash_rx),
73            metrics: None,
74        }
75    }
76
77    /// Set a metrics collector for the supervision loop.
78    pub fn with_metrics(mut self, metrics: Arc<dyn MetricsCollector>) -> Self {
79        self.metrics = Some(metrics);
80        self
81    }
82
83    fn ensure_supervision_loop_started(&mut self) {
84        self.inner.set_crash_notifier(self.crash_tx.clone());
85
86        if self.crash_rx.is_none() {
87            return;
88        }
89
90        let rx = self
91            .crash_rx
92            .take()
93            .expect("crash_rx checked as Some above");
94        let config = self.config.clone();
95        let metrics = self.metrics.clone();
96        let runtime = self.inner.runtime_handle_for_supervision();
97        tokio::spawn(async move {
98            supervision_loop(rx, runtime, config, metrics).await;
99        });
100    }
101}
102
#[async_trait::async_trait]
impl RouteController for SupervisingRouteController {
    // All operations delegate to the inner controller; the start paths
    // additionally make sure the supervision loop has been spawned so that
    // crashes occurring after startup are observed.

    async fn start_route(&mut self, route_id: &str) -> Result<(), CamelError> {
        self.ensure_supervision_loop_started();
        self.inner.start_route(route_id).await
    }

    async fn stop_route(&mut self, route_id: &str) -> Result<(), CamelError> {
        self.inner.stop_route(route_id).await
    }

    async fn restart_route(&mut self, route_id: &str) -> Result<(), CamelError> {
        self.inner.restart_route(route_id).await
    }

    async fn suspend_route(&mut self, route_id: &str) -> Result<(), CamelError> {
        self.inner.suspend_route(route_id).await
    }

    async fn resume_route(&mut self, route_id: &str) -> Result<(), CamelError> {
        self.inner.resume_route(route_id).await
    }

    async fn start_all_routes(&mut self) -> Result<(), CamelError> {
        self.ensure_supervision_loop_started();
        self.inner.start_all_routes().await
    }

    async fn stop_all_routes(&mut self) -> Result<(), CamelError> {
        self.inner.stop_all_routes().await
    }
}
135
#[async_trait::async_trait]
impl RouteControllerInternal for SupervisingRouteController {
    // Pure pass-through to the inner controller, except for the reload start
    // path, which also ensures the supervision loop is running.

    fn add_route(&mut self, def: RouteDefinition) -> Result<(), CamelError> {
        self.inner.add_route(def)
    }

    fn swap_pipeline(
        &self,
        route_id: &str,
        pipeline: camel_api::BoxProcessor,
    ) -> Result<(), CamelError> {
        self.inner.swap_pipeline(route_id, pipeline)
    }

    fn route_from_uri(&self, route_id: &str) -> Option<String> {
        self.inner.route_from_uri(route_id)
    }

    fn set_error_handler(&mut self, config: ErrorHandlerConfig) {
        self.inner.set_error_handler(config)
    }

    fn set_self_ref(&mut self, self_ref: Arc<Mutex<dyn RouteController>>) {
        self.inner.set_self_ref(self_ref)
    }

    fn set_runtime_handle(&mut self, runtime: Arc<dyn RuntimeHandle>) {
        self.inner.set_runtime_handle(runtime)
    }

    fn route_count(&self) -> usize {
        self.inner.route_count()
    }

    fn route_ids(&self) -> Vec<String> {
        self.inner.route_ids()
    }

    fn in_flight_count(&self, route_id: &str) -> Option<u64> {
        self.inner.in_flight_count(route_id)
    }

    fn route_exists(&self, route_id: &str) -> bool {
        self.inner.route_exists(route_id)
    }

    fn auto_startup_route_ids(&self) -> Vec<String> {
        self.inner.auto_startup_route_ids()
    }

    fn shutdown_route_ids(&self) -> Vec<String> {
        self.inner.shutdown_route_ids()
    }

    fn set_tracer_config(&mut self, config: &crate::shared::observability::domain::TracerConfig) {
        self.inner.set_tracer_config(config)
    }

    fn compile_route_definition(
        &self,
        def: RouteDefinition,
    ) -> Result<camel_api::BoxProcessor, camel_api::CamelError> {
        self.inner.compile_route_definition(def)
    }

    fn remove_route(&mut self, route_id: &str) -> Result<(), camel_api::CamelError> {
        self.inner.remove_route(route_id)
    }

    async fn start_route_reload(&mut self, route_id: &str) -> Result<(), camel_api::CamelError> {
        // Reload-started routes are supervised too.
        self.ensure_supervision_loop_started();
        self.inner.start_route(route_id).await
    }

    async fn stop_route_reload(&mut self, route_id: &str) -> Result<(), camel_api::CamelError> {
        self.inner.stop_route(route_id).await
    }
}
214
215/// Supervision loop that restarts crashed routes.
216///
217/// Receives crash notifications and restarts routes with exponential backoff.
218/// Tracks attempt counts per route and respects `max_attempts` from config.
219async fn supervision_loop(
220    mut rx: tokio::sync::mpsc::Receiver<CrashNotification>,
221    runtime: Option<Arc<dyn RuntimeHandle>>,
222    config: SupervisionConfig,
223    _metrics: Option<Arc<dyn MetricsCollector>>,
224) {
225    let mut attempts: HashMap<String, u32> = HashMap::new();
226    let mut last_restart_time: HashMap<String, Instant> = HashMap::new();
227    let mut currently_restarting: HashSet<String> = HashSet::new();
228
229    info!("Supervision loop started");
230
231    while let Some(notification) = rx.recv().await {
232        let route_id = notification.route_id.clone();
233        let error = &notification.error;
234
235        // Skip if already processing a restart for this route
236        if currently_restarting.contains(&route_id) {
237            continue;
238        }
239
240        info!(
241            route_id = %route_id,
242            error = %error,
243            "Route crashed, checking restart policy"
244        );
245
246        // Reset attempt counter if route ran long enough before crashing
247        // A route that runs for >= initial_delay is considered a "successful run"
248        if let Some(last_time) = last_restart_time.get(&route_id)
249            && last_time.elapsed() >= config.initial_delay
250        {
251            attempts.insert(route_id.clone(), 0);
252        }
253
254        // Increment attempt counter
255        let current_attempt = attempts.entry(route_id.clone()).or_insert(0);
256        *current_attempt += 1;
257
258        // Check max attempts (collapse nested if-let)
259        if config
260            .max_attempts
261            .is_some_and(|max| *current_attempt > max)
262        {
263            error!(
264                route_id = %route_id,
265                attempts = current_attempt,
266                max = config.max_attempts.unwrap(),
267                "Route exceeded max restart attempts, giving up"
268            );
269            continue;
270        }
271
272        // Compute delay with exponential backoff
273        let delay = config.next_delay(*current_attempt);
274        info!(
275            route_id = %route_id,
276            attempt = current_attempt,
277            delay_ms = delay.as_millis(),
278            "Scheduling route restart"
279        );
280
281        // Mark as currently being processed
282        currently_restarting.insert(route_id.clone());
283
284        // Sleep before restart
285        tokio::time::sleep(delay).await;
286
287        let Some(runtime) = &runtime else {
288            warn!(
289                route_id = %route_id,
290                "Runtime handle unavailable, supervision restart skipped"
291            );
292            currently_restarting.remove(&route_id);
293            continue;
294        };
295
296        // If runtime lifecycle has already been intentionally transitioned to a
297        // non-running state, supervision must not revive it.
298        let pre_status = match runtime
299            .ask(RuntimeQuery::GetRouteStatus {
300                route_id: route_id.clone(),
301            })
302            .await
303        {
304            Ok(RuntimeQueryResult::RouteStatus { status, .. }) => status,
305            Ok(other) => {
306                warn!(
307                    route_id = %route_id,
308                    ?other,
309                    "Unexpected runtime query result, skipping supervision restart"
310                );
311                currently_restarting.remove(&route_id);
312                continue;
313            }
314            Err(err) => {
315                warn!(
316                    route_id = %route_id,
317                    error = %err,
318                    "Runtime status query failed, skipping supervision restart"
319                );
320                currently_restarting.remove(&route_id);
321                continue;
322            }
323        };
324
325        if matches!(pre_status.as_str(), "Registered" | "Stopped") {
326            warn!(
327                route_id = %route_id,
328                status = %pre_status,
329                "Runtime lifecycle is non-running; supervision restart skipped"
330            );
331            attempts.remove(&route_id);
332            currently_restarting.remove(&route_id);
333            continue;
334        }
335
336        // Record crash in runtime state first so the read-model remains authoritative
337        // for subsequent restart decisions.
338        if let Err(err) = runtime
339            .execute(RuntimeCommand::FailRoute {
340                route_id: route_id.clone(),
341                error: error.clone(),
342                command_id: next_supervision_command_id("fail", &route_id),
343                causation_id: None,
344            })
345            .await
346        {
347            warn!(
348                route_id = %route_id,
349                error = %err,
350                "Failed to persist crash state in runtime before restart check"
351            );
352        }
353
354        // Check current status before restarting using runtime read-model only.
355        let should_restart = match runtime
356            .ask(RuntimeQuery::GetRouteStatus {
357                route_id: route_id.clone(),
358            })
359            .await
360        {
361            Ok(RuntimeQueryResult::RouteStatus { status, .. }) if status == "Failed" => true,
362            Ok(RuntimeQueryResult::RouteStatus { status, .. }) => {
363                warn!(
364                    route_id = %route_id,
365                    status = %status,
366                    "Route no longer failed in runtime projection, skipping supervision restart"
367                );
368                attempts.remove(&route_id);
369                false
370            }
371            Ok(other) => {
372                warn!(
373                    route_id = %route_id,
374                    ?other,
375                    "Unexpected runtime query result, skipping supervision restart"
376                );
377                false
378            }
379            Err(err) => {
380                warn!(
381                    route_id = %route_id,
382                    error = %err,
383                    "Runtime status query failed, skipping supervision restart"
384                );
385                false
386            }
387        };
388
389        if should_restart {
390            let restart_result = runtime
391                .execute(RuntimeCommand::ReloadRoute {
392                    route_id: route_id.clone(),
393                    command_id: next_supervision_command_id("reload", &route_id),
394                    causation_id: None,
395                })
396                .await
397                .map(|_| ());
398
399            match restart_result {
400                Ok(()) => {
401                    info!(route_id = %route_id, "Route restarted successfully");
402                    // Record restart time instead of resetting attempts.
403                    // The counter will be reset on next crash if route ran long enough.
404                    last_restart_time.insert(route_id.clone(), Instant::now());
405                }
406                Err(e) => {
407                    error!(route_id = %route_id, error = %e, "Failed to restart route");
408                }
409            }
410        }
411
412        // No longer processing this route
413        currently_restarting.remove(&route_id);
414    }
415
416    info!("Supervision loop ended");
417}
418
419#[cfg(test)]
420mod tests {
421    use super::*;
422    use crate::lifecycle::adapters::{InMemoryRuntimeStore, RuntimeExecutionAdapter};
423    use crate::lifecycle::application::runtime_bus::RuntimeBus;
424    use crate::lifecycle::ports::RouteRegistrationPort as InternalRuntimeCommandBus;
425    use async_trait::async_trait;
426    use camel_api::RuntimeQueryBus;
427    use camel_component_api::{Component, ConcurrencyModel, Consumer, ConsumerContext, Endpoint};
428    use std::sync::Arc as StdArc;
429    use std::sync::atomic::{AtomicU32, Ordering};
430    use std::time::Duration;
431
    /// Wire a fresh in-memory [`RuntimeBus`] to `controller` and return it.
    ///
    /// One [`InMemoryRuntimeStore`] backs all four bus ports plus the unit of
    /// work; execution is routed back into the controller under test via a
    /// [`RuntimeExecutionAdapter`].
    async fn attach_runtime_bus(
        controller: &StdArc<Mutex<dyn RouteControllerInternal>>,
    ) -> StdArc<RuntimeBus> {
        let store = InMemoryRuntimeStore::default();
        let runtime = StdArc::new(
            RuntimeBus::new(
                StdArc::new(store.clone()),
                StdArc::new(store.clone()),
                StdArc::new(store.clone()),
                StdArc::new(store.clone()),
            )
            .with_uow(StdArc::new(store))
            .with_execution(StdArc::new(RuntimeExecutionAdapter::new(StdArc::clone(
                controller,
            )))),
        );
        let runtime_handle: StdArc<dyn RuntimeHandle> = runtime.clone();
        controller.lock().await.set_runtime_handle(runtime_handle);
        runtime
    }
452
453    #[test]
454    fn supervision_command_id_is_unique_and_well_formed() {
455        let id1 = next_supervision_command_id("start", "route-a");
456        let id2 = next_supervision_command_id("start", "route-a");
457        assert_ne!(id1, id2);
458        assert!(id1.starts_with("supervision:start:route-a:"));
459    }
460
    #[tokio::test]
    async fn supervision_loop_exits_cleanly_without_runtime_handle() {
        let (tx, rx) = tokio::sync::mpsc::channel(8);
        let config = SupervisionConfig {
            max_attempts: Some(1),
            initial_delay: Duration::from_millis(5),
            backoff_multiplier: 1.0,
            max_delay: Duration::from_millis(5),
        };
        // No runtime handle: the loop must skip the restart and then
        // terminate once the sender side of the channel is dropped.
        let handle = tokio::spawn(supervision_loop(rx, None, config, None));

        tx.send(CrashNotification {
            route_id: "r-no-runtime".into(),
            error: "boom".into(),
        })
        .await
        .unwrap();
        drop(tx);

        let join = tokio::time::timeout(Duration::from_secs(1), handle)
            .await
            .expect("supervision loop should terminate");
        join.expect("supervision task should not panic");
    }
485
    #[test]
    fn with_metrics_stores_collector() {
        let registry = StdArc::new(std::sync::Mutex::new(Registry::new()));
        let controller = SupervisingRouteController::new(registry, SupervisionConfig::default())
            .with_metrics(StdArc::new(camel_api::NoOpMetrics));
        assert!(controller.metrics.is_some());
    }

    #[tokio::test]
    async fn ensure_supervision_loop_started_is_idempotent() {
        let registry = StdArc::new(std::sync::Mutex::new(Registry::new()));
        let mut controller =
            SupervisingRouteController::new(registry, SupervisionConfig::default());

        // First call consumes the crash receiver and spawns the loop.
        assert!(controller.crash_rx.is_some());
        controller.ensure_supervision_loop_started();
        assert!(controller.crash_rx.is_none());

        // Second call must be a no-op and keep receiver consumed.
        controller.ensure_supervision_loop_started();
        assert!(controller.crash_rx.is_none());
    }
508
    /// A consumer that crashes on first call, then blocks indefinitely.
    ///
    /// Used to exercise the crash -> supervised restart -> running path.
    struct CrashThenBlockConsumer {
        /// Shared counter so the test can observe how many times `start` ran.
        call_count: StdArc<AtomicU32>,
    }

    #[async_trait]
    impl Consumer for CrashThenBlockConsumer {
        async fn start(&mut self, ctx: ConsumerContext) -> Result<(), CamelError> {
            let count = self.call_count.fetch_add(1, Ordering::SeqCst);

            if count == 0 {
                // First call: crash immediately
                return Err(CamelError::RouteError("simulated crash".into()));
            }

            // Second+ call: block until cancelled
            ctx.cancelled().await;
            Ok(())
        }

        async fn stop(&mut self) -> Result<(), CamelError> {
            Ok(())
        }

        fn concurrency_model(&self) -> ConcurrencyModel {
            ConcurrencyModel::Sequential
        }
    }

    /// Endpoint producing [`CrashThenBlockConsumer`]s; producers are unsupported.
    struct CrashThenBlockEndpoint {
        call_count: StdArc<AtomicU32>,
    }

    impl Endpoint for CrashThenBlockEndpoint {
        fn uri(&self) -> &str {
            "crash-then-block:test"
        }

        fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
            Ok(Box::new(CrashThenBlockConsumer {
                call_count: StdArc::clone(&self.call_count),
            }))
        }

        fn create_producer(
            &self,
            _ctx: &camel_api::ProducerContext,
        ) -> Result<camel_api::BoxProcessor, CamelError> {
            Err(CamelError::RouteError("no producer".into()))
        }
    }

    /// Component registered under the `crash-then-block` scheme for tests.
    struct CrashThenBlockComponent {
        call_count: StdArc<AtomicU32>,
    }

    impl Component for CrashThenBlockComponent {
        fn scheme(&self) -> &str {
            "crash-then-block"
        }

        fn create_endpoint(&self, _uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
            Ok(Box::new(CrashThenBlockEndpoint {
                call_count: StdArc::clone(&self.call_count),
            }))
        }
    }
576
    #[tokio::test]
    async fn test_supervising_controller_restarts_crashed_route() {
        // Set up registry with crash-then-block component
        let registry = StdArc::new(std::sync::Mutex::new(Registry::new()));
        let call_count = StdArc::new(AtomicU32::new(0));
        registry.lock().unwrap().register(CrashThenBlockComponent {
            call_count: StdArc::clone(&call_count),
        });

        // Configure supervision with fast delays for testing
        let config = SupervisionConfig {
            max_attempts: Some(5),
            initial_delay: Duration::from_millis(50),
            backoff_multiplier: 1.0, // no growth
            max_delay: Duration::from_secs(60),
        };

        // Create supervising controller
        let controller: StdArc<Mutex<dyn RouteControllerInternal>> = StdArc::new(Mutex::new(
            SupervisingRouteController::new(StdArc::clone(&registry), config),
        ));

        // Set self-ref (try_lock is safe here: no other holder yet)
        controller
            .try_lock()
            .unwrap()
            .set_self_ref(StdArc::clone(&controller) as StdArc<Mutex<dyn RouteController>>);
        let runtime = attach_runtime_bus(&controller).await;

        // Add a route
        let runtime_def = crate::route::RouteDefinition::new("crash-then-block:test", vec![])
            .with_route_id("crash-route");
        runtime.register_route(runtime_def).await.unwrap();

        // Start all routes
        controller.lock().await.start_all_routes().await.unwrap();

        // Wait for crash + restart (timing-based; generous margin over the
        // 50ms backoff delay)
        tokio::time::sleep(Duration::from_millis(500)).await;

        // Verify consumer was called at least twice (crash + restart)
        let count = call_count.load(Ordering::SeqCst);
        assert!(
            count >= 2,
            "expected at least 2 consumer calls (crash + restart), got {}",
            count
        );

        // Verify runtime projection reports Started.
        let status = match runtime
            .ask(RuntimeQuery::GetRouteStatus {
                route_id: "crash-route".into(),
            })
            .await
            .unwrap()
        {
            RuntimeQueryResult::RouteStatus { status, .. } => status,
            other => panic!("unexpected query result: {other:?}"),
        };
        assert_eq!(status, "Started");
    }
638
    #[tokio::test]
    async fn test_supervising_controller_respects_max_attempts() {
        // Set up registry with an always-crash component (fixtures are local
        // to this test because no other test needs them).
        struct AlwaysCrashConsumer;
        #[async_trait]
        impl Consumer for AlwaysCrashConsumer {
            async fn start(&mut self, _ctx: ConsumerContext) -> Result<(), CamelError> {
                Err(CamelError::RouteError("always crashes".into()))
            }
            async fn stop(&mut self) -> Result<(), CamelError> {
                Ok(())
            }
            fn concurrency_model(&self) -> ConcurrencyModel {
                ConcurrencyModel::Sequential
            }
        }
        struct AlwaysCrashEndpoint;
        impl Endpoint for AlwaysCrashEndpoint {
            fn uri(&self) -> &str {
                "always-crash:test"
            }
            fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
                Ok(Box::new(AlwaysCrashConsumer))
            }
            fn create_producer(
                &self,
                _ctx: &camel_api::ProducerContext,
            ) -> Result<camel_api::BoxProcessor, CamelError> {
                Err(CamelError::RouteError("no producer".into()))
            }
        }
        struct AlwaysCrashComponent;
        impl Component for AlwaysCrashComponent {
            fn scheme(&self) -> &str {
                "always-crash"
            }
            fn create_endpoint(&self, _uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
                Ok(Box::new(AlwaysCrashEndpoint))
            }
        }

        let registry = StdArc::new(std::sync::Mutex::new(Registry::new()));
        registry.lock().unwrap().register(AlwaysCrashComponent);

        // Configure with max 2 attempts
        let config = SupervisionConfig {
            max_attempts: Some(2),
            initial_delay: Duration::from_millis(10),
            backoff_multiplier: 1.0,
            max_delay: Duration::from_secs(1),
        };

        let controller: StdArc<Mutex<dyn RouteControllerInternal>> = StdArc::new(Mutex::new(
            SupervisingRouteController::new(StdArc::clone(&registry), config),
        ));

        controller
            .try_lock()
            .unwrap()
            .set_self_ref(StdArc::clone(&controller) as StdArc<Mutex<dyn RouteController>>);
        let runtime = attach_runtime_bus(&controller).await;

        let runtime_def = crate::route::RouteDefinition::new("always-crash:test", vec![])
            .with_route_id("always-crash-route");
        runtime.register_route(runtime_def).await.unwrap();

        controller.lock().await.start_all_routes().await.unwrap();

        // Wait for all attempts
        tokio::time::sleep(Duration::from_millis(200)).await;

        // Runtime projection should be in Failed state (not restarted after max attempts).
        let status = match runtime
            .ask(RuntimeQuery::GetRouteStatus {
                route_id: "always-crash-route".into(),
            })
            .await
            .unwrap()
        {
            RuntimeQueryResult::RouteStatus { status, .. } => status,
            other => panic!("unexpected query result: {other:?}"),
        };
        assert_eq!(status, "Failed");
    }
723
    #[tokio::test]
    async fn test_supervising_controller_delegates_to_inner() {
        let registry = StdArc::new(std::sync::Mutex::new(Registry::new()));
        let config = SupervisionConfig::default();
        let mut controller = SupervisingRouteController::new(StdArc::clone(&registry), config);

        // Set self-ref (a second instance stands in; delegation does not
        // depend on the self-ref identity here)
        let self_ref: StdArc<Mutex<dyn RouteController>> = StdArc::new(Mutex::new(
            SupervisingRouteController::new(registry, SupervisionConfig::default()),
        ));
        controller.set_self_ref(self_ref);

        // Test route_count and route_ids (delegation)
        assert_eq!(controller.route_count(), 0);
        assert_eq!(controller.route_ids(), Vec::<String>::new());
    }
740
    /// Consumer that always crashes immediately, tracking call count.
    struct AlwaysCrashWithCountConsumer {
        /// Shared counter so the test can assert the exact number of starts.
        call_count: StdArc<AtomicU32>,
    }

    #[async_trait]
    impl Consumer for AlwaysCrashWithCountConsumer {
        async fn start(&mut self, _ctx: ConsumerContext) -> Result<(), CamelError> {
            self.call_count.fetch_add(1, Ordering::SeqCst);
            Err(CamelError::RouteError("always crashes".into()))
        }

        async fn stop(&mut self) -> Result<(), CamelError> {
            Ok(())
        }

        fn concurrency_model(&self) -> ConcurrencyModel {
            ConcurrencyModel::Sequential
        }
    }

    /// Endpoint producing [`AlwaysCrashWithCountConsumer`]s; no producer support.
    struct AlwaysCrashWithCountEndpoint {
        call_count: StdArc<AtomicU32>,
    }

    impl Endpoint for AlwaysCrashWithCountEndpoint {
        fn uri(&self) -> &str {
            "always-crash-count:test"
        }

        fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
            Ok(Box::new(AlwaysCrashWithCountConsumer {
                call_count: StdArc::clone(&self.call_count),
            }))
        }

        fn create_producer(
            &self,
            _ctx: &camel_api::ProducerContext,
        ) -> Result<camel_api::BoxProcessor, CamelError> {
            Err(CamelError::RouteError("no producer".into()))
        }
    }

    /// Component registered under the `always-crash-count` scheme for tests.
    struct AlwaysCrashWithCountComponent {
        call_count: StdArc<AtomicU32>,
    }

    impl Component for AlwaysCrashWithCountComponent {
        fn scheme(&self) -> &str {
            "always-crash-count"
        }

        fn create_endpoint(&self, _uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
            Ok(Box::new(AlwaysCrashWithCountEndpoint {
                call_count: StdArc::clone(&self.call_count),
            }))
        }
    }
800
    #[tokio::test]
    async fn test_supervision_gives_up_after_max_attempts() {
        // Set up registry with always-crash component that tracks call count
        let registry = StdArc::new(std::sync::Mutex::new(Registry::new()));
        let call_count = StdArc::new(AtomicU32::new(0));
        registry
            .lock()
            .unwrap()
            .register(AlwaysCrashWithCountComponent {
                call_count: StdArc::clone(&call_count),
            });

        // Configure supervision with max_attempts=2 and fast delays
        let config = SupervisionConfig {
            max_attempts: Some(2),
            initial_delay: Duration::from_millis(50),
            backoff_multiplier: 1.0,
            max_delay: Duration::from_secs(60),
        };

        let controller: StdArc<Mutex<dyn RouteControllerInternal>> = StdArc::new(Mutex::new(
            SupervisingRouteController::new(StdArc::clone(&registry), config),
        ));

        controller
            .try_lock()
            .unwrap()
            .set_self_ref(StdArc::clone(&controller) as StdArc<Mutex<dyn RouteController>>);
        let runtime = attach_runtime_bus(&controller).await;

        let runtime_def = crate::route::RouteDefinition::new("always-crash-count:test", vec![])
            .with_route_id("give-up-route");
        runtime.register_route(runtime_def).await.unwrap();

        controller.lock().await.start_all_routes().await.unwrap();

        // Wait enough time for: initial start + 2 restart attempts
        // With 50ms initial delay and no backoff: 50ms + 50ms = 100ms minimum
        // Wait 800ms to be safe
        tokio::time::sleep(Duration::from_millis(800)).await;

        // Verify consumer was called exactly max_attempts + 1 times
        // (1 initial start + 2 restart attempts = 3 total)
        let count = call_count.load(Ordering::SeqCst);
        assert_eq!(
            count, 3,
            "expected exactly 3 consumer calls (initial + 2 restarts), got {}",
            count
        );

        // Verify runtime projection is in Failed state (supervision gave up).
        let status = match runtime
            .ask(RuntimeQuery::GetRouteStatus {
                route_id: "give-up-route".into(),
            })
            .await
            .unwrap()
        {
            RuntimeQueryResult::RouteStatus { status, .. } => status,
            other => panic!("unexpected query result: {other:?}"),
        };
        assert_eq!(status, "Failed");
    }
864
    /// Test consumer whose behavior alternates between invocations:
    /// odd-numbered calls (1st, 3rd, ...) fail immediately, while
    /// even-numbered calls (2nd, 4th, ...) sleep ~100ms before failing —
    /// simulating a route that runs "successfully" for a while, then crashes.
    struct CrashOnOddBlockOnEvenConsumer {
        /// Shared invocation counter, incremented at the top of every `start` call.
        call_count: StdArc<AtomicU32>,
    }
870
871    #[async_trait]
872    impl Consumer for CrashOnOddBlockOnEvenConsumer {
873        async fn start(&mut self, ctx: ConsumerContext) -> Result<(), CamelError> {
874            let count = self.call_count.fetch_add(1, Ordering::SeqCst);
875            // count is 0-indexed: 0=call1, 1=call2, 2=call3, etc.
876            // Odd calls (count 0, 2, 4, ...) crash immediately
877            // Even calls (count 1, 3, 5, ...) block for 100ms then crash
878
879            if count.is_multiple_of(2) {
880                // Odd-numbered call (1st, 3rd, 5th, ...): crash immediately
881                return Err(CamelError::RouteError("odd call crash".into()));
882            }
883
884            // Even-numbered call (2nd, 4th, 6th, ...): block briefly then crash
885            // This simulates "successful" operation before crashing
886            tokio::select! {
887                _ = ctx.cancelled() => {
888                    // Cancelled externally
889                    return Ok(());
890                }
891                _ = tokio::time::sleep(Duration::from_millis(100)) => {
892                    // Simulated "uptime" before crash
893                    return Err(CamelError::RouteError("even call crash after uptime".into()));
894                }
895            }
896        }
897
898        async fn stop(&mut self) -> Result<(), CamelError> {
899            Ok(())
900        }
901
902        fn concurrency_model(&self) -> ConcurrencyModel {
903            ConcurrencyModel::Sequential
904        }
905    }
906
    /// Endpoint that hands out [`CrashOnOddBlockOnEvenConsumer`] instances,
    /// all sharing a single invocation counter.
    struct CrashOnOddBlockOnEvenEndpoint {
        /// Counter threaded through to every consumer this endpoint creates.
        call_count: StdArc<AtomicU32>,
    }
910
911    impl Endpoint for CrashOnOddBlockOnEvenEndpoint {
912        fn uri(&self) -> &str {
913            "crash-odd-block-even:test"
914        }
915
916        fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
917            Ok(Box::new(CrashOnOddBlockOnEvenConsumer {
918                call_count: StdArc::clone(&self.call_count),
919            }))
920        }
921
922        fn create_producer(
923            &self,
924            _ctx: &camel_api::ProducerContext,
925        ) -> Result<camel_api::BoxProcessor, CamelError> {
926            Err(CamelError::RouteError("no producer".into()))
927        }
928    }
929
    /// Component registered under the `crash-odd-block-even` scheme; every
    /// endpoint it creates shares the same call counter.
    struct CrashOnOddBlockOnEvenComponent {
        /// Counter shared with all endpoints (and thus consumers) it creates.
        call_count: StdArc<AtomicU32>,
    }
933
934    impl Component for CrashOnOddBlockOnEvenComponent {
935        fn scheme(&self) -> &str {
936            "crash-odd-block-even"
937        }
938
939        fn create_endpoint(&self, _uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
940            Ok(Box::new(CrashOnOddBlockOnEvenEndpoint {
941                call_count: StdArc::clone(&self.call_count),
942            }))
943        }
944    }
945
946    #[tokio::test]
947    async fn test_supervision_resets_attempt_count_on_success() {
948        // Set up registry with crash-on-odd, block-on-even component
949        let registry = StdArc::new(std::sync::Mutex::new(Registry::new()));
950        let call_count = StdArc::new(AtomicU32::new(0));
951        registry
952            .lock()
953            .unwrap()
954            .register(CrashOnOddBlockOnEvenComponent {
955                call_count: StdArc::clone(&call_count),
956            });
957
958        // Configure with max_attempts=2 - allows continued restarts when successful runs reset the counter
959        // Without reset: would give up after 2 restarts (3 total calls)
960        // With reset: successful runs (100ms >= 50ms) reset counter, allowing continued restarts
961        let config = SupervisionConfig {
962            max_attempts: Some(2),
963            initial_delay: Duration::from_millis(50),
964            backoff_multiplier: 1.0,
965            max_delay: Duration::from_secs(60),
966        };
967
968        let controller: StdArc<Mutex<dyn RouteControllerInternal>> = StdArc::new(Mutex::new(
969            SupervisingRouteController::new(StdArc::clone(&registry), config),
970        ));
971
972        controller
973            .try_lock()
974            .unwrap()
975            .set_self_ref(StdArc::clone(&controller) as StdArc<Mutex<dyn RouteController>>);
976        let runtime = attach_runtime_bus(&controller).await;
977
978        let runtime_def = crate::route::RouteDefinition::new("crash-odd-block-even:test", vec![])
979            .with_route_id("reset-attempt-route");
980        runtime.register_route(runtime_def).await.unwrap();
981
982        controller.lock().await.start_all_routes().await.unwrap();
983
984        // Wait long enough for multiple crash-restart cycles:
985        // - Call 1: crash immediately → restart (attempt reset to 0)
986        // - Call 2: block 100ms, crash → restart (attempt reset to 0)
987        // - Call 3: crash immediately → restart (attempt reset to 0)
988        // - Call 4: block 100ms, crash → restart (attempt reset to 0)
989        // With 50ms initial delay and 100ms block time, each full cycle takes ~150ms
990        // Wait 1s to ensure we get at least 4 calls
991        tokio::time::sleep(Duration::from_millis(1000)).await;
992
993        // Verify consumer was called at least 4 times
994        // (proving the attempt count reset allows continued restarts)
995        let count = call_count.load(Ordering::SeqCst);
996        assert!(
997            count >= 4,
998            "expected at least 4 consumer calls (proving attempt reset), got {}",
999            count
1000        );
1001
1002        // Verify runtime projection is NOT Failed (polling, since route crashes in a loop).
1003        let deadline = tokio::time::Instant::now() + Duration::from_secs(2);
1004        loop {
1005            let status = match runtime
1006                .ask(RuntimeQuery::GetRouteStatus {
1007                    route_id: "reset-attempt-route".into(),
1008                })
1009                .await
1010                .unwrap()
1011            {
1012                RuntimeQueryResult::RouteStatus { status, .. } => status,
1013                other => panic!("unexpected query result: {other:?}"),
1014            };
1015            if status != "Failed" {
1016                break;
1017            }
1018            assert!(
1019                tokio::time::Instant::now() < deadline,
1020                "route remained in Failed state for 2s — supervision likely gave up"
1021            );
1022            tokio::time::sleep(Duration::from_millis(50)).await;
1023        }
1024    }
1025}