1use std::sync::Arc;
2
3use async_trait::async_trait;
4use tokio::sync::OnceCell;
5
6use camel_api::{
7 CamelError, RuntimeCommand, RuntimeCommandBus, RuntimeCommandResult, RuntimeQuery,
8 RuntimeQueryBus, RuntimeQueryResult,
9};
10
11use crate::lifecycle::application::commands::{
12 CommandDeps, execute_command, handle_register_internal,
13};
14use crate::lifecycle::application::queries::{QueryDeps, execute_query};
15use crate::lifecycle::application::route_definition::RouteDefinition;
16use crate::lifecycle::ports::RouteRegistrationPort;
17use crate::lifecycle::ports::{
18 CommandDedupPort, EventPublisherPort, ProjectionStorePort, RouteRepositoryPort,
19 RuntimeExecutionPort, RuntimeUnitOfWorkPort,
20};
21
22pub struct RuntimeBus {
23 repo: Arc<dyn RouteRepositoryPort>,
24 projections: Arc<dyn ProjectionStorePort>,
25 events: Arc<dyn EventPublisherPort>,
26 dedup: Arc<dyn CommandDedupPort>,
27 uow: Option<Arc<dyn RuntimeUnitOfWorkPort>>,
28 execution: Option<Arc<dyn RuntimeExecutionPort>>,
29 journal_recovered_once: OnceCell<()>,
30}
31
32impl RuntimeBus {
33 pub fn new(
34 repo: Arc<dyn RouteRepositoryPort>,
35 projections: Arc<dyn ProjectionStorePort>,
36 events: Arc<dyn EventPublisherPort>,
37 dedup: Arc<dyn CommandDedupPort>,
38 ) -> Self {
39 Self {
40 repo,
41 projections,
42 events,
43 dedup,
44 uow: None,
45 execution: None,
46 journal_recovered_once: OnceCell::new(),
47 }
48 }
49
50 pub fn with_uow(mut self, uow: Arc<dyn RuntimeUnitOfWorkPort>) -> Self {
51 self.uow = Some(uow);
52 self
53 }
54
55 pub fn with_execution(mut self, execution: Arc<dyn RuntimeExecutionPort>) -> Self {
56 self.execution = Some(execution);
57 self
58 }
59
60 pub fn repo(&self) -> &Arc<dyn RouteRepositoryPort> {
61 &self.repo
62 }
63
64 pub(crate) async fn register_aggregate_only(&self, route_id: String) -> Result<(), CamelError> {
65 self.ensure_journal_recovered().await?;
66 let deps = self.deps();
67 if deps.repo.load(&route_id).await?.is_some() {
68 return Err(CamelError::RouteError(format!(
69 "route '{route_id}' already registered"
70 )));
71 }
72 let (aggregate, events) =
73 crate::lifecycle::domain::RouteRuntimeAggregate::register(route_id.clone());
74 if let Some(uow) = &deps.uow {
75 uow.persist_upsert(
76 aggregate.clone(),
77 None,
78 crate::lifecycle::application::commands::project_from_aggregate(&aggregate),
79 &events,
80 )
81 .await?;
82 } else {
83 deps.repo.save(aggregate.clone()).await?;
84 if let Some(primary_error) =
85 crate::lifecycle::application::commands::upsert_projection_with_reconciliation(
86 &*deps.projections,
87 crate::lifecycle::application::commands::project_from_aggregate(&aggregate),
88 )
89 .await?
90 {
91 deps.events.publish(&events).await?;
92 return Err(CamelError::RouteError(format!(
93 "post-effect reconciliation recovered after runtime persistence error: {primary_error}"
94 )));
95 }
96 deps.events.publish(&events).await?;
97 }
98 Ok(())
99 }
100
101 fn deps(&self) -> CommandDeps {
102 CommandDeps {
103 repo: Arc::clone(&self.repo),
104 projections: Arc::clone(&self.projections),
105 events: Arc::clone(&self.events),
106 uow: self.uow.clone(),
107 execution: self.execution.clone(),
108 }
109 }
110
111 fn query_deps(&self) -> QueryDeps {
112 QueryDeps {
113 projections: Arc::clone(&self.projections),
114 }
115 }
116
117 async fn ensure_journal_recovered(&self) -> Result<(), CamelError> {
118 let Some(uow) = &self.uow else {
119 return Ok(());
120 };
121
122 self.journal_recovered_once
123 .get_or_try_init(|| async {
124 uow.recover_from_journal().await?;
125 Ok::<(), CamelError>(())
126 })
127 .await?;
128 Ok(())
129 }
130}
131
132#[async_trait]
133impl RuntimeCommandBus for RuntimeBus {
134 async fn execute(&self, cmd: RuntimeCommand) -> Result<RuntimeCommandResult, CamelError> {
135 self.ensure_journal_recovered().await?;
136 let command_id = cmd.command_id().to_string();
137 if !self.dedup.first_seen(&command_id).await? {
138 return Ok(RuntimeCommandResult::Duplicate { command_id });
139 }
140 let deps = self.deps();
141 match execute_command(&deps, cmd).await {
142 Ok(result) => Ok(result),
143 Err(err) => {
144 let _ = self.dedup.forget_seen(&command_id).await;
145 Err(err)
146 }
147 }
148 }
149}
150
151#[async_trait]
152impl RuntimeQueryBus for RuntimeBus {
153 async fn ask(&self, query: RuntimeQuery) -> Result<RuntimeQueryResult, CamelError> {
154 self.ensure_journal_recovered().await?;
155
156 match query {
157 RuntimeQuery::InFlightCount { route_id } => {
158 if let Some(execution) = &self.execution {
159 execution
160 .in_flight_count(&route_id)
161 .await
162 .map_err(Into::into)
163 } else {
164 Ok(RuntimeQueryResult::RouteNotFound { route_id })
165 }
166 }
167 other => {
168 let deps = self.query_deps();
169 execute_query(&deps, other).await
170 }
171 }
172 }
173}
174
175#[async_trait]
176impl RouteRegistrationPort for RuntimeBus {
177 async fn register_route(&self, def: RouteDefinition) -> Result<(), CamelError> {
178 self.ensure_journal_recovered().await?;
179 let deps = self.deps();
180 handle_register_internal(&deps, def).await.map(|_| ())
181 }
182}
183
184#[cfg(test)]
185mod tests {
186 use crate::lifecycle::domain::DomainError;
187
188 use super::*;
189 use std::collections::{HashMap, HashSet};
190 use std::sync::Mutex;
191
192 use crate::lifecycle::application::route_definition::RouteDefinition;
193 use crate::lifecycle::domain::{RouteRuntimeAggregate, RuntimeEvent};
194 use crate::lifecycle::ports::RouteRegistrationPort as InternalRuntimeCommandBus;
195 use crate::lifecycle::ports::RouteStatusProjection;
196
197 #[derive(Clone, Default)]
198 struct InMemoryTestRepo {
199 routes: Arc<Mutex<HashMap<String, RouteRuntimeAggregate>>>,
200 }
201
202 #[async_trait]
203 impl RouteRepositoryPort for InMemoryTestRepo {
204 async fn load(&self, route_id: &str) -> Result<Option<RouteRuntimeAggregate>, DomainError> {
205 Ok(self
206 .routes
207 .lock()
208 .expect("lock test routes")
209 .get(route_id)
210 .cloned())
211 }
212
213 async fn save(&self, aggregate: RouteRuntimeAggregate) -> Result<(), DomainError> {
214 self.routes
215 .lock()
216 .expect("lock test routes")
217 .insert(aggregate.route_id().to_string(), aggregate);
218 Ok(())
219 }
220
221 async fn save_if_version(
222 &self,
223 aggregate: RouteRuntimeAggregate,
224 expected_version: u64,
225 ) -> Result<(), DomainError> {
226 let route_id = aggregate.route_id().to_string();
227 let mut routes = self.routes.lock().expect("lock test routes");
228 let current = routes.get(&route_id).ok_or_else(|| {
229 DomainError::InvalidState(format!(
230 "optimistic lock conflict for route '{route_id}': route not found"
231 ))
232 })?;
233
234 if current.version() != expected_version {
235 return Err(DomainError::InvalidState(format!(
236 "optimistic lock conflict for route '{route_id}': expected version {expected_version}, actual {}",
237 current.version()
238 )));
239 }
240
241 routes.insert(route_id, aggregate);
242 Ok(())
243 }
244
245 async fn delete(&self, route_id: &str) -> Result<(), DomainError> {
246 self.routes
247 .lock()
248 .expect("lock test routes")
249 .remove(route_id);
250 Ok(())
251 }
252 }
253
254 #[derive(Clone, Default)]
255 struct InMemoryTestProjectionStore {
256 statuses: Arc<Mutex<HashMap<String, RouteStatusProjection>>>,
257 }
258
259 #[async_trait]
260 impl ProjectionStorePort for InMemoryTestProjectionStore {
261 async fn upsert_status(&self, status: RouteStatusProjection) -> Result<(), DomainError> {
262 self.statuses
263 .lock()
264 .expect("lock test statuses")
265 .insert(status.route_id.clone(), status);
266 Ok(())
267 }
268
269 async fn get_status(
270 &self,
271 route_id: &str,
272 ) -> Result<Option<RouteStatusProjection>, DomainError> {
273 Ok(self
274 .statuses
275 .lock()
276 .expect("lock test statuses")
277 .get(route_id)
278 .cloned())
279 }
280
281 async fn list_statuses(&self) -> Result<Vec<RouteStatusProjection>, DomainError> {
282 Ok(self
283 .statuses
284 .lock()
285 .expect("lock test statuses")
286 .values()
287 .cloned()
288 .collect())
289 }
290
291 async fn remove_status(&self, route_id: &str) -> Result<(), DomainError> {
292 self.statuses
293 .lock()
294 .expect("lock test statuses")
295 .remove(route_id);
296 Ok(())
297 }
298 }
299
300 #[derive(Clone, Default)]
301 struct InMemoryTestEventPublisher;
302
303 #[async_trait]
304 impl EventPublisherPort for InMemoryTestEventPublisher {
305 async fn publish(&self, _events: &[RuntimeEvent]) -> Result<(), DomainError> {
306 Ok(())
307 }
308 }
309
310 #[derive(Clone, Default)]
311 struct InMemoryTestDedup {
312 seen: Arc<Mutex<HashSet<String>>>,
313 }
314
315 #[derive(Clone, Default)]
316 struct InspectableDedup {
317 seen: Arc<Mutex<HashSet<String>>>,
318 forget_calls: Arc<Mutex<u32>>,
319 }
320
321 #[async_trait]
322 impl CommandDedupPort for InMemoryTestDedup {
323 async fn first_seen(&self, command_id: &str) -> Result<bool, DomainError> {
324 let mut seen = self.seen.lock().expect("lock dedup set");
325 Ok(seen.insert(command_id.to_string()))
326 }
327
328 async fn forget_seen(&self, command_id: &str) -> Result<(), DomainError> {
329 self.seen.lock().expect("lock dedup set").remove(command_id);
330 Ok(())
331 }
332 }
333
334 #[async_trait]
335 impl CommandDedupPort for InspectableDedup {
336 async fn first_seen(&self, command_id: &str) -> Result<bool, DomainError> {
337 let mut seen = self.seen.lock().expect("lock dedup set");
338 Ok(seen.insert(command_id.to_string()))
339 }
340
341 async fn forget_seen(&self, command_id: &str) -> Result<(), DomainError> {
342 self.seen.lock().expect("lock dedup set").remove(command_id);
343 let mut calls = self.forget_calls.lock().expect("forget calls");
344 *calls += 1;
345 Ok(())
346 }
347 }
348
349 fn build_test_runtime_bus() -> RuntimeBus {
350 let repo: Arc<dyn RouteRepositoryPort> = Arc::new(InMemoryTestRepo::default());
351 let projections: Arc<dyn ProjectionStorePort> =
352 Arc::new(InMemoryTestProjectionStore::default());
353 let events: Arc<dyn EventPublisherPort> = Arc::new(InMemoryTestEventPublisher);
354 let dedup: Arc<dyn CommandDedupPort> = Arc::new(InMemoryTestDedup::default());
355 RuntimeBus::new(repo, projections, events, dedup)
356 }
357
358 #[derive(Default)]
359 struct CountingUow {
360 recover_calls: Arc<Mutex<u32>>,
361 }
362
363 #[derive(Default)]
364 struct FailingRecoverUow;
365
366 #[async_trait]
367 impl RuntimeUnitOfWorkPort for CountingUow {
368 async fn persist_upsert(
369 &self,
370 _aggregate: RouteRuntimeAggregate,
371 _expected_version: Option<u64>,
372 _projection: RouteStatusProjection,
373 _events: &[RuntimeEvent],
374 ) -> Result<(), DomainError> {
375 Ok(())
376 }
377
378 async fn persist_delete(
379 &self,
380 _route_id: &str,
381 _events: &[RuntimeEvent],
382 ) -> Result<(), DomainError> {
383 Ok(())
384 }
385
386 async fn recover_from_journal(&self) -> Result<(), DomainError> {
387 let mut calls = self.recover_calls.lock().expect("recover_calls");
388 *calls += 1;
389 Ok(())
390 }
391 }
392
393 #[async_trait]
394 impl RuntimeUnitOfWorkPort for FailingRecoverUow {
395 async fn persist_upsert(
396 &self,
397 _aggregate: RouteRuntimeAggregate,
398 _expected_version: Option<u64>,
399 _projection: RouteStatusProjection,
400 _events: &[RuntimeEvent],
401 ) -> Result<(), DomainError> {
402 Ok(())
403 }
404
405 async fn persist_delete(
406 &self,
407 _route_id: &str,
408 _events: &[RuntimeEvent],
409 ) -> Result<(), DomainError> {
410 Ok(())
411 }
412
413 async fn recover_from_journal(&self) -> Result<(), DomainError> {
414 Err(DomainError::InvalidState("recover failed".into()))
415 }
416 }
417
418 #[derive(Default)]
419 struct InFlightExecutionPort;
420
421 #[async_trait]
422 impl RuntimeExecutionPort for InFlightExecutionPort {
423 async fn register_route(&self, _definition: RouteDefinition) -> Result<(), DomainError> {
424 Ok(())
425 }
426 async fn start_route(&self, _route_id: &str) -> Result<(), DomainError> {
427 Ok(())
428 }
429 async fn stop_route(&self, _route_id: &str) -> Result<(), DomainError> {
430 Ok(())
431 }
432 async fn suspend_route(&self, _route_id: &str) -> Result<(), DomainError> {
433 Ok(())
434 }
435 async fn resume_route(&self, _route_id: &str) -> Result<(), DomainError> {
436 Ok(())
437 }
438 async fn reload_route(&self, _route_id: &str) -> Result<(), DomainError> {
439 Ok(())
440 }
441 async fn remove_route(&self, _route_id: &str) -> Result<(), DomainError> {
442 Ok(())
443 }
444 async fn in_flight_count(&self, route_id: &str) -> Result<RuntimeQueryResult, DomainError> {
445 if route_id == "known" {
446 Ok(RuntimeQueryResult::InFlightCount {
447 route_id: route_id.to_string(),
448 count: 3,
449 })
450 } else {
451 Ok(RuntimeQueryResult::RouteNotFound {
452 route_id: route_id.to_string(),
453 })
454 }
455 }
456 }
457
458 #[tokio::test]
459 async fn runtime_bus_implements_internal_command_bus() {
460 let bus = build_test_runtime_bus();
461 let def = RouteDefinition::new("timer:test", vec![]).with_route_id("internal-route");
462 let result = InternalRuntimeCommandBus::register_route(&bus, def).await;
463 assert!(
464 result.is_ok(),
465 "internal bus registration failed: {:?}",
466 result
467 );
468
469 let status = bus
470 .ask(RuntimeQuery::GetRouteStatus {
471 route_id: "internal-route".to_string(),
472 })
473 .await
474 .unwrap();
475 match status {
476 RuntimeQueryResult::RouteStatus { status, .. } => {
477 assert_eq!(status, "Registered");
478 }
479 _ => panic!("unexpected query result"),
480 }
481 }
482
483 #[tokio::test]
484 async fn execute_returns_duplicate_for_replayed_command_id() {
485 use camel_api::runtime::{CanonicalRouteSpec, CanonicalStepSpec, RuntimeCommand};
486
487 let bus = build_test_runtime_bus();
488
489 let mut spec = CanonicalRouteSpec::new("dup-route", "timer:tick");
490 spec.steps = vec![CanonicalStepSpec::Stop];
491
492 let cmd = RuntimeCommand::RegisterRoute {
493 spec: spec.clone(),
494 command_id: "dup-cmd".into(),
495 causation_id: None,
496 };
497 let first = bus.execute(cmd).await.unwrap();
498 assert!(matches!(
499 first,
500 RuntimeCommandResult::RouteRegistered { route_id } if route_id == "dup-route"
501 ));
502
503 let second = bus
504 .execute(RuntimeCommand::RegisterRoute {
505 spec,
506 command_id: "dup-cmd".into(),
507 causation_id: None,
508 })
509 .await
510 .unwrap();
511 assert!(matches!(
512 second,
513 RuntimeCommandResult::Duplicate { command_id } if command_id == "dup-cmd"
514 ));
515 }
516
517 #[tokio::test]
518 async fn ask_in_flight_count_without_execution_returns_route_not_found() {
519 let bus = build_test_runtime_bus();
520 let res = bus
521 .ask(RuntimeQuery::InFlightCount {
522 route_id: "missing".into(),
523 })
524 .await
525 .unwrap();
526 assert!(matches!(
527 res,
528 RuntimeQueryResult::RouteNotFound { route_id } if route_id == "missing"
529 ));
530 }
531
532 #[tokio::test]
533 async fn ask_in_flight_count_with_execution_delegates_to_adapter() {
534 let repo: Arc<dyn RouteRepositoryPort> = Arc::new(InMemoryTestRepo::default());
535 let projections: Arc<dyn ProjectionStorePort> =
536 Arc::new(InMemoryTestProjectionStore::default());
537 let events: Arc<dyn EventPublisherPort> = Arc::new(InMemoryTestEventPublisher);
538 let dedup: Arc<dyn CommandDedupPort> = Arc::new(InMemoryTestDedup::default());
539 let execution: Arc<dyn RuntimeExecutionPort> = Arc::new(InFlightExecutionPort);
540 let bus = RuntimeBus::new(repo, projections, events, dedup).with_execution(execution);
541
542 let known = bus
543 .ask(RuntimeQuery::InFlightCount {
544 route_id: "known".into(),
545 })
546 .await
547 .unwrap();
548 assert!(matches!(
549 known,
550 RuntimeQueryResult::InFlightCount { route_id, count }
551 if route_id == "known" && count == 3
552 ));
553 }
554
555 #[tokio::test]
556 async fn journal_recovery_runs_once_even_with_multiple_commands() {
557 use camel_api::runtime::{CanonicalRouteSpec, CanonicalStepSpec, RuntimeCommand};
558
559 let repo: Arc<dyn RouteRepositoryPort> = Arc::new(InMemoryTestRepo::default());
560 let projections: Arc<dyn ProjectionStorePort> =
561 Arc::new(InMemoryTestProjectionStore::default());
562 let events: Arc<dyn EventPublisherPort> = Arc::new(InMemoryTestEventPublisher);
563 let dedup: Arc<dyn CommandDedupPort> = Arc::new(InMemoryTestDedup::default());
564 let uow = Arc::new(CountingUow::default());
565 let bus = RuntimeBus::new(repo, projections, events, dedup).with_uow(uow.clone());
566
567 let mut spec_a = CanonicalRouteSpec::new("a", "timer:a");
568 spec_a.steps = vec![CanonicalStepSpec::Stop];
569 let mut spec_b = CanonicalRouteSpec::new("b", "timer:b");
570 spec_b.steps = vec![CanonicalStepSpec::Stop];
571
572 bus.execute(RuntimeCommand::RegisterRoute {
573 spec: spec_a,
574 command_id: "c-a".into(),
575 causation_id: None,
576 })
577 .await
578 .unwrap();
579
580 bus.execute(RuntimeCommand::RegisterRoute {
581 spec: spec_b,
582 command_id: "c-b".into(),
583 causation_id: None,
584 })
585 .await
586 .unwrap();
587
588 let calls = *uow.recover_calls.lock().expect("recover calls");
589 assert_eq!(calls, 1, "journal recovery should run once");
590 }
591
592 #[tokio::test]
593 async fn execute_on_command_error_forgets_dedup_marker() {
594 use camel_api::runtime::{CanonicalRouteSpec, RuntimeCommand};
595
596 let repo: Arc<dyn RouteRepositoryPort> = Arc::new(InMemoryTestRepo::default());
597 let projections: Arc<dyn ProjectionStorePort> =
598 Arc::new(InMemoryTestProjectionStore::default());
599 let events: Arc<dyn EventPublisherPort> = Arc::new(InMemoryTestEventPublisher);
600 let dedup = Arc::new(InspectableDedup::default());
601 let dedup_port: Arc<dyn CommandDedupPort> = dedup.clone();
602
603 let bus = RuntimeBus::new(repo, projections, events, dedup_port);
604
605 let cmd = RuntimeCommand::RegisterRoute {
607 spec: CanonicalRouteSpec::new("", "timer:tick"),
608 command_id: "err-cmd".into(),
609 causation_id: None,
610 };
611
612 let err = bus.execute(cmd).await.expect_err("must fail");
613 assert!(err.to_string().contains("route_id cannot be empty"));
614
615 assert_eq!(*dedup.forget_calls.lock().expect("forget calls"), 1);
616 assert!(!dedup.seen.lock().expect("seen").contains("err-cmd"));
617 }
618
619 #[tokio::test]
620 async fn execute_propagates_recover_error_from_uow() {
621 use camel_api::runtime::{CanonicalRouteSpec, CanonicalStepSpec, RuntimeCommand};
622
623 let repo: Arc<dyn RouteRepositoryPort> = Arc::new(InMemoryTestRepo::default());
624 let projections: Arc<dyn ProjectionStorePort> =
625 Arc::new(InMemoryTestProjectionStore::default());
626 let events: Arc<dyn EventPublisherPort> = Arc::new(InMemoryTestEventPublisher);
627 let dedup: Arc<dyn CommandDedupPort> = Arc::new(InMemoryTestDedup::default());
628 let uow: Arc<dyn RuntimeUnitOfWorkPort> = Arc::new(FailingRecoverUow);
629
630 let bus = RuntimeBus::new(repo, projections, events, dedup).with_uow(uow);
631
632 let mut spec = CanonicalRouteSpec::new("x", "timer:x");
633 spec.steps = vec![CanonicalStepSpec::Stop];
634 let err = bus
635 .execute(RuntimeCommand::RegisterRoute {
636 spec,
637 command_id: "recover-err".into(),
638 causation_id: None,
639 })
640 .await
641 .expect_err("recover should fail");
642
643 assert!(err.to_string().contains("recover failed"));
644 }
645
646 #[tokio::test]
647 async fn ask_in_flight_count_with_execution_handles_unknown_route() {
648 let repo: Arc<dyn RouteRepositoryPort> = Arc::new(InMemoryTestRepo::default());
649 let projections: Arc<dyn ProjectionStorePort> =
650 Arc::new(InMemoryTestProjectionStore::default());
651 let events: Arc<dyn EventPublisherPort> = Arc::new(InMemoryTestEventPublisher);
652 let dedup: Arc<dyn CommandDedupPort> = Arc::new(InMemoryTestDedup::default());
653 let execution: Arc<dyn RuntimeExecutionPort> = Arc::new(InFlightExecutionPort);
654 let bus = RuntimeBus::new(repo, projections, events, dedup).with_execution(execution);
655
656 let unknown = bus
657 .ask(RuntimeQuery::InFlightCount {
658 route_id: "unknown".into(),
659 })
660 .await
661 .unwrap();
662 assert!(matches!(
663 unknown,
664 RuntimeQueryResult::RouteNotFound { route_id } if route_id == "unknown"
665 ));
666 }
667}