Skip to main content

jacquard_router/
middleware.rs

1//! Generic multi-engine router middleware over the shared routing traits.
2//!
3//! Control flow: this module owns the cross-engine orchestration
4//! loop. Activation goes `objective -> policy profile -> tick registered
5//! engines -> gather candidates across engines -> select one ordered candidate
6//! -> admit/materialize through the owning engine -> publish router-owned
7//! canonical state`. Maintenance, re-selection, expiry, and anti-entropy reuse
8//! the same engine registry and always mutate canonical state on the router
9//! side, even when one selected engine performs the route-private work.
10//! Candidate ordering, admission, and publication operate on shared summaries,
11//! checks, and installation evidence; they do not require explicit hop-by-hop
12//! path disclosure from every engine.
13//!
14//! Ownership:
15//! - canonical route mutations and registry-level engine dispatch happen here
16//! - registered engines return typed evidence and opaque private runtime state
17
18use alloc::{boxed::Box, collections::BTreeMap, vec::Vec};
19use core::{any::Any, cmp::Reverse};
20
21#[cfg(feature = "std")]
22use std::sync::Arc;
23
24use jacquard_core::{
25    AdmissionDecision, Belief, CapabilityError, Configuration, FactSourceClass, MaterializedRoute,
26    Observation, OrderStamp, OriginAuthenticationClass, PublicationId, RouteCandidate,
27    RouteCommitment, RouteDegradation, RouteError, RouteHandle, RouteHealth, RouteId,
28    RouteIdentityStamp, RouteLease, RouteMaintenanceResult, RouteMaintenanceTrigger,
29    RouteMaterializationInput, RoutePartitionClass, RouteProtectionClass, RouteRepairClass,
30    RouteRuntimeError, RouteSelectionError, RouteSemanticHandoff, RouterCanonicalMutation,
31    RouterMaintenanceOutcome, RouterRoundOutcome, RoutingEngineCapabilities, RoutingEngineId,
32    RoutingEvidenceClass, RoutingObjective, RoutingPolicyInputs, RoutingTickChange,
33    RoutingTickContext, RoutingTickHint, SelectedRoutingParameters, Tick, TimeWindow,
34    TransportKind, TransportObservation,
35};
36use jacquard_traits::{
37    OrderEffects, PolicyEngine, RouteEventLogEffects, Router, RouterEngineRegistry,
38    RouterManagedEngine, RoutingControlPlane, RoutingDataPlane, RoutingMiddleware, StorageEffects,
39    TimeEffects,
40};
41
42use crate::runtime::{RouterCheckpointRecord, RouterRuntimeAdapter};
43
/// Length, in ticks, of the lease window given to a newly materialized route:
/// the window runs from the current tick to the current tick plus this value
/// (saturating).
const DEFAULT_ROUTE_LEASE_TICKS: u64 = 32;
45
/// Policy engine that answers every request with one preconfigured profile.
#[derive(Clone, Debug)]
pub struct FixedPolicyEngine {
    /// Profile handed back (as a clone) from every `compute_profile` call.
    profile: SelectedRoutingParameters,
}
50
impl FixedPolicyEngine {
    /// Builds a policy engine that always returns `profile`.
    #[must_use]
    pub fn new(profile: SelectedRoutingParameters) -> Self {
        Self { profile }
    }
}
57
impl PolicyEngine for FixedPolicyEngine {
    /// Returns a clone of the stored profile; the objective and the policy
    /// inputs are intentionally ignored.
    fn compute_profile(
        &self,
        _objective: &RoutingObjective,
        _inputs: &RoutingPolicyInputs,
    ) -> SelectedRoutingParameters {
        self.profile.clone()
    }
}
67
/// Registry entry pairing an engine with the capabilities it reported when it
/// was registered.
struct RegisteredEngine {
    /// Value returned by the engine's `capabilities()` at registration time.
    capabilities: RoutingEngineCapabilities,
    /// The registered engine, owned by the router.
    engine: Box<dyn RouterManagedEngine>,
}
72
/// Router that dispatches across every registered engine and owns the
/// canonical (published) route state.
pub struct MultiEngineRouter<Policy, Effects> {
    /// Node this router runs on; engines must report the same id to register,
    /// and new leases name it as owner.
    local_node_id: jacquard_core::NodeId,
    /// Registered engines keyed by engine id.
    registered_engines: BTreeMap<RoutingEngineId, RegisteredEngine>,
    /// Computes the routing profile used for each activation.
    policy_engine: Policy,
    /// Host effects: time, order stamps, storage, and the route event log.
    effects: Effects,
    /// Most recently ingested topology observation.
    topology: Observation<Configuration>,
    /// Most recently ingested policy inputs (the engine count is overwritten
    /// before they are handed to the policy engine).
    policy_inputs: RoutingPolicyInputs,
    /// Published routes keyed by route id.
    active_routes: BTreeMap<RouteId, MaterializedRoute>,
    /// Commitments published alongside each active route.
    published_commitments: BTreeMap<RouteId, Vec<RouteCommitment>>,
}
83
84impl<Policy, Effects> MultiEngineRouter<Policy, Effects>
85where
86    Policy: PolicyEngine,
87    Effects: TimeEffects + OrderEffects + StorageEffects + RouteEventLogEffects,
88{
    /// Creates a router with no registered engines and no active routes.
    ///
    /// `topology` and `policy_inputs` seed the values that later `ingest_*`
    /// calls replace.
    #[must_use]
    pub fn new(
        local_node_id: jacquard_core::NodeId,
        policy_engine: Policy,
        effects: Effects,
        topology: Observation<Configuration>,
        policy_inputs: RoutingPolicyInputs,
    ) -> Self {
        Self {
            local_node_id,
            registered_engines: BTreeMap::new(),
            policy_engine,
            effects,
            topology,
            policy_inputs,
            active_routes: BTreeMap::new(),
            published_commitments: BTreeMap::new(),
        }
    }
108
109    pub fn register_engine(
110        &mut self,
111        extension: Box<dyn RouterManagedEngine>,
112    ) -> Result<(), RouteError> {
113        let engine_id = extension.engine_id();
114        if extension.local_node_id_for_router() != self.local_node_id {
115            return Err(CapabilityError::Rejected.into());
116        }
117        if self.registered_engines.contains_key(&engine_id) {
118            return Err(CapabilityError::Rejected.into());
119        }
120        let capabilities = extension.capabilities();
121        self.registered_engines.insert(
122            engine_id,
123            RegisteredEngine {
124                capabilities,
125                engine: extension,
126            },
127        );
128        Ok(())
129    }
130
131    #[cfg(feature = "std")]
132    pub fn ingest_shared_topology_observation(
133        &mut self,
134        topology: &Arc<Observation<Configuration>>,
135    ) {
136        self.topology = topology.as_ref().clone();
137    }
138
    /// Replaces the router's current topology observation with `topology`.
    pub fn ingest_topology_observation(&mut self, topology: Observation<Configuration>) {
        self.topology = topology;
    }
142
    /// Replaces the stored policy inputs with `inputs`.
    pub fn ingest_policy_inputs(&mut self, inputs: RoutingPolicyInputs) {
        self.policy_inputs = inputs;
    }
146
147    pub fn ingest_transport_observation(
148        &mut self,
149        observation: &TransportObservation,
150    ) -> Result<(), RouteError> {
151        for entry in self.registered_engines.values_mut() {
152            entry
153                .engine
154                .ingest_transport_observation_for_router(observation)?;
155        }
156        Ok(())
157    }
158
159    pub fn recover_checkpointed_routes(&mut self) -> Result<usize, RouteError> {
160        let records =
161            RouterRuntimeAdapter::new(self.local_node_id, &mut self.effects).load_routes()?;
162        let mut recovered = 0usize;
163        for (route_id, record) in records {
164            let engine_id = record.route.identity.admission.summary.engine.clone();
165            let restored = match self.registered_engines.get_mut(&engine_id) {
166                Some(entry) => entry
167                    .engine
168                    .restore_route_runtime_with_record_for_router(&record.route, &self.topology)?,
169                None => false,
170            };
171            if !restored {
172                RouterRuntimeAdapter::new(self.local_node_id, &mut self.effects)
173                    .remove_route(&route_id)?;
174                continue;
175            }
176            self.active_routes.insert(route_id, record.route);
177            self.published_commitments
178                .insert(route_id, record.commitments);
179            recovered = recovered.saturating_add(1);
180        }
181        Ok(recovered)
182    }
183
    /// Shared access to the effects handle owned by the router.
    #[must_use]
    pub fn effects(&self) -> &Effects {
        &self.effects
    }
188
    /// Mutable access to the effects handle owned by the router.
    pub fn effects_mut(&mut self) -> &mut Effects {
        &mut self.effects
    }
192
    /// Node id this router was constructed with.
    #[must_use]
    pub fn local_node_id(&self) -> jacquard_core::NodeId {
        self.local_node_id
    }
197
198    #[must_use]
199    pub fn registered_engine_ids(&self) -> Vec<RoutingEngineId> {
200        self.registered_engines.keys().cloned().collect()
201    }
202
203    #[must_use]
204    pub fn registered_engine_capabilities(
205        &self,
206        engine_id: &RoutingEngineId,
207    ) -> Option<RoutingEngineCapabilities> {
208        self.registered_engines
209            .get(engine_id)
210            .map(|entry| entry.capabilities.clone())
211    }
212
    /// Looks up a published route by id.
    #[must_use]
    pub fn active_route(&self, route_id: &RouteId) -> Option<&MaterializedRoute> {
        self.active_routes.get(route_id)
    }
217
    /// Number of routes currently published by the router.
    #[must_use]
    pub fn active_route_count(&self) -> usize {
        self.active_routes.len()
    }
222
223    #[must_use]
224    pub fn active_routes_snapshot(&self) -> Vec<MaterializedRoute> {
225        let mut routes = self.active_routes.values().cloned().collect::<Vec<_>>();
226        routes.sort_by_key(|route| route.identity.stamp.route_id);
227        routes
228    }
229
230    #[must_use]
231    pub fn engine_analysis_snapshot(&self, engine_id: &RoutingEngineId) -> Option<Box<dyn Any>> {
232        let entry = self.registered_engines.get(engine_id)?;
233        let mut routes = self
234            .active_routes
235            .values()
236            .filter(|route| route.identity.admission.summary.engine == *engine_id)
237            .cloned()
238            .collect::<Vec<_>>();
239        routes.sort_by_key(|route| route.identity.stamp.route_id);
240        entry.engine.analysis_snapshot_for_router(&routes)
241    }
242
243    fn current_policy_inputs(&self) -> RoutingPolicyInputs {
244        let mut inputs = self.policy_inputs.clone();
245        inputs.routing_engine_count =
246            u32::try_from(self.registered_engines.len()).unwrap_or(u32::MAX);
247        inputs
248    }
249
    /// Builds a short-lived runtime adapter over the router's effects; it
    /// borrows `self` mutably for as long as it is alive.
    fn runtime_adapter(&mut self) -> RouterRuntimeAdapter<'_, Effects> {
        RouterRuntimeAdapter::new(self.local_node_id, &mut self.effects)
    }
253
254    // T8: get/get_mut variants intentionally separate due to borrow-checker
255    // requirements.
256    fn engine_for_id(
257        &self,
258        engine_id: &RoutingEngineId,
259    ) -> Result<&(dyn RouterManagedEngine + '_), RouteError> {
260        if let Some(entry) = self.registered_engines.get(engine_id) {
261            Ok(entry.engine.as_ref())
262        } else {
263            Err(CapabilityError::Unsupported.into())
264        }
265    }
266
267    fn engine_for_id_mut(
268        &mut self,
269        engine_id: &RoutingEngineId,
270    ) -> Result<&mut (dyn RouterManagedEngine + '_), RouteError> {
271        if let Some(entry) = self.registered_engines.get_mut(engine_id) {
272            Ok(entry.engine.as_mut())
273        } else {
274            Err(CapabilityError::Unsupported.into())
275        }
276    }
277
278    fn route_engine_id(&self, route_id: &RouteId) -> Result<RoutingEngineId, RouteError> {
279        self.active_routes
280            .get(route_id)
281            .map(|route| route.identity.admission.summary.engine.clone())
282            .ok_or(RouteSelectionError::NoCandidate.into())
283    }
284
285    fn route_commitments_for(
286        &self,
287        route: &MaterializedRoute,
288    ) -> Result<Vec<RouteCommitment>, RouteError> {
289        let engine_id = route.identity.admission.summary.engine.clone();
290        Ok(self.engine_for_id(&engine_id)?.route_commitments(route))
291    }
292
293    fn advance_all_engines(&mut self) -> Result<(RoutingTickChange, RoutingTickHint), RouteError> {
294        let tick = RoutingTickContext::new(self.topology.clone());
295        let mut aggregate = RoutingTickChange::NoChange;
296        let mut hint = RoutingTickHint::HostDefault;
297        for entry in self.registered_engines.values_mut() {
298            let outcome = entry.engine.engine_tick(&tick)?;
299            if outcome.change == RoutingTickChange::PrivateStateUpdated {
300                aggregate = RoutingTickChange::PrivateStateUpdated;
301            }
302            hint = hint.more_urgent(outcome.next_tick_hint);
303        }
304        Ok((aggregate, hint))
305    }
306
    /// Drops a route from storage and from the router's in-memory maps.
    ///
    /// The checkpoint is removed first; if that fails the error is returned
    /// and the in-memory state is left untouched.
    fn remove_published_route(&mut self, route_id: &RouteId) -> Result<(), RouteError> {
        self.runtime_adapter().remove_route(route_id)?;
        self.active_routes.remove(route_id);
        self.published_commitments.remove(route_id);
        Ok(())
    }
313
314    fn publish_route_state(
315        &mut self,
316        route: MaterializedRoute,
317        commitments: Vec<RouteCommitment>,
318    ) -> Result<(), RouteError> {
319        let route_id = route.identity.stamp.route_id;
320        self.runtime_adapter()
321            .persist_route(&RouterCheckpointRecord {
322                route: route.clone(),
323                commitments: commitments.clone(),
324            })?;
325        self.active_routes.insert(route_id, route);
326        self.published_commitments.insert(route_id, commitments);
327        Ok(())
328    }
329
    // long-block-exception: activation is one fail-closed canonical route
    // publication path from candidate selection through checkpointing.
    /// Ticks every registered engine once, then runs the activation path with
    /// the given profile. The tick outcome itself is discarded; only a tick
    /// error aborts the activation.
    fn activate_with_profile(
        &mut self,
        objective: &RoutingObjective,
        profile: &SelectedRoutingParameters,
    ) -> Result<MaterializedRoute, RouteError> {
        self.advance_all_engines()?;
        self.activate_with_profile_without_tick(objective, profile)
    }
340
    // long-block-exception: fail-closed activation keeps candidate selection,
    // admission, materialization, and checkpoint publication in one path.
    /// Walks the ordered candidates and publishes the first one that is
    /// admitted, materialized, checkpointed, and logged.
    ///
    /// # Errors
    ///
    /// - Any engine error other than `Inadmissible` / `NoCandidate` during
    ///   admission aborts immediately, as do materialization errors.
    /// - A checkpoint or event-log failure tears the route down in the engine
    ///   and returns that failure.
    /// - When no candidate is published: `Inadmissible` with the first
    ///   rejection reason seen, or `NoCandidate` if none was recorded.
    fn activate_with_profile_without_tick(
        &mut self,
        objective: &RoutingObjective,
        profile: &SelectedRoutingParameters,
    ) -> Result<MaterializedRoute, RouteError> {
        // Only the first rejection reason is kept for the final error.
        let mut first_inadmissible = None;
        for candidate in self.ordered_candidates(objective, profile) {
            let route_id = candidate.route_id;
            let engine_id = candidate.backend_ref.engine.clone();
            let admission = match self.engine_for_id(&engine_id)?.admit_route(
                objective,
                profile,
                candidate,
                &self.topology,
            ) {
                Ok(admission) => admission,
                // Soft failures: remember the reason and try the next candidate.
                Err(RouteError::Selection(RouteSelectionError::Inadmissible(reason))) => {
                    first_inadmissible.get_or_insert(reason);
                    continue;
                }
                Err(RouteError::Selection(RouteSelectionError::NoCandidate)) => continue,
                Err(error) => return Err(error),
            };
            // An `Ok` admission can still carry a non-admissible decision.
            if admission.admission_check.decision != AdmissionDecision::Admissible {
                if let AdmissionDecision::Rejected(reason) = admission.admission_check.decision {
                    first_inadmissible.get_or_insert(reason);
                }
                continue;
            }

            let input = self.materialization_input(route_id, &admission)?;
            // From here on use the id carried by the handle that was built.
            let route_id = *input.handle.route_id();
            let installation = self
                .engine_for_id_mut(&engine_id)?
                .materialize_route(input.clone())?;
            let route = MaterializedRoute::from_installation(input, installation);
            let commitments = self.route_commitments_for(&route)?;
            let record = RouterCheckpointRecord {
                route: route.clone(),
                commitments: commitments.clone(),
            };
            // Checkpoint failed: undo the engine-side installation.
            if let Err(error) = self.runtime_adapter().persist_route(&record) {
                self.engine_for_id_mut(&engine_id)?.teardown(&route_id);
                return Err(error);
            }
            if let Err(error) = self.runtime_adapter().record_route_event(
                jacquard_core::RouteEvent::RouteMaterialized {
                    handle: jacquard_core::RouteHandle {
                        stamp: route.identity.stamp.clone(),
                    },
                    proof: route.identity.proof.clone(),
                },
            ) {
                // Event log failed: best-effort checkpoint removal (its own
                // failure is deliberately ignored), then engine teardown.
                let _rollback_failed = self.runtime_adapter().remove_route(&route_id).is_err();
                self.engine_for_id_mut(&engine_id)?.teardown(&route_id);
                return Err(error);
            }
            // In-memory publication happens last, after storage and the log
            // both succeeded.
            self.active_routes.insert(route_id, route.clone());
            self.published_commitments.insert(route_id, commitments);
            return Ok(route);
        }
        if let Some(reason) = first_inadmissible {
            Err(RouteSelectionError::Inadmissible(reason).into())
        } else {
            Err(RouteSelectionError::NoCandidate.into())
        }
    }
410
411    /// Proof-bearing action: selects, admits, materializes, and publishes one
412    /// canonical route using the current policy inputs without advancing time.
413    pub fn activate_route_without_tick(
414        &mut self,
415        objective: &RoutingObjective,
416    ) -> Result<MaterializedRoute, RouteError> {
417        let profile = self
418            .policy_engine
419            .compute_profile(objective, &self.current_policy_inputs());
420        self.activate_with_profile_without_tick(objective, &profile)
421    }
422
423    fn materialization_input(
424        &mut self,
425        route_id: RouteId,
426        admission: &jacquard_core::RouteAdmission,
427    ) -> Result<RouteMaterializationInput, RouteError> {
428        let publication_id = publication_id(self.effects.next_order_stamp());
429        let now = self.effects.now_tick();
430        let lease = RouteLease {
431            owner_node_id: self.local_node_id,
432            lease_epoch: self.topology.value.epoch,
433            valid_for: TimeWindow::new(now, Tick(now.0.saturating_add(DEFAULT_ROUTE_LEASE_TICKS)))
434                .map_err(|_| RouteRuntimeError::Invalidated)?,
435        };
436        Ok(RouteMaterializationInput {
437            handle: RouteHandle {
438                stamp: RouteIdentityStamp {
439                    route_id,
440                    topology_epoch: self.topology.value.epoch,
441                    materialized_at_tick: now,
442                    publication_id,
443                },
444            },
445            admission: admission.clone(),
446            lease,
447        })
448    }
449
450    fn ordered_candidates(
451        &self,
452        objective: &RoutingObjective,
453        profile: &SelectedRoutingParameters,
454    ) -> Vec<RouteCandidate> {
455        let mut candidates = self
456            .registered_engines
457            .values()
458            .flat_map(|entry| {
459                entry
460                    .engine
461                    .candidate_routes(objective, profile, &self.topology)
462            })
463            .collect::<Vec<_>>();
464        candidates.sort_by_key(|candidate| candidate_ordering_key(candidate, profile));
465        candidates
466    }
467
468    fn expire_stale_leases(&mut self) -> Result<Option<RouteId>, RouteError> {
469        let now = self.effects.now_tick();
470        let expired = self
471            .active_routes
472            .iter()
473            .filter_map(|(route_id, route)| {
474                (!route.identity.lease.is_valid_at(now)).then_some(*route_id)
475            })
476            .collect::<Vec<_>>();
477        let first = expired.first().copied();
478        for route_id in expired {
479            let engine_id = self.route_engine_id(&route_id)?;
480            self.engine_for_id_mut(&engine_id)?.teardown(&route_id);
481            self.remove_published_route(&route_id)?;
482        }
483        Ok(first)
484    }
485
486    fn transfer_route_lease_inner(
487        &mut self,
488        route_id: &RouteId,
489        handoff: &RouteSemanticHandoff,
490    ) -> Result<MaterializedRoute, RouteError> {
491        let mut route = self
492            .active_routes
493            .get(route_id)
494            .cloned()
495            .ok_or(RouteSelectionError::NoCandidate)?;
496        if handoff.route_id != route.identity.stamp.route_id {
497            return Err(RouteRuntimeError::Invalidated.into());
498        }
499        route.identity.lease.owner_node_id = handoff.to_node_id;
500        route.identity.lease.lease_epoch = handoff.handoff_epoch;
501        let commitments = self.route_commitments_for(&route)?;
502        self.publish_route_state(route.clone(), commitments)?;
503        Ok(route)
504    }
505
    /// Translates an engine's maintenance result into a canonical router
    /// mutation and returns both together.
    ///
    /// Arms are tried in order, so the position of the guarded and catch-all
    /// arms matters:
    /// 1. `ReplacementRequired` re-selects the route.
    /// 2. `HandedOff` transfers the lease.
    /// 3. `Failed(LeaseExpired)` expires the route.
    /// 4. Any other outcome whose lifecycle event is `Expired` also expires it.
    /// 5. Everything else publishes `next_runtime` and keeps the route.
    fn apply_maintenance_result(
        &mut self,
        route_id: &RouteId,
        next_runtime: jacquard_core::RouteRuntimeState,
        result: RouteMaintenanceResult,
    ) -> Result<RouterMaintenanceOutcome, RouteError> {
        let canonical_mutation = match &result.outcome {
            jacquard_core::RouteMaintenanceOutcome::ReplacementRequired { trigger } => {
                self.handle_replacement_required(route_id, *trigger)?
            }
            jacquard_core::RouteMaintenanceOutcome::HandedOff(handoff) => {
                self.handle_handoff_maintenance(route_id, next_runtime, handoff, &result)?
            }
            jacquard_core::RouteMaintenanceOutcome::Failed(
                jacquard_core::RouteMaintenanceFailure::LeaseExpired,
            ) => self.handle_expired_route(route_id, &result)?,
            _ if result.event == jacquard_core::RouteLifecycleEvent::Expired => {
                self.handle_expired_route(route_id, &result)?
            }
            _ => self.handle_continued_route(route_id, next_runtime, &result)?,
        };

        Ok(RouterMaintenanceOutcome {
            engine_result: result,
            canonical_mutation,
        })
    }
533
    /// Re-selects the route through `Router::reselect_route` and reports the
    /// replacement, carrying the previous route id alongside the new route.
    fn handle_replacement_required(
        &mut self,
        route_id: &RouteId,
        trigger: RouteMaintenanceTrigger,
    ) -> Result<RouterCanonicalMutation, RouteError> {
        let route = <Self as Router>::reselect_route(self, route_id, trigger)?;
        Ok(RouterCanonicalMutation::RouteReplaced {
            previous_route_id: *route_id,
            route: Box::new(route),
        })
    }
545
546    fn handle_handoff_maintenance(
547        &mut self,
548        route_id: &RouteId,
549        next_runtime: jacquard_core::RouteRuntimeState,
550        handoff: &RouteSemanticHandoff,
551        result: &RouteMaintenanceResult,
552    ) -> Result<RouterCanonicalMutation, RouteError> {
553        let rollback_record = self.checkpoint_record_for_active_route(route_id)?;
554        let mut route = self
555            .active_routes
556            .get(route_id)
557            .cloned()
558            .ok_or(RouteSelectionError::NoCandidate)?;
559        route.runtime = next_runtime;
560        route.identity.lease.owner_node_id = handoff.to_node_id;
561        route.identity.lease.lease_epoch = handoff.handoff_epoch;
562        let commitments = self.route_commitments_for(&route)?;
563        self.persist_route_with_event(
564            route_id,
565            route.clone(),
566            commitments,
567            result,
568            Some(rollback_record),
569        )?;
570        Ok(RouterCanonicalMutation::LeaseTransferred {
571            route_id: *route_id,
572            handoff: handoff.clone(),
573            lease: route.identity.lease,
574        })
575    }
576
    /// Retires an expired route: removes its checkpoint, logs the maintenance
    /// event, tears the route down in its engine, and drops it from the
    /// in-memory maps.
    ///
    /// If the event cannot be logged, the previous checkpoint is restored on
    /// a best-effort basis and the error is returned; the engine and the
    /// in-memory state are left untouched in that case.
    fn handle_expired_route(
        &mut self,
        route_id: &RouteId,
        result: &RouteMaintenanceResult,
    ) -> Result<RouterCanonicalMutation, RouteError> {
        let engine_id = self.route_engine_id(route_id)?;
        // Captured before removal so it can be written back on failure.
        let rollback_record = self.checkpoint_record_for_active_route(route_id)?;
        self.runtime_adapter().remove_route(route_id)?;
        if let Err(error) = self.runtime_adapter().record_route_event(
            jacquard_core::RouteEvent::RouteMaintenanceCompleted {
                route_id: *route_id,
                result: result.clone(),
            },
        ) {
            // A failed restore is deliberately ignored; the logging error is
            // the one reported.
            let _rollback_failed = self
                .runtime_adapter()
                .restore_route_record(&rollback_record)
                .is_err();
            return Err(error);
        }
        self.engine_for_id_mut(&engine_id)?.teardown(route_id);
        self.active_routes.remove(route_id);
        self.published_commitments.remove(route_id);
        Ok(RouterCanonicalMutation::RouteExpired {
            route_id: *route_id,
        })
    }
604
605    fn handle_continued_route(
606        &mut self,
607        route_id: &RouteId,
608        next_runtime: jacquard_core::RouteRuntimeState,
609        result: &RouteMaintenanceResult,
610    ) -> Result<RouterCanonicalMutation, RouteError> {
611        let rollback_record = self.checkpoint_record_for_active_route(route_id)?;
612        let mut route = self
613            .active_routes
614            .get(route_id)
615            .cloned()
616            .ok_or(RouteSelectionError::NoCandidate)?;
617        route.runtime = next_runtime;
618        let commitments = self.route_commitments_for(&route)?;
619        self.persist_route_with_event(route_id, route, commitments, result, Some(rollback_record))?;
620        Ok(RouterCanonicalMutation::None)
621    }
622
623    fn checkpoint_record_for_active_route(
624        &self,
625        route_id: &RouteId,
626    ) -> Result<RouterCheckpointRecord, RouteError> {
627        let route = self
628            .active_routes
629            .get(route_id)
630            .cloned()
631            .ok_or(RouteSelectionError::NoCandidate)?;
632        let commitments = self
633            .published_commitments
634            .get(route_id)
635            .cloned()
636            .ok_or(RouteSelectionError::NoCandidate)?;
637        Ok(RouterCheckpointRecord { route, commitments })
638    }
639
    /// Checkpoints the updated route, logs the maintenance event, and only
    /// then publishes the update in memory.
    ///
    /// If logging fails, the checkpoint is rolled back on a best-effort
    /// basis: `rollback_record` is restored when given, otherwise the
    /// checkpoint is removed. The logging error is returned either way and
    /// the in-memory maps are not modified.
    fn persist_route_with_event(
        &mut self,
        route_id: &RouteId,
        route: MaterializedRoute,
        commitments: Vec<RouteCommitment>,
        result: &RouteMaintenanceResult,
        rollback_record: Option<RouterCheckpointRecord>,
    ) -> Result<(), RouteError> {
        self.runtime_adapter()
            .persist_route(&RouterCheckpointRecord {
                route: route.clone(),
                commitments: commitments.clone(),
            })?;
        if let Err(error) = self.runtime_adapter().record_route_event(
            jacquard_core::RouteEvent::RouteMaintenanceCompleted {
                route_id: *route_id,
                result: result.clone(),
            },
        ) {
            // Rollback failures are deliberately ignored; the logging error
            // is the one reported.
            if let Some(record) = rollback_record {
                let _rollback_failed = self
                    .runtime_adapter()
                    .restore_route_record(&record)
                    .is_err();
            } else {
                let _rollback_failed = self.runtime_adapter().remove_route(route_id).is_err();
            }
            return Err(error);
        }
        self.active_routes.insert(*route_id, route);
        self.published_commitments.insert(*route_id, commitments);
        Ok(())
    }
673}
674
/// Trait-facing registry API. Every method forwards to the inherent method of
/// the same name on `MultiEngineRouter`.
impl<Policy, Effects> RouterEngineRegistry for MultiEngineRouter<Policy, Effects>
where
    Policy: PolicyEngine,
    Effects: TimeEffects + OrderEffects + StorageEffects + RouteEventLogEffects,
{
    /// Forwards to the inherent `register_engine`.
    fn register_engine(
        &mut self,
        extension: Box<dyn RouterManagedEngine>,
    ) -> Result<(), RouteError> {
        Self::register_engine(self, extension)
    }

    /// Forwards to the inherent `registered_engine_ids`.
    fn registered_engine_ids(&self) -> Vec<RoutingEngineId> {
        Self::registered_engine_ids(self)
    }

    /// Forwards to the inherent `registered_engine_capabilities`.
    fn registered_engine_capabilities(
        &self,
        engine_id: &RoutingEngineId,
    ) -> Option<RoutingEngineCapabilities> {
        Self::registered_engine_capabilities(self, engine_id)
    }
}
698
/// Trait-facing ingestion and recovery API. Every method forwards to the
/// inherent method of the same name on `MultiEngineRouter`.
impl<Policy, Effects> RoutingMiddleware for MultiEngineRouter<Policy, Effects>
where
    Policy: PolicyEngine,
    Effects: TimeEffects + OrderEffects + StorageEffects + RouteEventLogEffects,
{
    /// Forwards to the inherent `ingest_topology_observation`.
    fn ingest_topology_observation(&mut self, topology: Observation<Configuration>) {
        Self::ingest_topology_observation(self, topology);
    }

    /// Forwards to the inherent `ingest_policy_inputs`.
    fn ingest_policy_inputs(&mut self, inputs: RoutingPolicyInputs) {
        Self::ingest_policy_inputs(self, inputs);
    }

    /// Forwards to the inherent `ingest_transport_observation`.
    fn ingest_transport_observation(
        &mut self,
        observation: &TransportObservation,
    ) -> Result<(), RouteError> {
        Self::ingest_transport_observation(self, observation)
    }

    /// Forwards to the inherent `recover_checkpointed_routes`.
    fn recover_checkpointed_routes(&mut self) -> Result<usize, RouteError> {
        Self::recover_checkpointed_routes(self)
    }
}
723
724impl<Policy, Effects> Router for MultiEngineRouter<Policy, Effects>
725where
726    Policy: PolicyEngine,
727    Effects: TimeEffects + OrderEffects + StorageEffects + RouteEventLogEffects,
728{
729    fn activate_route(
730        &mut self,
731        objective: RoutingObjective,
732    ) -> Result<MaterializedRoute, RouteError> {
733        let profile = self
734            .policy_engine
735            .compute_profile(&objective, &self.current_policy_inputs());
736        self.activate_with_profile(&objective, &profile)
737    }
738
739    fn route_commitments(&self, route_id: &RouteId) -> Result<Vec<RouteCommitment>, RouteError> {
740        let commitments = self
741            .published_commitments
742            .get(route_id)
743            .ok_or(RouteSelectionError::NoCandidate)?;
744        Ok(commitments.clone())
745    }
746
747    fn reselect_route(
748        &mut self,
749        route_id: &RouteId,
750        _trigger: RouteMaintenanceTrigger,
751    ) -> Result<MaterializedRoute, RouteError> {
752        let objective = self
753            .active_routes
754            .get(route_id)
755            .ok_or(RouteSelectionError::NoCandidate)?
756            .identity
757            .admission
758            .objective
759            .clone();
760        let engine_id = self.route_engine_id(route_id)?;
761        self.engine_for_id_mut(&engine_id)?.teardown(route_id);
762        self.remove_published_route(route_id)?;
763        let profile = self
764            .policy_engine
765            .compute_profile(&objective, &self.current_policy_inputs());
766        self.activate_with_profile(&objective, &profile)
767    }
768
769    fn transfer_route_lease(
770        &mut self,
771        route_id: &RouteId,
772        handoff: RouteSemanticHandoff,
773    ) -> Result<MaterializedRoute, RouteError> {
774        self.transfer_route_lease_inner(route_id, &handoff)
775    }
776}
777
778impl<Policy, Effects> RoutingControlPlane for MultiEngineRouter<Policy, Effects>
779where
780    Policy: PolicyEngine,
781    Effects: TimeEffects + OrderEffects + StorageEffects + RouteEventLogEffects,
782{
    /// Forwards to `Router::activate_route`.
    fn activate_route(
        &mut self,
        objective: RoutingObjective,
    ) -> Result<MaterializedRoute, RouteError> {
        <Self as Router>::activate_route(self, objective)
    }
789
790    fn maintain_route(
791        &mut self,
792        route_id: &RouteId,
793        trigger: RouteMaintenanceTrigger,
794    ) -> Result<RouterMaintenanceOutcome, RouteError> {
795        let (identity, mut next_runtime, engine_id) = {
796            let route = self
797                .active_routes
798                .get(route_id)
799                .ok_or(RouteSelectionError::NoCandidate)?;
800            (
801                route.identity.clone(),
802                route.runtime.clone(),
803                route.identity.admission.summary.engine.clone(),
804            )
805        };
806        let result = self.engine_for_id_mut(&engine_id)?.maintain_route(
807            &identity,
808            &mut next_runtime,
809            trigger,
810        )?;
811        self.apply_maintenance_result(route_id, next_runtime, result)
812    }
813
814    fn advance_round(&mut self) -> Result<RouterRoundOutcome, RouteError> {
815        let (aggregate, tick_hint) = self.advance_all_engines()?;
816        let expired_route_id = if aggregate == RoutingTickChange::PrivateStateUpdated {
817            self.expire_stale_leases()?
818        } else {
819            None
820        };
821        let canonical_mutation = expired_route_id
822            .map(|route_id| RouterCanonicalMutation::RouteExpired { route_id })
823            .unwrap_or(RouterCanonicalMutation::None);
824        Ok(RouterRoundOutcome {
825            topology_epoch: self.topology.value.epoch,
826            engine_change: aggregate,
827            next_round_hint: tick_hint,
828            canonical_mutation,
829        })
830    }
831}
832
833impl<Policy, Effects> RoutingDataPlane for MultiEngineRouter<Policy, Effects>
834where
835    Policy: PolicyEngine,
836    Effects: TimeEffects + OrderEffects + StorageEffects + RouteEventLogEffects,
837{
838    fn forward_payload(&mut self, route_id: &RouteId, payload: &[u8]) -> Result<(), RouteError> {
839        let engine_id = self.route_engine_id(route_id)?;
840        self.engine_for_id_mut(&engine_id)?
841            .forward_payload_for_router(route_id, payload)
842    }
843
844    fn observe_route_health(
845        &self,
846        route_id: &RouteId,
847    ) -> Result<Observation<RouteHealth>, RouteError> {
848        let route = self
849            .active_routes
850            .get(route_id)
851            .ok_or(RouteSelectionError::NoCandidate)?;
852        Ok(Observation {
853            value: route.runtime.health.clone(),
854            source_class: FactSourceClass::Local,
855            evidence_class: RoutingEvidenceClass::AdmissionWitnessed,
856            origin_authentication: OriginAuthenticationClass::Controlled,
857            observed_at_tick: self.effects.now_tick(),
858        })
859    }
860}
861
862fn publication_id(order: OrderStamp) -> PublicationId {
863    let mut bytes = [0_u8; 16];
864    bytes[..8].copy_from_slice(&order.0.to_le_bytes());
865    PublicationId(bytes)
866}
867
/// Sort key for route candidates; smaller keys sort first. `Reverse` makes
/// `true` and higher class values win for the fields it wraps.
type CandidateOrderingKey = (
    // Protection equals the profile's selected protection.
    Reverse<bool>,
    // Repair class equals the profile's selected repair class.
    Reverse<bool>,
    // Partition class equals the profile's selected partition class.
    Reverse<bool>,
    // Candidate's protection class.
    Reverse<RouteProtectionClass>,
    // Candidate's repair class.
    Reverse<RouteRepairClass>,
    // Candidate's partition class.
    Reverse<RoutePartitionClass>,
    // Estimated degradation, ascending.
    RouteDegradation,
    // Hop-count hint, ascending.
    Belief<u8>,
    // Protocol mix, used as a tie-breaker.
    Vec<TransportKind>,
    // Owning engine id, used as a tie-breaker.
    RoutingEngineId,
    // Backend route id bytes, used as the final tie-breaker.
    Vec<u8>,
);
881
882fn candidate_ordering_key(
883    candidate: &RouteCandidate,
884    profile: &SelectedRoutingParameters,
885) -> CandidateOrderingKey {
886    (
887        Reverse(candidate.summary.protection == profile.selected_protection),
888        Reverse(candidate.summary.connectivity.repair == profile.selected_connectivity.repair),
889        Reverse(
890            candidate.summary.connectivity.partition == profile.selected_connectivity.partition,
891        ),
892        Reverse(candidate.summary.protection),
893        Reverse(candidate.summary.connectivity.repair),
894        Reverse(candidate.summary.connectivity.partition),
895        candidate.estimate.value.degradation,
896        candidate.summary.hop_count_hint,
897        candidate.summary.protocol_mix.clone(),
898        candidate.backend_ref.engine.clone(),
899        candidate.backend_ref.backend_route_id.0.clone(),
900    )
901}