// camel_core/lifecycle/application/runtime_bus.rs

1use std::sync::Arc;
2
3use async_trait::async_trait;
4use tokio::sync::OnceCell;
5
6use camel_api::{
7    CamelError, RuntimeCommand, RuntimeCommandBus, RuntimeCommandResult, RuntimeQuery,
8    RuntimeQueryBus, RuntimeQueryResult,
9};
10
11use crate::lifecycle::application::commands::{
12    CommandDeps, execute_command, handle_register_internal,
13};
14use crate::lifecycle::application::queries::{QueryDeps, execute_query};
15use crate::lifecycle::application::route_definition::RouteDefinition;
16use crate::lifecycle::domain::DomainError;
17use crate::lifecycle::ports::RouteRegistrationPort;
18use crate::lifecycle::ports::{
19    CommandDedupPort, EventPublisherPort, InFlightCountResult, ProjectionStorePort,
20    RouteRepositoryPort, RuntimeExecutionPort, RuntimeUnitOfWorkPort,
21};
22
23impl From<InFlightCountResult> for RuntimeQueryResult {
24    fn from(r: InFlightCountResult) -> Self {
25        match r {
26            InFlightCountResult::InFlightCount { route_id, count } => {
27                RuntimeQueryResult::InFlightCount { route_id, count }
28            }
29            InFlightCountResult::RouteNotFound { route_id } => {
30                RuntimeQueryResult::RouteNotFound { route_id }
31            }
32        }
33    }
34}
35
36pub struct RuntimeBus {
37    repo: Arc<dyn RouteRepositoryPort>,
38    projections: Arc<dyn ProjectionStorePort>,
39    events: Arc<dyn EventPublisherPort>,
40    dedup: Arc<dyn CommandDedupPort>,
41    uow: Option<Arc<dyn RuntimeUnitOfWorkPort>>,
42    execution: Option<Arc<dyn RuntimeExecutionPort>>,
43    journal_recovered_once: OnceCell<()>,
44}
45
46impl RuntimeBus {
47    pub fn new(
48        repo: Arc<dyn RouteRepositoryPort>,
49        projections: Arc<dyn ProjectionStorePort>,
50        events: Arc<dyn EventPublisherPort>,
51        dedup: Arc<dyn CommandDedupPort>,
52    ) -> Self {
53        Self {
54            repo,
55            projections,
56            events,
57            dedup,
58            uow: None,
59            execution: None,
60            journal_recovered_once: OnceCell::new(),
61        }
62    }
63
64    pub fn with_uow(mut self, uow: Arc<dyn RuntimeUnitOfWorkPort>) -> Self {
65        self.uow = Some(uow);
66        self
67    }
68
69    pub fn with_execution(mut self, execution: Arc<dyn RuntimeExecutionPort>) -> Self {
70        self.execution = Some(execution);
71        self
72    }
73
74    pub fn repo(&self) -> &Arc<dyn RouteRepositoryPort> {
75        &self.repo
76    }
77
78    pub(crate) async fn register_aggregate_only(&self, route_id: String) -> Result<(), CamelError> {
79        self.ensure_journal_recovered().await?;
80        let deps = self.deps();
81        if deps.repo.load(&route_id).await?.is_some() {
82            return Err(CamelError::RouteError(format!(
83                "route '{route_id}' already registered"
84            )));
85        }
86        let (aggregate, events) =
87            crate::lifecycle::domain::RouteRuntimeAggregate::register(route_id.clone());
88        if let Some(uow) = &deps.uow {
89            uow.persist_upsert(
90                aggregate.clone(),
91                None,
92                crate::lifecycle::application::commands::project_from_aggregate(&aggregate),
93                &events,
94            )
95            .await?;
96        } else {
97            deps.repo.save(aggregate.clone()).await?;
98            if let Some(primary_error) =
99                crate::lifecycle::application::commands::upsert_projection_with_reconciliation(
100                    &*deps.projections,
101                    crate::lifecycle::application::commands::project_from_aggregate(&aggregate),
102                )
103                .await?
104            {
105                deps.events.publish(&events).await?;
106                return Err(CamelError::RouteError(format!(
107                    "post-effect reconciliation recovered after runtime persistence error: {primary_error}"
108                )));
109            }
110            deps.events.publish(&events).await?;
111        }
112        Ok(())
113    }
114
115    fn deps(&self) -> CommandDeps {
116        CommandDeps {
117            repo: Arc::clone(&self.repo),
118            projections: Arc::clone(&self.projections),
119            events: Arc::clone(&self.events),
120            uow: self.uow.clone(),
121            execution: self.execution.clone(),
122        }
123    }
124
125    fn query_deps(&self) -> QueryDeps {
126        QueryDeps {
127            projections: Arc::clone(&self.projections),
128        }
129    }
130
131    async fn ensure_journal_recovered(&self) -> Result<(), CamelError> {
132        let Some(uow) = &self.uow else {
133            return Ok(());
134        };
135
136        self.journal_recovered_once
137            .get_or_try_init(|| async {
138                uow.recover_from_journal().await?;
139                Ok::<(), CamelError>(())
140            })
141            .await?;
142        Ok(())
143    }
144}
145
146#[async_trait]
147impl RuntimeCommandBus for RuntimeBus {
148    async fn execute(&self, cmd: RuntimeCommand) -> Result<RuntimeCommandResult, CamelError> {
149        self.ensure_journal_recovered().await?;
150        let command_id = cmd.command_id().to_string();
151        if !self.dedup.first_seen(&command_id).await? {
152            return Ok(RuntimeCommandResult::Duplicate { command_id });
153        }
154        let deps = self.deps();
155        match execute_command(&deps, cmd).await {
156            Ok(result) => Ok(result),
157            Err(err) => {
158                let _ = self.dedup.forget_seen(&command_id).await;
159                Err(err)
160            }
161        }
162    }
163}
164
165#[async_trait]
166impl RuntimeQueryBus for RuntimeBus {
167    async fn ask(&self, query: RuntimeQuery) -> Result<RuntimeQueryResult, CamelError> {
168        self.ensure_journal_recovered().await?;
169
170        match query {
171            RuntimeQuery::InFlightCount { route_id } => {
172                if let Some(execution) = &self.execution {
173                    execution
174                        .in_flight_count(&route_id)
175                        .await
176                        .map(|r| r.into())
177                        .map_err(Into::into)
178                } else {
179                    Ok(RuntimeQueryResult::RouteNotFound { route_id })
180                }
181            }
182            other => {
183                let deps = self.query_deps();
184                execute_query(&deps, other).await
185            }
186        }
187    }
188}
189
190#[async_trait]
191impl RouteRegistrationPort for RuntimeBus {
192    async fn register_route(&self, def: RouteDefinition) -> Result<(), DomainError> {
193        self.ensure_journal_recovered()
194            .await
195            .map_err(|e| DomainError::InvalidState(e.to_string()))?;
196        let deps = self.deps();
197        handle_register_internal(&deps, def)
198            .await
199            .map(|_| ())
200            .map_err(|e| match e {
201                CamelError::RouteError(msg) => DomainError::InvalidState(msg),
202                other => DomainError::InvalidState(other.to_string()),
203            })
204    }
205}
206
#[cfg(test)]
mod tests {
    use crate::lifecycle::domain::DomainError;

    use super::*;
    use std::collections::{HashMap, HashSet};
    use std::sync::Mutex;

    use camel_api::runtime::{CanonicalRouteSpec, CanonicalStepSpec, RuntimeCommand};

    use crate::lifecycle::application::route_definition::RouteDefinition;
    use crate::lifecycle::domain::{RouteRuntimeAggregate, RuntimeEvent};
    use crate::lifecycle::ports::RouteRegistrationPort as InternalRuntimeCommandBus;
    use crate::lifecycle::ports::RouteStatusProjection;

    // ---------------------------------------------------------------------
    // In-memory port doubles
    // ---------------------------------------------------------------------

    /// Route repository backed by a map keyed by route id.
    #[derive(Clone, Default)]
    struct InMemoryTestRepo {
        routes: Arc<Mutex<HashMap<String, RouteRuntimeAggregate>>>,
    }

    #[async_trait]
    impl RouteRepositoryPort for InMemoryTestRepo {
        async fn load(&self, route_id: &str) -> Result<Option<RouteRuntimeAggregate>, DomainError> {
            Ok(self
                .routes
                .lock()
                .expect("lock test routes")
                .get(route_id)
                .cloned())
        }

        async fn save(&self, aggregate: RouteRuntimeAggregate) -> Result<(), DomainError> {
            self.routes
                .lock()
                .expect("lock test routes")
                .insert(aggregate.route_id().to_string(), aggregate);
            Ok(())
        }

        /// Replaces the stored aggregate only when it exists and its version
        /// equals `expected_version`.
        async fn save_if_version(
            &self,
            aggregate: RouteRuntimeAggregate,
            expected_version: u64,
        ) -> Result<(), DomainError> {
            let route_id = aggregate.route_id().to_string();
            let mut routes = self.routes.lock().expect("lock test routes");
            let current = routes.get(&route_id).ok_or_else(|| {
                DomainError::InvalidState(format!(
                    "optimistic lock conflict for route '{route_id}': route not found"
                ))
            })?;

            if current.version() != expected_version {
                return Err(DomainError::InvalidState(format!(
                    "optimistic lock conflict for route '{route_id}': expected version {expected_version}, actual {}",
                    current.version()
                )));
            }

            routes.insert(route_id, aggregate);
            Ok(())
        }

        async fn delete(&self, route_id: &str) -> Result<(), DomainError> {
            self.routes
                .lock()
                .expect("lock test routes")
                .remove(route_id);
            Ok(())
        }
    }

    /// Projection store backed by a map keyed by route id.
    #[derive(Clone, Default)]
    struct InMemoryTestProjectionStore {
        statuses: Arc<Mutex<HashMap<String, RouteStatusProjection>>>,
    }

    #[async_trait]
    impl ProjectionStorePort for InMemoryTestProjectionStore {
        async fn upsert_status(&self, status: RouteStatusProjection) -> Result<(), DomainError> {
            self.statuses
                .lock()
                .expect("lock test statuses")
                .insert(status.route_id.clone(), status);
            Ok(())
        }

        async fn get_status(
            &self,
            route_id: &str,
        ) -> Result<Option<RouteStatusProjection>, DomainError> {
            Ok(self
                .statuses
                .lock()
                .expect("lock test statuses")
                .get(route_id)
                .cloned())
        }

        async fn list_statuses(&self) -> Result<Vec<RouteStatusProjection>, DomainError> {
            Ok(self
                .statuses
                .lock()
                .expect("lock test statuses")
                .values()
                .cloned()
                .collect())
        }

        async fn remove_status(&self, route_id: &str) -> Result<(), DomainError> {
            self.statuses
                .lock()
                .expect("lock test statuses")
                .remove(route_id);
            Ok(())
        }
    }

    /// Event publisher that accepts and discards every batch.
    #[derive(Clone, Default)]
    struct InMemoryTestEventPublisher;

    #[async_trait]
    impl EventPublisherPort for InMemoryTestEventPublisher {
        async fn publish(&self, _events: &[RuntimeEvent]) -> Result<(), DomainError> {
            Ok(())
        }
    }

    /// De-duplication store that also counts `forget_seen` calls so tests can
    /// observe the rollback performed after a failed command.
    #[derive(Clone, Default)]
    struct InMemoryTestDedup {
        seen: Arc<Mutex<HashSet<String>>>,
        forget_calls: Arc<Mutex<u32>>,
    }

    #[async_trait]
    impl CommandDedupPort for InMemoryTestDedup {
        async fn first_seen(&self, command_id: &str) -> Result<bool, DomainError> {
            let mut seen = self.seen.lock().expect("lock dedup set");
            Ok(seen.insert(command_id.to_string()))
        }

        async fn forget_seen(&self, command_id: &str) -> Result<(), DomainError> {
            self.seen.lock().expect("lock dedup set").remove(command_id);
            let mut calls = self.forget_calls.lock().expect("forget calls");
            *calls += 1;
            Ok(())
        }
    }

    /// Unit-of-work double: persistence calls always succeed, recovery attempts
    /// are counted, and the first `failing_recoveries` attempts fail.
    #[derive(Default)]
    struct ScriptedUow {
        recover_calls: Mutex<u32>,
        failing_recoveries: u32,
    }

    impl ScriptedUow {
        /// A unit of work whose first `failing_recoveries` recoveries fail.
        fn failing_first(failing_recoveries: u32) -> Self {
            Self {
                failing_recoveries,
                ..Self::default()
            }
        }
    }

    #[async_trait]
    impl RuntimeUnitOfWorkPort for ScriptedUow {
        async fn persist_upsert(
            &self,
            _aggregate: RouteRuntimeAggregate,
            _expected_version: Option<u64>,
            _projection: RouteStatusProjection,
            _events: &[RuntimeEvent],
        ) -> Result<(), DomainError> {
            Ok(())
        }

        async fn persist_delete(
            &self,
            _route_id: &str,
            _events: &[RuntimeEvent],
        ) -> Result<(), DomainError> {
            Ok(())
        }

        async fn recover_from_journal(&self) -> Result<(), DomainError> {
            let mut calls = self.recover_calls.lock().expect("recover_calls");
            *calls += 1;
            if *calls <= self.failing_recoveries {
                return Err(DomainError::InvalidState("recover failed".into()));
            }
            Ok(())
        }
    }

    /// Execution port that reports three in-flight exchanges for the route
    /// `"known"` and "route not found" for everything else.
    #[derive(Default)]
    struct InFlightExecutionPort;

    #[async_trait]
    impl RuntimeExecutionPort for InFlightExecutionPort {
        async fn register_route(&self, _definition: RouteDefinition) -> Result<(), DomainError> {
            Ok(())
        }
        async fn start_route(&self, _route_id: &str) -> Result<(), DomainError> {
            Ok(())
        }
        async fn stop_route(&self, _route_id: &str) -> Result<(), DomainError> {
            Ok(())
        }
        async fn suspend_route(&self, _route_id: &str) -> Result<(), DomainError> {
            Ok(())
        }
        async fn resume_route(&self, _route_id: &str) -> Result<(), DomainError> {
            Ok(())
        }
        async fn reload_route(&self, _route_id: &str) -> Result<(), DomainError> {
            Ok(())
        }
        async fn remove_route(&self, _route_id: &str) -> Result<(), DomainError> {
            Ok(())
        }
        async fn in_flight_count(
            &self,
            route_id: &str,
        ) -> Result<InFlightCountResult, DomainError> {
            if route_id == "known" {
                Ok(InFlightCountResult::InFlightCount {
                    route_id: route_id.to_string(),
                    count: 3,
                })
            } else {
                Ok(InFlightCountResult::RouteNotFound {
                    route_id: route_id.to_string(),
                })
            }
        }
    }

    // ---------------------------------------------------------------------
    // Fixture helpers
    // ---------------------------------------------------------------------

    /// Bus over fresh in-memory doubles, using the supplied de-duplication port.
    fn bus_with_dedup(dedup: Arc<dyn CommandDedupPort>) -> RuntimeBus {
        RuntimeBus::new(
            Arc::new(InMemoryTestRepo::default()),
            Arc::new(InMemoryTestProjectionStore::default()),
            Arc::new(InMemoryTestEventPublisher),
            dedup,
        )
    }

    /// Bus over fresh in-memory doubles with no unit of work or execution port.
    fn build_test_runtime_bus() -> RuntimeBus {
        bus_with_dedup(Arc::new(InMemoryTestDedup::default()))
    }

    /// `RegisterRoute` command whose spec consists of a single `Stop` step.
    fn register_route_cmd(route_id: &str, endpoint_uri: &str, command_id: &str) -> RuntimeCommand {
        let mut spec = CanonicalRouteSpec::new(route_id, endpoint_uri);
        spec.steps = vec![CanonicalStepSpec::Stop];
        RuntimeCommand::RegisterRoute {
            spec,
            command_id: command_id.into(),
            causation_id: None,
        }
    }

    // ---------------------------------------------------------------------
    // Tests
    // ---------------------------------------------------------------------

    #[tokio::test]
    async fn runtime_bus_implements_internal_command_bus() {
        let bus = build_test_runtime_bus();
        let def = RouteDefinition::new("timer:test", vec![]).with_route_id("internal-route");
        let result = InternalRuntimeCommandBus::register_route(&bus, def).await;
        assert!(
            result.is_ok(),
            "internal bus registration failed: {:?}",
            result
        );

        let status = bus
            .ask(RuntimeQuery::GetRouteStatus {
                route_id: "internal-route".to_string(),
            })
            .await
            .unwrap();
        match status {
            RuntimeQueryResult::RouteStatus { status, .. } => {
                assert_eq!(status, "Registered");
            }
            _ => panic!("unexpected query result"),
        }
    }

    #[tokio::test]
    async fn execute_returns_duplicate_for_replayed_command_id() {
        let bus = build_test_runtime_bus();

        let first = bus
            .execute(register_route_cmd("dup-route", "timer:tick", "dup-cmd"))
            .await
            .unwrap();
        assert!(matches!(
            first,
            RuntimeCommandResult::RouteRegistered { route_id } if route_id == "dup-route"
        ));

        let second = bus
            .execute(register_route_cmd("dup-route", "timer:tick", "dup-cmd"))
            .await
            .unwrap();
        assert!(matches!(
            second,
            RuntimeCommandResult::Duplicate { command_id } if command_id == "dup-cmd"
        ));
    }

    #[tokio::test]
    async fn ask_in_flight_count_without_execution_returns_route_not_found() {
        let bus = build_test_runtime_bus();
        let res = bus
            .ask(RuntimeQuery::InFlightCount {
                route_id: "missing".into(),
            })
            .await
            .unwrap();
        assert!(matches!(
            res,
            RuntimeQueryResult::RouteNotFound { route_id } if route_id == "missing"
        ));
    }

    #[tokio::test]
    async fn ask_in_flight_count_with_execution_delegates_to_adapter() {
        let bus = build_test_runtime_bus().with_execution(Arc::new(InFlightExecutionPort));

        let known = bus
            .ask(RuntimeQuery::InFlightCount {
                route_id: "known".into(),
            })
            .await
            .unwrap();
        assert!(matches!(
            known,
            RuntimeQueryResult::InFlightCount { route_id, count }
            if route_id == "known" && count == 3
        ));
    }

    #[tokio::test]
    async fn ask_in_flight_count_with_execution_handles_unknown_route() {
        let bus = build_test_runtime_bus().with_execution(Arc::new(InFlightExecutionPort));

        let unknown = bus
            .ask(RuntimeQuery::InFlightCount {
                route_id: "unknown".into(),
            })
            .await
            .unwrap();
        assert!(matches!(
            unknown,
            RuntimeQueryResult::RouteNotFound { route_id } if route_id == "unknown"
        ));
    }

    #[tokio::test]
    async fn journal_recovery_runs_once_even_with_multiple_commands() {
        let uow = Arc::new(ScriptedUow::default());
        let bus = build_test_runtime_bus().with_uow(uow.clone());

        bus.execute(register_route_cmd("a", "timer:a", "c-a"))
            .await
            .unwrap();
        bus.execute(register_route_cmd("b", "timer:b", "c-b"))
            .await
            .unwrap();

        let calls = *uow.recover_calls.lock().expect("recover calls");
        assert_eq!(calls, 1, "journal recovery should run once");
    }

    #[tokio::test]
    async fn journal_recovery_is_retried_after_a_failed_attempt() {
        let uow = Arc::new(ScriptedUow::failing_first(1));
        let bus = build_test_runtime_bus().with_uow(uow.clone());

        let first = bus
            .ask(RuntimeQuery::InFlightCount {
                route_id: "missing".into(),
            })
            .await;
        match first {
            Err(err) => assert!(err.to_string().contains("recover failed")),
            Ok(_) => panic!("first recovery attempt should fail"),
        }

        let second = bus
            .ask(RuntimeQuery::InFlightCount {
                route_id: "missing".into(),
            })
            .await
            .unwrap();
        assert!(matches!(
            second,
            RuntimeQueryResult::RouteNotFound { route_id } if route_id == "missing"
        ));

        let calls = *uow.recover_calls.lock().expect("recover calls");
        assert_eq!(calls, 2, "failed recovery should be retried exactly once");
    }

    #[tokio::test]
    async fn execute_on_command_error_forgets_dedup_marker() {
        let dedup = Arc::new(InMemoryTestDedup::default());
        let bus = bus_with_dedup(dedup.clone());

        // Invalid canonical contract: empty route_id -> execute_command should fail.
        let cmd = RuntimeCommand::RegisterRoute {
            spec: CanonicalRouteSpec::new("", "timer:tick"),
            command_id: "err-cmd".into(),
            causation_id: None,
        };

        let err = bus.execute(cmd).await.expect_err("must fail");
        assert!(err.to_string().contains("route_id cannot be empty"));

        assert_eq!(*dedup.forget_calls.lock().expect("forget calls"), 1);
        assert!(!dedup.seen.lock().expect("seen").contains("err-cmd"));
    }

    #[tokio::test]
    async fn execute_propagates_recover_error_from_uow() {
        let bus =
            build_test_runtime_bus().with_uow(Arc::new(ScriptedUow::failing_first(u32::MAX)));

        let err = bus
            .execute(register_route_cmd("x", "timer:x", "recover-err"))
            .await
            .expect_err("recover should fail");

        assert!(err.to_string().contains("recover failed"));
    }
}