1use std::any::{Any, TypeId};
2use std::collections::HashMap;
3use std::path::PathBuf;
4use std::sync::Arc;
5use std::sync::atomic::{AtomicU64, Ordering};
6use tokio::sync::Mutex;
7use tokio_util::sync::CancellationToken;
8use tracing::{info, warn};
9
10use camel_api::error_handler::ErrorHandlerConfig;
11use camel_api::{
12 CamelError, HealthReport, HealthStatus, Lifecycle, MetricsCollector, NoOpMetrics,
13 RouteController, RuntimeCommandBus, RuntimeQueryBus, ServiceHealth, ServiceStatus,
14 SupervisionConfig,
15};
16use camel_component::Component;
17use camel_language_api::Language;
18use camel_language_api::LanguageError;
19
20use crate::lifecycle::adapters::RuntimeExecutionAdapter;
21use crate::lifecycle::adapters::route_controller::{
22 DefaultRouteController, RouteControllerInternal, SharedLanguageRegistry,
23};
24use crate::lifecycle::application::route_definition::RouteDefinition;
25use crate::lifecycle::application::runtime_bus::RuntimeBus;
26use crate::lifecycle::application::supervision_service::SupervisingRouteController;
27use crate::shared::components::domain::Registry;
28use crate::shared::observability::domain::TracerConfig;
29
30static CONTEXT_COMMAND_SEQ: AtomicU64 = AtomicU64::new(0);
31
32pub struct CamelContext {
40 registry: Arc<std::sync::Mutex<Registry>>,
41 route_controller: Arc<Mutex<dyn RouteControllerInternal>>,
42 runtime: Arc<RuntimeBus>,
43 cancel_token: CancellationToken,
44 metrics: Arc<dyn MetricsCollector>,
45 languages: SharedLanguageRegistry,
46 shutdown_timeout: std::time::Duration,
47 services: Vec<Box<dyn Lifecycle>>,
48 component_configs: HashMap<TypeId, Box<dyn Any + Send + Sync>>,
49}
50
51#[derive(Clone)]
55pub struct RuntimeExecutionHandle {
56 controller: Arc<Mutex<dyn RouteControllerInternal>>,
57 runtime: Arc<RuntimeBus>,
58}
59
60impl RuntimeExecutionHandle {
61 pub(crate) async fn add_route_definition(
62 &self,
63 definition: RouteDefinition,
64 ) -> Result<(), CamelError> {
65 use crate::lifecycle::ports::RouteRegistrationPort;
66 self.runtime.register_route(definition).await
67 }
68
69 pub(crate) async fn compile_route_definition(
70 &self,
71 definition: RouteDefinition,
72 ) -> Result<camel_api::BoxProcessor, CamelError> {
73 let controller = self.controller.lock().await;
74 controller.compile_route_definition(definition)
75 }
76
77 pub(crate) async fn swap_route_pipeline(
78 &self,
79 route_id: &str,
80 pipeline: camel_api::BoxProcessor,
81 ) -> Result<(), CamelError> {
82 let controller = self.controller.lock().await;
83 controller.swap_pipeline(route_id, pipeline)
84 }
85
86 pub(crate) async fn execute_runtime_command(
87 &self,
88 cmd: camel_api::RuntimeCommand,
89 ) -> Result<camel_api::RuntimeCommandResult, CamelError> {
90 self.runtime.execute(cmd).await
91 }
92
93 pub(crate) async fn runtime_route_status(
94 &self,
95 route_id: &str,
96 ) -> Result<Option<String>, CamelError> {
97 match self
98 .runtime
99 .ask(camel_api::RuntimeQuery::GetRouteStatus {
100 route_id: route_id.to_string(),
101 })
102 .await
103 {
104 Ok(camel_api::RuntimeQueryResult::RouteStatus { status, .. }) => Ok(Some(status)),
105 Ok(_) => Err(CamelError::RouteError(
106 "unexpected runtime query response for route status".to_string(),
107 )),
108 Err(CamelError::RouteError(msg)) if msg.contains("not found") => Ok(None),
109 Err(err) => Err(err),
110 }
111 }
112
113 pub(crate) async fn runtime_route_ids(&self) -> Result<Vec<String>, CamelError> {
114 match self.runtime.ask(camel_api::RuntimeQuery::ListRoutes).await {
115 Ok(camel_api::RuntimeQueryResult::Routes { route_ids }) => Ok(route_ids),
116 Ok(_) => Err(CamelError::RouteError(
117 "unexpected runtime query response for route listing".to_string(),
118 )),
119 Err(err) => Err(err),
120 }
121 }
122
123 #[cfg(test)]
124 pub(crate) async fn force_start_route_for_test(
125 &self,
126 route_id: &str,
127 ) -> Result<(), CamelError> {
128 let mut controller = self.controller.lock().await;
129 controller.start_route(route_id).await
130 }
131
132 #[cfg(test)]
133 pub(crate) async fn controller_route_count_for_test(&self) -> usize {
134 let controller = self.controller.lock().await;
135 controller.route_count()
136 }
137}
138
139impl CamelContext {
140 fn built_in_languages() -> SharedLanguageRegistry {
141 let mut languages: HashMap<String, Arc<dyn Language>> = HashMap::new();
142 languages.insert(
143 "simple".to_string(),
144 Arc::new(camel_language_simple::SimpleLanguage),
145 );
146 Arc::new(std::sync::Mutex::new(languages))
147 }
148
149 fn build_runtime(
150 controller: Arc<Mutex<dyn RouteControllerInternal>>,
151 store: crate::lifecycle::adapters::InMemoryRuntimeStore,
152 ) -> Arc<RuntimeBus> {
153 let execution = Arc::new(RuntimeExecutionAdapter::new(Arc::clone(&controller)));
154 Arc::new(
155 RuntimeBus::new(
156 Arc::new(store.clone()),
157 Arc::new(store.clone()),
158 Arc::new(store.clone()),
159 Arc::new(store.clone()),
160 )
161 .with_uow(Arc::new(store))
162 .with_execution(execution),
163 )
164 }
165
166 fn runtime_store_with_journal_path(
167 path: PathBuf,
168 ) -> crate::lifecycle::adapters::InMemoryRuntimeStore {
169 let store = crate::lifecycle::adapters::InMemoryRuntimeStore::default();
170 match crate::lifecycle::adapters::FileRuntimeEventJournal::new(path.clone()) {
171 Ok(journal) => store.with_journal(Arc::new(journal)),
172 Err(err) => {
173 warn!(
174 journal_path = %path.display(),
175 error = %err,
176 "Failed to initialize runtime event journal; falling back to in-memory-only runtime store"
177 );
178 store
179 }
180 }
181 }
182
183 pub fn new() -> Self {
187 Self::with_metrics(Arc::new(NoOpMetrics))
188 }
189
190 pub fn new_with_runtime_journal_path(path: impl Into<PathBuf>) -> Self {
194 Self::with_metrics_and_runtime_journal_path(Arc::new(NoOpMetrics), path)
195 }
196
197 pub fn with_metrics(metrics: Arc<dyn MetricsCollector>) -> Self {
199 let registry = Arc::new(std::sync::Mutex::new(Registry::new()));
200 let languages = Self::built_in_languages();
201 let controller: Arc<Mutex<dyn RouteControllerInternal>> = Arc::new(Mutex::new(
202 DefaultRouteController::with_languages(Arc::clone(®istry), Arc::clone(&languages)),
203 ));
204 let store = crate::lifecycle::adapters::InMemoryRuntimeStore::default();
205 let runtime = Self::build_runtime(Arc::clone(&controller), store);
206
207 let mut controller_guard = controller
210 .try_lock()
211 .expect("BUG: CamelContext lock contention — try_lock should always succeed here since &mut self prevents concurrent access");
212 controller_guard.set_self_ref(Arc::clone(&controller) as Arc<Mutex<dyn RouteController>>);
213 let runtime_handle: Arc<dyn camel_api::RuntimeHandle> = runtime.clone();
214 controller_guard.set_runtime_handle(runtime_handle);
215 drop(controller_guard);
216
217 Self {
218 registry,
219 route_controller: controller,
220 runtime,
221 cancel_token: CancellationToken::new(),
222 metrics,
223 languages,
224 shutdown_timeout: std::time::Duration::from_secs(30),
225 services: Vec::new(),
226 component_configs: HashMap::new(),
227 }
228 }
229
230 pub fn with_metrics_and_runtime_journal_path(
232 metrics: Arc<dyn MetricsCollector>,
233 journal_path: impl Into<PathBuf>,
234 ) -> Self {
235 let registry = Arc::new(std::sync::Mutex::new(Registry::new()));
236 let languages = Self::built_in_languages();
237 let controller: Arc<Mutex<dyn RouteControllerInternal>> = Arc::new(Mutex::new(
238 DefaultRouteController::with_languages(Arc::clone(®istry), Arc::clone(&languages)),
239 ));
240 let store = Self::runtime_store_with_journal_path(journal_path.into());
241 let runtime = Self::build_runtime(Arc::clone(&controller), store);
242
243 let mut controller_guard = controller
246 .try_lock()
247 .expect("BUG: CamelContext lock contention — try_lock should always succeed here since &mut self prevents concurrent access");
248 controller_guard.set_self_ref(Arc::clone(&controller) as Arc<Mutex<dyn RouteController>>);
249 let runtime_handle: Arc<dyn camel_api::RuntimeHandle> = runtime.clone();
250 controller_guard.set_runtime_handle(runtime_handle);
251 drop(controller_guard);
252
253 Self {
254 registry,
255 route_controller: controller,
256 runtime,
257 cancel_token: CancellationToken::new(),
258 metrics,
259 languages,
260 shutdown_timeout: std::time::Duration::from_secs(30),
261 services: Vec::new(),
262 component_configs: HashMap::new(),
263 }
264 }
265
266 pub fn with_supervision(config: SupervisionConfig) -> Self {
270 Self::with_supervision_and_metrics(config, Arc::new(NoOpMetrics))
271 }
272
273 pub fn with_supervision_and_metrics(
277 config: SupervisionConfig,
278 metrics: Arc<dyn MetricsCollector>,
279 ) -> Self {
280 let registry = Arc::new(std::sync::Mutex::new(Registry::new()));
281 let languages = Self::built_in_languages();
282 let controller: Arc<Mutex<dyn RouteControllerInternal>> = Arc::new(Mutex::new(
283 SupervisingRouteController::with_languages(
284 Arc::clone(®istry),
285 config,
286 Arc::clone(&languages),
287 )
288 .with_metrics(Arc::clone(&metrics)),
289 ));
290 let store = crate::lifecycle::adapters::InMemoryRuntimeStore::default();
291 let runtime = Self::build_runtime(Arc::clone(&controller), store);
292
293 let mut controller_guard = controller
296 .try_lock()
297 .expect("BUG: CamelContext lock contention — try_lock should always succeed here since &mut self prevents concurrent access");
298 controller_guard.set_self_ref(Arc::clone(&controller) as Arc<Mutex<dyn RouteController>>);
299 let runtime_handle: Arc<dyn camel_api::RuntimeHandle> = runtime.clone();
300 controller_guard.set_runtime_handle(runtime_handle);
301 drop(controller_guard);
302
303 Self {
304 registry,
305 route_controller: controller,
306 runtime,
307 cancel_token: CancellationToken::new(),
308 metrics,
309 languages,
310 shutdown_timeout: std::time::Duration::from_secs(30),
311 services: Vec::new(),
312 component_configs: HashMap::new(),
313 }
314 }
315
316 pub fn with_supervision_and_metrics_and_runtime_journal_path(
318 config: SupervisionConfig,
319 metrics: Arc<dyn MetricsCollector>,
320 journal_path: impl Into<PathBuf>,
321 ) -> Self {
322 let registry = Arc::new(std::sync::Mutex::new(Registry::new()));
323 let languages = Self::built_in_languages();
324 let controller: Arc<Mutex<dyn RouteControllerInternal>> = Arc::new(Mutex::new(
325 SupervisingRouteController::with_languages(
326 Arc::clone(®istry),
327 config,
328 Arc::clone(&languages),
329 )
330 .with_metrics(Arc::clone(&metrics)),
331 ));
332 let store = Self::runtime_store_with_journal_path(journal_path.into());
333 let runtime = Self::build_runtime(Arc::clone(&controller), store);
334
335 let mut controller_guard = controller
338 .try_lock()
339 .expect("BUG: CamelContext lock contention — try_lock should always succeed here since &mut self prevents concurrent access");
340 controller_guard.set_self_ref(Arc::clone(&controller) as Arc<Mutex<dyn RouteController>>);
341 let runtime_handle: Arc<dyn camel_api::RuntimeHandle> = runtime.clone();
342 controller_guard.set_runtime_handle(runtime_handle);
343 drop(controller_guard);
344
345 Self {
346 registry,
347 route_controller: controller,
348 runtime,
349 cancel_token: CancellationToken::new(),
350 metrics,
351 languages,
352 shutdown_timeout: std::time::Duration::from_secs(30),
353 services: Vec::new(),
354 component_configs: HashMap::new(),
355 }
356 }
357
358 pub fn set_error_handler(&mut self, config: ErrorHandlerConfig) {
360 self.route_controller
361 .try_lock()
362 .expect("BUG: CamelContext lock contention — try_lock should always succeed here since &mut self prevents concurrent access")
363 .set_error_handler(config);
364 }
365
366 pub fn set_tracing(&mut self, enabled: bool) {
368 self.route_controller
369 .try_lock()
370 .expect("BUG: CamelContext lock contention — try_lock should always succeed here since &mut self prevents concurrent access")
371 .set_tracer_config(&TracerConfig {
372 enabled,
373 ..Default::default()
374 });
375 }
376
377 pub fn set_tracer_config(&mut self, config: TracerConfig) {
379 let config = if config.metrics_collector.is_none() {
381 TracerConfig {
382 metrics_collector: Some(Arc::clone(&self.metrics)),
383 ..config
384 }
385 } else {
386 config
387 };
388
389 self.route_controller
390 .try_lock()
391 .expect("BUG: CamelContext lock contention — try_lock should always succeed here since &mut self prevents concurrent access")
392 .set_tracer_config(&config);
393 }
394
395 pub fn with_tracing(mut self) -> Self {
397 self.set_tracing(true);
398 self
399 }
400
401 pub fn with_tracer_config(mut self, config: TracerConfig) -> Self {
405 self.set_tracer_config(config);
406 self
407 }
408
409 pub fn with_lifecycle<L: Lifecycle + 'static>(mut self, service: L) -> Self {
411 if let Some(collector) = service.as_metrics_collector() {
413 self.metrics = collector;
414 }
415
416 self.services.push(Box::new(service));
417 self
418 }
419
420 pub fn register_component<C: Component + 'static>(&mut self, component: C) {
422 info!(scheme = component.scheme(), "Registering component");
423 self.registry
424 .lock()
425 .expect("mutex poisoned: another thread panicked while holding this lock")
426 .register(component);
427 }
428
429 pub fn register_language(
435 &mut self,
436 name: impl Into<String>,
437 lang: Box<dyn Language>,
438 ) -> Result<(), LanguageError> {
439 let name = name.into();
440 let mut languages = self
441 .languages
442 .lock()
443 .expect("mutex poisoned: another thread panicked while holding this lock");
444 if languages.contains_key(&name) {
445 return Err(LanguageError::AlreadyRegistered(name));
446 }
447 languages.insert(name, Arc::from(lang));
448 Ok(())
449 }
450
451 pub fn resolve_language(&self, name: &str) -> Option<Arc<dyn Language>> {
453 let languages = self
454 .languages
455 .lock()
456 .expect("mutex poisoned: another thread panicked while holding this lock");
457 languages.get(name).cloned()
458 }
459
460 pub async fn add_route_definition(
464 &self,
465 definition: RouteDefinition,
466 ) -> Result<(), CamelError> {
467 use crate::lifecycle::ports::RouteRegistrationPort;
468 info!(
469 from = definition.from_uri(),
470 route_id = %definition.route_id(),
471 "Adding route definition"
472 );
473 self.runtime.register_route(definition).await
474 }
475
476 fn next_context_command_id(op: &str, route_id: &str) -> String {
477 let seq = CONTEXT_COMMAND_SEQ.fetch_add(1, Ordering::Relaxed);
478 format!("context:{op}:{route_id}:{seq}")
479 }
480
481 pub fn registry(&self) -> std::sync::MutexGuard<'_, Registry> {
483 self.registry
484 .lock()
485 .expect("mutex poisoned: another thread panicked while holding this lock")
486 }
487
488 pub fn runtime_execution_handle(&self) -> RuntimeExecutionHandle {
490 RuntimeExecutionHandle {
491 controller: Arc::clone(&self.route_controller),
492 runtime: Arc::clone(&self.runtime),
493 }
494 }
495
496 pub fn metrics(&self) -> Arc<dyn MetricsCollector> {
498 Arc::clone(&self.metrics)
499 }
500
501 pub fn runtime(&self) -> Arc<dyn camel_api::RuntimeHandle> {
503 self.runtime.clone()
504 }
505
506 pub fn producer_context(&self) -> camel_api::ProducerContext {
508 camel_api::ProducerContext::new().with_runtime(self.runtime())
509 }
510
511 pub async fn runtime_route_status(&self, route_id: &str) -> Result<Option<String>, CamelError> {
513 match self
514 .runtime()
515 .ask(camel_api::RuntimeQuery::GetRouteStatus {
516 route_id: route_id.to_string(),
517 })
518 .await
519 {
520 Ok(camel_api::RuntimeQueryResult::RouteStatus { status, .. }) => Ok(Some(status)),
521 Ok(_) => Err(CamelError::RouteError(
522 "unexpected runtime query response for route status".to_string(),
523 )),
524 Err(CamelError::RouteError(msg)) if msg.contains("not found") => Ok(None),
525 Err(err) => Err(err),
526 }
527 }
528
529 pub async fn start(&mut self) -> Result<(), CamelError> {
534 info!("Starting CamelContext");
535
536 for (i, service) in self.services.iter_mut().enumerate() {
538 info!("Starting service: {}", service.name());
539 if let Err(e) = service.start().await {
540 warn!(
542 "Service {} failed to start, rolling back {} services",
543 service.name(),
544 i
545 );
546 for j in (0..i).rev() {
547 if let Err(rollback_err) = self.services[j].stop().await {
548 warn!(
549 "Failed to stop service {} during rollback: {}",
550 self.services[j].name(),
551 rollback_err
552 );
553 }
554 }
555 return Err(e);
556 }
557 }
558
559 let route_ids = {
562 let controller = self.route_controller.lock().await;
563 controller.auto_startup_route_ids()
564 };
565 for route_id in route_ids {
566 self.runtime
567 .execute(camel_api::RuntimeCommand::StartRoute {
568 route_id: route_id.clone(),
569 command_id: Self::next_context_command_id("start", &route_id),
570 causation_id: None,
571 })
572 .await?;
573 }
574
575 info!("CamelContext started");
576 Ok(())
577 }
578
579 pub async fn stop(&mut self) -> Result<(), CamelError> {
581 self.stop_timeout(self.shutdown_timeout).await
582 }
583
584 pub async fn stop_timeout(&mut self, _timeout: std::time::Duration) -> Result<(), CamelError> {
589 info!("Stopping CamelContext");
590
591 self.cancel_token.cancel();
593
594 let route_ids = {
597 let controller = self.route_controller.lock().await;
598 controller.shutdown_route_ids()
599 };
600 for route_id in route_ids {
601 if let Err(err) = self
602 .runtime
603 .execute(camel_api::RuntimeCommand::StopRoute {
604 route_id: route_id.clone(),
605 command_id: Self::next_context_command_id("stop", &route_id),
606 causation_id: None,
607 })
608 .await
609 {
610 warn!(route_id = %route_id, error = %err, "Runtime stop command failed during context shutdown");
611 }
612 }
613
614 let mut first_error = None;
617 for service in self.services.iter_mut().rev() {
618 info!("Stopping service: {}", service.name());
619 if let Err(e) = service.stop().await {
620 warn!("Service {} failed to stop: {}", service.name(), e);
621 if first_error.is_none() {
622 first_error = Some(e);
623 }
624 }
625 }
626
627 info!("CamelContext stopped");
628
629 if let Some(e) = first_error {
630 Err(e)
631 } else {
632 Ok(())
633 }
634 }
635
636 pub fn shutdown_timeout(&self) -> std::time::Duration {
638 self.shutdown_timeout
639 }
640
641 pub fn set_shutdown_timeout(&mut self, timeout: std::time::Duration) {
643 self.shutdown_timeout = timeout;
644 }
645
646 pub async fn abort(&mut self) {
648 self.cancel_token.cancel();
649 let route_ids = {
650 let controller = self.route_controller.lock().await;
651 controller.shutdown_route_ids()
652 };
653 for route_id in route_ids {
654 let _ = self
655 .runtime
656 .execute(camel_api::RuntimeCommand::StopRoute {
657 route_id: route_id.clone(),
658 command_id: Self::next_context_command_id("abort-stop", &route_id),
659 causation_id: None,
660 })
661 .await;
662 }
663 }
664
665 pub fn health_check(&self) -> HealthReport {
667 let services: Vec<ServiceHealth> = self
668 .services
669 .iter()
670 .map(|s| ServiceHealth {
671 name: s.name().to_string(),
672 status: s.status(),
673 })
674 .collect();
675
676 let status = if services.iter().all(|s| s.status == ServiceStatus::Started) {
677 HealthStatus::Healthy
678 } else {
679 HealthStatus::Unhealthy
680 };
681
682 HealthReport {
683 status,
684 services,
685 ..Default::default()
686 }
687 }
688
689 pub fn set_component_config<T: 'static + Send + Sync>(&mut self, config: T) {
691 self.component_configs
692 .insert(TypeId::of::<T>(), Box::new(config));
693 }
694
695 pub fn get_component_config<T: 'static + Send + Sync>(&self) -> Option<&T> {
697 self.component_configs
698 .get(&TypeId::of::<T>())
699 .and_then(|b| b.downcast_ref::<T>())
700 }
701}
702
703impl Default for CamelContext {
704 fn default() -> Self {
705 Self::new()
706 }
707}
708
709#[cfg(test)]
710mod tests {
711 use super::*;
712 use crate::lifecycle::application::route_definition::{
713 BuilderStep, LanguageExpressionDef, RouteDefinition,
714 };
715 use crate::lifecycle::domain::{RouteRuntimeAggregate, RouteRuntimeState};
716 use async_trait::async_trait;
717 use camel_api::CamelError;
718 use camel_api::{
719 CanonicalRouteSpec, RuntimeCommand, RuntimeCommandResult, RuntimeQuery, RuntimeQueryResult,
720 };
721 use camel_component::{Component, ConcurrencyModel, Consumer, ConsumerContext, Endpoint};
722 use tempfile::tempdir;
723
724 struct MockComponent;
726
727 impl Component for MockComponent {
728 fn scheme(&self) -> &str {
729 "mock"
730 }
731
732 fn create_endpoint(&self, _uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
733 Err(CamelError::ComponentNotFound("mock".to_string()))
734 }
735 }
736
737 struct HoldConsumer;
738
739 #[async_trait]
740 impl Consumer for HoldConsumer {
741 async fn start(&mut self, ctx: ConsumerContext) -> Result<(), CamelError> {
742 ctx.cancelled().await;
743 Ok(())
744 }
745
746 async fn stop(&mut self) -> Result<(), CamelError> {
747 Ok(())
748 }
749
750 fn concurrency_model(&self) -> ConcurrencyModel {
751 ConcurrencyModel::Sequential
752 }
753 }
754
755 struct HoldEndpoint;
756
757 impl Endpoint for HoldEndpoint {
758 fn uri(&self) -> &str {
759 "hold:test"
760 }
761
762 fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
763 Ok(Box::new(HoldConsumer))
764 }
765
766 fn create_producer(
767 &self,
768 _ctx: &camel_api::ProducerContext,
769 ) -> Result<camel_api::BoxProcessor, CamelError> {
770 Err(CamelError::RouteError("no producer".to_string()))
771 }
772 }
773
774 struct HoldComponent;
775
776 impl Component for HoldComponent {
777 fn scheme(&self) -> &str {
778 "hold"
779 }
780
781 fn create_endpoint(&self, _uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
782 Ok(Box::new(HoldEndpoint))
783 }
784 }
785
786 #[test]
787 fn test_context_handles_mutex_poisoning_gracefully() {
788 let mut ctx = CamelContext::new();
789
790 ctx.register_component(MockComponent);
792
793 let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
795 let _guard = ctx.registry();
796 }));
797
798 assert!(
799 result.is_ok(),
800 "Registry access should handle mutex poisoning"
801 );
802 }
803
804 #[test]
805 fn test_context_resolves_simple_language() {
806 let ctx = CamelContext::new();
807 let lang = ctx
808 .resolve_language("simple")
809 .expect("simple language not found");
810 assert_eq!(lang.name(), "simple");
811 }
812
813 #[test]
814 fn test_simple_language_via_context() {
815 let ctx = CamelContext::new();
816 let lang = ctx.resolve_language("simple").unwrap();
817 let pred = lang.create_predicate("${header.x} == 'hello'").unwrap();
818 let mut msg = camel_api::message::Message::default();
819 msg.set_header("x", camel_api::Value::String("hello".into()));
820 let ex = camel_api::exchange::Exchange::new(msg);
821 assert!(pred.matches(&ex).unwrap());
822 }
823
824 #[test]
825 fn test_resolve_unknown_language_returns_none() {
826 let ctx = CamelContext::new();
827 assert!(ctx.resolve_language("nonexistent").is_none());
828 }
829
830 #[test]
831 fn test_register_language_duplicate_returns_error() {
832 use camel_language_api::LanguageError;
833 struct DummyLang;
834 impl camel_language_api::Language for DummyLang {
835 fn name(&self) -> &'static str {
836 "dummy"
837 }
838 fn create_expression(
839 &self,
840 _: &str,
841 ) -> Result<Box<dyn camel_language_api::Expression>, LanguageError> {
842 Err(LanguageError::EvalError("not implemented".into()))
843 }
844 fn create_predicate(
845 &self,
846 _: &str,
847 ) -> Result<Box<dyn camel_language_api::Predicate>, LanguageError> {
848 Err(LanguageError::EvalError("not implemented".into()))
849 }
850 }
851
852 let mut ctx = CamelContext::new();
853 ctx.register_language("dummy", Box::new(DummyLang)).unwrap();
854 let result = ctx.register_language("dummy", Box::new(DummyLang));
855 assert!(result.is_err(), "duplicate registration should fail");
856 let err_msg = result.unwrap_err().to_string();
857 assert!(
858 err_msg.contains("dummy"),
859 "error should mention the language name"
860 );
861 }
862
863 #[test]
864 fn test_register_language_new_key_succeeds() {
865 use camel_language_api::LanguageError;
866 struct DummyLang;
867 impl camel_language_api::Language for DummyLang {
868 fn name(&self) -> &'static str {
869 "dummy"
870 }
871 fn create_expression(
872 &self,
873 _: &str,
874 ) -> Result<Box<dyn camel_language_api::Expression>, LanguageError> {
875 Err(LanguageError::EvalError("not implemented".into()))
876 }
877 fn create_predicate(
878 &self,
879 _: &str,
880 ) -> Result<Box<dyn camel_language_api::Predicate>, LanguageError> {
881 Err(LanguageError::EvalError("not implemented".into()))
882 }
883 }
884
885 let mut ctx = CamelContext::new();
886 let result = ctx.register_language("dummy", Box::new(DummyLang));
887 assert!(result.is_ok(), "first registration should succeed");
888 }
889
890 #[tokio::test]
891 async fn test_add_route_definition_uses_runtime_registered_language() {
892 use camel_language_api::{Expression, LanguageError, Predicate};
893
894 struct DummyExpression;
895 impl Expression for DummyExpression {
896 fn evaluate(
897 &self,
898 _exchange: &camel_api::Exchange,
899 ) -> Result<camel_api::Value, LanguageError> {
900 Ok(camel_api::Value::String("ok".into()))
901 }
902 }
903
904 struct DummyPredicate;
905 impl Predicate for DummyPredicate {
906 fn matches(&self, _exchange: &camel_api::Exchange) -> Result<bool, LanguageError> {
907 Ok(true)
908 }
909 }
910
911 struct RuntimeLang;
912 impl camel_language_api::Language for RuntimeLang {
913 fn name(&self) -> &'static str {
914 "runtime"
915 }
916
917 fn create_expression(
918 &self,
919 _script: &str,
920 ) -> Result<Box<dyn Expression>, LanguageError> {
921 Ok(Box::new(DummyExpression))
922 }
923
924 fn create_predicate(&self, _script: &str) -> Result<Box<dyn Predicate>, LanguageError> {
925 Ok(Box::new(DummyPredicate))
926 }
927 }
928
929 let mut ctx = CamelContext::new();
930 ctx.register_language("runtime", Box::new(RuntimeLang))
931 .unwrap();
932
933 let definition = RouteDefinition::new(
934 "timer:tick",
935 vec![BuilderStep::DeclarativeScript {
936 expression: LanguageExpressionDef {
937 language: "runtime".into(),
938 source: "${body}".into(),
939 },
940 }],
941 )
942 .with_route_id("runtime-lang-route");
943
944 let result = ctx.add_route_definition(definition).await;
945 assert!(
946 result.is_ok(),
947 "route should resolve runtime language: {result:?}"
948 );
949 }
950
951 #[tokio::test]
952 async fn test_add_route_definition_fails_for_unregistered_runtime_language() {
953 let mut ctx = CamelContext::new();
954 let definition = RouteDefinition::new(
955 "timer:tick",
956 vec![BuilderStep::DeclarativeSetBody {
957 value: crate::route::ValueSourceDef::Expression(LanguageExpressionDef {
958 language: "missing-lang".into(),
959 source: "${body}".into(),
960 }),
961 }],
962 )
963 .with_route_id("missing-runtime-lang-route");
964
965 let result = ctx.add_route_definition(definition).await;
966 assert!(
967 result.is_err(),
968 "route should fail when language is missing"
969 );
970 let error_text = result.unwrap_err().to_string();
971 assert!(
972 error_text.contains("missing-lang"),
973 "error should mention missing language, got: {error_text}"
974 );
975 }
976
977 #[tokio::test]
978 async fn add_route_definition_does_not_require_mut() {
979 let ctx = CamelContext::new();
980 let definition = RouteDefinition::new("timer:tick", vec![]).with_route_id("immutable-ctx");
981
982 let result = ctx.add_route_definition(definition).await;
983 assert!(
984 result.is_ok(),
985 "immutable context should add route: {result:?}"
986 );
987 }
988
989 #[test]
990 fn test_health_check_empty_context() {
991 let ctx = CamelContext::new();
992 let report = ctx.health_check();
993
994 assert_eq!(report.status, HealthStatus::Healthy);
995 assert!(report.services.is_empty());
996 }
997
998 #[tokio::test]
999 async fn context_exposes_runtime_command_and_query_buses() {
1000 let ctx = CamelContext::new();
1001 let runtime = ctx.runtime();
1002
1003 let register = runtime
1004 .execute(RuntimeCommand::RegisterRoute {
1005 spec: CanonicalRouteSpec::new("runtime-r1", "timer:tick"),
1006 command_id: "cmd-1".into(),
1007 causation_id: None,
1008 })
1009 .await
1010 .unwrap();
1011 assert!(matches!(
1012 register,
1013 RuntimeCommandResult::RouteRegistered { ref route_id } if route_id == "runtime-r1"
1014 ));
1015
1016 let query = runtime
1017 .ask(RuntimeQuery::GetRouteStatus {
1018 route_id: "runtime-r1".into(),
1019 })
1020 .await
1021 .unwrap();
1022 assert!(matches!(
1023 query,
1024 RuntimeQueryResult::RouteStatus { ref status, .. } if status == "Registered"
1025 ));
1026 }
1027
1028 #[tokio::test]
1029 async fn context_with_explicit_runtime_journal_path_persists_events_to_file() {
1030 let dir = tempdir().unwrap();
1031 let journal_path = dir.path().join("context-runtime-events.jsonl");
1032 let mut ctx = CamelContext::new_with_runtime_journal_path(journal_path.clone());
1033 ctx.register_component(HoldComponent);
1034
1035 ctx.runtime()
1036 .execute(RuntimeCommand::RegisterRoute {
1037 spec: CanonicalRouteSpec::new("journal-path-r1", "hold:test"),
1038 command_id: "journal-c1".into(),
1039 causation_id: None,
1040 })
1041 .await
1042 .unwrap();
1043 ctx.runtime()
1044 .execute(RuntimeCommand::StartRoute {
1045 route_id: "journal-path-r1".into(),
1046 command_id: "journal-c2".into(),
1047 causation_id: Some("journal-c1".into()),
1048 })
1049 .await
1050 .unwrap();
1051
1052 let data = std::fs::read_to_string(&journal_path).unwrap();
1053 assert!(
1054 data.contains("journal-path-r1"),
1055 "journal file must contain route id"
1056 );
1057 assert!(
1058 data.contains("RouteRegistered"),
1059 "journal file must contain route registered event"
1060 );
1061 assert!(
1062 data.contains("RouteStarted"),
1063 "journal file must contain route started event"
1064 );
1065 }
1066
1067 #[tokio::test]
1068 async fn default_runtime_journal_isolated_per_context_without_env_override() {
1069 if let Ok(value) = std::env::var("CAMEL_RUNTIME_JOURNAL_PATH")
1070 && !value.trim().is_empty()
1071 {
1072 return;
1073 }
1074
1075 let first = CamelContext::new();
1076 first
1077 .runtime()
1078 .execute(RuntimeCommand::RegisterRoute {
1079 spec: CanonicalRouteSpec::new("default-isolation-r1", "timer:tick"),
1080 command_id: "iso-c1".into(),
1081 causation_id: None,
1082 })
1083 .await
1084 .unwrap();
1085
1086 let second = CamelContext::new();
1087 second
1088 .runtime()
1089 .execute(RuntimeCommand::RegisterRoute {
1090 spec: CanonicalRouteSpec::new("default-isolation-r1", "timer:tick"),
1091 command_id: "iso-c2".into(),
1092 causation_id: None,
1093 })
1094 .await
1095 .unwrap();
1096 }
1097
1098 #[tokio::test]
1099 async fn runtime_commands_drive_real_route_controller_lifecycle() {
1100 let mut ctx = CamelContext::new();
1101 ctx.register_component(HoldComponent);
1102 let runtime = ctx.runtime();
1103
1104 runtime
1105 .execute(RuntimeCommand::RegisterRoute {
1106 spec: CanonicalRouteSpec::new("runtime-hold", "hold:test"),
1107 command_id: "c1".into(),
1108 causation_id: None,
1109 })
1110 .await
1111 .unwrap();
1112
1113 assert_eq!(
1114 ctx.runtime_route_status("runtime-hold").await.unwrap(),
1115 Some("Registered".to_string())
1116 );
1117 assert!(matches!(
1118 ctx.runtime.repo().load("runtime-hold").await.unwrap(),
1119 Some(agg)
1120 if matches!(agg.state(), crate::lifecycle::domain::RouteRuntimeState::Registered)
1121 ));
1122
1123 runtime
1124 .execute(RuntimeCommand::StartRoute {
1125 route_id: "runtime-hold".into(),
1126 command_id: "c2".into(),
1127 causation_id: Some("c1".into()),
1128 })
1129 .await
1130 .unwrap();
1131 assert_eq!(
1132 ctx.runtime_route_status("runtime-hold").await.unwrap(),
1133 Some("Started".to_string())
1134 );
1135 assert!(matches!(
1136 ctx.runtime.repo().load("runtime-hold").await.unwrap(),
1137 Some(agg)
1138 if matches!(agg.state(), crate::lifecycle::domain::RouteRuntimeState::Started)
1139 ));
1140
1141 runtime
1142 .execute(RuntimeCommand::SuspendRoute {
1143 route_id: "runtime-hold".into(),
1144 command_id: "c3".into(),
1145 causation_id: Some("c2".into()),
1146 })
1147 .await
1148 .unwrap();
1149 assert_eq!(
1150 ctx.runtime_route_status("runtime-hold").await.unwrap(),
1151 Some("Suspended".to_string())
1152 );
1153
1154 runtime
1155 .execute(RuntimeCommand::ResumeRoute {
1156 route_id: "runtime-hold".into(),
1157 command_id: "c4".into(),
1158 causation_id: Some("c3".into()),
1159 })
1160 .await
1161 .unwrap();
1162 assert_eq!(
1163 ctx.runtime_route_status("runtime-hold").await.unwrap(),
1164 Some("Started".to_string())
1165 );
1166
1167 runtime
1168 .execute(RuntimeCommand::StopRoute {
1169 route_id: "runtime-hold".into(),
1170 command_id: "c5".into(),
1171 causation_id: Some("c4".into()),
1172 })
1173 .await
1174 .unwrap();
1175 assert_eq!(
1176 ctx.runtime_route_status("runtime-hold").await.unwrap(),
1177 Some("Stopped".to_string())
1178 );
1179
1180 runtime
1181 .execute(RuntimeCommand::ReloadRoute {
1182 route_id: "runtime-hold".into(),
1183 command_id: "c6".into(),
1184 causation_id: Some("c5".into()),
1185 })
1186 .await
1187 .unwrap();
1188 assert_eq!(
1189 ctx.runtime_route_status("runtime-hold").await.unwrap(),
1190 Some("Started".to_string())
1191 );
1192
1193 runtime
1194 .execute(RuntimeCommand::StopRoute {
1195 route_id: "runtime-hold".into(),
1196 command_id: "c7".into(),
1197 causation_id: Some("c6".into()),
1198 })
1199 .await
1200 .unwrap();
1201 assert_eq!(
1202 ctx.runtime_route_status("runtime-hold").await.unwrap(),
1203 Some("Stopped".to_string())
1204 );
1205
1206 runtime
1207 .execute(RuntimeCommand::RemoveRoute {
1208 route_id: "runtime-hold".into(),
1209 command_id: "c8".into(),
1210 causation_id: Some("c7".into()),
1211 })
1212 .await
1213 .unwrap();
1214 assert_eq!(
1215 ctx.runtime_route_status("runtime-hold").await.unwrap(),
1216 None
1217 );
1218 assert!(
1219 ctx.runtime
1220 .repo()
1221 .load("runtime-hold")
1222 .await
1223 .unwrap()
1224 .is_none()
1225 );
1226 }
1227
1228 #[tokio::test]
1229 async fn runtime_queries_read_projection_state_when_connected() {
1230 let mut ctx = CamelContext::new();
1231 ctx.register_component(HoldComponent);
1232 let runtime = ctx.runtime();
1233 runtime
1234 .execute(RuntimeCommand::RegisterRoute {
1235 spec: CanonicalRouteSpec::new("rq", "hold:test"),
1236 command_id: "c1".into(),
1237 causation_id: None,
1238 })
1239 .await
1240 .unwrap();
1241
1242 ctx.runtime_execution_handle()
1244 .force_start_route_for_test("rq")
1245 .await
1246 .unwrap();
1247
1248 let result = runtime
1249 .ask(RuntimeQuery::GetRouteStatus {
1250 route_id: "rq".into(),
1251 })
1252 .await
1253 .unwrap();
1254
1255 match result {
1256 RuntimeQueryResult::RouteStatus { status, .. } => assert_eq!(status, "Registered"),
1257 _ => panic!("unexpected query result"),
1258 }
1259 }
1260
1261 #[tokio::test]
1262 async fn add_route_definition_produces_registered_state() {
1263 let mut ctx = CamelContext::new();
1264 let definition =
1265 RouteDefinition::new("direct:test", vec![]).with_route_id("async-test-route");
1266
1267 ctx.add_route_definition(definition).await.unwrap();
1268
1269 let status = ctx
1270 .runtime()
1271 .ask(RuntimeQuery::GetRouteStatus {
1272 route_id: "async-test-route".to_string(),
1273 })
1274 .await
1275 .unwrap();
1276
1277 match status {
1278 RuntimeQueryResult::RouteStatus { status, .. } => {
1279 assert_eq!(
1280 status, "Registered",
1281 "expected Registered state after add_route_definition"
1282 );
1283 }
1284 _ => panic!("unexpected query result"),
1285 }
1286 }
1287
1288 #[tokio::test]
1289 async fn add_route_definition_injects_runtime_into_producer_context() {
1290 use std::sync::atomic::{AtomicBool, Ordering};
1291
1292 struct RuntimeAwareEndpoint {
1293 saw_runtime: Arc<AtomicBool>,
1294 }
1295
1296 impl Endpoint for RuntimeAwareEndpoint {
1297 fn uri(&self) -> &str {
1298 "runtime-aware:test"
1299 }
1300
1301 fn create_consumer(&self) -> Result<Box<dyn camel_component::Consumer>, CamelError> {
1302 Err(CamelError::RouteError("no consumer".to_string()))
1303 }
1304
1305 fn create_producer(
1306 &self,
1307 ctx: &camel_api::ProducerContext,
1308 ) -> Result<camel_api::BoxProcessor, CamelError> {
1309 self.saw_runtime
1310 .store(ctx.runtime().is_some(), Ordering::SeqCst);
1311 if ctx.runtime().is_none() {
1312 return Err(CamelError::RouteError(
1313 "runtime handle missing in ProducerContext".to_string(),
1314 ));
1315 }
1316 Ok(camel_api::BoxProcessor::new(camel_api::IdentityProcessor))
1317 }
1318 }
1319
1320 struct RuntimeAwareComponent {
1321 saw_runtime: Arc<AtomicBool>,
1322 }
1323
1324 impl Component for RuntimeAwareComponent {
1325 fn scheme(&self) -> &str {
1326 "runtime-aware"
1327 }
1328
1329 fn create_endpoint(&self, _uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
1330 Ok(Box::new(RuntimeAwareEndpoint {
1331 saw_runtime: Arc::clone(&self.saw_runtime),
1332 }))
1333 }
1334 }
1335
1336 let saw_runtime = Arc::new(AtomicBool::new(false));
1337 let mut ctx = CamelContext::new();
1338 ctx.register_component(RuntimeAwareComponent {
1339 saw_runtime: Arc::clone(&saw_runtime),
1340 });
1341
1342 let definition = RouteDefinition::new(
1343 "timer:tick",
1344 vec![BuilderStep::To("runtime-aware:test".to_string())],
1345 )
1346 .with_route_id("runtime-aware-route");
1347
1348 let result = ctx.add_route_definition(definition).await;
1349 assert!(
1350 result.is_ok(),
1351 "route should resolve producer with runtime context: {result:?}"
1352 );
1353 assert!(
1354 saw_runtime.load(Ordering::SeqCst),
1355 "component producer should observe runtime handle in ProducerContext"
1356 );
1357 }
1358
1359 #[tokio::test]
1360 async fn add_route_definition_registers_runtime_projection_and_aggregate() {
1361 let mut ctx = CamelContext::new();
1362 ctx.register_component(HoldComponent);
1363
1364 let definition = RouteDefinition::new("hold:test", vec![]).with_route_id("ctx-runtime-r1");
1365 ctx.add_route_definition(definition).await.unwrap();
1366
1367 let aggregate = ctx.runtime.repo().load("ctx-runtime-r1").await.unwrap();
1368 assert!(
1369 matches!(aggregate, Some(agg) if matches!(agg.state(), RouteRuntimeState::Registered)),
1370 "route registration should seed aggregate as Registered"
1371 );
1372
1373 let status = ctx.runtime_route_status("ctx-runtime-r1").await.unwrap();
1374 assert_eq!(status.as_deref(), Some("Registered"));
1375 }
1376
1377 #[tokio::test]
1378 async fn add_route_definition_rolls_back_controller_when_runtime_registration_fails() {
1379 let mut ctx = CamelContext::new();
1380 ctx.register_component(HoldComponent);
1381
1382 ctx.runtime
1383 .repo()
1384 .save(RouteRuntimeAggregate::new("ctx-runtime-dup"))
1385 .await
1386 .unwrap();
1387
1388 let definition = RouteDefinition::new("hold:test", vec![]).with_route_id("ctx-runtime-dup");
1389 let result = ctx.add_route_definition(definition).await;
1390 assert!(result.is_err(), "duplicate runtime registration must fail");
1391
1392 assert_eq!(
1393 ctx.runtime_execution_handle()
1394 .controller_route_count_for_test()
1395 .await,
1396 0,
1397 "controller route should be rolled back on runtime bootstrap failure"
1398 );
1399 }
1400
1401 #[tokio::test]
1402 async fn context_start_stop_drives_runtime_lifecycle_via_command_bus() {
1403 let mut ctx = CamelContext::new();
1404 ctx.register_component(HoldComponent);
1405
1406 let autostart =
1407 RouteDefinition::new("hold:test", vec![]).with_route_id("ctx-lifecycle-auto");
1408 let lazy = RouteDefinition::new("hold:test", vec![])
1409 .with_route_id("ctx-lifecycle-lazy")
1410 .with_auto_startup(false);
1411
1412 ctx.add_route_definition(autostart).await.unwrap();
1413 ctx.add_route_definition(lazy).await.unwrap();
1414
1415 assert_eq!(
1416 ctx.runtime_route_status("ctx-lifecycle-auto")
1417 .await
1418 .unwrap(),
1419 Some("Registered".to_string())
1420 );
1421 assert_eq!(
1422 ctx.runtime_route_status("ctx-lifecycle-lazy")
1423 .await
1424 .unwrap(),
1425 Some("Registered".to_string())
1426 );
1427
1428 ctx.start().await.unwrap();
1429
1430 assert_eq!(
1431 ctx.runtime_route_status("ctx-lifecycle-auto")
1432 .await
1433 .unwrap(),
1434 Some("Started".to_string())
1435 );
1436 assert_eq!(
1437 ctx.runtime_route_status("ctx-lifecycle-lazy")
1438 .await
1439 .unwrap(),
1440 Some("Registered".to_string())
1441 );
1442
1443 ctx.stop().await.unwrap();
1444
1445 assert_eq!(
1446 ctx.runtime_route_status("ctx-lifecycle-auto")
1447 .await
1448 .unwrap(),
1449 Some("Stopped".to_string())
1450 );
1451 assert_eq!(
1452 ctx.runtime_route_status("ctx-lifecycle-lazy")
1453 .await
1454 .unwrap(),
1455 Some("Registered".to_string())
1456 );
1457 }
1458}
1459
1460#[cfg(test)]
1461mod lifecycle_tests {
1462 use super::*;
1463 use async_trait::async_trait;
1464 use camel_api::Lifecycle;
1465 use std::sync::Arc;
1466 use std::sync::atomic::{AtomicUsize, Ordering};
1467
1468 struct MockService {
1469 start_count: Arc<AtomicUsize>,
1470 stop_count: Arc<AtomicUsize>,
1471 }
1472
1473 impl MockService {
1474 fn new() -> (Self, Arc<AtomicUsize>, Arc<AtomicUsize>) {
1475 let start_count = Arc::new(AtomicUsize::new(0));
1476 let stop_count = Arc::new(AtomicUsize::new(0));
1477 (
1478 Self {
1479 start_count: start_count.clone(),
1480 stop_count: stop_count.clone(),
1481 },
1482 start_count,
1483 stop_count,
1484 )
1485 }
1486 }
1487
1488 #[async_trait]
1489 impl Lifecycle for MockService {
1490 fn name(&self) -> &str {
1491 "mock"
1492 }
1493
1494 async fn start(&mut self) -> Result<(), CamelError> {
1495 self.start_count.fetch_add(1, Ordering::SeqCst);
1496 Ok(())
1497 }
1498
1499 async fn stop(&mut self) -> Result<(), CamelError> {
1500 self.stop_count.fetch_add(1, Ordering::SeqCst);
1501 Ok(())
1502 }
1503 }
1504
1505 #[tokio::test]
1506 async fn test_context_starts_lifecycle_services() {
1507 let (service, start_count, stop_count) = MockService::new();
1508
1509 let mut ctx = CamelContext::new().with_lifecycle(service);
1510
1511 assert_eq!(start_count.load(Ordering::SeqCst), 0);
1512
1513 ctx.start().await.unwrap();
1514
1515 assert_eq!(start_count.load(Ordering::SeqCst), 1);
1516 assert_eq!(stop_count.load(Ordering::SeqCst), 0);
1517
1518 ctx.stop().await.unwrap();
1519
1520 assert_eq!(stop_count.load(Ordering::SeqCst), 1);
1521 }
1522
1523 #[tokio::test]
1524 async fn test_service_start_failure_rollback() {
1525 struct FailingService {
1526 start_count: Arc<AtomicUsize>,
1527 stop_count: Arc<AtomicUsize>,
1528 should_fail: bool,
1529 }
1530
1531 #[async_trait]
1532 impl Lifecycle for FailingService {
1533 fn name(&self) -> &str {
1534 "failing"
1535 }
1536
1537 async fn start(&mut self) -> Result<(), CamelError> {
1538 self.start_count.fetch_add(1, Ordering::SeqCst);
1539 if self.should_fail {
1540 Err(CamelError::ProcessorError("intentional failure".into()))
1541 } else {
1542 Ok(())
1543 }
1544 }
1545
1546 async fn stop(&mut self) -> Result<(), CamelError> {
1547 self.stop_count.fetch_add(1, Ordering::SeqCst);
1548 Ok(())
1549 }
1550 }
1551
1552 let start1 = Arc::new(AtomicUsize::new(0));
1553 let stop1 = Arc::new(AtomicUsize::new(0));
1554 let start2 = Arc::new(AtomicUsize::new(0));
1555 let stop2 = Arc::new(AtomicUsize::new(0));
1556 let start3 = Arc::new(AtomicUsize::new(0));
1557 let stop3 = Arc::new(AtomicUsize::new(0));
1558
1559 let service1 = FailingService {
1560 start_count: start1.clone(),
1561 stop_count: stop1.clone(),
1562 should_fail: false,
1563 };
1564 let service2 = FailingService {
1565 start_count: start2.clone(),
1566 stop_count: stop2.clone(),
1567 should_fail: true, };
1569 let service3 = FailingService {
1570 start_count: start3.clone(),
1571 stop_count: stop3.clone(),
1572 should_fail: false,
1573 };
1574
1575 let mut ctx = CamelContext::new()
1576 .with_lifecycle(service1)
1577 .with_lifecycle(service2)
1578 .with_lifecycle(service3);
1579
1580 let result = ctx.start().await;
1582 assert!(result.is_err());
1583
1584 assert_eq!(start1.load(Ordering::SeqCst), 1);
1586 assert_eq!(stop1.load(Ordering::SeqCst), 1);
1587
1588 assert_eq!(start2.load(Ordering::SeqCst), 1);
1590 assert_eq!(stop2.load(Ordering::SeqCst), 0);
1591
1592 assert_eq!(start3.load(Ordering::SeqCst), 0);
1594 assert_eq!(stop3.load(Ordering::SeqCst), 0);
1595 }
1596
1597 #[tokio::test]
1598 async fn test_services_stop_in_reverse_order() {
1599 use std::sync::Mutex as StdMutex;
1600
1601 struct OrderTracker {
1602 name: String,
1603 order: Arc<StdMutex<Vec<String>>>,
1604 }
1605
1606 #[async_trait]
1607 impl Lifecycle for OrderTracker {
1608 fn name(&self) -> &str {
1609 &self.name
1610 }
1611
1612 async fn start(&mut self) -> Result<(), CamelError> {
1613 Ok(())
1614 }
1615
1616 async fn stop(&mut self) -> Result<(), CamelError> {
1617 self.order.lock().unwrap().push(self.name.clone());
1618 Ok(())
1619 }
1620 }
1621
1622 let order = Arc::new(StdMutex::new(Vec::<String>::new()));
1623
1624 let s1 = OrderTracker {
1625 name: "first".into(),
1626 order: Arc::clone(&order),
1627 };
1628 let s2 = OrderTracker {
1629 name: "second".into(),
1630 order: Arc::clone(&order),
1631 };
1632 let s3 = OrderTracker {
1633 name: "third".into(),
1634 order: Arc::clone(&order),
1635 };
1636
1637 let mut ctx = CamelContext::new()
1638 .with_lifecycle(s1)
1639 .with_lifecycle(s2)
1640 .with_lifecycle(s3);
1641
1642 ctx.start().await.unwrap();
1643 ctx.stop().await.unwrap();
1644
1645 let stopped = order.lock().unwrap();
1646 assert_eq!(
1647 *stopped,
1648 vec!["third", "second", "first"],
1649 "services must stop in reverse insertion order"
1650 );
1651 }
1652}
1653
1654#[cfg(test)]
1655mod config_registry_tests {
1656 use super::*;
1657
1658 #[derive(Debug, Clone, PartialEq)]
1659 struct MyConfig {
1660 value: u32,
1661 }
1662
1663 #[test]
1664 fn test_set_and_get_component_config() {
1665 let mut ctx = CamelContext::new();
1666 ctx.set_component_config(MyConfig { value: 42 });
1667 let got = ctx.get_component_config::<MyConfig>();
1668 assert_eq!(got, Some(&MyConfig { value: 42 }));
1669 }
1670
1671 #[test]
1672 fn test_get_missing_config_returns_none() {
1673 let ctx = CamelContext::new();
1674 assert!(ctx.get_component_config::<MyConfig>().is_none());
1675 }
1676
1677 #[test]
1678 fn test_set_overwrites_previous_config() {
1679 let mut ctx = CamelContext::new();
1680 ctx.set_component_config(MyConfig { value: 1 });
1681 ctx.set_component_config(MyConfig { value: 2 });
1682 assert_eq!(ctx.get_component_config::<MyConfig>().unwrap().value, 2);
1683 }
1684}