Skip to main content

coreon_core/
context.rs

//! CamelContext — registry of components, endpoints, and running routes.
2
3use crate::{
4    component::Component,
5    endpoint::{Consumer, Endpoint, Producer},
6    error::{CamelError, Result},
7    exchange::Exchange,
8    route::Route,
9    uri::CamelUri,
10};
11use dashmap::DashMap;
12use std::sync::{
13    atomic::{AtomicBool, Ordering},
14    Arc,
15};
16use tokio::sync::Mutex;
17use tracing::{debug, info};
18
19/// Shared state handed to components/endpoints so they can resolve other
20/// endpoints (e.g. a Producer sending into `direct:next`).
21pub struct CamelContext {
22    components: DashMap<String, Arc<dyn Component>>,
23    endpoints: DashMap<String, Arc<dyn Endpoint>>,
24    routes: Mutex<Vec<Route>>,
25    running_consumers: Mutex<Vec<Arc<dyn Consumer>>>,
26    running: AtomicBool,
27}
28
29impl CamelContext {
30    pub fn new() -> Arc<Self> {
31        Arc::new(Self {
32            components: DashMap::new(),
33            endpoints: DashMap::new(),
34            routes: Mutex::new(Vec::new()),
35            running_consumers: Mutex::new(Vec::new()),
36            running: AtomicBool::new(false),
37        })
38    }
39
40    pub fn register_component(&self, component: Arc<dyn Component>) {
41        let scheme = component.scheme().to_owned();
42        debug!(scheme = %scheme, "registering component");
43        self.components.insert(scheme, component);
44    }
45
46    /// Resolve a URI to an Endpoint, reusing any previously-created endpoint
47    /// for the same URI. Creating an endpoint is cheap in MVP components, but
48    /// the cache is the natural place to deduplicate `direct:foo` producers
49    /// and consumers so they share the same channel.
50    pub async fn resolve_endpoint(&self, uri_str: &str) -> Result<Arc<dyn Endpoint>> {
51        if let Some(ep) = self.endpoints.get(uri_str) {
52            return Ok(ep.clone());
53        }
54        let uri = CamelUri::parse(uri_str)?;
55        let component = self
56            .components
57            .get(&uri.scheme)
58            .ok_or_else(|| CamelError::UnknownScheme(uri.scheme.clone()))?
59            .clone();
60        let ep = component.create_endpoint(&uri).await?;
61        self.endpoints.insert(uri_str.to_owned(), ep.clone());
62        Ok(ep)
63    }
64
65    /// Add a pre-built Route to the context. `start()` is what actually wires
66    /// consumers up.
67    pub async fn add_route(&self, route: Route) {
68        self.routes.lock().await.push(route);
69    }
70
71    pub async fn start(&self) -> Result<()> {
72        if self.running.swap(true, Ordering::SeqCst) {
73            return Err(CamelError::AlreadyRunning);
74        }
75        let routes = self.routes.lock().await.clone();
76        let mut started: Vec<Arc<dyn Consumer>> = Vec::with_capacity(routes.len());
77        for route in routes {
78            let consumer = route
79                .from
80                .create_consumer(route.pipeline.clone())
81                .await?;
82            consumer.start().await?;
83            info!(route = %route.id, uri = %route.from.uri(), "route started");
84            started.push(consumer);
85        }
86        *self.running_consumers.lock().await = started;
87        Ok(())
88    }
89
90    pub async fn stop(&self) -> Result<()> {
91        if !self.running.swap(false, Ordering::SeqCst) {
92            return Err(CamelError::NotRunning);
93        }
94        let consumers = std::mem::take(&mut *self.running_consumers.lock().await);
95        for c in consumers {
96            c.stop().await?;
97        }
98        Ok(())
99    }
100
101    pub fn is_running(&self) -> bool {
102        self.running.load(Ordering::Acquire)
103    }
104
105    /// ProducerTemplate equivalent — resolve an endpoint and send a one-shot
106    /// exchange. Useful for tests and for injecting messages into `direct:`
107    /// from outside the route graph.
108    pub async fn send(&self, uri: &str, exchange: &mut Exchange) -> Result<()> {
109        let ep = self.resolve_endpoint(uri).await?;
110        let producer: Arc<dyn Producer> = ep.create_producer().await?;
111        producer.send(exchange).await
112    }
113}