1use std::any::{Any, TypeId};
2use std::collections::HashMap;
3use std::sync::Arc;
4use std::sync::atomic::{AtomicU64, Ordering};
5use tokio::time::timeout;
6use tokio_util::sync::CancellationToken;
7use tracing::{info, warn};
8
9#[cfg(test)]
10use camel_api::StepLifecycle;
11use camel_api::error_handler::ErrorHandlerConfig;
12use camel_api::{
13 CamelError, FunctionInvoker, HealthReport, Lifecycle, MetricsCollector, PlatformIdentity,
14 PlatformService, ReadinessGate, RouteTemplateSpec, RuntimeCommandBus, RuntimeQueryBus,
15 TemplateInstanceRecord,
16};
17use camel_component_api::{Component, ComponentContext, ComponentRegistrar};
18use camel_language_api::Language;
19
20use crate::health_registry::HealthCheckRegistry;
21use crate::lifecycle::adapters::controller_actor::RouteControllerHandle;
22use crate::lifecycle::adapters::route_controller::SharedLanguageRegistry;
23use crate::lifecycle::application::route_definition::RouteDefinition;
24use crate::lifecycle::application::runtime_bus::RuntimeBus;
25use crate::lifecycle::domain::LanguageRegistryError;
26use crate::registry::RegistryError;
27use crate::shared::components::domain::Registry;
28use crate::shared::observability::domain::TracerConfig;
29use crate::template::TemplateRegistry;
30
31static CONTEXT_COMMAND_SEQ: AtomicU64 = AtomicU64::new(0);
32
33pub use crate::context_builder::CamelContextBuilder;
34
35pub struct CamelContext {
43 registry: Arc<std::sync::Mutex<Registry>>,
44 route_controller: RouteControllerHandle,
45 _actor_join: tokio::task::JoinHandle<()>,
46 supervision_join: Option<tokio::task::JoinHandle<()>>,
47 runtime: Arc<RuntimeBus>,
48 cancel_token: CancellationToken,
49 metrics: Arc<dyn MetricsCollector>,
50 platform_service: Arc<dyn PlatformService>,
52 languages: SharedLanguageRegistry,
53 shutdown_timeout: std::time::Duration,
54 services: Vec<Box<dyn Lifecycle>>,
55 health_registry: Arc<HealthCheckRegistry>,
56 component_configs: HashMap<TypeId, Box<dyn Any + Send + Sync>>,
57 function_invoker: Option<Arc<dyn FunctionInvoker>>,
58 template_registry: Arc<TemplateRegistry>,
59 idempotent_repositories: crate::registry::SharedIdempotentRegistry,
60 claim_check_repositories: crate::registry::SharedClaimCheckRegistry,
61}
62
63pub(crate) struct FromParts {
66 pub(crate) registry: Arc<std::sync::Mutex<Registry>>,
67 pub(crate) route_controller: RouteControllerHandle,
68 pub(crate) _actor_join: tokio::task::JoinHandle<()>,
69 pub(crate) supervision_join: Option<tokio::task::JoinHandle<()>>,
70 pub(crate) runtime: Arc<RuntimeBus>,
71 pub(crate) cancel_token: CancellationToken,
72 pub(crate) metrics: Arc<dyn MetricsCollector>,
73 pub(crate) platform_service: Arc<dyn PlatformService>,
74 pub(crate) languages: SharedLanguageRegistry,
75 pub(crate) shutdown_timeout: std::time::Duration,
76 pub(crate) services: Vec<Box<dyn Lifecycle>>,
77 pub(crate) health_registry: Arc<HealthCheckRegistry>,
78 pub(crate) component_configs: HashMap<TypeId, Box<dyn Any + Send + Sync>>,
79 pub(crate) function_invoker: Option<Arc<dyn FunctionInvoker>>,
80 pub(crate) template_registry: Arc<TemplateRegistry>,
81 pub(crate) idempotent_repositories: crate::registry::SharedIdempotentRegistry,
82 pub(crate) claim_check_repositories: crate::registry::SharedClaimCheckRegistry,
83}
84
85impl CamelContext {
86 pub(crate) fn from_parts(parts: FromParts) -> Self {
87 Self {
88 registry: parts.registry,
89 route_controller: parts.route_controller,
90 _actor_join: parts._actor_join,
91 supervision_join: parts.supervision_join,
92 runtime: parts.runtime,
93 cancel_token: parts.cancel_token,
94 metrics: parts.metrics,
95 platform_service: parts.platform_service,
96 languages: parts.languages,
97 shutdown_timeout: parts.shutdown_timeout,
98 services: parts.services,
99 health_registry: parts.health_registry,
100 component_configs: parts.component_configs,
101 function_invoker: parts.function_invoker,
102 template_registry: parts.template_registry,
103 idempotent_repositories: parts.idempotent_repositories,
104 claim_check_repositories: parts.claim_check_repositories,
105 }
106 }
107}
108
109#[derive(Clone)]
113pub struct RuntimeExecutionHandle {
114 pub(crate) controller: RouteControllerHandle,
115 pub(crate) runtime: Arc<RuntimeBus>,
116 pub(crate) function_invoker: Option<Arc<dyn FunctionInvoker>>,
117 #[cfg(test)]
121 #[allow(clippy::type_complexity)]
122 pub(crate) test_lifecycle_inject: Arc<std::sync::Mutex<Option<Vec<Arc<dyn StepLifecycle>>>>>,
123}
124
125impl RuntimeExecutionHandle {
126 pub(crate) async fn add_route_definition(
127 &self,
128 definition: RouteDefinition,
129 ) -> Result<(), CamelError> {
130 use crate::lifecycle::ports::RouteRegistrationPort;
131 self.runtime
132 .register_route(definition)
133 .await
134 .map_err(Into::into)
135 }
136
137 #[allow(dead_code)]
140 pub(crate) async fn compile_route_definition(
141 &self,
142 definition: RouteDefinition,
143 ) -> Result<camel_api::BoxProcessor, CamelError> {
144 self.controller.compile_route_definition(definition).await
145 }
146
147 #[allow(dead_code)] pub(crate) async fn compile_route_definition_with_generation(
149 &self,
150 definition: RouteDefinition,
151 generation: u64,
152 ) -> Result<camel_api::BoxProcessor, CamelError> {
153 self.controller
154 .compile_route_definition_with_generation(definition, generation)
155 .await
156 }
157
158 pub(crate) async fn compile_route_definition_pipeline(
159 &self,
160 definition: RouteDefinition,
161 generation: u64,
162 ) -> Result<crate::lifecycle::adapters::route_helpers::CompiledPipeline, CamelError> {
163 self.controller
164 .compile_route_definition_pipeline(definition, generation)
165 .await
166 }
167
168 pub(crate) async fn compile_route_definition_dry_pipeline(
171 &self,
172 definition: RouteDefinition,
173 ) -> Result<crate::lifecycle::adapters::route_helpers::CompiledPipeline, CamelError> {
174 self.controller
175 .compile_route_definition_dry_pipeline(definition)
176 .await
177 }
178
179 pub(crate) async fn prepare_route_definition_with_generation(
180 &self,
181 definition: RouteDefinition,
182 generation: u64,
183 ) -> Result<crate::lifecycle::adapters::route_controller::PreparedRoute, CamelError> {
184 self.controller
185 .prepare_route_definition_with_generation(definition, generation)
186 .await
187 }
188
189 pub(crate) async fn insert_prepared_route(
190 &self,
191 prepared: crate::lifecycle::adapters::route_controller::PreparedRoute,
192 ) -> Result<(), CamelError> {
193 self.controller.insert_prepared_route(prepared).await
194 }
195
196 pub(crate) async fn remove_route_preserving_functions(
197 &self,
198 route_id: String,
199 ) -> Result<(), CamelError> {
200 self.controller
201 .remove_route_preserving_functions(route_id)
202 .await
203 }
204
205 pub(crate) async fn register_route_aggregate(
206 &self,
207 route_id: String,
208 ) -> Result<(), CamelError> {
209 self.runtime.register_aggregate_only(route_id).await
210 }
211
212 pub(crate) async fn swap_route_pipeline(
213 &self,
214 route_id: &str,
215 pipeline: camel_api::BoxProcessor,
216 ) -> Result<(), CamelError> {
217 self.controller.swap_pipeline(route_id, pipeline).await
218 }
219
220 pub(crate) async fn stop_route_reload(&self, route_id: &str) -> Result<(), CamelError> {
222 self.controller.stop_route_reload(route_id).await
223 }
224
225 pub(crate) async fn start_route_reload(&self, route_id: &str) -> Result<(), CamelError> {
227 self.controller.start_route_reload(route_id).await
228 }
229
230 pub(crate) async fn swap_route_pipeline_raw(
233 &self,
234 route_id: &str,
235 pipeline: camel_api::BoxProcessor,
236 lifecycle: Vec<Arc<dyn camel_api::StepLifecycle>>,
237 ) -> Result<(), CamelError> {
238 self.controller
239 .swap_pipeline_raw(route_id, pipeline, lifecycle)
240 .await
241 }
242
243 pub(crate) async fn execute_runtime_command(
244 &self,
245 cmd: camel_api::RuntimeCommand,
246 ) -> Result<camel_api::RuntimeCommandResult, CamelError> {
247 self.runtime.execute(cmd).await
248 }
249
250 pub(crate) async fn runtime_route_status(
251 &self,
252 route_id: &str,
253 ) -> Result<Option<String>, CamelError> {
254 match self
255 .runtime
256 .ask(camel_api::RuntimeQuery::GetRouteStatus {
257 route_id: route_id.to_string(),
258 })
259 .await
260 {
261 Ok(camel_api::RuntimeQueryResult::RouteStatus { status, .. }) => Ok(Some(status)),
262 Ok(_) => Err(CamelError::RouteError(
263 "unexpected runtime query response for route status".to_string(),
264 )),
265 Err(CamelError::RouteError(msg)) if msg.contains("not found") => Ok(None),
266 Err(err) => Err(err),
267 }
268 }
269
270 pub(crate) async fn runtime_route_ids(&self) -> Result<Vec<String>, CamelError> {
271 match self.runtime.ask(camel_api::RuntimeQuery::ListRoutes).await {
272 Ok(camel_api::RuntimeQueryResult::Routes { route_ids }) => Ok(route_ids),
273 Ok(_) => Err(CamelError::RouteError(
274 "unexpected runtime query response for route listing".to_string(),
275 )),
276 Err(err) => Err(err),
277 }
278 }
279
280 pub(crate) async fn route_source_hash(&self, route_id: &str) -> Option<u64> {
281 self.controller.route_source_hash(route_id).await
282 }
283
284 pub(crate) async fn in_flight_count(&self, route_id: &str) -> Result<u64, CamelError> {
285 if !self.controller.route_exists(route_id).await? {
286 return Err(CamelError::RouteError(format!(
287 "Route '{}' not found",
288 route_id
289 )));
290 }
291 Ok(self
292 .controller
293 .in_flight_count(route_id)
294 .await?
295 .unwrap_or(0))
296 }
297
298 pub(crate) async fn route_has_lifecycle(&self, route_id: &str) -> bool {
300 self.controller
301 .route_has_lifecycle(route_id)
302 .await
303 .unwrap_or(false)
304 }
305
306 pub(crate) fn function_invoker(&self) -> Option<Arc<dyn FunctionInvoker>> {
307 self.function_invoker.clone()
308 }
309
310 #[cfg(test)]
311 pub(crate) async fn force_start_route_for_test(
312 &self,
313 route_id: &str,
314 ) -> Result<(), CamelError> {
315 self.controller.start_route(route_id).await
316 }
317
318 pub async fn controller_route_count_for_test(&self) -> usize {
319 self.controller.route_count().await.unwrap_or(0)
320 }
321}
322
323impl CamelContext {
324 pub fn builder() -> CamelContextBuilder {
325 CamelContextBuilder::new()
326 }
327
328 pub async fn set_error_handler(&mut self, config: ErrorHandlerConfig) {
330 let _ = self.route_controller.set_error_handler(config).await;
331 }
332
333 pub async fn set_tracing(&mut self, enabled: bool) {
335 let _ = self
336 .route_controller
337 .set_tracer_config(TracerConfig {
338 enabled,
339 ..Default::default()
340 })
341 .await;
342 }
343
344 pub async fn set_tracer_config(&mut self, config: TracerConfig) {
346 let config = if config.metrics_collector.is_none() {
348 TracerConfig {
349 metrics_collector: Some(Arc::clone(&self.metrics)),
350 ..config
351 }
352 } else {
353 config
354 };
355
356 let _ = self.route_controller.set_tracer_config(config).await;
357 }
358
359 pub async fn with_tracing(mut self) -> Self {
361 self.set_tracing(true).await;
362 self
363 }
364
365 pub async fn with_tracer_config(mut self, config: TracerConfig) -> Self {
369 self.set_tracer_config(config).await;
370 self
371 }
372
373 pub fn with_lifecycle<L: Lifecycle + 'static>(mut self, service: L) -> Self {
382 if let Some(collector) = service.as_metrics_collector() {
383 self.metrics = collector;
384 }
385 if let Some(invoker) = service.as_function_invoker() {
386 self.function_invoker = Some(invoker.clone());
387 if let Err(e) = self.route_controller.try_set_function_invoker(invoker) {
388 tracing::warn!("Failed to propagate function invoker to route controller: {e}");
389 }
390 }
391
392 self.services.push(Box::new(service));
393 self
394 }
395
396 pub fn register_component<C: Component + 'static>(&mut self, component: C) {
398 info!(scheme = component.scheme(), "Registering component");
399 self.registry
400 .lock()
401 .expect("mutex poisoned: another thread panicked while holding this lock") .register(Arc::new(component));
403 }
404
405 pub fn register_language(
412 &mut self,
413 name: impl Into<String>,
414 lang: Box<dyn Language>,
415 ) -> Result<(), LanguageRegistryError> {
416 let name = name.into();
417 let mut languages = self
418 .languages
419 .lock()
420 .expect("mutex poisoned: another thread panicked while holding this lock"); if languages.contains_key(&name) {
422 return Err(LanguageRegistryError::AlreadyRegistered { name });
423 }
424 languages.insert(name, Arc::from(lang));
425 Ok(())
426 }
427
428 pub fn resolve_language(&self, name: &str) -> Option<Arc<dyn Language>> {
430 let languages = self
431 .languages
432 .lock()
433 .expect("mutex poisoned: another thread panicked while holding this lock"); languages.get(name).cloned()
435 }
436
437 pub async fn add_route_definition(
441 &self,
442 definition: RouteDefinition,
443 ) -> Result<(), CamelError> {
444 use crate::lifecycle::ports::RouteRegistrationPort;
445 info!(
446 from = definition.from_uri(),
447 route_id = %definition.route_id(),
448 "Adding route definition"
449 );
450 self.runtime
451 .register_route(definition)
452 .await
453 .map_err(Into::into)
454 }
455
456 fn next_context_command_id(op: &str, route_id: &str) -> String {
457 let seq = CONTEXT_COMMAND_SEQ.fetch_add(1, Ordering::Relaxed);
458 format!("context:{op}:{route_id}:{seq}")
459 }
460
461 pub fn registry(&self) -> std::sync::MutexGuard<'_, Registry> {
463 self.registry
464 .lock()
465 .expect("mutex poisoned: another thread panicked while holding this lock") }
467
468 pub fn registry_arc(&self) -> Arc<std::sync::Mutex<Registry>> {
470 Arc::clone(&self.registry)
471 }
472
473 pub fn runtime_execution_handle(&self) -> RuntimeExecutionHandle {
475 RuntimeExecutionHandle {
476 controller: self.route_controller.clone(),
477 runtime: Arc::clone(&self.runtime),
478 function_invoker: self.function_invoker.clone(),
479 #[cfg(test)]
480 test_lifecycle_inject: Arc::new(std::sync::Mutex::new(None)),
481 }
482 }
483
484 pub fn metrics(&self) -> Arc<dyn MetricsCollector> {
486 Arc::clone(&self.metrics)
487 }
488
489 pub fn platform_service(&self) -> Arc<dyn PlatformService> {
491 Arc::clone(&self.platform_service)
492 }
493
494 pub fn readiness_gate(&self) -> Arc<dyn ReadinessGate> {
496 self.platform_service.readiness_gate()
497 }
498
499 pub fn platform_identity(&self) -> PlatformIdentity {
501 self.platform_service.identity()
502 }
503
504 pub fn leadership(&self) -> Arc<dyn camel_api::LeadershipService> {
506 self.platform_service.leadership()
507 }
508
509 pub fn runtime(&self) -> Arc<dyn camel_api::RuntimeHandle> {
511 self.runtime.clone()
512 }
513
514 pub fn producer_context(&self) -> camel_api::ProducerContext {
516 camel_api::ProducerContext::new().with_runtime(self.runtime())
517 }
518
519 pub async fn runtime_route_status(&self, route_id: &str) -> Result<Option<String>, CamelError> {
521 match self
522 .runtime()
523 .ask(camel_api::RuntimeQuery::GetRouteStatus {
524 route_id: route_id.to_string(),
525 })
526 .await
527 {
528 Ok(camel_api::RuntimeQueryResult::RouteStatus { status, .. }) => Ok(Some(status)),
529 Ok(_) => Err(CamelError::RouteError(
530 "unexpected runtime query response for route status".to_string(),
531 )),
532 Err(CamelError::RouteError(msg)) if msg.contains("not found") => Ok(None),
533 Err(err) => Err(err),
534 }
535 }
536
537 pub async fn start(&mut self) -> Result<(), CamelError> {
542 info!("Starting CamelContext");
543
544 for (i, service) in self.services.iter_mut().enumerate() {
546 info!("Starting service: {}", service.name());
547 if let Err(e) = service.start().await {
548 warn!(
550 "Service {} failed to start, rolling back {} services",
551 service.name(),
552 i
553 );
554 for j in (0..i).rev() {
555 if let Err(rollback_err) = self.services[j].stop().await {
556 warn!(
557 "Failed to stop service {} during rollback: {}",
558 self.services[j].name(),
559 rollback_err
560 );
561 }
562 }
563 return Err(e);
564 }
565 }
566
567 let route_ids = self.route_controller.auto_startup_route_ids().await?;
570 for route_id in route_ids {
571 self.runtime
572 .execute(camel_api::RuntimeCommand::StartRoute {
573 route_id: route_id.clone(),
574 command_id: Self::next_context_command_id("start", &route_id),
575 causation_id: None,
576 })
577 .await?;
578 }
579
580 info!("CamelContext started");
581 Ok(())
582 }
583
584 pub async fn stop(&mut self) -> Result<(), CamelError> {
586 self.stop_timeout(self.shutdown_timeout).await
587 }
588
589 pub async fn stop_timeout(&mut self, _timeout: std::time::Duration) -> Result<(), CamelError> {
596 info!("Stopping CamelContext");
597
598 self.cancel_token.cancel();
600 if let Some(join) = self.supervision_join.take() {
601 join.abort();
602 }
603
604 let route_ids = self.route_controller.shutdown_route_ids().await?;
607 for route_id in route_ids {
608 if let Err(err) = self
609 .runtime
610 .execute(camel_api::RuntimeCommand::StopRoute {
611 route_id: route_id.clone(),
612 command_id: Self::next_context_command_id("stop", &route_id),
613 causation_id: None,
614 })
615 .await
616 {
617 warn!(route_id = %route_id, error = %err, "Runtime stop command failed during context shutdown");
618 }
619 }
620
621 self.health_registry.cancel_token().cancel();
622
623 let mut first_error = None;
626 for service in self.services.iter_mut().rev() {
627 info!("Stopping service: {}", service.name());
628 if let Err(e) = service.stop().await {
629 warn!("Service {} failed to stop: {}", service.name(), e);
630 if first_error.is_none() {
631 first_error = Some(e);
632 }
633 }
634 }
635
636 info!("CamelContext stopped");
637
638 if let Some(e) = first_error {
639 Err(e)
640 } else {
641 Ok(())
642 }
643 }
644
645 pub fn shutdown_timeout(&self) -> std::time::Duration {
647 self.shutdown_timeout
648 }
649
650 pub fn set_shutdown_timeout(&mut self, timeout: std::time::Duration) {
652 self.shutdown_timeout = timeout;
653 }
654
655 pub async fn abort(&mut self) {
657 self.cancel_token.cancel();
658 if let Some(join) = self.supervision_join.take() {
659 join.abort();
660 }
661 let route_ids = self
662 .route_controller
663 .shutdown_route_ids()
664 .await
665 .unwrap_or_default();
666 for route_id in route_ids {
667 let _ = self
668 .runtime
669 .execute(camel_api::RuntimeCommand::StopRoute {
670 route_id: route_id.clone(),
671 command_id: Self::next_context_command_id("abort-stop", &route_id),
672 causation_id: None,
673 })
674 .await;
675 }
676
677 for service in self.services.iter_mut().rev() {
678 let name = service.name().to_string();
679 match timeout(std::time::Duration::from_secs(5), service.stop()).await {
680 Ok(Ok(())) => info!("Aborted service: {}", name),
681 Ok(Err(e)) => warn!("Service {} failed to stop during abort: {}", name, e),
682 Err(_) => warn!("Service {} timed out during abort (5s)", name),
683 }
684 }
685 }
686
687 pub async fn health_check(&self) -> HealthReport {
689 use camel_api::HealthSource;
690 self.health_report().await
691 }
692
693 pub fn health_registry(&self) -> Arc<HealthCheckRegistry> {
694 Arc::clone(&self.health_registry)
695 }
696
697 pub fn set_component_config<T: 'static + Send + Sync>(&mut self, config: T) {
699 self.component_configs
700 .insert(TypeId::of::<T>(), Box::new(config));
701 }
702
703 pub fn get_component_config<T: 'static + Send + Sync>(&self) -> Option<&T> {
705 self.component_configs
706 .get(&TypeId::of::<T>())
707 .and_then(|b| b.downcast_ref::<T>())
708 }
709
710 pub fn add_route_template(&self, spec: RouteTemplateSpec) -> Result<(), CamelError> {
716 self.template_registry.register(spec)
717 }
718
719 pub fn get_route_template(&self, id: &str) -> Option<RouteTemplateSpec> {
721 self.template_registry.get(id)
722 }
723
724 pub fn template_ids(&self) -> Vec<String> {
726 self.template_registry.template_ids()
727 }
728
729 pub fn record_template_instance(&self, record: TemplateInstanceRecord) {
731 self.template_registry.record_instance(record)
732 }
733
734 pub fn template_instances(&self, template_id: &str) -> Vec<TemplateInstanceRecord> {
736 self.template_registry.instances(template_id)
737 }
738
739 pub fn register_idempotent_repository(
746 &mut self,
747 name: impl Into<String>,
748 repo: Arc<dyn camel_api::IdempotentRepository>,
749 ) -> Result<(), RegistryError> {
750 self.idempotent_repositories.register(name, repo)
751 }
752
753 pub fn idempotent_repository(
755 &self,
756 name: &str,
757 ) -> Option<Arc<dyn camel_api::IdempotentRepository>> {
758 self.idempotent_repositories.get(name)
759 }
760
761 pub fn register_claim_check_repository(
768 &mut self,
769 name: impl Into<String>,
770 repo: Arc<dyn camel_api::ClaimCheckRepository>,
771 ) -> Result<(), RegistryError> {
772 self.claim_check_repositories.register(name, repo)
773 }
774
775 pub fn claim_check_repository(
777 &self,
778 name: &str,
779 ) -> Option<Arc<dyn camel_api::ClaimCheckRepository>> {
780 self.claim_check_repositories.get(name)
781 }
782}
783
784impl ComponentRegistrar for CamelContext {
785 fn register_component_dyn(&mut self, component: Arc<dyn Component>) {
786 self.registry
787 .lock()
788 .expect("mutex poisoned: another thread panicked while holding this lock") .register(component);
790 }
791}
792
793impl ComponentContext for CamelContext {
794 fn resolve_component(&self, scheme: &str) -> Option<Arc<dyn Component>> {
795 self.registry.lock().ok()?.get(scheme)
796 }
797
798 fn resolve_language(&self, name: &str) -> Option<Arc<dyn Language>> {
799 self.languages.lock().ok()?.get(name).cloned()
800 }
801
802 fn metrics(&self) -> Arc<dyn MetricsCollector> {
803 Arc::clone(&self.metrics)
804 }
805
806 fn health(&self) -> Arc<dyn camel_component_api::HealthCheckRegistry> {
807 Arc::clone(&self.health_registry) as Arc<dyn camel_component_api::HealthCheckRegistry>
810 }
811
812 fn platform_service(&self) -> Arc<dyn PlatformService> {
813 Arc::clone(&self.platform_service)
814 }
815
816 fn register_route_health_check(
817 &self,
818 route_id: &str,
819 check: Arc<dyn camel_api::AsyncHealthCheck>,
820 ) {
821 self.health_registry.register_for_route(route_id, check);
822 }
823
824 fn unregister_route_health_check(&self, route_id: &str) {
825 self.health_registry.unregister_for_route(route_id);
826 }
827}
828
829#[async_trait::async_trait]
830impl camel_api::HealthSource for CamelContext {
831 async fn liveness(&self) -> camel_api::HealthStatus {
832 let has_failed = self
833 .services
834 .iter()
835 .any(|s| s.status() == camel_api::ServiceStatus::Failed);
836 if has_failed {
837 camel_api::HealthStatus::Unhealthy
838 } else {
839 camel_api::HealthStatus::Healthy
840 }
841 }
842
843 async fn readiness(&self) -> camel_api::HealthStatus {
844 let has_failed = self
845 .services
846 .iter()
847 .any(|s| s.status() == camel_api::ServiceStatus::Failed);
848 if has_failed {
849 return camel_api::HealthStatus::Unhealthy;
850 }
851 let has_stopped = self
852 .services
853 .iter()
854 .any(|s| s.status() == camel_api::ServiceStatus::Stopped);
855 if has_stopped {
856 return camel_api::HealthStatus::Degraded;
857 }
858 self.health_registry.check_all().await.status
859 }
860
861 async fn health_report(&self) -> camel_api::HealthReport {
862 let mut report = self.health_registry.check_all().await;
863 let mut worst = report.status;
864 for service in &self.services {
865 let svc_status = service.status();
866 let health = match svc_status {
867 camel_api::ServiceStatus::Started => camel_api::HealthStatus::Healthy,
868 camel_api::ServiceStatus::Stopped => camel_api::HealthStatus::Degraded,
869 camel_api::ServiceStatus::Failed => camel_api::HealthStatus::Unhealthy,
870 };
871 if matches!(worst, camel_api::HealthStatus::Healthy)
872 && matches!(
873 health,
874 camel_api::HealthStatus::Degraded | camel_api::HealthStatus::Unhealthy
875 )
876 {
877 worst = health;
878 }
879 if matches!(worst, camel_api::HealthStatus::Degraded)
880 && matches!(health, camel_api::HealthStatus::Unhealthy)
881 {
882 worst = health;
883 }
884 report.services.push(camel_api::ServiceHealth {
885 name: service.name().to_string(),
886 status: svc_status,
887 message: None,
888 });
889 }
890 report.status = worst;
891 report
892 }
893
894 async fn startup(&self) -> camel_api::HealthStatus {
895 camel_api::HealthStatus::Healthy
896 }
897}
898
899#[cfg(test)]
900#[path = "context_tests.rs"]
901mod context_tests;