Skip to main content

camel_core/lifecycle/application/
runtime_bus.rs

1use std::sync::Arc;
2
3use async_trait::async_trait;
4use tokio::sync::OnceCell;
5
6use camel_api::{
7    CamelError, RuntimeCommand, RuntimeCommandBus, RuntimeCommandResult, RuntimeQuery,
8    RuntimeQueryBus, RuntimeQueryResult,
9};
10
11use crate::lifecycle::application::commands::{
12    CommandDeps, execute_command, handle_register_internal,
13};
14use crate::lifecycle::application::queries::{QueryDeps, execute_query};
15use crate::lifecycle::application::route_definition::RouteDefinition;
16use crate::lifecycle::ports::RouteRegistrationPort;
17use crate::lifecycle::ports::{
18    CommandDedupPort, EventPublisherPort, ProjectionStorePort, RouteRepositoryPort,
19    RuntimeExecutionPort, RuntimeUnitOfWorkPort,
20};
21
22pub struct RuntimeBus {
23    repo: Arc<dyn RouteRepositoryPort>,
24    projections: Arc<dyn ProjectionStorePort>,
25    events: Arc<dyn EventPublisherPort>,
26    dedup: Arc<dyn CommandDedupPort>,
27    uow: Option<Arc<dyn RuntimeUnitOfWorkPort>>,
28    execution: Option<Arc<dyn RuntimeExecutionPort>>,
29    journal_recovered_once: OnceCell<()>,
30}
31
32impl RuntimeBus {
33    pub fn new(
34        repo: Arc<dyn RouteRepositoryPort>,
35        projections: Arc<dyn ProjectionStorePort>,
36        events: Arc<dyn EventPublisherPort>,
37        dedup: Arc<dyn CommandDedupPort>,
38    ) -> Self {
39        Self {
40            repo,
41            projections,
42            events,
43            dedup,
44            uow: None,
45            execution: None,
46            journal_recovered_once: OnceCell::new(),
47        }
48    }
49
50    pub fn with_uow(mut self, uow: Arc<dyn RuntimeUnitOfWorkPort>) -> Self {
51        self.uow = Some(uow);
52        self
53    }
54
55    pub fn with_execution(mut self, execution: Arc<dyn RuntimeExecutionPort>) -> Self {
56        self.execution = Some(execution);
57        self
58    }
59
60    pub fn repo(&self) -> &Arc<dyn RouteRepositoryPort> {
61        &self.repo
62    }
63
64    pub(crate) async fn register_aggregate_only(&self, route_id: String) -> Result<(), CamelError> {
65        self.ensure_journal_recovered().await?;
66        let deps = self.deps();
67        if deps.repo.load(&route_id).await?.is_some() {
68            return Err(CamelError::RouteError(format!(
69                "route '{route_id}' already registered"
70            )));
71        }
72        let (aggregate, events) =
73            crate::lifecycle::domain::RouteRuntimeAggregate::register(route_id.clone());
74        if let Some(uow) = &deps.uow {
75            uow.persist_upsert(
76                aggregate.clone(),
77                None,
78                crate::lifecycle::application::commands::project_from_aggregate(&aggregate),
79                &events,
80            )
81            .await?;
82        } else {
83            deps.repo.save(aggregate.clone()).await?;
84            if let Some(primary_error) =
85                crate::lifecycle::application::commands::upsert_projection_with_reconciliation(
86                    &*deps.projections,
87                    crate::lifecycle::application::commands::project_from_aggregate(&aggregate),
88                )
89                .await?
90            {
91                deps.events.publish(&events).await?;
92                return Err(CamelError::RouteError(format!(
93                    "post-effect reconciliation recovered after runtime persistence error: {primary_error}"
94                )));
95            }
96            deps.events.publish(&events).await?;
97        }
98        Ok(())
99    }
100
101    fn deps(&self) -> CommandDeps {
102        CommandDeps {
103            repo: Arc::clone(&self.repo),
104            projections: Arc::clone(&self.projections),
105            events: Arc::clone(&self.events),
106            uow: self.uow.clone(),
107            execution: self.execution.clone(),
108        }
109    }
110
111    fn query_deps(&self) -> QueryDeps {
112        QueryDeps {
113            projections: Arc::clone(&self.projections),
114        }
115    }
116
117    async fn ensure_journal_recovered(&self) -> Result<(), CamelError> {
118        let Some(uow) = &self.uow else {
119            return Ok(());
120        };
121
122        self.journal_recovered_once
123            .get_or_try_init(|| async {
124                uow.recover_from_journal().await?;
125                Ok::<(), CamelError>(())
126            })
127            .await?;
128        Ok(())
129    }
130}
131
132#[async_trait]
133impl RuntimeCommandBus for RuntimeBus {
134    async fn execute(&self, cmd: RuntimeCommand) -> Result<RuntimeCommandResult, CamelError> {
135        self.ensure_journal_recovered().await?;
136        let command_id = cmd.command_id().to_string();
137        if !self.dedup.first_seen(&command_id).await? {
138            return Ok(RuntimeCommandResult::Duplicate { command_id });
139        }
140        let deps = self.deps();
141        match execute_command(&deps, cmd).await {
142            Ok(result) => Ok(result),
143            Err(err) => {
144                let _ = self.dedup.forget_seen(&command_id).await;
145                Err(err)
146            }
147        }
148    }
149}
150
151#[async_trait]
152impl RuntimeQueryBus for RuntimeBus {
153    async fn ask(&self, query: RuntimeQuery) -> Result<RuntimeQueryResult, CamelError> {
154        self.ensure_journal_recovered().await?;
155
156        match query {
157            RuntimeQuery::InFlightCount { route_id } => {
158                if let Some(execution) = &self.execution {
159                    execution
160                        .in_flight_count(&route_id)
161                        .await
162                        .map_err(Into::into)
163                } else {
164                    Ok(RuntimeQueryResult::RouteNotFound { route_id })
165                }
166            }
167            other => {
168                let deps = self.query_deps();
169                execute_query(&deps, other).await
170            }
171        }
172    }
173}
174
175#[async_trait]
176impl RouteRegistrationPort for RuntimeBus {
177    async fn register_route(&self, def: RouteDefinition) -> Result<(), CamelError> {
178        self.ensure_journal_recovered().await?;
179        let deps = self.deps();
180        handle_register_internal(&deps, def).await.map(|_| ())
181    }
182}
183
184#[cfg(test)]
185mod tests {
186    use crate::lifecycle::domain::DomainError;
187
188    use super::*;
189    use std::collections::{HashMap, HashSet};
190    use std::sync::Mutex;
191
192    use crate::lifecycle::application::route_definition::RouteDefinition;
193    use crate::lifecycle::domain::{RouteRuntimeAggregate, RuntimeEvent};
194    use crate::lifecycle::ports::RouteRegistrationPort as InternalRuntimeCommandBus;
195    use crate::lifecycle::ports::RouteStatusProjection;
196
197    #[derive(Clone, Default)]
198    struct InMemoryTestRepo {
199        routes: Arc<Mutex<HashMap<String, RouteRuntimeAggregate>>>,
200    }
201
202    #[async_trait]
203    impl RouteRepositoryPort for InMemoryTestRepo {
204        async fn load(&self, route_id: &str) -> Result<Option<RouteRuntimeAggregate>, DomainError> {
205            Ok(self
206                .routes
207                .lock()
208                .expect("lock test routes")
209                .get(route_id)
210                .cloned())
211        }
212
213        async fn save(&self, aggregate: RouteRuntimeAggregate) -> Result<(), DomainError> {
214            self.routes
215                .lock()
216                .expect("lock test routes")
217                .insert(aggregate.route_id().to_string(), aggregate);
218            Ok(())
219        }
220
221        async fn save_if_version(
222            &self,
223            aggregate: RouteRuntimeAggregate,
224            expected_version: u64,
225        ) -> Result<(), DomainError> {
226            let route_id = aggregate.route_id().to_string();
227            let mut routes = self.routes.lock().expect("lock test routes");
228            let current = routes.get(&route_id).ok_or_else(|| {
229                DomainError::InvalidState(format!(
230                    "optimistic lock conflict for route '{route_id}': route not found"
231                ))
232            })?;
233
234            if current.version() != expected_version {
235                return Err(DomainError::InvalidState(format!(
236                    "optimistic lock conflict for route '{route_id}': expected version {expected_version}, actual {}",
237                    current.version()
238                )));
239            }
240
241            routes.insert(route_id, aggregate);
242            Ok(())
243        }
244
245        async fn delete(&self, route_id: &str) -> Result<(), DomainError> {
246            self.routes
247                .lock()
248                .expect("lock test routes")
249                .remove(route_id);
250            Ok(())
251        }
252    }
253
254    #[derive(Clone, Default)]
255    struct InMemoryTestProjectionStore {
256        statuses: Arc<Mutex<HashMap<String, RouteStatusProjection>>>,
257    }
258
259    #[async_trait]
260    impl ProjectionStorePort for InMemoryTestProjectionStore {
261        async fn upsert_status(&self, status: RouteStatusProjection) -> Result<(), DomainError> {
262            self.statuses
263                .lock()
264                .expect("lock test statuses")
265                .insert(status.route_id.clone(), status);
266            Ok(())
267        }
268
269        async fn get_status(
270            &self,
271            route_id: &str,
272        ) -> Result<Option<RouteStatusProjection>, DomainError> {
273            Ok(self
274                .statuses
275                .lock()
276                .expect("lock test statuses")
277                .get(route_id)
278                .cloned())
279        }
280
281        async fn list_statuses(&self) -> Result<Vec<RouteStatusProjection>, DomainError> {
282            Ok(self
283                .statuses
284                .lock()
285                .expect("lock test statuses")
286                .values()
287                .cloned()
288                .collect())
289        }
290
291        async fn remove_status(&self, route_id: &str) -> Result<(), DomainError> {
292            self.statuses
293                .lock()
294                .expect("lock test statuses")
295                .remove(route_id);
296            Ok(())
297        }
298    }
299
300    #[derive(Clone, Default)]
301    struct InMemoryTestEventPublisher;
302
303    #[async_trait]
304    impl EventPublisherPort for InMemoryTestEventPublisher {
305        async fn publish(&self, _events: &[RuntimeEvent]) -> Result<(), DomainError> {
306            Ok(())
307        }
308    }
309
310    #[derive(Clone, Default)]
311    struct InMemoryTestDedup {
312        seen: Arc<Mutex<HashSet<String>>>,
313    }
314
315    #[derive(Clone, Default)]
316    struct InspectableDedup {
317        seen: Arc<Mutex<HashSet<String>>>,
318        forget_calls: Arc<Mutex<u32>>,
319    }
320
321    #[async_trait]
322    impl CommandDedupPort for InMemoryTestDedup {
323        async fn first_seen(&self, command_id: &str) -> Result<bool, DomainError> {
324            let mut seen = self.seen.lock().expect("lock dedup set");
325            Ok(seen.insert(command_id.to_string()))
326        }
327
328        async fn forget_seen(&self, command_id: &str) -> Result<(), DomainError> {
329            self.seen.lock().expect("lock dedup set").remove(command_id);
330            Ok(())
331        }
332    }
333
334    #[async_trait]
335    impl CommandDedupPort for InspectableDedup {
336        async fn first_seen(&self, command_id: &str) -> Result<bool, DomainError> {
337            let mut seen = self.seen.lock().expect("lock dedup set");
338            Ok(seen.insert(command_id.to_string()))
339        }
340
341        async fn forget_seen(&self, command_id: &str) -> Result<(), DomainError> {
342            self.seen.lock().expect("lock dedup set").remove(command_id);
343            let mut calls = self.forget_calls.lock().expect("forget calls");
344            *calls += 1;
345            Ok(())
346        }
347    }
348
349    fn build_test_runtime_bus() -> RuntimeBus {
350        let repo: Arc<dyn RouteRepositoryPort> = Arc::new(InMemoryTestRepo::default());
351        let projections: Arc<dyn ProjectionStorePort> =
352            Arc::new(InMemoryTestProjectionStore::default());
353        let events: Arc<dyn EventPublisherPort> = Arc::new(InMemoryTestEventPublisher);
354        let dedup: Arc<dyn CommandDedupPort> = Arc::new(InMemoryTestDedup::default());
355        RuntimeBus::new(repo, projections, events, dedup)
356    }
357
358    #[derive(Default)]
359    struct CountingUow {
360        recover_calls: Arc<Mutex<u32>>,
361    }
362
363    #[derive(Default)]
364    struct FailingRecoverUow;
365
366    #[async_trait]
367    impl RuntimeUnitOfWorkPort for CountingUow {
368        async fn persist_upsert(
369            &self,
370            _aggregate: RouteRuntimeAggregate,
371            _expected_version: Option<u64>,
372            _projection: RouteStatusProjection,
373            _events: &[RuntimeEvent],
374        ) -> Result<(), DomainError> {
375            Ok(())
376        }
377
378        async fn persist_delete(
379            &self,
380            _route_id: &str,
381            _events: &[RuntimeEvent],
382        ) -> Result<(), DomainError> {
383            Ok(())
384        }
385
386        async fn recover_from_journal(&self) -> Result<(), DomainError> {
387            let mut calls = self.recover_calls.lock().expect("recover_calls");
388            *calls += 1;
389            Ok(())
390        }
391    }
392
393    #[async_trait]
394    impl RuntimeUnitOfWorkPort for FailingRecoverUow {
395        async fn persist_upsert(
396            &self,
397            _aggregate: RouteRuntimeAggregate,
398            _expected_version: Option<u64>,
399            _projection: RouteStatusProjection,
400            _events: &[RuntimeEvent],
401        ) -> Result<(), DomainError> {
402            Ok(())
403        }
404
405        async fn persist_delete(
406            &self,
407            _route_id: &str,
408            _events: &[RuntimeEvent],
409        ) -> Result<(), DomainError> {
410            Ok(())
411        }
412
413        async fn recover_from_journal(&self) -> Result<(), DomainError> {
414            Err(DomainError::InvalidState("recover failed".into()))
415        }
416    }
417
418    #[derive(Default)]
419    struct InFlightExecutionPort;
420
421    #[async_trait]
422    impl RuntimeExecutionPort for InFlightExecutionPort {
423        async fn register_route(&self, _definition: RouteDefinition) -> Result<(), DomainError> {
424            Ok(())
425        }
426        async fn start_route(&self, _route_id: &str) -> Result<(), DomainError> {
427            Ok(())
428        }
429        async fn stop_route(&self, _route_id: &str) -> Result<(), DomainError> {
430            Ok(())
431        }
432        async fn suspend_route(&self, _route_id: &str) -> Result<(), DomainError> {
433            Ok(())
434        }
435        async fn resume_route(&self, _route_id: &str) -> Result<(), DomainError> {
436            Ok(())
437        }
438        async fn reload_route(&self, _route_id: &str) -> Result<(), DomainError> {
439            Ok(())
440        }
441        async fn remove_route(&self, _route_id: &str) -> Result<(), DomainError> {
442            Ok(())
443        }
444        async fn in_flight_count(&self, route_id: &str) -> Result<RuntimeQueryResult, DomainError> {
445            if route_id == "known" {
446                Ok(RuntimeQueryResult::InFlightCount {
447                    route_id: route_id.to_string(),
448                    count: 3,
449                })
450            } else {
451                Ok(RuntimeQueryResult::RouteNotFound {
452                    route_id: route_id.to_string(),
453                })
454            }
455        }
456    }
457
458    #[tokio::test]
459    async fn runtime_bus_implements_internal_command_bus() {
460        let bus = build_test_runtime_bus();
461        let def = RouteDefinition::new("timer:test", vec![]).with_route_id("internal-route");
462        let result = InternalRuntimeCommandBus::register_route(&bus, def).await;
463        assert!(
464            result.is_ok(),
465            "internal bus registration failed: {:?}",
466            result
467        );
468
469        let status = bus
470            .ask(RuntimeQuery::GetRouteStatus {
471                route_id: "internal-route".to_string(),
472            })
473            .await
474            .unwrap();
475        match status {
476            RuntimeQueryResult::RouteStatus { status, .. } => {
477                assert_eq!(status, "Registered");
478            }
479            _ => panic!("unexpected query result"),
480        }
481    }
482
483    #[tokio::test]
484    async fn execute_returns_duplicate_for_replayed_command_id() {
485        use camel_api::runtime::{CanonicalRouteSpec, CanonicalStepSpec, RuntimeCommand};
486
487        let bus = build_test_runtime_bus();
488
489        let mut spec = CanonicalRouteSpec::new("dup-route", "timer:tick");
490        spec.steps = vec![CanonicalStepSpec::Stop];
491
492        let cmd = RuntimeCommand::RegisterRoute {
493            spec: spec.clone(),
494            command_id: "dup-cmd".into(),
495            causation_id: None,
496        };
497        let first = bus.execute(cmd).await.unwrap();
498        assert!(matches!(
499            first,
500            RuntimeCommandResult::RouteRegistered { route_id } if route_id == "dup-route"
501        ));
502
503        let second = bus
504            .execute(RuntimeCommand::RegisterRoute {
505                spec,
506                command_id: "dup-cmd".into(),
507                causation_id: None,
508            })
509            .await
510            .unwrap();
511        assert!(matches!(
512            second,
513            RuntimeCommandResult::Duplicate { command_id } if command_id == "dup-cmd"
514        ));
515    }
516
517    #[tokio::test]
518    async fn ask_in_flight_count_without_execution_returns_route_not_found() {
519        let bus = build_test_runtime_bus();
520        let res = bus
521            .ask(RuntimeQuery::InFlightCount {
522                route_id: "missing".into(),
523            })
524            .await
525            .unwrap();
526        assert!(matches!(
527            res,
528            RuntimeQueryResult::RouteNotFound { route_id } if route_id == "missing"
529        ));
530    }
531
532    #[tokio::test]
533    async fn ask_in_flight_count_with_execution_delegates_to_adapter() {
534        let repo: Arc<dyn RouteRepositoryPort> = Arc::new(InMemoryTestRepo::default());
535        let projections: Arc<dyn ProjectionStorePort> =
536            Arc::new(InMemoryTestProjectionStore::default());
537        let events: Arc<dyn EventPublisherPort> = Arc::new(InMemoryTestEventPublisher);
538        let dedup: Arc<dyn CommandDedupPort> = Arc::new(InMemoryTestDedup::default());
539        let execution: Arc<dyn RuntimeExecutionPort> = Arc::new(InFlightExecutionPort);
540        let bus = RuntimeBus::new(repo, projections, events, dedup).with_execution(execution);
541
542        let known = bus
543            .ask(RuntimeQuery::InFlightCount {
544                route_id: "known".into(),
545            })
546            .await
547            .unwrap();
548        assert!(matches!(
549            known,
550            RuntimeQueryResult::InFlightCount { route_id, count }
551            if route_id == "known" && count == 3
552        ));
553    }
554
555    #[tokio::test]
556    async fn journal_recovery_runs_once_even_with_multiple_commands() {
557        use camel_api::runtime::{CanonicalRouteSpec, CanonicalStepSpec, RuntimeCommand};
558
559        let repo: Arc<dyn RouteRepositoryPort> = Arc::new(InMemoryTestRepo::default());
560        let projections: Arc<dyn ProjectionStorePort> =
561            Arc::new(InMemoryTestProjectionStore::default());
562        let events: Arc<dyn EventPublisherPort> = Arc::new(InMemoryTestEventPublisher);
563        let dedup: Arc<dyn CommandDedupPort> = Arc::new(InMemoryTestDedup::default());
564        let uow = Arc::new(CountingUow::default());
565        let bus = RuntimeBus::new(repo, projections, events, dedup).with_uow(uow.clone());
566
567        let mut spec_a = CanonicalRouteSpec::new("a", "timer:a");
568        spec_a.steps = vec![CanonicalStepSpec::Stop];
569        let mut spec_b = CanonicalRouteSpec::new("b", "timer:b");
570        spec_b.steps = vec![CanonicalStepSpec::Stop];
571
572        bus.execute(RuntimeCommand::RegisterRoute {
573            spec: spec_a,
574            command_id: "c-a".into(),
575            causation_id: None,
576        })
577        .await
578        .unwrap();
579
580        bus.execute(RuntimeCommand::RegisterRoute {
581            spec: spec_b,
582            command_id: "c-b".into(),
583            causation_id: None,
584        })
585        .await
586        .unwrap();
587
588        let calls = *uow.recover_calls.lock().expect("recover calls");
589        assert_eq!(calls, 1, "journal recovery should run once");
590    }
591
592    #[tokio::test]
593    async fn execute_on_command_error_forgets_dedup_marker() {
594        use camel_api::runtime::{CanonicalRouteSpec, RuntimeCommand};
595
596        let repo: Arc<dyn RouteRepositoryPort> = Arc::new(InMemoryTestRepo::default());
597        let projections: Arc<dyn ProjectionStorePort> =
598            Arc::new(InMemoryTestProjectionStore::default());
599        let events: Arc<dyn EventPublisherPort> = Arc::new(InMemoryTestEventPublisher);
600        let dedup = Arc::new(InspectableDedup::default());
601        let dedup_port: Arc<dyn CommandDedupPort> = dedup.clone();
602
603        let bus = RuntimeBus::new(repo, projections, events, dedup_port);
604
605        // Invalid canonical contract: empty route_id -> execute_command should fail.
606        let cmd = RuntimeCommand::RegisterRoute {
607            spec: CanonicalRouteSpec::new("", "timer:tick"),
608            command_id: "err-cmd".into(),
609            causation_id: None,
610        };
611
612        let err = bus.execute(cmd).await.expect_err("must fail");
613        assert!(err.to_string().contains("route_id cannot be empty"));
614
615        assert_eq!(*dedup.forget_calls.lock().expect("forget calls"), 1);
616        assert!(!dedup.seen.lock().expect("seen").contains("err-cmd"));
617    }
618
619    #[tokio::test]
620    async fn execute_propagates_recover_error_from_uow() {
621        use camel_api::runtime::{CanonicalRouteSpec, CanonicalStepSpec, RuntimeCommand};
622
623        let repo: Arc<dyn RouteRepositoryPort> = Arc::new(InMemoryTestRepo::default());
624        let projections: Arc<dyn ProjectionStorePort> =
625            Arc::new(InMemoryTestProjectionStore::default());
626        let events: Arc<dyn EventPublisherPort> = Arc::new(InMemoryTestEventPublisher);
627        let dedup: Arc<dyn CommandDedupPort> = Arc::new(InMemoryTestDedup::default());
628        let uow: Arc<dyn RuntimeUnitOfWorkPort> = Arc::new(FailingRecoverUow);
629
630        let bus = RuntimeBus::new(repo, projections, events, dedup).with_uow(uow);
631
632        let mut spec = CanonicalRouteSpec::new("x", "timer:x");
633        spec.steps = vec![CanonicalStepSpec::Stop];
634        let err = bus
635            .execute(RuntimeCommand::RegisterRoute {
636                spec,
637                command_id: "recover-err".into(),
638                causation_id: None,
639            })
640            .await
641            .expect_err("recover should fail");
642
643        assert!(err.to_string().contains("recover failed"));
644    }
645
646    #[tokio::test]
647    async fn ask_in_flight_count_with_execution_handles_unknown_route() {
648        let repo: Arc<dyn RouteRepositoryPort> = Arc::new(InMemoryTestRepo::default());
649        let projections: Arc<dyn ProjectionStorePort> =
650            Arc::new(InMemoryTestProjectionStore::default());
651        let events: Arc<dyn EventPublisherPort> = Arc::new(InMemoryTestEventPublisher);
652        let dedup: Arc<dyn CommandDedupPort> = Arc::new(InMemoryTestDedup::default());
653        let execution: Arc<dyn RuntimeExecutionPort> = Arc::new(InFlightExecutionPort);
654        let bus = RuntimeBus::new(repo, projections, events, dedup).with_execution(execution);
655
656        let unknown = bus
657            .ask(RuntimeQuery::InFlightCount {
658                route_id: "unknown".into(),
659            })
660            .await
661            .unwrap();
662        assert!(matches!(
663            unknown,
664            RuntimeQueryResult::RouteNotFound { route_id } if route_id == "unknown"
665        ));
666    }
667}