1use std::sync::Arc;
2
3use async_trait::async_trait;
4use tokio::sync::OnceCell;
5
6use camel_api::{
7 CamelError, RuntimeCommand, RuntimeCommandBus, RuntimeCommandResult, RuntimeQuery,
8 RuntimeQueryBus, RuntimeQueryResult,
9};
10
11use crate::lifecycle::application::commands::{
12 CommandDeps, execute_command, handle_register_internal,
13};
14use crate::lifecycle::application::queries::{QueryDeps, execute_query};
15use crate::lifecycle::application::route_definition::RouteDefinition;
16use crate::lifecycle::ports::RouteRegistrationPort;
17use crate::lifecycle::ports::{
18 CommandDedupPort, EventPublisherPort, ProjectionStorePort, RouteRepositoryPort,
19 RuntimeExecutionPort, RuntimeUnitOfWorkPort,
20};
21
22pub struct RuntimeBus {
23 repo: Arc<dyn RouteRepositoryPort>,
24 projections: Arc<dyn ProjectionStorePort>,
25 events: Arc<dyn EventPublisherPort>,
26 dedup: Arc<dyn CommandDedupPort>,
27 uow: Option<Arc<dyn RuntimeUnitOfWorkPort>>,
28 execution: Option<Arc<dyn RuntimeExecutionPort>>,
29 journal_recovered_once: OnceCell<()>,
30}
31
32impl RuntimeBus {
33 pub fn new(
34 repo: Arc<dyn RouteRepositoryPort>,
35 projections: Arc<dyn ProjectionStorePort>,
36 events: Arc<dyn EventPublisherPort>,
37 dedup: Arc<dyn CommandDedupPort>,
38 ) -> Self {
39 Self {
40 repo,
41 projections,
42 events,
43 dedup,
44 uow: None,
45 execution: None,
46 journal_recovered_once: OnceCell::new(),
47 }
48 }
49
50 pub fn with_uow(mut self, uow: Arc<dyn RuntimeUnitOfWorkPort>) -> Self {
51 self.uow = Some(uow);
52 self
53 }
54
55 pub fn with_execution(mut self, execution: Arc<dyn RuntimeExecutionPort>) -> Self {
56 self.execution = Some(execution);
57 self
58 }
59
60 pub fn repo(&self) -> &Arc<dyn RouteRepositoryPort> {
61 &self.repo
62 }
63
64 fn deps(&self) -> CommandDeps {
65 CommandDeps {
66 repo: Arc::clone(&self.repo),
67 projections: Arc::clone(&self.projections),
68 events: Arc::clone(&self.events),
69 uow: self.uow.clone(),
70 execution: self.execution.clone(),
71 }
72 }
73
74 fn query_deps(&self) -> QueryDeps {
75 QueryDeps {
76 projections: Arc::clone(&self.projections),
77 }
78 }
79
80 async fn ensure_journal_recovered(&self) -> Result<(), CamelError> {
81 let Some(uow) = &self.uow else {
82 return Ok(());
83 };
84
85 self.journal_recovered_once
86 .get_or_try_init(|| async {
87 uow.recover_from_journal().await?;
88 Ok::<(), CamelError>(())
89 })
90 .await?;
91 Ok(())
92 }
93}
94
95#[async_trait]
96impl RuntimeCommandBus for RuntimeBus {
97 async fn execute(&self, cmd: RuntimeCommand) -> Result<RuntimeCommandResult, CamelError> {
98 self.ensure_journal_recovered().await?;
99 let command_id = cmd.command_id().to_string();
100 if !self.dedup.first_seen(&command_id).await? {
101 return Ok(RuntimeCommandResult::Duplicate { command_id });
102 }
103 let deps = self.deps();
104 match execute_command(&deps, cmd).await {
105 Ok(result) => Ok(result),
106 Err(err) => {
107 let _ = self.dedup.forget_seen(&command_id).await;
108 Err(err)
109 }
110 }
111 }
112}
113
114#[async_trait]
115impl RuntimeQueryBus for RuntimeBus {
116 async fn ask(&self, query: RuntimeQuery) -> Result<RuntimeQueryResult, CamelError> {
117 self.ensure_journal_recovered().await?;
118
119 match query {
120 RuntimeQuery::InFlightCount { route_id } => {
121 if let Some(execution) = &self.execution {
122 execution.in_flight_count(&route_id).await
123 } else {
124 Ok(RuntimeQueryResult::RouteNotFound { route_id })
125 }
126 }
127 other => {
128 let deps = self.query_deps();
129 execute_query(&deps, other).await
130 }
131 }
132 }
133}
134
135#[async_trait]
136impl RouteRegistrationPort for RuntimeBus {
137 async fn register_route(&self, def: RouteDefinition) -> Result<(), CamelError> {
138 self.ensure_journal_recovered().await?;
139 let deps = self.deps();
140 handle_register_internal(&deps, def).await.map(|_| ())
141 }
142}
143
144#[cfg(test)]
145mod tests {
146 use super::*;
147 use std::collections::{HashMap, HashSet};
148 use std::sync::Mutex;
149
150 use crate::lifecycle::application::route_definition::RouteDefinition;
151 use crate::lifecycle::domain::{RouteRuntimeAggregate, RuntimeEvent};
152 use crate::lifecycle::ports::RouteRegistrationPort as InternalRuntimeCommandBus;
153 use crate::lifecycle::ports::RouteStatusProjection;
154
155 #[derive(Clone, Default)]
156 struct InMemoryTestRepo {
157 routes: Arc<Mutex<HashMap<String, RouteRuntimeAggregate>>>,
158 }
159
160 #[async_trait]
161 impl RouteRepositoryPort for InMemoryTestRepo {
162 async fn load(&self, route_id: &str) -> Result<Option<RouteRuntimeAggregate>, CamelError> {
163 Ok(self
164 .routes
165 .lock()
166 .expect("lock test routes")
167 .get(route_id)
168 .cloned())
169 }
170
171 async fn save(&self, aggregate: RouteRuntimeAggregate) -> Result<(), CamelError> {
172 self.routes
173 .lock()
174 .expect("lock test routes")
175 .insert(aggregate.route_id().to_string(), aggregate);
176 Ok(())
177 }
178
179 async fn save_if_version(
180 &self,
181 aggregate: RouteRuntimeAggregate,
182 expected_version: u64,
183 ) -> Result<(), CamelError> {
184 let route_id = aggregate.route_id().to_string();
185 let mut routes = self.routes.lock().expect("lock test routes");
186 let current = routes.get(&route_id).ok_or_else(|| {
187 CamelError::RouteError(format!(
188 "optimistic lock conflict for route '{route_id}': route not found"
189 ))
190 })?;
191
192 if current.version() != expected_version {
193 return Err(CamelError::RouteError(format!(
194 "optimistic lock conflict for route '{route_id}': expected version {expected_version}, actual {}",
195 current.version()
196 )));
197 }
198
199 routes.insert(route_id, aggregate);
200 Ok(())
201 }
202
203 async fn delete(&self, route_id: &str) -> Result<(), CamelError> {
204 self.routes
205 .lock()
206 .expect("lock test routes")
207 .remove(route_id);
208 Ok(())
209 }
210 }
211
212 #[derive(Clone, Default)]
213 struct InMemoryTestProjectionStore {
214 statuses: Arc<Mutex<HashMap<String, RouteStatusProjection>>>,
215 }
216
217 #[async_trait]
218 impl ProjectionStorePort for InMemoryTestProjectionStore {
219 async fn upsert_status(&self, status: RouteStatusProjection) -> Result<(), CamelError> {
220 self.statuses
221 .lock()
222 .expect("lock test statuses")
223 .insert(status.route_id.clone(), status);
224 Ok(())
225 }
226
227 async fn get_status(
228 &self,
229 route_id: &str,
230 ) -> Result<Option<RouteStatusProjection>, CamelError> {
231 Ok(self
232 .statuses
233 .lock()
234 .expect("lock test statuses")
235 .get(route_id)
236 .cloned())
237 }
238
239 async fn list_statuses(&self) -> Result<Vec<RouteStatusProjection>, CamelError> {
240 Ok(self
241 .statuses
242 .lock()
243 .expect("lock test statuses")
244 .values()
245 .cloned()
246 .collect())
247 }
248
249 async fn remove_status(&self, route_id: &str) -> Result<(), CamelError> {
250 self.statuses
251 .lock()
252 .expect("lock test statuses")
253 .remove(route_id);
254 Ok(())
255 }
256 }
257
258 #[derive(Clone, Default)]
259 struct InMemoryTestEventPublisher;
260
261 #[async_trait]
262 impl EventPublisherPort for InMemoryTestEventPublisher {
263 async fn publish(&self, _events: &[RuntimeEvent]) -> Result<(), CamelError> {
264 Ok(())
265 }
266 }
267
268 #[derive(Clone, Default)]
269 struct InMemoryTestDedup {
270 seen: Arc<Mutex<HashSet<String>>>,
271 }
272
273 #[derive(Clone, Default)]
274 struct InspectableDedup {
275 seen: Arc<Mutex<HashSet<String>>>,
276 forget_calls: Arc<Mutex<u32>>,
277 }
278
279 #[async_trait]
280 impl CommandDedupPort for InMemoryTestDedup {
281 async fn first_seen(&self, command_id: &str) -> Result<bool, CamelError> {
282 let mut seen = self.seen.lock().expect("lock dedup set");
283 Ok(seen.insert(command_id.to_string()))
284 }
285
286 async fn forget_seen(&self, command_id: &str) -> Result<(), CamelError> {
287 self.seen.lock().expect("lock dedup set").remove(command_id);
288 Ok(())
289 }
290 }
291
292 #[async_trait]
293 impl CommandDedupPort for InspectableDedup {
294 async fn first_seen(&self, command_id: &str) -> Result<bool, CamelError> {
295 let mut seen = self.seen.lock().expect("lock dedup set");
296 Ok(seen.insert(command_id.to_string()))
297 }
298
299 async fn forget_seen(&self, command_id: &str) -> Result<(), CamelError> {
300 self.seen.lock().expect("lock dedup set").remove(command_id);
301 let mut calls = self.forget_calls.lock().expect("forget calls");
302 *calls += 1;
303 Ok(())
304 }
305 }
306
307 fn build_test_runtime_bus() -> RuntimeBus {
308 let repo: Arc<dyn RouteRepositoryPort> = Arc::new(InMemoryTestRepo::default());
309 let projections: Arc<dyn ProjectionStorePort> =
310 Arc::new(InMemoryTestProjectionStore::default());
311 let events: Arc<dyn EventPublisherPort> = Arc::new(InMemoryTestEventPublisher);
312 let dedup: Arc<dyn CommandDedupPort> = Arc::new(InMemoryTestDedup::default());
313 RuntimeBus::new(repo, projections, events, dedup)
314 }
315
316 #[derive(Default)]
317 struct CountingUow {
318 recover_calls: Arc<Mutex<u32>>,
319 }
320
321 #[derive(Default)]
322 struct FailingRecoverUow;
323
324 #[async_trait]
325 impl RuntimeUnitOfWorkPort for CountingUow {
326 async fn persist_upsert(
327 &self,
328 _aggregate: RouteRuntimeAggregate,
329 _expected_version: Option<u64>,
330 _projection: RouteStatusProjection,
331 _events: &[RuntimeEvent],
332 ) -> Result<(), CamelError> {
333 Ok(())
334 }
335
336 async fn persist_delete(
337 &self,
338 _route_id: &str,
339 _events: &[RuntimeEvent],
340 ) -> Result<(), CamelError> {
341 Ok(())
342 }
343
344 async fn recover_from_journal(&self) -> Result<(), CamelError> {
345 let mut calls = self.recover_calls.lock().expect("recover_calls");
346 *calls += 1;
347 Ok(())
348 }
349 }
350
351 #[async_trait]
352 impl RuntimeUnitOfWorkPort for FailingRecoverUow {
353 async fn persist_upsert(
354 &self,
355 _aggregate: RouteRuntimeAggregate,
356 _expected_version: Option<u64>,
357 _projection: RouteStatusProjection,
358 _events: &[RuntimeEvent],
359 ) -> Result<(), CamelError> {
360 Ok(())
361 }
362
363 async fn persist_delete(
364 &self,
365 _route_id: &str,
366 _events: &[RuntimeEvent],
367 ) -> Result<(), CamelError> {
368 Ok(())
369 }
370
371 async fn recover_from_journal(&self) -> Result<(), CamelError> {
372 Err(CamelError::RouteError("recover failed".into()))
373 }
374 }
375
376 #[derive(Default)]
377 struct InFlightExecutionPort;
378
379 #[async_trait]
380 impl RuntimeExecutionPort for InFlightExecutionPort {
381 async fn register_route(&self, _definition: RouteDefinition) -> Result<(), CamelError> {
382 Ok(())
383 }
384 async fn start_route(&self, _route_id: &str) -> Result<(), CamelError> {
385 Ok(())
386 }
387 async fn stop_route(&self, _route_id: &str) -> Result<(), CamelError> {
388 Ok(())
389 }
390 async fn suspend_route(&self, _route_id: &str) -> Result<(), CamelError> {
391 Ok(())
392 }
393 async fn resume_route(&self, _route_id: &str) -> Result<(), CamelError> {
394 Ok(())
395 }
396 async fn reload_route(&self, _route_id: &str) -> Result<(), CamelError> {
397 Ok(())
398 }
399 async fn remove_route(&self, _route_id: &str) -> Result<(), CamelError> {
400 Ok(())
401 }
402 async fn in_flight_count(&self, route_id: &str) -> Result<RuntimeQueryResult, CamelError> {
403 if route_id == "known" {
404 Ok(RuntimeQueryResult::InFlightCount {
405 route_id: route_id.to_string(),
406 count: 3,
407 })
408 } else {
409 Ok(RuntimeQueryResult::RouteNotFound {
410 route_id: route_id.to_string(),
411 })
412 }
413 }
414 }
415
416 #[tokio::test]
417 async fn runtime_bus_implements_internal_command_bus() {
418 let bus = build_test_runtime_bus();
419 let def = RouteDefinition::new("timer:test", vec![]).with_route_id("internal-route");
420 let result = InternalRuntimeCommandBus::register_route(&bus, def).await;
421 assert!(
422 result.is_ok(),
423 "internal bus registration failed: {:?}",
424 result
425 );
426
427 let status = bus
428 .ask(RuntimeQuery::GetRouteStatus {
429 route_id: "internal-route".to_string(),
430 })
431 .await
432 .unwrap();
433 match status {
434 RuntimeQueryResult::RouteStatus { status, .. } => {
435 assert_eq!(status, "Registered");
436 }
437 _ => panic!("unexpected query result"),
438 }
439 }
440
441 #[tokio::test]
442 async fn execute_returns_duplicate_for_replayed_command_id() {
443 use camel_api::runtime::{CanonicalRouteSpec, CanonicalStepSpec, RuntimeCommand};
444
445 let bus = build_test_runtime_bus();
446
447 let mut spec = CanonicalRouteSpec::new("dup-route", "timer:tick");
448 spec.steps = vec![CanonicalStepSpec::Stop];
449
450 let cmd = RuntimeCommand::RegisterRoute {
451 spec: spec.clone(),
452 command_id: "dup-cmd".into(),
453 causation_id: None,
454 };
455 let first = bus.execute(cmd).await.unwrap();
456 assert!(matches!(
457 first,
458 RuntimeCommandResult::RouteRegistered { route_id } if route_id == "dup-route"
459 ));
460
461 let second = bus
462 .execute(RuntimeCommand::RegisterRoute {
463 spec,
464 command_id: "dup-cmd".into(),
465 causation_id: None,
466 })
467 .await
468 .unwrap();
469 assert!(matches!(
470 second,
471 RuntimeCommandResult::Duplicate { command_id } if command_id == "dup-cmd"
472 ));
473 }
474
475 #[tokio::test]
476 async fn ask_in_flight_count_without_execution_returns_route_not_found() {
477 let bus = build_test_runtime_bus();
478 let res = bus
479 .ask(RuntimeQuery::InFlightCount {
480 route_id: "missing".into(),
481 })
482 .await
483 .unwrap();
484 assert!(matches!(
485 res,
486 RuntimeQueryResult::RouteNotFound { route_id } if route_id == "missing"
487 ));
488 }
489
490 #[tokio::test]
491 async fn ask_in_flight_count_with_execution_delegates_to_adapter() {
492 let repo: Arc<dyn RouteRepositoryPort> = Arc::new(InMemoryTestRepo::default());
493 let projections: Arc<dyn ProjectionStorePort> =
494 Arc::new(InMemoryTestProjectionStore::default());
495 let events: Arc<dyn EventPublisherPort> = Arc::new(InMemoryTestEventPublisher);
496 let dedup: Arc<dyn CommandDedupPort> = Arc::new(InMemoryTestDedup::default());
497 let execution: Arc<dyn RuntimeExecutionPort> = Arc::new(InFlightExecutionPort);
498 let bus = RuntimeBus::new(repo, projections, events, dedup).with_execution(execution);
499
500 let known = bus
501 .ask(RuntimeQuery::InFlightCount {
502 route_id: "known".into(),
503 })
504 .await
505 .unwrap();
506 assert!(matches!(
507 known,
508 RuntimeQueryResult::InFlightCount { route_id, count }
509 if route_id == "known" && count == 3
510 ));
511 }
512
513 #[tokio::test]
514 async fn journal_recovery_runs_once_even_with_multiple_commands() {
515 use camel_api::runtime::{CanonicalRouteSpec, CanonicalStepSpec, RuntimeCommand};
516
517 let repo: Arc<dyn RouteRepositoryPort> = Arc::new(InMemoryTestRepo::default());
518 let projections: Arc<dyn ProjectionStorePort> =
519 Arc::new(InMemoryTestProjectionStore::default());
520 let events: Arc<dyn EventPublisherPort> = Arc::new(InMemoryTestEventPublisher);
521 let dedup: Arc<dyn CommandDedupPort> = Arc::new(InMemoryTestDedup::default());
522 let uow = Arc::new(CountingUow::default());
523 let bus = RuntimeBus::new(repo, projections, events, dedup).with_uow(uow.clone());
524
525 let mut spec_a = CanonicalRouteSpec::new("a", "timer:a");
526 spec_a.steps = vec![CanonicalStepSpec::Stop];
527 let mut spec_b = CanonicalRouteSpec::new("b", "timer:b");
528 spec_b.steps = vec![CanonicalStepSpec::Stop];
529
530 bus.execute(RuntimeCommand::RegisterRoute {
531 spec: spec_a,
532 command_id: "c-a".into(),
533 causation_id: None,
534 })
535 .await
536 .unwrap();
537
538 bus.execute(RuntimeCommand::RegisterRoute {
539 spec: spec_b,
540 command_id: "c-b".into(),
541 causation_id: None,
542 })
543 .await
544 .unwrap();
545
546 let calls = *uow.recover_calls.lock().expect("recover calls");
547 assert_eq!(calls, 1, "journal recovery should run once");
548 }
549
550 #[tokio::test]
551 async fn execute_on_command_error_forgets_dedup_marker() {
552 use camel_api::runtime::{CanonicalRouteSpec, RuntimeCommand};
553
554 let repo: Arc<dyn RouteRepositoryPort> = Arc::new(InMemoryTestRepo::default());
555 let projections: Arc<dyn ProjectionStorePort> =
556 Arc::new(InMemoryTestProjectionStore::default());
557 let events: Arc<dyn EventPublisherPort> = Arc::new(InMemoryTestEventPublisher);
558 let dedup = Arc::new(InspectableDedup::default());
559 let dedup_port: Arc<dyn CommandDedupPort> = dedup.clone();
560
561 let bus = RuntimeBus::new(repo, projections, events, dedup_port);
562
563 let cmd = RuntimeCommand::RegisterRoute {
565 spec: CanonicalRouteSpec::new("", "timer:tick"),
566 command_id: "err-cmd".into(),
567 causation_id: None,
568 };
569
570 let err = bus.execute(cmd).await.expect_err("must fail");
571 assert!(err.to_string().contains("route_id cannot be empty"));
572
573 assert_eq!(*dedup.forget_calls.lock().expect("forget calls"), 1);
574 assert!(!dedup.seen.lock().expect("seen").contains("err-cmd"));
575 }
576
577 #[tokio::test]
578 async fn execute_propagates_recover_error_from_uow() {
579 use camel_api::runtime::{CanonicalRouteSpec, CanonicalStepSpec, RuntimeCommand};
580
581 let repo: Arc<dyn RouteRepositoryPort> = Arc::new(InMemoryTestRepo::default());
582 let projections: Arc<dyn ProjectionStorePort> =
583 Arc::new(InMemoryTestProjectionStore::default());
584 let events: Arc<dyn EventPublisherPort> = Arc::new(InMemoryTestEventPublisher);
585 let dedup: Arc<dyn CommandDedupPort> = Arc::new(InMemoryTestDedup::default());
586 let uow: Arc<dyn RuntimeUnitOfWorkPort> = Arc::new(FailingRecoverUow);
587
588 let bus = RuntimeBus::new(repo, projections, events, dedup).with_uow(uow);
589
590 let mut spec = CanonicalRouteSpec::new("x", "timer:x");
591 spec.steps = vec![CanonicalStepSpec::Stop];
592 let err = bus
593 .execute(RuntimeCommand::RegisterRoute {
594 spec,
595 command_id: "recover-err".into(),
596 causation_id: None,
597 })
598 .await
599 .expect_err("recover should fail");
600
601 assert!(err.to_string().contains("recover failed"));
602 }
603
604 #[tokio::test]
605 async fn ask_in_flight_count_with_execution_handles_unknown_route() {
606 let repo: Arc<dyn RouteRepositoryPort> = Arc::new(InMemoryTestRepo::default());
607 let projections: Arc<dyn ProjectionStorePort> =
608 Arc::new(InMemoryTestProjectionStore::default());
609 let events: Arc<dyn EventPublisherPort> = Arc::new(InMemoryTestEventPublisher);
610 let dedup: Arc<dyn CommandDedupPort> = Arc::new(InMemoryTestDedup::default());
611 let execution: Arc<dyn RuntimeExecutionPort> = Arc::new(InFlightExecutionPort);
612 let bus = RuntimeBus::new(repo, projections, events, dedup).with_execution(execution);
613
614 let unknown = bus
615 .ask(RuntimeQuery::InFlightCount {
616 route_id: "unknown".into(),
617 })
618 .await
619 .unwrap();
620 assert!(matches!(
621 unknown,
622 RuntimeQueryResult::RouteNotFound { route_id } if route_id == "unknown"
623 ));
624 }
625}