Skip to main content

camel_core/
context.rs

1use std::any::{Any, TypeId};
2use std::collections::HashMap;
3use std::path::PathBuf;
4use std::sync::Arc;
5use std::sync::atomic::{AtomicU64, Ordering};
6use tokio::sync::Mutex;
7use tokio_util::sync::CancellationToken;
8use tracing::{info, warn};
9
10use camel_api::error_handler::ErrorHandlerConfig;
11use camel_api::{
12    CamelError, HealthReport, HealthStatus, Lifecycle, MetricsCollector, NoOpMetrics,
13    RouteController, RuntimeCommandBus, RuntimeQueryBus, ServiceHealth, ServiceStatus,
14    SupervisionConfig,
15};
16use camel_component::Component;
17use camel_language_api::Language;
18use camel_language_api::LanguageError;
19
20use crate::lifecycle::adapters::RuntimeExecutionAdapter;
21use crate::lifecycle::adapters::route_controller::{
22    DefaultRouteController, RouteControllerInternal, SharedLanguageRegistry,
23};
24use crate::lifecycle::application::route_definition::RouteDefinition;
25use crate::lifecycle::application::runtime_bus::RuntimeBus;
26use crate::lifecycle::application::supervision_service::SupervisingRouteController;
27use crate::shared::components::domain::Registry;
28use crate::shared::observability::domain::TracerConfig;
29
/// Counter feeding the trailing sequence number of command ids built by
/// `CamelContext::next_context_command_id`; shared by every context in the process.
static CONTEXT_COMMAND_SEQ: AtomicU64 = AtomicU64::new(0);
31
/// The CamelContext is the runtime engine that manages components, routes, and their lifecycle.
///
/// # Lifecycle
///
/// A `CamelContext` is single-use: call [`start()`](Self::start) once to launch routes,
/// then [`stop()`](Self::stop) or [`abort()`](Self::abort) to shut down. Restarting a
/// stopped context is not supported — create a new instance instead.
pub struct CamelContext {
    /// Component registry; the same `Arc` is handed to the route controller at construction.
    registry: Arc<std::sync::Mutex<Registry>>,
    /// Route controller behind an async mutex. The `Arc` is also held by the runtime's
    /// execution adapter and by every `RuntimeExecutionHandle` created from this context.
    route_controller: Arc<Mutex<dyn RouteControllerInternal>>,
    /// Runtime bus used to register routes and to execute commands / answer queries.
    runtime: Arc<RuntimeBus>,
    /// Token cancelled by `stop_timeout()` and `abort()`.
    cancel_token: CancellationToken,
    /// Metrics collector returned by `metrics()` and injected into tracer configs.
    metrics: Arc<dyn MetricsCollector>,
    /// Language registry keyed by name; shared with the route controller.
    languages: SharedLanguageRegistry,
    /// Timeout `stop()` passes to `stop_timeout()`; constructors initialise it to 30 seconds.
    shutdown_timeout: std::time::Duration,
    /// Lifecycle services: started in insertion order by `start()`, stopped in reverse by `stop()`.
    services: Vec<Box<dyn Lifecycle>>,
    /// Type-indexed component configuration values (at most one value per concrete type).
    component_configs: HashMap<TypeId, Box<dyn Any + Send + Sync>>,
}
50
/// Opaque handle for runtime side-effect execution operations.
///
/// This intentionally does not expose direct lifecycle mutation APIs to callers.
/// Cloning the handle only clones the two `Arc`s; all clones address the same
/// controller and runtime bus.
#[derive(Clone)]
pub struct RuntimeExecutionHandle {
    /// Route controller shared with the owning `CamelContext`.
    controller: Arc<Mutex<dyn RouteControllerInternal>>,
    /// Runtime bus shared with the owning `CamelContext`.
    runtime: Arc<RuntimeBus>,
}
59
60impl RuntimeExecutionHandle {
61    pub(crate) async fn add_route_definition(
62        &self,
63        definition: RouteDefinition,
64    ) -> Result<(), CamelError> {
65        use crate::lifecycle::ports::RouteRegistrationPort;
66        self.runtime.register_route(definition).await
67    }
68
69    pub(crate) async fn compile_route_definition(
70        &self,
71        definition: RouteDefinition,
72    ) -> Result<camel_api::BoxProcessor, CamelError> {
73        let controller = self.controller.lock().await;
74        controller.compile_route_definition(definition)
75    }
76
77    pub(crate) async fn swap_route_pipeline(
78        &self,
79        route_id: &str,
80        pipeline: camel_api::BoxProcessor,
81    ) -> Result<(), CamelError> {
82        let controller = self.controller.lock().await;
83        controller.swap_pipeline(route_id, pipeline)
84    }
85
86    pub(crate) async fn execute_runtime_command(
87        &self,
88        cmd: camel_api::RuntimeCommand,
89    ) -> Result<camel_api::RuntimeCommandResult, CamelError> {
90        self.runtime.execute(cmd).await
91    }
92
93    pub(crate) async fn runtime_route_status(
94        &self,
95        route_id: &str,
96    ) -> Result<Option<String>, CamelError> {
97        match self
98            .runtime
99            .ask(camel_api::RuntimeQuery::GetRouteStatus {
100                route_id: route_id.to_string(),
101            })
102            .await
103        {
104            Ok(camel_api::RuntimeQueryResult::RouteStatus { status, .. }) => Ok(Some(status)),
105            Ok(_) => Err(CamelError::RouteError(
106                "unexpected runtime query response for route status".to_string(),
107            )),
108            Err(CamelError::RouteError(msg)) if msg.contains("not found") => Ok(None),
109            Err(err) => Err(err),
110        }
111    }
112
113    pub(crate) async fn runtime_route_ids(&self) -> Result<Vec<String>, CamelError> {
114        match self.runtime.ask(camel_api::RuntimeQuery::ListRoutes).await {
115            Ok(camel_api::RuntimeQueryResult::Routes { route_ids }) => Ok(route_ids),
116            Ok(_) => Err(CamelError::RouteError(
117                "unexpected runtime query response for route listing".to_string(),
118            )),
119            Err(err) => Err(err),
120        }
121    }
122
123    #[cfg(test)]
124    pub(crate) async fn force_start_route_for_test(
125        &self,
126        route_id: &str,
127    ) -> Result<(), CamelError> {
128        let mut controller = self.controller.lock().await;
129        controller.start_route(route_id).await
130    }
131
132    #[cfg(test)]
133    pub(crate) async fn controller_route_count_for_test(&self) -> usize {
134        let controller = self.controller.lock().await;
135        controller.route_count()
136    }
137}
138
139impl CamelContext {
140    fn built_in_languages() -> SharedLanguageRegistry {
141        let mut languages: HashMap<String, Arc<dyn Language>> = HashMap::new();
142        languages.insert(
143            "simple".to_string(),
144            Arc::new(camel_language_simple::SimpleLanguage),
145        );
146        Arc::new(std::sync::Mutex::new(languages))
147    }
148
149    fn build_runtime(
150        controller: Arc<Mutex<dyn RouteControllerInternal>>,
151        store: crate::lifecycle::adapters::InMemoryRuntimeStore,
152    ) -> Arc<RuntimeBus> {
153        let execution = Arc::new(RuntimeExecutionAdapter::new(Arc::clone(&controller)));
154        Arc::new(
155            RuntimeBus::new(
156                Arc::new(store.clone()),
157                Arc::new(store.clone()),
158                Arc::new(store.clone()),
159                Arc::new(store.clone()),
160            )
161            .with_uow(Arc::new(store))
162            .with_execution(execution),
163        )
164    }
165
166    fn runtime_store_with_journal_path(
167        path: PathBuf,
168    ) -> crate::lifecycle::adapters::InMemoryRuntimeStore {
169        let store = crate::lifecycle::adapters::InMemoryRuntimeStore::default();
170        match crate::lifecycle::adapters::FileRuntimeEventJournal::new(path.clone()) {
171            Ok(journal) => store.with_journal(Arc::new(journal)),
172            Err(err) => {
173                warn!(
174                    journal_path = %path.display(),
175                    error = %err,
176                    "Failed to initialize runtime event journal; falling back to in-memory-only runtime store"
177                );
178                store
179            }
180        }
181    }
182
183    /// Create a new, empty CamelContext.
184    ///
185    /// Runtime state is in-memory/ephemeral by default.
186    pub fn new() -> Self {
187        Self::with_metrics(Arc::new(NoOpMetrics))
188    }
189
190    /// Create a new CamelContext with an explicit runtime journal file path.
191    ///
192    /// This opt-in path enables local runtime replay/durability semantics.
193    pub fn new_with_runtime_journal_path(path: impl Into<PathBuf>) -> Self {
194        Self::with_metrics_and_runtime_journal_path(Arc::new(NoOpMetrics), path)
195    }
196
197    /// Create a new CamelContext with a custom metrics collector.
198    pub fn with_metrics(metrics: Arc<dyn MetricsCollector>) -> Self {
199        let registry = Arc::new(std::sync::Mutex::new(Registry::new()));
200        let languages = Self::built_in_languages();
201        let controller: Arc<Mutex<dyn RouteControllerInternal>> = Arc::new(Mutex::new(
202            DefaultRouteController::with_languages(Arc::clone(&registry), Arc::clone(&languages)),
203        ));
204        let store = crate::lifecycle::adapters::InMemoryRuntimeStore::default();
205        let runtime = Self::build_runtime(Arc::clone(&controller), store);
206
207        // Set self-ref so DefaultRouteController can create ProducerContext
208        // Use try_lock since we just created it and nobody else has access yet
209        let mut controller_guard = controller
210            .try_lock()
211            .expect("BUG: CamelContext lock contention — try_lock should always succeed here since &mut self prevents concurrent access");
212        controller_guard.set_self_ref(Arc::clone(&controller) as Arc<Mutex<dyn RouteController>>);
213        let runtime_handle: Arc<dyn camel_api::RuntimeHandle> = runtime.clone();
214        controller_guard.set_runtime_handle(runtime_handle);
215        drop(controller_guard);
216
217        Self {
218            registry,
219            route_controller: controller,
220            runtime,
221            cancel_token: CancellationToken::new(),
222            metrics,
223            languages,
224            shutdown_timeout: std::time::Duration::from_secs(30),
225            services: Vec::new(),
226            component_configs: HashMap::new(),
227        }
228    }
229
230    /// Create a new CamelContext with custom metrics and explicit runtime journal file path.
231    pub fn with_metrics_and_runtime_journal_path(
232        metrics: Arc<dyn MetricsCollector>,
233        journal_path: impl Into<PathBuf>,
234    ) -> Self {
235        let registry = Arc::new(std::sync::Mutex::new(Registry::new()));
236        let languages = Self::built_in_languages();
237        let controller: Arc<Mutex<dyn RouteControllerInternal>> = Arc::new(Mutex::new(
238            DefaultRouteController::with_languages(Arc::clone(&registry), Arc::clone(&languages)),
239        ));
240        let store = Self::runtime_store_with_journal_path(journal_path.into());
241        let runtime = Self::build_runtime(Arc::clone(&controller), store);
242
243        // Set self-ref so DefaultRouteController can create ProducerContext
244        // Use try_lock since we just created it and nobody else has access yet
245        let mut controller_guard = controller
246            .try_lock()
247            .expect("BUG: CamelContext lock contention — try_lock should always succeed here since &mut self prevents concurrent access");
248        controller_guard.set_self_ref(Arc::clone(&controller) as Arc<Mutex<dyn RouteController>>);
249        let runtime_handle: Arc<dyn camel_api::RuntimeHandle> = runtime.clone();
250        controller_guard.set_runtime_handle(runtime_handle);
251        drop(controller_guard);
252
253        Self {
254            registry,
255            route_controller: controller,
256            runtime,
257            cancel_token: CancellationToken::new(),
258            metrics,
259            languages,
260            shutdown_timeout: std::time::Duration::from_secs(30),
261            services: Vec::new(),
262            component_configs: HashMap::new(),
263        }
264    }
265
266    /// Create a new CamelContext with route supervision enabled.
267    ///
268    /// The supervision config controls automatic restart behavior for crashed routes.
269    pub fn with_supervision(config: SupervisionConfig) -> Self {
270        Self::with_supervision_and_metrics(config, Arc::new(NoOpMetrics))
271    }
272
273    /// Create a new CamelContext with route supervision and custom metrics.
274    ///
275    /// The supervision config controls automatic restart behavior for crashed routes.
276    pub fn with_supervision_and_metrics(
277        config: SupervisionConfig,
278        metrics: Arc<dyn MetricsCollector>,
279    ) -> Self {
280        let registry = Arc::new(std::sync::Mutex::new(Registry::new()));
281        let languages = Self::built_in_languages();
282        let controller: Arc<Mutex<dyn RouteControllerInternal>> = Arc::new(Mutex::new(
283            SupervisingRouteController::with_languages(
284                Arc::clone(&registry),
285                config,
286                Arc::clone(&languages),
287            )
288            .with_metrics(Arc::clone(&metrics)),
289        ));
290        let store = crate::lifecycle::adapters::InMemoryRuntimeStore::default();
291        let runtime = Self::build_runtime(Arc::clone(&controller), store);
292
293        // Set self-ref so SupervisingRouteController can create ProducerContext
294        // Use try_lock since we just created it and nobody else has access yet
295        let mut controller_guard = controller
296            .try_lock()
297            .expect("BUG: CamelContext lock contention — try_lock should always succeed here since &mut self prevents concurrent access");
298        controller_guard.set_self_ref(Arc::clone(&controller) as Arc<Mutex<dyn RouteController>>);
299        let runtime_handle: Arc<dyn camel_api::RuntimeHandle> = runtime.clone();
300        controller_guard.set_runtime_handle(runtime_handle);
301        drop(controller_guard);
302
303        Self {
304            registry,
305            route_controller: controller,
306            runtime,
307            cancel_token: CancellationToken::new(),
308            metrics,
309            languages,
310            shutdown_timeout: std::time::Duration::from_secs(30),
311            services: Vec::new(),
312            component_configs: HashMap::new(),
313        }
314    }
315
316    /// Create a new CamelContext with supervision, custom metrics, and explicit journal path.
317    pub fn with_supervision_and_metrics_and_runtime_journal_path(
318        config: SupervisionConfig,
319        metrics: Arc<dyn MetricsCollector>,
320        journal_path: impl Into<PathBuf>,
321    ) -> Self {
322        let registry = Arc::new(std::sync::Mutex::new(Registry::new()));
323        let languages = Self::built_in_languages();
324        let controller: Arc<Mutex<dyn RouteControllerInternal>> = Arc::new(Mutex::new(
325            SupervisingRouteController::with_languages(
326                Arc::clone(&registry),
327                config,
328                Arc::clone(&languages),
329            )
330            .with_metrics(Arc::clone(&metrics)),
331        ));
332        let store = Self::runtime_store_with_journal_path(journal_path.into());
333        let runtime = Self::build_runtime(Arc::clone(&controller), store);
334
335        // Set self-ref so SupervisingRouteController can create ProducerContext
336        // Use try_lock since we just created it and nobody else has access yet
337        let mut controller_guard = controller
338            .try_lock()
339            .expect("BUG: CamelContext lock contention — try_lock should always succeed here since &mut self prevents concurrent access");
340        controller_guard.set_self_ref(Arc::clone(&controller) as Arc<Mutex<dyn RouteController>>);
341        let runtime_handle: Arc<dyn camel_api::RuntimeHandle> = runtime.clone();
342        controller_guard.set_runtime_handle(runtime_handle);
343        drop(controller_guard);
344
345        Self {
346            registry,
347            route_controller: controller,
348            runtime,
349            cancel_token: CancellationToken::new(),
350            metrics,
351            languages,
352            shutdown_timeout: std::time::Duration::from_secs(30),
353            services: Vec::new(),
354            component_configs: HashMap::new(),
355        }
356    }
357
358    /// Set a global error handler applied to all routes without a per-route handler.
359    pub fn set_error_handler(&mut self, config: ErrorHandlerConfig) {
360        self.route_controller
361            .try_lock()
362            .expect("BUG: CamelContext lock contention — try_lock should always succeed here since &mut self prevents concurrent access")
363            .set_error_handler(config);
364    }
365
366    /// Enable or disable tracing globally.
367    pub fn set_tracing(&mut self, enabled: bool) {
368        self.route_controller
369            .try_lock()
370            .expect("BUG: CamelContext lock contention — try_lock should always succeed here since &mut self prevents concurrent access")
371            .set_tracer_config(&TracerConfig {
372                enabled,
373                ..Default::default()
374            });
375    }
376
377    /// Configure tracing with full config.
378    pub fn set_tracer_config(&mut self, config: TracerConfig) {
379        // Inject metrics collector if not already set
380        let config = if config.metrics_collector.is_none() {
381            TracerConfig {
382                metrics_collector: Some(Arc::clone(&self.metrics)),
383                ..config
384            }
385        } else {
386            config
387        };
388
389        self.route_controller
390            .try_lock()
391            .expect("BUG: CamelContext lock contention — try_lock should always succeed here since &mut self prevents concurrent access")
392            .set_tracer_config(&config);
393    }
394
395    /// Builder-style: enable tracing with default config.
396    pub fn with_tracing(mut self) -> Self {
397        self.set_tracing(true);
398        self
399    }
400
401    /// Builder-style: configure tracing with custom config.
402    /// Note: tracing subscriber initialization (stdout/file output) is handled
403    /// separately via init_tracing_subscriber (called in camel-config bridge).
404    pub fn with_tracer_config(mut self, config: TracerConfig) -> Self {
405        self.set_tracer_config(config);
406        self
407    }
408
409    /// Register a lifecycle service (Apache Camel: addService pattern)
410    pub fn with_lifecycle<L: Lifecycle + 'static>(mut self, service: L) -> Self {
411        // Auto-register MetricsCollector if available
412        if let Some(collector) = service.as_metrics_collector() {
413            self.metrics = collector;
414        }
415
416        self.services.push(Box::new(service));
417        self
418    }
419
420    /// Register a component with this context.
421    pub fn register_component<C: Component + 'static>(&mut self, component: C) {
422        info!(scheme = component.scheme(), "Registering component");
423        self.registry
424            .lock()
425            .expect("mutex poisoned: another thread panicked while holding this lock")
426            .register(component);
427    }
428
429    /// Register a language with this context, keyed by name.
430    ///
431    /// Returns `Err(LanguageError::AlreadyRegistered)` if a language with the
432    /// same name is already registered. Use [`resolve_language`](Self::resolve_language)
433    /// to check before registering, or choose a distinct name.
434    pub fn register_language(
435        &mut self,
436        name: impl Into<String>,
437        lang: Box<dyn Language>,
438    ) -> Result<(), LanguageError> {
439        let name = name.into();
440        let mut languages = self
441            .languages
442            .lock()
443            .expect("mutex poisoned: another thread panicked while holding this lock");
444        if languages.contains_key(&name) {
445            return Err(LanguageError::AlreadyRegistered(name));
446        }
447        languages.insert(name, Arc::from(lang));
448        Ok(())
449    }
450
451    /// Resolve a language by name. Returns `None` if not registered.
452    pub fn resolve_language(&self, name: &str) -> Option<Arc<dyn Language>> {
453        let languages = self
454            .languages
455            .lock()
456            .expect("mutex poisoned: another thread panicked while holding this lock");
457        languages.get(name).cloned()
458    }
459
460    /// Add a route definition to this context.
461    ///
462    /// The route must have an ID. Steps are resolved immediately using registered components.
463    pub async fn add_route_definition(
464        &self,
465        definition: RouteDefinition,
466    ) -> Result<(), CamelError> {
467        use crate::lifecycle::ports::RouteRegistrationPort;
468        info!(
469            from = definition.from_uri(),
470            route_id = %definition.route_id(),
471            "Adding route definition"
472        );
473        self.runtime.register_route(definition).await
474    }
475
476    fn next_context_command_id(op: &str, route_id: &str) -> String {
477        let seq = CONTEXT_COMMAND_SEQ.fetch_add(1, Ordering::Relaxed);
478        format!("context:{op}:{route_id}:{seq}")
479    }
480
481    /// Access the component registry.
482    pub fn registry(&self) -> std::sync::MutexGuard<'_, Registry> {
483        self.registry
484            .lock()
485            .expect("mutex poisoned: another thread panicked while holding this lock")
486    }
487
488    /// Get runtime execution handle for file-watcher integrations.
489    pub fn runtime_execution_handle(&self) -> RuntimeExecutionHandle {
490        RuntimeExecutionHandle {
491            controller: Arc::clone(&self.route_controller),
492            runtime: Arc::clone(&self.runtime),
493        }
494    }
495
496    /// Get the metrics collector.
497    pub fn metrics(&self) -> Arc<dyn MetricsCollector> {
498        Arc::clone(&self.metrics)
499    }
500
501    /// Get runtime command/query bus handle.
502    pub fn runtime(&self) -> Arc<dyn camel_api::RuntimeHandle> {
503        self.runtime.clone()
504    }
505
506    /// Build a producer context wired to this runtime.
507    pub fn producer_context(&self) -> camel_api::ProducerContext {
508        camel_api::ProducerContext::new().with_runtime(self.runtime())
509    }
510
511    /// Query route status via runtime read-model.
512    pub async fn runtime_route_status(&self, route_id: &str) -> Result<Option<String>, CamelError> {
513        match self
514            .runtime()
515            .ask(camel_api::RuntimeQuery::GetRouteStatus {
516                route_id: route_id.to_string(),
517            })
518            .await
519        {
520            Ok(camel_api::RuntimeQueryResult::RouteStatus { status, .. }) => Ok(Some(status)),
521            Ok(_) => Err(CamelError::RouteError(
522                "unexpected runtime query response for route status".to_string(),
523            )),
524            Err(CamelError::RouteError(msg)) if msg.contains("not found") => Ok(None),
525            Err(err) => Err(err),
526        }
527    }
528
529    /// Start all routes. Each route's consumer will begin producing exchanges.
530    ///
531    /// Only routes with `auto_startup == true` will be started, in order of their
532    /// `startup_order` (lower values start first).
533    pub async fn start(&mut self) -> Result<(), CamelError> {
534        info!("Starting CamelContext");
535
536        // Start lifecycle services first
537        for (i, service) in self.services.iter_mut().enumerate() {
538            info!("Starting service: {}", service.name());
539            if let Err(e) = service.start().await {
540                // Rollback: stop already started services in reverse order
541                warn!(
542                    "Service {} failed to start, rolling back {} services",
543                    service.name(),
544                    i
545                );
546                for j in (0..i).rev() {
547                    if let Err(rollback_err) = self.services[j].stop().await {
548                        warn!(
549                            "Failed to stop service {} during rollback: {}",
550                            self.services[j].name(),
551                            rollback_err
552                        );
553                    }
554                }
555                return Err(e);
556            }
557        }
558
559        // Then start routes via runtime command bus (aggregate-first),
560        // preserving route controller startup ordering metadata.
561        let route_ids = {
562            let controller = self.route_controller.lock().await;
563            controller.auto_startup_route_ids()
564        };
565        for route_id in route_ids {
566            self.runtime
567                .execute(camel_api::RuntimeCommand::StartRoute {
568                    route_id: route_id.clone(),
569                    command_id: Self::next_context_command_id("start", &route_id),
570                    causation_id: None,
571                })
572                .await?;
573        }
574
575        info!("CamelContext started");
576        Ok(())
577    }
578
579    /// Graceful shutdown with default 30-second timeout.
580    pub async fn stop(&mut self) -> Result<(), CamelError> {
581        self.stop_timeout(self.shutdown_timeout).await
582    }
583
584    /// Graceful shutdown with custom timeout.
585    ///
586    /// Note: The timeout parameter is currently not used directly; the RouteController
587    /// manages its own shutdown timeout. This may change in a future version.
588    pub async fn stop_timeout(&mut self, _timeout: std::time::Duration) -> Result<(), CamelError> {
589        info!("Stopping CamelContext");
590
591        // Signal cancellation (for any legacy code that might use it)
592        self.cancel_token.cancel();
593
594        // Stop all routes via runtime command bus (aggregate-first),
595        // preserving route controller shutdown ordering metadata.
596        let route_ids = {
597            let controller = self.route_controller.lock().await;
598            controller.shutdown_route_ids()
599        };
600        for route_id in route_ids {
601            if let Err(err) = self
602                .runtime
603                .execute(camel_api::RuntimeCommand::StopRoute {
604                    route_id: route_id.clone(),
605                    command_id: Self::next_context_command_id("stop", &route_id),
606                    causation_id: None,
607                })
608                .await
609            {
610                warn!(route_id = %route_id, error = %err, "Runtime stop command failed during context shutdown");
611            }
612        }
613
614        // Then stop lifecycle services in reverse insertion order (LIFO)
615        // Continue stopping all services even if some fail
616        let mut first_error = None;
617        for service in self.services.iter_mut().rev() {
618            info!("Stopping service: {}", service.name());
619            if let Err(e) = service.stop().await {
620                warn!("Service {} failed to stop: {}", service.name(), e);
621                if first_error.is_none() {
622                    first_error = Some(e);
623                }
624            }
625        }
626
627        info!("CamelContext stopped");
628
629        if let Some(e) = first_error {
630            Err(e)
631        } else {
632            Ok(())
633        }
634    }
635
636    /// Get the graceful shutdown timeout used by [`stop()`](Self::stop).
637    pub fn shutdown_timeout(&self) -> std::time::Duration {
638        self.shutdown_timeout
639    }
640
641    /// Set the graceful shutdown timeout used by [`stop()`](Self::stop).
642    pub fn set_shutdown_timeout(&mut self, timeout: std::time::Duration) {
643        self.shutdown_timeout = timeout;
644    }
645
646    /// Immediate abort — kills all tasks without draining.
647    pub async fn abort(&mut self) {
648        self.cancel_token.cancel();
649        let route_ids = {
650            let controller = self.route_controller.lock().await;
651            controller.shutdown_route_ids()
652        };
653        for route_id in route_ids {
654            let _ = self
655                .runtime
656                .execute(camel_api::RuntimeCommand::StopRoute {
657                    route_id: route_id.clone(),
658                    command_id: Self::next_context_command_id("abort-stop", &route_id),
659                    causation_id: None,
660                })
661                .await;
662        }
663    }
664
665    /// Check health status of all registered services.
666    pub fn health_check(&self) -> HealthReport {
667        let services: Vec<ServiceHealth> = self
668            .services
669            .iter()
670            .map(|s| ServiceHealth {
671                name: s.name().to_string(),
672                status: s.status(),
673            })
674            .collect();
675
676        let status = if services.iter().all(|s| s.status == ServiceStatus::Started) {
677            HealthStatus::Healthy
678        } else {
679            HealthStatus::Unhealthy
680        };
681
682        HealthReport {
683            status,
684            services,
685            ..Default::default()
686        }
687    }
688
689    /// Store a component config. Overwrites any previously stored config of the same type.
690    pub fn set_component_config<T: 'static + Send + Sync>(&mut self, config: T) {
691        self.component_configs
692            .insert(TypeId::of::<T>(), Box::new(config));
693    }
694
695    /// Retrieve a stored component config by type. Returns None if not stored.
696    pub fn get_component_config<T: 'static + Send + Sync>(&self) -> Option<&T> {
697        self.component_configs
698            .get(&TypeId::of::<T>())
699            .and_then(|b| b.downcast_ref::<T>())
700    }
701}
702
703impl Default for CamelContext {
704    fn default() -> Self {
705        Self::new()
706    }
707}
708
709#[cfg(test)]
710mod tests {
711    use super::*;
712    use crate::lifecycle::application::route_definition::{
713        BuilderStep, LanguageExpressionDef, RouteDefinition,
714    };
715    use crate::lifecycle::domain::{RouteRuntimeAggregate, RouteRuntimeState};
716    use async_trait::async_trait;
717    use camel_api::CamelError;
718    use camel_api::{
719        CanonicalRouteSpec, RuntimeCommand, RuntimeCommandResult, RuntimeQuery, RuntimeQueryResult,
720    };
721    use camel_component::{Component, ConcurrencyModel, Consumer, ConsumerContext, Endpoint};
722    use tempfile::tempdir;
723
724    /// Mock component for testing
725    struct MockComponent;
726
727    impl Component for MockComponent {
728        fn scheme(&self) -> &str {
729            "mock"
730        }
731
732        fn create_endpoint(&self, _uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
733            Err(CamelError::ComponentNotFound("mock".to_string()))
734        }
735    }
736
737    struct HoldConsumer;
738
739    #[async_trait]
740    impl Consumer for HoldConsumer {
741        async fn start(&mut self, ctx: ConsumerContext) -> Result<(), CamelError> {
742            ctx.cancelled().await;
743            Ok(())
744        }
745
746        async fn stop(&mut self) -> Result<(), CamelError> {
747            Ok(())
748        }
749
750        fn concurrency_model(&self) -> ConcurrencyModel {
751            ConcurrencyModel::Sequential
752        }
753    }
754
755    struct HoldEndpoint;
756
757    impl Endpoint for HoldEndpoint {
758        fn uri(&self) -> &str {
759            "hold:test"
760        }
761
762        fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
763            Ok(Box::new(HoldConsumer))
764        }
765
766        fn create_producer(
767            &self,
768            _ctx: &camel_api::ProducerContext,
769        ) -> Result<camel_api::BoxProcessor, CamelError> {
770            Err(CamelError::RouteError("no producer".to_string()))
771        }
772    }
773
774    struct HoldComponent;
775
776    impl Component for HoldComponent {
777        fn scheme(&self) -> &str {
778            "hold"
779        }
780
781        fn create_endpoint(&self, _uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
782            Ok(Box::new(HoldEndpoint))
783        }
784    }
785
786    #[test]
787    fn test_context_handles_mutex_poisoning_gracefully() {
788        let mut ctx = CamelContext::new();
789
790        // Register a component successfully
791        ctx.register_component(MockComponent);
792
793        // Access registry should work even after potential panic in another thread
794        let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
795            let _guard = ctx.registry();
796        }));
797
798        assert!(
799            result.is_ok(),
800            "Registry access should handle mutex poisoning"
801        );
802    }
803
804    #[test]
805    fn test_context_resolves_simple_language() {
806        let ctx = CamelContext::new();
807        let lang = ctx
808            .resolve_language("simple")
809            .expect("simple language not found");
810        assert_eq!(lang.name(), "simple");
811    }
812
813    #[test]
814    fn test_simple_language_via_context() {
815        let ctx = CamelContext::new();
816        let lang = ctx.resolve_language("simple").unwrap();
817        let pred = lang.create_predicate("${header.x} == 'hello'").unwrap();
818        let mut msg = camel_api::message::Message::default();
819        msg.set_header("x", camel_api::Value::String("hello".into()));
820        let ex = camel_api::exchange::Exchange::new(msg);
821        assert!(pred.matches(&ex).unwrap());
822    }
823
824    #[test]
825    fn test_resolve_unknown_language_returns_none() {
826        let ctx = CamelContext::new();
827        assert!(ctx.resolve_language("nonexistent").is_none());
828    }
829
830    #[test]
831    fn test_register_language_duplicate_returns_error() {
832        use camel_language_api::LanguageError;
833        struct DummyLang;
834        impl camel_language_api::Language for DummyLang {
835            fn name(&self) -> &'static str {
836                "dummy"
837            }
838            fn create_expression(
839                &self,
840                _: &str,
841            ) -> Result<Box<dyn camel_language_api::Expression>, LanguageError> {
842                Err(LanguageError::EvalError("not implemented".into()))
843            }
844            fn create_predicate(
845                &self,
846                _: &str,
847            ) -> Result<Box<dyn camel_language_api::Predicate>, LanguageError> {
848                Err(LanguageError::EvalError("not implemented".into()))
849            }
850        }
851
852        let mut ctx = CamelContext::new();
853        ctx.register_language("dummy", Box::new(DummyLang)).unwrap();
854        let result = ctx.register_language("dummy", Box::new(DummyLang));
855        assert!(result.is_err(), "duplicate registration should fail");
856        let err_msg = result.unwrap_err().to_string();
857        assert!(
858            err_msg.contains("dummy"),
859            "error should mention the language name"
860        );
861    }
862
863    #[test]
864    fn test_register_language_new_key_succeeds() {
865        use camel_language_api::LanguageError;
866        struct DummyLang;
867        impl camel_language_api::Language for DummyLang {
868            fn name(&self) -> &'static str {
869                "dummy"
870            }
871            fn create_expression(
872                &self,
873                _: &str,
874            ) -> Result<Box<dyn camel_language_api::Expression>, LanguageError> {
875                Err(LanguageError::EvalError("not implemented".into()))
876            }
877            fn create_predicate(
878                &self,
879                _: &str,
880            ) -> Result<Box<dyn camel_language_api::Predicate>, LanguageError> {
881                Err(LanguageError::EvalError("not implemented".into()))
882            }
883        }
884
885        let mut ctx = CamelContext::new();
886        let result = ctx.register_language("dummy", Box::new(DummyLang));
887        assert!(result.is_ok(), "first registration should succeed");
888    }
889
890    #[tokio::test]
891    async fn test_add_route_definition_uses_runtime_registered_language() {
892        use camel_language_api::{Expression, LanguageError, Predicate};
893
894        struct DummyExpression;
895        impl Expression for DummyExpression {
896            fn evaluate(
897                &self,
898                _exchange: &camel_api::Exchange,
899            ) -> Result<camel_api::Value, LanguageError> {
900                Ok(camel_api::Value::String("ok".into()))
901            }
902        }
903
904        struct DummyPredicate;
905        impl Predicate for DummyPredicate {
906            fn matches(&self, _exchange: &camel_api::Exchange) -> Result<bool, LanguageError> {
907                Ok(true)
908            }
909        }
910
911        struct RuntimeLang;
912        impl camel_language_api::Language for RuntimeLang {
913            fn name(&self) -> &'static str {
914                "runtime"
915            }
916
917            fn create_expression(
918                &self,
919                _script: &str,
920            ) -> Result<Box<dyn Expression>, LanguageError> {
921                Ok(Box::new(DummyExpression))
922            }
923
924            fn create_predicate(&self, _script: &str) -> Result<Box<dyn Predicate>, LanguageError> {
925                Ok(Box::new(DummyPredicate))
926            }
927        }
928
929        let mut ctx = CamelContext::new();
930        ctx.register_language("runtime", Box::new(RuntimeLang))
931            .unwrap();
932
933        let definition = RouteDefinition::new(
934            "timer:tick",
935            vec![BuilderStep::DeclarativeScript {
936                expression: LanguageExpressionDef {
937                    language: "runtime".into(),
938                    source: "${body}".into(),
939                },
940            }],
941        )
942        .with_route_id("runtime-lang-route");
943
944        let result = ctx.add_route_definition(definition).await;
945        assert!(
946            result.is_ok(),
947            "route should resolve runtime language: {result:?}"
948        );
949    }
950
951    #[tokio::test]
952    async fn test_add_route_definition_fails_for_unregistered_runtime_language() {
953        let mut ctx = CamelContext::new();
954        let definition = RouteDefinition::new(
955            "timer:tick",
956            vec![BuilderStep::DeclarativeSetBody {
957                value: crate::route::ValueSourceDef::Expression(LanguageExpressionDef {
958                    language: "missing-lang".into(),
959                    source: "${body}".into(),
960                }),
961            }],
962        )
963        .with_route_id("missing-runtime-lang-route");
964
965        let result = ctx.add_route_definition(definition).await;
966        assert!(
967            result.is_err(),
968            "route should fail when language is missing"
969        );
970        let error_text = result.unwrap_err().to_string();
971        assert!(
972            error_text.contains("missing-lang"),
973            "error should mention missing language, got: {error_text}"
974        );
975    }
976
977    #[tokio::test]
978    async fn add_route_definition_does_not_require_mut() {
979        let ctx = CamelContext::new();
980        let definition = RouteDefinition::new("timer:tick", vec![]).with_route_id("immutable-ctx");
981
982        let result = ctx.add_route_definition(definition).await;
983        assert!(
984            result.is_ok(),
985            "immutable context should add route: {result:?}"
986        );
987    }
988
989    #[test]
990    fn test_health_check_empty_context() {
991        let ctx = CamelContext::new();
992        let report = ctx.health_check();
993
994        assert_eq!(report.status, HealthStatus::Healthy);
995        assert!(report.services.is_empty());
996    }
997
998    #[tokio::test]
999    async fn context_exposes_runtime_command_and_query_buses() {
1000        let ctx = CamelContext::new();
1001        let runtime = ctx.runtime();
1002
1003        let register = runtime
1004            .execute(RuntimeCommand::RegisterRoute {
1005                spec: CanonicalRouteSpec::new("runtime-r1", "timer:tick"),
1006                command_id: "cmd-1".into(),
1007                causation_id: None,
1008            })
1009            .await
1010            .unwrap();
1011        assert!(matches!(
1012            register,
1013            RuntimeCommandResult::RouteRegistered { ref route_id } if route_id == "runtime-r1"
1014        ));
1015
1016        let query = runtime
1017            .ask(RuntimeQuery::GetRouteStatus {
1018                route_id: "runtime-r1".into(),
1019            })
1020            .await
1021            .unwrap();
1022        assert!(matches!(
1023            query,
1024            RuntimeQueryResult::RouteStatus { ref status, .. } if status == "Registered"
1025        ));
1026    }
1027
1028    #[tokio::test]
1029    async fn context_with_explicit_runtime_journal_path_persists_events_to_file() {
1030        let dir = tempdir().unwrap();
1031        let journal_path = dir.path().join("context-runtime-events.jsonl");
1032        let mut ctx = CamelContext::new_with_runtime_journal_path(journal_path.clone());
1033        ctx.register_component(HoldComponent);
1034
1035        ctx.runtime()
1036            .execute(RuntimeCommand::RegisterRoute {
1037                spec: CanonicalRouteSpec::new("journal-path-r1", "hold:test"),
1038                command_id: "journal-c1".into(),
1039                causation_id: None,
1040            })
1041            .await
1042            .unwrap();
1043        ctx.runtime()
1044            .execute(RuntimeCommand::StartRoute {
1045                route_id: "journal-path-r1".into(),
1046                command_id: "journal-c2".into(),
1047                causation_id: Some("journal-c1".into()),
1048            })
1049            .await
1050            .unwrap();
1051
1052        let data = std::fs::read_to_string(&journal_path).unwrap();
1053        assert!(
1054            data.contains("journal-path-r1"),
1055            "journal file must contain route id"
1056        );
1057        assert!(
1058            data.contains("RouteRegistered"),
1059            "journal file must contain route registered event"
1060        );
1061        assert!(
1062            data.contains("RouteStarted"),
1063            "journal file must contain route started event"
1064        );
1065    }
1066
1067    #[tokio::test]
1068    async fn default_runtime_journal_isolated_per_context_without_env_override() {
1069        if let Ok(value) = std::env::var("CAMEL_RUNTIME_JOURNAL_PATH")
1070            && !value.trim().is_empty()
1071        {
1072            return;
1073        }
1074
1075        let first = CamelContext::new();
1076        first
1077            .runtime()
1078            .execute(RuntimeCommand::RegisterRoute {
1079                spec: CanonicalRouteSpec::new("default-isolation-r1", "timer:tick"),
1080                command_id: "iso-c1".into(),
1081                causation_id: None,
1082            })
1083            .await
1084            .unwrap();
1085
1086        let second = CamelContext::new();
1087        second
1088            .runtime()
1089            .execute(RuntimeCommand::RegisterRoute {
1090                spec: CanonicalRouteSpec::new("default-isolation-r1", "timer:tick"),
1091                command_id: "iso-c2".into(),
1092                causation_id: None,
1093            })
1094            .await
1095            .unwrap();
1096    }
1097
1098    #[tokio::test]
1099    async fn runtime_commands_drive_real_route_controller_lifecycle() {
1100        let mut ctx = CamelContext::new();
1101        ctx.register_component(HoldComponent);
1102        let runtime = ctx.runtime();
1103
1104        runtime
1105            .execute(RuntimeCommand::RegisterRoute {
1106                spec: CanonicalRouteSpec::new("runtime-hold", "hold:test"),
1107                command_id: "c1".into(),
1108                causation_id: None,
1109            })
1110            .await
1111            .unwrap();
1112
1113        assert_eq!(
1114            ctx.runtime_route_status("runtime-hold").await.unwrap(),
1115            Some("Registered".to_string())
1116        );
1117        assert!(matches!(
1118            ctx.runtime.repo().load("runtime-hold").await.unwrap(),
1119            Some(agg)
1120                if matches!(agg.state(), crate::lifecycle::domain::RouteRuntimeState::Registered)
1121        ));
1122
1123        runtime
1124            .execute(RuntimeCommand::StartRoute {
1125                route_id: "runtime-hold".into(),
1126                command_id: "c2".into(),
1127                causation_id: Some("c1".into()),
1128            })
1129            .await
1130            .unwrap();
1131        assert_eq!(
1132            ctx.runtime_route_status("runtime-hold").await.unwrap(),
1133            Some("Started".to_string())
1134        );
1135        assert!(matches!(
1136            ctx.runtime.repo().load("runtime-hold").await.unwrap(),
1137            Some(agg)
1138                if matches!(agg.state(), crate::lifecycle::domain::RouteRuntimeState::Started)
1139        ));
1140
1141        runtime
1142            .execute(RuntimeCommand::SuspendRoute {
1143                route_id: "runtime-hold".into(),
1144                command_id: "c3".into(),
1145                causation_id: Some("c2".into()),
1146            })
1147            .await
1148            .unwrap();
1149        assert_eq!(
1150            ctx.runtime_route_status("runtime-hold").await.unwrap(),
1151            Some("Suspended".to_string())
1152        );
1153
1154        runtime
1155            .execute(RuntimeCommand::ResumeRoute {
1156                route_id: "runtime-hold".into(),
1157                command_id: "c4".into(),
1158                causation_id: Some("c3".into()),
1159            })
1160            .await
1161            .unwrap();
1162        assert_eq!(
1163            ctx.runtime_route_status("runtime-hold").await.unwrap(),
1164            Some("Started".to_string())
1165        );
1166
1167        runtime
1168            .execute(RuntimeCommand::StopRoute {
1169                route_id: "runtime-hold".into(),
1170                command_id: "c5".into(),
1171                causation_id: Some("c4".into()),
1172            })
1173            .await
1174            .unwrap();
1175        assert_eq!(
1176            ctx.runtime_route_status("runtime-hold").await.unwrap(),
1177            Some("Stopped".to_string())
1178        );
1179
1180        runtime
1181            .execute(RuntimeCommand::ReloadRoute {
1182                route_id: "runtime-hold".into(),
1183                command_id: "c6".into(),
1184                causation_id: Some("c5".into()),
1185            })
1186            .await
1187            .unwrap();
1188        assert_eq!(
1189            ctx.runtime_route_status("runtime-hold").await.unwrap(),
1190            Some("Started".to_string())
1191        );
1192
1193        runtime
1194            .execute(RuntimeCommand::StopRoute {
1195                route_id: "runtime-hold".into(),
1196                command_id: "c7".into(),
1197                causation_id: Some("c6".into()),
1198            })
1199            .await
1200            .unwrap();
1201        assert_eq!(
1202            ctx.runtime_route_status("runtime-hold").await.unwrap(),
1203            Some("Stopped".to_string())
1204        );
1205
1206        runtime
1207            .execute(RuntimeCommand::RemoveRoute {
1208                route_id: "runtime-hold".into(),
1209                command_id: "c8".into(),
1210                causation_id: Some("c7".into()),
1211            })
1212            .await
1213            .unwrap();
1214        assert_eq!(
1215            ctx.runtime_route_status("runtime-hold").await.unwrap(),
1216            None
1217        );
1218        assert!(
1219            ctx.runtime
1220                .repo()
1221                .load("runtime-hold")
1222                .await
1223                .unwrap()
1224                .is_none()
1225        );
1226    }
1227
1228    #[tokio::test]
1229    async fn runtime_queries_read_projection_state_when_connected() {
1230        let mut ctx = CamelContext::new();
1231        ctx.register_component(HoldComponent);
1232        let runtime = ctx.runtime();
1233        runtime
1234            .execute(RuntimeCommand::RegisterRoute {
1235                spec: CanonicalRouteSpec::new("rq", "hold:test"),
1236                command_id: "c1".into(),
1237                causation_id: None,
1238            })
1239            .await
1240            .unwrap();
1241
1242        // Diverge live controller state from the projection on purpose.
1243        ctx.runtime_execution_handle()
1244            .force_start_route_for_test("rq")
1245            .await
1246            .unwrap();
1247
1248        let result = runtime
1249            .ask(RuntimeQuery::GetRouteStatus {
1250                route_id: "rq".into(),
1251            })
1252            .await
1253            .unwrap();
1254
1255        match result {
1256            RuntimeQueryResult::RouteStatus { status, .. } => assert_eq!(status, "Registered"),
1257            _ => panic!("unexpected query result"),
1258        }
1259    }
1260
1261    #[tokio::test]
1262    async fn add_route_definition_produces_registered_state() {
1263        let mut ctx = CamelContext::new();
1264        let definition =
1265            RouteDefinition::new("direct:test", vec![]).with_route_id("async-test-route");
1266
1267        ctx.add_route_definition(definition).await.unwrap();
1268
1269        let status = ctx
1270            .runtime()
1271            .ask(RuntimeQuery::GetRouteStatus {
1272                route_id: "async-test-route".to_string(),
1273            })
1274            .await
1275            .unwrap();
1276
1277        match status {
1278            RuntimeQueryResult::RouteStatus { status, .. } => {
1279                assert_eq!(
1280                    status, "Registered",
1281                    "expected Registered state after add_route_definition"
1282                );
1283            }
1284            _ => panic!("unexpected query result"),
1285        }
1286    }
1287
1288    #[tokio::test]
1289    async fn add_route_definition_injects_runtime_into_producer_context() {
1290        use std::sync::atomic::{AtomicBool, Ordering};
1291
1292        struct RuntimeAwareEndpoint {
1293            saw_runtime: Arc<AtomicBool>,
1294        }
1295
1296        impl Endpoint for RuntimeAwareEndpoint {
1297            fn uri(&self) -> &str {
1298                "runtime-aware:test"
1299            }
1300
1301            fn create_consumer(&self) -> Result<Box<dyn camel_component::Consumer>, CamelError> {
1302                Err(CamelError::RouteError("no consumer".to_string()))
1303            }
1304
1305            fn create_producer(
1306                &self,
1307                ctx: &camel_api::ProducerContext,
1308            ) -> Result<camel_api::BoxProcessor, CamelError> {
1309                self.saw_runtime
1310                    .store(ctx.runtime().is_some(), Ordering::SeqCst);
1311                if ctx.runtime().is_none() {
1312                    return Err(CamelError::RouteError(
1313                        "runtime handle missing in ProducerContext".to_string(),
1314                    ));
1315                }
1316                Ok(camel_api::BoxProcessor::new(camel_api::IdentityProcessor))
1317            }
1318        }
1319
1320        struct RuntimeAwareComponent {
1321            saw_runtime: Arc<AtomicBool>,
1322        }
1323
1324        impl Component for RuntimeAwareComponent {
1325            fn scheme(&self) -> &str {
1326                "runtime-aware"
1327            }
1328
1329            fn create_endpoint(&self, _uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
1330                Ok(Box::new(RuntimeAwareEndpoint {
1331                    saw_runtime: Arc::clone(&self.saw_runtime),
1332                }))
1333            }
1334        }
1335
1336        let saw_runtime = Arc::new(AtomicBool::new(false));
1337        let mut ctx = CamelContext::new();
1338        ctx.register_component(RuntimeAwareComponent {
1339            saw_runtime: Arc::clone(&saw_runtime),
1340        });
1341
1342        let definition = RouteDefinition::new(
1343            "timer:tick",
1344            vec![BuilderStep::To("runtime-aware:test".to_string())],
1345        )
1346        .with_route_id("runtime-aware-route");
1347
1348        let result = ctx.add_route_definition(definition).await;
1349        assert!(
1350            result.is_ok(),
1351            "route should resolve producer with runtime context: {result:?}"
1352        );
1353        assert!(
1354            saw_runtime.load(Ordering::SeqCst),
1355            "component producer should observe runtime handle in ProducerContext"
1356        );
1357    }
1358
1359    #[tokio::test]
1360    async fn add_route_definition_registers_runtime_projection_and_aggregate() {
1361        let mut ctx = CamelContext::new();
1362        ctx.register_component(HoldComponent);
1363
1364        let definition = RouteDefinition::new("hold:test", vec![]).with_route_id("ctx-runtime-r1");
1365        ctx.add_route_definition(definition).await.unwrap();
1366
1367        let aggregate = ctx.runtime.repo().load("ctx-runtime-r1").await.unwrap();
1368        assert!(
1369            matches!(aggregate, Some(agg) if matches!(agg.state(), RouteRuntimeState::Registered)),
1370            "route registration should seed aggregate as Registered"
1371        );
1372
1373        let status = ctx.runtime_route_status("ctx-runtime-r1").await.unwrap();
1374        assert_eq!(status.as_deref(), Some("Registered"));
1375    }
1376
1377    #[tokio::test]
1378    async fn add_route_definition_rolls_back_controller_when_runtime_registration_fails() {
1379        let mut ctx = CamelContext::new();
1380        ctx.register_component(HoldComponent);
1381
1382        ctx.runtime
1383            .repo()
1384            .save(RouteRuntimeAggregate::new("ctx-runtime-dup"))
1385            .await
1386            .unwrap();
1387
1388        let definition = RouteDefinition::new("hold:test", vec![]).with_route_id("ctx-runtime-dup");
1389        let result = ctx.add_route_definition(definition).await;
1390        assert!(result.is_err(), "duplicate runtime registration must fail");
1391
1392        assert_eq!(
1393            ctx.runtime_execution_handle()
1394                .controller_route_count_for_test()
1395                .await,
1396            0,
1397            "controller route should be rolled back on runtime bootstrap failure"
1398        );
1399    }
1400
1401    #[tokio::test]
1402    async fn context_start_stop_drives_runtime_lifecycle_via_command_bus() {
1403        let mut ctx = CamelContext::new();
1404        ctx.register_component(HoldComponent);
1405
1406        let autostart =
1407            RouteDefinition::new("hold:test", vec![]).with_route_id("ctx-lifecycle-auto");
1408        let lazy = RouteDefinition::new("hold:test", vec![])
1409            .with_route_id("ctx-lifecycle-lazy")
1410            .with_auto_startup(false);
1411
1412        ctx.add_route_definition(autostart).await.unwrap();
1413        ctx.add_route_definition(lazy).await.unwrap();
1414
1415        assert_eq!(
1416            ctx.runtime_route_status("ctx-lifecycle-auto")
1417                .await
1418                .unwrap(),
1419            Some("Registered".to_string())
1420        );
1421        assert_eq!(
1422            ctx.runtime_route_status("ctx-lifecycle-lazy")
1423                .await
1424                .unwrap(),
1425            Some("Registered".to_string())
1426        );
1427
1428        ctx.start().await.unwrap();
1429
1430        assert_eq!(
1431            ctx.runtime_route_status("ctx-lifecycle-auto")
1432                .await
1433                .unwrap(),
1434            Some("Started".to_string())
1435        );
1436        assert_eq!(
1437            ctx.runtime_route_status("ctx-lifecycle-lazy")
1438                .await
1439                .unwrap(),
1440            Some("Registered".to_string())
1441        );
1442
1443        ctx.stop().await.unwrap();
1444
1445        assert_eq!(
1446            ctx.runtime_route_status("ctx-lifecycle-auto")
1447                .await
1448                .unwrap(),
1449            Some("Stopped".to_string())
1450        );
1451        assert_eq!(
1452            ctx.runtime_route_status("ctx-lifecycle-lazy")
1453                .await
1454                .unwrap(),
1455            Some("Registered".to_string())
1456        );
1457    }
1458}
1459
#[cfg(test)]
mod lifecycle_tests {
    //! Tests for how `CamelContext` starts and stops attached `Lifecycle` services.

    use super::*;
    use async_trait::async_trait;
    use camel_api::Lifecycle;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Lifecycle service that counts how often it is started and stopped.
    struct MockService {
        start_count: Arc<AtomicUsize>,
        stop_count: Arc<AtomicUsize>,
    }

    impl MockService {
        /// Returns the service together with handles to its start and stop counters.
        fn new() -> (Self, Arc<AtomicUsize>, Arc<AtomicUsize>) {
            let starts = Arc::new(AtomicUsize::new(0));
            let stops = Arc::new(AtomicUsize::new(0));
            let service = Self {
                start_count: Arc::clone(&starts),
                stop_count: Arc::clone(&stops),
            };
            (service, starts, stops)
        }
    }

    #[async_trait]
    impl Lifecycle for MockService {
        fn name(&self) -> &str {
            "mock"
        }

        /// Bumps the start counter and succeeds.
        async fn start(&mut self) -> Result<(), CamelError> {
            let _previous = self.start_count.fetch_add(1, Ordering::SeqCst);
            Ok(())
        }

        /// Bumps the stop counter and succeeds.
        async fn stop(&mut self) -> Result<(), CamelError> {
            let _previous = self.stop_count.fetch_add(1, Ordering::SeqCst);
            Ok(())
        }
    }

    /// Starting and stopping the context starts and stops the service exactly once.
    #[tokio::test]
    async fn test_context_starts_lifecycle_services() {
        let (service, starts, stops) = MockService::new();
        let mut context = CamelContext::new().with_lifecycle(service);

        // Attaching the service alone must not start it.
        assert_eq!(starts.load(Ordering::SeqCst), 0);

        context.start().await.unwrap();
        assert_eq!(starts.load(Ordering::SeqCst), 1);
        assert_eq!(stops.load(Ordering::SeqCst), 0);

        context.stop().await.unwrap();
        assert_eq!(stops.load(Ordering::SeqCst), 1);
    }

    /// When the second of three services fails to start, the first is stopped
    /// again and the third is never touched.
    #[tokio::test]
    async fn test_service_start_failure_rollback() {
        /// Counting service whose `start` fails when `should_fail` is set.
        struct FailingService {
            start_count: Arc<AtomicUsize>,
            stop_count: Arc<AtomicUsize>,
            should_fail: bool,
        }

        #[async_trait]
        impl Lifecycle for FailingService {
            fn name(&self) -> &str {
                "failing"
            }

            async fn start(&mut self) -> Result<(), CamelError> {
                // The attempt is counted even when it ends in failure.
                self.start_count.fetch_add(1, Ordering::SeqCst);
                if !self.should_fail {
                    return Ok(());
                }
                Err(CamelError::ProcessorError("intentional failure".into()))
            }

            async fn stop(&mut self) -> Result<(), CamelError> {
                self.stop_count.fetch_add(1, Ordering::SeqCst);
                Ok(())
            }
        }

        /// Builds a service plus handles to its start and stop counters.
        fn service(should_fail: bool) -> (FailingService, Arc<AtomicUsize>, Arc<AtomicUsize>) {
            let starts = Arc::new(AtomicUsize::new(0));
            let stops = Arc::new(AtomicUsize::new(0));
            let built = FailingService {
                start_count: Arc::clone(&starts),
                stop_count: Arc::clone(&stops),
                should_fail,
            };
            (built, starts, stops)
        }

        let (service1, start1, stop1) = service(false);
        let (service2, start2, stop2) = service(true); // This one will fail
        let (service3, start3, stop3) = service(false);

        let mut context = CamelContext::new()
            .with_lifecycle(service1)
            .with_lifecycle(service2)
            .with_lifecycle(service3);

        // Attempt to start - should fail
        let outcome = context.start().await;
        assert!(outcome.is_err());

        // service1: started, then stopped again as part of the rollback.
        assert_eq!(start1.load(Ordering::SeqCst), 1);
        assert_eq!(stop1.load(Ordering::SeqCst), 1);

        // service2: start was attempted and failed; it is not stopped.
        assert_eq!(start2.load(Ordering::SeqCst), 1);
        assert_eq!(stop2.load(Ordering::SeqCst), 0);

        // service3: never reached.
        assert_eq!(start3.load(Ordering::SeqCst), 0);
        assert_eq!(stop3.load(Ordering::SeqCst), 0);
    }

    /// Services are stopped in the reverse of the order they were attached.
    #[tokio::test]
    async fn test_services_stop_in_reverse_order() {
        use std::sync::Mutex as StdMutex;

        /// Service that appends its name to a shared log when stopped.
        struct OrderTracker {
            name: String,
            order: Arc<StdMutex<Vec<String>>>,
        }

        #[async_trait]
        impl Lifecycle for OrderTracker {
            fn name(&self) -> &str {
                &self.name
            }

            async fn start(&mut self) -> Result<(), CamelError> {
                Ok(())
            }

            async fn stop(&mut self) -> Result<(), CamelError> {
                let mut log = self.order.lock().unwrap();
                log.push(self.name.clone());
                Ok(())
            }
        }

        let order = Arc::new(StdMutex::new(Vec::<String>::new()));
        let tracker = |name: &str| OrderTracker {
            name: name.into(),
            order: Arc::clone(&order),
        };

        let mut context = CamelContext::new()
            .with_lifecycle(tracker("first"))
            .with_lifecycle(tracker("second"))
            .with_lifecycle(tracker("third"));

        context.start().await.unwrap();
        context.stop().await.unwrap();

        let stopped = order.lock().unwrap();
        assert_eq!(
            *stopped,
            vec!["third", "second", "first"],
            "services must stop in reverse insertion order"
        );
    }
}
1653
#[cfg(test)]
mod config_registry_tests {
    //! Tests for storing and retrieving typed component configuration on the context.

    use super::*;

    /// Sample configuration type used as the lookup key in these tests.
    #[derive(Debug, Clone, PartialEq)]
    struct MyConfig {
        value: u32,
    }

    /// A stored config can be read back by its type.
    #[test]
    fn test_set_and_get_component_config() {
        let expected = MyConfig { value: 42 };

        let mut context = CamelContext::new();
        context.set_component_config(expected.clone());

        let stored = context.get_component_config::<MyConfig>();
        assert_eq!(stored, Some(&expected));
    }

    /// Looking up a config type that was never stored yields `None`.
    #[test]
    fn test_get_missing_config_returns_none() {
        let context = CamelContext::new();
        let lookup = context.get_component_config::<MyConfig>();
        assert!(lookup.is_none());
    }

    /// Storing a second value of the same type replaces the first.
    #[test]
    fn test_set_overwrites_previous_config() {
        let mut context = CamelContext::new();
        for value in [1, 2] {
            context.set_component_config(MyConfig { value });
        }

        let current = context.get_component_config::<MyConfig>().unwrap();
        assert_eq!(current.value, 2);
    }
}
1684}