1use std::sync::Arc;
2
3use async_trait::async_trait;
4use tokio::sync::OnceCell;
5
6use camel_api::{
7 CamelError, RuntimeCommand, RuntimeCommandBus, RuntimeCommandResult, RuntimeQuery,
8 RuntimeQueryBus, RuntimeQueryResult,
9};
10
11use crate::health_registry::HealthCheckRegistry;
12use crate::lifecycle::application::commands::{
13 CommandDeps, execute_command, handle_register_internal,
14};
15use crate::lifecycle::application::queries::{QueryDeps, execute_query};
16use crate::lifecycle::application::route_definition::RouteDefinition;
17use crate::lifecycle::domain::DomainError;
18use crate::lifecycle::ports::RouteRegistrationPort;
19use crate::lifecycle::ports::{
20 CommandDedupPort, EventPublisherPort, InFlightCountResult, ProjectionStorePort,
21 RouteRepositoryPort, RuntimeExecutionPort, RuntimeUnitOfWorkPort,
22};
23
24impl From<InFlightCountResult> for RuntimeQueryResult {
25 fn from(r: InFlightCountResult) -> Self {
26 match r {
27 InFlightCountResult::InFlightCount { route_id, count } => {
28 RuntimeQueryResult::InFlightCount { route_id, count }
29 }
30 InFlightCountResult::RouteNotFound { route_id } => {
31 RuntimeQueryResult::RouteNotFound { route_id }
32 }
33 }
34 }
35}
36
37pub struct RuntimeBus {
38 repo: Arc<dyn RouteRepositoryPort>,
39 projections: Arc<dyn ProjectionStorePort>,
40 events: Arc<dyn EventPublisherPort>,
41 dedup: Arc<dyn CommandDedupPort>,
42 uow: Option<Arc<dyn RuntimeUnitOfWorkPort>>,
43 execution: Option<Arc<dyn RuntimeExecutionPort>>,
44 health_registry: Option<Arc<HealthCheckRegistry>>,
45 journal_recovered_once: OnceCell<()>,
46}
47
48impl RuntimeBus {
49 pub fn new(
50 repo: Arc<dyn RouteRepositoryPort>,
51 projections: Arc<dyn ProjectionStorePort>,
52 events: Arc<dyn EventPublisherPort>,
53 dedup: Arc<dyn CommandDedupPort>,
54 ) -> Self {
55 Self {
56 repo,
57 projections,
58 events,
59 dedup,
60 uow: None,
61 execution: None,
62 health_registry: None,
63 journal_recovered_once: OnceCell::new(),
64 }
65 }
66
67 pub fn with_uow(mut self, uow: Arc<dyn RuntimeUnitOfWorkPort>) -> Self {
68 self.uow = Some(uow);
69 self
70 }
71
72 pub fn with_execution(mut self, execution: Arc<dyn RuntimeExecutionPort>) -> Self {
73 self.execution = Some(execution);
74 self
75 }
76
77 pub fn with_health_registry(mut self, health_registry: Arc<HealthCheckRegistry>) -> Self {
78 self.health_registry = Some(health_registry);
79 self
80 }
81
82 pub fn repo(&self) -> &Arc<dyn RouteRepositoryPort> {
83 &self.repo
84 }
85
86 pub(crate) async fn register_aggregate_only(&self, route_id: String) -> Result<(), CamelError> {
87 self.ensure_journal_recovered().await?;
88 let deps = self.deps();
89 if deps.repo.load(&route_id).await?.is_some() {
90 return Err(CamelError::RouteError(format!(
91 "route '{route_id}' already registered"
92 )));
93 }
94 let (aggregate, events) =
95 crate::lifecycle::domain::RouteRuntimeAggregate::register(route_id.clone());
96 if let Some(uow) = &deps.uow {
97 uow.persist_upsert(
98 aggregate.clone(),
99 None,
100 crate::lifecycle::application::commands::project_from_aggregate(&aggregate),
101 &events,
102 )
103 .await?;
104 } else {
105 deps.repo.save(aggregate.clone()).await?;
106 if let Some(primary_error) =
107 crate::lifecycle::application::commands::upsert_projection_with_reconciliation(
108 &*deps.projections,
109 crate::lifecycle::application::commands::project_from_aggregate(&aggregate),
110 )
111 .await?
112 {
113 deps.events.publish(&events).await?;
114 return Err(CamelError::RouteError(format!(
115 "post-effect reconciliation recovered after runtime persistence error: {primary_error}"
116 )));
117 }
118 deps.events.publish(&events).await?;
119 }
120 Ok(())
121 }
122
123 fn deps(&self) -> CommandDeps {
124 CommandDeps {
125 repo: Arc::clone(&self.repo),
126 projections: Arc::clone(&self.projections),
127 events: Arc::clone(&self.events),
128 uow: self.uow.clone(),
129 execution: self.execution.clone(),
130 health_registry: self.health_registry.clone(),
131 }
132 }
133
134 fn query_deps(&self) -> QueryDeps {
135 QueryDeps {
136 projections: Arc::clone(&self.projections),
137 }
138 }
139
140 async fn ensure_journal_recovered(&self) -> Result<(), CamelError> {
141 let Some(uow) = &self.uow else {
142 return Ok(());
143 };
144
145 self.journal_recovered_once
146 .get_or_try_init(|| async {
147 uow.recover_from_journal().await?;
148 Ok::<(), CamelError>(())
149 })
150 .await?;
151 Ok(())
152 }
153}
154
155#[async_trait]
156impl RuntimeCommandBus for RuntimeBus {
157 async fn execute(&self, cmd: RuntimeCommand) -> Result<RuntimeCommandResult, CamelError> {
158 self.ensure_journal_recovered().await?;
159 let command_id = cmd.command_id().to_string();
160 if !self.dedup.first_seen(&command_id).await? {
161 return Ok(RuntimeCommandResult::Duplicate { command_id });
162 }
163 let deps = self.deps();
164 match execute_command(&deps, cmd).await {
165 Ok(result) => Ok(result),
166 Err(err) => {
167 let _ = self.dedup.forget_seen(&command_id).await;
168 Err(err)
169 }
170 }
171 }
172}
173
174#[async_trait]
175impl RuntimeQueryBus for RuntimeBus {
176 async fn ask(&self, query: RuntimeQuery) -> Result<RuntimeQueryResult, CamelError> {
177 self.ensure_journal_recovered().await?;
178
179 match query {
180 RuntimeQuery::InFlightCount { route_id } => {
181 if let Some(execution) = &self.execution {
182 execution
183 .in_flight_count(&route_id)
184 .await
185 .map(|r| r.into())
186 .map_err(Into::into)
187 } else {
188 Ok(RuntimeQueryResult::RouteNotFound { route_id })
189 }
190 }
191 other => {
192 let deps = self.query_deps();
193 execute_query(&deps, other).await
194 }
195 }
196 }
197}
198
199#[async_trait]
200impl RouteRegistrationPort for RuntimeBus {
201 async fn register_route(&self, def: RouteDefinition) -> Result<(), DomainError> {
202 self.ensure_journal_recovered()
203 .await
204 .map_err(|e| DomainError::InvalidState(e.to_string()))?;
205 let deps = self.deps();
206 handle_register_internal(&deps, def)
207 .await
208 .map(|_| ())
209 .map_err(|e| match e {
210 CamelError::RouteError(msg) => DomainError::InvalidState(msg),
211 other => DomainError::InvalidState(other.to_string()),
212 })
213 }
214}
215
216#[cfg(test)]
217mod tests {
218 use crate::lifecycle::domain::DomainError;
219
220 use super::*;
221 use std::collections::{HashMap, HashSet};
222 use std::sync::Mutex;
223
224 use crate::lifecycle::application::route_definition::RouteDefinition;
225 use crate::lifecycle::domain::{RouteRuntimeAggregate, RuntimeEvent};
226 use crate::lifecycle::ports::RouteRegistrationPort as InternalRuntimeCommandBus;
227 use crate::lifecycle::ports::RouteStatusProjection;
228
229 #[derive(Clone, Default)]
230 struct InMemoryTestRepo {
231 routes: Arc<Mutex<HashMap<String, RouteRuntimeAggregate>>>,
232 }
233
234 #[async_trait]
235 impl RouteRepositoryPort for InMemoryTestRepo {
236 async fn load(&self, route_id: &str) -> Result<Option<RouteRuntimeAggregate>, DomainError> {
237 Ok(self
238 .routes
239 .lock()
240 .expect("lock test routes")
241 .get(route_id)
242 .cloned())
243 }
244
245 async fn save(&self, aggregate: RouteRuntimeAggregate) -> Result<(), DomainError> {
246 self.routes
247 .lock()
248 .expect("lock test routes")
249 .insert(aggregate.route_id().to_string(), aggregate);
250 Ok(())
251 }
252
253 async fn save_if_version(
254 &self,
255 aggregate: RouteRuntimeAggregate,
256 expected_version: u64,
257 ) -> Result<(), DomainError> {
258 let route_id = aggregate.route_id().to_string();
259 let mut routes = self.routes.lock().expect("lock test routes");
260 let current = routes.get(&route_id).ok_or_else(|| {
261 DomainError::InvalidState(format!(
262 "optimistic lock conflict for route '{route_id}': route not found"
263 ))
264 })?;
265
266 if current.version() != expected_version {
267 return Err(DomainError::InvalidState(format!(
268 "optimistic lock conflict for route '{route_id}': expected version {expected_version}, actual {}",
269 current.version()
270 )));
271 }
272
273 routes.insert(route_id, aggregate);
274 Ok(())
275 }
276
277 async fn delete(&self, route_id: &str) -> Result<(), DomainError> {
278 self.routes
279 .lock()
280 .expect("lock test routes")
281 .remove(route_id);
282 Ok(())
283 }
284 }
285
286 #[derive(Clone, Default)]
287 struct InMemoryTestProjectionStore {
288 statuses: Arc<Mutex<HashMap<String, RouteStatusProjection>>>,
289 }
290
291 #[async_trait]
292 impl ProjectionStorePort for InMemoryTestProjectionStore {
293 async fn upsert_status(&self, status: RouteStatusProjection) -> Result<(), DomainError> {
294 self.statuses
295 .lock()
296 .expect("lock test statuses")
297 .insert(status.route_id.clone(), status);
298 Ok(())
299 }
300
301 async fn get_status(
302 &self,
303 route_id: &str,
304 ) -> Result<Option<RouteStatusProjection>, DomainError> {
305 Ok(self
306 .statuses
307 .lock()
308 .expect("lock test statuses")
309 .get(route_id)
310 .cloned())
311 }
312
313 async fn list_statuses(&self) -> Result<Vec<RouteStatusProjection>, DomainError> {
314 Ok(self
315 .statuses
316 .lock()
317 .expect("lock test statuses")
318 .values()
319 .cloned()
320 .collect())
321 }
322
323 async fn remove_status(&self, route_id: &str) -> Result<(), DomainError> {
324 self.statuses
325 .lock()
326 .expect("lock test statuses")
327 .remove(route_id);
328 Ok(())
329 }
330 }
331
332 #[derive(Clone, Default)]
333 struct InMemoryTestEventPublisher;
334
335 #[async_trait]
336 impl EventPublisherPort for InMemoryTestEventPublisher {
337 async fn publish(&self, _events: &[RuntimeEvent]) -> Result<(), DomainError> {
338 Ok(())
339 }
340 }
341
342 #[derive(Clone, Default)]
343 struct InMemoryTestDedup {
344 seen: Arc<Mutex<HashSet<String>>>,
345 }
346
347 #[derive(Clone, Default)]
348 struct InspectableDedup {
349 seen: Arc<Mutex<HashSet<String>>>,
350 forget_calls: Arc<Mutex<u32>>,
351 }
352
353 #[async_trait]
354 impl CommandDedupPort for InMemoryTestDedup {
355 async fn first_seen(&self, command_id: &str) -> Result<bool, DomainError> {
356 let mut seen = self.seen.lock().expect("lock dedup set");
357 Ok(seen.insert(command_id.to_string()))
358 }
359
360 async fn forget_seen(&self, command_id: &str) -> Result<(), DomainError> {
361 self.seen.lock().expect("lock dedup set").remove(command_id);
362 Ok(())
363 }
364 }
365
366 #[async_trait]
367 impl CommandDedupPort for InspectableDedup {
368 async fn first_seen(&self, command_id: &str) -> Result<bool, DomainError> {
369 let mut seen = self.seen.lock().expect("lock dedup set");
370 Ok(seen.insert(command_id.to_string()))
371 }
372
373 async fn forget_seen(&self, command_id: &str) -> Result<(), DomainError> {
374 self.seen.lock().expect("lock dedup set").remove(command_id);
375 let mut calls = self.forget_calls.lock().expect("forget calls");
376 *calls += 1;
377 Ok(())
378 }
379 }
380
381 fn build_test_runtime_bus() -> RuntimeBus {
382 let repo: Arc<dyn RouteRepositoryPort> = Arc::new(InMemoryTestRepo::default());
383 let projections: Arc<dyn ProjectionStorePort> =
384 Arc::new(InMemoryTestProjectionStore::default());
385 let events: Arc<dyn EventPublisherPort> = Arc::new(InMemoryTestEventPublisher);
386 let dedup: Arc<dyn CommandDedupPort> = Arc::new(InMemoryTestDedup::default());
387 RuntimeBus::new(repo, projections, events, dedup)
388 }
389
390 #[derive(Default)]
391 struct CountingUow {
392 recover_calls: Arc<Mutex<u32>>,
393 }
394
395 #[derive(Default)]
396 struct FailingRecoverUow;
397
398 #[async_trait]
399 impl RuntimeUnitOfWorkPort for CountingUow {
400 async fn persist_upsert(
401 &self,
402 _aggregate: RouteRuntimeAggregate,
403 _expected_version: Option<u64>,
404 _projection: RouteStatusProjection,
405 _events: &[RuntimeEvent],
406 ) -> Result<(), DomainError> {
407 Ok(())
408 }
409
410 async fn persist_delete(
411 &self,
412 _route_id: &str,
413 _events: &[RuntimeEvent],
414 ) -> Result<(), DomainError> {
415 Ok(())
416 }
417
418 async fn recover_from_journal(&self) -> Result<(), DomainError> {
419 let mut calls = self.recover_calls.lock().expect("recover_calls");
420 *calls += 1;
421 Ok(())
422 }
423 }
424
425 #[async_trait]
426 impl RuntimeUnitOfWorkPort for FailingRecoverUow {
427 async fn persist_upsert(
428 &self,
429 _aggregate: RouteRuntimeAggregate,
430 _expected_version: Option<u64>,
431 _projection: RouteStatusProjection,
432 _events: &[RuntimeEvent],
433 ) -> Result<(), DomainError> {
434 Ok(())
435 }
436
437 async fn persist_delete(
438 &self,
439 _route_id: &str,
440 _events: &[RuntimeEvent],
441 ) -> Result<(), DomainError> {
442 Ok(())
443 }
444
445 async fn recover_from_journal(&self) -> Result<(), DomainError> {
446 Err(DomainError::InvalidState("recover failed".into()))
447 }
448 }
449
450 #[derive(Default)]
451 struct InFlightExecutionPort;
452
453 #[async_trait]
454 impl RuntimeExecutionPort for InFlightExecutionPort {
455 async fn register_route(&self, _definition: RouteDefinition) -> Result<(), DomainError> {
456 Ok(())
457 }
458 async fn start_route(&self, _route_id: &str) -> Result<(), DomainError> {
459 Ok(())
460 }
461 async fn stop_route(&self, _route_id: &str) -> Result<(), DomainError> {
462 Ok(())
463 }
464 async fn suspend_route(&self, _route_id: &str) -> Result<(), DomainError> {
465 Ok(())
466 }
467 async fn resume_route(&self, _route_id: &str) -> Result<(), DomainError> {
468 Ok(())
469 }
470 async fn reload_route(&self, _route_id: &str) -> Result<(), DomainError> {
471 Ok(())
472 }
473 async fn remove_route(&self, _route_id: &str) -> Result<(), DomainError> {
474 Ok(())
475 }
476 async fn in_flight_count(
477 &self,
478 route_id: &str,
479 ) -> Result<InFlightCountResult, DomainError> {
480 if route_id == "known" {
481 Ok(InFlightCountResult::InFlightCount {
482 route_id: route_id.to_string(),
483 count: 3,
484 })
485 } else {
486 Ok(InFlightCountResult::RouteNotFound {
487 route_id: route_id.to_string(),
488 })
489 }
490 }
491 }
492
493 #[tokio::test]
494 async fn runtime_bus_implements_internal_command_bus() {
495 let bus = build_test_runtime_bus();
496 let def = RouteDefinition::new("timer:test", vec![]).with_route_id("internal-route");
497 let result = InternalRuntimeCommandBus::register_route(&bus, def).await;
498 assert!(
499 result.is_ok(),
500 "internal bus registration failed: {:?}",
501 result
502 );
503
504 let status = bus
505 .ask(RuntimeQuery::GetRouteStatus {
506 route_id: "internal-route".to_string(),
507 })
508 .await
509 .unwrap();
510 match status {
511 RuntimeQueryResult::RouteStatus { status, .. } => {
512 assert_eq!(status, "Registered");
513 }
514 _ => panic!("unexpected query result"),
515 }
516 }
517
518 #[tokio::test]
519 async fn execute_returns_duplicate_for_replayed_command_id() {
520 use camel_api::runtime::{CanonicalRouteSpec, CanonicalStepSpec, RuntimeCommand};
521
522 let bus = build_test_runtime_bus();
523
524 let mut spec = CanonicalRouteSpec::new("dup-route", "timer:tick");
525 spec.steps = vec![CanonicalStepSpec::Stop];
526
527 let cmd = RuntimeCommand::RegisterRoute {
528 spec: spec.clone(),
529 command_id: "dup-cmd".into(),
530 causation_id: None,
531 };
532 let first = bus.execute(cmd).await.unwrap();
533 assert!(matches!(
534 first,
535 RuntimeCommandResult::RouteRegistered { route_id } if route_id == "dup-route"
536 ));
537
538 let second = bus
539 .execute(RuntimeCommand::RegisterRoute {
540 spec,
541 command_id: "dup-cmd".into(),
542 causation_id: None,
543 })
544 .await
545 .unwrap();
546 assert!(matches!(
547 second,
548 RuntimeCommandResult::Duplicate { command_id } if command_id == "dup-cmd"
549 ));
550 }
551
552 #[tokio::test]
553 async fn ask_in_flight_count_without_execution_returns_route_not_found() {
554 let bus = build_test_runtime_bus();
555 let res = bus
556 .ask(RuntimeQuery::InFlightCount {
557 route_id: "missing".into(),
558 })
559 .await
560 .unwrap();
561 assert!(matches!(
562 res,
563 RuntimeQueryResult::RouteNotFound { route_id } if route_id == "missing"
564 ));
565 }
566
567 #[tokio::test]
568 async fn ask_in_flight_count_with_execution_delegates_to_adapter() {
569 let repo: Arc<dyn RouteRepositoryPort> = Arc::new(InMemoryTestRepo::default());
570 let projections: Arc<dyn ProjectionStorePort> =
571 Arc::new(InMemoryTestProjectionStore::default());
572 let events: Arc<dyn EventPublisherPort> = Arc::new(InMemoryTestEventPublisher);
573 let dedup: Arc<dyn CommandDedupPort> = Arc::new(InMemoryTestDedup::default());
574 let execution: Arc<dyn RuntimeExecutionPort> = Arc::new(InFlightExecutionPort);
575 let bus = RuntimeBus::new(repo, projections, events, dedup).with_execution(execution);
576
577 let known = bus
578 .ask(RuntimeQuery::InFlightCount {
579 route_id: "known".into(),
580 })
581 .await
582 .unwrap();
583 assert!(matches!(
584 known,
585 RuntimeQueryResult::InFlightCount { route_id, count }
586 if route_id == "known" && count == 3
587 ));
588 }
589
590 #[tokio::test]
591 async fn journal_recovery_runs_once_even_with_multiple_commands() {
592 use camel_api::runtime::{CanonicalRouteSpec, CanonicalStepSpec, RuntimeCommand};
593
594 let repo: Arc<dyn RouteRepositoryPort> = Arc::new(InMemoryTestRepo::default());
595 let projections: Arc<dyn ProjectionStorePort> =
596 Arc::new(InMemoryTestProjectionStore::default());
597 let events: Arc<dyn EventPublisherPort> = Arc::new(InMemoryTestEventPublisher);
598 let dedup: Arc<dyn CommandDedupPort> = Arc::new(InMemoryTestDedup::default());
599 let uow = Arc::new(CountingUow::default());
600 let bus = RuntimeBus::new(repo, projections, events, dedup).with_uow(uow.clone());
601
602 let mut spec_a = CanonicalRouteSpec::new("a", "timer:a");
603 spec_a.steps = vec![CanonicalStepSpec::Stop];
604 let mut spec_b = CanonicalRouteSpec::new("b", "timer:b");
605 spec_b.steps = vec![CanonicalStepSpec::Stop];
606
607 bus.execute(RuntimeCommand::RegisterRoute {
608 spec: spec_a,
609 command_id: "c-a".into(),
610 causation_id: None,
611 })
612 .await
613 .unwrap();
614
615 bus.execute(RuntimeCommand::RegisterRoute {
616 spec: spec_b,
617 command_id: "c-b".into(),
618 causation_id: None,
619 })
620 .await
621 .unwrap();
622
623 let calls = *uow.recover_calls.lock().expect("recover calls");
624 assert_eq!(calls, 1, "journal recovery should run once");
625 }
626
627 #[tokio::test]
628 async fn execute_on_command_error_forgets_dedup_marker() {
629 use camel_api::runtime::{CanonicalRouteSpec, RuntimeCommand};
630
631 let repo: Arc<dyn RouteRepositoryPort> = Arc::new(InMemoryTestRepo::default());
632 let projections: Arc<dyn ProjectionStorePort> =
633 Arc::new(InMemoryTestProjectionStore::default());
634 let events: Arc<dyn EventPublisherPort> = Arc::new(InMemoryTestEventPublisher);
635 let dedup = Arc::new(InspectableDedup::default());
636 let dedup_port: Arc<dyn CommandDedupPort> = dedup.clone();
637
638 let bus = RuntimeBus::new(repo, projections, events, dedup_port);
639
640 let cmd = RuntimeCommand::RegisterRoute {
642 spec: CanonicalRouteSpec::new("", "timer:tick"),
643 command_id: "err-cmd".into(),
644 causation_id: None,
645 };
646
647 let err = bus.execute(cmd).await.expect_err("must fail");
648 assert!(err.to_string().contains("route_id cannot be empty"));
649
650 assert_eq!(*dedup.forget_calls.lock().expect("forget calls"), 1);
651 assert!(!dedup.seen.lock().expect("seen").contains("err-cmd"));
652 }
653
654 #[tokio::test]
655 async fn execute_propagates_recover_error_from_uow() {
656 use camel_api::runtime::{CanonicalRouteSpec, CanonicalStepSpec, RuntimeCommand};
657
658 let repo: Arc<dyn RouteRepositoryPort> = Arc::new(InMemoryTestRepo::default());
659 let projections: Arc<dyn ProjectionStorePort> =
660 Arc::new(InMemoryTestProjectionStore::default());
661 let events: Arc<dyn EventPublisherPort> = Arc::new(InMemoryTestEventPublisher);
662 let dedup: Arc<dyn CommandDedupPort> = Arc::new(InMemoryTestDedup::default());
663 let uow: Arc<dyn RuntimeUnitOfWorkPort> = Arc::new(FailingRecoverUow);
664
665 let bus = RuntimeBus::new(repo, projections, events, dedup).with_uow(uow);
666
667 let mut spec = CanonicalRouteSpec::new("x", "timer:x");
668 spec.steps = vec![CanonicalStepSpec::Stop];
669 let err = bus
670 .execute(RuntimeCommand::RegisterRoute {
671 spec,
672 command_id: "recover-err".into(),
673 causation_id: None,
674 })
675 .await
676 .expect_err("recover should fail");
677
678 assert!(err.to_string().contains("recover failed"));
679 }
680
681 #[tokio::test]
682 async fn ask_in_flight_count_with_execution_handles_unknown_route() {
683 let repo: Arc<dyn RouteRepositoryPort> = Arc::new(InMemoryTestRepo::default());
684 let projections: Arc<dyn ProjectionStorePort> =
685 Arc::new(InMemoryTestProjectionStore::default());
686 let events: Arc<dyn EventPublisherPort> = Arc::new(InMemoryTestEventPublisher);
687 let dedup: Arc<dyn CommandDedupPort> = Arc::new(InMemoryTestDedup::default());
688 let execution: Arc<dyn RuntimeExecutionPort> = Arc::new(InFlightExecutionPort);
689 let bus = RuntimeBus::new(repo, projections, events, dedup).with_execution(execution);
690
691 let unknown = bus
692 .ask(RuntimeQuery::InFlightCount {
693 route_id: "unknown".into(),
694 })
695 .await
696 .unwrap();
697 assert!(matches!(
698 unknown,
699 RuntimeQueryResult::RouteNotFound { route_id } if route_id == "unknown"
700 ));
701 }
702}