1use alloc::{boxed::Box, collections::BTreeMap, vec::Vec};
19use core::{any::Any, cmp::Reverse};
20
21#[cfg(feature = "std")]
22use std::sync::Arc;
23
24use jacquard_core::{
25 AdmissionDecision, Belief, CapabilityError, Configuration, FactSourceClass, MaterializedRoute,
26 Observation, OrderStamp, OriginAuthenticationClass, PublicationId, RouteCandidate,
27 RouteCommitment, RouteDegradation, RouteError, RouteHandle, RouteHealth, RouteId,
28 RouteIdentityStamp, RouteLease, RouteMaintenanceResult, RouteMaintenanceTrigger,
29 RouteMaterializationInput, RoutePartitionClass, RouteProtectionClass, RouteRepairClass,
30 RouteRuntimeError, RouteSelectionError, RouteSemanticHandoff, RouterCanonicalMutation,
31 RouterMaintenanceOutcome, RouterRoundOutcome, RoutingEngineCapabilities, RoutingEngineId,
32 RoutingEvidenceClass, RoutingObjective, RoutingPolicyInputs, RoutingTickChange,
33 RoutingTickContext, RoutingTickHint, SelectedRoutingParameters, Tick, TimeWindow,
34 TransportKind, TransportObservation,
35};
36use jacquard_traits::{
37 OrderEffects, PolicyEngine, RouteEventLogEffects, Router, RouterEngineRegistry,
38 RouterManagedEngine, RoutingControlPlane, RoutingDataPlane, RoutingMiddleware, StorageEffects,
39 TimeEffects,
40};
41
42use crate::runtime::{RouterCheckpointRecord, RouterRuntimeAdapter};
43
/// Length, in ticks, of the validity window given to a route lease when the
/// router builds a materialization input (added to the current tick with
/// saturating arithmetic).
const DEFAULT_ROUTE_LEASE_TICKS: u64 = 32;
45
46#[derive(Clone, Debug)]
47pub struct FixedPolicyEngine {
48 profile: SelectedRoutingParameters,
49}
50
51impl FixedPolicyEngine {
52 #[must_use]
53 pub fn new(profile: SelectedRoutingParameters) -> Self {
54 Self { profile }
55 }
56}
57
58impl PolicyEngine for FixedPolicyEngine {
59 fn compute_profile(
60 &self,
61 _objective: &RoutingObjective,
62 _inputs: &RoutingPolicyInputs,
63 ) -> SelectedRoutingParameters {
64 self.profile.clone()
65 }
66}
67
68struct RegisteredEngine {
69 capabilities: RoutingEngineCapabilities,
70 engine: Box<dyn RouterManagedEngine>,
71}
72
73pub struct MultiEngineRouter<Policy, Effects> {
74 local_node_id: jacquard_core::NodeId,
75 registered_engines: BTreeMap<RoutingEngineId, RegisteredEngine>,
76 policy_engine: Policy,
77 effects: Effects,
78 topology: Observation<Configuration>,
79 policy_inputs: RoutingPolicyInputs,
80 active_routes: BTreeMap<RouteId, MaterializedRoute>,
81 published_commitments: BTreeMap<RouteId, Vec<RouteCommitment>>,
82}
83
84impl<Policy, Effects> MultiEngineRouter<Policy, Effects>
85where
86 Policy: PolicyEngine,
87 Effects: TimeEffects + OrderEffects + StorageEffects + RouteEventLogEffects,
88{
89 #[must_use]
90 pub fn new(
91 local_node_id: jacquard_core::NodeId,
92 policy_engine: Policy,
93 effects: Effects,
94 topology: Observation<Configuration>,
95 policy_inputs: RoutingPolicyInputs,
96 ) -> Self {
97 Self {
98 local_node_id,
99 registered_engines: BTreeMap::new(),
100 policy_engine,
101 effects,
102 topology,
103 policy_inputs,
104 active_routes: BTreeMap::new(),
105 published_commitments: BTreeMap::new(),
106 }
107 }
108
109 pub fn register_engine(
110 &mut self,
111 extension: Box<dyn RouterManagedEngine>,
112 ) -> Result<(), RouteError> {
113 let engine_id = extension.engine_id();
114 if extension.local_node_id_for_router() != self.local_node_id {
115 return Err(CapabilityError::Rejected.into());
116 }
117 if self.registered_engines.contains_key(&engine_id) {
118 return Err(CapabilityError::Rejected.into());
119 }
120 let capabilities = extension.capabilities();
121 self.registered_engines.insert(
122 engine_id,
123 RegisteredEngine {
124 capabilities,
125 engine: extension,
126 },
127 );
128 Ok(())
129 }
130
/// Replaces the stored topology with a clone of the shared observation.
#[cfg(feature = "std")]
pub fn ingest_shared_topology_observation(
    &mut self,
    topology: &Arc<Observation<Configuration>>,
) {
    let shared: &Observation<Configuration> = topology;
    self.topology = shared.clone();
}
138
139 pub fn ingest_topology_observation(&mut self, topology: Observation<Configuration>) {
140 self.topology = topology;
141 }
142
143 pub fn ingest_policy_inputs(&mut self, inputs: RoutingPolicyInputs) {
144 self.policy_inputs = inputs;
145 }
146
147 pub fn ingest_transport_observation(
148 &mut self,
149 observation: &TransportObservation,
150 ) -> Result<(), RouteError> {
151 for entry in self.registered_engines.values_mut() {
152 entry
153 .engine
154 .ingest_transport_observation_for_router(observation)?;
155 }
156 Ok(())
157 }
158
159 pub fn recover_checkpointed_routes(&mut self) -> Result<usize, RouteError> {
160 let records =
161 RouterRuntimeAdapter::new(self.local_node_id, &mut self.effects).load_routes()?;
162 let mut recovered = 0usize;
163 for (route_id, record) in records {
164 let engine_id = record.route.identity.admission.summary.engine.clone();
165 let restored = match self.registered_engines.get_mut(&engine_id) {
166 Some(entry) => entry
167 .engine
168 .restore_route_runtime_with_record_for_router(&record.route, &self.topology)?,
169 None => false,
170 };
171 if !restored {
172 RouterRuntimeAdapter::new(self.local_node_id, &mut self.effects)
173 .remove_route(&route_id)?;
174 continue;
175 }
176 self.active_routes.insert(route_id, record.route);
177 self.published_commitments
178 .insert(route_id, record.commitments);
179 recovered = recovered.saturating_add(1);
180 }
181 Ok(recovered)
182 }
183
184 #[must_use]
185 pub fn effects(&self) -> &Effects {
186 &self.effects
187 }
188
189 pub fn effects_mut(&mut self) -> &mut Effects {
190 &mut self.effects
191 }
192
193 #[must_use]
194 pub fn local_node_id(&self) -> jacquard_core::NodeId {
195 self.local_node_id
196 }
197
198 #[must_use]
199 pub fn registered_engine_ids(&self) -> Vec<RoutingEngineId> {
200 self.registered_engines.keys().cloned().collect()
201 }
202
203 #[must_use]
204 pub fn registered_engine_capabilities(
205 &self,
206 engine_id: &RoutingEngineId,
207 ) -> Option<RoutingEngineCapabilities> {
208 self.registered_engines
209 .get(engine_id)
210 .map(|entry| entry.capabilities.clone())
211 }
212
213 #[must_use]
214 pub fn active_route(&self, route_id: &RouteId) -> Option<&MaterializedRoute> {
215 self.active_routes.get(route_id)
216 }
217
218 #[must_use]
219 pub fn active_route_count(&self) -> usize {
220 self.active_routes.len()
221 }
222
223 #[must_use]
224 pub fn active_routes_snapshot(&self) -> Vec<MaterializedRoute> {
225 let mut routes = self.active_routes.values().cloned().collect::<Vec<_>>();
226 routes.sort_by_key(|route| route.identity.stamp.route_id);
227 routes
228 }
229
230 #[must_use]
231 pub fn engine_analysis_snapshot(&self, engine_id: &RoutingEngineId) -> Option<Box<dyn Any>> {
232 let entry = self.registered_engines.get(engine_id)?;
233 let mut routes = self
234 .active_routes
235 .values()
236 .filter(|route| route.identity.admission.summary.engine == *engine_id)
237 .cloned()
238 .collect::<Vec<_>>();
239 routes.sort_by_key(|route| route.identity.stamp.route_id);
240 entry.engine.analysis_snapshot_for_router(&routes)
241 }
242
243 fn current_policy_inputs(&self) -> RoutingPolicyInputs {
244 let mut inputs = self.policy_inputs.clone();
245 inputs.routing_engine_count =
246 u32::try_from(self.registered_engines.len()).unwrap_or(u32::MAX);
247 inputs
248 }
249
250 fn runtime_adapter(&mut self) -> RouterRuntimeAdapter<'_, Effects> {
251 RouterRuntimeAdapter::new(self.local_node_id, &mut self.effects)
252 }
253
254 fn engine_for_id(
257 &self,
258 engine_id: &RoutingEngineId,
259 ) -> Result<&(dyn RouterManagedEngine + '_), RouteError> {
260 if let Some(entry) = self.registered_engines.get(engine_id) {
261 Ok(entry.engine.as_ref())
262 } else {
263 Err(CapabilityError::Unsupported.into())
264 }
265 }
266
267 fn engine_for_id_mut(
268 &mut self,
269 engine_id: &RoutingEngineId,
270 ) -> Result<&mut (dyn RouterManagedEngine + '_), RouteError> {
271 if let Some(entry) = self.registered_engines.get_mut(engine_id) {
272 Ok(entry.engine.as_mut())
273 } else {
274 Err(CapabilityError::Unsupported.into())
275 }
276 }
277
278 fn route_engine_id(&self, route_id: &RouteId) -> Result<RoutingEngineId, RouteError> {
279 self.active_routes
280 .get(route_id)
281 .map(|route| route.identity.admission.summary.engine.clone())
282 .ok_or(RouteSelectionError::NoCandidate.into())
283 }
284
285 fn route_commitments_for(
286 &self,
287 route: &MaterializedRoute,
288 ) -> Result<Vec<RouteCommitment>, RouteError> {
289 let engine_id = route.identity.admission.summary.engine.clone();
290 Ok(self.engine_for_id(&engine_id)?.route_commitments(route))
291 }
292
293 fn advance_all_engines(&mut self) -> Result<(RoutingTickChange, RoutingTickHint), RouteError> {
294 let tick = RoutingTickContext::new(self.topology.clone());
295 let mut aggregate = RoutingTickChange::NoChange;
296 let mut hint = RoutingTickHint::HostDefault;
297 for entry in self.registered_engines.values_mut() {
298 let outcome = entry.engine.engine_tick(&tick)?;
299 if outcome.change == RoutingTickChange::PrivateStateUpdated {
300 aggregate = RoutingTickChange::PrivateStateUpdated;
301 }
302 hint = hint.more_urgent(outcome.next_tick_hint);
303 }
304 Ok((aggregate, hint))
305 }
306
307 fn remove_published_route(&mut self, route_id: &RouteId) -> Result<(), RouteError> {
308 self.runtime_adapter().remove_route(route_id)?;
309 self.active_routes.remove(route_id);
310 self.published_commitments.remove(route_id);
311 Ok(())
312 }
313
314 fn publish_route_state(
315 &mut self,
316 route: MaterializedRoute,
317 commitments: Vec<RouteCommitment>,
318 ) -> Result<(), RouteError> {
319 let route_id = route.identity.stamp.route_id;
320 self.runtime_adapter()
321 .persist_route(&RouterCheckpointRecord {
322 route: route.clone(),
323 commitments: commitments.clone(),
324 })?;
325 self.active_routes.insert(route_id, route);
326 self.published_commitments.insert(route_id, commitments);
327 Ok(())
328 }
329
330 fn activate_with_profile(
333 &mut self,
334 objective: &RoutingObjective,
335 profile: &SelectedRoutingParameters,
336 ) -> Result<MaterializedRoute, RouteError> {
337 self.advance_all_engines()?;
338 self.activate_with_profile_without_tick(objective, profile)
339 }
340
341 fn activate_with_profile_without_tick(
344 &mut self,
345 objective: &RoutingObjective,
346 profile: &SelectedRoutingParameters,
347 ) -> Result<MaterializedRoute, RouteError> {
348 let mut first_inadmissible = None;
349 for candidate in self.ordered_candidates(objective, profile) {
350 let route_id = candidate.route_id;
351 let engine_id = candidate.backend_ref.engine.clone();
352 let admission = match self.engine_for_id(&engine_id)?.admit_route(
353 objective,
354 profile,
355 candidate,
356 &self.topology,
357 ) {
358 Ok(admission) => admission,
359 Err(RouteError::Selection(RouteSelectionError::Inadmissible(reason))) => {
360 first_inadmissible.get_or_insert(reason);
361 continue;
362 }
363 Err(RouteError::Selection(RouteSelectionError::NoCandidate)) => continue,
364 Err(error) => return Err(error),
365 };
366 if admission.admission_check.decision != AdmissionDecision::Admissible {
367 if let AdmissionDecision::Rejected(reason) = admission.admission_check.decision {
368 first_inadmissible.get_or_insert(reason);
369 }
370 continue;
371 }
372
373 let input = self.materialization_input(route_id, &admission)?;
374 let route_id = *input.handle.route_id();
375 let installation = self
376 .engine_for_id_mut(&engine_id)?
377 .materialize_route(input.clone())?;
378 let route = MaterializedRoute::from_installation(input, installation);
379 let commitments = self.route_commitments_for(&route)?;
380 let record = RouterCheckpointRecord {
381 route: route.clone(),
382 commitments: commitments.clone(),
383 };
384 if let Err(error) = self.runtime_adapter().persist_route(&record) {
385 self.engine_for_id_mut(&engine_id)?.teardown(&route_id);
386 return Err(error);
387 }
388 if let Err(error) = self.runtime_adapter().record_route_event(
389 jacquard_core::RouteEvent::RouteMaterialized {
390 handle: jacquard_core::RouteHandle {
391 stamp: route.identity.stamp.clone(),
392 },
393 proof: route.identity.proof.clone(),
394 },
395 ) {
396 let _rollback_failed = self.runtime_adapter().remove_route(&route_id).is_err();
397 self.engine_for_id_mut(&engine_id)?.teardown(&route_id);
398 return Err(error);
399 }
400 self.active_routes.insert(route_id, route.clone());
401 self.published_commitments.insert(route_id, commitments);
402 return Ok(route);
403 }
404 if let Some(reason) = first_inadmissible {
405 Err(RouteSelectionError::Inadmissible(reason).into())
406 } else {
407 Err(RouteSelectionError::NoCandidate.into())
408 }
409 }
410
411 pub fn activate_route_without_tick(
414 &mut self,
415 objective: &RoutingObjective,
416 ) -> Result<MaterializedRoute, RouteError> {
417 let profile = self
418 .policy_engine
419 .compute_profile(objective, &self.current_policy_inputs());
420 self.activate_with_profile_without_tick(objective, &profile)
421 }
422
423 fn materialization_input(
424 &mut self,
425 route_id: RouteId,
426 admission: &jacquard_core::RouteAdmission,
427 ) -> Result<RouteMaterializationInput, RouteError> {
428 let publication_id = publication_id(self.effects.next_order_stamp());
429 let now = self.effects.now_tick();
430 let lease = RouteLease {
431 owner_node_id: self.local_node_id,
432 lease_epoch: self.topology.value.epoch,
433 valid_for: TimeWindow::new(now, Tick(now.0.saturating_add(DEFAULT_ROUTE_LEASE_TICKS)))
434 .map_err(|_| RouteRuntimeError::Invalidated)?,
435 };
436 Ok(RouteMaterializationInput {
437 handle: RouteHandle {
438 stamp: RouteIdentityStamp {
439 route_id,
440 topology_epoch: self.topology.value.epoch,
441 materialized_at_tick: now,
442 publication_id,
443 },
444 },
445 admission: admission.clone(),
446 lease,
447 })
448 }
449
450 fn ordered_candidates(
451 &self,
452 objective: &RoutingObjective,
453 profile: &SelectedRoutingParameters,
454 ) -> Vec<RouteCandidate> {
455 let mut candidates = self
456 .registered_engines
457 .values()
458 .flat_map(|entry| {
459 entry
460 .engine
461 .candidate_routes(objective, profile, &self.topology)
462 })
463 .collect::<Vec<_>>();
464 candidates.sort_by_key(|candidate| candidate_ordering_key(candidate, profile));
465 candidates
466 }
467
468 fn expire_stale_leases(&mut self) -> Result<Option<RouteId>, RouteError> {
469 let now = self.effects.now_tick();
470 let expired = self
471 .active_routes
472 .iter()
473 .filter_map(|(route_id, route)| {
474 (!route.identity.lease.is_valid_at(now)).then_some(*route_id)
475 })
476 .collect::<Vec<_>>();
477 let first = expired.first().copied();
478 for route_id in expired {
479 let engine_id = self.route_engine_id(&route_id)?;
480 self.engine_for_id_mut(&engine_id)?.teardown(&route_id);
481 self.remove_published_route(&route_id)?;
482 }
483 Ok(first)
484 }
485
486 fn transfer_route_lease_inner(
487 &mut self,
488 route_id: &RouteId,
489 handoff: &RouteSemanticHandoff,
490 ) -> Result<MaterializedRoute, RouteError> {
491 let mut route = self
492 .active_routes
493 .get(route_id)
494 .cloned()
495 .ok_or(RouteSelectionError::NoCandidate)?;
496 if handoff.route_id != route.identity.stamp.route_id {
497 return Err(RouteRuntimeError::Invalidated.into());
498 }
499 route.identity.lease.owner_node_id = handoff.to_node_id;
500 route.identity.lease.lease_epoch = handoff.handoff_epoch;
501 let commitments = self.route_commitments_for(&route)?;
502 self.publish_route_state(route.clone(), commitments)?;
503 Ok(route)
504 }
505
506 fn apply_maintenance_result(
507 &mut self,
508 route_id: &RouteId,
509 next_runtime: jacquard_core::RouteRuntimeState,
510 result: RouteMaintenanceResult,
511 ) -> Result<RouterMaintenanceOutcome, RouteError> {
512 let canonical_mutation = match &result.outcome {
513 jacquard_core::RouteMaintenanceOutcome::ReplacementRequired { trigger } => {
514 self.handle_replacement_required(route_id, *trigger)?
515 }
516 jacquard_core::RouteMaintenanceOutcome::HandedOff(handoff) => {
517 self.handle_handoff_maintenance(route_id, next_runtime, handoff, &result)?
518 }
519 jacquard_core::RouteMaintenanceOutcome::Failed(
520 jacquard_core::RouteMaintenanceFailure::LeaseExpired,
521 ) => self.handle_expired_route(route_id, &result)?,
522 _ if result.event == jacquard_core::RouteLifecycleEvent::Expired => {
523 self.handle_expired_route(route_id, &result)?
524 }
525 _ => self.handle_continued_route(route_id, next_runtime, &result)?,
526 };
527
528 Ok(RouterMaintenanceOutcome {
529 engine_result: result,
530 canonical_mutation,
531 })
532 }
533
534 fn handle_replacement_required(
535 &mut self,
536 route_id: &RouteId,
537 trigger: RouteMaintenanceTrigger,
538 ) -> Result<RouterCanonicalMutation, RouteError> {
539 let route = <Self as Router>::reselect_route(self, route_id, trigger)?;
540 Ok(RouterCanonicalMutation::RouteReplaced {
541 previous_route_id: *route_id,
542 route: Box::new(route),
543 })
544 }
545
546 fn handle_handoff_maintenance(
547 &mut self,
548 route_id: &RouteId,
549 next_runtime: jacquard_core::RouteRuntimeState,
550 handoff: &RouteSemanticHandoff,
551 result: &RouteMaintenanceResult,
552 ) -> Result<RouterCanonicalMutation, RouteError> {
553 let rollback_record = self.checkpoint_record_for_active_route(route_id)?;
554 let mut route = self
555 .active_routes
556 .get(route_id)
557 .cloned()
558 .ok_or(RouteSelectionError::NoCandidate)?;
559 route.runtime = next_runtime;
560 route.identity.lease.owner_node_id = handoff.to_node_id;
561 route.identity.lease.lease_epoch = handoff.handoff_epoch;
562 let commitments = self.route_commitments_for(&route)?;
563 self.persist_route_with_event(
564 route_id,
565 route.clone(),
566 commitments,
567 result,
568 Some(rollback_record),
569 )?;
570 Ok(RouterCanonicalMutation::LeaseTransferred {
571 route_id: *route_id,
572 handoff: handoff.clone(),
573 lease: route.identity.lease,
574 })
575 }
576
577 fn handle_expired_route(
578 &mut self,
579 route_id: &RouteId,
580 result: &RouteMaintenanceResult,
581 ) -> Result<RouterCanonicalMutation, RouteError> {
582 let engine_id = self.route_engine_id(route_id)?;
583 let rollback_record = self.checkpoint_record_for_active_route(route_id)?;
584 self.runtime_adapter().remove_route(route_id)?;
585 if let Err(error) = self.runtime_adapter().record_route_event(
586 jacquard_core::RouteEvent::RouteMaintenanceCompleted {
587 route_id: *route_id,
588 result: result.clone(),
589 },
590 ) {
591 let _rollback_failed = self
592 .runtime_adapter()
593 .restore_route_record(&rollback_record)
594 .is_err();
595 return Err(error);
596 }
597 self.engine_for_id_mut(&engine_id)?.teardown(route_id);
598 self.active_routes.remove(route_id);
599 self.published_commitments.remove(route_id);
600 Ok(RouterCanonicalMutation::RouteExpired {
601 route_id: *route_id,
602 })
603 }
604
605 fn handle_continued_route(
606 &mut self,
607 route_id: &RouteId,
608 next_runtime: jacquard_core::RouteRuntimeState,
609 result: &RouteMaintenanceResult,
610 ) -> Result<RouterCanonicalMutation, RouteError> {
611 let rollback_record = self.checkpoint_record_for_active_route(route_id)?;
612 let mut route = self
613 .active_routes
614 .get(route_id)
615 .cloned()
616 .ok_or(RouteSelectionError::NoCandidate)?;
617 route.runtime = next_runtime;
618 let commitments = self.route_commitments_for(&route)?;
619 self.persist_route_with_event(route_id, route, commitments, result, Some(rollback_record))?;
620 Ok(RouterCanonicalMutation::None)
621 }
622
623 fn checkpoint_record_for_active_route(
624 &self,
625 route_id: &RouteId,
626 ) -> Result<RouterCheckpointRecord, RouteError> {
627 let route = self
628 .active_routes
629 .get(route_id)
630 .cloned()
631 .ok_or(RouteSelectionError::NoCandidate)?;
632 let commitments = self
633 .published_commitments
634 .get(route_id)
635 .cloned()
636 .ok_or(RouteSelectionError::NoCandidate)?;
637 Ok(RouterCheckpointRecord { route, commitments })
638 }
639
640 fn persist_route_with_event(
641 &mut self,
642 route_id: &RouteId,
643 route: MaterializedRoute,
644 commitments: Vec<RouteCommitment>,
645 result: &RouteMaintenanceResult,
646 rollback_record: Option<RouterCheckpointRecord>,
647 ) -> Result<(), RouteError> {
648 self.runtime_adapter()
649 .persist_route(&RouterCheckpointRecord {
650 route: route.clone(),
651 commitments: commitments.clone(),
652 })?;
653 if let Err(error) = self.runtime_adapter().record_route_event(
654 jacquard_core::RouteEvent::RouteMaintenanceCompleted {
655 route_id: *route_id,
656 result: result.clone(),
657 },
658 ) {
659 if let Some(record) = rollback_record {
660 let _rollback_failed = self
661 .runtime_adapter()
662 .restore_route_record(&record)
663 .is_err();
664 } else {
665 let _rollback_failed = self.runtime_adapter().remove_route(route_id).is_err();
666 }
667 return Err(error);
668 }
669 self.active_routes.insert(*route_id, route);
670 self.published_commitments.insert(*route_id, commitments);
671 Ok(())
672 }
673}
674
675impl<Policy, Effects> RouterEngineRegistry for MultiEngineRouter<Policy, Effects>
676where
677 Policy: PolicyEngine,
678 Effects: TimeEffects + OrderEffects + StorageEffects + RouteEventLogEffects,
679{
680 fn register_engine(
681 &mut self,
682 extension: Box<dyn RouterManagedEngine>,
683 ) -> Result<(), RouteError> {
684 Self::register_engine(self, extension)
685 }
686
687 fn registered_engine_ids(&self) -> Vec<RoutingEngineId> {
688 Self::registered_engine_ids(self)
689 }
690
691 fn registered_engine_capabilities(
692 &self,
693 engine_id: &RoutingEngineId,
694 ) -> Option<RoutingEngineCapabilities> {
695 Self::registered_engine_capabilities(self, engine_id)
696 }
697}
698
699impl<Policy, Effects> RoutingMiddleware for MultiEngineRouter<Policy, Effects>
700where
701 Policy: PolicyEngine,
702 Effects: TimeEffects + OrderEffects + StorageEffects + RouteEventLogEffects,
703{
704 fn ingest_topology_observation(&mut self, topology: Observation<Configuration>) {
705 Self::ingest_topology_observation(self, topology);
706 }
707
708 fn ingest_policy_inputs(&mut self, inputs: RoutingPolicyInputs) {
709 Self::ingest_policy_inputs(self, inputs);
710 }
711
712 fn ingest_transport_observation(
713 &mut self,
714 observation: &TransportObservation,
715 ) -> Result<(), RouteError> {
716 Self::ingest_transport_observation(self, observation)
717 }
718
719 fn recover_checkpointed_routes(&mut self) -> Result<usize, RouteError> {
720 Self::recover_checkpointed_routes(self)
721 }
722}
723
724impl<Policy, Effects> Router for MultiEngineRouter<Policy, Effects>
725where
726 Policy: PolicyEngine,
727 Effects: TimeEffects + OrderEffects + StorageEffects + RouteEventLogEffects,
728{
729 fn activate_route(
730 &mut self,
731 objective: RoutingObjective,
732 ) -> Result<MaterializedRoute, RouteError> {
733 let profile = self
734 .policy_engine
735 .compute_profile(&objective, &self.current_policy_inputs());
736 self.activate_with_profile(&objective, &profile)
737 }
738
739 fn route_commitments(&self, route_id: &RouteId) -> Result<Vec<RouteCommitment>, RouteError> {
740 let commitments = self
741 .published_commitments
742 .get(route_id)
743 .ok_or(RouteSelectionError::NoCandidate)?;
744 Ok(commitments.clone())
745 }
746
747 fn reselect_route(
748 &mut self,
749 route_id: &RouteId,
750 _trigger: RouteMaintenanceTrigger,
751 ) -> Result<MaterializedRoute, RouteError> {
752 let objective = self
753 .active_routes
754 .get(route_id)
755 .ok_or(RouteSelectionError::NoCandidate)?
756 .identity
757 .admission
758 .objective
759 .clone();
760 let engine_id = self.route_engine_id(route_id)?;
761 self.engine_for_id_mut(&engine_id)?.teardown(route_id);
762 self.remove_published_route(route_id)?;
763 let profile = self
764 .policy_engine
765 .compute_profile(&objective, &self.current_policy_inputs());
766 self.activate_with_profile(&objective, &profile)
767 }
768
769 fn transfer_route_lease(
770 &mut self,
771 route_id: &RouteId,
772 handoff: RouteSemanticHandoff,
773 ) -> Result<MaterializedRoute, RouteError> {
774 self.transfer_route_lease_inner(route_id, &handoff)
775 }
776}
777
778impl<Policy, Effects> RoutingControlPlane for MultiEngineRouter<Policy, Effects>
779where
780 Policy: PolicyEngine,
781 Effects: TimeEffects + OrderEffects + StorageEffects + RouteEventLogEffects,
782{
783 fn activate_route(
784 &mut self,
785 objective: RoutingObjective,
786 ) -> Result<MaterializedRoute, RouteError> {
787 <Self as Router>::activate_route(self, objective)
788 }
789
790 fn maintain_route(
791 &mut self,
792 route_id: &RouteId,
793 trigger: RouteMaintenanceTrigger,
794 ) -> Result<RouterMaintenanceOutcome, RouteError> {
795 let (identity, mut next_runtime, engine_id) = {
796 let route = self
797 .active_routes
798 .get(route_id)
799 .ok_or(RouteSelectionError::NoCandidate)?;
800 (
801 route.identity.clone(),
802 route.runtime.clone(),
803 route.identity.admission.summary.engine.clone(),
804 )
805 };
806 let result = self.engine_for_id_mut(&engine_id)?.maintain_route(
807 &identity,
808 &mut next_runtime,
809 trigger,
810 )?;
811 self.apply_maintenance_result(route_id, next_runtime, result)
812 }
813
814 fn advance_round(&mut self) -> Result<RouterRoundOutcome, RouteError> {
815 let (aggregate, tick_hint) = self.advance_all_engines()?;
816 let expired_route_id = if aggregate == RoutingTickChange::PrivateStateUpdated {
817 self.expire_stale_leases()?
818 } else {
819 None
820 };
821 let canonical_mutation = expired_route_id
822 .map(|route_id| RouterCanonicalMutation::RouteExpired { route_id })
823 .unwrap_or(RouterCanonicalMutation::None);
824 Ok(RouterRoundOutcome {
825 topology_epoch: self.topology.value.epoch,
826 engine_change: aggregate,
827 next_round_hint: tick_hint,
828 canonical_mutation,
829 })
830 }
831}
832
833impl<Policy, Effects> RoutingDataPlane for MultiEngineRouter<Policy, Effects>
834where
835 Policy: PolicyEngine,
836 Effects: TimeEffects + OrderEffects + StorageEffects + RouteEventLogEffects,
837{
838 fn forward_payload(&mut self, route_id: &RouteId, payload: &[u8]) -> Result<(), RouteError> {
839 let engine_id = self.route_engine_id(route_id)?;
840 self.engine_for_id_mut(&engine_id)?
841 .forward_payload_for_router(route_id, payload)
842 }
843
844 fn observe_route_health(
845 &self,
846 route_id: &RouteId,
847 ) -> Result<Observation<RouteHealth>, RouteError> {
848 let route = self
849 .active_routes
850 .get(route_id)
851 .ok_or(RouteSelectionError::NoCandidate)?;
852 Ok(Observation {
853 value: route.runtime.health.clone(),
854 source_class: FactSourceClass::Local,
855 evidence_class: RoutingEvidenceClass::AdmissionWitnessed,
856 origin_authentication: OriginAuthenticationClass::Controlled,
857 observed_at_tick: self.effects.now_tick(),
858 })
859 }
860}
861
862fn publication_id(order: OrderStamp) -> PublicationId {
863 let mut bytes = [0_u8; 16];
864 bytes[..8].copy_from_slice(&order.0.to_le_bytes());
865 PublicationId(bytes)
866}
867
868type CandidateOrderingKey = (
869 Reverse<bool>,
870 Reverse<bool>,
871 Reverse<bool>,
872 Reverse<RouteProtectionClass>,
873 Reverse<RouteRepairClass>,
874 Reverse<RoutePartitionClass>,
875 RouteDegradation,
876 Belief<u8>,
877 Vec<TransportKind>,
878 RoutingEngineId,
879 Vec<u8>,
880);
881
882fn candidate_ordering_key(
883 candidate: &RouteCandidate,
884 profile: &SelectedRoutingParameters,
885) -> CandidateOrderingKey {
886 (
887 Reverse(candidate.summary.protection == profile.selected_protection),
888 Reverse(candidate.summary.connectivity.repair == profile.selected_connectivity.repair),
889 Reverse(
890 candidate.summary.connectivity.partition == profile.selected_connectivity.partition,
891 ),
892 Reverse(candidate.summary.protection),
893 Reverse(candidate.summary.connectivity.repair),
894 Reverse(candidate.summary.connectivity.partition),
895 candidate.estimate.value.degradation,
896 candidate.summary.hop_count_hint,
897 candidate.summary.protocol_mix.clone(),
898 candidate.backend_ref.engine.clone(),
899 candidate.backend_ref.backend_route_id.0.clone(),
900 )
901}