1use std::sync::Arc;
2
3use async_trait::async_trait;
4use tokio::sync::OnceCell;
5
6use camel_api::{
7 CamelError, RuntimeCommand, RuntimeCommandBus, RuntimeCommandResult, RuntimeQuery,
8 RuntimeQueryBus, RuntimeQueryResult,
9};
10
11use crate::lifecycle::application::commands::{
12 CommandDeps, execute_command, handle_register_internal,
13};
14use crate::lifecycle::application::queries::{QueryDeps, execute_query};
15use crate::lifecycle::application::route_definition::RouteDefinition;
16use crate::lifecycle::domain::DomainError;
17use crate::lifecycle::ports::RouteRegistrationPort;
18use crate::lifecycle::ports::{
19 CommandDedupPort, EventPublisherPort, InFlightCountResult, ProjectionStorePort,
20 RouteRepositoryPort, RuntimeExecutionPort, RuntimeUnitOfWorkPort,
21};
22
23impl From<InFlightCountResult> for RuntimeQueryResult {
24 fn from(r: InFlightCountResult) -> Self {
25 match r {
26 InFlightCountResult::InFlightCount { route_id, count } => {
27 RuntimeQueryResult::InFlightCount { route_id, count }
28 }
29 InFlightCountResult::RouteNotFound { route_id } => {
30 RuntimeQueryResult::RouteNotFound { route_id }
31 }
32 }
33 }
34}
35
36pub struct RuntimeBus {
37 repo: Arc<dyn RouteRepositoryPort>,
38 projections: Arc<dyn ProjectionStorePort>,
39 events: Arc<dyn EventPublisherPort>,
40 dedup: Arc<dyn CommandDedupPort>,
41 uow: Option<Arc<dyn RuntimeUnitOfWorkPort>>,
42 execution: Option<Arc<dyn RuntimeExecutionPort>>,
43 journal_recovered_once: OnceCell<()>,
44}
45
46impl RuntimeBus {
47 pub fn new(
48 repo: Arc<dyn RouteRepositoryPort>,
49 projections: Arc<dyn ProjectionStorePort>,
50 events: Arc<dyn EventPublisherPort>,
51 dedup: Arc<dyn CommandDedupPort>,
52 ) -> Self {
53 Self {
54 repo,
55 projections,
56 events,
57 dedup,
58 uow: None,
59 execution: None,
60 journal_recovered_once: OnceCell::new(),
61 }
62 }
63
64 pub fn with_uow(mut self, uow: Arc<dyn RuntimeUnitOfWorkPort>) -> Self {
65 self.uow = Some(uow);
66 self
67 }
68
69 pub fn with_execution(mut self, execution: Arc<dyn RuntimeExecutionPort>) -> Self {
70 self.execution = Some(execution);
71 self
72 }
73
74 pub fn repo(&self) -> &Arc<dyn RouteRepositoryPort> {
75 &self.repo
76 }
77
78 pub(crate) async fn register_aggregate_only(&self, route_id: String) -> Result<(), CamelError> {
79 self.ensure_journal_recovered().await?;
80 let deps = self.deps();
81 if deps.repo.load(&route_id).await?.is_some() {
82 return Err(CamelError::RouteError(format!(
83 "route '{route_id}' already registered"
84 )));
85 }
86 let (aggregate, events) =
87 crate::lifecycle::domain::RouteRuntimeAggregate::register(route_id.clone());
88 if let Some(uow) = &deps.uow {
89 uow.persist_upsert(
90 aggregate.clone(),
91 None,
92 crate::lifecycle::application::commands::project_from_aggregate(&aggregate),
93 &events,
94 )
95 .await?;
96 } else {
97 deps.repo.save(aggregate.clone()).await?;
98 if let Some(primary_error) =
99 crate::lifecycle::application::commands::upsert_projection_with_reconciliation(
100 &*deps.projections,
101 crate::lifecycle::application::commands::project_from_aggregate(&aggregate),
102 )
103 .await?
104 {
105 deps.events.publish(&events).await?;
106 return Err(CamelError::RouteError(format!(
107 "post-effect reconciliation recovered after runtime persistence error: {primary_error}"
108 )));
109 }
110 deps.events.publish(&events).await?;
111 }
112 Ok(())
113 }
114
115 fn deps(&self) -> CommandDeps {
116 CommandDeps {
117 repo: Arc::clone(&self.repo),
118 projections: Arc::clone(&self.projections),
119 events: Arc::clone(&self.events),
120 uow: self.uow.clone(),
121 execution: self.execution.clone(),
122 }
123 }
124
125 fn query_deps(&self) -> QueryDeps {
126 QueryDeps {
127 projections: Arc::clone(&self.projections),
128 }
129 }
130
131 async fn ensure_journal_recovered(&self) -> Result<(), CamelError> {
132 let Some(uow) = &self.uow else {
133 return Ok(());
134 };
135
136 self.journal_recovered_once
137 .get_or_try_init(|| async {
138 uow.recover_from_journal().await?;
139 Ok::<(), CamelError>(())
140 })
141 .await?;
142 Ok(())
143 }
144}
145
146#[async_trait]
147impl RuntimeCommandBus for RuntimeBus {
148 async fn execute(&self, cmd: RuntimeCommand) -> Result<RuntimeCommandResult, CamelError> {
149 self.ensure_journal_recovered().await?;
150 let command_id = cmd.command_id().to_string();
151 if !self.dedup.first_seen(&command_id).await? {
152 return Ok(RuntimeCommandResult::Duplicate { command_id });
153 }
154 let deps = self.deps();
155 match execute_command(&deps, cmd).await {
156 Ok(result) => Ok(result),
157 Err(err) => {
158 let _ = self.dedup.forget_seen(&command_id).await;
159 Err(err)
160 }
161 }
162 }
163}
164
165#[async_trait]
166impl RuntimeQueryBus for RuntimeBus {
167 async fn ask(&self, query: RuntimeQuery) -> Result<RuntimeQueryResult, CamelError> {
168 self.ensure_journal_recovered().await?;
169
170 match query {
171 RuntimeQuery::InFlightCount { route_id } => {
172 if let Some(execution) = &self.execution {
173 execution
174 .in_flight_count(&route_id)
175 .await
176 .map(|r| r.into())
177 .map_err(Into::into)
178 } else {
179 Ok(RuntimeQueryResult::RouteNotFound { route_id })
180 }
181 }
182 other => {
183 let deps = self.query_deps();
184 execute_query(&deps, other).await
185 }
186 }
187 }
188}
189
190#[async_trait]
191impl RouteRegistrationPort for RuntimeBus {
192 async fn register_route(&self, def: RouteDefinition) -> Result<(), DomainError> {
193 self.ensure_journal_recovered()
194 .await
195 .map_err(|e| DomainError::InvalidState(e.to_string()))?;
196 let deps = self.deps();
197 handle_register_internal(&deps, def)
198 .await
199 .map(|_| ())
200 .map_err(|e| match e {
201 CamelError::RouteError(msg) => DomainError::InvalidState(msg),
202 other => DomainError::InvalidState(other.to_string()),
203 })
204 }
205}
206
207#[cfg(test)]
208mod tests {
209 use crate::lifecycle::domain::DomainError;
210
211 use super::*;
212 use std::collections::{HashMap, HashSet};
213 use std::sync::Mutex;
214
215 use crate::lifecycle::application::route_definition::RouteDefinition;
216 use crate::lifecycle::domain::{RouteRuntimeAggregate, RuntimeEvent};
217 use crate::lifecycle::ports::RouteRegistrationPort as InternalRuntimeCommandBus;
218 use crate::lifecycle::ports::RouteStatusProjection;
219
220 #[derive(Clone, Default)]
221 struct InMemoryTestRepo {
222 routes: Arc<Mutex<HashMap<String, RouteRuntimeAggregate>>>,
223 }
224
225 #[async_trait]
226 impl RouteRepositoryPort for InMemoryTestRepo {
227 async fn load(&self, route_id: &str) -> Result<Option<RouteRuntimeAggregate>, DomainError> {
228 Ok(self
229 .routes
230 .lock()
231 .expect("lock test routes")
232 .get(route_id)
233 .cloned())
234 }
235
236 async fn save(&self, aggregate: RouteRuntimeAggregate) -> Result<(), DomainError> {
237 self.routes
238 .lock()
239 .expect("lock test routes")
240 .insert(aggregate.route_id().to_string(), aggregate);
241 Ok(())
242 }
243
244 async fn save_if_version(
245 &self,
246 aggregate: RouteRuntimeAggregate,
247 expected_version: u64,
248 ) -> Result<(), DomainError> {
249 let route_id = aggregate.route_id().to_string();
250 let mut routes = self.routes.lock().expect("lock test routes");
251 let current = routes.get(&route_id).ok_or_else(|| {
252 DomainError::InvalidState(format!(
253 "optimistic lock conflict for route '{route_id}': route not found"
254 ))
255 })?;
256
257 if current.version() != expected_version {
258 return Err(DomainError::InvalidState(format!(
259 "optimistic lock conflict for route '{route_id}': expected version {expected_version}, actual {}",
260 current.version()
261 )));
262 }
263
264 routes.insert(route_id, aggregate);
265 Ok(())
266 }
267
268 async fn delete(&self, route_id: &str) -> Result<(), DomainError> {
269 self.routes
270 .lock()
271 .expect("lock test routes")
272 .remove(route_id);
273 Ok(())
274 }
275 }
276
277 #[derive(Clone, Default)]
278 struct InMemoryTestProjectionStore {
279 statuses: Arc<Mutex<HashMap<String, RouteStatusProjection>>>,
280 }
281
282 #[async_trait]
283 impl ProjectionStorePort for InMemoryTestProjectionStore {
284 async fn upsert_status(&self, status: RouteStatusProjection) -> Result<(), DomainError> {
285 self.statuses
286 .lock()
287 .expect("lock test statuses")
288 .insert(status.route_id.clone(), status);
289 Ok(())
290 }
291
292 async fn get_status(
293 &self,
294 route_id: &str,
295 ) -> Result<Option<RouteStatusProjection>, DomainError> {
296 Ok(self
297 .statuses
298 .lock()
299 .expect("lock test statuses")
300 .get(route_id)
301 .cloned())
302 }
303
304 async fn list_statuses(&self) -> Result<Vec<RouteStatusProjection>, DomainError> {
305 Ok(self
306 .statuses
307 .lock()
308 .expect("lock test statuses")
309 .values()
310 .cloned()
311 .collect())
312 }
313
314 async fn remove_status(&self, route_id: &str) -> Result<(), DomainError> {
315 self.statuses
316 .lock()
317 .expect("lock test statuses")
318 .remove(route_id);
319 Ok(())
320 }
321 }
322
323 #[derive(Clone, Default)]
324 struct InMemoryTestEventPublisher;
325
326 #[async_trait]
327 impl EventPublisherPort for InMemoryTestEventPublisher {
328 async fn publish(&self, _events: &[RuntimeEvent]) -> Result<(), DomainError> {
329 Ok(())
330 }
331 }
332
333 #[derive(Clone, Default)]
334 struct InMemoryTestDedup {
335 seen: Arc<Mutex<HashSet<String>>>,
336 }
337
338 #[derive(Clone, Default)]
339 struct InspectableDedup {
340 seen: Arc<Mutex<HashSet<String>>>,
341 forget_calls: Arc<Mutex<u32>>,
342 }
343
344 #[async_trait]
345 impl CommandDedupPort for InMemoryTestDedup {
346 async fn first_seen(&self, command_id: &str) -> Result<bool, DomainError> {
347 let mut seen = self.seen.lock().expect("lock dedup set");
348 Ok(seen.insert(command_id.to_string()))
349 }
350
351 async fn forget_seen(&self, command_id: &str) -> Result<(), DomainError> {
352 self.seen.lock().expect("lock dedup set").remove(command_id);
353 Ok(())
354 }
355 }
356
357 #[async_trait]
358 impl CommandDedupPort for InspectableDedup {
359 async fn first_seen(&self, command_id: &str) -> Result<bool, DomainError> {
360 let mut seen = self.seen.lock().expect("lock dedup set");
361 Ok(seen.insert(command_id.to_string()))
362 }
363
364 async fn forget_seen(&self, command_id: &str) -> Result<(), DomainError> {
365 self.seen.lock().expect("lock dedup set").remove(command_id);
366 let mut calls = self.forget_calls.lock().expect("forget calls");
367 *calls += 1;
368 Ok(())
369 }
370 }
371
372 fn build_test_runtime_bus() -> RuntimeBus {
373 let repo: Arc<dyn RouteRepositoryPort> = Arc::new(InMemoryTestRepo::default());
374 let projections: Arc<dyn ProjectionStorePort> =
375 Arc::new(InMemoryTestProjectionStore::default());
376 let events: Arc<dyn EventPublisherPort> = Arc::new(InMemoryTestEventPublisher);
377 let dedup: Arc<dyn CommandDedupPort> = Arc::new(InMemoryTestDedup::default());
378 RuntimeBus::new(repo, projections, events, dedup)
379 }
380
381 #[derive(Default)]
382 struct CountingUow {
383 recover_calls: Arc<Mutex<u32>>,
384 }
385
386 #[derive(Default)]
387 struct FailingRecoverUow;
388
389 #[async_trait]
390 impl RuntimeUnitOfWorkPort for CountingUow {
391 async fn persist_upsert(
392 &self,
393 _aggregate: RouteRuntimeAggregate,
394 _expected_version: Option<u64>,
395 _projection: RouteStatusProjection,
396 _events: &[RuntimeEvent],
397 ) -> Result<(), DomainError> {
398 Ok(())
399 }
400
401 async fn persist_delete(
402 &self,
403 _route_id: &str,
404 _events: &[RuntimeEvent],
405 ) -> Result<(), DomainError> {
406 Ok(())
407 }
408
409 async fn recover_from_journal(&self) -> Result<(), DomainError> {
410 let mut calls = self.recover_calls.lock().expect("recover_calls");
411 *calls += 1;
412 Ok(())
413 }
414 }
415
416 #[async_trait]
417 impl RuntimeUnitOfWorkPort for FailingRecoverUow {
418 async fn persist_upsert(
419 &self,
420 _aggregate: RouteRuntimeAggregate,
421 _expected_version: Option<u64>,
422 _projection: RouteStatusProjection,
423 _events: &[RuntimeEvent],
424 ) -> Result<(), DomainError> {
425 Ok(())
426 }
427
428 async fn persist_delete(
429 &self,
430 _route_id: &str,
431 _events: &[RuntimeEvent],
432 ) -> Result<(), DomainError> {
433 Ok(())
434 }
435
436 async fn recover_from_journal(&self) -> Result<(), DomainError> {
437 Err(DomainError::InvalidState("recover failed".into()))
438 }
439 }
440
441 #[derive(Default)]
442 struct InFlightExecutionPort;
443
444 #[async_trait]
445 impl RuntimeExecutionPort for InFlightExecutionPort {
446 async fn register_route(&self, _definition: RouteDefinition) -> Result<(), DomainError> {
447 Ok(())
448 }
449 async fn start_route(&self, _route_id: &str) -> Result<(), DomainError> {
450 Ok(())
451 }
452 async fn stop_route(&self, _route_id: &str) -> Result<(), DomainError> {
453 Ok(())
454 }
455 async fn suspend_route(&self, _route_id: &str) -> Result<(), DomainError> {
456 Ok(())
457 }
458 async fn resume_route(&self, _route_id: &str) -> Result<(), DomainError> {
459 Ok(())
460 }
461 async fn reload_route(&self, _route_id: &str) -> Result<(), DomainError> {
462 Ok(())
463 }
464 async fn remove_route(&self, _route_id: &str) -> Result<(), DomainError> {
465 Ok(())
466 }
467 async fn in_flight_count(
468 &self,
469 route_id: &str,
470 ) -> Result<InFlightCountResult, DomainError> {
471 if route_id == "known" {
472 Ok(InFlightCountResult::InFlightCount {
473 route_id: route_id.to_string(),
474 count: 3,
475 })
476 } else {
477 Ok(InFlightCountResult::RouteNotFound {
478 route_id: route_id.to_string(),
479 })
480 }
481 }
482 }
483
484 #[tokio::test]
485 async fn runtime_bus_implements_internal_command_bus() {
486 let bus = build_test_runtime_bus();
487 let def = RouteDefinition::new("timer:test", vec![]).with_route_id("internal-route");
488 let result = InternalRuntimeCommandBus::register_route(&bus, def).await;
489 assert!(
490 result.is_ok(),
491 "internal bus registration failed: {:?}",
492 result
493 );
494
495 let status = bus
496 .ask(RuntimeQuery::GetRouteStatus {
497 route_id: "internal-route".to_string(),
498 })
499 .await
500 .unwrap();
501 match status {
502 RuntimeQueryResult::RouteStatus { status, .. } => {
503 assert_eq!(status, "Registered");
504 }
505 _ => panic!("unexpected query result"),
506 }
507 }
508
509 #[tokio::test]
510 async fn execute_returns_duplicate_for_replayed_command_id() {
511 use camel_api::runtime::{CanonicalRouteSpec, CanonicalStepSpec, RuntimeCommand};
512
513 let bus = build_test_runtime_bus();
514
515 let mut spec = CanonicalRouteSpec::new("dup-route", "timer:tick");
516 spec.steps = vec![CanonicalStepSpec::Stop];
517
518 let cmd = RuntimeCommand::RegisterRoute {
519 spec: spec.clone(),
520 command_id: "dup-cmd".into(),
521 causation_id: None,
522 };
523 let first = bus.execute(cmd).await.unwrap();
524 assert!(matches!(
525 first,
526 RuntimeCommandResult::RouteRegistered { route_id } if route_id == "dup-route"
527 ));
528
529 let second = bus
530 .execute(RuntimeCommand::RegisterRoute {
531 spec,
532 command_id: "dup-cmd".into(),
533 causation_id: None,
534 })
535 .await
536 .unwrap();
537 assert!(matches!(
538 second,
539 RuntimeCommandResult::Duplicate { command_id } if command_id == "dup-cmd"
540 ));
541 }
542
543 #[tokio::test]
544 async fn ask_in_flight_count_without_execution_returns_route_not_found() {
545 let bus = build_test_runtime_bus();
546 let res = bus
547 .ask(RuntimeQuery::InFlightCount {
548 route_id: "missing".into(),
549 })
550 .await
551 .unwrap();
552 assert!(matches!(
553 res,
554 RuntimeQueryResult::RouteNotFound { route_id } if route_id == "missing"
555 ));
556 }
557
558 #[tokio::test]
559 async fn ask_in_flight_count_with_execution_delegates_to_adapter() {
560 let repo: Arc<dyn RouteRepositoryPort> = Arc::new(InMemoryTestRepo::default());
561 let projections: Arc<dyn ProjectionStorePort> =
562 Arc::new(InMemoryTestProjectionStore::default());
563 let events: Arc<dyn EventPublisherPort> = Arc::new(InMemoryTestEventPublisher);
564 let dedup: Arc<dyn CommandDedupPort> = Arc::new(InMemoryTestDedup::default());
565 let execution: Arc<dyn RuntimeExecutionPort> = Arc::new(InFlightExecutionPort);
566 let bus = RuntimeBus::new(repo, projections, events, dedup).with_execution(execution);
567
568 let known = bus
569 .ask(RuntimeQuery::InFlightCount {
570 route_id: "known".into(),
571 })
572 .await
573 .unwrap();
574 assert!(matches!(
575 known,
576 RuntimeQueryResult::InFlightCount { route_id, count }
577 if route_id == "known" && count == 3
578 ));
579 }
580
581 #[tokio::test]
582 async fn journal_recovery_runs_once_even_with_multiple_commands() {
583 use camel_api::runtime::{CanonicalRouteSpec, CanonicalStepSpec, RuntimeCommand};
584
585 let repo: Arc<dyn RouteRepositoryPort> = Arc::new(InMemoryTestRepo::default());
586 let projections: Arc<dyn ProjectionStorePort> =
587 Arc::new(InMemoryTestProjectionStore::default());
588 let events: Arc<dyn EventPublisherPort> = Arc::new(InMemoryTestEventPublisher);
589 let dedup: Arc<dyn CommandDedupPort> = Arc::new(InMemoryTestDedup::default());
590 let uow = Arc::new(CountingUow::default());
591 let bus = RuntimeBus::new(repo, projections, events, dedup).with_uow(uow.clone());
592
593 let mut spec_a = CanonicalRouteSpec::new("a", "timer:a");
594 spec_a.steps = vec![CanonicalStepSpec::Stop];
595 let mut spec_b = CanonicalRouteSpec::new("b", "timer:b");
596 spec_b.steps = vec![CanonicalStepSpec::Stop];
597
598 bus.execute(RuntimeCommand::RegisterRoute {
599 spec: spec_a,
600 command_id: "c-a".into(),
601 causation_id: None,
602 })
603 .await
604 .unwrap();
605
606 bus.execute(RuntimeCommand::RegisterRoute {
607 spec: spec_b,
608 command_id: "c-b".into(),
609 causation_id: None,
610 })
611 .await
612 .unwrap();
613
614 let calls = *uow.recover_calls.lock().expect("recover calls");
615 assert_eq!(calls, 1, "journal recovery should run once");
616 }
617
618 #[tokio::test]
619 async fn execute_on_command_error_forgets_dedup_marker() {
620 use camel_api::runtime::{CanonicalRouteSpec, RuntimeCommand};
621
622 let repo: Arc<dyn RouteRepositoryPort> = Arc::new(InMemoryTestRepo::default());
623 let projections: Arc<dyn ProjectionStorePort> =
624 Arc::new(InMemoryTestProjectionStore::default());
625 let events: Arc<dyn EventPublisherPort> = Arc::new(InMemoryTestEventPublisher);
626 let dedup = Arc::new(InspectableDedup::default());
627 let dedup_port: Arc<dyn CommandDedupPort> = dedup.clone();
628
629 let bus = RuntimeBus::new(repo, projections, events, dedup_port);
630
631 let cmd = RuntimeCommand::RegisterRoute {
633 spec: CanonicalRouteSpec::new("", "timer:tick"),
634 command_id: "err-cmd".into(),
635 causation_id: None,
636 };
637
638 let err = bus.execute(cmd).await.expect_err("must fail");
639 assert!(err.to_string().contains("route_id cannot be empty"));
640
641 assert_eq!(*dedup.forget_calls.lock().expect("forget calls"), 1);
642 assert!(!dedup.seen.lock().expect("seen").contains("err-cmd"));
643 }
644
645 #[tokio::test]
646 async fn execute_propagates_recover_error_from_uow() {
647 use camel_api::runtime::{CanonicalRouteSpec, CanonicalStepSpec, RuntimeCommand};
648
649 let repo: Arc<dyn RouteRepositoryPort> = Arc::new(InMemoryTestRepo::default());
650 let projections: Arc<dyn ProjectionStorePort> =
651 Arc::new(InMemoryTestProjectionStore::default());
652 let events: Arc<dyn EventPublisherPort> = Arc::new(InMemoryTestEventPublisher);
653 let dedup: Arc<dyn CommandDedupPort> = Arc::new(InMemoryTestDedup::default());
654 let uow: Arc<dyn RuntimeUnitOfWorkPort> = Arc::new(FailingRecoverUow);
655
656 let bus = RuntimeBus::new(repo, projections, events, dedup).with_uow(uow);
657
658 let mut spec = CanonicalRouteSpec::new("x", "timer:x");
659 spec.steps = vec![CanonicalStepSpec::Stop];
660 let err = bus
661 .execute(RuntimeCommand::RegisterRoute {
662 spec,
663 command_id: "recover-err".into(),
664 causation_id: None,
665 })
666 .await
667 .expect_err("recover should fail");
668
669 assert!(err.to_string().contains("recover failed"));
670 }
671
672 #[tokio::test]
673 async fn ask_in_flight_count_with_execution_handles_unknown_route() {
674 let repo: Arc<dyn RouteRepositoryPort> = Arc::new(InMemoryTestRepo::default());
675 let projections: Arc<dyn ProjectionStorePort> =
676 Arc::new(InMemoryTestProjectionStore::default());
677 let events: Arc<dyn EventPublisherPort> = Arc::new(InMemoryTestEventPublisher);
678 let dedup: Arc<dyn CommandDedupPort> = Arc::new(InMemoryTestDedup::default());
679 let execution: Arc<dyn RuntimeExecutionPort> = Arc::new(InFlightExecutionPort);
680 let bus = RuntimeBus::new(repo, projections, events, dedup).with_execution(execution);
681
682 let unknown = bus
683 .ask(RuntimeQuery::InFlightCount {
684 route_id: "unknown".into(),
685 })
686 .await
687 .unwrap();
688 assert!(matches!(
689 unknown,
690 RuntimeQueryResult::RouteNotFound { route_id } if route_id == "unknown"
691 ));
692 }
693}