1use alloc::{collections::BTreeMap, vec::Vec};
14
15use jacquard_core::{
16 Configuration, DestinationId, Link, MaterializedRoute, Node, NodeId, Observation, RouteEvent,
17 RouteEventStamped, RouteHealth, RouteId, RouteLifecycleEvent, RouteShapeVisibility,
18 RouterCanonicalMutation, RouterRoundOutcome, RoutingEngineCapabilities, RoutingEngineId, Tick,
19 TransportDeliveryMode,
20};
21use serde::{Deserialize, Serialize};
22
23#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
24pub struct ObservedNode {
25 pub controller_id: jacquard_core::ControllerId,
26 pub profile: jacquard_core::NodeProfile,
27 pub state: jacquard_core::NodeState,
28 pub observed_at_tick: Tick,
29}
30
31#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
32pub struct ObservedLink {
33 pub endpoint: jacquard_core::LinkEndpoint,
34 pub profile: jacquard_core::LinkProfile,
35 pub state: jacquard_core::LinkState,
36 pub observed_at_tick: Tick,
37}
38
39#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
40pub enum ObservedRouteShape {
41 ExplicitPath,
42 CorridorEnvelope,
43 NextHopOnly,
44 Opaque,
45}
46
47impl ObservedRouteShape {
48 #[must_use]
49 pub fn from_visibility(visibility: RouteShapeVisibility) -> Self {
50 match visibility {
51 RouteShapeVisibility::ExplicitPath => Self::ExplicitPath,
52 RouteShapeVisibility::CorridorEnvelope => Self::CorridorEnvelope,
53 RouteShapeVisibility::NextHopOnly => Self::NextHopOnly,
54 RouteShapeVisibility::Opaque => Self::Opaque,
55 }
56 }
57}
58
59#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
60pub struct ObservedRoute {
61 pub route_id: RouteId,
62 pub destination: DestinationId,
63 pub engine_id: RoutingEngineId,
64 pub route_shape: ObservedRouteShape,
65 pub delivery_mode: TransportDeliveryMode,
66 pub hop_count_hint: jacquard_core::Belief<u8>,
67 pub topology_epoch: jacquard_core::RouteEpoch,
68 pub publication_id: jacquard_core::PublicationId,
69 pub lease: jacquard_core::RouteLease,
70 pub protection: jacquard_core::RouteProtectionClass,
71 pub connectivity: jacquard_core::ConnectivityPosture,
72 pub protocol_mix: Vec<jacquard_core::TransportKind>,
73 pub lifecycle_event: RouteLifecycleEvent,
74 pub lifecycle_updated_at_tick: Tick,
75 pub health: RouteHealth,
76}
77
78#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
79pub struct TopologySnapshot {
80 pub local_node_id: NodeId,
81 pub observed_at_tick: Tick,
82 pub nodes: BTreeMap<NodeId, ObservedNode>,
83 pub links: BTreeMap<(NodeId, NodeId), ObservedLink>,
84 pub active_routes: BTreeMap<RouteId, ObservedRoute>,
85}
86
87pub struct TopologyProjector {
88 snapshot: TopologySnapshot,
89 engine_capabilities: BTreeMap<RoutingEngineId, RoutingEngineCapabilities>,
90}
91
92impl TopologyProjector {
93 #[must_use]
94 pub fn new(local_node_id: NodeId, initial: Observation<Configuration>) -> Self {
95 let mut projector = Self {
96 snapshot: TopologySnapshot {
97 local_node_id,
98 observed_at_tick: initial.observed_at_tick,
99 nodes: BTreeMap::new(),
100 links: BTreeMap::new(),
101 active_routes: BTreeMap::new(),
102 },
103 engine_capabilities: BTreeMap::new(),
104 };
105 projector.ingest_topology(initial);
106 projector
107 }
108
109 pub fn ingest_topology(&mut self, observation: Observation<Configuration>) {
110 self.snapshot.observed_at_tick = observation.observed_at_tick;
111 self.snapshot.nodes = observation
112 .value
113 .nodes
114 .into_iter()
115 .map(|(node_id, node)| {
116 (
117 node_id,
118 ObservedNode::from_node(node, observation.observed_at_tick),
119 )
120 })
121 .collect();
122 self.snapshot.links = observation
123 .value
124 .links
125 .into_iter()
126 .map(|(endpoints, link)| {
127 (
128 endpoints,
129 ObservedLink::from_link(link, observation.observed_at_tick),
130 )
131 })
132 .collect();
133 }
134
135 pub fn ingest_engine_capabilities(&mut self, capabilities: RoutingEngineCapabilities) {
136 let visibility = capabilities.route_shape_visibility;
137 let engine_id = capabilities.engine.clone();
138 self.engine_capabilities
139 .insert(engine_id.clone(), capabilities);
140 for route in self.snapshot.active_routes.values_mut() {
141 if route.engine_id == engine_id {
142 route.route_shape = ObservedRouteShape::from_visibility(visibility);
143 }
144 }
145 }
146
147 pub fn ingest_materialized_route(&mut self, route: &MaterializedRoute) {
148 self.snapshot.observed_at_tick = self
149 .snapshot
150 .observed_at_tick
151 .max(route.identity.materialized_at_tick());
152 self.snapshot.active_routes.insert(
153 *route.identity.route_id(),
154 self.project_route(route, route.identity.materialized_at_tick()),
155 );
156 }
157
158 pub fn ingest_route_event(&mut self, event: &RouteEventStamped) {
159 self.snapshot.observed_at_tick = self.snapshot.observed_at_tick.max(event.emitted_at_tick);
160 match &event.event {
161 RouteEvent::RouteMaterialized { handle, .. } => {
162 if let Some(route) = self.snapshot.active_routes.get_mut(handle.route_id()) {
163 route.lifecycle_event = RouteLifecycleEvent::Activated;
164 route.lifecycle_updated_at_tick = event.emitted_at_tick;
165 }
166 }
167 RouteEvent::RouteMaintenanceCompleted { route_id, result } => {
168 if let Some(route) = self.snapshot.active_routes.get_mut(route_id) {
169 route.lifecycle_event = result.event;
170 route.lifecycle_updated_at_tick = event.emitted_at_tick;
171 }
172 }
173 RouteEvent::RouteCommitmentUpdated { .. } => {}
174 RouteEvent::RouteHealthObserved { route_id, health } => {
175 if let Some(route) = self.snapshot.active_routes.get_mut(route_id) {
176 route.health = health.value.clone();
177 route.lifecycle_updated_at_tick =
178 route.lifecycle_updated_at_tick.max(health.observed_at_tick);
179 }
180 }
181 }
182 }
183
184 pub fn ingest_round_outcome(&mut self, outcome: &RouterRoundOutcome) {
185 match &outcome.canonical_mutation {
186 RouterCanonicalMutation::None => {}
187 RouterCanonicalMutation::RouteReplaced {
188 previous_route_id,
189 route,
190 } => {
191 self.snapshot.active_routes.remove(previous_route_id);
192 self.ingest_materialized_route(route);
193 if let Some(current) = self
194 .snapshot
195 .active_routes
196 .get_mut(route.identity.route_id())
197 {
198 current.lifecycle_event = RouteLifecycleEvent::Replaced;
199 }
200 }
201 RouterCanonicalMutation::LeaseTransferred {
202 route_id,
203 handoff: _,
204 lease,
205 } => {
206 if let Some(route) = self.snapshot.active_routes.get_mut(route_id) {
207 route.lease = lease.clone();
208 }
209 }
210 RouterCanonicalMutation::RouteExpired { route_id } => {
211 self.snapshot.active_routes.remove(route_id);
212 }
213 }
214 }
215
216 #[must_use]
217 pub fn snapshot(&self) -> &TopologySnapshot {
218 &self.snapshot
219 }
220
221 fn project_route(
222 &self,
223 route: &MaterializedRoute,
224 lifecycle_updated_at_tick: Tick,
225 ) -> ObservedRoute {
226 let summary = &route.identity.admission.summary;
227 ObservedRoute {
228 route_id: *route.identity.route_id(),
229 destination: route.identity.admission.objective.destination.clone(),
230 engine_id: summary.engine.clone(),
231 route_shape: self.project_shape(&summary.engine),
232 delivery_mode: delivery_mode_for_destination(
233 &route.identity.admission.objective.destination,
234 ),
235 hop_count_hint: summary.hop_count_hint,
236 topology_epoch: route.identity.topology_epoch(),
237 publication_id: *route.identity.publication_id(),
238 lease: route.identity.lease.clone(),
239 protection: summary.protection,
240 connectivity: summary.connectivity,
241 protocol_mix: summary.protocol_mix.clone(),
242 lifecycle_event: route.runtime.last_lifecycle_event,
243 lifecycle_updated_at_tick,
244 health: route.runtime.health.clone(),
245 }
246 }
247
248 fn project_shape(&self, engine_id: &RoutingEngineId) -> ObservedRouteShape {
249 let visibility = self
250 .engine_capabilities
251 .get(engine_id)
252 .map_or(RouteShapeVisibility::Opaque, |caps| {
253 caps.route_shape_visibility
254 });
255 ObservedRouteShape::from_visibility(visibility)
256 }
257}
258
259fn delivery_mode_for_destination(destination: &DestinationId) -> TransportDeliveryMode {
260 match destination {
261 DestinationId::Node(_) => TransportDeliveryMode::Unicast,
262 DestinationId::Service(_) | DestinationId::Gateway(_) => TransportDeliveryMode::Unicast,
263 }
264}
265
266impl ObservedNode {
267 fn from_node(node: Node, observed_at_tick: Tick) -> Self {
268 Self {
269 controller_id: node.controller_id,
270 profile: node.profile,
271 state: node.state,
272 observed_at_tick,
273 }
274 }
275}
276
277impl ObservedLink {
278 fn from_link(link: Link, observed_at_tick: Tick) -> Self {
279 Self {
280 endpoint: link.endpoint,
281 profile: link.profile,
282 state: link.state,
283 observed_at_tick,
284 }
285 }
286}
287
288#[cfg(test)]
289mod tests {
290 use std::collections::BTreeMap;
291
292 use jacquard_core::{
293 AdmissionAssumptions, AdmissionDecision, AdversaryRegime, BackendRouteId, Belief,
294 ByteCount, CapacityHint, ClaimStrength, Configuration, ConnectivityPosture,
295 ConnectivityRegime, ControllerId, DegradationReason, DestinationId, DurationMs,
296 EndpointLocator, Environment, Estimate, Fact, FactBasis, FactSourceClass,
297 FailureModelClass, HealthScore, HoldItemCount, Limit, Link, LinkEndpoint, LinkProfile,
298 LinkRuntimeState, LinkState, MessageFlowAssumptionClass, NodeBuilder, NodeDensityClass,
299 NodeId, NodeProfileBuilder, NodeStateBuilder, Observation, OperatingMode, OrderStamp,
300 OriginAuthenticationClass, PartitionRecoveryClass, PenaltyPoints, PriorityPoints,
301 PublicationId, QuantitativeBoundSupport, RatioPermille, ReachabilityState,
302 ReconfigurationSupport, RelayWorkBudget, RepairCapability, RepairSupport, RouteAdmission,
303 RouteAdmissionCheck, RouteCandidate, RouteCost, RouteDegradation, RouteEpoch,
304 RouteEstimate, RouteEvent, RouteEventStamped, RouteHandle, RouteHealth, RouteId,
305 RouteLease, RouteLifecycleEvent, RouteMaintenanceOutcome, RouteMaintenanceResult,
306 RouteMaterializationInput, RouteMaterializationProof, RoutePartitionClass,
307 RouteProgressContract, RouteProgressState, RouteProtectionClass, RouteRepairClass,
308 RouteReplacementPolicy, RouteSemanticHandoff, RouteServiceKind, RouteShapeVisibility,
309 RouteWitness, RouterCanonicalMutation, RouterRoundOutcome, RoutingEngineCapabilities,
310 RoutingEngineFallbackPolicy, RoutingEngineId, RoutingEvidenceClass, RoutingObjective,
311 RuntimeEnvelopeClass, SelectedRoutingParameters, ServiceDescriptorBuilder, Tick,
312 TimeWindow, TransportKind,
313 };
314
315 use super::*;
316
317 const LOCAL_NODE_ID: NodeId = NodeId([1; 32]);
318 const REMOTE_NODE_ID: NodeId = NodeId([2; 32]);
319 const ENGINE_A: RoutingEngineId = RoutingEngineId::from_contract_bytes(*b"test.engine.a...");
320 const ENGINE_B: RoutingEngineId = RoutingEngineId::from_contract_bytes(*b"test.engine.b...");
321
322 fn topology() -> Observation<Configuration> {
323 Observation {
324 value: Configuration {
325 epoch: RouteEpoch(2),
326 nodes: BTreeMap::from([
327 (LOCAL_NODE_ID, node(LOCAL_NODE_ID, 1)),
328 (REMOTE_NODE_ID, node(REMOTE_NODE_ID, 2)),
329 ]),
330 links: BTreeMap::from([((LOCAL_NODE_ID, REMOTE_NODE_ID), link(7))]),
331 environment: Environment {
332 reachable_neighbor_count: 1,
333 churn_permille: RatioPermille(0),
334 contention_permille: RatioPermille(0),
335 },
336 },
337 source_class: FactSourceClass::Local,
338 evidence_class: RoutingEvidenceClass::DirectObservation,
339 origin_authentication: OriginAuthenticationClass::Controlled,
340 observed_at_tick: Tick(5),
341 }
342 }
343
344 fn endpoint(byte: u8) -> LinkEndpoint {
345 LinkEndpoint::new(
346 TransportKind::Custom("reference".to_owned()),
347 EndpointLocator::Opaque(vec![byte]),
348 ByteCount(128),
349 )
350 }
351
352 fn node(node_id: NodeId, controller_byte: u8) -> jacquard_core::Node {
353 let controller_id = ControllerId([controller_byte; 32]);
354 let endpoint = endpoint(controller_byte);
355 let service = ServiceDescriptorBuilder::new(node_id, controller_id, RouteServiceKind::Move)
356 .with_endpoint(endpoint.clone())
357 .with_routing_engine(&ENGINE_A)
358 .with_valid_for(TimeWindow::new(Tick(1), Tick(10)).expect("window"))
359 .with_capacity(
360 CapacityHint::new(RatioPermille(200))
361 .with_hold_capacity_bytes(ByteCount(128), Tick(5)),
362 Tick(5),
363 )
364 .build();
365 let profile = NodeProfileBuilder::new()
366 .with_service(service)
367 .with_endpoint(endpoint)
368 .with_connection_limits(2, 4, 2, 2)
369 .with_work_budgets(
370 RelayWorkBudget(10),
371 jacquard_core::MaintenanceWorkBudget(10),
372 )
373 .with_hold_limits(HoldItemCount(2), ByteCount(512))
374 .build();
375 let state = NodeStateBuilder::new()
376 .with_relay_budget(
377 RelayWorkBudget(4),
378 RatioPermille(250),
379 DurationMs(1_000),
380 Tick(5),
381 )
382 .with_available_connections(1, Tick(5))
383 .with_hold_capacity(ByteCount(256), Tick(5))
384 .with_information_summary(HoldItemCount(2), ByteCount(128), RatioPermille(10), Tick(5))
385 .build();
386 NodeBuilder::new(controller_id, profile, state).build()
387 }
388
389 fn link(byte: u8) -> Link {
390 Link {
391 endpoint: jacquard_core::LinkEndpoint::new(
392 TransportKind::Custom("reference".to_owned()),
393 EndpointLocator::Opaque(vec![byte]),
394 ByteCount(128),
395 ),
396 profile: LinkProfile {
397 latency_floor_ms: DurationMs(8),
398 repair_capability: RepairCapability::TransportRetransmit,
399 partition_recovery: PartitionRecoveryClass::LocalReconnect,
400 },
401 state: LinkState {
402 state: LinkRuntimeState::Active,
403 median_rtt_ms: Belief::certain(DurationMs(12), Tick(5)),
404 transfer_rate_bytes_per_sec: Belief::certain(4_096, Tick(5)),
405 stability_horizon_ms: Belief::certain(DurationMs(1_000), Tick(5)),
406 loss_permille: RatioPermille(25),
407 delivery_confidence_permille: Belief::certain(RatioPermille(960), Tick(5)),
408 symmetry_permille: Belief::certain(RatioPermille(990), Tick(5)),
409 },
410 }
411 }
412
413 fn capabilities(
414 engine: RoutingEngineId,
415 route_shape_visibility: RouteShapeVisibility,
416 ) -> RoutingEngineCapabilities {
417 RoutingEngineCapabilities {
418 engine,
419 max_protection: RouteProtectionClass::LinkProtected,
420 max_connectivity: ConnectivityPosture {
421 repair: RouteRepairClass::Repairable,
422 partition: RoutePartitionClass::PartitionTolerant,
423 },
424 repair_support: RepairSupport::Supported,
425 hold_support: jacquard_core::HoldSupport::Supported,
426 decidable_admission: jacquard_core::DecidableSupport::Supported,
427 quantitative_bounds: QuantitativeBoundSupport::ProductiveOnly,
428 reconfiguration_support: ReconfigurationSupport::LinkAndDelegate,
429 route_shape_visibility,
430 }
431 }
432
433 fn materialized_route(engine: RoutingEngineId, route_byte: u8) -> MaterializedRoute {
434 let objective = route_objective();
435 let summary = route_summary(engine.clone());
436 let candidate = route_candidate(engine, route_byte, &summary);
437 let witness = route_witness(summary.connectivity);
438 let input =
439 route_materialization_input(route_byte, objective, summary, &candidate, &witness);
440 MaterializedRoute::from_installation(input, route_installation(route_byte, witness))
441 }
442
443 fn route_objective() -> RoutingObjective {
444 RoutingObjective {
445 destination: DestinationId::Node(REMOTE_NODE_ID),
446 service_kind: RouteServiceKind::Move,
447 target_protection: RouteProtectionClass::LinkProtected,
448 protection_floor: RouteProtectionClass::LinkProtected,
449 target_connectivity: ConnectivityPosture {
450 repair: RouteRepairClass::Repairable,
451 partition: RoutePartitionClass::PartitionTolerant,
452 },
453 hold_fallback_policy: jacquard_core::HoldFallbackPolicy::Allowed,
454 latency_budget_ms: Limit::Bounded(DurationMs(250)),
455 protection_priority: PriorityPoints(10),
456 connectivity_priority: PriorityPoints(20),
457 }
458 }
459
460 fn route_summary(engine: RoutingEngineId) -> jacquard_core::RouteSummary {
461 jacquard_core::RouteSummary {
462 engine,
463 protection: RouteProtectionClass::LinkProtected,
464 connectivity: ConnectivityPosture {
465 repair: RouteRepairClass::Repairable,
466 partition: RoutePartitionClass::PartitionTolerant,
467 },
468 protocol_mix: vec![TransportKind::Custom("reference".to_owned())],
469 hop_count_hint: Belief::certain(2, Tick(5)),
470 valid_for: TimeWindow::new(Tick(5), Tick(20)).expect("valid summary window"),
471 }
472 }
473
474 fn route_candidate(
475 engine: RoutingEngineId,
476 route_byte: u8,
477 summary: &jacquard_core::RouteSummary,
478 ) -> RouteCandidate {
479 RouteCandidate {
480 route_id: RouteId([route_byte; 16]),
481 summary: summary.clone(),
482 estimate: Estimate {
483 value: RouteEstimate {
484 estimated_protection: summary.protection,
485 estimated_connectivity: summary.connectivity,
486 topology_epoch: RouteEpoch(2),
487 degradation: RouteDegradation::Degraded(DegradationReason::LinkInstability),
488 },
489 confidence_permille: RatioPermille(950),
490 updated_at_tick: Tick(5),
491 },
492 backend_ref: jacquard_core::BackendRouteRef {
493 engine,
494 backend_route_id: BackendRouteId(vec![1, 2, 3]),
495 },
496 }
497 }
498
499 fn route_witness(connectivity: ConnectivityPosture) -> RouteWitness {
500 RouteWitness {
501 protection: jacquard_core::ObjectiveVsDelivered {
502 objective: RouteProtectionClass::LinkProtected,
503 delivered: RouteProtectionClass::LinkProtected,
504 },
505 connectivity: jacquard_core::ObjectiveVsDelivered {
506 objective: connectivity,
507 delivered: connectivity,
508 },
509 admission_profile: AdmissionAssumptions {
510 message_flow_assumption: MessageFlowAssumptionClass::BestEffort,
511 failure_model: FailureModelClass::Benign,
512 runtime_envelope: RuntimeEnvelopeClass::Canonical,
513 node_density_class: NodeDensityClass::Moderate,
514 connectivity_regime: ConnectivityRegime::Stable,
515 adversary_regime: AdversaryRegime::Cooperative,
516 claim_strength: ClaimStrength::ConservativeUnderProfile,
517 },
518 topology_epoch: RouteEpoch(2),
519 degradation: RouteDegradation::Degraded(DegradationReason::LinkInstability),
520 }
521 }
522
523 fn route_materialization_input(
524 route_byte: u8,
525 objective: RoutingObjective,
526 summary: jacquard_core::RouteSummary,
527 candidate: &RouteCandidate,
528 witness: &RouteWitness,
529 ) -> RouteMaterializationInput {
530 RouteMaterializationInput {
531 handle: RouteHandle {
532 stamp: route_identity_stamp(route_byte),
533 },
534 admission: RouteAdmission {
535 backend_ref: candidate.backend_ref.clone(),
536 objective,
537 profile: SelectedRoutingParameters {
538 selected_protection: RouteProtectionClass::LinkProtected,
539 selected_connectivity: summary.connectivity,
540 deployment_profile: OperatingMode::DenseInteractive,
541 diversity_floor: jacquard_core::DiversityFloor(1),
542 routing_engine_fallback_policy: RoutingEngineFallbackPolicy::Allowed,
543 route_replacement_policy: RouteReplacementPolicy::Allowed,
544 },
545 admission_check: route_admission_check(),
546 summary,
547 witness: witness.clone(),
548 },
549 lease: RouteLease {
550 owner_node_id: LOCAL_NODE_ID,
551 lease_epoch: RouteEpoch(2),
552 valid_for: TimeWindow::new(Tick(6), Tick(12)).expect("lease window"),
553 },
554 }
555 }
556
557 fn route_admission_check() -> RouteAdmissionCheck {
558 RouteAdmissionCheck {
559 decision: AdmissionDecision::Admissible,
560 profile: AdmissionAssumptions {
561 message_flow_assumption: MessageFlowAssumptionClass::BestEffort,
562 failure_model: FailureModelClass::Benign,
563 runtime_envelope: RuntimeEnvelopeClass::Canonical,
564 node_density_class: NodeDensityClass::Moderate,
565 connectivity_regime: ConnectivityRegime::Stable,
566 adversary_regime: AdversaryRegime::Cooperative,
567 claim_strength: ClaimStrength::ConservativeUnderProfile,
568 },
569 productive_step_bound: Limit::Bounded(3),
570 total_step_bound: Limit::Bounded(6),
571 route_cost: RouteCost {
572 message_count_max: Limit::Bounded(8),
573 byte_count_max: Limit::Bounded(ByteCount(512)),
574 hop_count: 2,
575 repair_attempt_count_max: Limit::Bounded(2),
576 hold_bytes_reserved: Limit::Bounded(ByteCount(128)),
577 work_step_count_max: Limit::Bounded(12),
578 },
579 }
580 }
581
582 fn route_installation(
583 route_byte: u8,
584 witness: RouteWitness,
585 ) -> jacquard_core::RouteInstallation {
586 jacquard_core::RouteInstallation {
587 materialization_proof: RouteMaterializationProof {
588 stamp: route_identity_stamp(route_byte),
589 witness: Fact {
590 value: witness,
591 basis: FactBasis::Published,
592 established_at_tick: Tick(6),
593 },
594 },
595 last_lifecycle_event: RouteLifecycleEvent::Activated,
596 health: RouteHealth {
597 reachability_state: ReachabilityState::Reachable,
598 stability_score: HealthScore(900),
599 congestion_penalty_points: PenaltyPoints(4),
600 last_validated_at_tick: Tick(6),
601 },
602 progress: RouteProgressContract {
603 productive_step_count_max: Limit::Bounded(4),
604 total_step_count_max: Limit::Bounded(8),
605 last_progress_at_tick: Tick(6),
606 state: RouteProgressState::Satisfied,
607 },
608 }
609 }
610
611 fn route_identity_stamp(route_byte: u8) -> jacquard_core::RouteIdentityStamp {
612 jacquard_core::RouteIdentityStamp {
613 route_id: RouteId([route_byte; 16]),
614 topology_epoch: RouteEpoch(2),
615 materialized_at_tick: Tick(6),
616 publication_id: PublicationId([route_byte; 16]),
617 }
618 }
619
620 #[test]
621 fn topology_ingestion_replaces_node_and_link_projection() {
622 let projector = TopologyProjector::new(LOCAL_NODE_ID, topology());
623
624 assert_eq!(projector.snapshot().local_node_id, LOCAL_NODE_ID);
625 assert_eq!(projector.snapshot().observed_at_tick, Tick(5));
626 assert_eq!(projector.snapshot().nodes.len(), 2);
627 assert_eq!(projector.snapshot().links.len(), 1);
628 }
629
630 #[test]
631 fn materialized_route_defaults_to_opaque_until_capabilities_are_known() {
632 let mut projector = TopologyProjector::new(LOCAL_NODE_ID, topology());
633 let route = materialized_route(ENGINE_A, 9);
634
635 projector.ingest_materialized_route(&route);
636
637 let projected = projector
638 .snapshot()
639 .active_routes
640 .get(route.identity.route_id())
641 .expect("projected route");
642 assert_eq!(projected.engine_id, ENGINE_A);
643 assert_eq!(projected.route_shape, ObservedRouteShape::Opaque);
644 assert_eq!(projected.delivery_mode, TransportDeliveryMode::Unicast);
645 assert_eq!(projected.hop_count_hint, Belief::certain(2, Tick(5)));
646 }
647
648 #[test]
649 fn capabilities_upgrade_existing_route_shape_projection() {
650 let mut projector = TopologyProjector::new(LOCAL_NODE_ID, topology());
651 let route = materialized_route(ENGINE_A, 9);
652 projector.ingest_materialized_route(&route);
653 projector
654 .ingest_engine_capabilities(capabilities(ENGINE_A, RouteShapeVisibility::ExplicitPath));
655
656 let projected = projector
657 .snapshot()
658 .active_routes
659 .get(route.identity.route_id())
660 .expect("projected route");
661 assert_eq!(projected.route_shape, ObservedRouteShape::ExplicitPath);
662 assert_eq!(projected.hop_count_hint, Belief::certain(2, Tick(5)));
663 }
664
665 #[test]
666 fn maintenance_events_update_projected_lifecycle_and_health() {
667 let mut projector = TopologyProjector::new(LOCAL_NODE_ID, topology());
668 let route = materialized_route(ENGINE_B, 9);
669 projector.ingest_materialized_route(&route);
670 projector
671 .ingest_engine_capabilities(capabilities(ENGINE_B, RouteShapeVisibility::NextHopOnly));
672
673 projector.ingest_route_event(&RouteEventStamped {
674 order_stamp: OrderStamp(2),
675 emitted_at_tick: Tick(7),
676 event: RouteEvent::RouteMaintenanceCompleted {
677 route_id: *route.identity.route_id(),
678 result: RouteMaintenanceResult {
679 event: RouteLifecycleEvent::Repaired,
680 outcome: RouteMaintenanceOutcome::Repaired,
681 },
682 },
683 });
684 projector.ingest_route_event(&RouteEventStamped {
685 order_stamp: OrderStamp(3),
686 emitted_at_tick: Tick(8),
687 event: RouteEvent::RouteHealthObserved {
688 route_id: *route.identity.route_id(),
689 health: Observation {
690 value: RouteHealth {
691 reachability_state: ReachabilityState::Reachable,
692 stability_score: HealthScore(850),
693 congestion_penalty_points: PenaltyPoints(7),
694 last_validated_at_tick: Tick(8),
695 },
696 source_class: FactSourceClass::Local,
697 evidence_class: RoutingEvidenceClass::DirectObservation,
698 origin_authentication: OriginAuthenticationClass::Controlled,
699 observed_at_tick: Tick(8),
700 },
701 },
702 });
703
704 let projected = projector
705 .snapshot()
706 .active_routes
707 .get(route.identity.route_id())
708 .expect("projected route");
709 assert_eq!(projected.lifecycle_event, RouteLifecycleEvent::Repaired);
710 assert_eq!(projected.lifecycle_updated_at_tick, Tick(8));
711 assert_eq!(projected.health.stability_score, HealthScore(850));
712 assert_eq!(projected.route_shape, ObservedRouteShape::NextHopOnly);
713 assert_eq!(projected.hop_count_hint, Belief::certain(2, Tick(5)));
714 }
715
716 #[test]
717 fn round_outcomes_apply_router_owned_canonical_mutations() {
720 let mut projector = TopologyProjector::new(LOCAL_NODE_ID, topology());
721 let route = materialized_route(ENGINE_A, 9);
722 let replacement = materialized_route(ENGINE_B, 10);
723 projector.ingest_materialized_route(&route);
724
725 projector.ingest_round_outcome(&RouterRoundOutcome {
726 topology_epoch: RouteEpoch(2),
727 engine_change: jacquard_core::RoutingTickChange::PrivateStateUpdated,
728 next_round_hint: jacquard_core::RoutingTickHint::Immediate,
729 canonical_mutation: RouterCanonicalMutation::RouteReplaced {
730 previous_route_id: *route.identity.route_id(),
731 route: Box::new(replacement.clone()),
732 },
733 });
734 assert!(!projector
735 .snapshot()
736 .active_routes
737 .contains_key(route.identity.route_id()));
738 assert!(projector
739 .snapshot()
740 .active_routes
741 .contains_key(replacement.identity.route_id()));
742
743 projector.ingest_round_outcome(&RouterRoundOutcome {
744 topology_epoch: RouteEpoch(2),
745 engine_change: jacquard_core::RoutingTickChange::NoChange,
746 next_round_hint: jacquard_core::RoutingTickHint::HostDefault,
747 canonical_mutation: RouterCanonicalMutation::LeaseTransferred {
748 route_id: *replacement.identity.route_id(),
749 handoff: RouteSemanticHandoff {
750 route_id: *replacement.identity.route_id(),
751 from_node_id: LOCAL_NODE_ID,
752 to_node_id: REMOTE_NODE_ID,
753 handoff_epoch: RouteEpoch(3),
754 receipt_id: jacquard_core::ReceiptId([8; 16]),
755 },
756 lease: RouteLease {
757 owner_node_id: REMOTE_NODE_ID,
758 lease_epoch: RouteEpoch(3),
759 valid_for: TimeWindow::new(Tick(8), Tick(20)).expect("lease"),
760 },
761 },
762 });
763 assert_eq!(
764 projector
765 .snapshot()
766 .active_routes
767 .get(replacement.identity.route_id())
768 .expect("replacement route")
769 .lease
770 .owner_node_id,
771 REMOTE_NODE_ID
772 );
773
774 projector.ingest_round_outcome(&RouterRoundOutcome {
775 topology_epoch: RouteEpoch(3),
776 engine_change: jacquard_core::RoutingTickChange::NoChange,
777 next_round_hint: jacquard_core::RoutingTickHint::HostDefault,
778 canonical_mutation: RouterCanonicalMutation::RouteExpired {
779 route_id: *replacement.identity.route_id(),
780 },
781 });
782 assert!(!projector
783 .snapshot()
784 .active_routes
785 .contains_key(replacement.identity.route_id()));
786 }
787
788 #[test]
791 fn route_event_without_canonical_route_does_not_invent_route_truth() {
792 let mut projector = TopologyProjector::new(LOCAL_NODE_ID, topology());
793
794 projector.ingest_route_event(&RouteEventStamped {
795 order_stamp: OrderStamp(1),
796 emitted_at_tick: Tick(6),
797 event: RouteEvent::RouteMaterialized {
798 handle: RouteHandle {
799 stamp: jacquard_core::RouteIdentityStamp {
800 route_id: RouteId([1; 16]),
801 topology_epoch: RouteEpoch(2),
802 materialized_at_tick: Tick(6),
803 publication_id: PublicationId([1; 16]),
804 },
805 },
806 proof: RouteMaterializationProof {
807 stamp: jacquard_core::RouteIdentityStamp {
808 route_id: RouteId([1; 16]),
809 topology_epoch: RouteEpoch(2),
810 materialized_at_tick: Tick(6),
811 publication_id: PublicationId([1; 16]),
812 },
813 witness: Fact {
814 value: RouteWitness {
815 protection: jacquard_core::ObjectiveVsDelivered {
816 objective: RouteProtectionClass::LinkProtected,
817 delivered: RouteProtectionClass::LinkProtected,
818 },
819 connectivity: jacquard_core::ObjectiveVsDelivered {
820 objective: ConnectivityPosture {
821 repair: RouteRepairClass::Repairable,
822 partition: RoutePartitionClass::PartitionTolerant,
823 },
824 delivered: ConnectivityPosture {
825 repair: RouteRepairClass::Repairable,
826 partition: RoutePartitionClass::PartitionTolerant,
827 },
828 },
829 admission_profile: AdmissionAssumptions {
830 message_flow_assumption: MessageFlowAssumptionClass::BestEffort,
831 failure_model: FailureModelClass::Benign,
832 runtime_envelope: RuntimeEnvelopeClass::Canonical,
833 node_density_class: NodeDensityClass::Moderate,
834 connectivity_regime: ConnectivityRegime::Stable,
835 adversary_regime: AdversaryRegime::Cooperative,
836 claim_strength: ClaimStrength::ConservativeUnderProfile,
837 },
838 topology_epoch: RouteEpoch(2),
839 degradation: RouteDegradation::None,
840 },
841 basis: FactBasis::Published,
842 established_at_tick: Tick(6),
843 },
844 },
845 },
846 });
847
848 assert!(projector.snapshot().active_routes.is_empty());
849 }
850}