Skip to main content

camel_core/lifecycle/application/
runtime_bus.rs

1use std::sync::Arc;
2
3use async_trait::async_trait;
4use tokio::sync::OnceCell;
5
6use camel_api::{
7    CamelError, RuntimeCommand, RuntimeCommandBus, RuntimeCommandResult, RuntimeQuery,
8    RuntimeQueryBus, RuntimeQueryResult,
9};
10
11use crate::lifecycle::application::commands::{
12    CommandDeps, execute_command, handle_register_internal,
13};
14use crate::lifecycle::application::queries::{QueryDeps, execute_query};
15use crate::lifecycle::application::route_definition::RouteDefinition;
16use crate::lifecycle::ports::RouteRegistrationPort;
17use crate::lifecycle::ports::{
18    CommandDedupPort, EventPublisherPort, ProjectionStorePort, RouteRepositoryPort,
19    RuntimeExecutionPort, RuntimeUnitOfWorkPort,
20};
21
22pub struct RuntimeBus {
23    repo: Arc<dyn RouteRepositoryPort>,
24    projections: Arc<dyn ProjectionStorePort>,
25    events: Arc<dyn EventPublisherPort>,
26    dedup: Arc<dyn CommandDedupPort>,
27    uow: Option<Arc<dyn RuntimeUnitOfWorkPort>>,
28    execution: Option<Arc<dyn RuntimeExecutionPort>>,
29    journal_recovered_once: OnceCell<()>,
30}
31
32impl RuntimeBus {
33    pub fn new(
34        repo: Arc<dyn RouteRepositoryPort>,
35        projections: Arc<dyn ProjectionStorePort>,
36        events: Arc<dyn EventPublisherPort>,
37        dedup: Arc<dyn CommandDedupPort>,
38    ) -> Self {
39        Self {
40            repo,
41            projections,
42            events,
43            dedup,
44            uow: None,
45            execution: None,
46            journal_recovered_once: OnceCell::new(),
47        }
48    }
49
50    pub fn with_uow(mut self, uow: Arc<dyn RuntimeUnitOfWorkPort>) -> Self {
51        self.uow = Some(uow);
52        self
53    }
54
55    pub fn with_execution(mut self, execution: Arc<dyn RuntimeExecutionPort>) -> Self {
56        self.execution = Some(execution);
57        self
58    }
59
60    pub fn repo(&self) -> &Arc<dyn RouteRepositoryPort> {
61        &self.repo
62    }
63
64    fn deps(&self) -> CommandDeps {
65        CommandDeps {
66            repo: Arc::clone(&self.repo),
67            projections: Arc::clone(&self.projections),
68            events: Arc::clone(&self.events),
69            uow: self.uow.clone(),
70            execution: self.execution.clone(),
71        }
72    }
73
74    fn query_deps(&self) -> QueryDeps {
75        QueryDeps {
76            projections: Arc::clone(&self.projections),
77        }
78    }
79
80    async fn ensure_journal_recovered(&self) -> Result<(), CamelError> {
81        let Some(uow) = &self.uow else {
82            return Ok(());
83        };
84
85        self.journal_recovered_once
86            .get_or_try_init(|| async {
87                uow.recover_from_journal().await?;
88                Ok::<(), CamelError>(())
89            })
90            .await?;
91        Ok(())
92    }
93}
94
95#[async_trait]
96impl RuntimeCommandBus for RuntimeBus {
97    async fn execute(&self, cmd: RuntimeCommand) -> Result<RuntimeCommandResult, CamelError> {
98        self.ensure_journal_recovered().await?;
99        let command_id = cmd.command_id().to_string();
100        if !self.dedup.first_seen(&command_id).await? {
101            return Ok(RuntimeCommandResult::Duplicate { command_id });
102        }
103        let deps = self.deps();
104        match execute_command(&deps, cmd).await {
105            Ok(result) => Ok(result),
106            Err(err) => {
107                let _ = self.dedup.forget_seen(&command_id).await;
108                Err(err)
109            }
110        }
111    }
112}
113
114#[async_trait]
115impl RuntimeQueryBus for RuntimeBus {
116    async fn ask(&self, query: RuntimeQuery) -> Result<RuntimeQueryResult, CamelError> {
117        self.ensure_journal_recovered().await?;
118
119        match query {
120            RuntimeQuery::InFlightCount { route_id } => {
121                if let Some(execution) = &self.execution {
122                    execution.in_flight_count(&route_id).await
123                } else {
124                    Ok(RuntimeQueryResult::RouteNotFound { route_id })
125                }
126            }
127            other => {
128                let deps = self.query_deps();
129                execute_query(&deps, other).await
130            }
131        }
132    }
133}
134
135#[async_trait]
136impl RouteRegistrationPort for RuntimeBus {
137    async fn register_route(&self, def: RouteDefinition) -> Result<(), CamelError> {
138        self.ensure_journal_recovered().await?;
139        let deps = self.deps();
140        handle_register_internal(&deps, def).await.map(|_| ())
141    }
142}
143
144#[cfg(test)]
145mod tests {
146    use super::*;
147    use std::collections::{HashMap, HashSet};
148    use std::sync::Mutex;
149
150    use crate::lifecycle::application::route_definition::RouteDefinition;
151    use crate::lifecycle::domain::{RouteRuntimeAggregate, RuntimeEvent};
152    use crate::lifecycle::ports::RouteRegistrationPort as InternalRuntimeCommandBus;
153    use crate::lifecycle::ports::RouteStatusProjection;
154
155    #[derive(Clone, Default)]
156    struct InMemoryTestRepo {
157        routes: Arc<Mutex<HashMap<String, RouteRuntimeAggregate>>>,
158    }
159
160    #[async_trait]
161    impl RouteRepositoryPort for InMemoryTestRepo {
162        async fn load(&self, route_id: &str) -> Result<Option<RouteRuntimeAggregate>, CamelError> {
163            Ok(self
164                .routes
165                .lock()
166                .expect("lock test routes")
167                .get(route_id)
168                .cloned())
169        }
170
171        async fn save(&self, aggregate: RouteRuntimeAggregate) -> Result<(), CamelError> {
172            self.routes
173                .lock()
174                .expect("lock test routes")
175                .insert(aggregate.route_id().to_string(), aggregate);
176            Ok(())
177        }
178
179        async fn save_if_version(
180            &self,
181            aggregate: RouteRuntimeAggregate,
182            expected_version: u64,
183        ) -> Result<(), CamelError> {
184            let route_id = aggregate.route_id().to_string();
185            let mut routes = self.routes.lock().expect("lock test routes");
186            let current = routes.get(&route_id).ok_or_else(|| {
187                CamelError::RouteError(format!(
188                    "optimistic lock conflict for route '{route_id}': route not found"
189                ))
190            })?;
191
192            if current.version() != expected_version {
193                return Err(CamelError::RouteError(format!(
194                    "optimistic lock conflict for route '{route_id}': expected version {expected_version}, actual {}",
195                    current.version()
196                )));
197            }
198
199            routes.insert(route_id, aggregate);
200            Ok(())
201        }
202
203        async fn delete(&self, route_id: &str) -> Result<(), CamelError> {
204            self.routes
205                .lock()
206                .expect("lock test routes")
207                .remove(route_id);
208            Ok(())
209        }
210    }
211
212    #[derive(Clone, Default)]
213    struct InMemoryTestProjectionStore {
214        statuses: Arc<Mutex<HashMap<String, RouteStatusProjection>>>,
215    }
216
217    #[async_trait]
218    impl ProjectionStorePort for InMemoryTestProjectionStore {
219        async fn upsert_status(&self, status: RouteStatusProjection) -> Result<(), CamelError> {
220            self.statuses
221                .lock()
222                .expect("lock test statuses")
223                .insert(status.route_id.clone(), status);
224            Ok(())
225        }
226
227        async fn get_status(
228            &self,
229            route_id: &str,
230        ) -> Result<Option<RouteStatusProjection>, CamelError> {
231            Ok(self
232                .statuses
233                .lock()
234                .expect("lock test statuses")
235                .get(route_id)
236                .cloned())
237        }
238
239        async fn list_statuses(&self) -> Result<Vec<RouteStatusProjection>, CamelError> {
240            Ok(self
241                .statuses
242                .lock()
243                .expect("lock test statuses")
244                .values()
245                .cloned()
246                .collect())
247        }
248
249        async fn remove_status(&self, route_id: &str) -> Result<(), CamelError> {
250            self.statuses
251                .lock()
252                .expect("lock test statuses")
253                .remove(route_id);
254            Ok(())
255        }
256    }
257
258    #[derive(Clone, Default)]
259    struct InMemoryTestEventPublisher;
260
261    #[async_trait]
262    impl EventPublisherPort for InMemoryTestEventPublisher {
263        async fn publish(&self, _events: &[RuntimeEvent]) -> Result<(), CamelError> {
264            Ok(())
265        }
266    }
267
268    #[derive(Clone, Default)]
269    struct InMemoryTestDedup {
270        seen: Arc<Mutex<HashSet<String>>>,
271    }
272
273    #[derive(Clone, Default)]
274    struct InspectableDedup {
275        seen: Arc<Mutex<HashSet<String>>>,
276        forget_calls: Arc<Mutex<u32>>,
277    }
278
279    #[async_trait]
280    impl CommandDedupPort for InMemoryTestDedup {
281        async fn first_seen(&self, command_id: &str) -> Result<bool, CamelError> {
282            let mut seen = self.seen.lock().expect("lock dedup set");
283            Ok(seen.insert(command_id.to_string()))
284        }
285
286        async fn forget_seen(&self, command_id: &str) -> Result<(), CamelError> {
287            self.seen.lock().expect("lock dedup set").remove(command_id);
288            Ok(())
289        }
290    }
291
292    #[async_trait]
293    impl CommandDedupPort for InspectableDedup {
294        async fn first_seen(&self, command_id: &str) -> Result<bool, CamelError> {
295            let mut seen = self.seen.lock().expect("lock dedup set");
296            Ok(seen.insert(command_id.to_string()))
297        }
298
299        async fn forget_seen(&self, command_id: &str) -> Result<(), CamelError> {
300            self.seen.lock().expect("lock dedup set").remove(command_id);
301            let mut calls = self.forget_calls.lock().expect("forget calls");
302            *calls += 1;
303            Ok(())
304        }
305    }
306
307    fn build_test_runtime_bus() -> RuntimeBus {
308        let repo: Arc<dyn RouteRepositoryPort> = Arc::new(InMemoryTestRepo::default());
309        let projections: Arc<dyn ProjectionStorePort> =
310            Arc::new(InMemoryTestProjectionStore::default());
311        let events: Arc<dyn EventPublisherPort> = Arc::new(InMemoryTestEventPublisher);
312        let dedup: Arc<dyn CommandDedupPort> = Arc::new(InMemoryTestDedup::default());
313        RuntimeBus::new(repo, projections, events, dedup)
314    }
315
316    #[derive(Default)]
317    struct CountingUow {
318        recover_calls: Arc<Mutex<u32>>,
319    }
320
321    #[derive(Default)]
322    struct FailingRecoverUow;
323
324    #[async_trait]
325    impl RuntimeUnitOfWorkPort for CountingUow {
326        async fn persist_upsert(
327            &self,
328            _aggregate: RouteRuntimeAggregate,
329            _expected_version: Option<u64>,
330            _projection: RouteStatusProjection,
331            _events: &[RuntimeEvent],
332        ) -> Result<(), CamelError> {
333            Ok(())
334        }
335
336        async fn persist_delete(
337            &self,
338            _route_id: &str,
339            _events: &[RuntimeEvent],
340        ) -> Result<(), CamelError> {
341            Ok(())
342        }
343
344        async fn recover_from_journal(&self) -> Result<(), CamelError> {
345            let mut calls = self.recover_calls.lock().expect("recover_calls");
346            *calls += 1;
347            Ok(())
348        }
349    }
350
351    #[async_trait]
352    impl RuntimeUnitOfWorkPort for FailingRecoverUow {
353        async fn persist_upsert(
354            &self,
355            _aggregate: RouteRuntimeAggregate,
356            _expected_version: Option<u64>,
357            _projection: RouteStatusProjection,
358            _events: &[RuntimeEvent],
359        ) -> Result<(), CamelError> {
360            Ok(())
361        }
362
363        async fn persist_delete(
364            &self,
365            _route_id: &str,
366            _events: &[RuntimeEvent],
367        ) -> Result<(), CamelError> {
368            Ok(())
369        }
370
371        async fn recover_from_journal(&self) -> Result<(), CamelError> {
372            Err(CamelError::RouteError("recover failed".into()))
373        }
374    }
375
376    #[derive(Default)]
377    struct InFlightExecutionPort;
378
379    #[async_trait]
380    impl RuntimeExecutionPort for InFlightExecutionPort {
381        async fn register_route(&self, _definition: RouteDefinition) -> Result<(), CamelError> {
382            Ok(())
383        }
384        async fn start_route(&self, _route_id: &str) -> Result<(), CamelError> {
385            Ok(())
386        }
387        async fn stop_route(&self, _route_id: &str) -> Result<(), CamelError> {
388            Ok(())
389        }
390        async fn suspend_route(&self, _route_id: &str) -> Result<(), CamelError> {
391            Ok(())
392        }
393        async fn resume_route(&self, _route_id: &str) -> Result<(), CamelError> {
394            Ok(())
395        }
396        async fn reload_route(&self, _route_id: &str) -> Result<(), CamelError> {
397            Ok(())
398        }
399        async fn remove_route(&self, _route_id: &str) -> Result<(), CamelError> {
400            Ok(())
401        }
402        async fn in_flight_count(&self, route_id: &str) -> Result<RuntimeQueryResult, CamelError> {
403            if route_id == "known" {
404                Ok(RuntimeQueryResult::InFlightCount {
405                    route_id: route_id.to_string(),
406                    count: 3,
407                })
408            } else {
409                Ok(RuntimeQueryResult::RouteNotFound {
410                    route_id: route_id.to_string(),
411                })
412            }
413        }
414    }
415
416    #[tokio::test]
417    async fn runtime_bus_implements_internal_command_bus() {
418        let bus = build_test_runtime_bus();
419        let def = RouteDefinition::new("timer:test", vec![]).with_route_id("internal-route");
420        let result = InternalRuntimeCommandBus::register_route(&bus, def).await;
421        assert!(
422            result.is_ok(),
423            "internal bus registration failed: {:?}",
424            result
425        );
426
427        let status = bus
428            .ask(RuntimeQuery::GetRouteStatus {
429                route_id: "internal-route".to_string(),
430            })
431            .await
432            .unwrap();
433        match status {
434            RuntimeQueryResult::RouteStatus { status, .. } => {
435                assert_eq!(status, "Registered");
436            }
437            _ => panic!("unexpected query result"),
438        }
439    }
440
441    #[tokio::test]
442    async fn execute_returns_duplicate_for_replayed_command_id() {
443        use camel_api::runtime::{CanonicalRouteSpec, CanonicalStepSpec, RuntimeCommand};
444
445        let bus = build_test_runtime_bus();
446
447        let mut spec = CanonicalRouteSpec::new("dup-route", "timer:tick");
448        spec.steps = vec![CanonicalStepSpec::Stop];
449
450        let cmd = RuntimeCommand::RegisterRoute {
451            spec: spec.clone(),
452            command_id: "dup-cmd".into(),
453            causation_id: None,
454        };
455        let first = bus.execute(cmd).await.unwrap();
456        assert!(matches!(
457            first,
458            RuntimeCommandResult::RouteRegistered { route_id } if route_id == "dup-route"
459        ));
460
461        let second = bus
462            .execute(RuntimeCommand::RegisterRoute {
463                spec,
464                command_id: "dup-cmd".into(),
465                causation_id: None,
466            })
467            .await
468            .unwrap();
469        assert!(matches!(
470            second,
471            RuntimeCommandResult::Duplicate { command_id } if command_id == "dup-cmd"
472        ));
473    }
474
475    #[tokio::test]
476    async fn ask_in_flight_count_without_execution_returns_route_not_found() {
477        let bus = build_test_runtime_bus();
478        let res = bus
479            .ask(RuntimeQuery::InFlightCount {
480                route_id: "missing".into(),
481            })
482            .await
483            .unwrap();
484        assert!(matches!(
485            res,
486            RuntimeQueryResult::RouteNotFound { route_id } if route_id == "missing"
487        ));
488    }
489
490    #[tokio::test]
491    async fn ask_in_flight_count_with_execution_delegates_to_adapter() {
492        let repo: Arc<dyn RouteRepositoryPort> = Arc::new(InMemoryTestRepo::default());
493        let projections: Arc<dyn ProjectionStorePort> =
494            Arc::new(InMemoryTestProjectionStore::default());
495        let events: Arc<dyn EventPublisherPort> = Arc::new(InMemoryTestEventPublisher);
496        let dedup: Arc<dyn CommandDedupPort> = Arc::new(InMemoryTestDedup::default());
497        let execution: Arc<dyn RuntimeExecutionPort> = Arc::new(InFlightExecutionPort);
498        let bus = RuntimeBus::new(repo, projections, events, dedup).with_execution(execution);
499
500        let known = bus
501            .ask(RuntimeQuery::InFlightCount {
502                route_id: "known".into(),
503            })
504            .await
505            .unwrap();
506        assert!(matches!(
507            known,
508            RuntimeQueryResult::InFlightCount { route_id, count }
509            if route_id == "known" && count == 3
510        ));
511    }
512
513    #[tokio::test]
514    async fn journal_recovery_runs_once_even_with_multiple_commands() {
515        use camel_api::runtime::{CanonicalRouteSpec, CanonicalStepSpec, RuntimeCommand};
516
517        let repo: Arc<dyn RouteRepositoryPort> = Arc::new(InMemoryTestRepo::default());
518        let projections: Arc<dyn ProjectionStorePort> =
519            Arc::new(InMemoryTestProjectionStore::default());
520        let events: Arc<dyn EventPublisherPort> = Arc::new(InMemoryTestEventPublisher);
521        let dedup: Arc<dyn CommandDedupPort> = Arc::new(InMemoryTestDedup::default());
522        let uow = Arc::new(CountingUow::default());
523        let bus = RuntimeBus::new(repo, projections, events, dedup).with_uow(uow.clone());
524
525        let mut spec_a = CanonicalRouteSpec::new("a", "timer:a");
526        spec_a.steps = vec![CanonicalStepSpec::Stop];
527        let mut spec_b = CanonicalRouteSpec::new("b", "timer:b");
528        spec_b.steps = vec![CanonicalStepSpec::Stop];
529
530        bus.execute(RuntimeCommand::RegisterRoute {
531            spec: spec_a,
532            command_id: "c-a".into(),
533            causation_id: None,
534        })
535        .await
536        .unwrap();
537
538        bus.execute(RuntimeCommand::RegisterRoute {
539            spec: spec_b,
540            command_id: "c-b".into(),
541            causation_id: None,
542        })
543        .await
544        .unwrap();
545
546        let calls = *uow.recover_calls.lock().expect("recover calls");
547        assert_eq!(calls, 1, "journal recovery should run once");
548    }
549
550    #[tokio::test]
551    async fn execute_on_command_error_forgets_dedup_marker() {
552        use camel_api::runtime::{CanonicalRouteSpec, RuntimeCommand};
553
554        let repo: Arc<dyn RouteRepositoryPort> = Arc::new(InMemoryTestRepo::default());
555        let projections: Arc<dyn ProjectionStorePort> =
556            Arc::new(InMemoryTestProjectionStore::default());
557        let events: Arc<dyn EventPublisherPort> = Arc::new(InMemoryTestEventPublisher);
558        let dedup = Arc::new(InspectableDedup::default());
559        let dedup_port: Arc<dyn CommandDedupPort> = dedup.clone();
560
561        let bus = RuntimeBus::new(repo, projections, events, dedup_port);
562
563        // Invalid canonical contract: empty route_id -> execute_command should fail.
564        let cmd = RuntimeCommand::RegisterRoute {
565            spec: CanonicalRouteSpec::new("", "timer:tick"),
566            command_id: "err-cmd".into(),
567            causation_id: None,
568        };
569
570        let err = bus.execute(cmd).await.expect_err("must fail");
571        assert!(err.to_string().contains("route_id cannot be empty"));
572
573        assert_eq!(*dedup.forget_calls.lock().expect("forget calls"), 1);
574        assert!(!dedup.seen.lock().expect("seen").contains("err-cmd"));
575    }
576
577    #[tokio::test]
578    async fn execute_propagates_recover_error_from_uow() {
579        use camel_api::runtime::{CanonicalRouteSpec, CanonicalStepSpec, RuntimeCommand};
580
581        let repo: Arc<dyn RouteRepositoryPort> = Arc::new(InMemoryTestRepo::default());
582        let projections: Arc<dyn ProjectionStorePort> =
583            Arc::new(InMemoryTestProjectionStore::default());
584        let events: Arc<dyn EventPublisherPort> = Arc::new(InMemoryTestEventPublisher);
585        let dedup: Arc<dyn CommandDedupPort> = Arc::new(InMemoryTestDedup::default());
586        let uow: Arc<dyn RuntimeUnitOfWorkPort> = Arc::new(FailingRecoverUow);
587
588        let bus = RuntimeBus::new(repo, projections, events, dedup).with_uow(uow);
589
590        let mut spec = CanonicalRouteSpec::new("x", "timer:x");
591        spec.steps = vec![CanonicalStepSpec::Stop];
592        let err = bus
593            .execute(RuntimeCommand::RegisterRoute {
594                spec,
595                command_id: "recover-err".into(),
596                causation_id: None,
597            })
598            .await
599            .expect_err("recover should fail");
600
601        assert!(err.to_string().contains("recover failed"));
602    }
603
604    #[tokio::test]
605    async fn ask_in_flight_count_with_execution_handles_unknown_route() {
606        let repo: Arc<dyn RouteRepositoryPort> = Arc::new(InMemoryTestRepo::default());
607        let projections: Arc<dyn ProjectionStorePort> =
608            Arc::new(InMemoryTestProjectionStore::default());
609        let events: Arc<dyn EventPublisherPort> = Arc::new(InMemoryTestEventPublisher);
610        let dedup: Arc<dyn CommandDedupPort> = Arc::new(InMemoryTestDedup::default());
611        let execution: Arc<dyn RuntimeExecutionPort> = Arc::new(InFlightExecutionPort);
612        let bus = RuntimeBus::new(repo, projections, events, dedup).with_execution(execution);
613
614        let unknown = bus
615            .ask(RuntimeQuery::InFlightCount {
616                route_id: "unknown".into(),
617            })
618            .await
619            .unwrap();
620        assert!(matches!(
621            unknown,
622            RuntimeQueryResult::RouteNotFound { route_id } if route_id == "unknown"
623        ));
624    }
625}