coreon-core 0.1.0

Core abstractions for camel-rs: Exchange, Processor, Endpoint, Component, CamelContext.
Documentation
//! CamelContext — registry of components, endpoints, and running routes.

use crate::{
    component::Component,
    endpoint::{Consumer, Endpoint, Producer},
    error::{CamelError, Result},
    exchange::Exchange,
    route::Route,
    uri::CamelUri,
};
use dashmap::DashMap;
use std::sync::{
    atomic::{AtomicBool, Ordering},
    Arc,
};
use tokio::sync::Mutex;
use tracing::{debug, info};

/// Shared state handed to components/endpoints so they can resolve other
/// endpoints (e.g. a Producer sending into `direct:next`).
pub struct CamelContext {
    components: DashMap<String, Arc<dyn Component>>,
    endpoints: DashMap<String, Arc<dyn Endpoint>>,
    routes: Mutex<Vec<Route>>,
    running_consumers: Mutex<Vec<Arc<dyn Consumer>>>,
    running: AtomicBool,
}

impl CamelContext {
    pub fn new() -> Arc<Self> {
        Arc::new(Self {
            components: DashMap::new(),
            endpoints: DashMap::new(),
            routes: Mutex::new(Vec::new()),
            running_consumers: Mutex::new(Vec::new()),
            running: AtomicBool::new(false),
        })
    }

    pub fn register_component(&self, component: Arc<dyn Component>) {
        let scheme = component.scheme().to_owned();
        debug!(scheme = %scheme, "registering component");
        self.components.insert(scheme, component);
    }

    /// Resolve a URI to an Endpoint, reusing any previously-created endpoint
    /// for the same URI. Creating an endpoint is cheap in MVP components, but
    /// the cache is the natural place to deduplicate `direct:foo` producers
    /// and consumers so they share the same channel.
    pub async fn resolve_endpoint(&self, uri_str: &str) -> Result<Arc<dyn Endpoint>> {
        if let Some(ep) = self.endpoints.get(uri_str) {
            return Ok(ep.clone());
        }
        let uri = CamelUri::parse(uri_str)?;
        let component = self
            .components
            .get(&uri.scheme)
            .ok_or_else(|| CamelError::UnknownScheme(uri.scheme.clone()))?
            .clone();
        let ep = component.create_endpoint(&uri).await?;
        self.endpoints.insert(uri_str.to_owned(), ep.clone());
        Ok(ep)
    }

    /// Add a pre-built Route to the context. `start()` is what actually wires
    /// consumers up.
    pub async fn add_route(&self, route: Route) {
        self.routes.lock().await.push(route);
    }

    pub async fn start(&self) -> Result<()> {
        if self.running.swap(true, Ordering::SeqCst) {
            return Err(CamelError::AlreadyRunning);
        }
        let routes = self.routes.lock().await.clone();
        let mut started: Vec<Arc<dyn Consumer>> = Vec::with_capacity(routes.len());
        for route in routes {
            let consumer = route
                .from
                .create_consumer(route.pipeline.clone())
                .await?;
            consumer.start().await?;
            info!(route = %route.id, uri = %route.from.uri(), "route started");
            started.push(consumer);
        }
        *self.running_consumers.lock().await = started;
        Ok(())
    }

    pub async fn stop(&self) -> Result<()> {
        if !self.running.swap(false, Ordering::SeqCst) {
            return Err(CamelError::NotRunning);
        }
        let consumers = std::mem::take(&mut *self.running_consumers.lock().await);
        for c in consumers {
            c.stop().await?;
        }
        Ok(())
    }

    pub fn is_running(&self) -> bool {
        self.running.load(Ordering::Acquire)
    }

    /// ProducerTemplate equivalent — resolve an endpoint and send a one-shot
    /// exchange. Useful for tests and for injecting messages into `direct:`
    /// from outside the route graph.
    pub async fn send(&self, uri: &str, exchange: &mut Exchange) -> Result<()> {
        let ep = self.resolve_endpoint(uri).await?;
        let producer: Arc<dyn Producer> = ep.create_producer().await?;
        producer.send(exchange).await
    }
}