Skip to main content

camel_core/lifecycle/application/
runtime_bus.rs

1use std::sync::Arc;
2
3use async_trait::async_trait;
4use tokio::sync::OnceCell;
5
6use camel_api::{
7    CamelError, RuntimeCommand, RuntimeCommandBus, RuntimeCommandResult, RuntimeQuery,
8    RuntimeQueryBus, RuntimeQueryResult,
9};
10
11use crate::health_registry::HealthCheckRegistry;
12use crate::lifecycle::application::commands::{
13    CommandDeps, execute_command, handle_register_internal,
14};
15use crate::lifecycle::application::queries::{QueryDeps, execute_query};
16use crate::lifecycle::application::route_definition::RouteDefinition;
17use crate::lifecycle::domain::DomainError;
18use crate::lifecycle::ports::RouteRegistrationPort;
19use crate::lifecycle::ports::{
20    CommandDedupPort, EventPublisherPort, InFlightCountResult, ProjectionStorePort,
21    RouteRepositoryPort, RuntimeExecutionPort, RuntimeUnitOfWorkPort,
22};
23
24impl From<InFlightCountResult> for RuntimeQueryResult {
25    fn from(r: InFlightCountResult) -> Self {
26        match r {
27            InFlightCountResult::InFlightCount { route_id, count } => {
28                RuntimeQueryResult::InFlightCount { route_id, count }
29            }
30            InFlightCountResult::RouteNotFound { route_id } => {
31                RuntimeQueryResult::RouteNotFound { route_id }
32            }
33        }
34    }
35}
36
37pub struct RuntimeBus {
38    repo: Arc<dyn RouteRepositoryPort>,
39    projections: Arc<dyn ProjectionStorePort>,
40    events: Arc<dyn EventPublisherPort>,
41    dedup: Arc<dyn CommandDedupPort>,
42    uow: Option<Arc<dyn RuntimeUnitOfWorkPort>>,
43    execution: Option<Arc<dyn RuntimeExecutionPort>>,
44    health_registry: Option<Arc<HealthCheckRegistry>>,
45    journal_recovered_once: OnceCell<()>,
46}
47
48impl RuntimeBus {
49    pub fn new(
50        repo: Arc<dyn RouteRepositoryPort>,
51        projections: Arc<dyn ProjectionStorePort>,
52        events: Arc<dyn EventPublisherPort>,
53        dedup: Arc<dyn CommandDedupPort>,
54    ) -> Self {
55        Self {
56            repo,
57            projections,
58            events,
59            dedup,
60            uow: None,
61            execution: None,
62            health_registry: None,
63            journal_recovered_once: OnceCell::new(),
64        }
65    }
66
67    pub fn with_uow(mut self, uow: Arc<dyn RuntimeUnitOfWorkPort>) -> Self {
68        self.uow = Some(uow);
69        self
70    }
71
72    pub fn with_execution(mut self, execution: Arc<dyn RuntimeExecutionPort>) -> Self {
73        self.execution = Some(execution);
74        self
75    }
76
77    pub fn with_health_registry(mut self, health_registry: Arc<HealthCheckRegistry>) -> Self {
78        self.health_registry = Some(health_registry);
79        self
80    }
81
82    pub fn repo(&self) -> &Arc<dyn RouteRepositoryPort> {
83        &self.repo
84    }
85
86    pub(crate) async fn register_aggregate_only(&self, route_id: String) -> Result<(), CamelError> {
87        self.ensure_journal_recovered().await?;
88        let deps = self.deps();
89        if deps.repo.load(&route_id).await?.is_some() {
90            return Err(CamelError::RouteError(format!(
91                "route '{route_id}' already registered"
92            )));
93        }
94        let (aggregate, events) =
95            crate::lifecycle::domain::RouteRuntimeAggregate::register(route_id.clone());
96        if let Some(uow) = &deps.uow {
97            uow.persist_upsert(
98                aggregate.clone(),
99                None,
100                crate::lifecycle::application::commands::project_from_aggregate(&aggregate),
101                &events,
102            )
103            .await?;
104        } else {
105            deps.repo.save(aggregate.clone()).await?;
106            if let Some(primary_error) =
107                crate::lifecycle::application::commands::upsert_projection_with_reconciliation(
108                    &*deps.projections,
109                    crate::lifecycle::application::commands::project_from_aggregate(&aggregate),
110                )
111                .await?
112            {
113                deps.events.publish(&events).await?;
114                return Err(CamelError::RouteError(format!(
115                    "post-effect reconciliation recovered after runtime persistence error: {primary_error}"
116                )));
117            }
118            deps.events.publish(&events).await?;
119        }
120        Ok(())
121    }
122
123    fn deps(&self) -> CommandDeps {
124        CommandDeps {
125            repo: Arc::clone(&self.repo),
126            projections: Arc::clone(&self.projections),
127            events: Arc::clone(&self.events),
128            uow: self.uow.clone(),
129            execution: self.execution.clone(),
130            health_registry: self.health_registry.clone(),
131        }
132    }
133
134    fn query_deps(&self) -> QueryDeps {
135        QueryDeps {
136            projections: Arc::clone(&self.projections),
137        }
138    }
139
140    async fn ensure_journal_recovered(&self) -> Result<(), CamelError> {
141        let Some(uow) = &self.uow else {
142            return Ok(());
143        };
144
145        self.journal_recovered_once
146            .get_or_try_init(|| async {
147                uow.recover_from_journal().await?;
148                Ok::<(), CamelError>(())
149            })
150            .await?;
151        Ok(())
152    }
153}
154
155#[async_trait]
156impl RuntimeCommandBus for RuntimeBus {
157    async fn execute(&self, cmd: RuntimeCommand) -> Result<RuntimeCommandResult, CamelError> {
158        self.ensure_journal_recovered().await?;
159        let command_id = cmd.command_id().to_string();
160        if !self.dedup.first_seen(&command_id).await? {
161            return Ok(RuntimeCommandResult::Duplicate { command_id });
162        }
163        let deps = self.deps();
164        match execute_command(&deps, cmd).await {
165            Ok(result) => Ok(result),
166            Err(err) => {
167                let _ = self.dedup.forget_seen(&command_id).await;
168                Err(err)
169            }
170        }
171    }
172}
173
174#[async_trait]
175impl RuntimeQueryBus for RuntimeBus {
176    async fn ask(&self, query: RuntimeQuery) -> Result<RuntimeQueryResult, CamelError> {
177        self.ensure_journal_recovered().await?;
178
179        match query {
180            RuntimeQuery::InFlightCount { route_id } => {
181                if let Some(execution) = &self.execution {
182                    execution
183                        .in_flight_count(&route_id)
184                        .await
185                        .map(|r| r.into())
186                        .map_err(Into::into)
187                } else {
188                    Ok(RuntimeQueryResult::RouteNotFound { route_id })
189                }
190            }
191            other => {
192                let deps = self.query_deps();
193                execute_query(&deps, other).await
194            }
195        }
196    }
197}
198
199#[async_trait]
200impl RouteRegistrationPort for RuntimeBus {
201    async fn register_route(&self, def: RouteDefinition) -> Result<(), DomainError> {
202        self.ensure_journal_recovered()
203            .await
204            .map_err(|e| DomainError::InvalidState(e.to_string()))?;
205        let deps = self.deps();
206        handle_register_internal(&deps, def)
207            .await
208            .map(|_| ())
209            .map_err(|e| match e {
210                CamelError::RouteError(msg) => DomainError::InvalidState(msg),
211                other => DomainError::InvalidState(other.to_string()),
212            })
213    }
214}
215
216#[cfg(test)]
217mod tests {
218    use crate::lifecycle::domain::DomainError;
219
220    use super::*;
221    use std::collections::{HashMap, HashSet};
222    use std::sync::Mutex;
223
224    use crate::lifecycle::application::route_definition::RouteDefinition;
225    use crate::lifecycle::domain::{RouteRuntimeAggregate, RuntimeEvent};
226    use crate::lifecycle::ports::RouteRegistrationPort as InternalRuntimeCommandBus;
227    use crate::lifecycle::ports::RouteStatusProjection;
228
229    #[derive(Clone, Default)]
230    struct InMemoryTestRepo {
231        routes: Arc<Mutex<HashMap<String, RouteRuntimeAggregate>>>,
232    }
233
234    #[async_trait]
235    impl RouteRepositoryPort for InMemoryTestRepo {
236        async fn load(&self, route_id: &str) -> Result<Option<RouteRuntimeAggregate>, DomainError> {
237            Ok(self
238                .routes
239                .lock()
240                .expect("lock test routes")
241                .get(route_id)
242                .cloned())
243        }
244
245        async fn save(&self, aggregate: RouteRuntimeAggregate) -> Result<(), DomainError> {
246            self.routes
247                .lock()
248                .expect("lock test routes")
249                .insert(aggregate.route_id().to_string(), aggregate);
250            Ok(())
251        }
252
253        async fn save_if_version(
254            &self,
255            aggregate: RouteRuntimeAggregate,
256            expected_version: u64,
257        ) -> Result<(), DomainError> {
258            let route_id = aggregate.route_id().to_string();
259            let mut routes = self.routes.lock().expect("lock test routes");
260            let current = routes.get(&route_id).ok_or_else(|| {
261                DomainError::InvalidState(format!(
262                    "optimistic lock conflict for route '{route_id}': route not found"
263                ))
264            })?;
265
266            if current.version() != expected_version {
267                return Err(DomainError::InvalidState(format!(
268                    "optimistic lock conflict for route '{route_id}': expected version {expected_version}, actual {}",
269                    current.version()
270                )));
271            }
272
273            routes.insert(route_id, aggregate);
274            Ok(())
275        }
276
277        async fn delete(&self, route_id: &str) -> Result<(), DomainError> {
278            self.routes
279                .lock()
280                .expect("lock test routes")
281                .remove(route_id);
282            Ok(())
283        }
284    }
285
286    #[derive(Clone, Default)]
287    struct InMemoryTestProjectionStore {
288        statuses: Arc<Mutex<HashMap<String, RouteStatusProjection>>>,
289    }
290
291    #[async_trait]
292    impl ProjectionStorePort for InMemoryTestProjectionStore {
293        async fn upsert_status(&self, status: RouteStatusProjection) -> Result<(), DomainError> {
294            self.statuses
295                .lock()
296                .expect("lock test statuses")
297                .insert(status.route_id.clone(), status);
298            Ok(())
299        }
300
301        async fn get_status(
302            &self,
303            route_id: &str,
304        ) -> Result<Option<RouteStatusProjection>, DomainError> {
305            Ok(self
306                .statuses
307                .lock()
308                .expect("lock test statuses")
309                .get(route_id)
310                .cloned())
311        }
312
313        async fn list_statuses(&self) -> Result<Vec<RouteStatusProjection>, DomainError> {
314            Ok(self
315                .statuses
316                .lock()
317                .expect("lock test statuses")
318                .values()
319                .cloned()
320                .collect())
321        }
322
323        async fn remove_status(&self, route_id: &str) -> Result<(), DomainError> {
324            self.statuses
325                .lock()
326                .expect("lock test statuses")
327                .remove(route_id);
328            Ok(())
329        }
330    }
331
332    #[derive(Clone, Default)]
333    struct InMemoryTestEventPublisher;
334
335    #[async_trait]
336    impl EventPublisherPort for InMemoryTestEventPublisher {
337        async fn publish(&self, _events: &[RuntimeEvent]) -> Result<(), DomainError> {
338            Ok(())
339        }
340    }
341
342    #[derive(Clone, Default)]
343    struct InMemoryTestDedup {
344        seen: Arc<Mutex<HashSet<String>>>,
345    }
346
347    #[derive(Clone, Default)]
348    struct InspectableDedup {
349        seen: Arc<Mutex<HashSet<String>>>,
350        forget_calls: Arc<Mutex<u32>>,
351    }
352
353    #[async_trait]
354    impl CommandDedupPort for InMemoryTestDedup {
355        async fn first_seen(&self, command_id: &str) -> Result<bool, DomainError> {
356            let mut seen = self.seen.lock().expect("lock dedup set");
357            Ok(seen.insert(command_id.to_string()))
358        }
359
360        async fn forget_seen(&self, command_id: &str) -> Result<(), DomainError> {
361            self.seen.lock().expect("lock dedup set").remove(command_id);
362            Ok(())
363        }
364    }
365
366    #[async_trait]
367    impl CommandDedupPort for InspectableDedup {
368        async fn first_seen(&self, command_id: &str) -> Result<bool, DomainError> {
369            let mut seen = self.seen.lock().expect("lock dedup set");
370            Ok(seen.insert(command_id.to_string()))
371        }
372
373        async fn forget_seen(&self, command_id: &str) -> Result<(), DomainError> {
374            self.seen.lock().expect("lock dedup set").remove(command_id);
375            let mut calls = self.forget_calls.lock().expect("forget calls");
376            *calls += 1;
377            Ok(())
378        }
379    }
380
381    fn build_test_runtime_bus() -> RuntimeBus {
382        let repo: Arc<dyn RouteRepositoryPort> = Arc::new(InMemoryTestRepo::default());
383        let projections: Arc<dyn ProjectionStorePort> =
384            Arc::new(InMemoryTestProjectionStore::default());
385        let events: Arc<dyn EventPublisherPort> = Arc::new(InMemoryTestEventPublisher);
386        let dedup: Arc<dyn CommandDedupPort> = Arc::new(InMemoryTestDedup::default());
387        RuntimeBus::new(repo, projections, events, dedup)
388    }
389
390    #[derive(Default)]
391    struct CountingUow {
392        recover_calls: Arc<Mutex<u32>>,
393    }
394
395    #[derive(Default)]
396    struct FailingRecoverUow;
397
398    #[async_trait]
399    impl RuntimeUnitOfWorkPort for CountingUow {
400        async fn persist_upsert(
401            &self,
402            _aggregate: RouteRuntimeAggregate,
403            _expected_version: Option<u64>,
404            _projection: RouteStatusProjection,
405            _events: &[RuntimeEvent],
406        ) -> Result<(), DomainError> {
407            Ok(())
408        }
409
410        async fn persist_delete(
411            &self,
412            _route_id: &str,
413            _events: &[RuntimeEvent],
414        ) -> Result<(), DomainError> {
415            Ok(())
416        }
417
418        async fn recover_from_journal(&self) -> Result<(), DomainError> {
419            let mut calls = self.recover_calls.lock().expect("recover_calls");
420            *calls += 1;
421            Ok(())
422        }
423    }
424
425    #[async_trait]
426    impl RuntimeUnitOfWorkPort for FailingRecoverUow {
427        async fn persist_upsert(
428            &self,
429            _aggregate: RouteRuntimeAggregate,
430            _expected_version: Option<u64>,
431            _projection: RouteStatusProjection,
432            _events: &[RuntimeEvent],
433        ) -> Result<(), DomainError> {
434            Ok(())
435        }
436
437        async fn persist_delete(
438            &self,
439            _route_id: &str,
440            _events: &[RuntimeEvent],
441        ) -> Result<(), DomainError> {
442            Ok(())
443        }
444
445        async fn recover_from_journal(&self) -> Result<(), DomainError> {
446            Err(DomainError::InvalidState("recover failed".into()))
447        }
448    }
449
450    #[derive(Default)]
451    struct InFlightExecutionPort;
452
453    #[async_trait]
454    impl RuntimeExecutionPort for InFlightExecutionPort {
455        async fn register_route(&self, _definition: RouteDefinition) -> Result<(), DomainError> {
456            Ok(())
457        }
458        async fn start_route(&self, _route_id: &str) -> Result<(), DomainError> {
459            Ok(())
460        }
461        async fn stop_route(&self, _route_id: &str) -> Result<(), DomainError> {
462            Ok(())
463        }
464        async fn suspend_route(&self, _route_id: &str) -> Result<(), DomainError> {
465            Ok(())
466        }
467        async fn resume_route(&self, _route_id: &str) -> Result<(), DomainError> {
468            Ok(())
469        }
470        async fn reload_route(&self, _route_id: &str) -> Result<(), DomainError> {
471            Ok(())
472        }
473        async fn remove_route(&self, _route_id: &str) -> Result<(), DomainError> {
474            Ok(())
475        }
476        async fn in_flight_count(
477            &self,
478            route_id: &str,
479        ) -> Result<InFlightCountResult, DomainError> {
480            if route_id == "known" {
481                Ok(InFlightCountResult::InFlightCount {
482                    route_id: route_id.to_string(),
483                    count: 3,
484                })
485            } else {
486                Ok(InFlightCountResult::RouteNotFound {
487                    route_id: route_id.to_string(),
488                })
489            }
490        }
491    }
492
493    #[tokio::test]
494    async fn runtime_bus_implements_internal_command_bus() {
495        let bus = build_test_runtime_bus();
496        let def = RouteDefinition::new("timer:test", vec![]).with_route_id("internal-route");
497        let result = InternalRuntimeCommandBus::register_route(&bus, def).await;
498        assert!(
499            result.is_ok(),
500            "internal bus registration failed: {:?}",
501            result
502        );
503
504        let status = bus
505            .ask(RuntimeQuery::GetRouteStatus {
506                route_id: "internal-route".to_string(),
507            })
508            .await
509            .unwrap();
510        match status {
511            RuntimeQueryResult::RouteStatus { status, .. } => {
512                assert_eq!(status, "Registered");
513            }
514            _ => panic!("unexpected query result"),
515        }
516    }
517
518    #[tokio::test]
519    async fn execute_returns_duplicate_for_replayed_command_id() {
520        use camel_api::runtime::{CanonicalRouteSpec, CanonicalStepSpec, RuntimeCommand};
521
522        let bus = build_test_runtime_bus();
523
524        let mut spec = CanonicalRouteSpec::new("dup-route", "timer:tick");
525        spec.steps = vec![CanonicalStepSpec::Stop];
526
527        let cmd = RuntimeCommand::RegisterRoute {
528            spec: spec.clone(),
529            command_id: "dup-cmd".into(),
530            causation_id: None,
531        };
532        let first = bus.execute(cmd).await.unwrap();
533        assert!(matches!(
534            first,
535            RuntimeCommandResult::RouteRegistered { route_id } if route_id == "dup-route"
536        ));
537
538        let second = bus
539            .execute(RuntimeCommand::RegisterRoute {
540                spec,
541                command_id: "dup-cmd".into(),
542                causation_id: None,
543            })
544            .await
545            .unwrap();
546        assert!(matches!(
547            second,
548            RuntimeCommandResult::Duplicate { command_id } if command_id == "dup-cmd"
549        ));
550    }
551
552    #[tokio::test]
553    async fn ask_in_flight_count_without_execution_returns_route_not_found() {
554        let bus = build_test_runtime_bus();
555        let res = bus
556            .ask(RuntimeQuery::InFlightCount {
557                route_id: "missing".into(),
558            })
559            .await
560            .unwrap();
561        assert!(matches!(
562            res,
563            RuntimeQueryResult::RouteNotFound { route_id } if route_id == "missing"
564        ));
565    }
566
567    #[tokio::test]
568    async fn ask_in_flight_count_with_execution_delegates_to_adapter() {
569        let repo: Arc<dyn RouteRepositoryPort> = Arc::new(InMemoryTestRepo::default());
570        let projections: Arc<dyn ProjectionStorePort> =
571            Arc::new(InMemoryTestProjectionStore::default());
572        let events: Arc<dyn EventPublisherPort> = Arc::new(InMemoryTestEventPublisher);
573        let dedup: Arc<dyn CommandDedupPort> = Arc::new(InMemoryTestDedup::default());
574        let execution: Arc<dyn RuntimeExecutionPort> = Arc::new(InFlightExecutionPort);
575        let bus = RuntimeBus::new(repo, projections, events, dedup).with_execution(execution);
576
577        let known = bus
578            .ask(RuntimeQuery::InFlightCount {
579                route_id: "known".into(),
580            })
581            .await
582            .unwrap();
583        assert!(matches!(
584            known,
585            RuntimeQueryResult::InFlightCount { route_id, count }
586            if route_id == "known" && count == 3
587        ));
588    }
589
590    #[tokio::test]
591    async fn journal_recovery_runs_once_even_with_multiple_commands() {
592        use camel_api::runtime::{CanonicalRouteSpec, CanonicalStepSpec, RuntimeCommand};
593
594        let repo: Arc<dyn RouteRepositoryPort> = Arc::new(InMemoryTestRepo::default());
595        let projections: Arc<dyn ProjectionStorePort> =
596            Arc::new(InMemoryTestProjectionStore::default());
597        let events: Arc<dyn EventPublisherPort> = Arc::new(InMemoryTestEventPublisher);
598        let dedup: Arc<dyn CommandDedupPort> = Arc::new(InMemoryTestDedup::default());
599        let uow = Arc::new(CountingUow::default());
600        let bus = RuntimeBus::new(repo, projections, events, dedup).with_uow(uow.clone());
601
602        let mut spec_a = CanonicalRouteSpec::new("a", "timer:a");
603        spec_a.steps = vec![CanonicalStepSpec::Stop];
604        let mut spec_b = CanonicalRouteSpec::new("b", "timer:b");
605        spec_b.steps = vec![CanonicalStepSpec::Stop];
606
607        bus.execute(RuntimeCommand::RegisterRoute {
608            spec: spec_a,
609            command_id: "c-a".into(),
610            causation_id: None,
611        })
612        .await
613        .unwrap();
614
615        bus.execute(RuntimeCommand::RegisterRoute {
616            spec: spec_b,
617            command_id: "c-b".into(),
618            causation_id: None,
619        })
620        .await
621        .unwrap();
622
623        let calls = *uow.recover_calls.lock().expect("recover calls");
624        assert_eq!(calls, 1, "journal recovery should run once");
625    }
626
627    #[tokio::test]
628    async fn execute_on_command_error_forgets_dedup_marker() {
629        use camel_api::runtime::{CanonicalRouteSpec, RuntimeCommand};
630
631        let repo: Arc<dyn RouteRepositoryPort> = Arc::new(InMemoryTestRepo::default());
632        let projections: Arc<dyn ProjectionStorePort> =
633            Arc::new(InMemoryTestProjectionStore::default());
634        let events: Arc<dyn EventPublisherPort> = Arc::new(InMemoryTestEventPublisher);
635        let dedup = Arc::new(InspectableDedup::default());
636        let dedup_port: Arc<dyn CommandDedupPort> = dedup.clone();
637
638        let bus = RuntimeBus::new(repo, projections, events, dedup_port);
639
640        // Invalid canonical contract: empty route_id -> execute_command should fail.
641        let cmd = RuntimeCommand::RegisterRoute {
642            spec: CanonicalRouteSpec::new("", "timer:tick"),
643            command_id: "err-cmd".into(),
644            causation_id: None,
645        };
646
647        let err = bus.execute(cmd).await.expect_err("must fail");
648        assert!(err.to_string().contains("route_id cannot be empty"));
649
650        assert_eq!(*dedup.forget_calls.lock().expect("forget calls"), 1);
651        assert!(!dedup.seen.lock().expect("seen").contains("err-cmd"));
652    }
653
654    #[tokio::test]
655    async fn execute_propagates_recover_error_from_uow() {
656        use camel_api::runtime::{CanonicalRouteSpec, CanonicalStepSpec, RuntimeCommand};
657
658        let repo: Arc<dyn RouteRepositoryPort> = Arc::new(InMemoryTestRepo::default());
659        let projections: Arc<dyn ProjectionStorePort> =
660            Arc::new(InMemoryTestProjectionStore::default());
661        let events: Arc<dyn EventPublisherPort> = Arc::new(InMemoryTestEventPublisher);
662        let dedup: Arc<dyn CommandDedupPort> = Arc::new(InMemoryTestDedup::default());
663        let uow: Arc<dyn RuntimeUnitOfWorkPort> = Arc::new(FailingRecoverUow);
664
665        let bus = RuntimeBus::new(repo, projections, events, dedup).with_uow(uow);
666
667        let mut spec = CanonicalRouteSpec::new("x", "timer:x");
668        spec.steps = vec![CanonicalStepSpec::Stop];
669        let err = bus
670            .execute(RuntimeCommand::RegisterRoute {
671                spec,
672                command_id: "recover-err".into(),
673                causation_id: None,
674            })
675            .await
676            .expect_err("recover should fail");
677
678        assert!(err.to_string().contains("recover failed"));
679    }
680
681    #[tokio::test]
682    async fn ask_in_flight_count_with_execution_handles_unknown_route() {
683        let repo: Arc<dyn RouteRepositoryPort> = Arc::new(InMemoryTestRepo::default());
684        let projections: Arc<dyn ProjectionStorePort> =
685            Arc::new(InMemoryTestProjectionStore::default());
686        let events: Arc<dyn EventPublisherPort> = Arc::new(InMemoryTestEventPublisher);
687        let dedup: Arc<dyn CommandDedupPort> = Arc::new(InMemoryTestDedup::default());
688        let execution: Arc<dyn RuntimeExecutionPort> = Arc::new(InFlightExecutionPort);
689        let bus = RuntimeBus::new(repo, projections, events, dedup).with_execution(execution);
690
691        let unknown = bus
692            .ask(RuntimeQuery::InFlightCount {
693                route_id: "unknown".into(),
694            })
695            .await
696            .unwrap();
697        assert!(matches!(
698            unknown,
699            RuntimeQueryResult::RouteNotFound { route_id } if route_id == "unknown"
700        ));
701    }
702}