//! Tests for the replication bootstrap drain check under per-peer queue
//! overflow.
//!
//! The tests in this file push hints from a single peer past
//! `MAX_PENDING_VERIFY_PER_PEER` and assert that the peer is recorded in
//! `capacity_rejected_sources` and that `check_bootstrap_drained` keeps
//! reporting "not drained" while that peer continues to overflow.
#![allow(
    // NOTE(review): presumably relaxed because this is test code, where a
    // panic is the failure signal — confirm against the workspace lint policy.
    clippy::unwrap_used,
    clippy::expect_used,
    clippy::missing_panics_doc,
    // The tests keep lock guards alive until the end of the function (for
    // example the final `state` read guard), which this lint would flag.
    clippy::significant_drop_tightening
)]
use std::collections::HashSet;
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::RwLock;
use ant_node::replication::bootstrap::{
check_bootstrap_drained, clear_capacity_rejected, note_capacity_rejected,
};
use ant_node::replication::scheduling::{
AdmissionResult, ReplicationQueues, MAX_PENDING_VERIFY_PER_PEER,
};
use ant_node::replication::types::{
BootstrapState, HintPipeline, VerificationEntry, VerificationState,
};
use saorsa_core::identity::PeerId;
/// Builds a deterministic [`PeerId`] for tests.
///
/// The identifier is 32 bytes long: the first byte is `b`, the remaining
/// 31 bytes are zero, so distinct `b` values give distinct peers.
fn peer(b: u8) -> PeerId {
    let mut id_bytes = [0u8; 32];
    // Irrefutable on a fixed-size array: binds a mutable reference to byte 0.
    let [head, ..] = &mut id_bytes;
    *head = b;
    PeerId::from_bytes(id_bytes)
}
/// Creates a fresh [`VerificationEntry`] attributed to `sender`.
///
/// The entry starts in `VerificationState::PendingVerify` on the
/// `HintPipeline::Replica` pipeline, with no verified or tried sources and
/// a creation timestamp taken at call time.
fn entry(sender: PeerId) -> VerificationEntry {
    let verified_sources = Vec::new();
    let tried_sources = HashSet::new();
    let created_at = Instant::now();

    VerificationEntry {
        hint_sender: sender,
        created_at,
        tried_sources,
        verified_sources,
        pipeline: HintPipeline::Replica,
        state: VerificationState::PendingVerify,
    }
}
/// Derives a 32-byte key from `i`.
///
/// The first four bytes hold `i` in little-endian order and the remaining
/// 28 bytes are zero, so every distinct `i` maps to a distinct key.
fn unique_key(i: u32) -> [u8; 32] {
    let mut key = [0u8; 32];
    // `zip` stops after the four source bytes, leaving the tail zeroed.
    for (slot, byte) in key.iter_mut().zip(i.to_le_bytes()) {
        *slot = byte;
    }
    key
}
/// Simulates one inbound sync batch from `source` and returns how many of
/// its hints were capacity-rejected.
///
/// `hint_count` entries are offered to `queues` under a single write lock,
/// using the keys `unique_key(key_offset + i)` for `i` in `0..hint_count`.
/// Once the lock is released, `bootstrap_state` is updated: the source is
/// noted as capacity-rejected when at least one hint was rejected, and
/// cleared otherwise.
async fn simulate_inbound_sync(
    queues: &Arc<RwLock<ReplicationQueues>>,
    bootstrap_state: &Arc<RwLock<BootstrapState>>,
    source: PeerId,
    key_offset: u32,
    hint_count: u32,
) -> usize {
    // The write guard lives only inside this block, so it is released
    // before the bootstrap-state bookkeeping below runs.
    let capacity_rejected_count = {
        let mut guard = queues.write().await;
        (0..hint_count)
            .filter(|&i| {
                let outcome =
                    guard.add_pending_verify(unique_key(key_offset + i), entry(source));
                matches!(outcome, AdmissionResult::CapacityRejected)
            })
            .count()
    };

    match capacity_rejected_count {
        0 => {
            clear_capacity_rejected(bootstrap_state, &source).await;
        }
        _ => {
            note_capacity_rejected(bootstrap_state, source).await;
        }
    }

    capacity_rejected_count
}
#[tokio::test]
async fn poc_bootstrap_stall_via_persistent_per_peer_overflow() {
let queues = Arc::new(RwLock::new(ReplicationQueues::new()));
let bootstrap_state = Arc::new(RwLock::new(BootstrapState::new()));
let attacker = peer(0xAA);
let mut next_key: u32 = 0;
#[allow(clippy::cast_possible_truncation)]
let flood = MAX_PENDING_VERIFY_PER_PEER as u32 + 1;
let rejected =
simulate_inbound_sync(&queues, &bootstrap_state, attacker, next_key, flood).await;
next_key += flood;
assert!(
rejected >= 1,
"round 1 must over-cap (got {rejected} rejections); test is mis-sized"
);
let drained_before_attack_continues = {
let q = queues.read().await;
check_bootstrap_drained(&bootstrap_state, &q).await
};
assert!(
!drained_before_attack_continues,
"bootstrap must NOT drain while attacker has outstanding capacity-rejected hints"
);
for round in 0..32 {
let r = simulate_inbound_sync(&queues, &bootstrap_state, attacker, next_key, 1).await;
next_key += 1;
assert!(
r >= 1,
"round {round}: attacker hint must continue to capacity-reject \
(per-peer cap still full); got {r}"
);
let drained = {
let q = queues.read().await;
check_bootstrap_drained(&bootstrap_state, &q).await
};
assert!(
!drained,
"round {round}: bootstrap drained despite attacker still capacity-rejecting"
);
}
let state = bootstrap_state.read().await;
assert!(
state.capacity_rejected_sources.contains(&attacker),
"attacker peer is still in capacity_rejected_sources after the flood — \
this is the documented stall: the victim has no mechanism to retire \
the attacker without the attacker's cooperation (a 'clean' admission \
cycle), so a hostile peer can stall bootstrap indefinitely"
);
assert_eq!(
state.capacity_rejected_sources.len(),
1,
"only the attacker is outstanding; honest peers are unaffected — \
which is exactly what makes this a single-peer DoS"
);
}
/// Checks that an attacker's overflow does not spill onto an honest peer.
///
/// The attacker first sends `MAX_PENDING_VERIFY_PER_PEER + 1` hints. An
/// honest peer then sends 16 hints on a disjoint key range. The test
/// asserts that none of the honest hints is capacity-rejected and that only
/// the attacker appears in `capacity_rejected_sources`.
#[tokio::test]
async fn honest_peer_drains_normally_alongside_attacker() {
    let bootstrap_state = Arc::new(RwLock::new(BootstrapState::new()));
    let queues = Arc::new(RwLock::new(ReplicationQueues::new()));
    let (attacker, honest) = (peer(0xAA), peer(0x01));

    #[allow(clippy::cast_possible_truncation)]
    let flood = MAX_PENDING_VERIFY_PER_PEER as u32 + 1;

    let r_atk = simulate_inbound_sync(&queues, &bootstrap_state, attacker, 0, flood).await;
    assert!(r_atk >= 1);

    // Start the honest keys well past the attacker's `0..flood` range.
    let honest_key_offset = flood + 100;
    let honest_batch = 16;
    let r_honest = simulate_inbound_sync(
        &queues,
        &bootstrap_state,
        honest,
        honest_key_offset,
        honest_batch,
    )
    .await;
    assert_eq!(
        r_honest, 0,
        "honest peer's small batch must NOT capacity-reject — per-source quota isolates them"
    );

    let state = bootstrap_state.read().await;
    let outstanding = &state.capacity_rejected_sources;
    assert!(outstanding.contains(&attacker), "attacker is outstanding");
    assert!(
        !outstanding.contains(&honest),
        "honest peer is NOT outstanding; its clean cycle cleared (or never created) its entry"
    );
}