use crate::{
component::Component,
endpoint::{Consumer, Endpoint, Producer},
error::{CamelError, Result},
exchange::Exchange,
route::Route,
uri::CamelUri,
};
use dashmap::DashMap;
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
};
use tokio::sync::Mutex;
use tracing::{debug, info};
/// Shared runtime state for a set of routes: the registered components, the
/// endpoints resolved so far, the configured routes and the consumers that
/// are currently running.
///
/// All fields use interior mutability, so every operation takes `&self`.
pub struct CamelContext {
    /// Registered components, keyed by the URI scheme they report.
    components: DashMap<String, Arc<dyn Component>>,
    /// Endpoints that have already been resolved, keyed by the full URI
    /// string they were requested with.
    endpoints: DashMap<String, Arc<dyn Endpoint>>,
    /// Routes added to the context, in insertion order.
    routes: Mutex<Vec<Route>>,
    /// Consumers started by the most recent successful `start`; drained by
    /// `stop`.
    running_consumers: Mutex<Vec<Arc<dyn Consumer>>>,
    /// `true` from the moment `start` is entered until `stop` is entered.
    running: AtomicBool,
}
impl CamelContext {
    /// Creates an empty, stopped context wrapped in an [`Arc`] so it can be
    /// shared between tasks.
    pub fn new() -> Arc<Self> {
        Arc::new(Self {
            components: DashMap::new(),
            endpoints: DashMap::new(),
            routes: Mutex::new(Vec::new()),
            running_consumers: Mutex::new(Vec::new()),
            running: AtomicBool::new(false),
        })
    }

    /// Registers `component` under the scheme it reports.
    ///
    /// A component previously registered under the same scheme is replaced.
    pub fn register_component(&self, component: Arc<dyn Component>) {
        let scheme = component.scheme().to_owned();
        debug!(scheme = %scheme, "registering component");
        self.components.insert(scheme, component);
    }

    /// Returns the endpoint for `uri_str`, creating and caching it on first
    /// use.
    ///
    /// The cache is keyed by the exact URI string. When several tasks resolve
    /// the same uncached URI at once, each may create an endpoint, but only
    /// the first one published is kept and every caller receives that same
    /// instance.
    ///
    /// # Errors
    ///
    /// Propagates URI parse failures, returns [`CamelError::UnknownScheme`]
    /// when no component is registered for the URI's scheme, and propagates
    /// any error from the component's `create_endpoint`.
    pub async fn resolve_endpoint(&self, uri_str: &str) -> Result<Arc<dyn Endpoint>> {
        if let Some(cached) = self.endpoints.get(uri_str) {
            return Ok(Arc::clone(cached.value()));
        }

        let uri = CamelUri::parse(uri_str)?;
        // Clone the component out of the map so no map guard is held across
        // the `.await` below.
        let component = self
            .components
            .get(&uri.scheme)
            .map(|entry| Arc::clone(entry.value()))
            .ok_or_else(|| CamelError::UnknownScheme(uri.scheme.clone()))?;
        let created = component.create_endpoint(&uri).await?;

        // Another task may have cached an endpoint for this URI while we were
        // awaiting; `or_insert` keeps the existing entry in that case so the
        // cache never swaps instances under callers that already hold one.
        let winner = self.endpoints.entry(uri_str.to_owned()).or_insert(created);
        Ok(Arc::clone(winner.value()))
    }

    /// Appends `route` to the list of configured routes.
    ///
    /// This only stores the route; it does not start a consumer for it.
    pub async fn add_route(&self, route: Route) {
        self.routes.lock().await.push(route);
    }

    /// Starts a consumer for every configured route and marks the context as
    /// running.
    ///
    /// The route list is snapshotted when the call begins. If any route fails
    /// to start, the consumers already started by this call are stopped
    /// again and the running flag is cleared, so the context is left stopped
    /// and `start` can be retried.
    ///
    /// NOTE(review): `start` and `stop` are not serialized against each other
    /// beyond the `running` flag; callers should not overlap them.
    ///
    /// # Errors
    ///
    /// Returns [`CamelError::AlreadyRunning`] if the context is already
    /// running, or the first error reported while creating or starting a
    /// consumer.
    pub async fn start(&self) -> Result<()> {
        if self.running.swap(true, Ordering::SeqCst) {
            return Err(CamelError::AlreadyRunning);
        }

        let routes = self.routes.lock().await.clone();
        let mut started: Vec<Arc<dyn Consumer>> = Vec::with_capacity(routes.len());
        for route in routes {
            match Self::start_route(route).await {
                Ok(consumer) => started.push(consumer),
                Err(err) => {
                    // Roll back: the start error is what the caller needs to
                    // see, so a failure while undoing is only logged.
                    if Self::stop_all(started).await.is_err() {
                        debug!("failed to stop a consumer while rolling back a failed start");
                    }
                    self.running.store(false, Ordering::SeqCst);
                    return Err(err);
                }
            }
        }

        *self.running_consumers.lock().await = started;
        Ok(())
    }

    /// Stops every running consumer and marks the context as stopped.
    ///
    /// All consumers are asked to stop even if some of them fail.
    ///
    /// # Errors
    ///
    /// Returns [`CamelError::NotRunning`] if the context is not running, or
    /// the first error reported by a consumer's `stop`.
    pub async fn stop(&self) -> Result<()> {
        if !self.running.swap(false, Ordering::SeqCst) {
            return Err(CamelError::NotRunning);
        }

        let consumers = std::mem::take(&mut *self.running_consumers.lock().await);
        Self::stop_all(consumers).await
    }

    /// Reports whether the running flag is currently set.
    pub fn is_running(&self) -> bool {
        self.running.load(Ordering::Acquire)
    }

    /// Resolves `uri`, creates a producer for that endpoint and sends
    /// `exchange` through it.
    ///
    /// A new producer is created on every call.
    ///
    /// # Errors
    ///
    /// Propagates errors from endpoint resolution, producer creation and the
    /// producer's `send`.
    pub async fn send(&self, uri: &str, exchange: &mut Exchange) -> Result<()> {
        let endpoint = self.resolve_endpoint(uri).await?;
        let producer: Arc<dyn Producer> = endpoint.create_producer().await?;
        producer.send(exchange).await
    }

    /// Creates and starts the consumer for a single route, returning it once
    /// it is running.
    async fn start_route(route: Route) -> Result<Arc<dyn Consumer>> {
        let consumer = route
            .from
            .create_consumer(route.pipeline.clone())
            .await?;
        consumer.start().await?;
        info!(route = %route.id, uri = %route.from.uri(), "route started");
        Ok(consumer)
    }

    /// Stops every consumer in order, continuing past failures, and returns
    /// the first error encountered (if any).
    async fn stop_all(consumers: Vec<Arc<dyn Consumer>>) -> Result<()> {
        let mut first_err = None;
        for consumer in consumers {
            if let Err(err) = consumer.stop().await {
                if first_err.is_none() {
                    first_err = Some(err);
                }
            }
        }
        match first_err {
            Some(err) => Err(err),
            None => Ok(()),
        }
    }
}