camel_core/lifecycle/application/
runtime_bus.rs1use std::sync::Arc;
2
3use async_trait::async_trait;
4use tokio::sync::OnceCell;
5
6use camel_api::{
7 CamelError, RuntimeCommand, RuntimeCommandBus, RuntimeCommandResult, RuntimeQuery,
8 RuntimeQueryBus, RuntimeQueryResult,
9};
10
11use crate::lifecycle::application::commands::{
12 CommandDeps, execute_command, handle_register_internal,
13};
14use crate::lifecycle::application::queries::{QueryDeps, execute_query};
15use crate::lifecycle::application::route_definition::RouteDefinition;
16use crate::lifecycle::ports::RouteRegistrationPort;
17use crate::lifecycle::ports::{
18 CommandDedupPort, EventPublisherPort, ProjectionStorePort, RouteRepositoryPort,
19 RuntimeExecutionPort, RuntimeUnitOfWorkPort,
20};
21
22pub struct RuntimeBus {
23 repo: Arc<dyn RouteRepositoryPort>,
24 projections: Arc<dyn ProjectionStorePort>,
25 events: Arc<dyn EventPublisherPort>,
26 dedup: Arc<dyn CommandDedupPort>,
27 uow: Option<Arc<dyn RuntimeUnitOfWorkPort>>,
28 execution: Option<Arc<dyn RuntimeExecutionPort>>,
29 journal_recovered_once: OnceCell<()>,
30}
31
32impl RuntimeBus {
33 pub fn new(
34 repo: Arc<dyn RouteRepositoryPort>,
35 projections: Arc<dyn ProjectionStorePort>,
36 events: Arc<dyn EventPublisherPort>,
37 dedup: Arc<dyn CommandDedupPort>,
38 ) -> Self {
39 Self {
40 repo,
41 projections,
42 events,
43 dedup,
44 uow: None,
45 execution: None,
46 journal_recovered_once: OnceCell::new(),
47 }
48 }
49
50 pub fn with_uow(mut self, uow: Arc<dyn RuntimeUnitOfWorkPort>) -> Self {
51 self.uow = Some(uow);
52 self
53 }
54
55 pub fn with_execution(mut self, execution: Arc<dyn RuntimeExecutionPort>) -> Self {
56 self.execution = Some(execution);
57 self
58 }
59
60 pub fn repo(&self) -> &Arc<dyn RouteRepositoryPort> {
61 &self.repo
62 }
63
64 fn deps(&self) -> CommandDeps {
65 CommandDeps {
66 repo: Arc::clone(&self.repo),
67 projections: Arc::clone(&self.projections),
68 events: Arc::clone(&self.events),
69 uow: self.uow.clone(),
70 execution: self.execution.clone(),
71 }
72 }
73
74 fn query_deps(&self) -> QueryDeps {
75 QueryDeps {
76 projections: Arc::clone(&self.projections),
77 }
78 }
79
80 async fn ensure_journal_recovered(&self) -> Result<(), CamelError> {
81 let Some(uow) = &self.uow else {
82 return Ok(());
83 };
84
85 self.journal_recovered_once
86 .get_or_try_init(|| async {
87 uow.recover_from_journal().await?;
88 Ok::<(), CamelError>(())
89 })
90 .await?;
91 Ok(())
92 }
93}
94
95#[async_trait]
96impl RuntimeCommandBus for RuntimeBus {
97 async fn execute(&self, cmd: RuntimeCommand) -> Result<RuntimeCommandResult, CamelError> {
98 self.ensure_journal_recovered().await?;
99 let command_id = cmd.command_id().to_string();
100 if !self.dedup.first_seen(&command_id).await? {
101 return Ok(RuntimeCommandResult::Duplicate { command_id });
102 }
103 let deps = self.deps();
104 match execute_command(&deps, cmd).await {
105 Ok(result) => Ok(result),
106 Err(err) => {
107 let _ = self.dedup.forget_seen(&command_id).await;
108 Err(err)
109 }
110 }
111 }
112}
113
114#[async_trait]
115impl RuntimeQueryBus for RuntimeBus {
116 async fn ask(&self, query: RuntimeQuery) -> Result<RuntimeQueryResult, CamelError> {
117 self.ensure_journal_recovered().await?;
118 let deps = self.query_deps();
119 execute_query(&deps, query).await
120 }
121}
122
123#[async_trait]
124impl RouteRegistrationPort for RuntimeBus {
125 async fn register_route(&self, def: RouteDefinition) -> Result<(), CamelError> {
126 self.ensure_journal_recovered().await?;
127 let deps = self.deps();
128 handle_register_internal(&deps, def).await.map(|_| ())
129 }
130}
131
132#[cfg(test)]
133mod tests {
134 use super::*;
135 use std::collections::{HashMap, HashSet};
136 use std::sync::Mutex;
137
138 use crate::lifecycle::application::route_definition::RouteDefinition;
139 use crate::lifecycle::domain::{RouteRuntimeAggregate, RuntimeEvent};
140 use crate::lifecycle::ports::RouteRegistrationPort as InternalRuntimeCommandBus;
141 use crate::lifecycle::ports::RouteStatusProjection;
142
143 #[derive(Clone, Default)]
144 struct InMemoryTestRepo {
145 routes: Arc<Mutex<HashMap<String, RouteRuntimeAggregate>>>,
146 }
147
148 #[async_trait]
149 impl RouteRepositoryPort for InMemoryTestRepo {
150 async fn load(&self, route_id: &str) -> Result<Option<RouteRuntimeAggregate>, CamelError> {
151 Ok(self
152 .routes
153 .lock()
154 .expect("lock test routes")
155 .get(route_id)
156 .cloned())
157 }
158
159 async fn save(&self, aggregate: RouteRuntimeAggregate) -> Result<(), CamelError> {
160 self.routes
161 .lock()
162 .expect("lock test routes")
163 .insert(aggregate.route_id().to_string(), aggregate);
164 Ok(())
165 }
166
167 async fn save_if_version(
168 &self,
169 aggregate: RouteRuntimeAggregate,
170 expected_version: u64,
171 ) -> Result<(), CamelError> {
172 let route_id = aggregate.route_id().to_string();
173 let mut routes = self.routes.lock().expect("lock test routes");
174 let current = routes.get(&route_id).ok_or_else(|| {
175 CamelError::RouteError(format!(
176 "optimistic lock conflict for route '{route_id}': route not found"
177 ))
178 })?;
179
180 if current.version() != expected_version {
181 return Err(CamelError::RouteError(format!(
182 "optimistic lock conflict for route '{route_id}': expected version {expected_version}, actual {}",
183 current.version()
184 )));
185 }
186
187 routes.insert(route_id, aggregate);
188 Ok(())
189 }
190
191 async fn delete(&self, route_id: &str) -> Result<(), CamelError> {
192 self.routes
193 .lock()
194 .expect("lock test routes")
195 .remove(route_id);
196 Ok(())
197 }
198 }
199
200 #[derive(Clone, Default)]
201 struct InMemoryTestProjectionStore {
202 statuses: Arc<Mutex<HashMap<String, RouteStatusProjection>>>,
203 }
204
205 #[async_trait]
206 impl ProjectionStorePort for InMemoryTestProjectionStore {
207 async fn upsert_status(&self, status: RouteStatusProjection) -> Result<(), CamelError> {
208 self.statuses
209 .lock()
210 .expect("lock test statuses")
211 .insert(status.route_id.clone(), status);
212 Ok(())
213 }
214
215 async fn get_status(
216 &self,
217 route_id: &str,
218 ) -> Result<Option<RouteStatusProjection>, CamelError> {
219 Ok(self
220 .statuses
221 .lock()
222 .expect("lock test statuses")
223 .get(route_id)
224 .cloned())
225 }
226
227 async fn list_statuses(&self) -> Result<Vec<RouteStatusProjection>, CamelError> {
228 Ok(self
229 .statuses
230 .lock()
231 .expect("lock test statuses")
232 .values()
233 .cloned()
234 .collect())
235 }
236
237 async fn remove_status(&self, route_id: &str) -> Result<(), CamelError> {
238 self.statuses
239 .lock()
240 .expect("lock test statuses")
241 .remove(route_id);
242 Ok(())
243 }
244 }
245
246 #[derive(Clone, Default)]
247 struct InMemoryTestEventPublisher;
248
249 #[async_trait]
250 impl EventPublisherPort for InMemoryTestEventPublisher {
251 async fn publish(&self, _events: &[RuntimeEvent]) -> Result<(), CamelError> {
252 Ok(())
253 }
254 }
255
256 #[derive(Clone, Default)]
257 struct InMemoryTestDedup {
258 seen: Arc<Mutex<HashSet<String>>>,
259 }
260
261 #[async_trait]
262 impl CommandDedupPort for InMemoryTestDedup {
263 async fn first_seen(&self, command_id: &str) -> Result<bool, CamelError> {
264 let mut seen = self.seen.lock().expect("lock dedup set");
265 Ok(seen.insert(command_id.to_string()))
266 }
267
268 async fn forget_seen(&self, command_id: &str) -> Result<(), CamelError> {
269 self.seen.lock().expect("lock dedup set").remove(command_id);
270 Ok(())
271 }
272 }
273
274 fn build_test_runtime_bus() -> RuntimeBus {
275 let repo: Arc<dyn RouteRepositoryPort> = Arc::new(InMemoryTestRepo::default());
276 let projections: Arc<dyn ProjectionStorePort> =
277 Arc::new(InMemoryTestProjectionStore::default());
278 let events: Arc<dyn EventPublisherPort> = Arc::new(InMemoryTestEventPublisher);
279 let dedup: Arc<dyn CommandDedupPort> = Arc::new(InMemoryTestDedup::default());
280 RuntimeBus::new(repo, projections, events, dedup)
281 }
282
283 #[tokio::test]
284 async fn runtime_bus_implements_internal_command_bus() {
285 let bus = build_test_runtime_bus();
286 let def = RouteDefinition::new("timer:test", vec![]).with_route_id("internal-route");
287 let result = InternalRuntimeCommandBus::register_route(&bus, def).await;
288 assert!(
289 result.is_ok(),
290 "internal bus registration failed: {:?}",
291 result
292 );
293
294 let status = bus
295 .ask(RuntimeQuery::GetRouteStatus {
296 route_id: "internal-route".to_string(),
297 })
298 .await
299 .unwrap();
300 match status {
301 RuntimeQueryResult::RouteStatus { status, .. } => {
302 assert_eq!(status, "Registered");
303 }
304 _ => panic!("unexpected query result"),
305 }
306 }
307}