1use std::sync::Arc;
2
3use async_trait::async_trait;
4use tokio::sync::OnceCell;
5
6use camel_api::{
7 CamelError, RuntimeCommand, RuntimeCommandBus, RuntimeCommandResult, RuntimeQuery,
8 RuntimeQueryBus, RuntimeQueryResult,
9};
10
11use crate::lifecycle::application::commands::{
12 CommandDeps, execute_command, handle_register_internal,
13};
14use crate::lifecycle::application::queries::{QueryDeps, execute_query};
15use crate::lifecycle::application::route_definition::RouteDefinition;
16use crate::lifecycle::ports::RouteRegistrationPort;
17use crate::lifecycle::ports::{
18 CommandDedupPort, EventPublisherPort, ProjectionStorePort, RouteRepositoryPort,
19 RuntimeExecutionPort, RuntimeUnitOfWorkPort,
20};
21
22pub struct RuntimeBus {
23 repo: Arc<dyn RouteRepositoryPort>,
24 projections: Arc<dyn ProjectionStorePort>,
25 events: Arc<dyn EventPublisherPort>,
26 dedup: Arc<dyn CommandDedupPort>,
27 uow: Option<Arc<dyn RuntimeUnitOfWorkPort>>,
28 execution: Option<Arc<dyn RuntimeExecutionPort>>,
29 journal_recovered_once: OnceCell<()>,
30}
31
32impl RuntimeBus {
33 pub fn new(
34 repo: Arc<dyn RouteRepositoryPort>,
35 projections: Arc<dyn ProjectionStorePort>,
36 events: Arc<dyn EventPublisherPort>,
37 dedup: Arc<dyn CommandDedupPort>,
38 ) -> Self {
39 Self {
40 repo,
41 projections,
42 events,
43 dedup,
44 uow: None,
45 execution: None,
46 journal_recovered_once: OnceCell::new(),
47 }
48 }
49
50 pub fn with_uow(mut self, uow: Arc<dyn RuntimeUnitOfWorkPort>) -> Self {
51 self.uow = Some(uow);
52 self
53 }
54
55 pub fn with_execution(mut self, execution: Arc<dyn RuntimeExecutionPort>) -> Self {
56 self.execution = Some(execution);
57 self
58 }
59
60 pub fn repo(&self) -> &Arc<dyn RouteRepositoryPort> {
61 &self.repo
62 }
63
64 fn deps(&self) -> CommandDeps {
65 CommandDeps {
66 repo: Arc::clone(&self.repo),
67 projections: Arc::clone(&self.projections),
68 events: Arc::clone(&self.events),
69 uow: self.uow.clone(),
70 execution: self.execution.clone(),
71 }
72 }
73
74 fn query_deps(&self) -> QueryDeps {
75 QueryDeps {
76 projections: Arc::clone(&self.projections),
77 }
78 }
79
80 async fn ensure_journal_recovered(&self) -> Result<(), CamelError> {
81 let Some(uow) = &self.uow else {
82 return Ok(());
83 };
84
85 self.journal_recovered_once
86 .get_or_try_init(|| async {
87 uow.recover_from_journal().await?;
88 Ok::<(), CamelError>(())
89 })
90 .await?;
91 Ok(())
92 }
93}
94
95#[async_trait]
96impl RuntimeCommandBus for RuntimeBus {
97 async fn execute(&self, cmd: RuntimeCommand) -> Result<RuntimeCommandResult, CamelError> {
98 self.ensure_journal_recovered().await?;
99 let command_id = cmd.command_id().to_string();
100 if !self.dedup.first_seen(&command_id).await? {
101 return Ok(RuntimeCommandResult::Duplicate { command_id });
102 }
103 let deps = self.deps();
104 match execute_command(&deps, cmd).await {
105 Ok(result) => Ok(result),
106 Err(err) => {
107 let _ = self.dedup.forget_seen(&command_id).await;
108 Err(err)
109 }
110 }
111 }
112}
113
114#[async_trait]
115impl RuntimeQueryBus for RuntimeBus {
116 async fn ask(&self, query: RuntimeQuery) -> Result<RuntimeQueryResult, CamelError> {
117 self.ensure_journal_recovered().await?;
118
119 match query {
120 RuntimeQuery::InFlightCount { route_id } => {
121 if let Some(execution) = &self.execution {
122 execution
123 .in_flight_count(&route_id)
124 .await
125 .map_err(Into::into)
126 } else {
127 Ok(RuntimeQueryResult::RouteNotFound { route_id })
128 }
129 }
130 other => {
131 let deps = self.query_deps();
132 execute_query(&deps, other).await
133 }
134 }
135 }
136}
137
138#[async_trait]
139impl RouteRegistrationPort for RuntimeBus {
140 async fn register_route(&self, def: RouteDefinition) -> Result<(), CamelError> {
141 self.ensure_journal_recovered().await?;
142 let deps = self.deps();
143 handle_register_internal(&deps, def).await.map(|_| ())
144 }
145}
146
147#[cfg(test)]
148mod tests {
149 use crate::lifecycle::domain::DomainError;
150
151 use super::*;
152 use std::collections::{HashMap, HashSet};
153 use std::sync::Mutex;
154
155 use crate::lifecycle::application::route_definition::RouteDefinition;
156 use crate::lifecycle::domain::{RouteRuntimeAggregate, RuntimeEvent};
157 use crate::lifecycle::ports::RouteRegistrationPort as InternalRuntimeCommandBus;
158 use crate::lifecycle::ports::RouteStatusProjection;
159
160 #[derive(Clone, Default)]
161 struct InMemoryTestRepo {
162 routes: Arc<Mutex<HashMap<String, RouteRuntimeAggregate>>>,
163 }
164
165 #[async_trait]
166 impl RouteRepositoryPort for InMemoryTestRepo {
167 async fn load(&self, route_id: &str) -> Result<Option<RouteRuntimeAggregate>, DomainError> {
168 Ok(self
169 .routes
170 .lock()
171 .expect("lock test routes")
172 .get(route_id)
173 .cloned())
174 }
175
176 async fn save(&self, aggregate: RouteRuntimeAggregate) -> Result<(), DomainError> {
177 self.routes
178 .lock()
179 .expect("lock test routes")
180 .insert(aggregate.route_id().to_string(), aggregate);
181 Ok(())
182 }
183
184 async fn save_if_version(
185 &self,
186 aggregate: RouteRuntimeAggregate,
187 expected_version: u64,
188 ) -> Result<(), DomainError> {
189 let route_id = aggregate.route_id().to_string();
190 let mut routes = self.routes.lock().expect("lock test routes");
191 let current = routes.get(&route_id).ok_or_else(|| {
192 DomainError::InvalidState(format!(
193 "optimistic lock conflict for route '{route_id}': route not found"
194 ))
195 })?;
196
197 if current.version() != expected_version {
198 return Err(DomainError::InvalidState(format!(
199 "optimistic lock conflict for route '{route_id}': expected version {expected_version}, actual {}",
200 current.version()
201 )));
202 }
203
204 routes.insert(route_id, aggregate);
205 Ok(())
206 }
207
208 async fn delete(&self, route_id: &str) -> Result<(), DomainError> {
209 self.routes
210 .lock()
211 .expect("lock test routes")
212 .remove(route_id);
213 Ok(())
214 }
215 }
216
217 #[derive(Clone, Default)]
218 struct InMemoryTestProjectionStore {
219 statuses: Arc<Mutex<HashMap<String, RouteStatusProjection>>>,
220 }
221
222 #[async_trait]
223 impl ProjectionStorePort for InMemoryTestProjectionStore {
224 async fn upsert_status(&self, status: RouteStatusProjection) -> Result<(), DomainError> {
225 self.statuses
226 .lock()
227 .expect("lock test statuses")
228 .insert(status.route_id.clone(), status);
229 Ok(())
230 }
231
232 async fn get_status(
233 &self,
234 route_id: &str,
235 ) -> Result<Option<RouteStatusProjection>, DomainError> {
236 Ok(self
237 .statuses
238 .lock()
239 .expect("lock test statuses")
240 .get(route_id)
241 .cloned())
242 }
243
244 async fn list_statuses(&self) -> Result<Vec<RouteStatusProjection>, DomainError> {
245 Ok(self
246 .statuses
247 .lock()
248 .expect("lock test statuses")
249 .values()
250 .cloned()
251 .collect())
252 }
253
254 async fn remove_status(&self, route_id: &str) -> Result<(), DomainError> {
255 self.statuses
256 .lock()
257 .expect("lock test statuses")
258 .remove(route_id);
259 Ok(())
260 }
261 }
262
263 #[derive(Clone, Default)]
264 struct InMemoryTestEventPublisher;
265
266 #[async_trait]
267 impl EventPublisherPort for InMemoryTestEventPublisher {
268 async fn publish(&self, _events: &[RuntimeEvent]) -> Result<(), DomainError> {
269 Ok(())
270 }
271 }
272
273 #[derive(Clone, Default)]
274 struct InMemoryTestDedup {
275 seen: Arc<Mutex<HashSet<String>>>,
276 }
277
278 #[derive(Clone, Default)]
279 struct InspectableDedup {
280 seen: Arc<Mutex<HashSet<String>>>,
281 forget_calls: Arc<Mutex<u32>>,
282 }
283
284 #[async_trait]
285 impl CommandDedupPort for InMemoryTestDedup {
286 async fn first_seen(&self, command_id: &str) -> Result<bool, DomainError> {
287 let mut seen = self.seen.lock().expect("lock dedup set");
288 Ok(seen.insert(command_id.to_string()))
289 }
290
291 async fn forget_seen(&self, command_id: &str) -> Result<(), DomainError> {
292 self.seen.lock().expect("lock dedup set").remove(command_id);
293 Ok(())
294 }
295 }
296
297 #[async_trait]
298 impl CommandDedupPort for InspectableDedup {
299 async fn first_seen(&self, command_id: &str) -> Result<bool, DomainError> {
300 let mut seen = self.seen.lock().expect("lock dedup set");
301 Ok(seen.insert(command_id.to_string()))
302 }
303
304 async fn forget_seen(&self, command_id: &str) -> Result<(), DomainError> {
305 self.seen.lock().expect("lock dedup set").remove(command_id);
306 let mut calls = self.forget_calls.lock().expect("forget calls");
307 *calls += 1;
308 Ok(())
309 }
310 }
311
312 fn build_test_runtime_bus() -> RuntimeBus {
313 let repo: Arc<dyn RouteRepositoryPort> = Arc::new(InMemoryTestRepo::default());
314 let projections: Arc<dyn ProjectionStorePort> =
315 Arc::new(InMemoryTestProjectionStore::default());
316 let events: Arc<dyn EventPublisherPort> = Arc::new(InMemoryTestEventPublisher);
317 let dedup: Arc<dyn CommandDedupPort> = Arc::new(InMemoryTestDedup::default());
318 RuntimeBus::new(repo, projections, events, dedup)
319 }
320
321 #[derive(Default)]
322 struct CountingUow {
323 recover_calls: Arc<Mutex<u32>>,
324 }
325
326 #[derive(Default)]
327 struct FailingRecoverUow;
328
329 #[async_trait]
330 impl RuntimeUnitOfWorkPort for CountingUow {
331 async fn persist_upsert(
332 &self,
333 _aggregate: RouteRuntimeAggregate,
334 _expected_version: Option<u64>,
335 _projection: RouteStatusProjection,
336 _events: &[RuntimeEvent],
337 ) -> Result<(), DomainError> {
338 Ok(())
339 }
340
341 async fn persist_delete(
342 &self,
343 _route_id: &str,
344 _events: &[RuntimeEvent],
345 ) -> Result<(), DomainError> {
346 Ok(())
347 }
348
349 async fn recover_from_journal(&self) -> Result<(), DomainError> {
350 let mut calls = self.recover_calls.lock().expect("recover_calls");
351 *calls += 1;
352 Ok(())
353 }
354 }
355
356 #[async_trait]
357 impl RuntimeUnitOfWorkPort for FailingRecoverUow {
358 async fn persist_upsert(
359 &self,
360 _aggregate: RouteRuntimeAggregate,
361 _expected_version: Option<u64>,
362 _projection: RouteStatusProjection,
363 _events: &[RuntimeEvent],
364 ) -> Result<(), DomainError> {
365 Ok(())
366 }
367
368 async fn persist_delete(
369 &self,
370 _route_id: &str,
371 _events: &[RuntimeEvent],
372 ) -> Result<(), DomainError> {
373 Ok(())
374 }
375
376 async fn recover_from_journal(&self) -> Result<(), DomainError> {
377 Err(DomainError::InvalidState("recover failed".into()))
378 }
379 }
380
381 #[derive(Default)]
382 struct InFlightExecutionPort;
383
384 #[async_trait]
385 impl RuntimeExecutionPort for InFlightExecutionPort {
386 async fn register_route(&self, _definition: RouteDefinition) -> Result<(), DomainError> {
387 Ok(())
388 }
389 async fn start_route(&self, _route_id: &str) -> Result<(), DomainError> {
390 Ok(())
391 }
392 async fn stop_route(&self, _route_id: &str) -> Result<(), DomainError> {
393 Ok(())
394 }
395 async fn suspend_route(&self, _route_id: &str) -> Result<(), DomainError> {
396 Ok(())
397 }
398 async fn resume_route(&self, _route_id: &str) -> Result<(), DomainError> {
399 Ok(())
400 }
401 async fn reload_route(&self, _route_id: &str) -> Result<(), DomainError> {
402 Ok(())
403 }
404 async fn remove_route(&self, _route_id: &str) -> Result<(), DomainError> {
405 Ok(())
406 }
407 async fn in_flight_count(&self, route_id: &str) -> Result<RuntimeQueryResult, DomainError> {
408 if route_id == "known" {
409 Ok(RuntimeQueryResult::InFlightCount {
410 route_id: route_id.to_string(),
411 count: 3,
412 })
413 } else {
414 Ok(RuntimeQueryResult::RouteNotFound {
415 route_id: route_id.to_string(),
416 })
417 }
418 }
419 }
420
421 #[tokio::test]
422 async fn runtime_bus_implements_internal_command_bus() {
423 let bus = build_test_runtime_bus();
424 let def = RouteDefinition::new("timer:test", vec![]).with_route_id("internal-route");
425 let result = InternalRuntimeCommandBus::register_route(&bus, def).await;
426 assert!(
427 result.is_ok(),
428 "internal bus registration failed: {:?}",
429 result
430 );
431
432 let status = bus
433 .ask(RuntimeQuery::GetRouteStatus {
434 route_id: "internal-route".to_string(),
435 })
436 .await
437 .unwrap();
438 match status {
439 RuntimeQueryResult::RouteStatus { status, .. } => {
440 assert_eq!(status, "Registered");
441 }
442 _ => panic!("unexpected query result"),
443 }
444 }
445
446 #[tokio::test]
447 async fn execute_returns_duplicate_for_replayed_command_id() {
448 use camel_api::runtime::{CanonicalRouteSpec, CanonicalStepSpec, RuntimeCommand};
449
450 let bus = build_test_runtime_bus();
451
452 let mut spec = CanonicalRouteSpec::new("dup-route", "timer:tick");
453 spec.steps = vec![CanonicalStepSpec::Stop];
454
455 let cmd = RuntimeCommand::RegisterRoute {
456 spec: spec.clone(),
457 command_id: "dup-cmd".into(),
458 causation_id: None,
459 };
460 let first = bus.execute(cmd).await.unwrap();
461 assert!(matches!(
462 first,
463 RuntimeCommandResult::RouteRegistered { route_id } if route_id == "dup-route"
464 ));
465
466 let second = bus
467 .execute(RuntimeCommand::RegisterRoute {
468 spec,
469 command_id: "dup-cmd".into(),
470 causation_id: None,
471 })
472 .await
473 .unwrap();
474 assert!(matches!(
475 second,
476 RuntimeCommandResult::Duplicate { command_id } if command_id == "dup-cmd"
477 ));
478 }
479
480 #[tokio::test]
481 async fn ask_in_flight_count_without_execution_returns_route_not_found() {
482 let bus = build_test_runtime_bus();
483 let res = bus
484 .ask(RuntimeQuery::InFlightCount {
485 route_id: "missing".into(),
486 })
487 .await
488 .unwrap();
489 assert!(matches!(
490 res,
491 RuntimeQueryResult::RouteNotFound { route_id } if route_id == "missing"
492 ));
493 }
494
495 #[tokio::test]
496 async fn ask_in_flight_count_with_execution_delegates_to_adapter() {
497 let repo: Arc<dyn RouteRepositoryPort> = Arc::new(InMemoryTestRepo::default());
498 let projections: Arc<dyn ProjectionStorePort> =
499 Arc::new(InMemoryTestProjectionStore::default());
500 let events: Arc<dyn EventPublisherPort> = Arc::new(InMemoryTestEventPublisher);
501 let dedup: Arc<dyn CommandDedupPort> = Arc::new(InMemoryTestDedup::default());
502 let execution: Arc<dyn RuntimeExecutionPort> = Arc::new(InFlightExecutionPort);
503 let bus = RuntimeBus::new(repo, projections, events, dedup).with_execution(execution);
504
505 let known = bus
506 .ask(RuntimeQuery::InFlightCount {
507 route_id: "known".into(),
508 })
509 .await
510 .unwrap();
511 assert!(matches!(
512 known,
513 RuntimeQueryResult::InFlightCount { route_id, count }
514 if route_id == "known" && count == 3
515 ));
516 }
517
518 #[tokio::test]
519 async fn journal_recovery_runs_once_even_with_multiple_commands() {
520 use camel_api::runtime::{CanonicalRouteSpec, CanonicalStepSpec, RuntimeCommand};
521
522 let repo: Arc<dyn RouteRepositoryPort> = Arc::new(InMemoryTestRepo::default());
523 let projections: Arc<dyn ProjectionStorePort> =
524 Arc::new(InMemoryTestProjectionStore::default());
525 let events: Arc<dyn EventPublisherPort> = Arc::new(InMemoryTestEventPublisher);
526 let dedup: Arc<dyn CommandDedupPort> = Arc::new(InMemoryTestDedup::default());
527 let uow = Arc::new(CountingUow::default());
528 let bus = RuntimeBus::new(repo, projections, events, dedup).with_uow(uow.clone());
529
530 let mut spec_a = CanonicalRouteSpec::new("a", "timer:a");
531 spec_a.steps = vec![CanonicalStepSpec::Stop];
532 let mut spec_b = CanonicalRouteSpec::new("b", "timer:b");
533 spec_b.steps = vec![CanonicalStepSpec::Stop];
534
535 bus.execute(RuntimeCommand::RegisterRoute {
536 spec: spec_a,
537 command_id: "c-a".into(),
538 causation_id: None,
539 })
540 .await
541 .unwrap();
542
543 bus.execute(RuntimeCommand::RegisterRoute {
544 spec: spec_b,
545 command_id: "c-b".into(),
546 causation_id: None,
547 })
548 .await
549 .unwrap();
550
551 let calls = *uow.recover_calls.lock().expect("recover calls");
552 assert_eq!(calls, 1, "journal recovery should run once");
553 }
554
555 #[tokio::test]
556 async fn execute_on_command_error_forgets_dedup_marker() {
557 use camel_api::runtime::{CanonicalRouteSpec, RuntimeCommand};
558
559 let repo: Arc<dyn RouteRepositoryPort> = Arc::new(InMemoryTestRepo::default());
560 let projections: Arc<dyn ProjectionStorePort> =
561 Arc::new(InMemoryTestProjectionStore::default());
562 let events: Arc<dyn EventPublisherPort> = Arc::new(InMemoryTestEventPublisher);
563 let dedup = Arc::new(InspectableDedup::default());
564 let dedup_port: Arc<dyn CommandDedupPort> = dedup.clone();
565
566 let bus = RuntimeBus::new(repo, projections, events, dedup_port);
567
568 let cmd = RuntimeCommand::RegisterRoute {
570 spec: CanonicalRouteSpec::new("", "timer:tick"),
571 command_id: "err-cmd".into(),
572 causation_id: None,
573 };
574
575 let err = bus.execute(cmd).await.expect_err("must fail");
576 assert!(err.to_string().contains("route_id cannot be empty"));
577
578 assert_eq!(*dedup.forget_calls.lock().expect("forget calls"), 1);
579 assert!(!dedup.seen.lock().expect("seen").contains("err-cmd"));
580 }
581
582 #[tokio::test]
583 async fn execute_propagates_recover_error_from_uow() {
584 use camel_api::runtime::{CanonicalRouteSpec, CanonicalStepSpec, RuntimeCommand};
585
586 let repo: Arc<dyn RouteRepositoryPort> = Arc::new(InMemoryTestRepo::default());
587 let projections: Arc<dyn ProjectionStorePort> =
588 Arc::new(InMemoryTestProjectionStore::default());
589 let events: Arc<dyn EventPublisherPort> = Arc::new(InMemoryTestEventPublisher);
590 let dedup: Arc<dyn CommandDedupPort> = Arc::new(InMemoryTestDedup::default());
591 let uow: Arc<dyn RuntimeUnitOfWorkPort> = Arc::new(FailingRecoverUow);
592
593 let bus = RuntimeBus::new(repo, projections, events, dedup).with_uow(uow);
594
595 let mut spec = CanonicalRouteSpec::new("x", "timer:x");
596 spec.steps = vec![CanonicalStepSpec::Stop];
597 let err = bus
598 .execute(RuntimeCommand::RegisterRoute {
599 spec,
600 command_id: "recover-err".into(),
601 causation_id: None,
602 })
603 .await
604 .expect_err("recover should fail");
605
606 assert!(err.to_string().contains("recover failed"));
607 }
608
609 #[tokio::test]
610 async fn ask_in_flight_count_with_execution_handles_unknown_route() {
611 let repo: Arc<dyn RouteRepositoryPort> = Arc::new(InMemoryTestRepo::default());
612 let projections: Arc<dyn ProjectionStorePort> =
613 Arc::new(InMemoryTestProjectionStore::default());
614 let events: Arc<dyn EventPublisherPort> = Arc::new(InMemoryTestEventPublisher);
615 let dedup: Arc<dyn CommandDedupPort> = Arc::new(InMemoryTestDedup::default());
616 let execution: Arc<dyn RuntimeExecutionPort> = Arc::new(InFlightExecutionPort);
617 let bus = RuntimeBus::new(repo, projections, events, dedup).with_execution(execution);
618
619 let unknown = bus
620 .ask(RuntimeQuery::InFlightCount {
621 route_id: "unknown".into(),
622 })
623 .await
624 .unwrap();
625 assert!(matches!(
626 unknown,
627 RuntimeQueryResult::RouteNotFound { route_id } if route_id == "unknown"
628 ));
629 }
630}