Skip to main content

ant_core/data/
peer_cache.rs

1//! Persistent client bootstrap peer cache.
2//!
3//! Client peer IDs are ephemeral, so this cache is not keyed by distance from
4//! the local client. It remembers authenticated node peers that we have already
5//! connected to directly during client runs, stores their dialable channel
6//! addresses, and prefers retaining peers that are spread across the peer-id
7//! keyspace.
8
9use crate::config;
10use ant_protocol::transport::{IPDiversityConfig, MultiAddr, P2PNode, PeerId};
11use serde::{Deserialize, Serialize};
12use std::collections::{HashMap, HashSet};
13use std::net::IpAddr;
14use std::path::{Path, PathBuf};
15use std::sync::atomic::{AtomicU64, Ordering};
16use std::time::{SystemTime, UNIX_EPOCH};
17use tracing::{debug, info, warn};
18
/// Upper bound on the number of peers kept in the client peer cache.
///
/// Normalization stops selecting peers once this many have been retained.
pub const CLIENT_PEER_CACHE_MAX_PEERS: usize = 50;
20
/// Restricts which address families may be used when cached peers are turned
/// into startup dial candidates.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BootstrapAddressFilter {
    /// Accept any cached address that is dialable, regardless of IP family.
    All,
    /// Accept only cached addresses whose dialable socket address is IPv4.
    Ipv4Only,
}
29
/// Schema version written to, and required from, the cache file.
const CLIENT_PEER_CACHE_SCHEMA_VERSION: u32 = 1;
/// File name of the cache inside the client data directory.
const CLIENT_PEER_CACHE_FILE_NAME: &str = "client_peer_cache.json";
/// Trailing extension of the temporary file written before the cache is committed.
const CLIENT_PEER_CACHE_TEMP_SUFFIX: &str = "tmp";
/// Exact-IP limit used when the diversity config does not specify one.
const DEFAULT_MAX_PER_EXACT_IP: usize = 2;
/// The default per-subnet limit is `k_value` divided by this (never below 1).
const SUBNET_LIMIT_K_DIVISOR: usize = 4;
/// Number of leading IPv4 octets that identify a subnet (a /24).
const IPV4_SUBNET_PREFIX_OCTETS: usize = 3;
/// Number of leading IPv6 16-bit segments that identify a subnet (a /48).
const IPV6_SUBNET_PREFIX_SEGMENTS: usize = 3;
/// Bits in one byte; used to derive the sector shift.
const BITS_PER_BYTE: u8 = 8;
/// Number of leading peer-id bits that select a keyspace sector.
const PEER_ID_SECTOR_BITS: u8 = 4;
/// Number of keyspace sectors addressed by `PEER_ID_SECTOR_BITS`.
const PEER_ID_SECTOR_COUNT: usize = 1 << PEER_ID_SECTOR_BITS;
/// Width in bytes of an XOR distance between two peer ids.
const PEER_ID_XOR_DISTANCE_BYTES: usize = 32;

/// Per-process sequence number that keeps temp file names unique.
static TEMP_FILE_COUNTER: AtomicU64 = AtomicU64::new(0);
43
44#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
45struct ClientPeerCacheFile {
46    schema_version: u32,
47    peers: Vec<CachedPeer>,
48}
49
50#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
51struct CachedPeer {
52    peer_id: PeerId,
53    direct_addresses: Vec<MultiAddr>,
54    first_connected_epoch_secs: u64,
55    last_connected_epoch_secs: u64,
56}
57
58#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
59enum SubnetKey {
60    V4([u8; IPV4_SUBNET_PREFIX_OCTETS]),
61    V6([u16; IPV6_SUBNET_PREFIX_SEGMENTS]),
62}
63
64struct DiversityTracker {
65    exact_ip_counts: HashMap<IpAddr, usize>,
66    subnet_counts: HashMap<SubnetKey, usize>,
67    max_per_ip: usize,
68    max_per_subnet: usize,
69}
70
71/// Build the on-disk cache path for the client peer cache.
72#[must_use]
73pub fn cache_path() -> Option<PathBuf> {
74    match config::data_dir() {
75        Ok(data_dir) => Some(data_dir.join(CLIENT_PEER_CACHE_FILE_NAME)),
76        Err(err) => {
77            warn!("client peer cache disabled: failed to resolve data dir: {err}");
78            None
79        }
80    }
81}
82
83/// Load cache addresses to try before configured bootstrap peers.
84///
85/// Returns at most one direct address per cached peer. saorsa-core stops client
86/// bootstrap after the client bootstrap target is reached, so every usable
87/// cached peer is ordered before the configured fallback peers without forcing
88/// all cached peers to be dialed on a healthy warm start.
89#[must_use]
90pub fn cached_bootstrap_peers(cache_path: &Path, k_value: usize) -> Vec<MultiAddr> {
91    cached_bootstrap_peers_with_filter(cache_path, k_value, BootstrapAddressFilter::All)
92}
93
94/// Load cache addresses to try before configured bootstrap peers, applying an
95/// address-family filter before choosing the first address for each peer.
96#[must_use]
97pub fn cached_bootstrap_peers_with_filter(
98    cache_path: &Path,
99    k_value: usize,
100    address_filter: BootstrapAddressFilter,
101) -> Vec<MultiAddr> {
102    let Some(mut cache) = ClientPeerCacheFile::load_existing(cache_path) else {
103        return Vec::new();
104    };
105    let loaded_peer_count = cache.peers.len();
106    let loaded_direct_address_count = cache.direct_address_count();
107    let diversity_config = cache_diversity_config();
108    let normalized = cache.normalize(&diversity_config, k_value);
109    if normalized {
110        cache.save(cache_path);
111    }
112    let bootstrap_addresses =
113        cache.bootstrap_addresses(CLIENT_PEER_CACHE_MAX_PEERS, address_filter);
114    info!(
115        path = %cache_path.display(),
116        cached_peers = loaded_peer_count,
117        direct_addresses = loaded_direct_address_count,
118        usable_cached_peers = cache.peers.len(),
119        bootstrap_candidates = bootstrap_addresses.len(),
120        "client peer bootstrap cache file found and loaded; cached peers available",
121    );
122    bootstrap_addresses
123}
124
125/// Select startup bootstrap peers.
126///
127/// Cached peers are ordered first and configured bootstrap peers are appended
128/// behind them. saorsa-core stops client bootstrap after the client bootstrap
129/// target is reached, so configured peers are only reached when the cached
130/// candidates do not produce enough successful connections.
131#[must_use]
132pub fn select_bootstrap_peers(
133    cached: impl IntoIterator<Item = MultiAddr>,
134    configured: impl IntoIterator<Item = MultiAddr>,
135) -> Vec<MultiAddr> {
136    dedupe_bootstrap_peers(cached.into_iter().chain(configured))
137}
138
139fn dedupe_bootstrap_peers(addrs: impl IntoIterator<Item = MultiAddr>) -> Vec<MultiAddr> {
140    let mut seen = HashSet::new();
141    let mut deduped = Vec::new();
142
143    for addr in addrs {
144        if seen.insert(bootstrap_address_key(&addr)) {
145            deduped.push(addr);
146        }
147    }
148
149    deduped
150}
151
152/// Persist authenticated peers reached directly during this client run.
153///
154/// A DHT Direct tag is not required here. The cache records dialable addresses
155/// from currently live peer connections so the next client run can try peers it
156/// actually reached.
157pub async fn promote_connected_direct_peers(node: &P2PNode, cache_path: &Path, k_value: usize) {
158    let connected_peers = node.connected_peers().await;
159    if connected_peers.is_empty() {
160        return;
161    }
162
163    let connected_peer_count = connected_peers.len();
164    let mut cache = ClientPeerCacheFile::load(cache_path);
165    let diversity_config = cache_diversity_config();
166    let now = now_epoch_secs();
167    let mut changed = false;
168    let mut cacheable_peer_count = 0usize;
169    let mut cacheable_address_count = 0usize;
170
171    for peer_id in connected_peers {
172        let Some(peer_info) = node.peer_info(&peer_id).await else {
173            continue;
174        };
175
176        let channel_addresses = peer_info
177            .addresses
178            .into_iter()
179            .filter(|addr| addr.dialable_socket_addr().is_some())
180            .collect::<Vec<_>>();
181        if channel_addresses.is_empty() {
182            continue;
183        }
184
185        cacheable_peer_count += 1;
186        cacheable_address_count += channel_addresses.len();
187
188        changed |= cache.upsert_connected_peer(
189            peer_id,
190            channel_addresses,
191            now,
192            &diversity_config,
193            k_value,
194        );
195    }
196
197    if changed {
198        info!(
199            path = %cache_path.display(),
200            connected_peers = connected_peer_count,
201            cacheable_peers = cacheable_peer_count,
202            cacheable_addresses = cacheable_address_count,
203            cached_peers = cache.peers.len(),
204            direct_addresses = cache.direct_address_count(),
205            "client peer bootstrap cache updated from live connected peers",
206        );
207        cache.save(cache_path);
208    }
209}
210
211/// The cache applies the default k-bucket IP diversity policy rather than the
212/// client's permissive routing-table setting. This keeps the persisted
213/// bootstrap surface from collapsing onto one IP or subnet.
214#[must_use]
215fn cache_diversity_config() -> IPDiversityConfig {
216    IPDiversityConfig::default()
217}
218
219impl BootstrapAddressFilter {
220    fn allows(self, addr: &MultiAddr) -> bool {
221        match self {
222            Self::All => addr.dialable_socket_addr().is_some(),
223            Self::Ipv4Only => addr
224                .dialable_socket_addr()
225                .is_some_and(|socket| socket.is_ipv4()),
226        }
227    }
228}
229
230impl ClientPeerCacheFile {
231    fn empty() -> Self {
232        Self {
233            schema_version: CLIENT_PEER_CACHE_SCHEMA_VERSION,
234            peers: Vec::new(),
235        }
236    }
237
238    fn load(path: &Path) -> Self {
239        Self::load_existing(path).unwrap_or_else(Self::empty)
240    }
241
242    fn load_existing(path: &Path) -> Option<Self> {
243        let Ok(data) = std::fs::read_to_string(path) else {
244            return None;
245        };
246
247        match serde_json::from_str::<Self>(&data) {
248            Ok(cache) if cache.schema_version == CLIENT_PEER_CACHE_SCHEMA_VERSION => Some(cache),
249            Ok(cache) => {
250                debug!(
251                    path = %path.display(),
252                    schema_version = cache.schema_version,
253                    "ignoring client peer cache with unsupported schema version",
254                );
255                None
256            }
257            Err(err) => {
258                warn!(
259                    path = %path.display(),
260                    "ignoring unreadable client peer cache: {err}",
261                );
262                None
263            }
264        }
265    }
266
267    fn direct_address_count(&self) -> usize {
268        self.peers
269            .iter()
270            .map(|peer| peer.direct_addresses.len())
271            .sum()
272    }
273
274    fn save(&self, path: &Path) {
275        if let Some(parent) = path.parent() {
276            if let Err(err) = std::fs::create_dir_all(parent) {
277                warn!(
278                    path = %path.display(),
279                    "failed to create client peer cache directory: {err}",
280                );
281                return;
282            }
283        }
284
285        let data = match serde_json::to_vec_pretty(self) {
286            Ok(data) => data,
287            Err(err) => {
288                warn!("failed to serialize client peer cache: {err}");
289                return;
290            }
291        };
292
293        let temp_path = temp_path_for(path);
294        if let Err(err) = std::fs::write(&temp_path, data) {
295            warn!(
296                path = %temp_path.display(),
297                "failed to write client peer cache temp file: {err}",
298            );
299            return;
300        }
301
302        #[cfg(windows)]
303        if path.exists() {
304            if let Err(err) = std::fs::remove_file(path) {
305                warn!(
306                    path = %path.display(),
307                    "failed to replace existing client peer cache: {err}",
308                );
309                let _ = std::fs::remove_file(&temp_path);
310                return;
311            }
312        }
313
314        if let Err(err) = std::fs::rename(&temp_path, path) {
315            warn!(
316                from = %temp_path.display(),
317                to = %path.display(),
318                "failed to commit client peer cache: {err}",
319            );
320            let _ = std::fs::remove_file(temp_path);
321        }
322    }
323
324    fn upsert_connected_peer(
325        &mut self,
326        peer_id: PeerId,
327        direct_addresses: Vec<MultiAddr>,
328        now: u64,
329        diversity_config: &IPDiversityConfig,
330        k_value: usize,
331    ) -> bool {
332        let direct_addresses = sanitize_direct_addresses(peer_id, direct_addresses);
333        if direct_addresses.is_empty() {
334            return false;
335        }
336
337        let before = self.peers.clone();
338        if let Some(existing) = self.peers.iter_mut().find(|peer| peer.peer_id == peer_id) {
339            existing.direct_addresses = direct_addresses;
340            existing.last_connected_epoch_secs = now;
341        } else {
342            self.peers.push(CachedPeer {
343                peer_id,
344                direct_addresses,
345                first_connected_epoch_secs: now,
346                last_connected_epoch_secs: now,
347            });
348        }
349
350        self.normalize(diversity_config, k_value);
351        self.peers != before
352    }
353
354    fn normalize(&mut self, diversity_config: &IPDiversityConfig, k_value: usize) -> bool {
355        let before = self.peers.clone();
356        self.peers.retain(|peer| !peer.direct_addresses.is_empty());
357        self.peers.sort_by(|left, right| {
358            right
359                .last_connected_epoch_secs
360                .cmp(&left.last_connected_epoch_secs)
361                .then_with(|| left.peer_id.to_hex().cmp(&right.peer_id.to_hex()))
362        });
363
364        let mut candidates = Vec::with_capacity(self.peers.len());
365        let mut seen_peers = HashSet::new();
366        for peer in self.peers.drain(..) {
367            if seen_peers.insert(peer.peer_id) {
368                candidates.push(peer);
369            }
370        }
371
372        let mut tracker = DiversityTracker::new(diversity_config, k_value);
373        let mut normalized = Vec::with_capacity(CLIENT_PEER_CACHE_MAX_PEERS);
374
375        while normalized.len() < CLIENT_PEER_CACHE_MAX_PEERS {
376            let Some(best_index) =
377                select_peer_id_diverse_candidate(&candidates, &normalized, &tracker)
378            else {
379                break;
380            };
381            let peer = candidates.swap_remove(best_index);
382            tracker.record_peer(&peer);
383            normalized.push(peer);
384        }
385
386        self.peers = normalized;
387        self.peers != before
388    }
389
390    fn bootstrap_addresses(
391        &self,
392        limit: usize,
393        address_filter: BootstrapAddressFilter,
394    ) -> Vec<MultiAddr> {
395        let mut sectors = (0..PEER_ID_SECTOR_COUNT)
396            .map(|_| Vec::new())
397            .collect::<Vec<Vec<&CachedPeer>>>();
398
399        for peer in &self.peers {
400            sectors[peer_id_sector(peer.peer_id)].push(peer);
401        }
402
403        let mut positions = [0usize; PEER_ID_SECTOR_COUNT];
404        let mut addresses = Vec::with_capacity(self.peers.len().min(limit));
405
406        loop {
407            let mut advanced_this_round = false;
408            for sector in 0..PEER_ID_SECTOR_COUNT {
409                let position = positions[sector];
410                let Some(peer) = sectors[sector].get(position) else {
411                    continue;
412                };
413                positions[sector] += 1;
414                advanced_this_round = true;
415                if let Some(addr) = peer
416                    .direct_addresses
417                    .iter()
418                    .find(|addr| address_filter.allows(addr))
419                {
420                    addresses.push(addr.clone());
421                }
422                if addresses.len() >= limit {
423                    return addresses;
424                }
425            }
426            if !advanced_this_round {
427                return addresses;
428            }
429        }
430    }
431}
432
433fn select_peer_id_diverse_candidate(
434    candidates: &[CachedPeer],
435    selected: &[CachedPeer],
436    tracker: &DiversityTracker,
437) -> Option<usize> {
438    let mut best_index = None;
439
440    for (candidate_index, candidate) in candidates.iter().enumerate() {
441        if !tracker.can_admit_peer(candidate) {
442            continue;
443        }
444        let Some(current_best_index) = best_index else {
445            best_index = Some(candidate_index);
446            continue;
447        };
448        let current_best = &candidates[current_best_index];
449        if prefer_peer_id_candidate(candidate, current_best, selected) {
450            best_index = Some(candidate_index);
451        }
452    }
453
454    best_index
455}
456
457fn prefer_peer_id_candidate(
458    candidate: &CachedPeer,
459    current_best: &CachedPeer,
460    selected: &[CachedPeer],
461) -> bool {
462    peer_id_spread_score(candidate, selected)
463        .cmp(&peer_id_spread_score(current_best, selected))
464        .then_with(|| {
465            candidate
466                .last_connected_epoch_secs
467                .cmp(&current_best.last_connected_epoch_secs)
468        })
469        .then_with(|| {
470            current_best
471                .peer_id
472                .to_hex()
473                .cmp(&candidate.peer_id.to_hex())
474        })
475        .is_gt()
476}
477
478fn peer_id_spread_score(
479    candidate: &CachedPeer,
480    selected: &[CachedPeer],
481) -> Option<[u8; PEER_ID_XOR_DISTANCE_BYTES]> {
482    selected
483        .iter()
484        .map(|peer| peer_id_xor_distance(candidate.peer_id, peer.peer_id))
485        .min()
486}
487
488fn peer_id_xor_distance(left: PeerId, right: PeerId) -> [u8; PEER_ID_XOR_DISTANCE_BYTES] {
489    let left_bytes = left.as_bytes();
490    let right_bytes = right.as_bytes();
491    let mut distance = [0u8; PEER_ID_XOR_DISTANCE_BYTES];
492    for (index, byte) in distance.iter_mut().enumerate() {
493        *byte = left_bytes[index] ^ right_bytes[index];
494    }
495    distance
496}
497
498impl DiversityTracker {
499    fn new(config: &IPDiversityConfig, k_value: usize) -> Self {
500        Self {
501            exact_ip_counts: HashMap::new(),
502            subnet_counts: HashMap::new(),
503            max_per_ip: config.max_per_ip.unwrap_or(DEFAULT_MAX_PER_EXACT_IP),
504            max_per_subnet: config
505                .max_per_subnet
506                .unwrap_or_else(|| default_subnet_limit(k_value)),
507        }
508    }
509
510    fn can_admit_peer(&self, peer: &CachedPeer) -> bool {
511        let Some((ip_set, subnet_set)) = peer_diversity_sets(peer) else {
512            return false;
513        };
514
515        for ip in &ip_set {
516            if self.exact_ip_counts.get(ip).copied().unwrap_or_default() >= self.max_per_ip {
517                return false;
518            }
519        }
520
521        for subnet in &subnet_set {
522            if self.subnet_counts.get(subnet).copied().unwrap_or_default() >= self.max_per_subnet {
523                return false;
524            }
525        }
526
527        true
528    }
529
530    fn record_peer(&mut self, peer: &CachedPeer) {
531        let Some((ip_set, subnet_set)) = peer_diversity_sets(peer) else {
532            return;
533        };
534
535        for ip in ip_set {
536            *self.exact_ip_counts.entry(ip).or_default() += 1;
537        }
538        for subnet in subnet_set {
539            *self.subnet_counts.entry(subnet).or_default() += 1;
540        }
541    }
542}
543
544fn peer_diversity_sets(peer: &CachedPeer) -> Option<(HashSet<IpAddr>, HashSet<SubnetKey>)> {
545    let ip_set = peer
546        .direct_addresses
547        .iter()
548        .filter_map(|addr| {
549            addr.dialable_socket_addr()
550                .map(|socket| canonical_ip(socket.ip()))
551        })
552        .collect::<HashSet<_>>();
553
554    if ip_set.is_empty() {
555        return None;
556    }
557
558    let subnet_set = ip_set
559        .iter()
560        .map(|ip| subnet_key(*ip))
561        .collect::<HashSet<_>>();
562
563    Some((ip_set, subnet_set))
564}
565
566fn sanitize_direct_addresses(peer_id: PeerId, direct_addresses: Vec<MultiAddr>) -> Vec<MultiAddr> {
567    let mut seen = HashSet::new();
568    let mut sanitized = Vec::new();
569
570    for addr in direct_addresses {
571        if addr.dialable_socket_addr().is_none() {
572            continue;
573        }
574        let addr = addr.with_peer_id(peer_id);
575        if seen.insert(addr.to_string()) {
576            sanitized.push(addr);
577        }
578    }
579
580    sanitized
581}
582
583fn bootstrap_address_key(addr: &MultiAddr) -> String {
584    addr.dialable_socket_addr()
585        .map(|socket| socket.to_string())
586        .unwrap_or_else(|| addr.to_string())
587}
588
589fn default_subnet_limit(k_value: usize) -> usize {
590    std::cmp::max(k_value / SUBNET_LIMIT_K_DIVISOR, 1)
591}
592
593fn subnet_key(ip: IpAddr) -> SubnetKey {
594    match ip {
595        IpAddr::V4(ip) => {
596            let octets = ip.octets();
597            SubnetKey::V4([octets[0], octets[1], octets[IPV4_SUBNET_PREFIX_OCTETS - 1]])
598        }
599        IpAddr::V6(ip) => {
600            let segments = ip.segments();
601            SubnetKey::V6([
602                segments[0],
603                segments[1],
604                segments[IPV6_SUBNET_PREFIX_SEGMENTS - 1],
605            ])
606        }
607    }
608}
609
/// Collapse an IPv4-mapped IPv6 address (`::ffff:a.b.c.d`) to its IPv4 form so
/// both spellings count as the same IP. Every other address is returned as is.
fn canonical_ip(ip: IpAddr) -> IpAddr {
    if let IpAddr::V6(v6) = ip {
        if let Some(v4) = v6.to_ipv4_mapped() {
            return IpAddr::V4(v4);
        }
    }
    ip
}
619
620fn peer_id_sector(peer_id: PeerId) -> usize {
621    let sector_shift = BITS_PER_BYTE - PEER_ID_SECTOR_BITS;
622    usize::from(peer_id.as_bytes()[0] >> sector_shift)
623}
624
625fn temp_path_for(path: &Path) -> PathBuf {
626    let counter = TEMP_FILE_COUNTER.fetch_add(1, Ordering::Relaxed);
627    let process_id = std::process::id();
628    let file_name = path
629        .file_name()
630        .and_then(|name| name.to_str())
631        .unwrap_or(CLIENT_PEER_CACHE_FILE_NAME);
632    path.with_file_name(format!(
633        ".{file_name}.{process_id}.{counter}.{CLIENT_PEER_CACHE_TEMP_SUFFIX}"
634    ))
635}
636
/// Current wall-clock time as whole seconds since the Unix epoch, or 0 when
/// the system clock reports a time before the epoch.
fn now_epoch_secs() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        Err(_) => 0,
    }
}
643
#[cfg(test)]
// Test fixtures use small, known-good values, so unwrapping conversions is fine here.
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr};

    const TEST_PEER_ID_LEN: usize = 32;
    const TEST_K_VALUE: usize = 20;
    const FIRST_PORT: u16 = 10_000;
    const TEST_NOW: u64 = 1_000_000;
    const EXACT_IP_ATTEMPTS: u8 = 3;
    const SUBNET_ATTEMPTS: u8 = 6;
    const BOOTSTRAP_ROUND_ROBIN_TEST_LIMIT: usize = 6;

    /// Peer id whose first byte is `byte` and whose remaining bytes are zero.
    fn peer_id(byte: u8) -> PeerId {
        peer_id_with_prefix(byte, 0)
    }

    /// Peer id with the given first two bytes and zeros elsewhere.
    fn peer_id_with_prefix(first_byte: u8, second_byte: u8) -> PeerId {
        let mut bytes = [0u8; TEST_PEER_ID_LEN];
        bytes[0] = first_byte;
        bytes[1] = second_byte;
        PeerId::from_bytes(bytes)
    }

    /// QUIC address for `ip:port` without a peer id suffix.
    fn direct_addr(ip: IpAddr, port: u16) -> MultiAddr {
        MultiAddr::quic(SocketAddr::new(ip, port))
    }

    fn v4(a: u8, b: u8, c: u8, d: u8) -> IpAddr {
        IpAddr::V4(Ipv4Addr::new(a, b, c, d))
    }

    fn v6(first_segment: u16, host: u16) -> IpAddr {
        IpAddr::V6(Ipv6Addr::new(first_segment, 0, 0, 0, 0, 0, 0, host))
    }

    #[test]
    fn cache_prefers_peer_id_spread_over_recency_when_full() {
        let mut cache = ClientPeerCacheFile::empty();
        let diversity = IPDiversityConfig::permissive();

        let old_distant_peer = peer_id_with_prefix(u8::MAX, 0);
        cache.peers.push(CachedPeer {
            peer_id: old_distant_peer,
            direct_addresses: vec![direct_addr(v4(203, 0, 113, 1), FIRST_PORT)],
            first_connected_epoch_secs: TEST_NOW,
            last_connected_epoch_secs: TEST_NOW,
        });

        for idx in 0..CLIENT_PEER_CACHE_MAX_PEERS {
            // Checked conversion: a silent truncation would alias peer ids and
            // addresses if the cache size ever grew past 255.
            let idx_byte = u8::try_from(idx).unwrap();
            let peer = peer_id_with_prefix(0, idx_byte);
            let addr = direct_addr(
                v4(1, 0, idx_byte, 1),
                FIRST_PORT + u16::try_from(idx).unwrap(),
            );
            let connected_epoch_secs = TEST_NOW + u64::try_from(idx).unwrap() + 1;
            cache.peers.push(CachedPeer {
                peer_id: peer,
                direct_addresses: vec![addr.with_peer_id(peer)],
                first_connected_epoch_secs: connected_epoch_secs,
                last_connected_epoch_secs: connected_epoch_secs,
            });
        }

        cache.normalize(&diversity, TEST_K_VALUE);

        assert_eq!(cache.peers.len(), CLIENT_PEER_CACHE_MAX_PEERS);
        assert!(
            cache
                .peers
                .iter()
                .any(|peer| peer.peer_id == old_distant_peer),
            "old distant peer must be retained ahead of one newer clustered peer"
        );
        assert_eq!(
            cache
                .peers
                .iter()
                .filter(|peer| peer.peer_id.as_bytes()[0] == 0)
                .count(),
            CLIENT_PEER_CACHE_MAX_PEERS - 1
        );
    }

    #[test]
    fn cache_applies_exact_ip_limit() {
        let mut cache = ClientPeerCacheFile::empty();
        let diversity = IPDiversityConfig::default();

        for idx in 0..EXACT_IP_ATTEMPTS {
            cache.upsert_connected_peer(
                peer_id(idx),
                vec![direct_addr(v4(203, 0, 113, 1), FIRST_PORT + u16::from(idx))],
                TEST_NOW + u64::from(idx),
                &diversity,
                TEST_K_VALUE,
            );
        }

        assert_eq!(cache.peers.len(), DEFAULT_MAX_PER_EXACT_IP);
        assert!(cache.peers.iter().any(|peer| peer.peer_id == peer_id(2)));
        assert!(cache.peers.iter().any(|peer| peer.peer_id == peer_id(1)));
        assert!(!cache.peers.iter().any(|peer| peer.peer_id == peer_id(0)));
    }

    #[test]
    fn cache_applies_subnet_limit() {
        let mut cache = ClientPeerCacheFile::empty();
        let diversity = IPDiversityConfig::default();

        for idx in 0..SUBNET_ATTEMPTS {
            cache.upsert_connected_peer(
                peer_id(idx),
                vec![direct_addr(
                    v4(198, 51, 100, idx),
                    FIRST_PORT + u16::from(idx),
                )],
                TEST_NOW + u64::from(idx),
                &diversity,
                TEST_K_VALUE,
            );
        }

        assert_eq!(cache.peers.len(), default_subnet_limit(TEST_K_VALUE));
        assert!(cache.peers.iter().any(|peer| peer.peer_id == peer_id(5)));
        assert!(!cache.peers.iter().any(|peer| peer.peer_id == peer_id(0)));
    }

    #[test]
    fn cache_rejects_peers_without_dialable_direct_addresses() {
        let mut cache = ClientPeerCacheFile::empty();
        let diversity = IPDiversityConfig::permissive();

        let changed =
            cache.upsert_connected_peer(peer_id(1), Vec::new(), TEST_NOW, &diversity, TEST_K_VALUE);

        assert!(!changed);
        assert!(cache.peers.is_empty());
    }

    #[test]
    fn cached_bootstrap_addresses_round_robin_peer_id_sectors() {
        let mut cache = ClientPeerCacheFile::empty();
        let diversity = IPDiversityConfig::permissive();

        cache.upsert_connected_peer(
            peer_id(0x01),
            vec![direct_addr(v4(1, 0, 0, 1), FIRST_PORT)],
            TEST_NOW,
            &diversity,
            TEST_K_VALUE,
        );
        cache.upsert_connected_peer(
            peer_id(0x02),
            vec![direct_addr(v4(1, 0, 0, 2), FIRST_PORT + 1)],
            TEST_NOW + 1,
            &diversity,
            TEST_K_VALUE,
        );
        cache.upsert_connected_peer(
            peer_id(0xf0),
            vec![direct_addr(v6(0x2001, 1), FIRST_PORT + 2)],
            TEST_NOW + 2,
            &diversity,
            TEST_K_VALUE,
        );

        let addresses = cache.bootstrap_addresses(
            BOOTSTRAP_ROUND_ROBIN_TEST_LIMIT,
            BootstrapAddressFilter::All,
        );

        assert_eq!(addresses.len(), 3);
        assert_eq!(
            addresses[0].dialable_socket_addr().unwrap().ip(),
            v4(1, 0, 0, 2)
        );
        assert_eq!(
            addresses[1].dialable_socket_addr().unwrap().ip(),
            v6(0x2001, 1)
        );
        assert_eq!(
            addresses[2].dialable_socket_addr().unwrap().ip(),
            v4(1, 0, 0, 1)
        );
    }

    #[test]
    fn cached_addresses_are_stored_with_peer_id_suffix() {
        let mut cache = ClientPeerCacheFile::empty();
        let diversity = IPDiversityConfig::permissive();

        cache.upsert_connected_peer(
            peer_id(1),
            vec![direct_addr(v4(203, 0, 113, 10), FIRST_PORT)],
            TEST_NOW,
            &diversity,
            TEST_K_VALUE,
        );

        let addr = cache.peers[0].direct_addresses[0].clone();
        assert_eq!(addr.peer_id(), Some(&peer_id(1)));
    }

    #[test]
    fn cached_bootstrap_addresses_respect_ipv4_only_filter() {
        let mut cache = ClientPeerCacheFile::empty();
        let diversity = IPDiversityConfig::permissive();

        let peer = peer_id(1);
        let ipv6_addr = direct_addr(v6(0x2001, 1), FIRST_PORT);
        let ipv4_addr = direct_addr(v4(203, 0, 113, 10), FIRST_PORT + 1);
        cache.upsert_connected_peer(
            peer,
            vec![ipv6_addr.clone(), ipv4_addr.clone()],
            TEST_NOW,
            &diversity,
            TEST_K_VALUE,
        );

        let all_addresses =
            cache.bootstrap_addresses(CLIENT_PEER_CACHE_MAX_PEERS, BootstrapAddressFilter::All);
        assert_eq!(all_addresses, vec![ipv6_addr.with_peer_id(peer)]);

        let ipv4_addresses = cache.bootstrap_addresses(
            CLIENT_PEER_CACHE_MAX_PEERS,
            BootstrapAddressFilter::Ipv4Only,
        );
        assert_eq!(ipv4_addresses, vec![ipv4_addr.with_peer_id(peer)]);
    }

    #[test]
    fn select_bootstrap_peers_orders_configured_after_cached_fallback() {
        let first_cached = MultiAddr::quic(SocketAddr::new(v4(203, 0, 113, 20), FIRST_PORT))
            .with_peer_id(peer_id(1));
        let second_cached = MultiAddr::quic(SocketAddr::new(v4(203, 0, 113, 21), FIRST_PORT))
            .with_peer_id(peer_id(2));
        let configured = MultiAddr::quic(SocketAddr::new(v4(203, 0, 113, 22), FIRST_PORT));

        let selected = select_bootstrap_peers(
            vec![first_cached.clone(), second_cached.clone()],
            vec![configured.clone()],
        );

        assert_eq!(selected, vec![first_cached, second_cached, configured]);
    }

    #[test]
    fn select_bootstrap_peers_uses_configured_when_cache_empty() {
        let configured = MultiAddr::quic(SocketAddr::new(v4(203, 0, 113, 21), FIRST_PORT));

        let selected = select_bootstrap_peers(Vec::new(), vec![configured.clone()]);

        assert_eq!(selected, vec![configured]);
    }

    #[test]
    fn cached_bootstrap_peers_include_all_usable_cached_peers() {
        let mut cache = ClientPeerCacheFile::empty();
        let diversity = IPDiversityConfig::permissive();

        for idx in 0..=BOOTSTRAP_ROUND_ROBIN_TEST_LIMIT {
            let idx_byte = u8::try_from(idx).unwrap();
            cache.upsert_connected_peer(
                peer_id(idx_byte),
                vec![direct_addr(
                    v4(1, 0, idx_byte, 1),
                    FIRST_PORT + u16::try_from(idx).unwrap(),
                )],
                TEST_NOW + u64::try_from(idx).unwrap(),
                &diversity,
                TEST_K_VALUE,
            );
        }

        let addresses =
            cache.bootstrap_addresses(CLIENT_PEER_CACHE_MAX_PEERS, BootstrapAddressFilter::All);

        assert_eq!(addresses.len(), BOOTSTRAP_ROUND_ROBIN_TEST_LIMIT + 1);
    }
}