1#![allow(dead_code)]
3
4use super::config_profiles::impl_service_config_profiles;
5use super::local_health_observer::LocalHealthObserverService;
6use super::service_registry::{ProviderHealthSnapshot, ServiceRegistry};
7use super::traits::{RuntimeService, RuntimeServiceContext, ServiceError, ServiceHealth};
8use async_trait::async_trait;
9use aura_core::effects::RandomEffects;
10use aura_core::service::{
11 BootstrapContactHint, BootstrapIntroductionHint, LinkEndpoint, LocalEstablishDecision,
12 LocalHealthSnapshot, LocalHoldDecision, LocalMoveDecision, LocalRoutingProfile,
13 LocalSelectionProfile, MessageClassRoutingConstraint, MovePath, MovePathBinding,
14 NeighborhoodReentryHint, PrivacyMessageClass, ProviderCandidate, ProviderEvidence, Route,
15 SchedulerClass, SecurityControlClass, SelectionState, ServiceFamily, ServiceProfile,
16};
17use aura_core::types::identifiers::{AuthorityId, ContextId, DeviceId};
18use aura_rendezvous::{
19 validate_bootstrap_contact_hint, validate_bootstrap_introduction_hint,
20 validate_neighborhood_reentry_hint,
21};
22use std::collections::{BTreeMap, BTreeSet, HashMap};
23use std::sync::Arc;
24use tokio::sync::RwLock;
25
/// Command vocabulary named by the `actor_owned` attribute on
/// [`SelectionManagerService`] (`command = SelectionManagerCommand`).
// The variants are not constructed anywhere in the visible part of this file,
// which is why the `dead_code` lint is silenced here.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SelectionManagerCommand {
    /// NOTE(review): presumably the command form of `select_profile` — confirm.
    SelectProfile,
    /// NOTE(review): presumably records a computed profile — confirm against the actor wiring.
    RecordProfile,
}
32
/// Tunable parameters for [`SelectionManagerService`].
///
/// In builds without `test`/`debug_assertions`, `sanitize_config_for_build` resets the
/// mixing, diversity, cover, delay and hysteresis fields to their defaults whenever
/// `tuning_enabled` is set.
#[derive(Debug, Clone)]
pub struct SelectionManagerConfig {
    /// Added to `now_ms` to compute `retain_path_until_ms` of an establish decision.
    pub default_path_ttl_ms: u64,
    /// Minimum time after the stored profile's `health.generated_at_ms` before a new
    /// profile may replace it; inside this window the stored profile is returned as-is.
    pub profile_change_min_interval_ms: u64,
    /// Residency counter assigned to every freshly selected authority; also reported as
    /// `bounded_residency_remaining` on hold decisions and recorded selection state.
    pub residency_turns: u32,
    /// Copied verbatim into `LocalSelectionProfile::security_control_floor`.
    pub security_control_floor: u32,
    /// When true, a continuous routing profile and an anonymous establish route are
    /// produced; when false the routing profile is `LocalRoutingProfile::passthrough()`
    /// and no establish decision is emitted.
    pub privacy_mode_enabled: bool,
    /// Lower bound applied to the mixing depth derived from the selected Move count.
    pub min_mixing_depth: u8,
    /// Upper bound on mixing depth; also caps the diversity target and the number of
    /// Move authorities drawn per selection.
    pub max_mixing_depth: u8,
    /// Lower bound for the path-diversity target (before it is capped by
    /// `max_mixing_depth`).
    pub path_diversity_floor: u8,
    /// Base cover rate before RTT and queue-pressure contributions are added.
    pub cover_floor_per_second: u32,
    /// Cover rate added per started 100 ms of EMA round-trip time.
    pub cover_gain_per_rtt_bucket: u32,
    /// Numerator of the factor applied to EMA RTT to derive `delay_ms`.
    pub delay_gain_numerator: u32,
    /// Denominator of the factor applied to EMA RTT; values below 1 are treated as 1.
    pub delay_gain_denominator: u32,
    /// Delay changes no larger than this keep the previous `delay_ms`; also used as
    /// `max_delay_ms` for the `AccountabilityReply` message-class constraint.
    pub delay_hysteresis_ms: u64,
    /// Cover-rate changes no larger than this keep the previous cover rate.
    pub cover_hysteresis_per_second: u32,
    /// Path-diversity changes no larger than this keep the previous diversity.
    pub diversity_hysteresis: u8,
    /// Marks the config as carrying custom tuning; see `sanitize_config_for_build`.
    pub tuning_enabled: bool,
}
52
/// Baseline configuration: privacy mode and tuning are both disabled.
///
/// `sanitize_config_for_build` uses these values as the fixed tuning parameters
/// that replace caller-supplied tuning in non-test, non-debug builds.
impl Default for SelectionManagerConfig {
    fn default() -> Self {
        Self {
            default_path_ttl_ms: 30_000,
            profile_change_min_interval_ms: 1_000,
            residency_turns: 2,
            security_control_floor: 2,
            privacy_mode_enabled: false,
            min_mixing_depth: 1,
            max_mixing_depth: 3,
            path_diversity_floor: 2,
            // Cover traffic: base rate plus one unit per started 100 ms of RTT.
            cover_floor_per_second: 2,
            cover_gain_per_rtt_bucket: 1,
            // Delay is ceil(ema_rtt_ms * 1 / 3).
            delay_gain_numerator: 1,
            delay_gain_denominator: 3,
            // Hysteresis bands consulted by `apply_profile_hysteresis`.
            delay_hysteresis_ms: 25,
            cover_hysteresis_per_second: 1,
            diversity_hysteresis: 1,
            tuning_enabled: false,
        }
    }
}
82
// Registers the configuration profiles for `SelectionManagerConfig` through the shared
// `impl_service_config_profiles!` macro; only the testing profile is spelled out here.
impl_service_config_profiles!(SelectionManagerConfig {
    // Testing profile: privacy mode and tuning are enabled, the profile-change rate
    // limit is disabled (interval 0), and TTL / hysteresis windows are short.
    pub fn for_testing() -> Self {
        Self {
            default_path_ttl_ms: 1_000,
            profile_change_min_interval_ms: 0,
            residency_turns: 1,
            security_control_floor: 1,
            privacy_mode_enabled: true,
            min_mixing_depth: 1,
            max_mixing_depth: 3,
            path_diversity_floor: 2,
            cover_floor_per_second: 1,
            cover_gain_per_rtt_bucket: 1,
            delay_gain_numerator: 1,
            delay_gain_denominator: 2,
            delay_hysteresis_ms: 5,
            cover_hysteresis_per_second: 1,
            diversity_hysteresis: 1,
            tuning_enabled: true,
        }
    }
});
105
/// Mutable state of the selection manager, kept behind the service's `RwLock`.
#[derive(Debug)]
struct SelectionManagerState {
    /// Most recent profile returned for each scope; drives rate limiting and hysteresis.
    last_profiles: HashMap<ContextId, LocalSelectionProfile>,
    /// Remaining residency turns per `(scope, authority)`; a missing key is read as 0.
    residency: HashMap<(ContextId, AuthorityId), u32>,
    /// Remembered bootstrap hints and re-entry attempt counters per scope.
    bootstrap: HashMap<ContextId, BootstrapScopeState>,
    /// Lifecycle value reported by `RuntimeService::health`.
    lifecycle: ServiceHealth,
}
113
/// Empty state with the lifecycle set to `ServiceHealth::NotStarted`.
impl Default for SelectionManagerState {
    fn default() -> Self {
        Self {
            last_profiles: HashMap::new(),
            residency: HashMap::new(),
            bootstrap: HashMap::new(),
            lifecycle: ServiceHealth::NotStarted,
        }
    }
}
124
/// Failures surfaced by [`SelectionManagerService`].
#[derive(Debug, thiserror::Error)]
pub enum SelectionManagerError {
    /// None of the supplied candidates for `family` had `reachable` set.
    #[error("no reachable candidates available for family {family:?}")]
    NoReachableCandidates { family: ServiceFamily },
    /// Privacy mode is enabled but the Move selection produced no authorities.
    #[error("privacy mode requires at least one anonymous relay candidate")]
    NoAnonymousRelayCandidates,
    /// A bootstrap record was rejected by its validator; `kind` names the record type
    /// and `reason` carries the debug rendering of the validation error.
    #[error("invalid bootstrap record ({kind}): {reason}")]
    InvalidBootstrapRecord { kind: &'static str, reason: String },
}
134
/// Where a bootstrap hint came from.
///
/// `source_score` assigns each source a weight and
/// `candidate_matches_reentry_stage` maps sources onto re-entry stages.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum BootstrapHintSource {
    /// Matches the `RememberedContacts` re-entry stage; weight 24.
    RememberedDirectContact,
    /// Also matches the `RememberedContacts` re-entry stage; weight 18.
    PriorProvider,
    /// Matches the `NeighborhoodBoards` re-entry stage; weight 12.
    NeighborhoodDiscoveryBoard,
    /// Matches the `WebOfTrustIntroductions` re-entry stage; weight 16.
    WebOfTrustIntroduction,
    /// Matches the `BootstrapBridges` re-entry stage; weight 8.
    BootstrapBridge,
}
143
/// A single observation about how an authority might be reached, fed into
/// `SelectionManagerService::remember_bootstrap_hints`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BootstrapHint {
    /// Authority the hint describes; hints are merged per authority within a scope.
    pub authority_id: AuthorityId,
    /// Optional device; only fills in a remembered candidate that has none yet.
    pub device_id: Option<DeviceId>,
    /// Endpoints appended to the remembered candidate when not already present.
    pub link_endpoints: Vec<LinkEndpoint>,
    /// Optional route-layer key; only fills in a candidate that has none yet.
    pub route_layer_public_key: Option<[u8; 32]>,
    /// Origin of the hint, added to the candidate's source set.
    pub source: BootstrapHintSource,
    /// Observation timestamp in milliseconds; the maximum across hints is retained.
    pub observed_at_ms: u64,
    /// Reliability figure; the maximum across hints is retained.
    // NOTE(review): the `_bps` suffix suggests basis points (0..=10_000) — confirm.
    pub reliability_bps: u16,
    /// Breadth figure; the maximum across hints is retained and scaled by 4 in scoring.
    pub breadth_hint: u8,
    /// Optional cluster id; authorities sharing a cluster with an already selected
    /// authority are penalised during selection.
    pub bridge_cluster: Option<u64>,
}
156
/// Stages of bootstrap re-entry, listed in the order `stale_reentry_decision` tries them.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum BootstrapReentryStage {
    /// Candidates known from remembered direct contacts or prior providers.
    RememberedContacts,
    /// Candidates learned from neighborhood discovery boards.
    NeighborhoodBoards,
    /// Candidates learned through web-of-trust introductions.
    WebOfTrustIntroductions,
    /// Candidates learned from bootstrap bridges.
    BootstrapBridges,
}
164
/// Result of `stale_reentry_decision`: the stage to try next and whom to contact.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BootstrapReentryDecision {
    /// First stage with fewer than two recorded attempts and at least one candidate.
    pub stage: BootstrapReentryStage,
    /// Matching authorities, sorted and de-duplicated.
    pub candidate_authorities: Vec<AuthorityId>,
}
170
/// Attempt bookkeeping for one re-entry stage, updated by `record_reentry_attempt`.
#[derive(Debug, Clone, Default)]
struct BootstrapAttemptState {
    /// Number of recorded attempts (saturating).
    attempts: u32,
    /// `now_ms` of the most recent attempt, if any.
    last_attempt_at_ms: Option<u64>,
    /// `now_ms` of the most recent attempt that was reported as succeeded, if any.
    last_success_at_ms: Option<u64>,
}
177
/// Merged view of every `BootstrapHint` remembered for one authority in one scope.
#[derive(Debug, Clone)]
struct BootstrapCandidateState {
    /// Authority this entry describes (also the key it is stored under).
    authority_id: AuthorityId,
    /// First non-`None` device id seen across hints.
    device_id: Option<DeviceId>,
    /// Union of the endpoints seen across hints, in first-seen order.
    link_endpoints: Vec<LinkEndpoint>,
    /// First non-`None` route-layer key seen across hints.
    route_layer_public_key: Option<[u8; 32]>,
    /// Every source that has contributed a hint for this authority.
    sources: BTreeSet<BootstrapHintSource>,
    /// Largest `observed_at_ms` across hints; drives the age bucket in `freshness_score`.
    freshest_observed_at_ms: u64,
    /// Largest `reliability_bps` across hints.
    reliability_bps: u16,
    /// Largest `breadth_hint` across hints.
    breadth_hint: u8,
    /// First non-`None` bridge cluster seen across hints.
    bridge_cluster: Option<u64>,
}
190
/// Bootstrap knowledge held for a single scope.
#[derive(Debug, Clone, Default)]
struct BootstrapScopeState {
    /// Remembered candidates keyed by authority.
    candidates: HashMap<AuthorityId, BootstrapCandidateState>,
    /// Attempt counters per re-entry stage.
    attempts: BTreeMap<BootstrapReentryStage, BootstrapAttemptState>,
}
196
/// Runtime service that picks Move/Hold provider authorities per scope and derives the
/// local routing profile (mixing depth, delay, cover rate, path diversity) from local
/// health observations.
///
/// It also remembers bootstrap hints per scope, merges them into Move candidate sets,
/// and proposes staged re-entry candidates.
#[aura_macros::service_surface(
    families = "Establish,Move,Hold",
    object_categories = "runtime_derived_local,transport_protocol",
    discover = "social_manager_and_service_registry",
    permit = "runtime_local_policy_and_health",
    transfer = "selection_manager",
    select = "selection_manager",
    authoritative = "",
    runtime_local = "selection_profiles,residency_windows,weighting_state",
    category = "service_surface"
)]
#[aura_macros::actor_owned(
    owner = "selection_manager",
    domain = "adaptive_privacy_selection",
    gate = "selection_manager_command_ingress",
    command = SelectionManagerCommand,
    capacity = 64,
    category = "actor_owned"
)]
pub struct SelectionManagerService {
    /// Configuration after `sanitize_config_for_build` has been applied.
    config: SelectionManagerConfig,
    /// Registry queried for provider health and updated with selection state.
    registry: Arc<ServiceRegistry>,
    /// Source of the `LocalHealthSnapshot` used for each selection.
    health: LocalHealthObserverService,
    /// Last profiles, residency counters, bootstrap knowledge and lifecycle.
    state: RwLock<SelectionManagerState>,
}
222
223impl SelectionManagerService {
224 pub fn new(
225 config: SelectionManagerConfig,
226 registry: Arc<ServiceRegistry>,
227 health: LocalHealthObserverService,
228 ) -> Self {
229 Self {
230 config: sanitize_config_for_build(config),
231 registry,
232 health,
233 state: RwLock::new(SelectionManagerState::default()),
234 }
235 }
236
237 pub async fn select_profile<E: RandomEffects + ?Sized>(
238 &self,
239 scope: ContextId,
240 destination: LinkEndpoint,
241 move_candidates: &[ProviderCandidate],
242 hold_candidates: &[ProviderCandidate],
243 now_ms: u64,
244 random: &E,
245 ) -> Result<LocalSelectionProfile, SelectionManagerError> {
246 if let Some(existing) = self.state.read().await.last_profiles.get(&scope).cloned() {
247 if now_ms
248 < existing
249 .health
250 .generated_at_ms
251 .saturating_add(self.config.profile_change_min_interval_ms)
252 {
253 return Ok(existing);
254 }
255 }
256 let health = self.health.snapshot().await;
257 let move_candidates = self
258 .merge_bootstrap_candidates(scope, ServiceFamily::Move, move_candidates, now_ms)
259 .await;
260 let hold_candidates = self
261 .merge_bootstrap_candidates(scope, ServiceFamily::Hold, hold_candidates, now_ms)
262 .await;
263 let selected_move_authorities: Vec<AuthorityId> = self
264 .pick_authorities(
265 scope,
266 &move_candidates,
267 ServiceFamily::Move,
268 &health,
269 now_ms,
270 random,
271 )
272 .await?;
273 let selected_hold_authorities: Vec<AuthorityId> = self
274 .pick_authorities(
275 scope,
276 &hold_candidates,
277 ServiceFamily::Hold,
278 &health,
279 now_ms,
280 random,
281 )
282 .await
283 .unwrap_or_default();
284
285 let routing_profile = if self.config.privacy_mode_enabled {
286 self.continuous_routing_profile(&health, selected_move_authorities.len() as u8)
287 } else {
288 LocalRoutingProfile::passthrough()
289 };
290
291 let establish = if self.config.privacy_mode_enabled {
292 if selected_move_authorities.is_empty() {
293 return Err(SelectionManagerError::NoAnonymousRelayCandidates);
294 }
295 let route = build_route(
296 &selected_move_authorities,
297 &move_candidates,
298 &move_candidates_by_authority(&move_candidates),
299 destination.clone(),
300 );
301 Some(LocalEstablishDecision {
302 profile: ServiceProfile::AnonymousPathEstablish,
303 route,
304 retain_path_until_ms: Some(now_ms.saturating_add(self.config.default_path_ttl_ms)),
305 scheduler_class: Some(SchedulerClass::BoundedDeadlineReply),
306 })
307 } else {
308 None
309 };
310
311 let hold = if !selected_hold_authorities.is_empty() {
312 Some(LocalHoldDecision {
313 profile: ServiceProfile::DeferredDeliveryHold,
314 selected_authorities: selected_hold_authorities.clone(),
315 bounded_residency_remaining: Some(self.config.residency_turns),
316 })
317 } else {
318 None
319 };
320
321 let synthetic_cover_gap_per_second = routing_profile.cover_rate_per_second.saturating_sub(
322 ((health.traffic_volume_bytes + health.sync_blended_retrieval_bytes) / 1024) as u32,
323 );
324
325 let profile = LocalSelectionProfile {
326 scope,
327 health: health.clone(),
328 establish,
329 move_decision: LocalMoveDecision {
330 routing_profile: apply_profile_hysteresis(
331 self.state
332 .read()
333 .await
334 .last_profiles
335 .get(&scope)
336 .map(|profile| &profile.move_decision.routing_profile),
337 routing_profile,
338 &self.config,
339 ),
340 binding: MovePathBinding::Direct(MovePath::direct(destination)),
341 scheduler_class: SchedulerClass::SyncBlended,
342 metadata_minimized: true,
343 },
344 hold,
345 security_control_floor: self.config.security_control_floor,
346 security_controls: vec![
347 SecurityControlClass::AnonymousPathEstablish,
348 SecurityControlClass::CapabilityTrustUpdate,
349 SecurityControlClass::AccountabilityReply,
350 SecurityControlClass::RetrievalCapabilityRotation,
351 ],
352 message_class_constraints: vec![
353 MessageClassRoutingConstraint {
354 message_class: PrivacyMessageClass::Ceremony,
355 force_scheduler_class: Some(SchedulerClass::BoundedDeadlineReply),
356 max_mixing_depth: Some(1),
357 max_delay_ms: Some(0),
358 },
359 MessageClassRoutingConstraint {
360 message_class: PrivacyMessageClass::Consensus,
361 force_scheduler_class: Some(SchedulerClass::BoundedDeadlineReply),
362 max_mixing_depth: Some(1),
363 max_delay_ms: Some(0),
364 },
365 MessageClassRoutingConstraint {
366 message_class: PrivacyMessageClass::AccountabilityReply,
367 force_scheduler_class: Some(SchedulerClass::BoundedDeadlineReply),
368 max_mixing_depth: Some(1),
369 max_delay_ms: Some(self.config.delay_hysteresis_ms),
370 },
371 ],
372 synthetic_cover_gap_per_second,
373 };
374
375 let profile = self.apply_profile_rate_limit(scope, profile, now_ms).await;
376
377 self.registry
378 .record_selection_state(
379 scope,
380 SelectionState {
381 family: ServiceFamily::Move,
382 selected_authorities: selected_move_authorities,
383 epoch: None,
384 bounded_residency_remaining: Some(self.config.residency_turns),
385 },
386 )
387 .await;
388 if !selected_hold_authorities.is_empty() {
389 self.registry
390 .record_selection_state(
391 scope,
392 SelectionState {
393 family: ServiceFamily::Hold,
394 selected_authorities: selected_hold_authorities,
395 epoch: None,
396 bounded_residency_remaining: Some(self.config.residency_turns),
397 },
398 )
399 .await;
400 }
401 self.state
402 .write()
403 .await
404 .last_profiles
405 .insert(scope, profile.clone());
406 Ok(profile)
407 }
408
409 pub async fn remember_bootstrap_hints(&self, scope: ContextId, hints: &[BootstrapHint]) {
410 let mut state = self.state.write().await;
411 let scope_state = state.bootstrap.entry(scope).or_default();
412 for hint in hints {
413 let entry = scope_state
414 .candidates
415 .entry(hint.authority_id)
416 .or_insert_with(|| BootstrapCandidateState {
417 authority_id: hint.authority_id,
418 device_id: hint.device_id,
419 link_endpoints: hint.link_endpoints.clone(),
420 route_layer_public_key: hint.route_layer_public_key,
421 sources: BTreeSet::new(),
422 freshest_observed_at_ms: hint.observed_at_ms,
423 reliability_bps: hint.reliability_bps,
424 breadth_hint: hint.breadth_hint,
425 bridge_cluster: hint.bridge_cluster,
426 });
427 entry.device_id = entry.device_id.or(hint.device_id);
428 for endpoint in &hint.link_endpoints {
429 if !entry.link_endpoints.contains(endpoint) {
430 entry.link_endpoints.push(endpoint.clone());
431 }
432 }
433 if entry.route_layer_public_key.is_none() {
434 entry.route_layer_public_key = hint.route_layer_public_key;
435 }
436 entry.sources.insert(hint.source);
437 entry.freshest_observed_at_ms = entry.freshest_observed_at_ms.max(hint.observed_at_ms);
438 entry.reliability_bps = entry.reliability_bps.max(hint.reliability_bps);
439 entry.breadth_hint = entry.breadth_hint.max(hint.breadth_hint);
440 entry.bridge_cluster = entry.bridge_cluster.or(hint.bridge_cluster);
441 }
442 }
443
444 pub async fn remember_bootstrap_contact_records(
445 &self,
446 scope: ContextId,
447 records: &[BootstrapContactHint],
448 ) -> Result<usize, SelectionManagerError> {
449 let hints = records
450 .iter()
451 .map(|record| {
452 validate_bootstrap_contact_hint(record).map_err(|error| {
453 SelectionManagerError::InvalidBootstrapRecord {
454 kind: "bootstrap_contact_hint",
455 reason: format!("{error:?}"),
456 }
457 })?;
458 Ok(BootstrapHint {
459 authority_id: record.authority_id,
460 device_id: record.device_id,
461 link_endpoints: record.link_endpoints.clone(),
462 route_layer_public_key: record.route_layer_public_key,
463 source: BootstrapHintSource::RememberedDirectContact,
464 observed_at_ms: record.freshest_observed_at_ms,
465 reliability_bps: 9_000,
466 breadth_hint: 1,
467 bridge_cluster: None,
468 })
469 })
470 .collect::<Result<Vec<_>, _>>()?;
471 self.remember_bootstrap_hints(scope, &hints).await;
472 Ok(hints.len())
473 }
474
475 pub async fn remember_neighborhood_reentry_records(
476 &self,
477 scope: ContextId,
478 records: &[NeighborhoodReentryHint],
479 ) -> Result<usize, SelectionManagerError> {
480 let hints = records
481 .iter()
482 .map(|record| {
483 validate_neighborhood_reentry_hint(record).map_err(|error| {
484 SelectionManagerError::InvalidBootstrapRecord {
485 kind: "neighborhood_reentry_hint",
486 reason: format!("{error:?}"),
487 }
488 })?;
489 Ok(BootstrapHint {
490 authority_id: record.advertised_authority,
491 device_id: record.advertised_device,
492 link_endpoints: record.link_endpoints.clone(),
493 route_layer_public_key: record.route_layer_public_key,
494 source: BootstrapHintSource::NeighborhoodDiscoveryBoard,
495 observed_at_ms: record.published_at_ms,
496 reliability_bps: 7_000,
497 breadth_hint: 3,
498 bridge_cluster: None,
499 })
500 })
501 .collect::<Result<Vec<_>, _>>()?;
502 self.remember_bootstrap_hints(scope, &hints).await;
503 Ok(hints.len())
504 }
505
506 pub async fn remember_bootstrap_introduction_records(
507 &self,
508 scope: ContextId,
509 records: &[BootstrapIntroductionHint],
510 observed_at_ms: u64,
511 ) -> Result<usize, SelectionManagerError> {
512 let hints = records
513 .iter()
514 .map(|record| {
515 validate_bootstrap_introduction_hint(record).map_err(|error| {
516 SelectionManagerError::InvalidBootstrapRecord {
517 kind: "bootstrap_introduction_hint",
518 reason: format!("{error:?}"),
519 }
520 })?;
521 Ok(BootstrapHint {
522 authority_id: record.introduced_authority,
523 device_id: record.introduced_device,
524 link_endpoints: record.link_endpoints.clone(),
525 route_layer_public_key: record.route_layer_public_key,
526 source: BootstrapHintSource::WebOfTrustIntroduction,
527 observed_at_ms,
528 reliability_bps: 8_000,
529 breadth_hint: record.max_fanout,
530 bridge_cluster: None,
531 })
532 })
533 .collect::<Result<Vec<_>, _>>()?;
534 self.remember_bootstrap_hints(scope, &hints).await;
535 Ok(hints.len())
536 }
537
538 pub async fn record_reentry_attempt(
539 &self,
540 scope: ContextId,
541 stage: BootstrapReentryStage,
542 now_ms: u64,
543 succeeded: bool,
544 ) {
545 let mut state = self.state.write().await;
546 let attempt = state
547 .bootstrap
548 .entry(scope)
549 .or_default()
550 .attempts
551 .entry(stage)
552 .or_default();
553 attempt.attempts = attempt.attempts.saturating_add(1);
554 attempt.last_attempt_at_ms = Some(now_ms);
555 if succeeded {
556 attempt.last_success_at_ms = Some(now_ms);
557 }
558 }
559
560 pub async fn stale_reentry_decision(
561 &self,
562 scope: ContextId,
563 now_ms: u64,
564 ) -> Option<BootstrapReentryDecision> {
565 let state = self.state.read().await;
566 let scope_state = state.bootstrap.get(&scope)?;
567 let stages = [
568 BootstrapReentryStage::RememberedContacts,
569 BootstrapReentryStage::NeighborhoodBoards,
570 BootstrapReentryStage::WebOfTrustIntroductions,
571 BootstrapReentryStage::BootstrapBridges,
572 ];
573 for stage in stages {
574 let attempts = scope_state
575 .attempts
576 .get(&stage)
577 .map(|attempt| attempt.attempts)
578 .unwrap_or(0);
579 if attempts >= 2 {
580 continue;
581 }
582 let mut authorities = scope_state
583 .candidates
584 .values()
585 .filter(|candidate| candidate_matches_reentry_stage(candidate, stage))
586 .filter(|candidate| freshness_score(candidate, now_ms) > 0)
587 .map(|candidate| candidate.authority_id)
588 .collect::<Vec<_>>();
589 authorities.sort();
590 authorities.dedup();
591 if !authorities.is_empty() {
592 return Some(BootstrapReentryDecision {
593 stage,
594 candidate_authorities: authorities,
595 });
596 }
597 }
598 None
599 }
600
601 fn continuous_routing_profile(
602 &self,
603 health: &LocalHealthSnapshot,
604 selected_move_count: u8,
605 ) -> LocalRoutingProfile {
606 let diversity_target = self
607 .config
608 .path_diversity_floor
609 .max(health.observed_route_diversity)
610 .min(self.config.max_mixing_depth.max(1));
611 let mixing_depth = selected_move_count
612 .max(self.config.min_mixing_depth)
613 .min(self.config.max_mixing_depth)
614 .min(diversity_target.max(1));
615 let delay_ms = div_ceil_u64(
616 (health.ema_rtt_ms as u64).saturating_mul(self.config.delay_gain_numerator as u64),
617 self.config.delay_gain_denominator.max(1) as u64,
618 );
619 let cover_rate_per_second = self
620 .config
621 .cover_floor_per_second
622 .saturating_add(
623 div_ceil_u32(health.ema_rtt_ms, 100)
624 .saturating_mul(self.config.cover_gain_per_rtt_bucket),
625 )
626 .saturating_add(div_ceil_u32(health.queue_pressure, 32));
627 LocalRoutingProfile {
628 mixing_depth,
629 delay_ms,
630 cover_rate_per_second,
631 path_diversity: diversity_target.max(1),
632 }
633 }
634
635 async fn apply_profile_rate_limit(
636 &self,
637 scope: ContextId,
638 candidate: LocalSelectionProfile,
639 now_ms: u64,
640 ) -> LocalSelectionProfile {
641 let state = self.state.read().await;
642 let Some(existing) = state.last_profiles.get(&scope) else {
643 return candidate;
644 };
645 if now_ms
646 < existing
647 .health
648 .generated_at_ms
649 .saturating_add(self.config.profile_change_min_interval_ms)
650 {
651 return existing.clone();
652 }
653 candidate
654 }
655
656 async fn pick_authorities<E: RandomEffects + ?Sized>(
657 &self,
658 scope: ContextId,
659 candidates: &[ProviderCandidate],
660 family: ServiceFamily,
661 local_health: &LocalHealthSnapshot,
662 now_ms: u64,
663 random: &E,
664 ) -> Result<Vec<AuthorityId>, SelectionManagerError> {
665 let reachable = candidates
666 .iter()
667 .filter(|candidate| candidate.reachable)
668 .collect::<Vec<_>>();
669 if reachable.is_empty() {
670 return Err(SelectionManagerError::NoReachableCandidates { family });
671 }
672
673 let projection = self.registry.projection(Some(scope), u64::MAX).await;
674 let state = self.state.read().await;
675 let residency = state.residency.clone();
676 let bootstrap_scope = state.bootstrap.get(&scope).cloned().unwrap_or_default();
677 drop(state);
678 let mut weighted = reachable
679 .iter()
680 .map(|candidate| {
681 let health = projection.provider_health.iter().find(|entry| {
682 entry.family == family && entry.authority_id == candidate.authority_id
683 });
684 (
685 candidate.authority_id,
686 score_candidate(
687 candidate,
688 health,
689 local_health,
690 family,
691 residency
692 .get(&(scope, candidate.authority_id))
693 .copied()
694 .unwrap_or(0),
695 bootstrap_scope.candidates.get(&candidate.authority_id),
696 now_ms,
697 ),
698 )
699 })
700 .collect::<Vec<_>>();
701 let mut selected = Vec::new();
702 let max_select = if family == ServiceFamily::Hold {
703 3
704 } else {
705 usize::from(
706 self.config
707 .max_mixing_depth
708 .max(self.config.min_mixing_depth),
709 )
710 };
711 let mut selected_evidence = Vec::new();
712 for _ in 0..max_select {
713 if weighted.is_empty() {
714 break;
715 }
716 let total_weight: u32 = weighted.iter().map(|(_, weight)| *weight).sum();
717 if total_weight == 0 {
718 break;
719 }
720 let draw = random.random_range(0, total_weight as u64).await as u32;
721 let mut cursor = 0u32;
722 let mut chosen_index = 0usize;
723 for (index, (_, weight)) in weighted.iter().enumerate() {
724 cursor = cursor.saturating_add(*weight);
725 if draw < cursor {
726 chosen_index = index;
727 break;
728 }
729 }
730 let (authority, _) = weighted.remove(chosen_index);
731 selected.push(authority);
732 if let Some(candidate) = candidates
733 .iter()
734 .find(|candidate| candidate.authority_id == authority)
735 {
736 for evidence in &candidate.evidence {
737 if !selected_evidence.contains(evidence) {
738 selected_evidence.push(evidence.clone());
739 }
740 }
741 }
742 for (candidate_authority, weight) in &mut weighted {
743 if let Some(candidate) = candidates
744 .iter()
745 .find(|candidate| candidate.authority_id == *candidate_authority)
746 {
747 let diversity_penalty = candidate
748 .evidence
749 .iter()
750 .filter(|evidence| selected_evidence.iter().any(|seen| seen == *evidence))
751 .count() as u32
752 * 10;
753 let bridge_diversity_penalty =
754 bootstrap_scope
755 .candidates
756 .get(candidate_authority)
757 .and_then(|candidate_state| candidate_state.bridge_cluster)
758 .map(|cluster| {
759 selected
760 .iter()
761 .filter(|selected_authority| {
762 bootstrap_scope.candidates.get(selected_authority).and_then(
763 |selected_state| selected_state.bridge_cluster,
764 ) == Some(cluster)
765 })
766 .count() as u32
767 * 12
768 })
769 .unwrap_or(0);
770 *weight = weight
771 .saturating_sub(diversity_penalty)
772 .saturating_sub(bridge_diversity_penalty);
773 }
774 }
775 }
776 selected.sort();
777 selected.dedup();
778 let mut state = self.state.write().await;
779 for value in state.residency.values_mut() {
780 *value = value.saturating_sub(1);
781 }
782 for authority in &selected {
783 state
784 .residency
785 .insert((scope, *authority), self.config.residency_turns);
786 }
787 Ok(selected)
788 }
789
790 async fn merge_bootstrap_candidates(
791 &self,
792 scope: ContextId,
793 family: ServiceFamily,
794 candidates: &[ProviderCandidate],
795 now_ms: u64,
796 ) -> Vec<ProviderCandidate> {
797 if family == ServiceFamily::Hold {
798 return candidates.to_vec();
799 }
800 let state = self.state.read().await;
801 let Some(scope_state) = state.bootstrap.get(&scope) else {
802 return candidates.to_vec();
803 };
804 let mut merged = candidates
805 .iter()
806 .cloned()
807 .map(|candidate| (candidate.authority_id, candidate))
808 .collect::<BTreeMap<_, _>>();
809 for candidate_state in scope_state.candidates.values() {
810 if freshness_score(candidate_state, now_ms) == 0 {
811 continue;
812 }
813 let entry = merged
814 .entry(candidate_state.authority_id)
815 .or_insert_with(|| ProviderCandidate {
816 authority_id: candidate_state.authority_id,
817 device_id: candidate_state.device_id,
818 family,
819 evidence: vec![ProviderEvidence::DescriptorFallback],
820 link_endpoints: candidate_state.link_endpoints.clone(),
821 route_layer_public_key: None,
822 reachable: !candidate_state.link_endpoints.is_empty(),
823 });
824 if entry.device_id.is_none() {
825 entry.device_id = candidate_state.device_id;
826 }
827 for endpoint in &candidate_state.link_endpoints {
828 if !entry.link_endpoints.contains(endpoint) {
829 entry.link_endpoints.push(endpoint.clone());
830 }
831 }
832 if !entry.reachable && !candidate_state.link_endpoints.is_empty() {
833 entry.reachable = true;
834 }
835 if entry.route_layer_public_key.is_none() {
836 entry.route_layer_public_key = candidate_state.route_layer_public_key;
837 }
838 }
839 merged.into_values().collect()
840 }
841}
842
/// Ceiling division for `u32`; a `divisor` of 0 is treated as 1.
///
/// Computed as quotient plus a remainder correction, so it stays exact for values near
/// `u32::MAX` (adding `divisor - 1` first would saturate and under-round).
fn div_ceil_u32(value: u32, divisor: u32) -> u32 {
    let divisor = divisor.max(1);
    value / divisor + u32::from(value % divisor != 0)
}
849
/// Ceiling division for `u64`; a `divisor` of 0 is treated as 1.
///
/// Computed as quotient plus a remainder correction, so it stays exact for values near
/// `u64::MAX` (adding `divisor - 1` first would saturate and under-round).
fn div_ceil_u64(value: u64, divisor: u64) -> u64 {
    let divisor = divisor.max(1);
    value / divisor + u64::from(value % divisor != 0)
}
856
/// Normalizes a caller-supplied configuration for the current build flavor.
///
/// In builds compiled without `test` and without `debug_assertions`, a config with
/// `tuning_enabled` set has its mixing-depth, diversity, cover, delay and hysteresis
/// fields overwritten with the `Default` values and `tuning_enabled` cleared. The TTL,
/// rate-limit interval, residency, security floor and privacy-mode fields are never
/// touched. In test or debug-assertion builds the config is returned unchanged.
fn sanitize_config_for_build(config: SelectionManagerConfig) -> SelectionManagerConfig {
    // Builds without test/debug assertions: custom tuning is not honoured.
    #[cfg(not(any(test, debug_assertions)))]
    {
        let mut config = config;
        let fixed = SelectionManagerConfig::default();
        if config.tuning_enabled {
            config.min_mixing_depth = fixed.min_mixing_depth;
            config.max_mixing_depth = fixed.max_mixing_depth;
            config.path_diversity_floor = fixed.path_diversity_floor;
            config.cover_floor_per_second = fixed.cover_floor_per_second;
            config.cover_gain_per_rtt_bucket = fixed.cover_gain_per_rtt_bucket;
            config.delay_gain_numerator = fixed.delay_gain_numerator;
            config.delay_gain_denominator = fixed.delay_gain_denominator;
            config.delay_hysteresis_ms = fixed.delay_hysteresis_ms;
            config.cover_hysteresis_per_second = fixed.cover_hysteresis_per_second;
            config.diversity_hysteresis = fixed.diversity_hysteresis;
            config.tuning_enabled = false;
        }
        config
    }

    // Test and debug-assertion builds: pass the config through untouched.
    #[cfg(any(test, debug_assertions))]
    {
        config
    }
}
883
884fn move_candidates_by_authority(
885 candidates: &[ProviderCandidate],
886) -> BTreeMap<AuthorityId, Vec<LinkEndpoint>> {
887 let mut map = BTreeMap::new();
888 for candidate in candidates {
889 map.entry(candidate.authority_id)
890 .or_insert_with(Vec::new)
891 .extend(candidate.link_endpoints.clone());
892 }
893 map
894}
895
896fn build_route(
897 authorities: &[AuthorityId],
898 candidates: &[ProviderCandidate],
899 endpoints: &BTreeMap<AuthorityId, Vec<LinkEndpoint>>,
900 destination: LinkEndpoint,
901) -> Route {
902 let route_keys = candidates
903 .iter()
904 .filter_map(|candidate| {
905 candidate
906 .route_layer_public_key
907 .map(|public_key| (candidate.authority_id, public_key))
908 })
909 .collect::<BTreeMap<_, _>>();
910 Route {
911 hops: authorities
912 .iter()
913 .map(|authority_id| aura_core::service::RelayHop {
914 authority_id: *authority_id,
915 link_endpoint: endpoints
916 .get(authority_id)
917 .and_then(|values| values.first().cloned())
918 .unwrap_or_else(|| LinkEndpoint::relay(*authority_id)),
919 route_layer_public_key: route_keys.get(authority_id).copied(),
920 })
921 .collect(),
922 destination,
923 }
924}
925
926fn score_candidate(
927 candidate: &ProviderCandidate,
928 health: Option<&ProviderHealthSnapshot>,
929 local_health: &LocalHealthSnapshot,
930 family: ServiceFamily,
931 residency_penalty_turns: u32,
932 bootstrap: Option<&BootstrapCandidateState>,
933 now_ms: u64,
934) -> u32 {
935 let evidence_score = candidate
936 .evidence
937 .iter()
938 .map(|evidence| match evidence {
939 ProviderEvidence::Neighborhood => 15,
940 ProviderEvidence::DirectFriend => 30,
941 ProviderEvidence::IntroducedFof => 20,
942 ProviderEvidence::Guardian => 25,
943 ProviderEvidence::DescriptorFallback => 5,
944 })
945 .sum::<u32>();
946 let health_score = health
947 .map(|entry| {
948 let total = entry
949 .success_count
950 .saturating_add(entry.failure_count)
951 .max(1);
952 (entry.success_count.saturating_mul(100)) / total
953 })
954 .unwrap_or(50);
955 let hold_quality = if family == ServiceFamily::Hold {
956 local_health.hold_success_bps / 100
957 } else {
958 0
959 };
960 let bootstrap_freshness = bootstrap
961 .map(|candidate| freshness_score(candidate, now_ms))
962 .unwrap_or(0);
963 let bootstrap_source = bootstrap.map(source_score).unwrap_or(0);
964 let bootstrap_breadth = bootstrap
965 .map(|candidate| u32::from(candidate.breadth_hint).saturating_mul(4))
966 .unwrap_or(0);
967 let availability_score = local_health
968 .sync_opportunity_count
969 .min(10)
970 .saturating_mul(3);
971 let diversity_score = local_health.observed_route_diversity as u32 * 5;
972 let reachability_score: u32 = if candidate.reachable { 40 } else { 0 };
973 reachability_score
974 .saturating_add(evidence_score)
975 .saturating_add(health_score)
976 .saturating_add(hold_quality)
977 .saturating_add(bootstrap_freshness)
978 .saturating_add(bootstrap_source)
979 .saturating_add(bootstrap_breadth)
980 .saturating_add(availability_score)
981 .saturating_add(diversity_score)
982 .saturating_sub(residency_penalty_turns.saturating_mul(20))
983}
984
985fn freshness_score(candidate: &BootstrapCandidateState, now_ms: u64) -> u32 {
986 let age_ms = now_ms.saturating_sub(candidate.freshest_observed_at_ms);
987 let age_bucket: u32 = match age_ms {
988 0..=5_000 => 30,
989 5_001..=30_000 => 22,
990 30_001..=120_000 => 14,
991 120_001..=600_000 => 6,
992 _ => 0,
993 };
994 let reliability = u32::from(candidate.reliability_bps) / 250;
995 age_bucket.saturating_add(reliability)
996}
997
998fn source_score(candidate: &BootstrapCandidateState) -> u32 {
999 candidate
1000 .sources
1001 .iter()
1002 .map(|source| match source {
1003 BootstrapHintSource::RememberedDirectContact => 24,
1004 BootstrapHintSource::PriorProvider => 18,
1005 BootstrapHintSource::NeighborhoodDiscoveryBoard => 12,
1006 BootstrapHintSource::WebOfTrustIntroduction => 16,
1007 BootstrapHintSource::BootstrapBridge => 8,
1008 })
1009 .max()
1010 .unwrap_or(0)
1011}
1012
1013fn candidate_matches_reentry_stage(
1014 candidate: &BootstrapCandidateState,
1015 stage: BootstrapReentryStage,
1016) -> bool {
1017 match stage {
1018 BootstrapReentryStage::RememberedContacts => {
1019 candidate
1020 .sources
1021 .contains(&BootstrapHintSource::RememberedDirectContact)
1022 || candidate
1023 .sources
1024 .contains(&BootstrapHintSource::PriorProvider)
1025 }
1026 BootstrapReentryStage::NeighborhoodBoards => candidate
1027 .sources
1028 .contains(&BootstrapHintSource::NeighborhoodDiscoveryBoard),
1029 BootstrapReentryStage::WebOfTrustIntroductions => candidate
1030 .sources
1031 .contains(&BootstrapHintSource::WebOfTrustIntroduction),
1032 BootstrapReentryStage::BootstrapBridges => candidate
1033 .sources
1034 .contains(&BootstrapHintSource::BootstrapBridge),
1035 }
1036}
1037
1038fn apply_profile_hysteresis(
1039 previous: Option<&LocalRoutingProfile>,
1040 mut candidate: LocalRoutingProfile,
1041 config: &SelectionManagerConfig,
1042) -> LocalRoutingProfile {
1043 let Some(previous) = previous else {
1044 return candidate;
1045 };
1046 if previous.mixing_depth.abs_diff(candidate.mixing_depth) <= 1 {
1047 candidate.mixing_depth = previous.mixing_depth;
1048 }
1049 if previous.delay_ms.abs_diff(candidate.delay_ms) <= config.delay_hysteresis_ms {
1050 candidate.delay_ms = previous.delay_ms;
1051 }
1052 if previous
1053 .cover_rate_per_second
1054 .abs_diff(candidate.cover_rate_per_second)
1055 <= config.cover_hysteresis_per_second
1056 {
1057 candidate.cover_rate_per_second = previous.cover_rate_per_second;
1058 }
1059 if previous.path_diversity.abs_diff(candidate.path_diversity) <= config.diversity_hysteresis {
1060 candidate.path_diversity = previous.path_diversity;
1061 }
1062 candidate
1063}
1064
1065#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
1066#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
1067impl RuntimeService for SelectionManagerService {
1068 fn name(&self) -> &'static str {
1069 "selection_manager"
1070 }
1071
1072 fn dependencies(&self) -> &[&'static str] {
1073 &[
1074 "social_manager",
1075 "rendezvous_manager",
1076 "local_health_observer",
1077 ]
1078 }
1079
1080 async fn start(&self, _ctx: &RuntimeServiceContext) -> Result<(), ServiceError> {
1081 self.state.write().await.lifecycle = ServiceHealth::Healthy;
1082 Ok(())
1083 }
1084
1085 async fn stop(&self) -> Result<(), ServiceError> {
1086 self.state.write().await.lifecycle = ServiceHealth::Stopped;
1087 Ok(())
1088 }
1089
1090 async fn health(&self) -> ServiceHealth {
1091 self.state.read().await.lifecycle.clone()
1092 }
1093}
1094
1095#[cfg(test)]
1096mod tests {
1097 use super::*;
1098 use crate::runtime::services::LocalHealthObserverConfig;
1099 use aura_core::effects::RandomCoreEffects;
1100 use aura_core::service::{LinkProtocol, ProviderEvidence};
1101 use aura_core::types::identifiers::DeviceId;
1102
1103 struct TestRandom(u64);
1104
1105 #[async_trait]
1106 impl RandomCoreEffects for TestRandom {
1107 async fn random_bytes(&self, len: usize) -> Vec<u8> {
1108 vec![self.0 as u8; len]
1109 }
1110
1111 async fn random_bytes_32(&self) -> [u8; 32] {
1112 [self.0 as u8; 32]
1113 }
1114
1115 async fn random_u64(&self) -> u64 {
1116 self.0
1117 }
1118 }
1119
1120 fn authority(seed: u8) -> AuthorityId {
1121 AuthorityId::new_from_entropy([seed; 32])
1122 }
1123
1124 fn context(seed: u8) -> ContextId {
1125 ContextId::new_from_entropy([seed; 32])
1126 }
1127
1128 fn candidate(seed: u8, reachable: bool, evidence: ProviderEvidence) -> ProviderCandidate {
1129 ProviderCandidate {
1130 authority_id: authority(seed),
1131 device_id: Some(DeviceId::new_from_entropy([seed; 32])),
1132 family: ServiceFamily::Move,
1133 evidence: vec![evidence],
1134 link_endpoints: vec![LinkEndpoint::direct(
1135 LinkProtocol::Tcp,
1136 format!("127.0.0.1:{}", 7000 + seed as u16),
1137 )],
1138 route_layer_public_key: Some([seed; 32]),
1139 reachable,
1140 }
1141 }
1142
1143 fn bootstrap_hint(
1144 seed: u8,
1145 source: BootstrapHintSource,
1146 observed_at_ms: u64,
1147 reliability_bps: u16,
1148 breadth_hint: u8,
1149 bridge_cluster: Option<u64>,
1150 ) -> BootstrapHint {
1151 BootstrapHint {
1152 authority_id: authority(seed),
1153 device_id: Some(DeviceId::new_from_entropy([seed; 32])),
1154 link_endpoints: vec![LinkEndpoint::direct(
1155 LinkProtocol::Tcp,
1156 format!("127.0.0.1:{}", 7600 + seed as u16),
1157 )],
1158 route_layer_public_key: Some([seed; 32]),
1159 source,
1160 observed_at_ms,
1161 reliability_bps,
1162 breadth_hint,
1163 bridge_cluster,
1164 }
1165 }
1166
1167 #[tokio::test]
1168 async fn selection_manager_prefers_stronger_candidates_without_total_concentration() {
1169 let registry = Arc::new(ServiceRegistry::new());
1170 let health = LocalHealthObserverService::new(LocalHealthObserverConfig::for_testing());
1171 health
1172 .observe_provider_set(
1173 &[
1174 candidate(1, true, ProviderEvidence::DirectFriend),
1175 candidate(2, true, ProviderEvidence::Neighborhood),
1176 candidate(3, true, ProviderEvidence::Neighborhood),
1177 ],
1178 2,
1179 10,
1180 )
1181 .await;
1182 let manager =
1183 SelectionManagerService::new(SelectionManagerConfig::for_testing(), registry, health);
1184 let profile = manager
1185 .select_profile(
1186 context(1),
1187 LinkEndpoint::direct(LinkProtocol::Tcp, "127.0.0.1:9000"),
1188 &[
1189 candidate(1, true, ProviderEvidence::DirectFriend),
1190 candidate(2, true, ProviderEvidence::Neighborhood),
1191 candidate(3, true, ProviderEvidence::Neighborhood),
1192 ],
1193 &[],
1194 20,
1195 &TestRandom(3),
1196 )
1197 .await
1198 .expect("select profile");
1199 let establish = profile.establish.expect("privacy mode establish");
1200 assert_eq!(establish.profile, ServiceProfile::AnonymousPathEstablish);
1201 assert!(!establish.route.hops.is_empty());
1202 assert!(establish
1203 .route
1204 .hops
1205 .iter()
1206 .any(|hop| hop.authority_id == authority(1)));
1207 assert!(establish
1208 .route
1209 .hops
1210 .iter()
1211 .any(|hop| hop.authority_id != authority(1)));
1212 }
1213
1214 #[tokio::test]
1215 async fn selection_manager_keeps_passthrough_available() {
1216 let registry = Arc::new(ServiceRegistry::new());
1217 let health = LocalHealthObserverService::new(LocalHealthObserverConfig::for_testing());
1218 let mut config = SelectionManagerConfig::for_testing();
1219 config.privacy_mode_enabled = false;
1220 let manager = SelectionManagerService::new(config, registry, health);
1221 let profile = manager
1222 .select_profile(
1223 context(2),
1224 LinkEndpoint::direct(LinkProtocol::Tcp, "127.0.0.1:9100"),
1225 &[candidate(1, true, ProviderEvidence::Neighborhood)],
1226 &[],
1227 30,
1228 &TestRandom(1),
1229 )
1230 .await
1231 .expect("select passthrough");
1232 assert!(profile.establish.is_none());
1233 assert_eq!(
1234 profile.move_decision.routing_profile,
1235 LocalRoutingProfile::passthrough()
1236 );
1237 }
1238
1239 #[tokio::test]
1240 async fn selection_manager_records_message_class_constraints_and_real_traffic_gap() {
1241 let registry = Arc::new(ServiceRegistry::new());
1242 let health = LocalHealthObserverService::new(LocalHealthObserverConfig::for_testing());
1243 health.observe_traffic_volume(1024, 10).await;
1244 health.observe_sync_blended_retrieval_volume(1024, 11).await;
1245 health.observe_accountability_reply_volume(2048, 12).await;
1246 health
1247 .observe_provider_set(&[candidate(1, true, ProviderEvidence::Neighborhood)], 2, 13)
1248 .await;
1249 let manager =
1250 SelectionManagerService::new(SelectionManagerConfig::for_testing(), registry, health);
1251 let profile = manager
1252 .select_profile(
1253 context(3),
1254 LinkEndpoint::direct(LinkProtocol::Tcp, "127.0.0.1:9200"),
1255 &[candidate(1, true, ProviderEvidence::Neighborhood)],
1256 &[],
1257 20,
1258 &TestRandom(2),
1259 )
1260 .await
1261 .expect("select profile");
1262 assert!(profile
1263 .message_class_constraints
1264 .iter()
1265 .any(|constraint| constraint.message_class == PrivacyMessageClass::Consensus));
1266 assert!(
1267 profile.synthetic_cover_gap_per_second
1268 <= profile.move_decision.routing_profile.cover_rate_per_second
1269 );
1270 assert_eq!(profile.health.accountability_reply_bytes, 2048);
1271 }
1272
1273 #[tokio::test]
1274 async fn selection_manager_rotates_paths_under_residency_pressure() {
1275 let registry = Arc::new(ServiceRegistry::new());
1276 let health = LocalHealthObserverService::new(LocalHealthObserverConfig::for_testing());
1277 let manager =
1278 SelectionManagerService::new(SelectionManagerConfig::for_testing(), registry, health);
1279 let candidates = [
1280 candidate(1, true, ProviderEvidence::DirectFriend),
1281 candidate(2, true, ProviderEvidence::Neighborhood),
1282 candidate(3, true, ProviderEvidence::IntroducedFof),
1283 candidate(4, true, ProviderEvidence::Neighborhood),
1284 ];
1285 let first = manager
1286 .select_profile(
1287 context(4),
1288 LinkEndpoint::direct(LinkProtocol::Tcp, "127.0.0.1:9300"),
1289 &candidates,
1290 &[],
1291 10,
1292 &TestRandom(1),
1293 )
1294 .await
1295 .expect("first profile");
1296 let second = manager
1297 .select_profile(
1298 context(4),
1299 LinkEndpoint::direct(LinkProtocol::Tcp, "127.0.0.1:9300"),
1300 &candidates,
1301 &[],
1302 20,
1303 &TestRandom(99),
1304 )
1305 .await
1306 .expect("second profile");
1307 let first_route = first.establish.expect("first establish").route;
1308 let second_route = second.establish.expect("second establish").route;
1309 assert_ne!(first_route.hops, second_route.hops);
1310 assert!(first_route
1311 .hops
1312 .iter()
1313 .any(|hop| hop.authority_id == authority(1)));
1314 assert!(second_route
1315 .hops
1316 .iter()
1317 .any(|hop| hop.authority_id != authority(1)));
1318 }
1319
1320 #[tokio::test]
1321 async fn selection_manager_consumes_bootstrap_hints_when_shared_candidates_are_empty() {
1322 let registry = Arc::new(ServiceRegistry::new());
1323 let health = LocalHealthObserverService::new(LocalHealthObserverConfig::for_testing());
1324 health.observe_provider_set(&[], 1, 50).await;
1325 let manager =
1326 SelectionManagerService::new(SelectionManagerConfig::for_testing(), registry, health);
1327 manager
1328 .remember_bootstrap_hints(
1329 context(5),
1330 &[
1331 bootstrap_hint(
1332 8,
1333 BootstrapHintSource::RememberedDirectContact,
1334 45,
1335 9_000,
1336 2,
1337 None,
1338 ),
1339 bootstrap_hint(
1340 9,
1341 BootstrapHintSource::BootstrapBridge,
1342 48,
1343 6_000,
1344 4,
1345 Some(1),
1346 ),
1347 ],
1348 )
1349 .await;
1350
1351 let profile = manager
1352 .select_profile(
1353 context(5),
1354 LinkEndpoint::direct(LinkProtocol::Tcp, "127.0.0.1:9400"),
1355 &[],
1356 &[],
1357 50,
1358 &TestRandom(1),
1359 )
1360 .await
1361 .expect("bootstrap hints should provide move candidates");
1362 let establish = profile.establish.expect("establish decision");
1363 assert!(establish
1364 .route
1365 .hops
1366 .iter()
1367 .any(|hop| hop.authority_id == authority(8)));
1368 }
1369
    /// Walks the stale re-entry decision through its stages in order: remembered
    /// contacts, neighborhood boards, web-of-trust introductions, then bootstrap bridges.
    ///
    /// One hint per source kind is remembered for the context. After each decision two
    /// attempts are recorded for the returned stage, and the next decision is expected
    /// to report the following stage.
    #[tokio::test]
    async fn stale_reentry_decisions_progress_from_contacts_to_bridges() {
        let registry = Arc::new(ServiceRegistry::new());
        let health = LocalHealthObserverService::new(LocalHealthObserverConfig::for_testing());
        let manager =
            SelectionManagerService::new(SelectionManagerConfig::for_testing(), registry, health);
        // Arguments per hint: seed, source, observed_at_ms, reliability_bps, breadth_hint,
        // bridge_cluster. All four hints share the same observation time (100).
        manager
            .remember_bootstrap_hints(
                context(6),
                &[
                    bootstrap_hint(
                        1,
                        BootstrapHintSource::RememberedDirectContact,
                        100,
                        9_000,
                        1,
                        None,
                    ),
                    bootstrap_hint(
                        2,
                        BootstrapHintSource::NeighborhoodDiscoveryBoard,
                        100,
                        7_000,
                        3,
                        None,
                    ),
                    bootstrap_hint(
                        3,
                        BootstrapHintSource::WebOfTrustIntroduction,
                        100,
                        8_000,
                        2,
                        None,
                    ),
                    bootstrap_hint(
                        4,
                        BootstrapHintSource::BootstrapBridge,
                        100,
                        5_000,
                        4,
                        Some(7),
                    ),
                ],
            )
            .await;

        // Stage 1: remembered contacts.
        let first = manager
            .stale_reentry_decision(context(6), 120)
            .await
            .expect("contacts-first decision");
        assert_eq!(first.stage, BootstrapReentryStage::RememberedContacts);
        // NOTE(review): the trailing `false` presumably marks the attempt as unsuccessful,
        // and two such attempts appear to be what advances the stage — confirm against
        // `record_reentry_attempt`.
        manager
            .record_reentry_attempt(context(6), first.stage, 121, false)
            .await;
        manager
            .record_reentry_attempt(context(6), first.stage, 122, false)
            .await;

        // Stage 2: neighborhood boards.
        let second = manager
            .stale_reentry_decision(context(6), 123)
            .await
            .expect("board decision");
        assert_eq!(second.stage, BootstrapReentryStage::NeighborhoodBoards);
        manager
            .record_reentry_attempt(context(6), second.stage, 124, false)
            .await;
        manager
            .record_reentry_attempt(context(6), second.stage, 125, false)
            .await;

        // Stage 3: web-of-trust introductions.
        let third = manager
            .stale_reentry_decision(context(6), 126)
            .await
            .expect("introduction decision");
        assert_eq!(third.stage, BootstrapReentryStage::WebOfTrustIntroductions);
        manager
            .record_reentry_attempt(context(6), third.stage, 127, false)
            .await;
        manager
            .record_reentry_attempt(context(6), third.stage, 128, false)
            .await;

        // Stage 4: bootstrap bridges.
        let fourth = manager
            .stale_reentry_decision(context(6), 129)
            .await
            .expect("bridge decision");
        assert_eq!(fourth.stage, BootstrapReentryStage::BootstrapBridges);
    }
1458
1459 #[tokio::test]
1460 async fn bootstrap_weighting_prefers_fresher_hints_without_collapsing_to_one_bridge_cluster() {
1461 let registry = Arc::new(ServiceRegistry::new());
1462 let health = LocalHealthObserverService::new(LocalHealthObserverConfig::for_testing());
1463 health.observe_provider_set(&[], 2, 200).await;
1464 let manager =
1465 SelectionManagerService::new(SelectionManagerConfig::for_testing(), registry, health);
1466 manager
1467 .remember_bootstrap_hints(
1468 context(7),
1469 &[
1470 bootstrap_hint(
1471 10,
1472 BootstrapHintSource::NeighborhoodDiscoveryBoard,
1473 195,
1474 8_500,
1475 3,
1476 Some(1),
1477 ),
1478 bootstrap_hint(
1479 11,
1480 BootstrapHintSource::NeighborhoodDiscoveryBoard,
1481 194,
1482 8_000,
1483 3,
1484 Some(2),
1485 ),
1486 bootstrap_hint(
1487 12,
1488 BootstrapHintSource::BootstrapBridge,
1489 130,
1490 5_500,
1491 4,
1492 Some(1),
1493 ),
1494 bootstrap_hint(
1495 13,
1496 BootstrapHintSource::BootstrapBridge,
1497 192,
1498 7_500,
1499 4,
1500 Some(3),
1501 ),
1502 ],
1503 )
1504 .await;
1505
1506 let profile = manager
1507 .select_profile(
1508 context(7),
1509 LinkEndpoint::direct(LinkProtocol::Tcp, "127.0.0.1:9500"),
1510 &[],
1511 &[],
1512 200,
1513 &TestRandom(42),
1514 )
1515 .await
1516 .expect("bootstrap weighted profile");
1517 let route = profile.establish.expect("establish").route;
1518 let unique_hops = route
1519 .hops
1520 .iter()
1521 .map(|hop| hop.authority_id)
1522 .collect::<std::collections::BTreeSet<_>>();
1523 assert!(unique_hops.len() > 1);
1524 assert!(route
1525 .hops
1526 .iter()
1527 .any(|hop| hop.authority_id == authority(10)));
1528 assert!(route
1529 .hops
1530 .iter()
1531 .any(|hop| hop.authority_id != authority(12)));
1532 }
1533
    /// Feeds one contact record, one neighborhood re-entry record and one introduction
    /// record into the manager for the same context, then checks that selection (with
    /// no provider candidates supplied) builds a route from them and that a stale
    /// re-entry decision is available.
    #[tokio::test]
    async fn selection_manager_merges_validated_shared_bootstrap_records_locally() {
        let registry = Arc::new(ServiceRegistry::new());
        let health = LocalHealthObserverService::new(LocalHealthObserverConfig::for_testing());
        health.observe_provider_set(&[], 3, 300).await;
        let manager =
            SelectionManagerService::new(SelectionManagerConfig::for_testing(), registry, health);

        // Contact record for authority 20, scoped to the same context it is stored under.
        manager
            .remember_bootstrap_contact_records(
                context(8),
                &[BootstrapContactHint {
                    scope: context(8),
                    authority_id: authority(20),
                    device_id: Some(DeviceId::new_from_entropy([20; 32])),
                    link_endpoints: vec![LinkEndpoint::direct(LinkProtocol::Tcp, "127.0.0.1:7620")],
                    route_layer_public_key: Some([20; 32]),
                    freshest_observed_at_ms: 290,
                    valid_until: 390,
                    replay_window_id: [20; 32],
                }],
            )
            .await
            .expect("contact hint should merge");
        // Neighborhood re-entry record where publisher and advertised authority are both 21.
        manager
            .remember_neighborhood_reentry_records(
                context(8),
                &[NeighborhoodReentryHint {
                    scope: context(8),
                    publisher_authority: authority(21),
                    advertised_authority: authority(21),
                    advertised_device: Some(DeviceId::new_from_entropy([21; 32])),
                    link_endpoints: vec![LinkEndpoint::direct(LinkProtocol::Tcp, "127.0.0.1:7621")],
                    route_layer_public_key: Some([21; 32]),
                    published_at_ms: 291,
                    valid_until: 391,
                    replay_window_id: [21; 32],
                }],
            )
            .await
            .expect("reentry hint should merge");
        // Introduction record where introducer and introduced authority are both 22.
        // NOTE(review): the trailing `295` is presumably the observation time for the
        // records — confirm against `remember_bootstrap_introduction_records`.
        manager
            .remember_bootstrap_introduction_records(
                context(8),
                &[BootstrapIntroductionHint {
                    scope: context(8),
                    introducer_authority: authority(22),
                    introduced_authority: authority(22),
                    introduced_device: Some(DeviceId::new_from_entropy([22; 32])),
                    link_endpoints: vec![LinkEndpoint::direct(LinkProtocol::Tcp, "127.0.0.1:7622")],
                    route_layer_public_key: Some([22; 32]),
                    remaining_depth: 1,
                    max_fanout: 2,
                    valid_until: 392,
                    replay_window_id: [22; 32],
                }],
                295,
            )
            .await
            .expect("introduction hint should merge");

        // No provider candidates are passed in, so any route must come from the records above.
        let profile = manager
            .select_profile(
                context(8),
                LinkEndpoint::direct(LinkProtocol::Tcp, "127.0.0.1:9600"),
                &[],
                &[],
                300,
                &TestRandom(5),
            )
            .await
            .expect("shared bootstrap records should become local candidates");

        let route = profile.establish.expect("establish").route;
        let authorities = route
            .hops
            .iter()
            .map(|hop| hop.authority_id)
            .collect::<std::collections::BTreeSet<_>>();
        // The contact (20) must be on the route, plus at least one of the board (21) or
        // introduction (22) authorities.
        assert!(authorities.contains(&authority(20)));
        assert!(authorities.contains(&authority(21)) || authorities.contains(&authority(22)));
        // Every hop must carry a route-layer public key.
        assert!(route
            .hops
            .iter()
            .all(|hop| hop.route_layer_public_key.is_some()));
        assert!(matches!(
            profile.move_decision.binding,
            MovePathBinding::Established(_) | MovePathBinding::Direct(_)
        ));
        // The merged records must also yield a re-entry decision, starting at the
        // remembered-contacts stage.
        let reentry = manager
            .stale_reentry_decision(context(8), 300)
            .await
            .expect("shared records should support stale-node re-entry");
        assert_eq!(reentry.stage, BootstrapReentryStage::RememberedContacts);
    }
1629
1630 #[tokio::test]
1631 async fn selection_manager_rejects_invalid_shared_bootstrap_records() {
1632 let registry = Arc::new(ServiceRegistry::new());
1633 let health = LocalHealthObserverService::new(LocalHealthObserverConfig::for_testing());
1634 let manager =
1635 SelectionManagerService::new(SelectionManagerConfig::for_testing(), registry, health);
1636
1637 let error = manager
1638 .remember_neighborhood_reentry_records(
1639 context(9),
1640 &[NeighborhoodReentryHint {
1641 scope: context(9),
1642 publisher_authority: authority(30),
1643 advertised_authority: authority(31),
1644 advertised_device: None,
1645 link_endpoints: Vec::new(),
1646 route_layer_public_key: None,
1647 published_at_ms: 10,
1648 valid_until: 20,
1649 replay_window_id: [30; 32],
1650 }],
1651 )
1652 .await
1653 .expect_err("invalid board hint should be rejected");
1654
1655 assert!(matches!(
1656 error,
1657 SelectionManagerError::InvalidBootstrapRecord {
1658 kind: "neighborhood_reentry_hint",
1659 ..
1660 }
1661 ));
1662
1663 assert!(manager
1664 .stale_reentry_decision(context(9), 30)
1665 .await
1666 .is_none());
1667 }
1668}