// camel_core/supervising_route_controller.rs
//! Supervising route controller with automatic crash recovery.
//!
//! This module provides [`SupervisingRouteController`], which wraps a
//! [`DefaultRouteController`] and monitors crashed routes, restarting them
//! with exponential backoff based on [`SupervisionConfig`].

use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::Instant;

use tokio::sync::Mutex;
use tracing::{error, info, warn};

use camel_api::error_handler::ErrorHandlerConfig;
use camel_api::{CamelError, MetricsCollector, RouteController, RouteStatus, SupervisionConfig};

use crate::registry::Registry;
use crate::route::RouteDefinition;
use crate::route_controller::{
    CrashNotification, DefaultRouteController, RouteControllerInternal, SharedLanguageRegistry,
};
22
/// A route controller that automatically restarts crashed routes.
///
/// Wraps a [`DefaultRouteController`] and spawns a supervision loop that
/// receives crash notifications and restarts routes with exponential backoff.
pub struct SupervisingRouteController {
    /// The inner controller that manages actual routes.
    inner: DefaultRouteController,
    /// Supervision configuration (initial delay, backoff multiplier, max attempts).
    config: SupervisionConfig,
    /// Sender for crash notifications; a clone is handed to the inner
    /// controller via `set_crash_notifier` in `start_all_routes`.
    crash_tx: tokio::sync::mpsc::Sender<CrashNotification>,
    /// Receiver for crash notifications; `take`n exactly once when the
    /// supervision loop is spawned, so the loop is started at most once.
    crash_rx: Option<tokio::sync::mpsc::Receiver<CrashNotification>>,
    /// Optional metrics collector forwarded to the supervision loop.
    metrics: Option<Arc<dyn MetricsCollector>>,
}
39
40impl SupervisingRouteController {
41    /// Create a new supervising controller.
42    pub fn new(registry: Arc<std::sync::Mutex<Registry>>, config: SupervisionConfig) -> Self {
43        Self::with_languages(
44            registry,
45            config,
46            Arc::new(std::sync::Mutex::new(HashMap::new())),
47        )
48    }
49
50    /// Create a new supervising controller with shared language registry.
51    pub fn with_languages(
52        registry: Arc<std::sync::Mutex<Registry>>,
53        config: SupervisionConfig,
54        languages: SharedLanguageRegistry,
55    ) -> Self {
56        let (crash_tx, crash_rx) = tokio::sync::mpsc::channel(64);
57        Self {
58            inner: DefaultRouteController::with_languages(registry, languages),
59            config,
60            crash_tx,
61            crash_rx: Some(crash_rx),
62            metrics: None,
63        }
64    }
65
66    /// Set a metrics collector for the supervision loop.
67    pub fn with_metrics(mut self, metrics: Arc<dyn MetricsCollector>) -> Self {
68        self.metrics = Some(metrics);
69        self
70    }
71}
72
73#[async_trait::async_trait]
74impl RouteController for SupervisingRouteController {
75    async fn start_route(&mut self, route_id: &str) -> Result<(), CamelError> {
76        self.inner.start_route(route_id).await
77    }
78
79    async fn stop_route(&mut self, route_id: &str) -> Result<(), CamelError> {
80        self.inner.stop_route(route_id).await
81    }
82
83    async fn restart_route(&mut self, route_id: &str) -> Result<(), CamelError> {
84        self.inner.restart_route(route_id).await
85    }
86
87    async fn suspend_route(&mut self, route_id: &str) -> Result<(), CamelError> {
88        self.inner.suspend_route(route_id).await
89    }
90
91    async fn resume_route(&mut self, route_id: &str) -> Result<(), CamelError> {
92        self.inner.resume_route(route_id).await
93    }
94
95    fn route_status(&self, route_id: &str) -> Option<RouteStatus> {
96        self.inner.route_status(route_id)
97    }
98
99    async fn start_all_routes(&mut self) -> Result<(), CamelError> {
100        // Set up crash notification before starting routes
101        self.inner.set_crash_notifier(self.crash_tx.clone());
102
103        // Start all routes via inner controller
104        self.inner.start_all_routes().await?;
105
106        // Take the receiver and spawn supervision loop
107        if let Some(rx) = self.crash_rx.take() {
108            if let Some(controller_ref) = self.inner.self_ref_for_supervision() {
109                let config = self.config.clone();
110                let metrics = self.metrics.clone();
111                tokio::spawn(async move {
112                    supervision_loop(rx, controller_ref, config, metrics).await;
113                });
114            } else {
115                warn!("SupervisingRouteController: self_ref not set, supervision loop not started");
116            }
117        }
118
119        Ok(())
120    }
121
122    async fn stop_all_routes(&mut self) -> Result<(), CamelError> {
123        self.inner.stop_all_routes().await
124    }
125}
126
/// Internal controller operations are delegated verbatim to the inner
/// [`DefaultRouteController`]; supervision adds no behavior at this layer.
#[async_trait::async_trait]
impl RouteControllerInternal for SupervisingRouteController {
    fn add_route(&mut self, def: RouteDefinition) -> Result<(), CamelError> {
        self.inner.add_route(def)
    }

    fn swap_pipeline(
        &self,
        route_id: &str,
        pipeline: camel_api::BoxProcessor,
    ) -> Result<(), CamelError> {
        self.inner.swap_pipeline(route_id, pipeline)
    }

    fn route_from_uri(&self, route_id: &str) -> Option<String> {
        self.inner.route_from_uri(route_id)
    }

    fn set_error_handler(&mut self, config: ErrorHandlerConfig) {
        self.inner.set_error_handler(config)
    }

    fn set_self_ref(&mut self, self_ref: Arc<Mutex<dyn RouteController>>) {
        self.inner.set_self_ref(self_ref)
    }

    fn route_count(&self) -> usize {
        self.inner.route_count()
    }

    fn route_ids(&self) -> Vec<String> {
        self.inner.route_ids()
    }

    fn set_tracer_config(&mut self, config: &crate::config::TracerConfig) {
        self.inner.set_tracer_config(config)
    }

    fn compile_route_definition(
        &self,
        def: crate::route::RouteDefinition,
    ) -> Result<camel_api::BoxProcessor, camel_api::CamelError> {
        self.inner.compile_route_definition(def)
    }

    fn remove_route(&mut self, route_id: &str) -> Result<(), camel_api::CamelError> {
        self.inner.remove_route(route_id)
    }

    // NOTE(review): the reload variants delegate to the inner controller's
    // plain start/stop rather than its own *_reload methods — presumably to
    // bypass reload-specific handling in the inner controller; confirm.
    async fn start_route_reload(&mut self, route_id: &str) -> Result<(), camel_api::CamelError> {
        self.inner.start_route(route_id).await
    }

    async fn stop_route_reload(&mut self, route_id: &str) -> Result<(), camel_api::CamelError> {
        self.inner.stop_route(route_id).await
    }
}
184
185/// Supervision loop that restarts crashed routes.
186///
187/// Receives crash notifications and restarts routes with exponential backoff.
188/// Tracks attempt counts per route and respects `max_attempts` from config.
189async fn supervision_loop(
190    mut rx: tokio::sync::mpsc::Receiver<CrashNotification>,
191    controller: Arc<Mutex<dyn RouteController>>,
192    config: SupervisionConfig,
193    _metrics: Option<Arc<dyn MetricsCollector>>,
194) {
195    let mut attempts: HashMap<String, u32> = HashMap::new();
196    let mut last_restart_time: HashMap<String, Instant> = HashMap::new();
197    let mut currently_restarting: HashSet<String> = HashSet::new();
198
199    info!("Supervision loop started");
200
201    while let Some(notification) = rx.recv().await {
202        let route_id = notification.route_id.clone();
203        let error = &notification.error;
204
205        // Skip if already processing a restart for this route
206        if currently_restarting.contains(&route_id) {
207            continue;
208        }
209
210        info!(
211            route_id = %route_id,
212            error = %error,
213            "Route crashed, checking restart policy"
214        );
215
216        // Reset attempt counter if route ran long enough before crashing
217        // A route that runs for >= initial_delay is considered a "successful run"
218        if let Some(last_time) = last_restart_time.get(&route_id)
219            && last_time.elapsed() >= config.initial_delay
220        {
221            attempts.insert(route_id.clone(), 0);
222        }
223
224        // Increment attempt counter
225        let current_attempt = attempts.entry(route_id.clone()).or_insert(0);
226        *current_attempt += 1;
227
228        // Check max attempts (collapse nested if-let)
229        if config
230            .max_attempts
231            .is_some_and(|max| *current_attempt > max)
232        {
233            error!(
234                route_id = %route_id,
235                attempts = current_attempt,
236                max = config.max_attempts.unwrap(),
237                "Route exceeded max restart attempts, giving up"
238            );
239            continue;
240        }
241
242        // Compute delay with exponential backoff
243        let delay = config.next_delay(*current_attempt);
244        info!(
245            route_id = %route_id,
246            attempt = current_attempt,
247            delay_ms = delay.as_millis(),
248            "Scheduling route restart"
249        );
250
251        // Mark as currently being processed
252        currently_restarting.insert(route_id.clone());
253
254        // Sleep before restart
255        tokio::time::sleep(delay).await;
256
257        // Check current status before restarting
258        let mut ctrl = controller.lock().await;
259        match ctrl.route_status(&route_id) {
260            Some(RouteStatus::Failed(_)) => {
261                // Route is still failed — proceed with restart
262                match ctrl.restart_route(&route_id).await {
263                    Ok(()) => {
264                        info!(route_id = %route_id, "Route restarted successfully");
265                        // Record restart time instead of resetting attempts
266                        // The counter will be reset on next crash if route ran long enough
267                        last_restart_time.insert(route_id.clone(), Instant::now());
268                    }
269                    Err(e) => {
270                        error!(route_id = %route_id, error = %e, "Failed to restart route");
271                    }
272                }
273            }
274            Some(status) => {
275                // Route was manually stopped/suspended — skip restart
276                warn!(route_id = %route_id, ?status, "Route no longer failed, skipping supervision restart");
277                attempts.remove(&route_id);
278            }
279            None => {
280                warn!(route_id = %route_id, "Route not found during supervision restart");
281            }
282        }
283
284        // No longer processing this route
285        currently_restarting.remove(&route_id);
286    }
287
288    info!("Supervision loop ended");
289}
290
291#[cfg(test)]
292mod tests {
293    use super::*;
294    use async_trait::async_trait;
295    use camel_component::{Component, ConcurrencyModel, Consumer, ConsumerContext, Endpoint};
296    use std::sync::Arc as StdArc;
297    use std::sync::atomic::{AtomicU32, Ordering};
298    use std::time::Duration;
299
300    /// A consumer that crashes on first call, then blocks indefinitely.
301    struct CrashThenBlockConsumer {
302        call_count: StdArc<AtomicU32>,
303    }
304
305    #[async_trait]
306    impl Consumer for CrashThenBlockConsumer {
307        async fn start(&mut self, ctx: ConsumerContext) -> Result<(), CamelError> {
308            let count = self.call_count.fetch_add(1, Ordering::SeqCst);
309
310            if count == 0 {
311                // First call: crash immediately
312                return Err(CamelError::RouteError("simulated crash".into()));
313            }
314
315            // Second+ call: block until cancelled
316            ctx.cancelled().await;
317            Ok(())
318        }
319
320        async fn stop(&mut self) -> Result<(), CamelError> {
321            Ok(())
322        }
323
324        fn concurrency_model(&self) -> ConcurrencyModel {
325            ConcurrencyModel::Sequential
326        }
327    }
328
    /// Endpoint that produces [`CrashThenBlockConsumer`]s sharing one call counter.
    struct CrashThenBlockEndpoint {
        call_count: StdArc<AtomicU32>,
    }

    impl Endpoint for CrashThenBlockEndpoint {
        fn uri(&self) -> &str {
            "crash-then-block:test"
        }

        fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
            Ok(Box::new(CrashThenBlockConsumer {
                call_count: StdArc::clone(&self.call_count),
            }))
        }

        // Consumer-only endpoint: producing is unsupported in these tests.
        fn create_producer(
            &self,
            _ctx: &camel_api::ProducerContext,
        ) -> Result<camel_api::BoxProcessor, CamelError> {
            Err(CamelError::RouteError("no producer".into()))
        }
    }

    /// Component registered under the `crash-then-block` scheme.
    struct CrashThenBlockComponent {
        call_count: StdArc<AtomicU32>,
    }

    impl Component for CrashThenBlockComponent {
        fn scheme(&self) -> &str {
            "crash-then-block"
        }

        fn create_endpoint(&self, _uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
            Ok(Box::new(CrashThenBlockEndpoint {
                call_count: StdArc::clone(&self.call_count),
            }))
        }
    }
367
    /// End-to-end: a route that crashes once must be restarted by the
    /// supervision loop and end up in the `Started` state.
    #[tokio::test]
    async fn test_supervising_controller_restarts_crashed_route() {
        // Set up registry with crash-then-block component
        let registry = StdArc::new(std::sync::Mutex::new(Registry::new()));
        let call_count = StdArc::new(AtomicU32::new(0));
        registry.lock().unwrap().register(CrashThenBlockComponent {
            call_count: StdArc::clone(&call_count),
        });

        // Configure supervision with fast delays for testing
        let config = SupervisionConfig {
            max_attempts: Some(5),
            initial_delay: Duration::from_millis(50),
            backoff_multiplier: 1.0, // no growth
            max_delay: Duration::from_secs(60),
        };

        // Create supervising controller
        let controller: StdArc<Mutex<dyn RouteControllerInternal>> = StdArc::new(Mutex::new(
            SupervisingRouteController::new(StdArc::clone(&registry), config),
        ));

        // Set self-ref (required for the supervision loop to call back in)
        controller
            .try_lock()
            .unwrap()
            .set_self_ref(StdArc::clone(&controller) as StdArc<Mutex<dyn RouteController>>);

        // Add a route
        let def = crate::route::RouteDefinition::new("crash-then-block:test", vec![])
            .with_route_id("crash-route");
        controller.try_lock().unwrap().add_route(def).unwrap();

        // Start all routes
        controller.lock().await.start_all_routes().await.unwrap();

        // Wait for crash + restart
        tokio::time::sleep(Duration::from_millis(500)).await;

        // Verify consumer was called at least twice (crash + restart)
        let count = call_count.load(Ordering::SeqCst);
        assert!(
            count >= 2,
            "expected at least 2 consumer calls (crash + restart), got {}",
            count
        );

        // Verify route is now started
        let status = controller.lock().await.route_status("crash-route").unwrap();
        assert!(
            matches!(status, RouteStatus::Started),
            "expected Started, got {:?}",
            status
        );
    }
423
    /// A route that always crashes must be left in `Failed` once
    /// `max_attempts` restarts have been exhausted.
    #[tokio::test]
    async fn test_supervising_controller_respects_max_attempts() {
        // Set up registry with always-crash component (defined locally since
        // no other test needs it)
        struct AlwaysCrashConsumer;
        #[async_trait]
        impl Consumer for AlwaysCrashConsumer {
            async fn start(&mut self, _ctx: ConsumerContext) -> Result<(), CamelError> {
                Err(CamelError::RouteError("always crashes".into()))
            }
            async fn stop(&mut self) -> Result<(), CamelError> {
                Ok(())
            }
            fn concurrency_model(&self) -> ConcurrencyModel {
                ConcurrencyModel::Sequential
            }
        }
        struct AlwaysCrashEndpoint;
        impl Endpoint for AlwaysCrashEndpoint {
            fn uri(&self) -> &str {
                "always-crash:test"
            }
            fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
                Ok(Box::new(AlwaysCrashConsumer))
            }
            fn create_producer(
                &self,
                _ctx: &camel_api::ProducerContext,
            ) -> Result<camel_api::BoxProcessor, CamelError> {
                Err(CamelError::RouteError("no producer".into()))
            }
        }
        struct AlwaysCrashComponent;
        impl Component for AlwaysCrashComponent {
            fn scheme(&self) -> &str {
                "always-crash"
            }
            fn create_endpoint(&self, _uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
                Ok(Box::new(AlwaysCrashEndpoint))
            }
        }

        let registry = StdArc::new(std::sync::Mutex::new(Registry::new()));
        registry.lock().unwrap().register(AlwaysCrashComponent);

        // Configure with max 2 attempts
        let config = SupervisionConfig {
            max_attempts: Some(2),
            initial_delay: Duration::from_millis(10),
            backoff_multiplier: 1.0,
            max_delay: Duration::from_secs(1),
        };

        let controller: StdArc<Mutex<dyn RouteControllerInternal>> = StdArc::new(Mutex::new(
            SupervisingRouteController::new(StdArc::clone(&registry), config),
        ));

        controller
            .try_lock()
            .unwrap()
            .set_self_ref(StdArc::clone(&controller) as StdArc<Mutex<dyn RouteController>>);

        let def = crate::route::RouteDefinition::new("always-crash:test", vec![])
            .with_route_id("always-crash-route");
        controller.try_lock().unwrap().add_route(def).unwrap();

        controller.lock().await.start_all_routes().await.unwrap();

        // Wait for all attempts
        tokio::time::sleep(Duration::from_millis(200)).await;

        // Route should be in Failed state (not restarted after max attempts)
        let status = controller
            .lock()
            .await
            .route_status("always-crash-route")
            .unwrap();
        assert!(
            matches!(status, RouteStatus::Failed(_)),
            "expected Failed, got {:?}",
            status
        );
    }
506
507    #[tokio::test]
508    async fn test_supervising_controller_delegates_to_inner() {
509        let registry = StdArc::new(std::sync::Mutex::new(Registry::new()));
510        let config = SupervisionConfig::default();
511        let mut controller = SupervisingRouteController::new(StdArc::clone(&registry), config);
512
513        // Set self-ref
514        let self_ref: StdArc<Mutex<dyn RouteController>> = StdArc::new(Mutex::new(
515            SupervisingRouteController::new(registry, SupervisionConfig::default()),
516        ));
517        controller.set_self_ref(self_ref);
518
519        // Test route_count and route_ids (delegation)
520        assert_eq!(controller.route_count(), 0);
521        assert_eq!(controller.route_ids(), Vec::<String>::new());
522    }
523
    /// Consumer that always crashes immediately, tracking call count.
    struct AlwaysCrashWithCountConsumer {
        call_count: StdArc<AtomicU32>,
    }

    #[async_trait]
    impl Consumer for AlwaysCrashWithCountConsumer {
        async fn start(&mut self, _ctx: ConsumerContext) -> Result<(), CamelError> {
            // Count the invocation, then fail unconditionally.
            self.call_count.fetch_add(1, Ordering::SeqCst);
            Err(CamelError::RouteError("always crashes".into()))
        }

        async fn stop(&mut self) -> Result<(), CamelError> {
            Ok(())
        }

        fn concurrency_model(&self) -> ConcurrencyModel {
            ConcurrencyModel::Sequential
        }
    }
544
    /// Endpoint producing [`AlwaysCrashWithCountConsumer`]s that share one counter.
    struct AlwaysCrashWithCountEndpoint {
        call_count: StdArc<AtomicU32>,
    }

    impl Endpoint for AlwaysCrashWithCountEndpoint {
        fn uri(&self) -> &str {
            "always-crash-count:test"
        }

        fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
            Ok(Box::new(AlwaysCrashWithCountConsumer {
                call_count: StdArc::clone(&self.call_count),
            }))
        }

        // Consumer-only endpoint: producing is unsupported in these tests.
        fn create_producer(
            &self,
            _ctx: &camel_api::ProducerContext,
        ) -> Result<camel_api::BoxProcessor, CamelError> {
            Err(CamelError::RouteError("no producer".into()))
        }
    }

    /// Component registered under the `always-crash-count` scheme.
    struct AlwaysCrashWithCountComponent {
        call_count: StdArc<AtomicU32>,
    }

    impl Component for AlwaysCrashWithCountComponent {
        fn scheme(&self) -> &str {
            "always-crash-count"
        }

        fn create_endpoint(&self, _uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
            Ok(Box::new(AlwaysCrashWithCountEndpoint {
                call_count: StdArc::clone(&self.call_count),
            }))
        }
    }
583
    /// Supervision must stop restarting after `max_attempts`: with
    /// max_attempts=2 the consumer runs exactly 3 times (initial + 2 restarts)
    /// and the route ends in `Failed`.
    #[tokio::test]
    async fn test_supervision_gives_up_after_max_attempts() {
        // Set up registry with always-crash component that tracks call count
        let registry = StdArc::new(std::sync::Mutex::new(Registry::new()));
        let call_count = StdArc::new(AtomicU32::new(0));
        registry
            .lock()
            .unwrap()
            .register(AlwaysCrashWithCountComponent {
                call_count: StdArc::clone(&call_count),
            });

        // Configure supervision with max_attempts=2 and fast delays
        let config = SupervisionConfig {
            max_attempts: Some(2),
            initial_delay: Duration::from_millis(50),
            backoff_multiplier: 1.0,
            max_delay: Duration::from_secs(60),
        };

        let controller: StdArc<Mutex<dyn RouteControllerInternal>> = StdArc::new(Mutex::new(
            SupervisingRouteController::new(StdArc::clone(&registry), config),
        ));

        controller
            .try_lock()
            .unwrap()
            .set_self_ref(StdArc::clone(&controller) as StdArc<Mutex<dyn RouteController>>);

        let def = crate::route::RouteDefinition::new("always-crash-count:test", vec![])
            .with_route_id("give-up-route");
        controller.try_lock().unwrap().add_route(def).unwrap();

        controller.lock().await.start_all_routes().await.unwrap();

        // Wait enough time for: initial start + 2 restart attempts
        // With 50ms initial delay and no backoff: 50ms + 50ms = 100ms minimum
        // Wait 800ms to be safe
        tokio::time::sleep(Duration::from_millis(800)).await;

        // Verify consumer was called exactly max_attempts + 1 times
        // (1 initial start + 2 restart attempts = 3 total)
        let count = call_count.load(Ordering::SeqCst);
        assert_eq!(
            count, 3,
            "expected exactly 3 consumer calls (initial + 2 restarts), got {}",
            count
        );

        // Verify route is in Failed state (supervision gave up)
        let status = controller
            .lock()
            .await
            .route_status("give-up-route")
            .unwrap();
        assert!(
            matches!(status, RouteStatus::Failed(_)),
            "expected Failed, got {:?}",
            status
        );
    }
645
    /// Consumer that crashes immediately on odd-numbered calls (1st, 3rd, ...)
    /// and runs ~100ms before crashing on even-numbered calls (2nd, 4th, ...).
    /// This simulates a route that sometimes runs successfully before crashing.
    struct CrashOnOddBlockOnEvenConsumer {
        call_count: StdArc<AtomicU32>,
    }

    #[async_trait]
    impl Consumer for CrashOnOddBlockOnEvenConsumer {
        async fn start(&mut self, ctx: ConsumerContext) -> Result<(), CamelError> {
            let count = self.call_count.fetch_add(1, Ordering::SeqCst);
            // `count` is the value *before* the increment, i.e. 0-indexed:
            // count 0, 2, 4, ... => 1st, 3rd, 5th, ... call (crash immediately)
            // count 1, 3, 5, ... => 2nd, 4th, 6th, ... call (run ~100ms, then crash)

            if count.is_multiple_of(2) {
                // Odd-numbered call (1st, 3rd, 5th, ...): crash immediately
                return Err(CamelError::RouteError("odd call crash".into()));
            }

            // Even-numbered call (2nd, 4th, 6th, ...): block briefly then crash
            // This simulates "successful" operation before crashing
            tokio::select! {
                _ = ctx.cancelled() => {
                    // Cancelled externally
                    return Ok(());
                }
                _ = tokio::time::sleep(Duration::from_millis(100)) => {
                    // Simulated "uptime" before crash
                    return Err(CamelError::RouteError("even call crash after uptime".into()));
                }
            }
        }

        async fn stop(&mut self) -> Result<(), CamelError> {
            Ok(())
        }

        fn concurrency_model(&self) -> ConcurrencyModel {
            ConcurrencyModel::Sequential
        }
    }
687
    /// Endpoint producing [`CrashOnOddBlockOnEvenConsumer`]s sharing one counter.
    struct CrashOnOddBlockOnEvenEndpoint {
        call_count: StdArc<AtomicU32>,
    }

    impl Endpoint for CrashOnOddBlockOnEvenEndpoint {
        fn uri(&self) -> &str {
            "crash-odd-block-even:test"
        }

        fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
            Ok(Box::new(CrashOnOddBlockOnEvenConsumer {
                call_count: StdArc::clone(&self.call_count),
            }))
        }

        // Consumer-only endpoint: producing is unsupported in these tests.
        fn create_producer(
            &self,
            _ctx: &camel_api::ProducerContext,
        ) -> Result<camel_api::BoxProcessor, CamelError> {
            Err(CamelError::RouteError("no producer".into()))
        }
    }

    /// Component registered under the `crash-odd-block-even` scheme.
    struct CrashOnOddBlockOnEvenComponent {
        call_count: StdArc<AtomicU32>,
    }

    impl Component for CrashOnOddBlockOnEvenComponent {
        fn scheme(&self) -> &str {
            "crash-odd-block-even"
        }

        fn create_endpoint(&self, _uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
            Ok(Box::new(CrashOnOddBlockOnEvenEndpoint {
                call_count: StdArc::clone(&self.call_count),
            }))
        }
    }
726
    /// A route whose runs sometimes exceed `initial_delay` must have its
    /// attempt counter reset, so supervision keeps restarting it well past
    /// `max_attempts` consecutive failures.
    #[tokio::test]
    async fn test_supervision_resets_attempt_count_on_success() {
        // Set up registry with crash-on-odd, block-on-even component
        let registry = StdArc::new(std::sync::Mutex::new(Registry::new()));
        let call_count = StdArc::new(AtomicU32::new(0));
        registry
            .lock()
            .unwrap()
            .register(CrashOnOddBlockOnEvenComponent {
                call_count: StdArc::clone(&call_count),
            });

        // Configure with max_attempts=2 - allows continued restarts when successful runs reset the counter
        // Without reset: would give up after 2 restarts (3 total calls)
        // With reset: successful runs (100ms >= 50ms) reset counter, allowing continued restarts
        let config = SupervisionConfig {
            max_attempts: Some(2),
            initial_delay: Duration::from_millis(50),
            backoff_multiplier: 1.0,
            max_delay: Duration::from_secs(60),
        };

        let controller: StdArc<Mutex<dyn RouteControllerInternal>> = StdArc::new(Mutex::new(
            SupervisingRouteController::new(StdArc::clone(&registry), config),
        ));

        controller
            .try_lock()
            .unwrap()
            .set_self_ref(StdArc::clone(&controller) as StdArc<Mutex<dyn RouteController>>);

        let def = crate::route::RouteDefinition::new("crash-odd-block-even:test", vec![])
            .with_route_id("reset-attempt-route");
        controller.try_lock().unwrap().add_route(def).unwrap();

        controller.lock().await.start_all_routes().await.unwrap();

        // Wait long enough for multiple crash-restart cycles:
        // - Call 1: crashes immediately → attempt 1, restart after 50ms
        // - Call 2: runs 100ms then crashes → counter reset (100ms >= 50ms), attempt 1, restart
        // - Call 3: crashes immediately → no reset (ran ~0ms), attempt 2 (still <= max), restart
        // - Call 4: runs 100ms then crashes → counter reset again, attempt 1, restart
        // The periodic resets keep the attempt count within max_attempts=2 forever.
        // With 50ms initial delay and 100ms block time, each full cycle takes ~150ms
        // Wait 1s to ensure we get at least 4 calls
        tokio::time::sleep(Duration::from_millis(1000)).await;

        // Verify consumer was called at least 4 times
        // (proving the attempt count reset allows continued restarts)
        let count = call_count.load(Ordering::SeqCst);
        assert!(
            count >= 4,
            "expected at least 4 consumer calls (proving attempt reset), got {}",
            count
        );

        // Verify route is NOT in Failed state
        // It should be either Started or in the process of restarting
        let status = controller
            .lock()
            .await
            .route_status("reset-attempt-route")
            .unwrap();
        assert!(
            !matches!(status, RouteStatus::Failed(_)),
            "expected route NOT to be Failed (supervision should continue due to attempt reset), got {:?}",
            status
        );
    }
795}