1use std::any::{Any, TypeId};
2use std::collections::HashMap;
3use std::path::PathBuf;
4use std::sync::Arc;
5use std::sync::atomic::{AtomicU64, Ordering};
6use tokio::sync::Mutex;
7use tokio_util::sync::CancellationToken;
8use tracing::{info, warn};
9
10use camel_api::error_handler::ErrorHandlerConfig;
11use camel_api::{
12 CamelError, HealthReport, HealthStatus, Lifecycle, MetricsCollector, NoOpMetrics,
13 RouteController, RuntimeCommandBus, RuntimeQueryBus, ServiceHealth, ServiceStatus,
14 SupervisionConfig,
15};
16use camel_component::Component;
17use camel_language_api::Language;
18use camel_language_api::LanguageError;
19
20use crate::lifecycle::adapters::RuntimeExecutionAdapter;
21use crate::lifecycle::adapters::redb_journal::{RedbJournalOptions, RedbRuntimeEventJournal};
22use crate::lifecycle::adapters::route_controller::{
23 DefaultRouteController, RouteControllerInternal, SharedLanguageRegistry,
24};
25use crate::lifecycle::application::route_definition::RouteDefinition;
26use crate::lifecycle::application::runtime_bus::RuntimeBus;
27use crate::lifecycle::application::supervision_service::SupervisingRouteController;
28use crate::shared::components::domain::Registry;
29use crate::shared::observability::domain::TracerConfig;
30
31static CONTEXT_COMMAND_SEQ: AtomicU64 = AtomicU64::new(0);
32
33pub struct CamelContext {
41 registry: Arc<std::sync::Mutex<Registry>>,
42 route_controller: Arc<Mutex<dyn RouteControllerInternal>>,
43 runtime: Arc<RuntimeBus>,
44 cancel_token: CancellationToken,
45 metrics: Arc<dyn MetricsCollector>,
46 languages: SharedLanguageRegistry,
47 shutdown_timeout: std::time::Duration,
48 services: Vec<Box<dyn Lifecycle>>,
49 component_configs: HashMap<TypeId, Box<dyn Any + Send + Sync>>,
50}
51
52#[derive(Clone)]
56pub struct RuntimeExecutionHandle {
57 controller: Arc<Mutex<dyn RouteControllerInternal>>,
58 runtime: Arc<RuntimeBus>,
59}
60
61impl RuntimeExecutionHandle {
62 pub(crate) async fn add_route_definition(
63 &self,
64 definition: RouteDefinition,
65 ) -> Result<(), CamelError> {
66 use crate::lifecycle::ports::RouteRegistrationPort;
67 self.runtime.register_route(definition).await
68 }
69
70 pub(crate) async fn compile_route_definition(
71 &self,
72 definition: RouteDefinition,
73 ) -> Result<camel_api::BoxProcessor, CamelError> {
74 let controller = self.controller.lock().await;
75 controller.compile_route_definition(definition)
76 }
77
78 pub(crate) async fn swap_route_pipeline(
79 &self,
80 route_id: &str,
81 pipeline: camel_api::BoxProcessor,
82 ) -> Result<(), CamelError> {
83 let controller = self.controller.lock().await;
84 controller.swap_pipeline(route_id, pipeline)
85 }
86
87 pub(crate) async fn execute_runtime_command(
88 &self,
89 cmd: camel_api::RuntimeCommand,
90 ) -> Result<camel_api::RuntimeCommandResult, CamelError> {
91 self.runtime.execute(cmd).await
92 }
93
94 pub(crate) async fn runtime_route_status(
95 &self,
96 route_id: &str,
97 ) -> Result<Option<String>, CamelError> {
98 match self
99 .runtime
100 .ask(camel_api::RuntimeQuery::GetRouteStatus {
101 route_id: route_id.to_string(),
102 })
103 .await
104 {
105 Ok(camel_api::RuntimeQueryResult::RouteStatus { status, .. }) => Ok(Some(status)),
106 Ok(_) => Err(CamelError::RouteError(
107 "unexpected runtime query response for route status".to_string(),
108 )),
109 Err(CamelError::RouteError(msg)) if msg.contains("not found") => Ok(None),
110 Err(err) => Err(err),
111 }
112 }
113
114 pub(crate) async fn runtime_route_ids(&self) -> Result<Vec<String>, CamelError> {
115 match self.runtime.ask(camel_api::RuntimeQuery::ListRoutes).await {
116 Ok(camel_api::RuntimeQueryResult::Routes { route_ids }) => Ok(route_ids),
117 Ok(_) => Err(CamelError::RouteError(
118 "unexpected runtime query response for route listing".to_string(),
119 )),
120 Err(err) => Err(err),
121 }
122 }
123
124 pub(crate) async fn in_flight_count(&self, route_id: &str) -> Result<u64, CamelError> {
125 let controller = self.controller.lock().await;
126 if !controller.route_exists(route_id) {
127 return Err(CamelError::RouteError(format!(
128 "Route '{}' not found",
129 route_id
130 )));
131 }
132 Ok(controller.in_flight_count(route_id).unwrap_or(0))
136 }
137
138 #[cfg(test)]
139 pub(crate) async fn force_start_route_for_test(
140 &self,
141 route_id: &str,
142 ) -> Result<(), CamelError> {
143 let mut controller = self.controller.lock().await;
144 controller.start_route(route_id).await
145 }
146
147 #[cfg(test)]
148 pub(crate) async fn controller_route_count_for_test(&self) -> usize {
149 let controller = self.controller.lock().await;
150 controller.route_count()
151 }
152}
153
154impl CamelContext {
155 fn built_in_languages() -> SharedLanguageRegistry {
156 let mut languages: HashMap<String, Arc<dyn Language>> = HashMap::new();
157 languages.insert(
158 "simple".to_string(),
159 Arc::new(camel_language_simple::SimpleLanguage),
160 );
161 #[cfg(feature = "lang-js")]
162 {
163 let js_lang = camel_language_js::JsLanguage::new();
164 languages.insert("js".to_string(), Arc::new(js_lang.clone()));
165 languages.insert("javascript".to_string(), Arc::new(js_lang));
166 }
167 #[cfg(feature = "lang-rhai")]
168 {
169 let rhai_lang = camel_language_rhai::RhaiLanguage::new();
170 languages.insert("rhai".to_string(), Arc::new(rhai_lang));
171 }
172 #[cfg(feature = "lang-jsonpath")]
173 {
174 languages.insert(
175 "jsonpath".to_string(),
176 Arc::new(camel_language_jsonpath::JsonPathLanguage),
177 );
178 }
179 #[cfg(feature = "lang-xpath")]
180 {
181 languages.insert(
182 "xpath".to_string(),
183 Arc::new(camel_language_xpath::XPathLanguage),
184 );
185 }
186 Arc::new(std::sync::Mutex::new(languages))
187 }
188
189 fn build_runtime(
190 controller: Arc<Mutex<dyn RouteControllerInternal>>,
191 store: crate::lifecycle::adapters::InMemoryRuntimeStore,
192 ) -> Arc<RuntimeBus> {
193 let execution = Arc::new(RuntimeExecutionAdapter::new(Arc::clone(&controller)));
194 Arc::new(
195 RuntimeBus::new(
196 Arc::new(store.clone()),
197 Arc::new(store.clone()),
198 Arc::new(store.clone()),
199 Arc::new(store.clone()),
200 )
201 .with_uow(Arc::new(store))
202 .with_execution(execution),
203 )
204 }
205
206 pub fn new() -> Self {
210 Self::with_metrics(Arc::new(NoOpMetrics))
211 }
212
213 pub fn with_metrics(metrics: Arc<dyn MetricsCollector>) -> Self {
215 let registry = Arc::new(std::sync::Mutex::new(Registry::new()));
216 let languages = Self::built_in_languages();
217 let controller: Arc<Mutex<dyn RouteControllerInternal>> = Arc::new(Mutex::new(
218 DefaultRouteController::with_languages(Arc::clone(®istry), Arc::clone(&languages)),
219 ));
220 let store = crate::lifecycle::adapters::InMemoryRuntimeStore::default();
221 let runtime = Self::build_runtime(Arc::clone(&controller), store);
222
223 let mut controller_guard = controller
226 .try_lock()
227 .expect("BUG: CamelContext lock contention — try_lock should always succeed here since &mut self prevents concurrent access");
228 controller_guard.set_self_ref(Arc::clone(&controller) as Arc<Mutex<dyn RouteController>>);
229 let runtime_handle: Arc<dyn camel_api::RuntimeHandle> = runtime.clone();
230 controller_guard.set_runtime_handle(runtime_handle);
231 drop(controller_guard);
232
233 Self {
234 registry,
235 route_controller: controller,
236 runtime,
237 cancel_token: CancellationToken::new(),
238 metrics,
239 languages,
240 shutdown_timeout: std::time::Duration::from_secs(30),
241 services: Vec::new(),
242 component_configs: HashMap::new(),
243 }
244 }
245
246 pub fn with_supervision(config: SupervisionConfig) -> Self {
250 Self::with_supervision_and_metrics(config, Arc::new(NoOpMetrics))
251 }
252
253 pub fn with_supervision_and_metrics(
257 config: SupervisionConfig,
258 metrics: Arc<dyn MetricsCollector>,
259 ) -> Self {
260 let registry = Arc::new(std::sync::Mutex::new(Registry::new()));
261 let languages = Self::built_in_languages();
262 let controller: Arc<Mutex<dyn RouteControllerInternal>> = Arc::new(Mutex::new(
263 SupervisingRouteController::with_languages(
264 Arc::clone(®istry),
265 config,
266 Arc::clone(&languages),
267 )
268 .with_metrics(Arc::clone(&metrics)),
269 ));
270 let store = crate::lifecycle::adapters::InMemoryRuntimeStore::default();
271 let runtime = Self::build_runtime(Arc::clone(&controller), store);
272
273 let mut controller_guard = controller
276 .try_lock()
277 .expect("BUG: CamelContext lock contention — try_lock should always succeed here since &mut self prevents concurrent access");
278 controller_guard.set_self_ref(Arc::clone(&controller) as Arc<Mutex<dyn RouteController>>);
279 let runtime_handle: Arc<dyn camel_api::RuntimeHandle> = runtime.clone();
280 controller_guard.set_runtime_handle(runtime_handle);
281 drop(controller_guard);
282
283 Self {
284 registry,
285 route_controller: controller,
286 runtime,
287 cancel_token: CancellationToken::new(),
288 metrics,
289 languages,
290 shutdown_timeout: std::time::Duration::from_secs(30),
291 services: Vec::new(),
292 component_configs: HashMap::new(),
293 }
294 }
295
296 pub async fn new_with_redb_journal(
298 path: impl Into<PathBuf>,
299 options: RedbJournalOptions,
300 ) -> Result<Self, CamelError> {
301 Self::with_metrics_and_redb_journal(Arc::new(NoOpMetrics), path, options).await
302 }
303
304 pub async fn with_metrics_and_redb_journal(
306 metrics: Arc<dyn MetricsCollector>,
307 path: impl Into<PathBuf>,
308 options: RedbJournalOptions,
309 ) -> Result<Self, CamelError> {
310 let registry = Arc::new(std::sync::Mutex::new(Registry::new()));
311 let languages = Self::built_in_languages();
312 let controller: Arc<Mutex<dyn RouteControllerInternal>> = Arc::new(Mutex::new(
313 DefaultRouteController::with_languages(Arc::clone(®istry), Arc::clone(&languages)),
314 ));
315 let journal = RedbRuntimeEventJournal::new(path, options).await?;
316 let store = crate::lifecycle::adapters::InMemoryRuntimeStore::default()
317 .with_journal(Arc::new(journal));
318 let runtime = Self::build_runtime(Arc::clone(&controller), store);
319
320 let mut controller_guard = controller
323 .try_lock()
324 .expect("BUG: CamelContext lock contention — try_lock should always succeed here since &mut self prevents concurrent access");
325 controller_guard.set_self_ref(Arc::clone(&controller) as Arc<Mutex<dyn RouteController>>);
326 let runtime_handle: Arc<dyn camel_api::RuntimeHandle> = runtime.clone();
327 controller_guard.set_runtime_handle(runtime_handle);
328 drop(controller_guard);
329
330 Ok(Self {
331 registry,
332 route_controller: controller,
333 runtime,
334 cancel_token: CancellationToken::new(),
335 metrics,
336 languages,
337 shutdown_timeout: std::time::Duration::from_secs(30),
338 services: Vec::new(),
339 component_configs: HashMap::new(),
340 })
341 }
342
343 pub async fn with_supervision_and_metrics_and_redb_journal(
345 config: SupervisionConfig,
346 metrics: Arc<dyn MetricsCollector>,
347 path: impl Into<PathBuf>,
348 options: RedbJournalOptions,
349 ) -> Result<Self, CamelError> {
350 let registry = Arc::new(std::sync::Mutex::new(Registry::new()));
351 let languages = Self::built_in_languages();
352 let controller: Arc<Mutex<dyn RouteControllerInternal>> = Arc::new(Mutex::new(
353 SupervisingRouteController::with_languages(
354 Arc::clone(®istry),
355 config,
356 Arc::clone(&languages),
357 )
358 .with_metrics(Arc::clone(&metrics)),
359 ));
360 let journal = RedbRuntimeEventJournal::new(path, options).await?;
361 let store = crate::lifecycle::adapters::InMemoryRuntimeStore::default()
362 .with_journal(Arc::new(journal));
363 let runtime = Self::build_runtime(Arc::clone(&controller), store);
364
365 let mut controller_guard = controller
368 .try_lock()
369 .expect("BUG: CamelContext lock contention — try_lock should always succeed here since &mut self prevents concurrent access");
370 controller_guard.set_self_ref(Arc::clone(&controller) as Arc<Mutex<dyn RouteController>>);
371 let runtime_handle: Arc<dyn camel_api::RuntimeHandle> = runtime.clone();
372 controller_guard.set_runtime_handle(runtime_handle);
373 drop(controller_guard);
374
375 Ok(Self {
376 registry,
377 route_controller: controller,
378 runtime,
379 cancel_token: CancellationToken::new(),
380 metrics,
381 languages,
382 shutdown_timeout: std::time::Duration::from_secs(30),
383 services: Vec::new(),
384 component_configs: HashMap::new(),
385 })
386 }
387
388 pub fn set_error_handler(&mut self, config: ErrorHandlerConfig) {
390 self.route_controller
391 .try_lock()
392 .expect("BUG: CamelContext lock contention — try_lock should always succeed here since &mut self prevents concurrent access")
393 .set_error_handler(config);
394 }
395
396 pub fn set_tracing(&mut self, enabled: bool) {
398 self.route_controller
399 .try_lock()
400 .expect("BUG: CamelContext lock contention — try_lock should always succeed here since &mut self prevents concurrent access")
401 .set_tracer_config(&TracerConfig {
402 enabled,
403 ..Default::default()
404 });
405 }
406
407 pub fn set_tracer_config(&mut self, config: TracerConfig) {
409 let config = if config.metrics_collector.is_none() {
411 TracerConfig {
412 metrics_collector: Some(Arc::clone(&self.metrics)),
413 ..config
414 }
415 } else {
416 config
417 };
418
419 self.route_controller
420 .try_lock()
421 .expect("BUG: CamelContext lock contention — try_lock should always succeed here since &mut self prevents concurrent access")
422 .set_tracer_config(&config);
423 }
424
425 pub fn with_tracing(mut self) -> Self {
427 self.set_tracing(true);
428 self
429 }
430
431 pub fn with_tracer_config(mut self, config: TracerConfig) -> Self {
435 self.set_tracer_config(config);
436 self
437 }
438
439 pub fn with_lifecycle<L: Lifecycle + 'static>(mut self, service: L) -> Self {
441 if let Some(collector) = service.as_metrics_collector() {
443 self.metrics = collector;
444 }
445
446 self.services.push(Box::new(service));
447 self
448 }
449
450 pub fn register_component<C: Component + 'static>(&mut self, component: C) {
452 info!(scheme = component.scheme(), "Registering component");
453 self.registry
454 .lock()
455 .expect("mutex poisoned: another thread panicked while holding this lock")
456 .register(component);
457 }
458
459 pub fn register_language(
465 &mut self,
466 name: impl Into<String>,
467 lang: Box<dyn Language>,
468 ) -> Result<(), LanguageError> {
469 let name = name.into();
470 let mut languages = self
471 .languages
472 .lock()
473 .expect("mutex poisoned: another thread panicked while holding this lock");
474 if languages.contains_key(&name) {
475 return Err(LanguageError::AlreadyRegistered(name));
476 }
477 languages.insert(name, Arc::from(lang));
478 Ok(())
479 }
480
481 pub fn resolve_language(&self, name: &str) -> Option<Arc<dyn Language>> {
483 let languages = self
484 .languages
485 .lock()
486 .expect("mutex poisoned: another thread panicked while holding this lock");
487 languages.get(name).cloned()
488 }
489
490 pub async fn add_route_definition(
494 &self,
495 definition: RouteDefinition,
496 ) -> Result<(), CamelError> {
497 use crate::lifecycle::ports::RouteRegistrationPort;
498 info!(
499 from = definition.from_uri(),
500 route_id = %definition.route_id(),
501 "Adding route definition"
502 );
503 self.runtime.register_route(definition).await
504 }
505
506 fn next_context_command_id(op: &str, route_id: &str) -> String {
507 let seq = CONTEXT_COMMAND_SEQ.fetch_add(1, Ordering::Relaxed);
508 format!("context:{op}:{route_id}:{seq}")
509 }
510
511 pub fn registry(&self) -> std::sync::MutexGuard<'_, Registry> {
513 self.registry
514 .lock()
515 .expect("mutex poisoned: another thread panicked while holding this lock")
516 }
517
518 pub fn runtime_execution_handle(&self) -> RuntimeExecutionHandle {
520 RuntimeExecutionHandle {
521 controller: Arc::clone(&self.route_controller),
522 runtime: Arc::clone(&self.runtime),
523 }
524 }
525
526 pub fn metrics(&self) -> Arc<dyn MetricsCollector> {
528 Arc::clone(&self.metrics)
529 }
530
531 pub fn runtime(&self) -> Arc<dyn camel_api::RuntimeHandle> {
533 self.runtime.clone()
534 }
535
536 pub fn producer_context(&self) -> camel_api::ProducerContext {
538 camel_api::ProducerContext::new().with_runtime(self.runtime())
539 }
540
541 pub async fn runtime_route_status(&self, route_id: &str) -> Result<Option<String>, CamelError> {
543 match self
544 .runtime()
545 .ask(camel_api::RuntimeQuery::GetRouteStatus {
546 route_id: route_id.to_string(),
547 })
548 .await
549 {
550 Ok(camel_api::RuntimeQueryResult::RouteStatus { status, .. }) => Ok(Some(status)),
551 Ok(_) => Err(CamelError::RouteError(
552 "unexpected runtime query response for route status".to_string(),
553 )),
554 Err(CamelError::RouteError(msg)) if msg.contains("not found") => Ok(None),
555 Err(err) => Err(err),
556 }
557 }
558
559 pub async fn start(&mut self) -> Result<(), CamelError> {
564 info!("Starting CamelContext");
565
566 for (i, service) in self.services.iter_mut().enumerate() {
568 info!("Starting service: {}", service.name());
569 if let Err(e) = service.start().await {
570 warn!(
572 "Service {} failed to start, rolling back {} services",
573 service.name(),
574 i
575 );
576 for j in (0..i).rev() {
577 if let Err(rollback_err) = self.services[j].stop().await {
578 warn!(
579 "Failed to stop service {} during rollback: {}",
580 self.services[j].name(),
581 rollback_err
582 );
583 }
584 }
585 return Err(e);
586 }
587 }
588
589 let route_ids = {
592 let controller = self.route_controller.lock().await;
593 controller.auto_startup_route_ids()
594 };
595 for route_id in route_ids {
596 self.runtime
597 .execute(camel_api::RuntimeCommand::StartRoute {
598 route_id: route_id.clone(),
599 command_id: Self::next_context_command_id("start", &route_id),
600 causation_id: None,
601 })
602 .await?;
603 }
604
605 info!("CamelContext started");
606 Ok(())
607 }
608
609 pub async fn stop(&mut self) -> Result<(), CamelError> {
611 self.stop_timeout(self.shutdown_timeout).await
612 }
613
614 pub async fn stop_timeout(&mut self, _timeout: std::time::Duration) -> Result<(), CamelError> {
619 info!("Stopping CamelContext");
620
621 self.cancel_token.cancel();
623
624 let route_ids = {
627 let controller = self.route_controller.lock().await;
628 controller.shutdown_route_ids()
629 };
630 for route_id in route_ids {
631 if let Err(err) = self
632 .runtime
633 .execute(camel_api::RuntimeCommand::StopRoute {
634 route_id: route_id.clone(),
635 command_id: Self::next_context_command_id("stop", &route_id),
636 causation_id: None,
637 })
638 .await
639 {
640 warn!(route_id = %route_id, error = %err, "Runtime stop command failed during context shutdown");
641 }
642 }
643
644 let mut first_error = None;
647 for service in self.services.iter_mut().rev() {
648 info!("Stopping service: {}", service.name());
649 if let Err(e) = service.stop().await {
650 warn!("Service {} failed to stop: {}", service.name(), e);
651 if first_error.is_none() {
652 first_error = Some(e);
653 }
654 }
655 }
656
657 info!("CamelContext stopped");
658
659 if let Some(e) = first_error {
660 Err(e)
661 } else {
662 Ok(())
663 }
664 }
665
666 pub fn shutdown_timeout(&self) -> std::time::Duration {
668 self.shutdown_timeout
669 }
670
671 pub fn set_shutdown_timeout(&mut self, timeout: std::time::Duration) {
673 self.shutdown_timeout = timeout;
674 }
675
676 pub async fn abort(&mut self) {
678 self.cancel_token.cancel();
679 let route_ids = {
680 let controller = self.route_controller.lock().await;
681 controller.shutdown_route_ids()
682 };
683 for route_id in route_ids {
684 let _ = self
685 .runtime
686 .execute(camel_api::RuntimeCommand::StopRoute {
687 route_id: route_id.clone(),
688 command_id: Self::next_context_command_id("abort-stop", &route_id),
689 causation_id: None,
690 })
691 .await;
692 }
693 }
694
695 pub fn health_check(&self) -> HealthReport {
697 let services: Vec<ServiceHealth> = self
698 .services
699 .iter()
700 .map(|s| ServiceHealth {
701 name: s.name().to_string(),
702 status: s.status(),
703 })
704 .collect();
705
706 let status = if services.iter().all(|s| s.status == ServiceStatus::Started) {
707 HealthStatus::Healthy
708 } else {
709 HealthStatus::Unhealthy
710 };
711
712 HealthReport {
713 status,
714 services,
715 ..Default::default()
716 }
717 }
718
719 pub fn set_component_config<T: 'static + Send + Sync>(&mut self, config: T) {
721 self.component_configs
722 .insert(TypeId::of::<T>(), Box::new(config));
723 }
724
725 pub fn get_component_config<T: 'static + Send + Sync>(&self) -> Option<&T> {
727 self.component_configs
728 .get(&TypeId::of::<T>())
729 .and_then(|b| b.downcast_ref::<T>())
730 }
731}
732
733impl Default for CamelContext {
734 fn default() -> Self {
735 Self::new()
736 }
737}
738
739#[cfg(test)]
740mod tests {
741 use super::*;
742 use crate::lifecycle::application::route_definition::{
743 BuilderStep, LanguageExpressionDef, RouteDefinition,
744 };
745 use crate::lifecycle::domain::{RouteRuntimeAggregate, RouteRuntimeState};
746 use async_trait::async_trait;
747 use camel_api::CamelError;
748 use camel_api::{
749 CanonicalRouteSpec, RuntimeCommand, RuntimeCommandResult, RuntimeQuery, RuntimeQueryResult,
750 };
751 use camel_component::{Component, ConcurrencyModel, Consumer, ConsumerContext, Endpoint};
752
753 struct MockComponent;
755
756 impl Component for MockComponent {
757 fn scheme(&self) -> &str {
758 "mock"
759 }
760
761 fn create_endpoint(&self, _uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
762 Err(CamelError::ComponentNotFound("mock".to_string()))
763 }
764 }
765
766 struct HoldConsumer;
767
768 #[async_trait]
769 impl Consumer for HoldConsumer {
770 async fn start(&mut self, ctx: ConsumerContext) -> Result<(), CamelError> {
771 ctx.cancelled().await;
772 Ok(())
773 }
774
775 async fn stop(&mut self) -> Result<(), CamelError> {
776 Ok(())
777 }
778
779 fn concurrency_model(&self) -> ConcurrencyModel {
780 ConcurrencyModel::Sequential
781 }
782 }
783
784 struct HoldEndpoint;
785
786 impl Endpoint for HoldEndpoint {
787 fn uri(&self) -> &str {
788 "hold:test"
789 }
790
791 fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
792 Ok(Box::new(HoldConsumer))
793 }
794
795 fn create_producer(
796 &self,
797 _ctx: &camel_api::ProducerContext,
798 ) -> Result<camel_api::BoxProcessor, CamelError> {
799 Err(CamelError::RouteError("no producer".to_string()))
800 }
801 }
802
803 struct HoldComponent;
804
805 impl Component for HoldComponent {
806 fn scheme(&self) -> &str {
807 "hold"
808 }
809
810 fn create_endpoint(&self, _uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
811 Ok(Box::new(HoldEndpoint))
812 }
813 }
814
815 #[test]
816 fn test_context_handles_mutex_poisoning_gracefully() {
817 let mut ctx = CamelContext::new();
818
819 ctx.register_component(MockComponent);
821
822 let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
824 let _guard = ctx.registry();
825 }));
826
827 assert!(
828 result.is_ok(),
829 "Registry access should handle mutex poisoning"
830 );
831 }
832
833 #[test]
834 fn test_context_resolves_simple_language() {
835 let ctx = CamelContext::new();
836 let lang = ctx
837 .resolve_language("simple")
838 .expect("simple language not found");
839 assert_eq!(lang.name(), "simple");
840 }
841
842 #[test]
843 fn test_simple_language_via_context() {
844 let ctx = CamelContext::new();
845 let lang = ctx.resolve_language("simple").unwrap();
846 let pred = lang.create_predicate("${header.x} == 'hello'").unwrap();
847 let mut msg = camel_api::message::Message::default();
848 msg.set_header("x", camel_api::Value::String("hello".into()));
849 let ex = camel_api::exchange::Exchange::new(msg);
850 assert!(pred.matches(&ex).unwrap());
851 }
852
853 #[test]
854 fn test_resolve_unknown_language_returns_none() {
855 let ctx = CamelContext::new();
856 assert!(ctx.resolve_language("nonexistent").is_none());
857 }
858
859 #[test]
860 fn test_register_language_duplicate_returns_error() {
861 use camel_language_api::LanguageError;
862 struct DummyLang;
863 impl camel_language_api::Language for DummyLang {
864 fn name(&self) -> &'static str {
865 "dummy"
866 }
867 fn create_expression(
868 &self,
869 _: &str,
870 ) -> Result<Box<dyn camel_language_api::Expression>, LanguageError> {
871 Err(LanguageError::EvalError("not implemented".into()))
872 }
873 fn create_predicate(
874 &self,
875 _: &str,
876 ) -> Result<Box<dyn camel_language_api::Predicate>, LanguageError> {
877 Err(LanguageError::EvalError("not implemented".into()))
878 }
879 }
880
881 let mut ctx = CamelContext::new();
882 ctx.register_language("dummy", Box::new(DummyLang)).unwrap();
883 let result = ctx.register_language("dummy", Box::new(DummyLang));
884 assert!(result.is_err(), "duplicate registration should fail");
885 let err_msg = result.unwrap_err().to_string();
886 assert!(
887 err_msg.contains("dummy"),
888 "error should mention the language name"
889 );
890 }
891
892 #[test]
893 fn test_register_language_new_key_succeeds() {
894 use camel_language_api::LanguageError;
895 struct DummyLang;
896 impl camel_language_api::Language for DummyLang {
897 fn name(&self) -> &'static str {
898 "dummy"
899 }
900 fn create_expression(
901 &self,
902 _: &str,
903 ) -> Result<Box<dyn camel_language_api::Expression>, LanguageError> {
904 Err(LanguageError::EvalError("not implemented".into()))
905 }
906 fn create_predicate(
907 &self,
908 _: &str,
909 ) -> Result<Box<dyn camel_language_api::Predicate>, LanguageError> {
910 Err(LanguageError::EvalError("not implemented".into()))
911 }
912 }
913
914 let mut ctx = CamelContext::new();
915 let result = ctx.register_language("dummy", Box::new(DummyLang));
916 assert!(result.is_ok(), "first registration should succeed");
917 }
918
919 #[tokio::test]
920 async fn test_add_route_definition_uses_runtime_registered_language() {
921 use camel_language_api::{Expression, LanguageError, Predicate};
922
923 struct DummyExpression;
924 impl Expression for DummyExpression {
925 fn evaluate(
926 &self,
927 _exchange: &camel_api::Exchange,
928 ) -> Result<camel_api::Value, LanguageError> {
929 Ok(camel_api::Value::String("ok".into()))
930 }
931 }
932
933 struct DummyPredicate;
934 impl Predicate for DummyPredicate {
935 fn matches(&self, _exchange: &camel_api::Exchange) -> Result<bool, LanguageError> {
936 Ok(true)
937 }
938 }
939
940 struct RuntimeLang;
941 impl camel_language_api::Language for RuntimeLang {
942 fn name(&self) -> &'static str {
943 "runtime"
944 }
945
946 fn create_expression(
947 &self,
948 _script: &str,
949 ) -> Result<Box<dyn Expression>, LanguageError> {
950 Ok(Box::new(DummyExpression))
951 }
952
953 fn create_predicate(&self, _script: &str) -> Result<Box<dyn Predicate>, LanguageError> {
954 Ok(Box::new(DummyPredicate))
955 }
956 }
957
958 let mut ctx = CamelContext::new();
959 ctx.register_language("runtime", Box::new(RuntimeLang))
960 .unwrap();
961
962 let definition = RouteDefinition::new(
963 "timer:tick",
964 vec![BuilderStep::DeclarativeScript {
965 expression: LanguageExpressionDef {
966 language: "runtime".into(),
967 source: "${body}".into(),
968 },
969 }],
970 )
971 .with_route_id("runtime-lang-route");
972
973 let result = ctx.add_route_definition(definition).await;
974 assert!(
975 result.is_ok(),
976 "route should resolve runtime language: {result:?}"
977 );
978 }
979
980 #[tokio::test]
981 async fn test_add_route_definition_fails_for_unregistered_runtime_language() {
982 let ctx = CamelContext::new();
983 let definition = RouteDefinition::new(
984 "timer:tick",
985 vec![BuilderStep::DeclarativeSetBody {
986 value: crate::route::ValueSourceDef::Expression(LanguageExpressionDef {
987 language: "missing-lang".into(),
988 source: "${body}".into(),
989 }),
990 }],
991 )
992 .with_route_id("missing-runtime-lang-route");
993
994 let result = ctx.add_route_definition(definition).await;
995 assert!(
996 result.is_err(),
997 "route should fail when language is missing"
998 );
999 let error_text = result.unwrap_err().to_string();
1000 assert!(
1001 error_text.contains("missing-lang"),
1002 "error should mention missing language, got: {error_text}"
1003 );
1004 }
1005
1006 #[tokio::test]
1007 async fn add_route_definition_does_not_require_mut() {
1008 let ctx = CamelContext::new();
1009 let definition = RouteDefinition::new("timer:tick", vec![]).with_route_id("immutable-ctx");
1010
1011 let result = ctx.add_route_definition(definition).await;
1012 assert!(
1013 result.is_ok(),
1014 "immutable context should add route: {result:?}"
1015 );
1016 }
1017
1018 #[test]
1019 fn test_health_check_empty_context() {
1020 let ctx = CamelContext::new();
1021 let report = ctx.health_check();
1022
1023 assert_eq!(report.status, HealthStatus::Healthy);
1024 assert!(report.services.is_empty());
1025 }
1026
1027 #[tokio::test]
1028 async fn context_exposes_runtime_command_and_query_buses() {
1029 let ctx = CamelContext::new();
1030 let runtime = ctx.runtime();
1031
1032 let register = runtime
1033 .execute(RuntimeCommand::RegisterRoute {
1034 spec: CanonicalRouteSpec::new("runtime-r1", "timer:tick"),
1035 command_id: "cmd-1".into(),
1036 causation_id: None,
1037 })
1038 .await
1039 .unwrap();
1040 assert!(matches!(
1041 register,
1042 RuntimeCommandResult::RouteRegistered { ref route_id } if route_id == "runtime-r1"
1043 ));
1044
1045 let query = runtime
1046 .ask(RuntimeQuery::GetRouteStatus {
1047 route_id: "runtime-r1".into(),
1048 })
1049 .await
1050 .unwrap();
1051 assert!(matches!(
1052 query,
1053 RuntimeQueryResult::RouteStatus { ref status, .. } if status == "Registered"
1054 ));
1055 }
1056
1057 #[tokio::test]
1058 async fn default_runtime_journal_isolated_per_context_without_env_override() {
1059 if let Ok(value) = std::env::var("CAMEL_RUNTIME_JOURNAL_PATH")
1060 && !value.trim().is_empty()
1061 {
1062 return;
1063 }
1064
1065 let first = CamelContext::new();
1066 first
1067 .runtime()
1068 .execute(RuntimeCommand::RegisterRoute {
1069 spec: CanonicalRouteSpec::new("default-isolation-r1", "timer:tick"),
1070 command_id: "iso-c1".into(),
1071 causation_id: None,
1072 })
1073 .await
1074 .unwrap();
1075
1076 let second = CamelContext::new();
1077 second
1078 .runtime()
1079 .execute(RuntimeCommand::RegisterRoute {
1080 spec: CanonicalRouteSpec::new("default-isolation-r1", "timer:tick"),
1081 command_id: "iso-c2".into(),
1082 causation_id: None,
1083 })
1084 .await
1085 .unwrap();
1086 }
1087
1088 #[tokio::test]
1089 async fn runtime_commands_drive_real_route_controller_lifecycle() {
1090 let mut ctx = CamelContext::new();
1091 ctx.register_component(HoldComponent);
1092 let runtime = ctx.runtime();
1093
1094 runtime
1095 .execute(RuntimeCommand::RegisterRoute {
1096 spec: CanonicalRouteSpec::new("runtime-hold", "hold:test"),
1097 command_id: "c1".into(),
1098 causation_id: None,
1099 })
1100 .await
1101 .unwrap();
1102
1103 assert_eq!(
1104 ctx.runtime_route_status("runtime-hold").await.unwrap(),
1105 Some("Registered".to_string())
1106 );
1107 assert!(matches!(
1108 ctx.runtime.repo().load("runtime-hold").await.unwrap(),
1109 Some(agg)
1110 if matches!(agg.state(), crate::lifecycle::domain::RouteRuntimeState::Registered)
1111 ));
1112
1113 runtime
1114 .execute(RuntimeCommand::StartRoute {
1115 route_id: "runtime-hold".into(),
1116 command_id: "c2".into(),
1117 causation_id: Some("c1".into()),
1118 })
1119 .await
1120 .unwrap();
1121 assert_eq!(
1122 ctx.runtime_route_status("runtime-hold").await.unwrap(),
1123 Some("Started".to_string())
1124 );
1125 assert!(matches!(
1126 ctx.runtime.repo().load("runtime-hold").await.unwrap(),
1127 Some(agg)
1128 if matches!(agg.state(), crate::lifecycle::domain::RouteRuntimeState::Started)
1129 ));
1130
1131 runtime
1132 .execute(RuntimeCommand::SuspendRoute {
1133 route_id: "runtime-hold".into(),
1134 command_id: "c3".into(),
1135 causation_id: Some("c2".into()),
1136 })
1137 .await
1138 .unwrap();
1139 assert_eq!(
1140 ctx.runtime_route_status("runtime-hold").await.unwrap(),
1141 Some("Suspended".to_string())
1142 );
1143
1144 runtime
1145 .execute(RuntimeCommand::ResumeRoute {
1146 route_id: "runtime-hold".into(),
1147 command_id: "c4".into(),
1148 causation_id: Some("c3".into()),
1149 })
1150 .await
1151 .unwrap();
1152 assert_eq!(
1153 ctx.runtime_route_status("runtime-hold").await.unwrap(),
1154 Some("Started".to_string())
1155 );
1156
1157 runtime
1158 .execute(RuntimeCommand::StopRoute {
1159 route_id: "runtime-hold".into(),
1160 command_id: "c5".into(),
1161 causation_id: Some("c4".into()),
1162 })
1163 .await
1164 .unwrap();
1165 assert_eq!(
1166 ctx.runtime_route_status("runtime-hold").await.unwrap(),
1167 Some("Stopped".to_string())
1168 );
1169
1170 runtime
1171 .execute(RuntimeCommand::ReloadRoute {
1172 route_id: "runtime-hold".into(),
1173 command_id: "c6".into(),
1174 causation_id: Some("c5".into()),
1175 })
1176 .await
1177 .unwrap();
1178 assert_eq!(
1179 ctx.runtime_route_status("runtime-hold").await.unwrap(),
1180 Some("Started".to_string())
1181 );
1182
1183 runtime
1184 .execute(RuntimeCommand::StopRoute {
1185 route_id: "runtime-hold".into(),
1186 command_id: "c7".into(),
1187 causation_id: Some("c6".into()),
1188 })
1189 .await
1190 .unwrap();
1191 assert_eq!(
1192 ctx.runtime_route_status("runtime-hold").await.unwrap(),
1193 Some("Stopped".to_string())
1194 );
1195
1196 runtime
1197 .execute(RuntimeCommand::RemoveRoute {
1198 route_id: "runtime-hold".into(),
1199 command_id: "c8".into(),
1200 causation_id: Some("c7".into()),
1201 })
1202 .await
1203 .unwrap();
1204 assert_eq!(
1205 ctx.runtime_route_status("runtime-hold").await.unwrap(),
1206 None
1207 );
1208 assert!(
1209 ctx.runtime
1210 .repo()
1211 .load("runtime-hold")
1212 .await
1213 .unwrap()
1214 .is_none()
1215 );
1216 }
1217
1218 #[tokio::test]
1219 async fn runtime_queries_read_projection_state_when_connected() {
1220 let mut ctx = CamelContext::new();
1221 ctx.register_component(HoldComponent);
1222 let runtime = ctx.runtime();
1223 runtime
1224 .execute(RuntimeCommand::RegisterRoute {
1225 spec: CanonicalRouteSpec::new("rq", "hold:test"),
1226 command_id: "c1".into(),
1227 causation_id: None,
1228 })
1229 .await
1230 .unwrap();
1231
1232 ctx.runtime_execution_handle()
1234 .force_start_route_for_test("rq")
1235 .await
1236 .unwrap();
1237
1238 let result = runtime
1239 .ask(RuntimeQuery::GetRouteStatus {
1240 route_id: "rq".into(),
1241 })
1242 .await
1243 .unwrap();
1244
1245 match result {
1246 RuntimeQueryResult::RouteStatus { status, .. } => assert_eq!(status, "Registered"),
1247 _ => panic!("unexpected query result"),
1248 }
1249 }
1250
1251 #[tokio::test]
1252 async fn add_route_definition_produces_registered_state() {
1253 let ctx = CamelContext::new();
1254 let definition =
1255 RouteDefinition::new("direct:test", vec![]).with_route_id("async-test-route");
1256
1257 ctx.add_route_definition(definition).await.unwrap();
1258
1259 let status = ctx
1260 .runtime()
1261 .ask(RuntimeQuery::GetRouteStatus {
1262 route_id: "async-test-route".to_string(),
1263 })
1264 .await
1265 .unwrap();
1266
1267 match status {
1268 RuntimeQueryResult::RouteStatus { status, .. } => {
1269 assert_eq!(
1270 status, "Registered",
1271 "expected Registered state after add_route_definition"
1272 );
1273 }
1274 _ => panic!("unexpected query result"),
1275 }
1276 }
1277
1278 #[tokio::test]
1279 async fn add_route_definition_injects_runtime_into_producer_context() {
1280 use std::sync::atomic::{AtomicBool, Ordering};
1281
1282 struct RuntimeAwareEndpoint {
1283 saw_runtime: Arc<AtomicBool>,
1284 }
1285
1286 impl Endpoint for RuntimeAwareEndpoint {
1287 fn uri(&self) -> &str {
1288 "runtime-aware:test"
1289 }
1290
1291 fn create_consumer(&self) -> Result<Box<dyn camel_component::Consumer>, CamelError> {
1292 Err(CamelError::RouteError("no consumer".to_string()))
1293 }
1294
1295 fn create_producer(
1296 &self,
1297 ctx: &camel_api::ProducerContext,
1298 ) -> Result<camel_api::BoxProcessor, CamelError> {
1299 self.saw_runtime
1300 .store(ctx.runtime().is_some(), Ordering::SeqCst);
1301 if ctx.runtime().is_none() {
1302 return Err(CamelError::RouteError(
1303 "runtime handle missing in ProducerContext".to_string(),
1304 ));
1305 }
1306 Ok(camel_api::BoxProcessor::new(camel_api::IdentityProcessor))
1307 }
1308 }
1309
1310 struct RuntimeAwareComponent {
1311 saw_runtime: Arc<AtomicBool>,
1312 }
1313
1314 impl Component for RuntimeAwareComponent {
1315 fn scheme(&self) -> &str {
1316 "runtime-aware"
1317 }
1318
1319 fn create_endpoint(&self, _uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
1320 Ok(Box::new(RuntimeAwareEndpoint {
1321 saw_runtime: Arc::clone(&self.saw_runtime),
1322 }))
1323 }
1324 }
1325
1326 let saw_runtime = Arc::new(AtomicBool::new(false));
1327 let mut ctx = CamelContext::new();
1328 ctx.register_component(RuntimeAwareComponent {
1329 saw_runtime: Arc::clone(&saw_runtime),
1330 });
1331
1332 let definition = RouteDefinition::new(
1333 "timer:tick",
1334 vec![BuilderStep::To("runtime-aware:test".to_string())],
1335 )
1336 .with_route_id("runtime-aware-route");
1337
1338 let result = ctx.add_route_definition(definition).await;
1339 assert!(
1340 result.is_ok(),
1341 "route should resolve producer with runtime context: {result:?}"
1342 );
1343 assert!(
1344 saw_runtime.load(Ordering::SeqCst),
1345 "component producer should observe runtime handle in ProducerContext"
1346 );
1347 }
1348
1349 #[tokio::test]
1350 async fn add_route_definition_registers_runtime_projection_and_aggregate() {
1351 let mut ctx = CamelContext::new();
1352 ctx.register_component(HoldComponent);
1353
1354 let definition = RouteDefinition::new("hold:test", vec![]).with_route_id("ctx-runtime-r1");
1355 ctx.add_route_definition(definition).await.unwrap();
1356
1357 let aggregate = ctx.runtime.repo().load("ctx-runtime-r1").await.unwrap();
1358 assert!(
1359 matches!(aggregate, Some(agg) if matches!(agg.state(), RouteRuntimeState::Registered)),
1360 "route registration should seed aggregate as Registered"
1361 );
1362
1363 let status = ctx.runtime_route_status("ctx-runtime-r1").await.unwrap();
1364 assert_eq!(status.as_deref(), Some("Registered"));
1365 }
1366
1367 #[tokio::test]
1368 async fn add_route_definition_rolls_back_controller_when_runtime_registration_fails() {
1369 let mut ctx = CamelContext::new();
1370 ctx.register_component(HoldComponent);
1371
1372 ctx.runtime
1373 .repo()
1374 .save(RouteRuntimeAggregate::new("ctx-runtime-dup"))
1375 .await
1376 .unwrap();
1377
1378 let definition = RouteDefinition::new("hold:test", vec![]).with_route_id("ctx-runtime-dup");
1379 let result = ctx.add_route_definition(definition).await;
1380 assert!(result.is_err(), "duplicate runtime registration must fail");
1381
1382 assert_eq!(
1383 ctx.runtime_execution_handle()
1384 .controller_route_count_for_test()
1385 .await,
1386 0,
1387 "controller route should be rolled back on runtime bootstrap failure"
1388 );
1389 }
1390
1391 #[tokio::test]
1392 async fn context_start_stop_drives_runtime_lifecycle_via_command_bus() {
1393 let mut ctx = CamelContext::new();
1394 ctx.register_component(HoldComponent);
1395
1396 let autostart =
1397 RouteDefinition::new("hold:test", vec![]).with_route_id("ctx-lifecycle-auto");
1398 let lazy = RouteDefinition::new("hold:test", vec![])
1399 .with_route_id("ctx-lifecycle-lazy")
1400 .with_auto_startup(false);
1401
1402 ctx.add_route_definition(autostart).await.unwrap();
1403 ctx.add_route_definition(lazy).await.unwrap();
1404
1405 assert_eq!(
1406 ctx.runtime_route_status("ctx-lifecycle-auto")
1407 .await
1408 .unwrap(),
1409 Some("Registered".to_string())
1410 );
1411 assert_eq!(
1412 ctx.runtime_route_status("ctx-lifecycle-lazy")
1413 .await
1414 .unwrap(),
1415 Some("Registered".to_string())
1416 );
1417
1418 ctx.start().await.unwrap();
1419
1420 assert_eq!(
1421 ctx.runtime_route_status("ctx-lifecycle-auto")
1422 .await
1423 .unwrap(),
1424 Some("Started".to_string())
1425 );
1426 assert_eq!(
1427 ctx.runtime_route_status("ctx-lifecycle-lazy")
1428 .await
1429 .unwrap(),
1430 Some("Registered".to_string())
1431 );
1432
1433 ctx.stop().await.unwrap();
1434
1435 assert_eq!(
1436 ctx.runtime_route_status("ctx-lifecycle-auto")
1437 .await
1438 .unwrap(),
1439 Some("Stopped".to_string())
1440 );
1441 assert_eq!(
1442 ctx.runtime_route_status("ctx-lifecycle-lazy")
1443 .await
1444 .unwrap(),
1445 Some("Registered".to_string())
1446 );
1447 }
1448}
1449
1450#[cfg(test)]
1451mod lifecycle_tests {
1452 use super::*;
1453 use async_trait::async_trait;
1454 use camel_api::Lifecycle;
1455 use std::sync::Arc;
1456 use std::sync::atomic::{AtomicUsize, Ordering};
1457
1458 struct MockService {
1459 start_count: Arc<AtomicUsize>,
1460 stop_count: Arc<AtomicUsize>,
1461 }
1462
1463 impl MockService {
1464 fn new() -> (Self, Arc<AtomicUsize>, Arc<AtomicUsize>) {
1465 let start_count = Arc::new(AtomicUsize::new(0));
1466 let stop_count = Arc::new(AtomicUsize::new(0));
1467 (
1468 Self {
1469 start_count: start_count.clone(),
1470 stop_count: stop_count.clone(),
1471 },
1472 start_count,
1473 stop_count,
1474 )
1475 }
1476 }
1477
1478 #[async_trait]
1479 impl Lifecycle for MockService {
1480 fn name(&self) -> &str {
1481 "mock"
1482 }
1483
1484 async fn start(&mut self) -> Result<(), CamelError> {
1485 self.start_count.fetch_add(1, Ordering::SeqCst);
1486 Ok(())
1487 }
1488
1489 async fn stop(&mut self) -> Result<(), CamelError> {
1490 self.stop_count.fetch_add(1, Ordering::SeqCst);
1491 Ok(())
1492 }
1493 }
1494
1495 #[tokio::test]
1496 async fn test_context_starts_lifecycle_services() {
1497 let (service, start_count, stop_count) = MockService::new();
1498
1499 let mut ctx = CamelContext::new().with_lifecycle(service);
1500
1501 assert_eq!(start_count.load(Ordering::SeqCst), 0);
1502
1503 ctx.start().await.unwrap();
1504
1505 assert_eq!(start_count.load(Ordering::SeqCst), 1);
1506 assert_eq!(stop_count.load(Ordering::SeqCst), 0);
1507
1508 ctx.stop().await.unwrap();
1509
1510 assert_eq!(stop_count.load(Ordering::SeqCst), 1);
1511 }
1512
1513 #[tokio::test]
1514 async fn test_service_start_failure_rollback() {
1515 struct FailingService {
1516 start_count: Arc<AtomicUsize>,
1517 stop_count: Arc<AtomicUsize>,
1518 should_fail: bool,
1519 }
1520
1521 #[async_trait]
1522 impl Lifecycle for FailingService {
1523 fn name(&self) -> &str {
1524 "failing"
1525 }
1526
1527 async fn start(&mut self) -> Result<(), CamelError> {
1528 self.start_count.fetch_add(1, Ordering::SeqCst);
1529 if self.should_fail {
1530 Err(CamelError::ProcessorError("intentional failure".into()))
1531 } else {
1532 Ok(())
1533 }
1534 }
1535
1536 async fn stop(&mut self) -> Result<(), CamelError> {
1537 self.stop_count.fetch_add(1, Ordering::SeqCst);
1538 Ok(())
1539 }
1540 }
1541
1542 let start1 = Arc::new(AtomicUsize::new(0));
1543 let stop1 = Arc::new(AtomicUsize::new(0));
1544 let start2 = Arc::new(AtomicUsize::new(0));
1545 let stop2 = Arc::new(AtomicUsize::new(0));
1546 let start3 = Arc::new(AtomicUsize::new(0));
1547 let stop3 = Arc::new(AtomicUsize::new(0));
1548
1549 let service1 = FailingService {
1550 start_count: start1.clone(),
1551 stop_count: stop1.clone(),
1552 should_fail: false,
1553 };
1554 let service2 = FailingService {
1555 start_count: start2.clone(),
1556 stop_count: stop2.clone(),
1557 should_fail: true, };
1559 let service3 = FailingService {
1560 start_count: start3.clone(),
1561 stop_count: stop3.clone(),
1562 should_fail: false,
1563 };
1564
1565 let mut ctx = CamelContext::new()
1566 .with_lifecycle(service1)
1567 .with_lifecycle(service2)
1568 .with_lifecycle(service3);
1569
1570 let result = ctx.start().await;
1572 assert!(result.is_err());
1573
1574 assert_eq!(start1.load(Ordering::SeqCst), 1);
1576 assert_eq!(stop1.load(Ordering::SeqCst), 1);
1577
1578 assert_eq!(start2.load(Ordering::SeqCst), 1);
1580 assert_eq!(stop2.load(Ordering::SeqCst), 0);
1581
1582 assert_eq!(start3.load(Ordering::SeqCst), 0);
1584 assert_eq!(stop3.load(Ordering::SeqCst), 0);
1585 }
1586
1587 #[tokio::test]
1588 async fn test_services_stop_in_reverse_order() {
1589 use std::sync::Mutex as StdMutex;
1590
1591 struct OrderTracker {
1592 name: String,
1593 order: Arc<StdMutex<Vec<String>>>,
1594 }
1595
1596 #[async_trait]
1597 impl Lifecycle for OrderTracker {
1598 fn name(&self) -> &str {
1599 &self.name
1600 }
1601
1602 async fn start(&mut self) -> Result<(), CamelError> {
1603 Ok(())
1604 }
1605
1606 async fn stop(&mut self) -> Result<(), CamelError> {
1607 self.order.lock().unwrap().push(self.name.clone());
1608 Ok(())
1609 }
1610 }
1611
1612 let order = Arc::new(StdMutex::new(Vec::<String>::new()));
1613
1614 let s1 = OrderTracker {
1615 name: "first".into(),
1616 order: Arc::clone(&order),
1617 };
1618 let s2 = OrderTracker {
1619 name: "second".into(),
1620 order: Arc::clone(&order),
1621 };
1622 let s3 = OrderTracker {
1623 name: "third".into(),
1624 order: Arc::clone(&order),
1625 };
1626
1627 let mut ctx = CamelContext::new()
1628 .with_lifecycle(s1)
1629 .with_lifecycle(s2)
1630 .with_lifecycle(s3);
1631
1632 ctx.start().await.unwrap();
1633 ctx.stop().await.unwrap();
1634
1635 let stopped = order.lock().unwrap();
1636 assert_eq!(
1637 *stopped,
1638 vec!["third", "second", "first"],
1639 "services must stop in reverse insertion order"
1640 );
1641 }
1642}
1643
1644#[cfg(test)]
1645mod config_registry_tests {
1646 use super::*;
1647
1648 #[derive(Debug, Clone, PartialEq)]
1649 struct MyConfig {
1650 value: u32,
1651 }
1652
1653 #[test]
1654 fn test_set_and_get_component_config() {
1655 let mut ctx = CamelContext::new();
1656 ctx.set_component_config(MyConfig { value: 42 });
1657 let got = ctx.get_component_config::<MyConfig>();
1658 assert_eq!(got, Some(&MyConfig { value: 42 }));
1659 }
1660
1661 #[test]
1662 fn test_get_missing_config_returns_none() {
1663 let ctx = CamelContext::new();
1664 assert!(ctx.get_component_config::<MyConfig>().is_none());
1665 }
1666
1667 #[test]
1668 fn test_set_overwrites_previous_config() {
1669 let mut ctx = CamelContext::new();
1670 ctx.set_component_config(MyConfig { value: 1 });
1671 ctx.set_component_config(MyConfig { value: 2 });
1672 assert_eq!(ctx.get_component_config::<MyConfig>().unwrap().value, 2);
1673 }
1674}