Skip to main content

camel_core/lifecycle/application/
runtime_bus.rs

1use std::sync::Arc;
2
3use async_trait::async_trait;
4use tokio::sync::OnceCell;
5
6use camel_api::{
7    CamelError, RuntimeCommand, RuntimeCommandBus, RuntimeCommandResult, RuntimeQuery,
8    RuntimeQueryBus, RuntimeQueryResult,
9};
10
11use crate::lifecycle::application::commands::{
12    CommandDeps, execute_command, handle_register_internal,
13};
14use crate::lifecycle::application::queries::{QueryDeps, execute_query};
15use crate::lifecycle::application::route_definition::RouteDefinition;
16use crate::lifecycle::ports::RouteRegistrationPort;
17use crate::lifecycle::ports::{
18    CommandDedupPort, EventPublisherPort, ProjectionStorePort, RouteRepositoryPort,
19    RuntimeExecutionPort, RuntimeUnitOfWorkPort,
20};
21
22pub struct RuntimeBus {
23    repo: Arc<dyn RouteRepositoryPort>,
24    projections: Arc<dyn ProjectionStorePort>,
25    events: Arc<dyn EventPublisherPort>,
26    dedup: Arc<dyn CommandDedupPort>,
27    uow: Option<Arc<dyn RuntimeUnitOfWorkPort>>,
28    execution: Option<Arc<dyn RuntimeExecutionPort>>,
29    journal_recovered_once: OnceCell<()>,
30}
31
32impl RuntimeBus {
33    pub fn new(
34        repo: Arc<dyn RouteRepositoryPort>,
35        projections: Arc<dyn ProjectionStorePort>,
36        events: Arc<dyn EventPublisherPort>,
37        dedup: Arc<dyn CommandDedupPort>,
38    ) -> Self {
39        Self {
40            repo,
41            projections,
42            events,
43            dedup,
44            uow: None,
45            execution: None,
46            journal_recovered_once: OnceCell::new(),
47        }
48    }
49
50    pub fn with_uow(mut self, uow: Arc<dyn RuntimeUnitOfWorkPort>) -> Self {
51        self.uow = Some(uow);
52        self
53    }
54
55    pub fn with_execution(mut self, execution: Arc<dyn RuntimeExecutionPort>) -> Self {
56        self.execution = Some(execution);
57        self
58    }
59
60    pub fn repo(&self) -> &Arc<dyn RouteRepositoryPort> {
61        &self.repo
62    }
63
64    fn deps(&self) -> CommandDeps {
65        CommandDeps {
66            repo: Arc::clone(&self.repo),
67            projections: Arc::clone(&self.projections),
68            events: Arc::clone(&self.events),
69            uow: self.uow.clone(),
70            execution: self.execution.clone(),
71        }
72    }
73
74    fn query_deps(&self) -> QueryDeps {
75        QueryDeps {
76            projections: Arc::clone(&self.projections),
77        }
78    }
79
80    async fn ensure_journal_recovered(&self) -> Result<(), CamelError> {
81        let Some(uow) = &self.uow else {
82            return Ok(());
83        };
84
85        self.journal_recovered_once
86            .get_or_try_init(|| async {
87                uow.recover_from_journal().await?;
88                Ok::<(), CamelError>(())
89            })
90            .await?;
91        Ok(())
92    }
93}
94
95#[async_trait]
96impl RuntimeCommandBus for RuntimeBus {
97    async fn execute(&self, cmd: RuntimeCommand) -> Result<RuntimeCommandResult, CamelError> {
98        self.ensure_journal_recovered().await?;
99        let command_id = cmd.command_id().to_string();
100        if !self.dedup.first_seen(&command_id).await? {
101            return Ok(RuntimeCommandResult::Duplicate { command_id });
102        }
103        let deps = self.deps();
104        match execute_command(&deps, cmd).await {
105            Ok(result) => Ok(result),
106            Err(err) => {
107                let _ = self.dedup.forget_seen(&command_id).await;
108                Err(err)
109            }
110        }
111    }
112}
113
114#[async_trait]
115impl RuntimeQueryBus for RuntimeBus {
116    async fn ask(&self, query: RuntimeQuery) -> Result<RuntimeQueryResult, CamelError> {
117        self.ensure_journal_recovered().await?;
118
119        match query {
120            RuntimeQuery::InFlightCount { route_id } => {
121                if let Some(execution) = &self.execution {
122                    execution
123                        .in_flight_count(&route_id)
124                        .await
125                        .map_err(Into::into)
126                } else {
127                    Ok(RuntimeQueryResult::RouteNotFound { route_id })
128                }
129            }
130            other => {
131                let deps = self.query_deps();
132                execute_query(&deps, other).await
133            }
134        }
135    }
136}
137
138#[async_trait]
139impl RouteRegistrationPort for RuntimeBus {
140    async fn register_route(&self, def: RouteDefinition) -> Result<(), CamelError> {
141        self.ensure_journal_recovered().await?;
142        let deps = self.deps();
143        handle_register_internal(&deps, def).await.map(|_| ())
144    }
145}
146
147#[cfg(test)]
148mod tests {
149    use crate::lifecycle::domain::DomainError;
150
151    use super::*;
152    use std::collections::{HashMap, HashSet};
153    use std::sync::Mutex;
154
155    use crate::lifecycle::application::route_definition::RouteDefinition;
156    use crate::lifecycle::domain::{RouteRuntimeAggregate, RuntimeEvent};
157    use crate::lifecycle::ports::RouteRegistrationPort as InternalRuntimeCommandBus;
158    use crate::lifecycle::ports::RouteStatusProjection;
159
160    #[derive(Clone, Default)]
161    struct InMemoryTestRepo {
162        routes: Arc<Mutex<HashMap<String, RouteRuntimeAggregate>>>,
163    }
164
165    #[async_trait]
166    impl RouteRepositoryPort for InMemoryTestRepo {
167        async fn load(&self, route_id: &str) -> Result<Option<RouteRuntimeAggregate>, DomainError> {
168            Ok(self
169                .routes
170                .lock()
171                .expect("lock test routes")
172                .get(route_id)
173                .cloned())
174        }
175
176        async fn save(&self, aggregate: RouteRuntimeAggregate) -> Result<(), DomainError> {
177            self.routes
178                .lock()
179                .expect("lock test routes")
180                .insert(aggregate.route_id().to_string(), aggregate);
181            Ok(())
182        }
183
184        async fn save_if_version(
185            &self,
186            aggregate: RouteRuntimeAggregate,
187            expected_version: u64,
188        ) -> Result<(), DomainError> {
189            let route_id = aggregate.route_id().to_string();
190            let mut routes = self.routes.lock().expect("lock test routes");
191            let current = routes.get(&route_id).ok_or_else(|| {
192                DomainError::InvalidState(format!(
193                    "optimistic lock conflict for route '{route_id}': route not found"
194                ))
195            })?;
196
197            if current.version() != expected_version {
198                return Err(DomainError::InvalidState(format!(
199                    "optimistic lock conflict for route '{route_id}': expected version {expected_version}, actual {}",
200                    current.version()
201                )));
202            }
203
204            routes.insert(route_id, aggregate);
205            Ok(())
206        }
207
208        async fn delete(&self, route_id: &str) -> Result<(), DomainError> {
209            self.routes
210                .lock()
211                .expect("lock test routes")
212                .remove(route_id);
213            Ok(())
214        }
215    }
216
217    #[derive(Clone, Default)]
218    struct InMemoryTestProjectionStore {
219        statuses: Arc<Mutex<HashMap<String, RouteStatusProjection>>>,
220    }
221
222    #[async_trait]
223    impl ProjectionStorePort for InMemoryTestProjectionStore {
224        async fn upsert_status(&self, status: RouteStatusProjection) -> Result<(), DomainError> {
225            self.statuses
226                .lock()
227                .expect("lock test statuses")
228                .insert(status.route_id.clone(), status);
229            Ok(())
230        }
231
232        async fn get_status(
233            &self,
234            route_id: &str,
235        ) -> Result<Option<RouteStatusProjection>, DomainError> {
236            Ok(self
237                .statuses
238                .lock()
239                .expect("lock test statuses")
240                .get(route_id)
241                .cloned())
242        }
243
244        async fn list_statuses(&self) -> Result<Vec<RouteStatusProjection>, DomainError> {
245            Ok(self
246                .statuses
247                .lock()
248                .expect("lock test statuses")
249                .values()
250                .cloned()
251                .collect())
252        }
253
254        async fn remove_status(&self, route_id: &str) -> Result<(), DomainError> {
255            self.statuses
256                .lock()
257                .expect("lock test statuses")
258                .remove(route_id);
259            Ok(())
260        }
261    }
262
263    #[derive(Clone, Default)]
264    struct InMemoryTestEventPublisher;
265
266    #[async_trait]
267    impl EventPublisherPort for InMemoryTestEventPublisher {
268        async fn publish(&self, _events: &[RuntimeEvent]) -> Result<(), DomainError> {
269            Ok(())
270        }
271    }
272
273    #[derive(Clone, Default)]
274    struct InMemoryTestDedup {
275        seen: Arc<Mutex<HashSet<String>>>,
276    }
277
278    #[derive(Clone, Default)]
279    struct InspectableDedup {
280        seen: Arc<Mutex<HashSet<String>>>,
281        forget_calls: Arc<Mutex<u32>>,
282    }
283
284    #[async_trait]
285    impl CommandDedupPort for InMemoryTestDedup {
286        async fn first_seen(&self, command_id: &str) -> Result<bool, DomainError> {
287            let mut seen = self.seen.lock().expect("lock dedup set");
288            Ok(seen.insert(command_id.to_string()))
289        }
290
291        async fn forget_seen(&self, command_id: &str) -> Result<(), DomainError> {
292            self.seen.lock().expect("lock dedup set").remove(command_id);
293            Ok(())
294        }
295    }
296
297    #[async_trait]
298    impl CommandDedupPort for InspectableDedup {
299        async fn first_seen(&self, command_id: &str) -> Result<bool, DomainError> {
300            let mut seen = self.seen.lock().expect("lock dedup set");
301            Ok(seen.insert(command_id.to_string()))
302        }
303
304        async fn forget_seen(&self, command_id: &str) -> Result<(), DomainError> {
305            self.seen.lock().expect("lock dedup set").remove(command_id);
306            let mut calls = self.forget_calls.lock().expect("forget calls");
307            *calls += 1;
308            Ok(())
309        }
310    }
311
312    fn build_test_runtime_bus() -> RuntimeBus {
313        let repo: Arc<dyn RouteRepositoryPort> = Arc::new(InMemoryTestRepo::default());
314        let projections: Arc<dyn ProjectionStorePort> =
315            Arc::new(InMemoryTestProjectionStore::default());
316        let events: Arc<dyn EventPublisherPort> = Arc::new(InMemoryTestEventPublisher);
317        let dedup: Arc<dyn CommandDedupPort> = Arc::new(InMemoryTestDedup::default());
318        RuntimeBus::new(repo, projections, events, dedup)
319    }
320
321    #[derive(Default)]
322    struct CountingUow {
323        recover_calls: Arc<Mutex<u32>>,
324    }
325
326    #[derive(Default)]
327    struct FailingRecoverUow;
328
329    #[async_trait]
330    impl RuntimeUnitOfWorkPort for CountingUow {
331        async fn persist_upsert(
332            &self,
333            _aggregate: RouteRuntimeAggregate,
334            _expected_version: Option<u64>,
335            _projection: RouteStatusProjection,
336            _events: &[RuntimeEvent],
337        ) -> Result<(), DomainError> {
338            Ok(())
339        }
340
341        async fn persist_delete(
342            &self,
343            _route_id: &str,
344            _events: &[RuntimeEvent],
345        ) -> Result<(), DomainError> {
346            Ok(())
347        }
348
349        async fn recover_from_journal(&self) -> Result<(), DomainError> {
350            let mut calls = self.recover_calls.lock().expect("recover_calls");
351            *calls += 1;
352            Ok(())
353        }
354    }
355
356    #[async_trait]
357    impl RuntimeUnitOfWorkPort for FailingRecoverUow {
358        async fn persist_upsert(
359            &self,
360            _aggregate: RouteRuntimeAggregate,
361            _expected_version: Option<u64>,
362            _projection: RouteStatusProjection,
363            _events: &[RuntimeEvent],
364        ) -> Result<(), DomainError> {
365            Ok(())
366        }
367
368        async fn persist_delete(
369            &self,
370            _route_id: &str,
371            _events: &[RuntimeEvent],
372        ) -> Result<(), DomainError> {
373            Ok(())
374        }
375
376        async fn recover_from_journal(&self) -> Result<(), DomainError> {
377            Err(DomainError::InvalidState("recover failed".into()))
378        }
379    }
380
381    #[derive(Default)]
382    struct InFlightExecutionPort;
383
384    #[async_trait]
385    impl RuntimeExecutionPort for InFlightExecutionPort {
386        async fn register_route(&self, _definition: RouteDefinition) -> Result<(), DomainError> {
387            Ok(())
388        }
389        async fn start_route(&self, _route_id: &str) -> Result<(), DomainError> {
390            Ok(())
391        }
392        async fn stop_route(&self, _route_id: &str) -> Result<(), DomainError> {
393            Ok(())
394        }
395        async fn suspend_route(&self, _route_id: &str) -> Result<(), DomainError> {
396            Ok(())
397        }
398        async fn resume_route(&self, _route_id: &str) -> Result<(), DomainError> {
399            Ok(())
400        }
401        async fn reload_route(&self, _route_id: &str) -> Result<(), DomainError> {
402            Ok(())
403        }
404        async fn remove_route(&self, _route_id: &str) -> Result<(), DomainError> {
405            Ok(())
406        }
407        async fn in_flight_count(&self, route_id: &str) -> Result<RuntimeQueryResult, DomainError> {
408            if route_id == "known" {
409                Ok(RuntimeQueryResult::InFlightCount {
410                    route_id: route_id.to_string(),
411                    count: 3,
412                })
413            } else {
414                Ok(RuntimeQueryResult::RouteNotFound {
415                    route_id: route_id.to_string(),
416                })
417            }
418        }
419    }
420
421    #[tokio::test]
422    async fn runtime_bus_implements_internal_command_bus() {
423        let bus = build_test_runtime_bus();
424        let def = RouteDefinition::new("timer:test", vec![]).with_route_id("internal-route");
425        let result = InternalRuntimeCommandBus::register_route(&bus, def).await;
426        assert!(
427            result.is_ok(),
428            "internal bus registration failed: {:?}",
429            result
430        );
431
432        let status = bus
433            .ask(RuntimeQuery::GetRouteStatus {
434                route_id: "internal-route".to_string(),
435            })
436            .await
437            .unwrap();
438        match status {
439            RuntimeQueryResult::RouteStatus { status, .. } => {
440                assert_eq!(status, "Registered");
441            }
442            _ => panic!("unexpected query result"),
443        }
444    }
445
446    #[tokio::test]
447    async fn execute_returns_duplicate_for_replayed_command_id() {
448        use camel_api::runtime::{CanonicalRouteSpec, CanonicalStepSpec, RuntimeCommand};
449
450        let bus = build_test_runtime_bus();
451
452        let mut spec = CanonicalRouteSpec::new("dup-route", "timer:tick");
453        spec.steps = vec![CanonicalStepSpec::Stop];
454
455        let cmd = RuntimeCommand::RegisterRoute {
456            spec: spec.clone(),
457            command_id: "dup-cmd".into(),
458            causation_id: None,
459        };
460        let first = bus.execute(cmd).await.unwrap();
461        assert!(matches!(
462            first,
463            RuntimeCommandResult::RouteRegistered { route_id } if route_id == "dup-route"
464        ));
465
466        let second = bus
467            .execute(RuntimeCommand::RegisterRoute {
468                spec,
469                command_id: "dup-cmd".into(),
470                causation_id: None,
471            })
472            .await
473            .unwrap();
474        assert!(matches!(
475            second,
476            RuntimeCommandResult::Duplicate { command_id } if command_id == "dup-cmd"
477        ));
478    }
479
480    #[tokio::test]
481    async fn ask_in_flight_count_without_execution_returns_route_not_found() {
482        let bus = build_test_runtime_bus();
483        let res = bus
484            .ask(RuntimeQuery::InFlightCount {
485                route_id: "missing".into(),
486            })
487            .await
488            .unwrap();
489        assert!(matches!(
490            res,
491            RuntimeQueryResult::RouteNotFound { route_id } if route_id == "missing"
492        ));
493    }
494
495    #[tokio::test]
496    async fn ask_in_flight_count_with_execution_delegates_to_adapter() {
497        let repo: Arc<dyn RouteRepositoryPort> = Arc::new(InMemoryTestRepo::default());
498        let projections: Arc<dyn ProjectionStorePort> =
499            Arc::new(InMemoryTestProjectionStore::default());
500        let events: Arc<dyn EventPublisherPort> = Arc::new(InMemoryTestEventPublisher);
501        let dedup: Arc<dyn CommandDedupPort> = Arc::new(InMemoryTestDedup::default());
502        let execution: Arc<dyn RuntimeExecutionPort> = Arc::new(InFlightExecutionPort);
503        let bus = RuntimeBus::new(repo, projections, events, dedup).with_execution(execution);
504
505        let known = bus
506            .ask(RuntimeQuery::InFlightCount {
507                route_id: "known".into(),
508            })
509            .await
510            .unwrap();
511        assert!(matches!(
512            known,
513            RuntimeQueryResult::InFlightCount { route_id, count }
514            if route_id == "known" && count == 3
515        ));
516    }
517
518    #[tokio::test]
519    async fn journal_recovery_runs_once_even_with_multiple_commands() {
520        use camel_api::runtime::{CanonicalRouteSpec, CanonicalStepSpec, RuntimeCommand};
521
522        let repo: Arc<dyn RouteRepositoryPort> = Arc::new(InMemoryTestRepo::default());
523        let projections: Arc<dyn ProjectionStorePort> =
524            Arc::new(InMemoryTestProjectionStore::default());
525        let events: Arc<dyn EventPublisherPort> = Arc::new(InMemoryTestEventPublisher);
526        let dedup: Arc<dyn CommandDedupPort> = Arc::new(InMemoryTestDedup::default());
527        let uow = Arc::new(CountingUow::default());
528        let bus = RuntimeBus::new(repo, projections, events, dedup).with_uow(uow.clone());
529
530        let mut spec_a = CanonicalRouteSpec::new("a", "timer:a");
531        spec_a.steps = vec![CanonicalStepSpec::Stop];
532        let mut spec_b = CanonicalRouteSpec::new("b", "timer:b");
533        spec_b.steps = vec![CanonicalStepSpec::Stop];
534
535        bus.execute(RuntimeCommand::RegisterRoute {
536            spec: spec_a,
537            command_id: "c-a".into(),
538            causation_id: None,
539        })
540        .await
541        .unwrap();
542
543        bus.execute(RuntimeCommand::RegisterRoute {
544            spec: spec_b,
545            command_id: "c-b".into(),
546            causation_id: None,
547        })
548        .await
549        .unwrap();
550
551        let calls = *uow.recover_calls.lock().expect("recover calls");
552        assert_eq!(calls, 1, "journal recovery should run once");
553    }
554
555    #[tokio::test]
556    async fn execute_on_command_error_forgets_dedup_marker() {
557        use camel_api::runtime::{CanonicalRouteSpec, RuntimeCommand};
558
559        let repo: Arc<dyn RouteRepositoryPort> = Arc::new(InMemoryTestRepo::default());
560        let projections: Arc<dyn ProjectionStorePort> =
561            Arc::new(InMemoryTestProjectionStore::default());
562        let events: Arc<dyn EventPublisherPort> = Arc::new(InMemoryTestEventPublisher);
563        let dedup = Arc::new(InspectableDedup::default());
564        let dedup_port: Arc<dyn CommandDedupPort> = dedup.clone();
565
566        let bus = RuntimeBus::new(repo, projections, events, dedup_port);
567
568        // Invalid canonical contract: empty route_id -> execute_command should fail.
569        let cmd = RuntimeCommand::RegisterRoute {
570            spec: CanonicalRouteSpec::new("", "timer:tick"),
571            command_id: "err-cmd".into(),
572            causation_id: None,
573        };
574
575        let err = bus.execute(cmd).await.expect_err("must fail");
576        assert!(err.to_string().contains("route_id cannot be empty"));
577
578        assert_eq!(*dedup.forget_calls.lock().expect("forget calls"), 1);
579        assert!(!dedup.seen.lock().expect("seen").contains("err-cmd"));
580    }
581
582    #[tokio::test]
583    async fn execute_propagates_recover_error_from_uow() {
584        use camel_api::runtime::{CanonicalRouteSpec, CanonicalStepSpec, RuntimeCommand};
585
586        let repo: Arc<dyn RouteRepositoryPort> = Arc::new(InMemoryTestRepo::default());
587        let projections: Arc<dyn ProjectionStorePort> =
588            Arc::new(InMemoryTestProjectionStore::default());
589        let events: Arc<dyn EventPublisherPort> = Arc::new(InMemoryTestEventPublisher);
590        let dedup: Arc<dyn CommandDedupPort> = Arc::new(InMemoryTestDedup::default());
591        let uow: Arc<dyn RuntimeUnitOfWorkPort> = Arc::new(FailingRecoverUow);
592
593        let bus = RuntimeBus::new(repo, projections, events, dedup).with_uow(uow);
594
595        let mut spec = CanonicalRouteSpec::new("x", "timer:x");
596        spec.steps = vec![CanonicalStepSpec::Stop];
597        let err = bus
598            .execute(RuntimeCommand::RegisterRoute {
599                spec,
600                command_id: "recover-err".into(),
601                causation_id: None,
602            })
603            .await
604            .expect_err("recover should fail");
605
606        assert!(err.to_string().contains("recover failed"));
607    }
608
609    #[tokio::test]
610    async fn ask_in_flight_count_with_execution_handles_unknown_route() {
611        let repo: Arc<dyn RouteRepositoryPort> = Arc::new(InMemoryTestRepo::default());
612        let projections: Arc<dyn ProjectionStorePort> =
613            Arc::new(InMemoryTestProjectionStore::default());
614        let events: Arc<dyn EventPublisherPort> = Arc::new(InMemoryTestEventPublisher);
615        let dedup: Arc<dyn CommandDedupPort> = Arc::new(InMemoryTestDedup::default());
616        let execution: Arc<dyn RuntimeExecutionPort> = Arc::new(InFlightExecutionPort);
617        let bus = RuntimeBus::new(repo, projections, events, dedup).with_execution(execution);
618
619        let unknown = bus
620            .ask(RuntimeQuery::InFlightCount {
621                route_id: "unknown".into(),
622            })
623            .await
624            .unwrap();
625        assert!(matches!(
626            unknown,
627            RuntimeQueryResult::RouteNotFound { route_id } if route_id == "unknown"
628        ));
629    }
630}