1use crate::{
4 component::Component,
5 endpoint::{Consumer, Endpoint, Producer},
6 error::{CamelError, Result},
7 exchange::Exchange,
8 route::Route,
9 uri::CamelUri,
10};
11use dashmap::DashMap;
12use std::sync::{
13 atomic::{AtomicBool, Ordering},
14 Arc,
15};
16use tokio::sync::Mutex;
17use tracing::{debug, info};
18
19pub struct CamelContext {
22 components: DashMap<String, Arc<dyn Component>>,
23 endpoints: DashMap<String, Arc<dyn Endpoint>>,
24 routes: Mutex<Vec<Route>>,
25 running_consumers: Mutex<Vec<Arc<dyn Consumer>>>,
26 running: AtomicBool,
27}
28
29impl CamelContext {
30 pub fn new() -> Arc<Self> {
31 Arc::new(Self {
32 components: DashMap::new(),
33 endpoints: DashMap::new(),
34 routes: Mutex::new(Vec::new()),
35 running_consumers: Mutex::new(Vec::new()),
36 running: AtomicBool::new(false),
37 })
38 }
39
40 pub fn register_component(&self, component: Arc<dyn Component>) {
41 let scheme = component.scheme().to_owned();
42 debug!(scheme = %scheme, "registering component");
43 self.components.insert(scheme, component);
44 }
45
46 pub async fn resolve_endpoint(&self, uri_str: &str) -> Result<Arc<dyn Endpoint>> {
51 if let Some(ep) = self.endpoints.get(uri_str) {
52 return Ok(ep.clone());
53 }
54 let uri = CamelUri::parse(uri_str)?;
55 let component = self
56 .components
57 .get(&uri.scheme)
58 .ok_or_else(|| CamelError::UnknownScheme(uri.scheme.clone()))?
59 .clone();
60 let ep = component.create_endpoint(&uri).await?;
61 self.endpoints.insert(uri_str.to_owned(), ep.clone());
62 Ok(ep)
63 }
64
65 pub async fn add_route(&self, route: Route) {
68 self.routes.lock().await.push(route);
69 }
70
71 pub async fn start(&self) -> Result<()> {
72 if self.running.swap(true, Ordering::SeqCst) {
73 return Err(CamelError::AlreadyRunning);
74 }
75 let routes = self.routes.lock().await.clone();
76 let mut started: Vec<Arc<dyn Consumer>> = Vec::with_capacity(routes.len());
77 for route in routes {
78 let consumer = route
79 .from
80 .create_consumer(route.pipeline.clone())
81 .await?;
82 consumer.start().await?;
83 info!(route = %route.id, uri = %route.from.uri(), "route started");
84 started.push(consumer);
85 }
86 *self.running_consumers.lock().await = started;
87 Ok(())
88 }
89
90 pub async fn stop(&self) -> Result<()> {
91 if !self.running.swap(false, Ordering::SeqCst) {
92 return Err(CamelError::NotRunning);
93 }
94 let consumers = std::mem::take(&mut *self.running_consumers.lock().await);
95 for c in consumers {
96 c.stop().await?;
97 }
98 Ok(())
99 }
100
101 pub fn is_running(&self) -> bool {
102 self.running.load(Ordering::Acquire)
103 }
104
105 pub async fn send(&self, uri: &str, exchange: &mut Exchange) -> Result<()> {
109 let ep = self.resolve_endpoint(uri).await?;
110 let producer: Arc<dyn Producer> = ep.create_producer().await?;
111 producer.send(exchange).await
112 }
113}