use std::net::SocketAddr;
use std::sync::atomic::{AtomicU32, Ordering};
use std::time::{Duration, Instant};
use dashmap::DashMap;
use serde::Serialize;
use tokio::sync::mpsc;
use tracing::debug;
use crate::peer_state::PeerSource;
/// Upper bound, in seconds, applied to the computed reconnect backoff in `mark_dead`.
const MAX_BACKOFF_SECS: u64 = 3600;
/// How long an entry in the eviction-ban map keeps `is_eviction_banned` returning `true`.
const EVICTION_BAN_DURATION: Duration = Duration::from_secs(1800);
/// Backoff, in seconds, returned for the first failure (attempt 0).
const BASE_BACKOFF_SECS: u64 = 10;
/// Multiplier applied per consecutive failure: backoff = base * factor^attempt, then capped.
const BACKOFF_FACTOR: u64 = 6;
/// Seconds after a peer's first recorded failure beyond which `mark_dead` drops the peer
/// from the registry instead of scheduling another retry.
const RETRY_WINDOW_SECS: u64 = 86400;
/// Pipeline stage a known peer address is currently in.
///
/// Transitions performed by `PeerStates` in this file:
/// `Queued -> Connecting -> Live`, `Live | Connecting -> Dead`, and `Dead -> Queued`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum PeerLifecycle {
/// Known and handed to the connect queue; the state assigned when a peer is first added
/// and again when a dead peer is re-queued for retry.
Queued,
/// A connection attempt has been started (`mark_connecting`).
Connecting,
/// The connection was promoted via `mark_live`.
Live,
/// The last attempt or session ended (`mark_dead`); waiting for a retry.
Dead,
}
/// Per-address bookkeeping stored in the `PeerStates` map.
#[derive(Debug)]
pub(crate) struct PeerEntry {
/// Current lifecycle stage.
pub state: PeerLifecycle,
/// Source recorded when the address was first added (never updated afterwards in this file).
pub source: PeerSource,
/// Number of consecutive failures so far; incremented by `mark_dead`, reset by `mark_live`.
pub backoff_attempt: u32,
/// Time of the first failure in the current failure streak; cleared by `mark_live`.
pub first_failure_at: Option<Instant>,
/// Earliest time a retry is due, set by `mark_dead`; cleared when re-queued or live.
pub retry_at: Option<Instant>,
/// When the peer last entered `Connecting`.
pub connecting_since: Option<Instant>,
/// When `set_tcp_connected` was called for the current attempt; reset to `None` by
/// `mark_connecting`.
pub tcp_connected_at: Option<Instant>,
}
/// Atomic gauges describing how many peers are in each lifecycle stage.
///
/// All accesses in this file use `Ordering::Relaxed`, so a snapshot taken while other
/// threads are mutating is not guaranteed to be mutually consistent across fields.
#[derive(Debug, Default)]
pub(crate) struct PeerPipelineStats {
/// Total entries in the registry (incremented on add, decremented on removal).
pub known: AtomicU32,
/// Peers currently in `PeerLifecycle::Queued`.
pub queued: AtomicU32,
/// Peers currently in `PeerLifecycle::Connecting`.
pub connecting: AtomicU32,
/// Peers currently in `PeerLifecycle::Live`.
pub live: AtomicU32,
/// Peers currently in `PeerLifecycle::Dead`.
pub dead: AtomicU32,
}
impl PeerPipelineStats {
    /// Maps a lifecycle stage to the gauge that counts peers in that stage.
    fn counter(&self, state: PeerLifecycle) -> &AtomicU32 {
        match state {
            PeerLifecycle::Queued => &self.queued,
            PeerLifecycle::Connecting => &self.connecting,
            PeerLifecycle::Live => &self.live,
            PeerLifecycle::Dead => &self.dead,
        }
    }

    /// Adds one to the gauge for `state`.
    fn inc(&self, state: PeerLifecycle) {
        let gauge = self.counter(state);
        gauge.fetch_add(1, Ordering::Relaxed);
    }

    /// Subtracts one from the gauge for `state`.
    ///
    /// Uses a plain `fetch_sub`, so callers must only decrement a stage they
    /// previously incremented; decrementing a zero gauge wraps around.
    fn dec(&self, state: PeerLifecycle) {
        let gauge = self.counter(state);
        gauge.fetch_sub(1, Ordering::Relaxed);
    }

    /// Copies the current value of every gauge into a plain-value snapshot.
    ///
    /// Each field is read with an independent relaxed load, in declaration order.
    pub fn snapshot(&self) -> PeerPipelineSnapshot {
        let read = |gauge: &AtomicU32| gauge.load(Ordering::Relaxed);
        PeerPipelineSnapshot {
            known: read(&self.known),
            queued: read(&self.queued),
            connecting: read(&self.connecting),
            live: read(&self.live),
            dead: read(&self.dead),
        }
    }
}
/// Plain-value, serializable copy of the `PeerPipelineStats` gauges, as produced by
/// `PeerPipelineStats::snapshot`.
#[derive(Debug, Clone, Copy, Default, Serialize)]
pub struct PeerPipelineSnapshot {
/// Total peers in the registry.
pub known: u32,
/// Peers in the `Queued` stage.
pub queued: u32,
/// Peers in the `Connecting` stage.
pub connecting: u32,
/// Peers in the `Live` stage.
pub live: u32,
/// Peers in the `Dead` stage.
pub dead: u32,
}
/// Registry of every known peer address and its lifecycle stage, plus temporary
/// eviction bans. All methods take `&self`; storage is `DashMap` plus atomic gauges.
pub(crate) struct PeerStates {
// Lifecycle entry for each known address.
states: DashMap<SocketAddr, PeerEntry>,
/// Per-stage gauges kept in step with `states` by the transition methods.
pub stats: PeerPipelineStats,
// Receives an address each time a peer enters `Queued` (first add or retry).
queue_tx: mpsc::UnboundedSender<SocketAddr>,
// Address -> time the eviction ban was recorded; expiry is checked lazily on lookup.
eviction_bans: DashMap<SocketAddr, Instant>,
}
impl PeerStates {
    /// Creates an empty registry that announces queued peers on `queue_tx`.
    pub fn new(queue_tx: mpsc::UnboundedSender<SocketAddr>) -> Self {
        Self {
            states: DashMap::new(),
            stats: PeerPipelineStats::default(),
            queue_tx,
            eviction_bans: DashMap::new(),
        }
    }

    /// Registers `addr` as a new `Queued` peer unless it is already known.
    ///
    /// Returns `true` if the address was newly inserted (and sent on the queue
    /// channel), `false` if an entry already existed in any state. An existing
    /// entry's `source` is left untouched.
    pub fn add_if_not_seen(&self, addr: SocketAddr, source: PeerSource) -> bool {
        use dashmap::mapref::entry::Entry;
        match self.states.entry(addr) {
            Entry::Occupied(_) => false,
            Entry::Vacant(vacant) => {
                vacant.insert(PeerEntry {
                    state: PeerLifecycle::Queued,
                    source,
                    backoff_attempt: 0,
                    first_failure_at: None,
                    retry_at: None,
                    connecting_since: None,
                    tcp_connected_at: None,
                });
                self.stats.known.fetch_add(1, Ordering::Relaxed);
                self.stats.inc(PeerLifecycle::Queued);
                // Best effort: a send error (receiver dropped) is deliberately ignored.
                let _ = self.queue_tx.send(addr);
                true
            }
        }
    }

    /// Moves a `Queued` peer to `Connecting` and stamps `connecting_since`.
    ///
    /// Returns `false` (and changes nothing) if the peer is unknown or not `Queued`.
    pub fn mark_connecting(&self, addr: SocketAddr) -> bool {
        if let Some(mut entry) = self.states.get_mut(&addr)
            && entry.state == PeerLifecycle::Queued
        {
            self.stats.dec(PeerLifecycle::Queued);
            entry.state = PeerLifecycle::Connecting;
            entry.connecting_since = Some(Instant::now());
            // A new attempt starts without a TCP-connected timestamp.
            entry.tcp_connected_at = None;
            self.stats.inc(PeerLifecycle::Connecting);
            return true;
        }
        false
    }

    /// Records the current time as `tcp_connected_at` for a `Connecting` peer.
    ///
    /// No-op for unknown peers or peers in any other state.
    pub fn set_tcp_connected(&self, addr: SocketAddr) {
        if let Some(mut entry) = self.states.get_mut(&addr)
            && entry.state == PeerLifecycle::Connecting
        {
            entry.tcp_connected_at = Some(Instant::now());
        }
    }

    /// Promotes a `Connecting` peer to `Live` and clears its failure/backoff history.
    ///
    /// No-op for unknown peers or peers in any other state.
    pub fn mark_live(&self, addr: SocketAddr) {
        if let Some(mut entry) = self.states.get_mut(&addr)
            && entry.state == PeerLifecycle::Connecting
        {
            self.stats.dec(PeerLifecycle::Connecting);
            entry.state = PeerLifecycle::Live;
            entry.backoff_attempt = 0;
            entry.first_failure_at = None;
            entry.retry_at = None;
            self.stats.inc(PeerLifecycle::Live);
        }
    }

    /// Marks a `Live` or `Connecting` peer as `Dead` and computes its retry backoff.
    ///
    /// Returns `Some(backoff)` with
    /// `min(BASE_BACKOFF_SECS * BACKOFF_FACTOR^attempt, MAX_BACKOFF_SECS)` seconds
    /// when the peer was moved to `Dead`.
    ///
    /// Returns `None` when the peer is unknown, is not `Live`/`Connecting`, or when
    /// more than `RETRY_WINDOW_SECS` have elapsed since its first failure — in that
    /// last case the peer is removed from the registry entirely.
    pub fn mark_dead(&self, addr: SocketAddr) -> Option<Duration> {
        use dashmap::mapref::entry::Entry;

        // The occupied entry keeps the shard locked for the whole decision, so the
        // state check, the counter updates and a possible removal are one atomic
        // step. Releasing the lock between the check and the removal would let two
        // concurrent callers both decrement the gauges for the same peer.
        let Entry::Occupied(mut occupied) = self.states.entry(addr) else {
            return None;
        };
        let old_state = occupied.get().state;
        if old_state != PeerLifecycle::Live && old_state != PeerLifecycle::Connecting {
            return None;
        }

        let now = Instant::now();
        // A peer with no failure on record starts its failure streak now.
        let first_failure = occupied.get().first_failure_at.unwrap_or(now);
        if first_failure.elapsed() > Duration::from_secs(RETRY_WINDOW_SECS) {
            let addr_to_remove = *occupied.key();
            occupied.remove();
            self.stats.dec(old_state);
            self.stats.known.fetch_sub(1, Ordering::Relaxed);
            debug!(%addr_to_remove, "peer removed: 24hr retry window exhausted");
            return None;
        }

        let entry = occupied.get_mut();
        let attempt = entry.backoff_attempt;
        // Saturating arithmetic keeps large attempt counts from overflowing before the cap.
        let backoff_secs = BASE_BACKOFF_SECS
            .saturating_mul(BACKOFF_FACTOR.saturating_pow(attempt))
            .min(MAX_BACKOFF_SECS);
        self.stats.dec(old_state);
        entry.state = PeerLifecycle::Dead;
        entry.backoff_attempt = attempt.saturating_add(1);
        entry.first_failure_at = Some(first_failure);
        entry.retry_at = Some(now + Duration::from_secs(backoff_secs));
        self.stats.inc(PeerLifecycle::Dead);
        Some(Duration::from_secs(backoff_secs))
    }

    /// Moves a `Dead` peer back to `Queued`, clears `retry_at`, and re-sends its
    /// address on the queue channel.
    ///
    /// Returns `false` (and changes nothing) if the peer is unknown or not `Dead`.
    /// Backoff attempt count and first-failure time are preserved.
    pub fn mark_queued_for_retry(&self, addr: SocketAddr) -> bool {
        if let Some(mut entry) = self.states.get_mut(&addr)
            && entry.state == PeerLifecycle::Dead
        {
            self.stats.dec(PeerLifecycle::Dead);
            entry.state = PeerLifecycle::Queued;
            entry.retry_at = None;
            self.stats.inc(PeerLifecycle::Queued);
            // Best effort: a send error (receiver dropped) is deliberately ignored.
            let _ = self.queue_tx.send(addr);
            return true;
        }
        false
    }

    /// Returns `true` if `addr` is known and currently `Live`.
    pub fn is_live(&self, addr: &SocketAddr) -> bool {
        self.states
            .get(addr)
            .is_some_and(|e| e.state == PeerLifecycle::Live)
    }

    /// Returns `true` if `addr` is known and in any state other than `Dead`.
    // NOTE(review): presumably unused outside this file's tests, hence the allow — confirm.
    #[allow(dead_code)]
    pub fn is_active(&self, addr: &SocketAddr) -> bool {
        self.states
            .get(addr)
            .is_some_and(|e| e.state != PeerLifecycle::Dead)
    }

    /// Returns the source recorded for `addr`, or `None` if the peer is unknown.
    pub fn source(&self, addr: &SocketAddr) -> Option<PeerSource> {
        self.states.get(addr).map(|e| e.source)
    }

    /// Lists peers that have been `Connecting` for longer than `soft_timeout`
    /// without `set_tcp_connected` having been called for the current attempt.
    ///
    /// Only reports candidates; it does not change any state.
    pub fn soft_reap_candidates(&self, soft_timeout: Duration) -> Vec<SocketAddr> {
        self.states
            .iter()
            .filter(|entry| {
                entry.state == PeerLifecycle::Connecting
                    && entry.tcp_connected_at.is_none()
                    && entry
                        .connecting_since
                        .is_some_and(|t| t.elapsed() > soft_timeout)
            })
            .map(|entry| *entry.key())
            .collect()
    }

    /// Number of peers in the registry, in any state.
    // NOTE(review): presumably unused outside this file's tests, hence the allow — confirm.
    #[allow(dead_code)]
    pub fn len(&self) -> usize {
        self.states.len()
    }

    /// Records an eviction ban for `addr` starting now, replacing any existing ban.
    pub fn add_eviction_ban(&self, addr: SocketAddr) {
        self.eviction_bans.insert(addr, Instant::now());
    }

    /// Returns `true` while `addr` has an eviction ban younger than
    /// `EVICTION_BAN_DURATION`. Expired bans are purged as a side effect.
    pub fn is_eviction_banned(&self, addr: &SocketAddr) -> bool {
        // Copy the timestamp out so no map guard is held past this statement.
        let Some(banned_at) = self.eviction_bans.get(addr).map(|ban| *ban) else {
            return false;
        };
        if banned_at.elapsed() < EVICTION_BAN_DURATION {
            return true;
        }
        // Re-check expiry under the shard lock: if `add_eviction_ban` renewed the
        // ban after the read above, the fresh entry must not be deleted.
        self.eviction_bans
            .remove_if(addr, |_, at| at.elapsed() >= EVICTION_BAN_DURATION);
        false
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::net::{IpAddr, Ipv4Addr};

    /// Address in the 203.0.113.0/24 documentation range with a fixed host and the given port.
    fn test_addr(port: u16) -> SocketAddr {
        SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 1)), port)
    }

    /// Like `test_addr`, but with a caller-chosen last octet so tests can use distinct hosts.
    fn test_addr_ip(last_octet: u8, port: u16) -> SocketAddr {
        SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, last_octet)), port)
    }

    /// Builds a registry together with the receiving end of its queue channel.
    fn make_peer_states() -> (PeerStates, mpsc::UnboundedReceiver<SocketAddr>) {
        let (tx, rx) = mpsc::unbounded_channel();
        (PeerStates::new(tx), rx)
    }

    // ---- add_if_not_seen ----

    #[test]
    fn add_if_not_seen_returns_true_for_new_peer() {
        let (ps, _rx) = make_peer_states();
        let addr = test_addr(6881);
        assert!(ps.add_if_not_seen(addr, PeerSource::Tracker));
    }

    #[test]
    fn add_if_not_seen_returns_false_for_duplicate() {
        let (ps, _rx) = make_peer_states();
        let addr = test_addr(6881);
        assert!(ps.add_if_not_seen(addr, PeerSource::Tracker));
        assert!(!ps.add_if_not_seen(addr, PeerSource::Dht));
    }

    #[test]
    fn add_if_not_seen_sets_queued_state() {
        let (ps, _rx) = make_peer_states();
        let addr = test_addr(6881);
        ps.add_if_not_seen(addr, PeerSource::Tracker);
        let entry = ps.states.get(&addr).expect("entry should exist");
        assert_eq!(entry.state, PeerLifecycle::Queued);
        assert_eq!(entry.source, PeerSource::Tracker);
        assert_eq!(entry.backoff_attempt, 0);
        assert!(entry.first_failure_at.is_none());
        assert!(entry.retry_at.is_none());
    }

    #[test]
    fn counters_increment_on_add() {
        let (ps, _rx) = make_peer_states();
        let snap_before = ps.stats.snapshot();
        assert_eq!(snap_before.known, 0);
        assert_eq!(snap_before.queued, 0);
        ps.add_if_not_seen(test_addr(6881), PeerSource::Tracker);
        ps.add_if_not_seen(test_addr_ip(2, 6882), PeerSource::Dht);
        let snap = ps.stats.snapshot();
        assert_eq!(snap.known, 2);
        assert_eq!(snap.queued, 2);
        assert_eq!(snap.connecting, 0);
        assert_eq!(snap.live, 0);
        assert_eq!(snap.dead, 0);
    }

    // ---- mark_connecting ----

    #[test]
    fn mark_connecting_transitions_from_queued() {
        let (ps, _rx) = make_peer_states();
        let addr = test_addr(6881);
        ps.add_if_not_seen(addr, PeerSource::Tracker);
        assert!(ps.mark_connecting(addr));
        let entry = ps.states.get(&addr).expect("entry should exist");
        assert_eq!(entry.state, PeerLifecycle::Connecting);
        let snap = ps.stats.snapshot();
        assert_eq!(snap.queued, 0);
        assert_eq!(snap.connecting, 1);
    }

    #[test]
    fn mark_connecting_rejects_non_queued() {
        let (ps, _rx) = make_peer_states();
        let addr = test_addr(6881);
        ps.add_if_not_seen(addr, PeerSource::Tracker);
        ps.mark_connecting(addr);
        // Already Connecting, so a second call must be refused.
        assert!(!ps.mark_connecting(addr));
    }

    #[test]
    fn mark_connecting_returns_false_for_unknown_addr() {
        let (ps, _rx) = make_peer_states();
        assert!(!ps.mark_connecting(test_addr(9999)));
    }

    // ---- mark_live ----

    #[test]
    fn mark_live_transitions_from_connecting() {
        let (ps, _rx) = make_peer_states();
        let addr = test_addr(6881);
        ps.add_if_not_seen(addr, PeerSource::Tracker);
        ps.mark_connecting(addr);
        ps.mark_live(addr);
        let entry = ps.states.get(&addr).expect("entry should exist");
        assert_eq!(entry.state, PeerLifecycle::Live);
        assert_eq!(entry.backoff_attempt, 0);
        assert!(entry.first_failure_at.is_none());
        let snap = ps.stats.snapshot();
        assert_eq!(snap.connecting, 0);
        assert_eq!(snap.live, 1);
    }

    #[test]
    fn mark_live_resets_backoff_state() {
        let (ps, _rx) = make_peer_states();
        let addr = test_addr(6881);
        ps.add_if_not_seen(addr, PeerSource::Tracker);
        ps.mark_connecting(addr);
        // Simulate a peer that has failed before; the guard is released at the end of the `if`.
        if let Some(mut entry) = ps.states.get_mut(&addr) {
            entry.backoff_attempt = 3;
            entry.first_failure_at = Some(Instant::now());
        }
        ps.mark_live(addr);
        let entry = ps.states.get(&addr).expect("entry should exist");
        assert_eq!(entry.backoff_attempt, 0);
        assert!(entry.first_failure_at.is_none());
        assert!(entry.retry_at.is_none());
    }

    #[test]
    fn mark_live_no_op_if_not_connecting() {
        let (ps, _rx) = make_peer_states();
        let addr = test_addr(6881);
        ps.add_if_not_seen(addr, PeerSource::Tracker);
        ps.mark_live(addr);
        let entry = ps.states.get(&addr).expect("entry should exist");
        assert_eq!(entry.state, PeerLifecycle::Queued);
    }

    // ---- mark_dead ----

    #[test]
    fn mark_dead_from_live_returns_backoff_duration() {
        let (ps, _rx) = make_peer_states();
        let addr = test_addr(6881);
        ps.add_if_not_seen(addr, PeerSource::Tracker);
        ps.mark_connecting(addr);
        ps.mark_live(addr);
        let backoff = ps.mark_dead(addr);
        assert_eq!(backoff, Some(Duration::from_secs(10)));
        let entry = ps.states.get(&addr).expect("entry should exist");
        assert_eq!(entry.state, PeerLifecycle::Dead);
        assert_eq!(entry.backoff_attempt, 1);
        assert!(entry.first_failure_at.is_some());
        assert!(entry.retry_at.is_some());
        let snap = ps.stats.snapshot();
        assert_eq!(snap.live, 0);
        assert_eq!(snap.dead, 1);
    }

    #[test]
    fn mark_dead_from_connecting_returns_backoff_duration() {
        let (ps, _rx) = make_peer_states();
        let addr = test_addr(6881);
        ps.add_if_not_seen(addr, PeerSource::Tracker);
        ps.mark_connecting(addr);
        let backoff = ps.mark_dead(addr);
        assert_eq!(backoff, Some(Duration::from_secs(10)));
        let entry = ps.states.get(&addr).expect("entry should exist");
        assert_eq!(entry.state, PeerLifecycle::Dead);
        let snap = ps.stats.snapshot();
        assert_eq!(snap.connecting, 0);
        assert_eq!(snap.dead, 1);
    }

    #[test]
    fn mark_dead_returns_none_for_queued() {
        let (ps, _rx) = make_peer_states();
        let addr = test_addr(6881);
        ps.add_if_not_seen(addr, PeerSource::Tracker);
        assert!(ps.mark_dead(addr).is_none());
    }

    #[test]
    fn mark_dead_returns_none_for_unknown() {
        let (ps, _rx) = make_peer_states();
        assert!(ps.mark_dead(test_addr(9999)).is_none());
    }

    /// Checks the backoff formula itself (10s, x6 per attempt, capped at one hour).
    #[test]
    fn backoff_increases_exponentially() {
        let expected_secs = [10u64, 60, 360, 2160, 3600];
        for (attempt, &expected) in expected_secs.iter().enumerate() {
            #[allow(clippy::cast_possible_truncation)]
            let attempt = attempt as u32;
            let got = BASE_BACKOFF_SECS
                .saturating_mul(BACKOFF_FACTOR.saturating_pow(attempt))
                .min(MAX_BACKOFF_SECS);
            assert_eq!(
                got, expected,
                "attempt {attempt}: expected {expected}s, got {got}s"
            );
        }
        for attempt in 5u32..=10 {
            let got = BASE_BACKOFF_SECS
                .saturating_mul(BACKOFF_FACTOR.saturating_pow(attempt))
                .min(MAX_BACKOFF_SECS);
            assert_eq!(
                got, MAX_BACKOFF_SECS,
                "attempt {attempt} should cap at {MAX_BACKOFF_SECS}s"
            );
        }
    }

    #[test]
    fn mark_dead_preserves_first_failure_at_across_retries() {
        let (ps, _rx) = make_peer_states();
        let addr = test_addr(6881);
        ps.add_if_not_seen(addr, PeerSource::Tracker);
        ps.mark_connecting(addr);
        ps.mark_live(addr);
        let _ = ps.mark_dead(addr);
        let first_failure = ps
            .states
            .get(&addr)
            .expect("entry should exist")
            .first_failure_at
            .expect("first_failure_at should be set");
        ps.mark_queued_for_retry(addr);
        ps.mark_connecting(addr);
        let _ = ps.mark_dead(addr);
        let second_failure = ps
            .states
            .get(&addr)
            .expect("entry should exist")
            .first_failure_at
            .expect("first_failure_at should still be set");
        assert_eq!(
            first_failure, second_failure,
            "first_failure_at must not change across retries without a successful connection"
        );
    }

    #[test]
    fn mark_dead_removes_peer_after_24hr_window() {
        let (ps, _rx) = make_peer_states();
        let addr = test_addr(6881);
        ps.add_if_not_seen(addr, PeerSource::Tracker);
        ps.mark_connecting(addr);
        ps.mark_live(addr);
        if let Some(mut entry) = ps.states.get_mut(&addr) {
            let too_old = Instant::now().checked_sub(Duration::from_secs(RETRY_WINDOW_SECS + 1));
            if let Some(old_instant) = too_old {
                entry.first_failure_at = Some(old_instant);
            } else {
                // The platform clock cannot represent an instant that far back; skip.
                return;
            }
        }
        let result = ps.mark_dead(addr);
        assert!(result.is_none(), "peer past 24hr window should return None");
        assert!(
            ps.states.get(&addr).is_none(),
            "peer should be removed from the map"
        );
        let snap = ps.stats.snapshot();
        assert_eq!(snap.known, 0, "known counter should be decremented");
        assert_eq!(snap.live, 0);
        assert_eq!(snap.dead, 0);
    }

    // ---- mark_queued_for_retry ----

    #[test]
    fn mark_queued_for_retry_transitions_dead_to_queued() {
        let (ps, mut rx) = make_peer_states();
        let addr = test_addr(6881);
        ps.add_if_not_seen(addr, PeerSource::Tracker);
        ps.mark_connecting(addr);
        ps.mark_live(addr);
        let _ = ps.mark_dead(addr);
        assert!(ps.mark_queued_for_retry(addr));
        let entry = ps.states.get(&addr).expect("entry should exist");
        assert_eq!(entry.state, PeerLifecycle::Queued);
        assert!(entry.retry_at.is_none(), "retry_at should be cleared");
        let snap = ps.stats.snapshot();
        assert_eq!(snap.dead, 0);
        assert_eq!(snap.queued, 1);
        let queued_addr = rx.try_recv().expect("should have received re-queued addr");
        assert_eq!(queued_addr, addr);
    }

    #[test]
    fn mark_queued_for_retry_rejects_non_dead() {
        let (ps, _rx) = make_peer_states();
        let addr = test_addr(6881);
        ps.add_if_not_seen(addr, PeerSource::Tracker);
        assert!(!ps.mark_queued_for_retry(addr));
    }

    #[test]
    fn mark_queued_for_retry_returns_false_for_unknown() {
        let (ps, _rx) = make_peer_states();
        assert!(!ps.mark_queued_for_retry(test_addr(9999)));
    }

    // ---- queries ----

    #[test]
    fn is_live_reflects_state() {
        let (ps, _rx) = make_peer_states();
        let addr = test_addr(6881);
        assert!(!ps.is_live(&addr));
        ps.add_if_not_seen(addr, PeerSource::Tracker);
        assert!(!ps.is_live(&addr));
        ps.mark_connecting(addr);
        assert!(!ps.is_live(&addr));
        ps.mark_live(addr);
        assert!(ps.is_live(&addr));
    }

    #[test]
    fn is_active_reflects_state() {
        let (ps, _rx) = make_peer_states();
        let addr = test_addr(6881);
        assert!(!ps.is_active(&addr));
        ps.add_if_not_seen(addr, PeerSource::Tracker);
        assert!(ps.is_active(&addr), "Queued peer should be active");
        ps.mark_connecting(addr);
        assert!(ps.is_active(&addr), "Connecting peer should be active");
        ps.mark_live(addr);
        assert!(ps.is_active(&addr), "Live peer should be active");
        let _ = ps.mark_dead(addr);
        assert!(!ps.is_active(&addr), "Dead peer should not be active");
    }

    #[test]
    fn len_tracks_entry_count() {
        let (ps, _rx) = make_peer_states();
        assert_eq!(ps.len(), 0);
        ps.add_if_not_seen(test_addr(6881), PeerSource::Tracker);
        assert_eq!(ps.len(), 1);
        ps.add_if_not_seen(test_addr_ip(2, 6882), PeerSource::Dht);
        assert_eq!(ps.len(), 2);
        // Duplicate address: the count must not change.
        ps.add_if_not_seen(test_addr(6881), PeerSource::Pex);
        assert_eq!(ps.len(), 2);
    }

    // ---- counters across the whole lifecycle ----

    #[test]
    fn full_lifecycle_counters_are_consistent() {
        let (ps, _rx) = make_peer_states();
        let addr = test_addr(6881);
        ps.add_if_not_seen(addr, PeerSource::Tracker);
        let s = ps.stats.snapshot();
        assert_eq!(
            (s.known, s.queued, s.connecting, s.live, s.dead),
            (1, 1, 0, 0, 0)
        );
        ps.mark_connecting(addr);
        let s = ps.stats.snapshot();
        assert_eq!(
            (s.known, s.queued, s.connecting, s.live, s.dead),
            (1, 0, 1, 0, 0)
        );
        ps.mark_live(addr);
        let s = ps.stats.snapshot();
        assert_eq!(
            (s.known, s.queued, s.connecting, s.live, s.dead),
            (1, 0, 0, 1, 0)
        );
        let _ = ps.mark_dead(addr);
        let s = ps.stats.snapshot();
        assert_eq!(
            (s.known, s.queued, s.connecting, s.live, s.dead),
            (1, 0, 0, 0, 1)
        );
        ps.mark_queued_for_retry(addr);
        let s = ps.stats.snapshot();
        assert_eq!(
            (s.known, s.queued, s.connecting, s.live, s.dead),
            (1, 1, 0, 0, 0)
        );
    }

    #[test]
    fn multiple_peers_tracked_independently() {
        let (ps, _rx) = make_peer_states();
        let addr1 = test_addr(6881);
        let addr2 = test_addr_ip(2, 6882);
        let addr3 = test_addr_ip(3, 6883);
        ps.add_if_not_seen(addr1, PeerSource::Tracker);
        ps.add_if_not_seen(addr2, PeerSource::Dht);
        ps.add_if_not_seen(addr3, PeerSource::Pex);
        ps.mark_connecting(addr1);
        ps.mark_connecting(addr2);
        ps.mark_live(addr1);
        // addr1 is Live, addr2 is Connecting, addr3 is still Queued.
        let s = ps.stats.snapshot();
        assert_eq!(s.known, 3);
        assert_eq!(s.queued, 1);
        assert_eq!(s.connecting, 1);
        assert_eq!(s.live, 1);
        assert_eq!(s.dead, 0);
    }

    #[test]
    fn mark_dead_increments_backoff_attempt() {
        let (ps, _rx) = make_peer_states();
        let addr = test_addr(6881);
        ps.add_if_not_seen(addr, PeerSource::Tracker);
        ps.mark_connecting(addr);
        let backoff1 = ps.mark_dead(addr);
        assert_eq!(backoff1, Some(Duration::from_secs(10)));
        let attempt_after_first = ps.states.get(&addr).expect("entry exists").backoff_attempt;
        assert_eq!(attempt_after_first, 1);
        ps.mark_queued_for_retry(addr);
        ps.mark_connecting(addr);
        let backoff2 = ps.mark_dead(addr);
        assert_eq!(backoff2, Some(Duration::from_secs(60)));
        let attempt_after_second = ps.states.get(&addr).expect("entry exists").backoff_attempt;
        assert_eq!(attempt_after_second, 2);
    }

    #[test]
    fn snapshot_is_serializable() {
        let snap = PeerPipelineSnapshot {
            known: 42,
            queued: 10,
            connecting: 5,
            live: 20,
            dead: 7,
        };
        let json = serde_json::to_string(&snap).expect("should serialize");
        assert!(json.contains("\"known\":42"));
        assert!(json.contains("\"live\":20"));
    }

    // ---- connect timing and soft reaping ----

    #[test]
    fn mark_connecting_sets_connecting_since() {
        let (ps, _rx) = make_peer_states();
        let addr = test_addr(6881);
        ps.add_if_not_seen(addr, PeerSource::Tracker);
        let before = Instant::now();
        ps.mark_connecting(addr);
        let after = Instant::now();
        let entry = ps.states.get(&addr).expect("entry should exist");
        let since = entry
            .connecting_since
            .expect("connecting_since should be set");
        assert!(since >= before && since <= after);
        assert!(entry.tcp_connected_at.is_none());
    }

    #[test]
    fn set_tcp_connected_marks_connecting_peer() {
        let (ps, _rx) = make_peer_states();
        let addr = test_addr(6881);
        ps.add_if_not_seen(addr, PeerSource::Tracker);
        ps.mark_connecting(addr);
        let before = Instant::now();
        ps.set_tcp_connected(addr);
        let after = Instant::now();
        let entry = ps.states.get(&addr).expect("entry should exist");
        let connected_at = entry
            .tcp_connected_at
            .expect("tcp_connected_at should be set");
        assert!(connected_at >= before && connected_at <= after);
    }

    #[test]
    fn set_tcp_connected_no_op_for_non_connecting() {
        let (ps, _rx) = make_peer_states();
        let addr = test_addr(6881);
        ps.add_if_not_seen(addr, PeerSource::Tracker);
        ps.set_tcp_connected(addr);
        let entry = ps.states.get(&addr).expect("entry should exist");
        assert!(
            entry.tcp_connected_at.is_none(),
            "should not set tcp_connected_at for non-Connecting peer"
        );
    }

    #[test]
    fn soft_reap_candidates_finds_unreachable_peers() {
        let (ps, _rx) = make_peer_states();
        let addr1 = test_addr(6881);
        let addr2 = test_addr_ip(2, 6882);
        let addr3 = test_addr_ip(3, 6883);
        ps.add_if_not_seen(addr1, PeerSource::Tracker);
        ps.add_if_not_seen(addr2, PeerSource::Tracker);
        ps.add_if_not_seen(addr3, PeerSource::Tracker);
        ps.mark_connecting(addr1);
        ps.mark_connecting(addr2);
        ps.mark_connecting(addr3);
        ps.set_tcp_connected(addr2);
        // Backdate the connect start of the two peers that never reported TCP connect.
        for mut entry in ps.states.iter_mut() {
            if *entry.key() == addr1 || *entry.key() == addr3 {
                entry.connecting_since = Some(
                    Instant::now()
                        .checked_sub(Duration::from_secs(5))
                        .unwrap_or_else(Instant::now),
                );
            }
        }
        let candidates = ps.soft_reap_candidates(Duration::from_secs(3));
        assert_eq!(
            candidates.len(),
            2,
            "should find 2 peers without TCP SYN-ACK past timeout"
        );
        assert!(candidates.contains(&addr1));
        assert!(candidates.contains(&addr3));
        assert!(
            !candidates.contains(&addr2),
            "peer with tcp_connected_at should be spared"
        );
    }

    #[test]
    fn soft_reap_spares_recently_connecting_peers() {
        let (ps, _rx) = make_peer_states();
        let addr = test_addr(6881);
        ps.add_if_not_seen(addr, PeerSource::Tracker);
        ps.mark_connecting(addr);
        let candidates = ps.soft_reap_candidates(Duration::from_secs(3));
        assert!(
            candidates.is_empty(),
            "recently connecting peer should not be reaped"
        );
    }

    #[test]
    fn promotion_flow_increments_live_count() {
        let (ps, _rx) = make_peer_states();
        let addr = test_addr(6881);
        ps.add_if_not_seen(addr, PeerSource::Tracker);
        assert_eq!(ps.stats.snapshot().live, 0);
        ps.mark_connecting(addr);
        assert_eq!(ps.stats.snapshot().live, 0);
        assert_eq!(ps.stats.snapshot().connecting, 1);
        ps.set_tcp_connected(addr);
        assert_eq!(ps.stats.snapshot().live, 0);
        ps.mark_live(addr);
        assert_eq!(ps.stats.snapshot().live, 1);
        assert_eq!(ps.stats.snapshot().connecting, 0);
    }

    // ---- eviction bans ----

    #[test]
    fn test_eviction_ban_blocks_peer() {
        let (ps, _rx) = make_peer_states();
        let addr = test_addr(7001);
        assert!(
            !ps.is_eviction_banned(&addr),
            "should not be banned before add"
        );
        ps.add_eviction_ban(addr);
        assert!(
            ps.is_eviction_banned(&addr),
            "should be banned immediately after add"
        );
    }

    /// A fresh ban blocks; a ban older than `EVICTION_BAN_DURATION` lapses and is purged.
    #[test]
    fn test_eviction_ban_expires() {
        let (ps, _rx) = make_peer_states();
        let addr = test_addr(7002);
        ps.add_eviction_ban(addr);
        assert!(ps.is_eviction_banned(&addr), "fresh ban should block");
        {
            // Scoped so the map guard is released before the map is written to below.
            let entry = ps.eviction_bans.get(&addr).expect("ban entry should exist");
            assert!(
                entry.elapsed() < Duration::from_secs(5),
                "ban timestamp should be recent"
            );
        }

        // Backdate the ban past its lifetime instead of sleeping for 30 minutes.
        let Some(expired_at) =
            Instant::now().checked_sub(EVICTION_BAN_DURATION + Duration::from_secs(1))
        else {
            // The platform clock cannot represent an instant that far back; skip.
            return;
        };
        ps.eviction_bans.insert(addr, expired_at);
        assert!(
            !ps.is_eviction_banned(&addr),
            "ban older than EVICTION_BAN_DURATION should no longer block"
        );
        assert!(
            !ps.eviction_bans.contains_key(&addr),
            "expired ban should be purged on lookup"
        );
    }

    #[test]
    fn test_eviction_ban_not_present_returns_false() {
        let (ps, _rx) = make_peer_states();
        let unknown = test_addr(7003);
        assert!(
            !ps.is_eviction_banned(&unknown),
            "unknown address should not be banned"
        );
    }

    #[test]
    fn test_eviction_ban_blocks_specific_peer_only() {
        let (ps, _rx) = make_peer_states();
        let banned_addr = test_addr_ip(10, 7010);
        let innocent_addr = test_addr_ip(20, 7020);
        let another_addr = test_addr_ip(30, 7030);
        ps.add_eviction_ban(banned_addr);
        assert!(
            ps.is_eviction_banned(&banned_addr),
            "banned peer should be blocked"
        );
        assert!(
            !ps.is_eviction_banned(&innocent_addr),
            "innocent peer should not be blocked by another peer's ban"
        );
        assert!(
            !ps.is_eviction_banned(&another_addr),
            "unrelated peer should not be blocked"
        );
        ps.add_eviction_ban(innocent_addr);
        assert!(
            ps.is_eviction_banned(&banned_addr),
            "first ban should still be active"
        );
        assert!(
            ps.is_eviction_banned(&innocent_addr),
            "second peer should now be banned"
        );
        assert!(
            !ps.is_eviction_banned(&another_addr),
            "third peer should remain unaffected"
        );
    }
}