1use std::collections::{HashMap, HashSet};
8use std::sync::Arc;
9use std::time::Instant;
10
11use tokio::sync::Mutex;
12use tracing::{error, info, warn};
13
14use camel_api::error_handler::ErrorHandlerConfig;
15use camel_api::{CamelError, MetricsCollector, RouteController, RouteStatus, SupervisionConfig};
16
17use crate::registry::Registry;
18use crate::route::RouteDefinition;
19use crate::route_controller::{
20 CrashNotification, DefaultRouteController, RouteControllerInternal, SharedLanguageRegistry,
21};
22
/// Route controller that wraps a [`DefaultRouteController`] and restarts
/// crashed routes according to a [`SupervisionConfig`] policy.
///
/// Crash notifications produced by the inner controller travel over a bounded
/// mpsc channel. A background supervision task consumes them; that task is
/// spawned by `RouteController::start_all_routes`.
pub struct SupervisingRouteController {
    // Controller that actually owns the routes; all operations are delegated to it.
    inner: DefaultRouteController,
    // Restart policy (attempt limit, delays) handed to the supervision loop.
    config: SupervisionConfig,
    // Sending half, installed on `inner` as its crash notifier.
    crash_tx: tokio::sync::mpsc::Sender<CrashNotification>,
    // Receiving half; `Some` until the supervision loop takes ownership of it.
    crash_rx: Option<tokio::sync::mpsc::Receiver<CrashNotification>>,
    // Optional metrics sink, cloned into the supervision loop.
    metrics: Option<Arc<dyn MetricsCollector>>,
}
39
40impl SupervisingRouteController {
41 pub fn new(registry: Arc<std::sync::Mutex<Registry>>, config: SupervisionConfig) -> Self {
43 Self::with_languages(
44 registry,
45 config,
46 Arc::new(std::sync::Mutex::new(HashMap::new())),
47 )
48 }
49
50 pub fn with_languages(
52 registry: Arc<std::sync::Mutex<Registry>>,
53 config: SupervisionConfig,
54 languages: SharedLanguageRegistry,
55 ) -> Self {
56 let (crash_tx, crash_rx) = tokio::sync::mpsc::channel(64);
57 Self {
58 inner: DefaultRouteController::with_languages(registry, languages),
59 config,
60 crash_tx,
61 crash_rx: Some(crash_rx),
62 metrics: None,
63 }
64 }
65
66 pub fn with_metrics(mut self, metrics: Arc<dyn MetricsCollector>) -> Self {
68 self.metrics = Some(metrics);
69 self
70 }
71}
72
73#[async_trait::async_trait]
74impl RouteController for SupervisingRouteController {
75 async fn start_route(&mut self, route_id: &str) -> Result<(), CamelError> {
76 self.inner.start_route(route_id).await
77 }
78
79 async fn stop_route(&mut self, route_id: &str) -> Result<(), CamelError> {
80 self.inner.stop_route(route_id).await
81 }
82
83 async fn restart_route(&mut self, route_id: &str) -> Result<(), CamelError> {
84 self.inner.restart_route(route_id).await
85 }
86
87 async fn suspend_route(&mut self, route_id: &str) -> Result<(), CamelError> {
88 self.inner.suspend_route(route_id).await
89 }
90
91 async fn resume_route(&mut self, route_id: &str) -> Result<(), CamelError> {
92 self.inner.resume_route(route_id).await
93 }
94
95 fn route_status(&self, route_id: &str) -> Option<RouteStatus> {
96 self.inner.route_status(route_id)
97 }
98
99 async fn start_all_routes(&mut self) -> Result<(), CamelError> {
100 self.inner.set_crash_notifier(self.crash_tx.clone());
102
103 self.inner.start_all_routes().await?;
105
106 if let Some(rx) = self.crash_rx.take() {
108 if let Some(controller_ref) = self.inner.self_ref_for_supervision() {
109 let config = self.config.clone();
110 let metrics = self.metrics.clone();
111 tokio::spawn(async move {
112 supervision_loop(rx, controller_ref, config, metrics).await;
113 });
114 } else {
115 warn!("SupervisingRouteController: self_ref not set, supervision loop not started");
116 }
117 }
118
119 Ok(())
120 }
121
122 async fn stop_all_routes(&mut self) -> Result<(), CamelError> {
123 self.inner.stop_all_routes().await
124 }
125}
126
127#[async_trait::async_trait]
128impl RouteControllerInternal for SupervisingRouteController {
129 fn add_route(&mut self, def: RouteDefinition) -> Result<(), CamelError> {
130 self.inner.add_route(def)
131 }
132
133 fn swap_pipeline(
134 &self,
135 route_id: &str,
136 pipeline: camel_api::BoxProcessor,
137 ) -> Result<(), CamelError> {
138 self.inner.swap_pipeline(route_id, pipeline)
139 }
140
141 fn route_from_uri(&self, route_id: &str) -> Option<String> {
142 self.inner.route_from_uri(route_id)
143 }
144
145 fn set_error_handler(&mut self, config: ErrorHandlerConfig) {
146 self.inner.set_error_handler(config)
147 }
148
149 fn set_self_ref(&mut self, self_ref: Arc<Mutex<dyn RouteController>>) {
150 self.inner.set_self_ref(self_ref)
151 }
152
153 fn route_count(&self) -> usize {
154 self.inner.route_count()
155 }
156
157 fn route_ids(&self) -> Vec<String> {
158 self.inner.route_ids()
159 }
160
161 fn set_tracer_config(&mut self, config: &crate::config::TracerConfig) {
162 self.inner.set_tracer_config(config)
163 }
164
165 fn compile_route_definition(
166 &self,
167 def: crate::route::RouteDefinition,
168 ) -> Result<camel_api::BoxProcessor, camel_api::CamelError> {
169 self.inner.compile_route_definition(def)
170 }
171
172 fn remove_route(&mut self, route_id: &str) -> Result<(), camel_api::CamelError> {
173 self.inner.remove_route(route_id)
174 }
175
176 async fn start_route_reload(&mut self, route_id: &str) -> Result<(), camel_api::CamelError> {
177 self.inner.start_route(route_id).await
178 }
179
180 async fn stop_route_reload(&mut self, route_id: &str) -> Result<(), camel_api::CamelError> {
181 self.inner.stop_route(route_id).await
182 }
183}
184
185async fn supervision_loop(
190 mut rx: tokio::sync::mpsc::Receiver<CrashNotification>,
191 controller: Arc<Mutex<dyn RouteController>>,
192 config: SupervisionConfig,
193 _metrics: Option<Arc<dyn MetricsCollector>>,
194) {
195 let mut attempts: HashMap<String, u32> = HashMap::new();
196 let mut last_restart_time: HashMap<String, Instant> = HashMap::new();
197 let mut currently_restarting: HashSet<String> = HashSet::new();
198
199 info!("Supervision loop started");
200
201 while let Some(notification) = rx.recv().await {
202 let route_id = notification.route_id.clone();
203 let error = ¬ification.error;
204
205 if currently_restarting.contains(&route_id) {
207 continue;
208 }
209
210 info!(
211 route_id = %route_id,
212 error = %error,
213 "Route crashed, checking restart policy"
214 );
215
216 if let Some(last_time) = last_restart_time.get(&route_id)
219 && last_time.elapsed() >= config.initial_delay
220 {
221 attempts.insert(route_id.clone(), 0);
222 }
223
224 let current_attempt = attempts.entry(route_id.clone()).or_insert(0);
226 *current_attempt += 1;
227
228 if config
230 .max_attempts
231 .is_some_and(|max| *current_attempt > max)
232 {
233 error!(
234 route_id = %route_id,
235 attempts = current_attempt,
236 max = config.max_attempts.unwrap(),
237 "Route exceeded max restart attempts, giving up"
238 );
239 continue;
240 }
241
242 let delay = config.next_delay(*current_attempt);
244 info!(
245 route_id = %route_id,
246 attempt = current_attempt,
247 delay_ms = delay.as_millis(),
248 "Scheduling route restart"
249 );
250
251 currently_restarting.insert(route_id.clone());
253
254 tokio::time::sleep(delay).await;
256
257 let mut ctrl = controller.lock().await;
259 match ctrl.route_status(&route_id) {
260 Some(RouteStatus::Failed(_)) => {
261 match ctrl.restart_route(&route_id).await {
263 Ok(()) => {
264 info!(route_id = %route_id, "Route restarted successfully");
265 last_restart_time.insert(route_id.clone(), Instant::now());
268 }
269 Err(e) => {
270 error!(route_id = %route_id, error = %e, "Failed to restart route");
271 }
272 }
273 }
274 Some(status) => {
275 warn!(route_id = %route_id, ?status, "Route no longer failed, skipping supervision restart");
277 attempts.remove(&route_id);
278 }
279 None => {
280 warn!(route_id = %route_id, "Route not found during supervision restart");
281 }
282 }
283
284 currently_restarting.remove(&route_id);
286 }
287
288 info!("Supervision loop ended");
289}
290
#[cfg(test)]
mod tests {
    use super::*;
    use async_trait::async_trait;
    use camel_component::{Component, ConcurrencyModel, Consumer, ConsumerContext, Endpoint};
    use std::sync::Arc as StdArc;
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::time::Duration;

    /// Consumer whose first `start` call fails immediately and whose later
    /// calls await `ctx.cancelled()` (presumably until the route is stopped).
    struct CrashThenBlockConsumer {
        // Number of `start` calls so far, shared with the test body.
        call_count: StdArc<AtomicU32>,
    }

    #[async_trait]
    impl Consumer for CrashThenBlockConsumer {
        async fn start(&mut self, ctx: ConsumerContext) -> Result<(), CamelError> {
            let count = self.call_count.fetch_add(1, Ordering::SeqCst);

            // First call: simulate a crash.
            if count == 0 {
                return Err(CamelError::RouteError("simulated crash".into()));
            }

            // Subsequent calls: stay up until cancelled.
            ctx.cancelled().await;
            Ok(())
        }

        async fn stop(&mut self) -> Result<(), CamelError> {
            Ok(())
        }

        fn concurrency_model(&self) -> ConcurrencyModel {
            ConcurrencyModel::Sequential
        }
    }

    /// Endpoint that hands out `CrashThenBlockConsumer`s sharing one counter.
    struct CrashThenBlockEndpoint {
        call_count: StdArc<AtomicU32>,
    }

    impl Endpoint for CrashThenBlockEndpoint {
        fn uri(&self) -> &str {
            "crash-then-block:test"
        }

        fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
            Ok(Box::new(CrashThenBlockConsumer {
                call_count: StdArc::clone(&self.call_count),
            }))
        }

        fn create_producer(
            &self,
            _ctx: &camel_api::ProducerContext,
        ) -> Result<camel_api::BoxProcessor, CamelError> {
            Err(CamelError::RouteError("no producer".into()))
        }
    }

    /// Component registered under the `crash-then-block` scheme.
    struct CrashThenBlockComponent {
        call_count: StdArc<AtomicU32>,
    }

    impl Component for CrashThenBlockComponent {
        fn scheme(&self) -> &str {
            "crash-then-block"
        }

        fn create_endpoint(&self, _uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
            Ok(Box::new(CrashThenBlockEndpoint {
                call_count: StdArc::clone(&self.call_count),
            }))
        }
    }

    /// A route that crashes once is restarted by supervision and ends up `Started`.
    #[tokio::test]
    async fn test_supervising_controller_restarts_crashed_route() {
        let registry = StdArc::new(std::sync::Mutex::new(Registry::new()));
        // Counts consumer `start` calls across restarts.
        let call_count = StdArc::new(AtomicU32::new(0));
        registry.lock().unwrap().register(CrashThenBlockComponent {
            call_count: StdArc::clone(&call_count),
        });

        let config = SupervisionConfig {
            max_attempts: Some(5),
            initial_delay: Duration::from_millis(50),
            backoff_multiplier: 1.0,
            max_delay: Duration::from_secs(60),
        };

        let controller: StdArc<Mutex<dyn RouteControllerInternal>> = StdArc::new(Mutex::new(
            SupervisingRouteController::new(StdArc::clone(&registry), config),
        ));

        // The supervision loop needs a handle back to this controller.
        controller
            .try_lock()
            .unwrap()
            .set_self_ref(StdArc::clone(&controller) as StdArc<Mutex<dyn RouteController>>);

        let def = crate::route::RouteDefinition::new("crash-then-block:test", vec![])
            .with_route_id("crash-route");
        controller.try_lock().unwrap().add_route(def).unwrap();

        controller.lock().await.start_all_routes().await.unwrap();

        // Allow time for the crash, the restart delay, and the restart itself.
        tokio::time::sleep(Duration::from_millis(500)).await;

        let count = call_count.load(Ordering::SeqCst);
        assert!(
            count >= 2,
            "expected at least 2 consumer calls (crash + restart), got {}",
            count
        );

        let status = controller.lock().await.route_status("crash-route").unwrap();
        assert!(
            matches!(status, RouteStatus::Started),
            "expected Started, got {:?}",
            status
        );
    }

    /// A route that always crashes is left `Failed` once `max_attempts` is spent.
    #[tokio::test]
    async fn test_supervising_controller_respects_max_attempts() {
        struct AlwaysCrashConsumer;
        #[async_trait]
        impl Consumer for AlwaysCrashConsumer {
            async fn start(&mut self, _ctx: ConsumerContext) -> Result<(), CamelError> {
                Err(CamelError::RouteError("always crashes".into()))
            }
            async fn stop(&mut self) -> Result<(), CamelError> {
                Ok(())
            }
            fn concurrency_model(&self) -> ConcurrencyModel {
                ConcurrencyModel::Sequential
            }
        }
        struct AlwaysCrashEndpoint;
        impl Endpoint for AlwaysCrashEndpoint {
            fn uri(&self) -> &str {
                "always-crash:test"
            }
            fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
                Ok(Box::new(AlwaysCrashConsumer))
            }
            fn create_producer(
                &self,
                _ctx: &camel_api::ProducerContext,
            ) -> Result<camel_api::BoxProcessor, CamelError> {
                Err(CamelError::RouteError("no producer".into()))
            }
        }
        struct AlwaysCrashComponent;
        impl Component for AlwaysCrashComponent {
            fn scheme(&self) -> &str {
                "always-crash"
            }
            fn create_endpoint(&self, _uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
                Ok(Box::new(AlwaysCrashEndpoint))
            }
        }

        let registry = StdArc::new(std::sync::Mutex::new(Registry::new()));
        registry.lock().unwrap().register(AlwaysCrashComponent);

        let config = SupervisionConfig {
            max_attempts: Some(2),
            initial_delay: Duration::from_millis(10),
            backoff_multiplier: 1.0,
            max_delay: Duration::from_secs(1),
        };

        let controller: StdArc<Mutex<dyn RouteControllerInternal>> = StdArc::new(Mutex::new(
            SupervisingRouteController::new(StdArc::clone(&registry), config),
        ));

        controller
            .try_lock()
            .unwrap()
            .set_self_ref(StdArc::clone(&controller) as StdArc<Mutex<dyn RouteController>>);

        let def = crate::route::RouteDefinition::new("always-crash:test", vec![])
            .with_route_id("always-crash-route");
        controller.try_lock().unwrap().add_route(def).unwrap();

        controller.lock().await.start_all_routes().await.unwrap();

        // Long enough for both restart attempts to be used up.
        tokio::time::sleep(Duration::from_millis(200)).await;

        let status = controller
            .lock()
            .await
            .route_status("always-crash-route")
            .unwrap();
        assert!(
            matches!(status, RouteStatus::Failed(_)),
            "expected Failed, got {:?}",
            status
        );
    }

    /// The supervising controller reports the inner controller's (empty) route set.
    #[tokio::test]
    async fn test_supervising_controller_delegates_to_inner() {
        let registry = StdArc::new(std::sync::Mutex::new(Registry::new()));
        let config = SupervisionConfig::default();
        let mut controller = SupervisingRouteController::new(StdArc::clone(&registry), config);

        let self_ref: StdArc<Mutex<dyn RouteController>> = StdArc::new(Mutex::new(
            SupervisingRouteController::new(registry, SupervisionConfig::default()),
        ));
        controller.set_self_ref(self_ref);

        assert_eq!(controller.route_count(), 0);
        assert_eq!(controller.route_ids(), Vec::<String>::new());
    }

    /// Consumer that fails every `start` call and counts how often it was started.
    struct AlwaysCrashWithCountConsumer {
        call_count: StdArc<AtomicU32>,
    }

    #[async_trait]
    impl Consumer for AlwaysCrashWithCountConsumer {
        async fn start(&mut self, _ctx: ConsumerContext) -> Result<(), CamelError> {
            self.call_count.fetch_add(1, Ordering::SeqCst);
            Err(CamelError::RouteError("always crashes".into()))
        }

        async fn stop(&mut self) -> Result<(), CamelError> {
            Ok(())
        }

        fn concurrency_model(&self) -> ConcurrencyModel {
            ConcurrencyModel::Sequential
        }
    }

    struct AlwaysCrashWithCountEndpoint {
        call_count: StdArc<AtomicU32>,
    }

    impl Endpoint for AlwaysCrashWithCountEndpoint {
        fn uri(&self) -> &str {
            "always-crash-count:test"
        }

        fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
            Ok(Box::new(AlwaysCrashWithCountConsumer {
                call_count: StdArc::clone(&self.call_count),
            }))
        }

        fn create_producer(
            &self,
            _ctx: &camel_api::ProducerContext,
        ) -> Result<camel_api::BoxProcessor, CamelError> {
            Err(CamelError::RouteError("no producer".into()))
        }
    }

    struct AlwaysCrashWithCountComponent {
        call_count: StdArc<AtomicU32>,
    }

    impl Component for AlwaysCrashWithCountComponent {
        fn scheme(&self) -> &str {
            "always-crash-count"
        }

        fn create_endpoint(&self, _uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
            Ok(Box::new(AlwaysCrashWithCountEndpoint {
                call_count: StdArc::clone(&self.call_count),
            }))
        }
    }

    /// With `max_attempts = 2`, an always-crashing consumer is started exactly
    /// three times (initial start plus two restarts), then left `Failed`.
    #[tokio::test]
    async fn test_supervision_gives_up_after_max_attempts() {
        let registry = StdArc::new(std::sync::Mutex::new(Registry::new()));
        let call_count = StdArc::new(AtomicU32::new(0));
        registry
            .lock()
            .unwrap()
            .register(AlwaysCrashWithCountComponent {
                call_count: StdArc::clone(&call_count),
            });

        let config = SupervisionConfig {
            max_attempts: Some(2),
            initial_delay: Duration::from_millis(50),
            backoff_multiplier: 1.0,
            max_delay: Duration::from_secs(60),
        };

        let controller: StdArc<Mutex<dyn RouteControllerInternal>> = StdArc::new(Mutex::new(
            SupervisingRouteController::new(StdArc::clone(&registry), config),
        ));

        controller
            .try_lock()
            .unwrap()
            .set_self_ref(StdArc::clone(&controller) as StdArc<Mutex<dyn RouteController>>);

        let def = crate::route::RouteDefinition::new("always-crash-count:test", vec![])
            .with_route_id("give-up-route");
        controller.try_lock().unwrap().add_route(def).unwrap();

        controller.lock().await.start_all_routes().await.unwrap();

        // Wait well past the point where supervision should have given up, so
        // that an extra restart would be visible in the count.
        tokio::time::sleep(Duration::from_millis(800)).await;

        let count = call_count.load(Ordering::SeqCst);
        assert_eq!(
            count, 3,
            "expected exactly 3 consumer calls (initial + 2 restarts), got {}",
            count
        );

        let status = controller
            .lock()
            .await
            .route_status("give-up-route")
            .unwrap();
        assert!(
            matches!(status, RouteStatus::Failed(_)),
            "expected Failed, got {:?}",
            status
        );
    }

    /// Consumer that alternates behaviour by 0-based call index. Even indices
    /// (1st, 3rd, ... call) fail immediately. Odd indices stay up for ~100 ms
    /// and then fail, unless cancelled first.
    struct CrashOnOddBlockOnEvenConsumer {
        call_count: StdArc<AtomicU32>,
    }

    #[async_trait]
    impl Consumer for CrashOnOddBlockOnEvenConsumer {
        async fn start(&mut self, ctx: ConsumerContext) -> Result<(), CamelError> {
            let count = self.call_count.fetch_add(1, Ordering::SeqCst);
            if count.is_multiple_of(2) {
                // 1st, 3rd, ... call (1-based odd): crash right away.
                return Err(CamelError::RouteError("odd call crash".into()));
            }

            // 2nd, 4th, ... call: run long enough to trigger the attempt reset, then crash.
            tokio::select! {
                _ = ctx.cancelled() => {
                    return Ok(());
                }
                _ = tokio::time::sleep(Duration::from_millis(100)) => {
                    return Err(CamelError::RouteError("even call crash after uptime".into()));
                }
            }
        }

        async fn stop(&mut self) -> Result<(), CamelError> {
            Ok(())
        }

        fn concurrency_model(&self) -> ConcurrencyModel {
            ConcurrencyModel::Sequential
        }
    }

    struct CrashOnOddBlockOnEvenEndpoint {
        call_count: StdArc<AtomicU32>,
    }

    impl Endpoint for CrashOnOddBlockOnEvenEndpoint {
        fn uri(&self) -> &str {
            "crash-odd-block-even:test"
        }

        fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
            Ok(Box::new(CrashOnOddBlockOnEvenConsumer {
                call_count: StdArc::clone(&self.call_count),
            }))
        }

        fn create_producer(
            &self,
            _ctx: &camel_api::ProducerContext,
        ) -> Result<camel_api::BoxProcessor, CamelError> {
            Err(CamelError::RouteError("no producer".into()))
        }
    }

    struct CrashOnOddBlockOnEvenComponent {
        call_count: StdArc<AtomicU32>,
    }

    impl Component for CrashOnOddBlockOnEvenComponent {
        fn scheme(&self) -> &str {
            "crash-odd-block-even"
        }

        fn create_endpoint(&self, _uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
            Ok(Box::new(CrashOnOddBlockOnEvenEndpoint {
                call_count: StdArc::clone(&self.call_count),
            }))
        }
    }

    /// A route that stays up for at least `initial_delay` after a restart gets
    /// its attempt counter reset. Supervision therefore continues beyond
    /// `max_attempts` total restarts.
    #[tokio::test]
    async fn test_supervision_resets_attempt_count_on_success() {
        let registry = StdArc::new(std::sync::Mutex::new(Registry::new()));
        let call_count = StdArc::new(AtomicU32::new(0));
        registry
            .lock()
            .unwrap()
            .register(CrashOnOddBlockOnEvenComponent {
                call_count: StdArc::clone(&call_count),
            });

        // The 100 ms uptime of the blocking calls exceeds initial_delay (50 ms),
        // so each of those crashes resets the attempt counter.
        let config = SupervisionConfig {
            max_attempts: Some(2),
            initial_delay: Duration::from_millis(50),
            backoff_multiplier: 1.0,
            max_delay: Duration::from_secs(60),
        };

        let controller: StdArc<Mutex<dyn RouteControllerInternal>> = StdArc::new(Mutex::new(
            SupervisingRouteController::new(StdArc::clone(&registry), config),
        ));

        controller
            .try_lock()
            .unwrap()
            .set_self_ref(StdArc::clone(&controller) as StdArc<Mutex<dyn RouteController>>);

        let def = crate::route::RouteDefinition::new("crash-odd-block-even:test", vec![])
            .with_route_id("reset-attempt-route");
        controller.try_lock().unwrap().add_route(def).unwrap();

        controller.lock().await.start_all_routes().await.unwrap();

        tokio::time::sleep(Duration::from_millis(1000)).await;

        // Without a reset, supervision would stop after initial + 2 restarts = 3 calls.
        let count = call_count.load(Ordering::SeqCst);
        assert!(
            count >= 4,
            "expected at least 4 consumer calls (proving attempt reset), got {}",
            count
        );

        // The route keeps alternating between crashed (waiting for a restart)
        // and running. A single sample can land in a Failed window, so poll
        // for a non-Failed status. If supervision had given up, the route
        // would stay Failed and the deadline would expire.
        let deadline = std::time::Instant::now() + Duration::from_millis(500);
        let status = loop {
            let status = controller
                .lock()
                .await
                .route_status("reset-attempt-route")
                .unwrap();
            if !matches!(status, RouteStatus::Failed(_)) || std::time::Instant::now() >= deadline {
                break status;
            }
            tokio::time::sleep(Duration::from_millis(10)).await;
        };
        assert!(
            !matches!(status, RouteStatus::Failed(_)),
            "expected route NOT to be Failed (supervision should continue due to attempt reset), got {:?}",
            status
        );
    }
}