Skip to main content

aura_agent/runtime/services/
selection_manager.rs

1//! Runtime-owned adaptive selection manager.
2#![allow(dead_code)]
3
4use super::config_profiles::impl_service_config_profiles;
5use super::local_health_observer::LocalHealthObserverService;
6use super::service_registry::{ProviderHealthSnapshot, ServiceRegistry};
7use super::traits::{RuntimeService, RuntimeServiceContext, ServiceError, ServiceHealth};
8use async_trait::async_trait;
9use aura_core::effects::RandomEffects;
10use aura_core::service::{
11    BootstrapContactHint, BootstrapIntroductionHint, LinkEndpoint, LocalEstablishDecision,
12    LocalHealthSnapshot, LocalHoldDecision, LocalMoveDecision, LocalRoutingProfile,
13    LocalSelectionProfile, MessageClassRoutingConstraint, MovePath, MovePathBinding,
14    NeighborhoodReentryHint, PrivacyMessageClass, ProviderCandidate, ProviderEvidence, Route,
15    SchedulerClass, SecurityControlClass, SelectionState, ServiceFamily, ServiceProfile,
16};
17use aura_core::types::identifiers::{AuthorityId, ContextId, DeviceId};
18use aura_rendezvous::{
19    validate_bootstrap_contact_hint, validate_bootstrap_introduction_hint,
20    validate_neighborhood_reentry_hint,
21};
22use std::collections::{BTreeMap, BTreeSet, HashMap};
23use std::sync::Arc;
24use tokio::sync::RwLock;
25
26#[allow(dead_code)]
27#[derive(Debug, Clone, Copy, PartialEq, Eq)]
28enum SelectionManagerCommand {
29    SelectProfile,
30    RecordProfile,
31}
32
33#[derive(Debug, Clone)]
34pub struct SelectionManagerConfig {
35    pub default_path_ttl_ms: u64,
36    pub profile_change_min_interval_ms: u64,
37    pub residency_turns: u32,
38    pub security_control_floor: u32,
39    pub privacy_mode_enabled: bool,
40    pub min_mixing_depth: u8,
41    pub max_mixing_depth: u8,
42    pub path_diversity_floor: u8,
43    pub cover_floor_per_second: u32,
44    pub cover_gain_per_rtt_bucket: u32,
45    pub delay_gain_numerator: u32,
46    pub delay_gain_denominator: u32,
47    pub delay_hysteresis_ms: u64,
48    pub cover_hysteresis_per_second: u32,
49    pub diversity_hysteresis: u8,
50    pub tuning_enabled: bool,
51}
52
53impl Default for SelectionManagerConfig {
54    fn default() -> Self {
55        Self {
56            default_path_ttl_ms: 30_000,
57            profile_change_min_interval_ms: 1_000,
58            residency_turns: 2,
59            security_control_floor: 2,
60            privacy_mode_enabled: false,
61            min_mixing_depth: 1,
62            max_mixing_depth: 3,
63            // Phase-6 tuning keeps small/medium profiles off the degenerate
64            // one-hop floor while still allowing larger reachable sets to
65            // climb to the configured mixing-depth ceiling.
66            path_diversity_floor: 2,
67            // The fixed privacy policy keeps a non-zero baseline even when
68            // organic traffic and sync windows are sparse.
69            cover_floor_per_second: 2,
70            cover_gain_per_rtt_bucket: 1,
71            delay_gain_numerator: 1,
72            // Phase-6 tuning reduced default delay growth after evidence from
73            // partition-heal and ceremony-latency validation profiles.
74            delay_gain_denominator: 3,
75            delay_hysteresis_ms: 25,
76            cover_hysteresis_per_second: 1,
77            diversity_hysteresis: 1,
78            tuning_enabled: false,
79        }
80    }
81}
82
83impl_service_config_profiles!(SelectionManagerConfig {
84    pub fn for_testing() -> Self {
85        Self {
86            default_path_ttl_ms: 1_000,
87            profile_change_min_interval_ms: 0,
88            residency_turns: 1,
89            security_control_floor: 1,
90            privacy_mode_enabled: true,
91            min_mixing_depth: 1,
92            max_mixing_depth: 3,
93            path_diversity_floor: 2,
94            cover_floor_per_second: 1,
95            cover_gain_per_rtt_bucket: 1,
96            delay_gain_numerator: 1,
97            delay_gain_denominator: 2,
98            delay_hysteresis_ms: 5,
99            cover_hysteresis_per_second: 1,
100            diversity_hysteresis: 1,
101            tuning_enabled: true,
102        }
103    }
104});
105
106#[derive(Debug)]
107struct SelectionManagerState {
108    last_profiles: HashMap<ContextId, LocalSelectionProfile>,
109    residency: HashMap<(ContextId, AuthorityId), u32>,
110    bootstrap: HashMap<ContextId, BootstrapScopeState>,
111    lifecycle: ServiceHealth,
112}
113
114impl Default for SelectionManagerState {
115    fn default() -> Self {
116        Self {
117            last_profiles: HashMap::new(),
118            residency: HashMap::new(),
119            bootstrap: HashMap::new(),
120            lifecycle: ServiceHealth::NotStarted,
121        }
122    }
123}
124
125#[derive(Debug, thiserror::Error)]
126pub enum SelectionManagerError {
127    #[error("no reachable candidates available for family {family:?}")]
128    NoReachableCandidates { family: ServiceFamily },
129    #[error("privacy mode requires at least one anonymous relay candidate")]
130    NoAnonymousRelayCandidates,
131    #[error("invalid bootstrap record ({kind}): {reason}")]
132    InvalidBootstrapRecord { kind: &'static str, reason: String },
133}
134
135#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
136pub enum BootstrapHintSource {
137    RememberedDirectContact,
138    PriorProvider,
139    NeighborhoodDiscoveryBoard,
140    WebOfTrustIntroduction,
141    BootstrapBridge,
142}
143
144#[derive(Debug, Clone, PartialEq, Eq)]
145pub struct BootstrapHint {
146    pub authority_id: AuthorityId,
147    pub device_id: Option<DeviceId>,
148    pub link_endpoints: Vec<LinkEndpoint>,
149    pub route_layer_public_key: Option<[u8; 32]>,
150    pub source: BootstrapHintSource,
151    pub observed_at_ms: u64,
152    pub reliability_bps: u16,
153    pub breadth_hint: u8,
154    pub bridge_cluster: Option<u64>,
155}
156
157#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
158pub enum BootstrapReentryStage {
159    RememberedContacts,
160    NeighborhoodBoards,
161    WebOfTrustIntroductions,
162    BootstrapBridges,
163}
164
165#[derive(Debug, Clone, PartialEq, Eq)]
166pub struct BootstrapReentryDecision {
167    pub stage: BootstrapReentryStage,
168    pub candidate_authorities: Vec<AuthorityId>,
169}
170
171#[derive(Debug, Clone, Default)]
172struct BootstrapAttemptState {
173    attempts: u32,
174    last_attempt_at_ms: Option<u64>,
175    last_success_at_ms: Option<u64>,
176}
177
178#[derive(Debug, Clone)]
179struct BootstrapCandidateState {
180    authority_id: AuthorityId,
181    device_id: Option<DeviceId>,
182    link_endpoints: Vec<LinkEndpoint>,
183    route_layer_public_key: Option<[u8; 32]>,
184    sources: BTreeSet<BootstrapHintSource>,
185    freshest_observed_at_ms: u64,
186    reliability_bps: u16,
187    breadth_hint: u8,
188    bridge_cluster: Option<u64>,
189}
190
191#[derive(Debug, Clone, Default)]
192struct BootstrapScopeState {
193    candidates: HashMap<AuthorityId, BootstrapCandidateState>,
194    attempts: BTreeMap<BootstrapReentryStage, BootstrapAttemptState>,
195}
196
197#[aura_macros::service_surface(
198    families = "Establish,Move,Hold",
199    object_categories = "runtime_derived_local,transport_protocol",
200    discover = "social_manager_and_service_registry",
201    permit = "runtime_local_policy_and_health",
202    transfer = "selection_manager",
203    select = "selection_manager",
204    authoritative = "",
205    runtime_local = "selection_profiles,residency_windows,weighting_state",
206    category = "service_surface"
207)]
208#[aura_macros::actor_owned(
209    owner = "selection_manager",
210    domain = "adaptive_privacy_selection",
211    gate = "selection_manager_command_ingress",
212    command = SelectionManagerCommand,
213    capacity = 64,
214    category = "actor_owned"
215)]
216pub struct SelectionManagerService {
217    config: SelectionManagerConfig,
218    registry: Arc<ServiceRegistry>,
219    health: LocalHealthObserverService,
220    state: RwLock<SelectionManagerState>,
221}
222
223impl SelectionManagerService {
224    pub fn new(
225        config: SelectionManagerConfig,
226        registry: Arc<ServiceRegistry>,
227        health: LocalHealthObserverService,
228    ) -> Self {
229        Self {
230            config: sanitize_config_for_build(config),
231            registry,
232            health,
233            state: RwLock::new(SelectionManagerState::default()),
234        }
235    }
236
237    pub async fn select_profile<E: RandomEffects + ?Sized>(
238        &self,
239        scope: ContextId,
240        destination: LinkEndpoint,
241        move_candidates: &[ProviderCandidate],
242        hold_candidates: &[ProviderCandidate],
243        now_ms: u64,
244        random: &E,
245    ) -> Result<LocalSelectionProfile, SelectionManagerError> {
246        if let Some(existing) = self.state.read().await.last_profiles.get(&scope).cloned() {
247            if now_ms
248                < existing
249                    .health
250                    .generated_at_ms
251                    .saturating_add(self.config.profile_change_min_interval_ms)
252            {
253                return Ok(existing);
254            }
255        }
256        let health = self.health.snapshot().await;
257        let move_candidates = self
258            .merge_bootstrap_candidates(scope, ServiceFamily::Move, move_candidates, now_ms)
259            .await;
260        let hold_candidates = self
261            .merge_bootstrap_candidates(scope, ServiceFamily::Hold, hold_candidates, now_ms)
262            .await;
263        let selected_move_authorities: Vec<AuthorityId> = self
264            .pick_authorities(
265                scope,
266                &move_candidates,
267                ServiceFamily::Move,
268                &health,
269                now_ms,
270                random,
271            )
272            .await?;
273        let selected_hold_authorities: Vec<AuthorityId> = self
274            .pick_authorities(
275                scope,
276                &hold_candidates,
277                ServiceFamily::Hold,
278                &health,
279                now_ms,
280                random,
281            )
282            .await
283            .unwrap_or_default();
284
285        let routing_profile = if self.config.privacy_mode_enabled {
286            self.continuous_routing_profile(&health, selected_move_authorities.len() as u8)
287        } else {
288            LocalRoutingProfile::passthrough()
289        };
290
291        let establish = if self.config.privacy_mode_enabled {
292            if selected_move_authorities.is_empty() {
293                return Err(SelectionManagerError::NoAnonymousRelayCandidates);
294            }
295            let route = build_route(
296                &selected_move_authorities,
297                &move_candidates,
298                &move_candidates_by_authority(&move_candidates),
299                destination.clone(),
300            );
301            Some(LocalEstablishDecision {
302                profile: ServiceProfile::AnonymousPathEstablish,
303                route,
304                retain_path_until_ms: Some(now_ms.saturating_add(self.config.default_path_ttl_ms)),
305                scheduler_class: Some(SchedulerClass::BoundedDeadlineReply),
306            })
307        } else {
308            None
309        };
310
311        let hold = if !selected_hold_authorities.is_empty() {
312            Some(LocalHoldDecision {
313                profile: ServiceProfile::DeferredDeliveryHold,
314                selected_authorities: selected_hold_authorities.clone(),
315                bounded_residency_remaining: Some(self.config.residency_turns),
316            })
317        } else {
318            None
319        };
320
321        let synthetic_cover_gap_per_second = routing_profile.cover_rate_per_second.saturating_sub(
322            ((health.traffic_volume_bytes + health.sync_blended_retrieval_bytes) / 1024) as u32,
323        );
324
325        let profile = LocalSelectionProfile {
326            scope,
327            health: health.clone(),
328            establish,
329            move_decision: LocalMoveDecision {
330                routing_profile: apply_profile_hysteresis(
331                    self.state
332                        .read()
333                        .await
334                        .last_profiles
335                        .get(&scope)
336                        .map(|profile| &profile.move_decision.routing_profile),
337                    routing_profile,
338                    &self.config,
339                ),
340                binding: MovePathBinding::Direct(MovePath::direct(destination)),
341                scheduler_class: SchedulerClass::SyncBlended,
342                metadata_minimized: true,
343            },
344            hold,
345            security_control_floor: self.config.security_control_floor,
346            security_controls: vec![
347                SecurityControlClass::AnonymousPathEstablish,
348                SecurityControlClass::CapabilityTrustUpdate,
349                SecurityControlClass::AccountabilityReply,
350                SecurityControlClass::RetrievalCapabilityRotation,
351            ],
352            message_class_constraints: vec![
353                MessageClassRoutingConstraint {
354                    message_class: PrivacyMessageClass::Ceremony,
355                    force_scheduler_class: Some(SchedulerClass::BoundedDeadlineReply),
356                    max_mixing_depth: Some(1),
357                    max_delay_ms: Some(0),
358                },
359                MessageClassRoutingConstraint {
360                    message_class: PrivacyMessageClass::Consensus,
361                    force_scheduler_class: Some(SchedulerClass::BoundedDeadlineReply),
362                    max_mixing_depth: Some(1),
363                    max_delay_ms: Some(0),
364                },
365                MessageClassRoutingConstraint {
366                    message_class: PrivacyMessageClass::AccountabilityReply,
367                    force_scheduler_class: Some(SchedulerClass::BoundedDeadlineReply),
368                    max_mixing_depth: Some(1),
369                    max_delay_ms: Some(self.config.delay_hysteresis_ms),
370                },
371            ],
372            synthetic_cover_gap_per_second,
373        };
374
375        let profile = self.apply_profile_rate_limit(scope, profile, now_ms).await;
376
377        self.registry
378            .record_selection_state(
379                scope,
380                SelectionState {
381                    family: ServiceFamily::Move,
382                    selected_authorities: selected_move_authorities,
383                    epoch: None,
384                    bounded_residency_remaining: Some(self.config.residency_turns),
385                },
386            )
387            .await;
388        if !selected_hold_authorities.is_empty() {
389            self.registry
390                .record_selection_state(
391                    scope,
392                    SelectionState {
393                        family: ServiceFamily::Hold,
394                        selected_authorities: selected_hold_authorities,
395                        epoch: None,
396                        bounded_residency_remaining: Some(self.config.residency_turns),
397                    },
398                )
399                .await;
400        }
401        self.state
402            .write()
403            .await
404            .last_profiles
405            .insert(scope, profile.clone());
406        Ok(profile)
407    }
408
409    pub async fn remember_bootstrap_hints(&self, scope: ContextId, hints: &[BootstrapHint]) {
410        let mut state = self.state.write().await;
411        let scope_state = state.bootstrap.entry(scope).or_default();
412        for hint in hints {
413            let entry = scope_state
414                .candidates
415                .entry(hint.authority_id)
416                .or_insert_with(|| BootstrapCandidateState {
417                    authority_id: hint.authority_id,
418                    device_id: hint.device_id,
419                    link_endpoints: hint.link_endpoints.clone(),
420                    route_layer_public_key: hint.route_layer_public_key,
421                    sources: BTreeSet::new(),
422                    freshest_observed_at_ms: hint.observed_at_ms,
423                    reliability_bps: hint.reliability_bps,
424                    breadth_hint: hint.breadth_hint,
425                    bridge_cluster: hint.bridge_cluster,
426                });
427            entry.device_id = entry.device_id.or(hint.device_id);
428            for endpoint in &hint.link_endpoints {
429                if !entry.link_endpoints.contains(endpoint) {
430                    entry.link_endpoints.push(endpoint.clone());
431                }
432            }
433            if entry.route_layer_public_key.is_none() {
434                entry.route_layer_public_key = hint.route_layer_public_key;
435            }
436            entry.sources.insert(hint.source);
437            entry.freshest_observed_at_ms = entry.freshest_observed_at_ms.max(hint.observed_at_ms);
438            entry.reliability_bps = entry.reliability_bps.max(hint.reliability_bps);
439            entry.breadth_hint = entry.breadth_hint.max(hint.breadth_hint);
440            entry.bridge_cluster = entry.bridge_cluster.or(hint.bridge_cluster);
441        }
442    }
443
444    pub async fn remember_bootstrap_contact_records(
445        &self,
446        scope: ContextId,
447        records: &[BootstrapContactHint],
448    ) -> Result<usize, SelectionManagerError> {
449        let hints = records
450            .iter()
451            .map(|record| {
452                validate_bootstrap_contact_hint(record).map_err(|error| {
453                    SelectionManagerError::InvalidBootstrapRecord {
454                        kind: "bootstrap_contact_hint",
455                        reason: format!("{error:?}"),
456                    }
457                })?;
458                Ok(BootstrapHint {
459                    authority_id: record.authority_id,
460                    device_id: record.device_id,
461                    link_endpoints: record.link_endpoints.clone(),
462                    route_layer_public_key: record.route_layer_public_key,
463                    source: BootstrapHintSource::RememberedDirectContact,
464                    observed_at_ms: record.freshest_observed_at_ms,
465                    reliability_bps: 9_000,
466                    breadth_hint: 1,
467                    bridge_cluster: None,
468                })
469            })
470            .collect::<Result<Vec<_>, _>>()?;
471        self.remember_bootstrap_hints(scope, &hints).await;
472        Ok(hints.len())
473    }
474
475    pub async fn remember_neighborhood_reentry_records(
476        &self,
477        scope: ContextId,
478        records: &[NeighborhoodReentryHint],
479    ) -> Result<usize, SelectionManagerError> {
480        let hints = records
481            .iter()
482            .map(|record| {
483                validate_neighborhood_reentry_hint(record).map_err(|error| {
484                    SelectionManagerError::InvalidBootstrapRecord {
485                        kind: "neighborhood_reentry_hint",
486                        reason: format!("{error:?}"),
487                    }
488                })?;
489                Ok(BootstrapHint {
490                    authority_id: record.advertised_authority,
491                    device_id: record.advertised_device,
492                    link_endpoints: record.link_endpoints.clone(),
493                    route_layer_public_key: record.route_layer_public_key,
494                    source: BootstrapHintSource::NeighborhoodDiscoveryBoard,
495                    observed_at_ms: record.published_at_ms,
496                    reliability_bps: 7_000,
497                    breadth_hint: 3,
498                    bridge_cluster: None,
499                })
500            })
501            .collect::<Result<Vec<_>, _>>()?;
502        self.remember_bootstrap_hints(scope, &hints).await;
503        Ok(hints.len())
504    }
505
506    pub async fn remember_bootstrap_introduction_records(
507        &self,
508        scope: ContextId,
509        records: &[BootstrapIntroductionHint],
510        observed_at_ms: u64,
511    ) -> Result<usize, SelectionManagerError> {
512        let hints = records
513            .iter()
514            .map(|record| {
515                validate_bootstrap_introduction_hint(record).map_err(|error| {
516                    SelectionManagerError::InvalidBootstrapRecord {
517                        kind: "bootstrap_introduction_hint",
518                        reason: format!("{error:?}"),
519                    }
520                })?;
521                Ok(BootstrapHint {
522                    authority_id: record.introduced_authority,
523                    device_id: record.introduced_device,
524                    link_endpoints: record.link_endpoints.clone(),
525                    route_layer_public_key: record.route_layer_public_key,
526                    source: BootstrapHintSource::WebOfTrustIntroduction,
527                    observed_at_ms,
528                    reliability_bps: 8_000,
529                    breadth_hint: record.max_fanout,
530                    bridge_cluster: None,
531                })
532            })
533            .collect::<Result<Vec<_>, _>>()?;
534        self.remember_bootstrap_hints(scope, &hints).await;
535        Ok(hints.len())
536    }
537
538    pub async fn record_reentry_attempt(
539        &self,
540        scope: ContextId,
541        stage: BootstrapReentryStage,
542        now_ms: u64,
543        succeeded: bool,
544    ) {
545        let mut state = self.state.write().await;
546        let attempt = state
547            .bootstrap
548            .entry(scope)
549            .or_default()
550            .attempts
551            .entry(stage)
552            .or_default();
553        attempt.attempts = attempt.attempts.saturating_add(1);
554        attempt.last_attempt_at_ms = Some(now_ms);
555        if succeeded {
556            attempt.last_success_at_ms = Some(now_ms);
557        }
558    }
559
560    pub async fn stale_reentry_decision(
561        &self,
562        scope: ContextId,
563        now_ms: u64,
564    ) -> Option<BootstrapReentryDecision> {
565        let state = self.state.read().await;
566        let scope_state = state.bootstrap.get(&scope)?;
567        let stages = [
568            BootstrapReentryStage::RememberedContacts,
569            BootstrapReentryStage::NeighborhoodBoards,
570            BootstrapReentryStage::WebOfTrustIntroductions,
571            BootstrapReentryStage::BootstrapBridges,
572        ];
573        for stage in stages {
574            let attempts = scope_state
575                .attempts
576                .get(&stage)
577                .map(|attempt| attempt.attempts)
578                .unwrap_or(0);
579            if attempts >= 2 {
580                continue;
581            }
582            let mut authorities = scope_state
583                .candidates
584                .values()
585                .filter(|candidate| candidate_matches_reentry_stage(candidate, stage))
586                .filter(|candidate| freshness_score(candidate, now_ms) > 0)
587                .map(|candidate| candidate.authority_id)
588                .collect::<Vec<_>>();
589            authorities.sort();
590            authorities.dedup();
591            if !authorities.is_empty() {
592                return Some(BootstrapReentryDecision {
593                    stage,
594                    candidate_authorities: authorities,
595                });
596            }
597        }
598        None
599    }
600
601    fn continuous_routing_profile(
602        &self,
603        health: &LocalHealthSnapshot,
604        selected_move_count: u8,
605    ) -> LocalRoutingProfile {
606        let diversity_target = self
607            .config
608            .path_diversity_floor
609            .max(health.observed_route_diversity)
610            .min(self.config.max_mixing_depth.max(1));
611        let mixing_depth = selected_move_count
612            .max(self.config.min_mixing_depth)
613            .min(self.config.max_mixing_depth)
614            .min(diversity_target.max(1));
615        let delay_ms = div_ceil_u64(
616            (health.ema_rtt_ms as u64).saturating_mul(self.config.delay_gain_numerator as u64),
617            self.config.delay_gain_denominator.max(1) as u64,
618        );
619        let cover_rate_per_second = self
620            .config
621            .cover_floor_per_second
622            .saturating_add(
623                div_ceil_u32(health.ema_rtt_ms, 100)
624                    .saturating_mul(self.config.cover_gain_per_rtt_bucket),
625            )
626            .saturating_add(div_ceil_u32(health.queue_pressure, 32));
627        LocalRoutingProfile {
628            mixing_depth,
629            delay_ms,
630            cover_rate_per_second,
631            path_diversity: diversity_target.max(1),
632        }
633    }
634
635    async fn apply_profile_rate_limit(
636        &self,
637        scope: ContextId,
638        candidate: LocalSelectionProfile,
639        now_ms: u64,
640    ) -> LocalSelectionProfile {
641        let state = self.state.read().await;
642        let Some(existing) = state.last_profiles.get(&scope) else {
643            return candidate;
644        };
645        if now_ms
646            < existing
647                .health
648                .generated_at_ms
649                .saturating_add(self.config.profile_change_min_interval_ms)
650        {
651            return existing.clone();
652        }
653        candidate
654    }
655
656    async fn pick_authorities<E: RandomEffects + ?Sized>(
657        &self,
658        scope: ContextId,
659        candidates: &[ProviderCandidate],
660        family: ServiceFamily,
661        local_health: &LocalHealthSnapshot,
662        now_ms: u64,
663        random: &E,
664    ) -> Result<Vec<AuthorityId>, SelectionManagerError> {
665        let reachable = candidates
666            .iter()
667            .filter(|candidate| candidate.reachable)
668            .collect::<Vec<_>>();
669        if reachable.is_empty() {
670            return Err(SelectionManagerError::NoReachableCandidates { family });
671        }
672
673        let projection = self.registry.projection(Some(scope), u64::MAX).await;
674        let state = self.state.read().await;
675        let residency = state.residency.clone();
676        let bootstrap_scope = state.bootstrap.get(&scope).cloned().unwrap_or_default();
677        drop(state);
678        let mut weighted = reachable
679            .iter()
680            .map(|candidate| {
681                let health = projection.provider_health.iter().find(|entry| {
682                    entry.family == family && entry.authority_id == candidate.authority_id
683                });
684                (
685                    candidate.authority_id,
686                    score_candidate(
687                        candidate,
688                        health,
689                        local_health,
690                        family,
691                        residency
692                            .get(&(scope, candidate.authority_id))
693                            .copied()
694                            .unwrap_or(0),
695                        bootstrap_scope.candidates.get(&candidate.authority_id),
696                        now_ms,
697                    ),
698                )
699            })
700            .collect::<Vec<_>>();
701        let mut selected = Vec::new();
702        let max_select = if family == ServiceFamily::Hold {
703            3
704        } else {
705            usize::from(
706                self.config
707                    .max_mixing_depth
708                    .max(self.config.min_mixing_depth),
709            )
710        };
711        let mut selected_evidence = Vec::new();
712        for _ in 0..max_select {
713            if weighted.is_empty() {
714                break;
715            }
716            let total_weight: u32 = weighted.iter().map(|(_, weight)| *weight).sum();
717            if total_weight == 0 {
718                break;
719            }
720            let draw = random.random_range(0, total_weight as u64).await as u32;
721            let mut cursor = 0u32;
722            let mut chosen_index = 0usize;
723            for (index, (_, weight)) in weighted.iter().enumerate() {
724                cursor = cursor.saturating_add(*weight);
725                if draw < cursor {
726                    chosen_index = index;
727                    break;
728                }
729            }
730            let (authority, _) = weighted.remove(chosen_index);
731            selected.push(authority);
732            if let Some(candidate) = candidates
733                .iter()
734                .find(|candidate| candidate.authority_id == authority)
735            {
736                for evidence in &candidate.evidence {
737                    if !selected_evidence.contains(evidence) {
738                        selected_evidence.push(evidence.clone());
739                    }
740                }
741            }
742            for (candidate_authority, weight) in &mut weighted {
743                if let Some(candidate) = candidates
744                    .iter()
745                    .find(|candidate| candidate.authority_id == *candidate_authority)
746                {
747                    let diversity_penalty = candidate
748                        .evidence
749                        .iter()
750                        .filter(|evidence| selected_evidence.iter().any(|seen| seen == *evidence))
751                        .count() as u32
752                        * 10;
753                    let bridge_diversity_penalty =
754                        bootstrap_scope
755                            .candidates
756                            .get(candidate_authority)
757                            .and_then(|candidate_state| candidate_state.bridge_cluster)
758                            .map(|cluster| {
759                                selected
760                                    .iter()
761                                    .filter(|selected_authority| {
762                                        bootstrap_scope.candidates.get(selected_authority).and_then(
763                                            |selected_state| selected_state.bridge_cluster,
764                                        ) == Some(cluster)
765                                    })
766                                    .count() as u32
767                                    * 12
768                            })
769                            .unwrap_or(0);
770                    *weight = weight
771                        .saturating_sub(diversity_penalty)
772                        .saturating_sub(bridge_diversity_penalty);
773                }
774            }
775        }
776        selected.sort();
777        selected.dedup();
778        let mut state = self.state.write().await;
779        for value in state.residency.values_mut() {
780            *value = value.saturating_sub(1);
781        }
782        for authority in &selected {
783            state
784                .residency
785                .insert((scope, *authority), self.config.residency_turns);
786        }
787        Ok(selected)
788    }
789
790    async fn merge_bootstrap_candidates(
791        &self,
792        scope: ContextId,
793        family: ServiceFamily,
794        candidates: &[ProviderCandidate],
795        now_ms: u64,
796    ) -> Vec<ProviderCandidate> {
797        if family == ServiceFamily::Hold {
798            return candidates.to_vec();
799        }
800        let state = self.state.read().await;
801        let Some(scope_state) = state.bootstrap.get(&scope) else {
802            return candidates.to_vec();
803        };
804        let mut merged = candidates
805            .iter()
806            .cloned()
807            .map(|candidate| (candidate.authority_id, candidate))
808            .collect::<BTreeMap<_, _>>();
809        for candidate_state in scope_state.candidates.values() {
810            if freshness_score(candidate_state, now_ms) == 0 {
811                continue;
812            }
813            let entry = merged
814                .entry(candidate_state.authority_id)
815                .or_insert_with(|| ProviderCandidate {
816                    authority_id: candidate_state.authority_id,
817                    device_id: candidate_state.device_id,
818                    family,
819                    evidence: vec![ProviderEvidence::DescriptorFallback],
820                    link_endpoints: candidate_state.link_endpoints.clone(),
821                    route_layer_public_key: None,
822                    reachable: !candidate_state.link_endpoints.is_empty(),
823                });
824            if entry.device_id.is_none() {
825                entry.device_id = candidate_state.device_id;
826            }
827            for endpoint in &candidate_state.link_endpoints {
828                if !entry.link_endpoints.contains(endpoint) {
829                    entry.link_endpoints.push(endpoint.clone());
830                }
831            }
832            if !entry.reachable && !candidate_state.link_endpoints.is_empty() {
833                entry.reachable = true;
834            }
835            if entry.route_layer_public_key.is_none() {
836                entry.route_layer_public_key = candidate_state.route_layer_public_key;
837            }
838        }
839        merged.into_values().collect()
840    }
841}
842
843fn div_ceil_u32(value: u32, divisor: u32) -> u32 {
844    if value == 0 {
845        return 0;
846    }
847    (value.saturating_add(divisor.saturating_sub(1))) / divisor.max(1)
848}
849
850fn div_ceil_u64(value: u64, divisor: u64) -> u64 {
851    if value == 0 {
852        return 0;
853    }
854    (value.saturating_add(divisor.saturating_sub(1))) / divisor.max(1)
855}
856
857fn sanitize_config_for_build(config: SelectionManagerConfig) -> SelectionManagerConfig {
858    #[cfg(not(any(test, debug_assertions)))]
859    {
860        let mut config = config;
861        let fixed = SelectionManagerConfig::default();
862        if config.tuning_enabled {
863            config.min_mixing_depth = fixed.min_mixing_depth;
864            config.max_mixing_depth = fixed.max_mixing_depth;
865            config.path_diversity_floor = fixed.path_diversity_floor;
866            config.cover_floor_per_second = fixed.cover_floor_per_second;
867            config.cover_gain_per_rtt_bucket = fixed.cover_gain_per_rtt_bucket;
868            config.delay_gain_numerator = fixed.delay_gain_numerator;
869            config.delay_gain_denominator = fixed.delay_gain_denominator;
870            config.delay_hysteresis_ms = fixed.delay_hysteresis_ms;
871            config.cover_hysteresis_per_second = fixed.cover_hysteresis_per_second;
872            config.diversity_hysteresis = fixed.diversity_hysteresis;
873            config.tuning_enabled = false;
874        }
875        config
876    }
877
878    #[cfg(any(test, debug_assertions))]
879    {
880        config
881    }
882}
883
884fn move_candidates_by_authority(
885    candidates: &[ProviderCandidate],
886) -> BTreeMap<AuthorityId, Vec<LinkEndpoint>> {
887    let mut map = BTreeMap::new();
888    for candidate in candidates {
889        map.entry(candidate.authority_id)
890            .or_insert_with(Vec::new)
891            .extend(candidate.link_endpoints.clone());
892    }
893    map
894}
895
896fn build_route(
897    authorities: &[AuthorityId],
898    candidates: &[ProviderCandidate],
899    endpoints: &BTreeMap<AuthorityId, Vec<LinkEndpoint>>,
900    destination: LinkEndpoint,
901) -> Route {
902    let route_keys = candidates
903        .iter()
904        .filter_map(|candidate| {
905            candidate
906                .route_layer_public_key
907                .map(|public_key| (candidate.authority_id, public_key))
908        })
909        .collect::<BTreeMap<_, _>>();
910    Route {
911        hops: authorities
912            .iter()
913            .map(|authority_id| aura_core::service::RelayHop {
914                authority_id: *authority_id,
915                link_endpoint: endpoints
916                    .get(authority_id)
917                    .and_then(|values| values.first().cloned())
918                    .unwrap_or_else(|| LinkEndpoint::relay(*authority_id)),
919                route_layer_public_key: route_keys.get(authority_id).copied(),
920            })
921            .collect(),
922        destination,
923    }
924}
925
926fn score_candidate(
927    candidate: &ProviderCandidate,
928    health: Option<&ProviderHealthSnapshot>,
929    local_health: &LocalHealthSnapshot,
930    family: ServiceFamily,
931    residency_penalty_turns: u32,
932    bootstrap: Option<&BootstrapCandidateState>,
933    now_ms: u64,
934) -> u32 {
935    let evidence_score = candidate
936        .evidence
937        .iter()
938        .map(|evidence| match evidence {
939            ProviderEvidence::Neighborhood => 15,
940            ProviderEvidence::DirectFriend => 30,
941            ProviderEvidence::IntroducedFof => 20,
942            ProviderEvidence::Guardian => 25,
943            ProviderEvidence::DescriptorFallback => 5,
944        })
945        .sum::<u32>();
946    let health_score = health
947        .map(|entry| {
948            let total = entry
949                .success_count
950                .saturating_add(entry.failure_count)
951                .max(1);
952            (entry.success_count.saturating_mul(100)) / total
953        })
954        .unwrap_or(50);
955    let hold_quality = if family == ServiceFamily::Hold {
956        local_health.hold_success_bps / 100
957    } else {
958        0
959    };
960    let bootstrap_freshness = bootstrap
961        .map(|candidate| freshness_score(candidate, now_ms))
962        .unwrap_or(0);
963    let bootstrap_source = bootstrap.map(source_score).unwrap_or(0);
964    let bootstrap_breadth = bootstrap
965        .map(|candidate| u32::from(candidate.breadth_hint).saturating_mul(4))
966        .unwrap_or(0);
967    let availability_score = local_health
968        .sync_opportunity_count
969        .min(10)
970        .saturating_mul(3);
971    let diversity_score = local_health.observed_route_diversity as u32 * 5;
972    let reachability_score: u32 = if candidate.reachable { 40 } else { 0 };
973    reachability_score
974        .saturating_add(evidence_score)
975        .saturating_add(health_score)
976        .saturating_add(hold_quality)
977        .saturating_add(bootstrap_freshness)
978        .saturating_add(bootstrap_source)
979        .saturating_add(bootstrap_breadth)
980        .saturating_add(availability_score)
981        .saturating_add(diversity_score)
982        .saturating_sub(residency_penalty_turns.saturating_mul(20))
983}
984
985fn freshness_score(candidate: &BootstrapCandidateState, now_ms: u64) -> u32 {
986    let age_ms = now_ms.saturating_sub(candidate.freshest_observed_at_ms);
987    let age_bucket: u32 = match age_ms {
988        0..=5_000 => 30,
989        5_001..=30_000 => 22,
990        30_001..=120_000 => 14,
991        120_001..=600_000 => 6,
992        _ => 0,
993    };
994    let reliability = u32::from(candidate.reliability_bps) / 250;
995    age_bucket.saturating_add(reliability)
996}
997
998fn source_score(candidate: &BootstrapCandidateState) -> u32 {
999    candidate
1000        .sources
1001        .iter()
1002        .map(|source| match source {
1003            BootstrapHintSource::RememberedDirectContact => 24,
1004            BootstrapHintSource::PriorProvider => 18,
1005            BootstrapHintSource::NeighborhoodDiscoveryBoard => 12,
1006            BootstrapHintSource::WebOfTrustIntroduction => 16,
1007            BootstrapHintSource::BootstrapBridge => 8,
1008        })
1009        .max()
1010        .unwrap_or(0)
1011}
1012
1013fn candidate_matches_reentry_stage(
1014    candidate: &BootstrapCandidateState,
1015    stage: BootstrapReentryStage,
1016) -> bool {
1017    match stage {
1018        BootstrapReentryStage::RememberedContacts => {
1019            candidate
1020                .sources
1021                .contains(&BootstrapHintSource::RememberedDirectContact)
1022                || candidate
1023                    .sources
1024                    .contains(&BootstrapHintSource::PriorProvider)
1025        }
1026        BootstrapReentryStage::NeighborhoodBoards => candidate
1027            .sources
1028            .contains(&BootstrapHintSource::NeighborhoodDiscoveryBoard),
1029        BootstrapReentryStage::WebOfTrustIntroductions => candidate
1030            .sources
1031            .contains(&BootstrapHintSource::WebOfTrustIntroduction),
1032        BootstrapReentryStage::BootstrapBridges => candidate
1033            .sources
1034            .contains(&BootstrapHintSource::BootstrapBridge),
1035    }
1036}
1037
1038fn apply_profile_hysteresis(
1039    previous: Option<&LocalRoutingProfile>,
1040    mut candidate: LocalRoutingProfile,
1041    config: &SelectionManagerConfig,
1042) -> LocalRoutingProfile {
1043    let Some(previous) = previous else {
1044        return candidate;
1045    };
1046    if previous.mixing_depth.abs_diff(candidate.mixing_depth) <= 1 {
1047        candidate.mixing_depth = previous.mixing_depth;
1048    }
1049    if previous.delay_ms.abs_diff(candidate.delay_ms) <= config.delay_hysteresis_ms {
1050        candidate.delay_ms = previous.delay_ms;
1051    }
1052    if previous
1053        .cover_rate_per_second
1054        .abs_diff(candidate.cover_rate_per_second)
1055        <= config.cover_hysteresis_per_second
1056    {
1057        candidate.cover_rate_per_second = previous.cover_rate_per_second;
1058    }
1059    if previous.path_diversity.abs_diff(candidate.path_diversity) <= config.diversity_hysteresis {
1060        candidate.path_diversity = previous.path_diversity;
1061    }
1062    candidate
1063}
1064
1065#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
1066#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
1067impl RuntimeService for SelectionManagerService {
1068    fn name(&self) -> &'static str {
1069        "selection_manager"
1070    }
1071
1072    fn dependencies(&self) -> &[&'static str] {
1073        &[
1074            "social_manager",
1075            "rendezvous_manager",
1076            "local_health_observer",
1077        ]
1078    }
1079
1080    async fn start(&self, _ctx: &RuntimeServiceContext) -> Result<(), ServiceError> {
1081        self.state.write().await.lifecycle = ServiceHealth::Healthy;
1082        Ok(())
1083    }
1084
1085    async fn stop(&self) -> Result<(), ServiceError> {
1086        self.state.write().await.lifecycle = ServiceHealth::Stopped;
1087        Ok(())
1088    }
1089
1090    async fn health(&self) -> ServiceHealth {
1091        self.state.read().await.lifecycle.clone()
1092    }
1093}
1094
1095#[cfg(test)]
1096mod tests {
1097    use super::*;
1098    use crate::runtime::services::LocalHealthObserverConfig;
1099    use aura_core::effects::RandomCoreEffects;
1100    use aura_core::service::{LinkProtocol, ProviderEvidence};
1101    use aura_core::types::identifiers::DeviceId;
1102
1103    struct TestRandom(u64);
1104
1105    #[async_trait]
1106    impl RandomCoreEffects for TestRandom {
1107        async fn random_bytes(&self, len: usize) -> Vec<u8> {
1108            vec![self.0 as u8; len]
1109        }
1110
1111        async fn random_bytes_32(&self) -> [u8; 32] {
1112            [self.0 as u8; 32]
1113        }
1114
1115        async fn random_u64(&self) -> u64 {
1116            self.0
1117        }
1118    }
1119
1120    fn authority(seed: u8) -> AuthorityId {
1121        AuthorityId::new_from_entropy([seed; 32])
1122    }
1123
1124    fn context(seed: u8) -> ContextId {
1125        ContextId::new_from_entropy([seed; 32])
1126    }
1127
1128    fn candidate(seed: u8, reachable: bool, evidence: ProviderEvidence) -> ProviderCandidate {
1129        ProviderCandidate {
1130            authority_id: authority(seed),
1131            device_id: Some(DeviceId::new_from_entropy([seed; 32])),
1132            family: ServiceFamily::Move,
1133            evidence: vec![evidence],
1134            link_endpoints: vec![LinkEndpoint::direct(
1135                LinkProtocol::Tcp,
1136                format!("127.0.0.1:{}", 7000 + seed as u16),
1137            )],
1138            route_layer_public_key: Some([seed; 32]),
1139            reachable,
1140        }
1141    }
1142
1143    fn bootstrap_hint(
1144        seed: u8,
1145        source: BootstrapHintSource,
1146        observed_at_ms: u64,
1147        reliability_bps: u16,
1148        breadth_hint: u8,
1149        bridge_cluster: Option<u64>,
1150    ) -> BootstrapHint {
1151        BootstrapHint {
1152            authority_id: authority(seed),
1153            device_id: Some(DeviceId::new_from_entropy([seed; 32])),
1154            link_endpoints: vec![LinkEndpoint::direct(
1155                LinkProtocol::Tcp,
1156                format!("127.0.0.1:{}", 7600 + seed as u16),
1157            )],
1158            route_layer_public_key: Some([seed; 32]),
1159            source,
1160            observed_at_ms,
1161            reliability_bps,
1162            breadth_hint,
1163            bridge_cluster,
1164        }
1165    }
1166
1167    #[tokio::test]
1168    async fn selection_manager_prefers_stronger_candidates_without_total_concentration() {
1169        let registry = Arc::new(ServiceRegistry::new());
1170        let health = LocalHealthObserverService::new(LocalHealthObserverConfig::for_testing());
1171        health
1172            .observe_provider_set(
1173                &[
1174                    candidate(1, true, ProviderEvidence::DirectFriend),
1175                    candidate(2, true, ProviderEvidence::Neighborhood),
1176                    candidate(3, true, ProviderEvidence::Neighborhood),
1177                ],
1178                2,
1179                10,
1180            )
1181            .await;
1182        let manager =
1183            SelectionManagerService::new(SelectionManagerConfig::for_testing(), registry, health);
1184        let profile = manager
1185            .select_profile(
1186                context(1),
1187                LinkEndpoint::direct(LinkProtocol::Tcp, "127.0.0.1:9000"),
1188                &[
1189                    candidate(1, true, ProviderEvidence::DirectFriend),
1190                    candidate(2, true, ProviderEvidence::Neighborhood),
1191                    candidate(3, true, ProviderEvidence::Neighborhood),
1192                ],
1193                &[],
1194                20,
1195                &TestRandom(3),
1196            )
1197            .await
1198            .expect("select profile");
1199        let establish = profile.establish.expect("privacy mode establish");
1200        assert_eq!(establish.profile, ServiceProfile::AnonymousPathEstablish);
1201        assert!(!establish.route.hops.is_empty());
1202        assert!(establish
1203            .route
1204            .hops
1205            .iter()
1206            .any(|hop| hop.authority_id == authority(1)));
1207        assert!(establish
1208            .route
1209            .hops
1210            .iter()
1211            .any(|hop| hop.authority_id != authority(1)));
1212    }
1213
1214    #[tokio::test]
1215    async fn selection_manager_keeps_passthrough_available() {
1216        let registry = Arc::new(ServiceRegistry::new());
1217        let health = LocalHealthObserverService::new(LocalHealthObserverConfig::for_testing());
1218        let mut config = SelectionManagerConfig::for_testing();
1219        config.privacy_mode_enabled = false;
1220        let manager = SelectionManagerService::new(config, registry, health);
1221        let profile = manager
1222            .select_profile(
1223                context(2),
1224                LinkEndpoint::direct(LinkProtocol::Tcp, "127.0.0.1:9100"),
1225                &[candidate(1, true, ProviderEvidence::Neighborhood)],
1226                &[],
1227                30,
1228                &TestRandom(1),
1229            )
1230            .await
1231            .expect("select passthrough");
1232        assert!(profile.establish.is_none());
1233        assert_eq!(
1234            profile.move_decision.routing_profile,
1235            LocalRoutingProfile::passthrough()
1236        );
1237    }
1238
1239    #[tokio::test]
1240    async fn selection_manager_records_message_class_constraints_and_real_traffic_gap() {
1241        let registry = Arc::new(ServiceRegistry::new());
1242        let health = LocalHealthObserverService::new(LocalHealthObserverConfig::for_testing());
1243        health.observe_traffic_volume(1024, 10).await;
1244        health.observe_sync_blended_retrieval_volume(1024, 11).await;
1245        health.observe_accountability_reply_volume(2048, 12).await;
1246        health
1247            .observe_provider_set(&[candidate(1, true, ProviderEvidence::Neighborhood)], 2, 13)
1248            .await;
1249        let manager =
1250            SelectionManagerService::new(SelectionManagerConfig::for_testing(), registry, health);
1251        let profile = manager
1252            .select_profile(
1253                context(3),
1254                LinkEndpoint::direct(LinkProtocol::Tcp, "127.0.0.1:9200"),
1255                &[candidate(1, true, ProviderEvidence::Neighborhood)],
1256                &[],
1257                20,
1258                &TestRandom(2),
1259            )
1260            .await
1261            .expect("select profile");
1262        assert!(profile
1263            .message_class_constraints
1264            .iter()
1265            .any(|constraint| constraint.message_class == PrivacyMessageClass::Consensus));
1266        assert!(
1267            profile.synthetic_cover_gap_per_second
1268                <= profile.move_decision.routing_profile.cover_rate_per_second
1269        );
1270        assert_eq!(profile.health.accountability_reply_bytes, 2048);
1271    }
1272
1273    #[tokio::test]
1274    async fn selection_manager_rotates_paths_under_residency_pressure() {
1275        let registry = Arc::new(ServiceRegistry::new());
1276        let health = LocalHealthObserverService::new(LocalHealthObserverConfig::for_testing());
1277        let manager =
1278            SelectionManagerService::new(SelectionManagerConfig::for_testing(), registry, health);
1279        let candidates = [
1280            candidate(1, true, ProviderEvidence::DirectFriend),
1281            candidate(2, true, ProviderEvidence::Neighborhood),
1282            candidate(3, true, ProviderEvidence::IntroducedFof),
1283            candidate(4, true, ProviderEvidence::Neighborhood),
1284        ];
1285        let first = manager
1286            .select_profile(
1287                context(4),
1288                LinkEndpoint::direct(LinkProtocol::Tcp, "127.0.0.1:9300"),
1289                &candidates,
1290                &[],
1291                10,
1292                &TestRandom(1),
1293            )
1294            .await
1295            .expect("first profile");
1296        let second = manager
1297            .select_profile(
1298                context(4),
1299                LinkEndpoint::direct(LinkProtocol::Tcp, "127.0.0.1:9300"),
1300                &candidates,
1301                &[],
1302                20,
1303                &TestRandom(99),
1304            )
1305            .await
1306            .expect("second profile");
1307        let first_route = first.establish.expect("first establish").route;
1308        let second_route = second.establish.expect("second establish").route;
1309        assert_ne!(first_route.hops, second_route.hops);
1310        assert!(first_route
1311            .hops
1312            .iter()
1313            .any(|hop| hop.authority_id == authority(1)));
1314        assert!(second_route
1315            .hops
1316            .iter()
1317            .any(|hop| hop.authority_id != authority(1)));
1318    }
1319
1320    #[tokio::test]
1321    async fn selection_manager_consumes_bootstrap_hints_when_shared_candidates_are_empty() {
1322        let registry = Arc::new(ServiceRegistry::new());
1323        let health = LocalHealthObserverService::new(LocalHealthObserverConfig::for_testing());
1324        health.observe_provider_set(&[], 1, 50).await;
1325        let manager =
1326            SelectionManagerService::new(SelectionManagerConfig::for_testing(), registry, health);
1327        manager
1328            .remember_bootstrap_hints(
1329                context(5),
1330                &[
1331                    bootstrap_hint(
1332                        8,
1333                        BootstrapHintSource::RememberedDirectContact,
1334                        45,
1335                        9_000,
1336                        2,
1337                        None,
1338                    ),
1339                    bootstrap_hint(
1340                        9,
1341                        BootstrapHintSource::BootstrapBridge,
1342                        48,
1343                        6_000,
1344                        4,
1345                        Some(1),
1346                    ),
1347                ],
1348            )
1349            .await;
1350
1351        let profile = manager
1352            .select_profile(
1353                context(5),
1354                LinkEndpoint::direct(LinkProtocol::Tcp, "127.0.0.1:9400"),
1355                &[],
1356                &[],
1357                50,
1358                &TestRandom(1),
1359            )
1360            .await
1361            .expect("bootstrap hints should provide move candidates");
1362        let establish = profile.establish.expect("establish decision");
1363        assert!(establish
1364            .route
1365            .hops
1366            .iter()
1367            .any(|hop| hop.authority_id == authority(8)));
1368    }
1369
1370    #[tokio::test]
1371    async fn stale_reentry_decisions_progress_from_contacts_to_bridges() {
1372        let registry = Arc::new(ServiceRegistry::new());
1373        let health = LocalHealthObserverService::new(LocalHealthObserverConfig::for_testing());
1374        let manager =
1375            SelectionManagerService::new(SelectionManagerConfig::for_testing(), registry, health);
1376        manager
1377            .remember_bootstrap_hints(
1378                context(6),
1379                &[
1380                    bootstrap_hint(
1381                        1,
1382                        BootstrapHintSource::RememberedDirectContact,
1383                        100,
1384                        9_000,
1385                        1,
1386                        None,
1387                    ),
1388                    bootstrap_hint(
1389                        2,
1390                        BootstrapHintSource::NeighborhoodDiscoveryBoard,
1391                        100,
1392                        7_000,
1393                        3,
1394                        None,
1395                    ),
1396                    bootstrap_hint(
1397                        3,
1398                        BootstrapHintSource::WebOfTrustIntroduction,
1399                        100,
1400                        8_000,
1401                        2,
1402                        None,
1403                    ),
1404                    bootstrap_hint(
1405                        4,
1406                        BootstrapHintSource::BootstrapBridge,
1407                        100,
1408                        5_000,
1409                        4,
1410                        Some(7),
1411                    ),
1412                ],
1413            )
1414            .await;
1415
1416        let first = manager
1417            .stale_reentry_decision(context(6), 120)
1418            .await
1419            .expect("contacts-first decision");
1420        assert_eq!(first.stage, BootstrapReentryStage::RememberedContacts);
1421        manager
1422            .record_reentry_attempt(context(6), first.stage, 121, false)
1423            .await;
1424        manager
1425            .record_reentry_attempt(context(6), first.stage, 122, false)
1426            .await;
1427
1428        let second = manager
1429            .stale_reentry_decision(context(6), 123)
1430            .await
1431            .expect("board decision");
1432        assert_eq!(second.stage, BootstrapReentryStage::NeighborhoodBoards);
1433        manager
1434            .record_reentry_attempt(context(6), second.stage, 124, false)
1435            .await;
1436        manager
1437            .record_reentry_attempt(context(6), second.stage, 125, false)
1438            .await;
1439
1440        let third = manager
1441            .stale_reentry_decision(context(6), 126)
1442            .await
1443            .expect("introduction decision");
1444        assert_eq!(third.stage, BootstrapReentryStage::WebOfTrustIntroductions);
1445        manager
1446            .record_reentry_attempt(context(6), third.stage, 127, false)
1447            .await;
1448        manager
1449            .record_reentry_attempt(context(6), third.stage, 128, false)
1450            .await;
1451
1452        let fourth = manager
1453            .stale_reentry_decision(context(6), 129)
1454            .await
1455            .expect("bridge decision");
1456        assert_eq!(fourth.stage, BootstrapReentryStage::BootstrapBridges);
1457    }
1458
1459    #[tokio::test]
1460    async fn bootstrap_weighting_prefers_fresher_hints_without_collapsing_to_one_bridge_cluster() {
1461        let registry = Arc::new(ServiceRegistry::new());
1462        let health = LocalHealthObserverService::new(LocalHealthObserverConfig::for_testing());
1463        health.observe_provider_set(&[], 2, 200).await;
1464        let manager =
1465            SelectionManagerService::new(SelectionManagerConfig::for_testing(), registry, health);
1466        manager
1467            .remember_bootstrap_hints(
1468                context(7),
1469                &[
1470                    bootstrap_hint(
1471                        10,
1472                        BootstrapHintSource::NeighborhoodDiscoveryBoard,
1473                        195,
1474                        8_500,
1475                        3,
1476                        Some(1),
1477                    ),
1478                    bootstrap_hint(
1479                        11,
1480                        BootstrapHintSource::NeighborhoodDiscoveryBoard,
1481                        194,
1482                        8_000,
1483                        3,
1484                        Some(2),
1485                    ),
1486                    bootstrap_hint(
1487                        12,
1488                        BootstrapHintSource::BootstrapBridge,
1489                        130,
1490                        5_500,
1491                        4,
1492                        Some(1),
1493                    ),
1494                    bootstrap_hint(
1495                        13,
1496                        BootstrapHintSource::BootstrapBridge,
1497                        192,
1498                        7_500,
1499                        4,
1500                        Some(3),
1501                    ),
1502                ],
1503            )
1504            .await;
1505
1506        let profile = manager
1507            .select_profile(
1508                context(7),
1509                LinkEndpoint::direct(LinkProtocol::Tcp, "127.0.0.1:9500"),
1510                &[],
1511                &[],
1512                200,
1513                &TestRandom(42),
1514            )
1515            .await
1516            .expect("bootstrap weighted profile");
1517        let route = profile.establish.expect("establish").route;
1518        let unique_hops = route
1519            .hops
1520            .iter()
1521            .map(|hop| hop.authority_id)
1522            .collect::<std::collections::BTreeSet<_>>();
1523        assert!(unique_hops.len() > 1);
1524        assert!(route
1525            .hops
1526            .iter()
1527            .any(|hop| hop.authority_id == authority(10)));
1528        assert!(route
1529            .hops
1530            .iter()
1531            .any(|hop| hop.authority_id != authority(12)));
1532    }
1533
1534    #[tokio::test]
1535    async fn selection_manager_merges_validated_shared_bootstrap_records_locally() {
1536        let registry = Arc::new(ServiceRegistry::new());
1537        let health = LocalHealthObserverService::new(LocalHealthObserverConfig::for_testing());
1538        health.observe_provider_set(&[], 3, 300).await;
1539        let manager =
1540            SelectionManagerService::new(SelectionManagerConfig::for_testing(), registry, health);
1541
1542        manager
1543            .remember_bootstrap_contact_records(
1544                context(8),
1545                &[BootstrapContactHint {
1546                    scope: context(8),
1547                    authority_id: authority(20),
1548                    device_id: Some(DeviceId::new_from_entropy([20; 32])),
1549                    link_endpoints: vec![LinkEndpoint::direct(LinkProtocol::Tcp, "127.0.0.1:7620")],
1550                    route_layer_public_key: Some([20; 32]),
1551                    freshest_observed_at_ms: 290,
1552                    valid_until: 390,
1553                    replay_window_id: [20; 32],
1554                }],
1555            )
1556            .await
1557            .expect("contact hint should merge");
1558        manager
1559            .remember_neighborhood_reentry_records(
1560                context(8),
1561                &[NeighborhoodReentryHint {
1562                    scope: context(8),
1563                    publisher_authority: authority(21),
1564                    advertised_authority: authority(21),
1565                    advertised_device: Some(DeviceId::new_from_entropy([21; 32])),
1566                    link_endpoints: vec![LinkEndpoint::direct(LinkProtocol::Tcp, "127.0.0.1:7621")],
1567                    route_layer_public_key: Some([21; 32]),
1568                    published_at_ms: 291,
1569                    valid_until: 391,
1570                    replay_window_id: [21; 32],
1571                }],
1572            )
1573            .await
1574            .expect("reentry hint should merge");
1575        manager
1576            .remember_bootstrap_introduction_records(
1577                context(8),
1578                &[BootstrapIntroductionHint {
1579                    scope: context(8),
1580                    introducer_authority: authority(22),
1581                    introduced_authority: authority(22),
1582                    introduced_device: Some(DeviceId::new_from_entropy([22; 32])),
1583                    link_endpoints: vec![LinkEndpoint::direct(LinkProtocol::Tcp, "127.0.0.1:7622")],
1584                    route_layer_public_key: Some([22; 32]),
1585                    remaining_depth: 1,
1586                    max_fanout: 2,
1587                    valid_until: 392,
1588                    replay_window_id: [22; 32],
1589                }],
1590                295,
1591            )
1592            .await
1593            .expect("introduction hint should merge");
1594
1595        let profile = manager
1596            .select_profile(
1597                context(8),
1598                LinkEndpoint::direct(LinkProtocol::Tcp, "127.0.0.1:9600"),
1599                &[],
1600                &[],
1601                300,
1602                &TestRandom(5),
1603            )
1604            .await
1605            .expect("shared bootstrap records should become local candidates");
1606
1607        let route = profile.establish.expect("establish").route;
1608        let authorities = route
1609            .hops
1610            .iter()
1611            .map(|hop| hop.authority_id)
1612            .collect::<std::collections::BTreeSet<_>>();
1613        assert!(authorities.contains(&authority(20)));
1614        assert!(authorities.contains(&authority(21)) || authorities.contains(&authority(22)));
1615        assert!(route
1616            .hops
1617            .iter()
1618            .all(|hop| hop.route_layer_public_key.is_some()));
1619        assert!(matches!(
1620            profile.move_decision.binding,
1621            MovePathBinding::Established(_) | MovePathBinding::Direct(_)
1622        ));
1623        let reentry = manager
1624            .stale_reentry_decision(context(8), 300)
1625            .await
1626            .expect("shared records should support stale-node re-entry");
1627        assert_eq!(reentry.stage, BootstrapReentryStage::RememberedContacts);
1628    }
1629
1630    #[tokio::test]
1631    async fn selection_manager_rejects_invalid_shared_bootstrap_records() {
1632        let registry = Arc::new(ServiceRegistry::new());
1633        let health = LocalHealthObserverService::new(LocalHealthObserverConfig::for_testing());
1634        let manager =
1635            SelectionManagerService::new(SelectionManagerConfig::for_testing(), registry, health);
1636
1637        let error = manager
1638            .remember_neighborhood_reentry_records(
1639                context(9),
1640                &[NeighborhoodReentryHint {
1641                    scope: context(9),
1642                    publisher_authority: authority(30),
1643                    advertised_authority: authority(31),
1644                    advertised_device: None,
1645                    link_endpoints: Vec::new(),
1646                    route_layer_public_key: None,
1647                    published_at_ms: 10,
1648                    valid_until: 20,
1649                    replay_window_id: [30; 32],
1650                }],
1651            )
1652            .await
1653            .expect_err("invalid board hint should be rejected");
1654
1655        assert!(matches!(
1656            error,
1657            SelectionManagerError::InvalidBootstrapRecord {
1658                kind: "neighborhood_reentry_hint",
1659                ..
1660            }
1661        ));
1662
1663        assert!(manager
1664            .stale_reentry_decision(context(9), 30)
1665            .await
1666            .is_none());
1667    }
1668}