// Lint relaxations applied to everything in this file.
#![allow(
// NOTE(review): no `.unwrap()` call is visible in this file — presumably kept
// alongside `expect_used` for convenience; confirm it is still needed.
clippy::unwrap_used,
// `.expect(...)` is used to fail the test immediately when a queue entry
// that must exist is missing.
clippy::expect_used,
// NOTE(review): presumably silences "missing `# Panics` section" for the
// assert-heavy code below — confirm the lint actually fires here.
clippy::missing_panics_doc,
// The queue-cap constants are cast with `as u32` to build flood sizes.
clippy::cast_possible_truncation,
// NOTE(review): presumably for identifiers written without backticks in
// doc text — confirm.
clippy::doc_markdown
)]
use ant_node::replication::scheduling::{
ReplicationQueues, MAX_FETCH_QUEUE, MAX_PENDING_VERIFY, MAX_PENDING_VERIFY_PER_PEER,
};
use ant_node::replication::types::{HintPipeline, VerificationEntry, VerificationState};
use saorsa_core::identity::PeerId;
use std::collections::HashSet;
use std::time::Instant;
/// Builds a `PeerId` from 32 raw bytes where the first byte is `b` and the
/// remaining 31 bytes are zero.
fn peer_id_from_byte(b: u8) -> PeerId {
    let raw: [u8; 32] = std::array::from_fn(|idx| if idx == 0 { b } else { 0 });
    PeerId::from_bytes(raw)
}
/// Returns a 32-byte name whose first four bytes are `i` in little-endian
/// order and whose remaining bytes are zero, so distinct `i` values yield
/// distinct names.
fn unique_xorname(i: u32) -> [u8; 32] {
    let prefix = i.to_le_bytes();
    let mut name = [0u8; 32];
    // `zip` stops after the four prefix bytes; the tail stays zeroed.
    for (slot, byte) in name.iter_mut().zip(prefix.iter()) {
        *slot = *byte;
    }
    name
}
/// Creates a `VerificationEntry` attributed to `sender`: state
/// `PendingVerify`, pipeline `Replica`, no verified or tried sources, and a
/// creation timestamp taken at call time.
fn entry_from(sender: PeerId) -> VerificationEntry {
    let created_at = Instant::now();
    VerificationEntry {
        hint_sender: sender,
        created_at,
        state: VerificationState::PendingVerify,
        pipeline: HintPipeline::Replica,
        tried_sources: HashSet::default(),
        verified_sources: Vec::default(),
    }
}
/// Submits `MAX_PENDING_VERIFY + 20_000` unique keys to the pending-verify
/// queue, switching to a new sender after every `MAX_PENDING_VERIFY_PER_PEER`
/// submissions, and asserts the queue holds exactly `MAX_PENDING_VERIFY`
/// entries afterwards.
#[test]
fn poc_d1_pending_verify_is_globally_bounded() {
    let mut queues = ReplicationQueues::new();
    let total_hints = (MAX_PENDING_VERIFY as u32).saturating_add(20_000);

    let mut next_key: u32 = 0;
    let mut sender_index: u32 = 0;
    while next_key < total_hints {
        // Sender id = little-endian sender index, zero-padded to 32 bytes.
        let mut raw_id = [0u8; 32];
        raw_id[..4].copy_from_slice(&sender_index.to_le_bytes());
        let sender = PeerId::from_bytes(raw_id);

        // Each sender submits at most the per-peer quota, then we rotate
        // (presumably so the per-peer limit is not what stops the flood).
        let mut sent_by_sender = 0;
        while sent_by_sender < MAX_PENDING_VERIFY_PER_PEER && next_key < total_hints {
            queues.add_pending_verify(unique_xorname(next_key), entry_from(sender));
            next_key += 1;
            sent_by_sender += 1;
        }
        sender_index += 1;
    }

    let held = queues.pending_count();
    assert!(
        held <= MAX_PENDING_VERIFY,
        "pending_verify must never exceed MAX_PENDING_VERIFY ({MAX_PENDING_VERIFY}); got {}",
        held
    );
    assert_eq!(
        held, MAX_PENDING_VERIFY,
        "global memory backstop clamps exactly at the cap"
    );
}
/// Enqueues `MAX_FETCH_QUEUE + 50_000` unique keys for fetching, all with the
/// same single source peer, and asserts the fetch queue holds exactly
/// `MAX_FETCH_QUEUE` entries afterwards.
#[test]
fn poc_d1_fetch_queue_is_capacity_bounded() {
    let mut queues = ReplicationQueues::new();
    let sources = vec![peer_id_from_byte(0x02)];
    let flood: u32 = (MAX_FETCH_QUEUE as u32).saturating_add(50_000);

    // The same 32-byte value is passed for both leading arguments.
    (0..flood).map(unique_xorname).for_each(|key| {
        queues.enqueue_fetch(key, key, sources.clone());
    });

    let queued = queues.fetch_queue_count();
    assert!(
        queued <= MAX_FETCH_QUEUE,
        "fetch_queue must never exceed MAX_FETCH_QUEUE ({MAX_FETCH_QUEUE}); got {}",
        queued
    );
    assert_eq!(queued, MAX_FETCH_QUEUE);
}
/// One peer submits `MAX_PENDING_VERIFY_PER_PEER + 10_000` unique hints; the
/// test asserts exactly `MAX_PENDING_VERIFY_PER_PEER` of them are admitted,
/// that the global queue is not full as a result, and that a second peer can
/// then get all 2_000 of its own hints admitted.
#[test]
fn poc_d1_flooding_peer_cannot_starve_honest_peer() {
    let mut queues = ReplicationQueues::new();
    let attacker = peer_id_from_byte(0xAA);
    let honest = peer_id_from_byte(0xBB);

    // Phase 1: the attacker floods well past its per-peer quota.
    let attacker_flood: u32 = (MAX_PENDING_VERIFY_PER_PEER as u32).saturating_add(10_000);
    let attacker_admitted = (0..attacker_flood)
        .map(unique_xorname)
        .filter(|&key| {
            queues
                .add_pending_verify(key, entry_from(attacker))
                .admitted()
        })
        .count();
    assert_eq!(
        attacker_admitted, MAX_PENDING_VERIFY_PER_PEER,
        "a single peer can occupy at most MAX_PENDING_VERIFY_PER_PEER slots"
    );
    assert_eq!(
        queues.pending_count_for_sender(&attacker),
        MAX_PENDING_VERIFY_PER_PEER,
        "per-source accounting matches"
    );

    let total_pending = queues.pending_count();
    assert!(
        total_pending < MAX_PENDING_VERIFY,
        "one flooding peer must not be able to fill the global queue; \
         pending_count={} cap={MAX_PENDING_VERIFY}",
        total_pending
    );

    // Phase 2: the honest peer uses keys offset by 10_000_000 from the
    // attacker's key indices.
    let honest_admitted = (0..2_000u32)
        .map(|j| unique_xorname(10_000_000 + j))
        .filter(|&key| queues.add_pending_verify(key, entry_from(honest)).admitted())
        .count();
    assert_eq!(
        honest_admitted, 2_000,
        "every honest hint is admitted — the flooding peer cannot starve it. \
         (This assertion FAILS against a global-cap-only fix.)"
    );
}
/// Checks that the per-sender pending count follows insertions, explicit
/// removals, and stale eviction: 100 inserted, 40 removed leaves 60, a
/// zero-duration `evict_stale` leaves 0, and a new insertion is then admitted.
#[test]
fn poc_d1_per_sender_counter_is_consistent() {
    let mut queues = ReplicationQueues::new();
    let peer = peer_id_from_byte(0xCC);

    // Insert 100 entries; every one must be admitted.
    (0..100u32).for_each(|i| {
        assert!(queues
            .add_pending_verify(unique_xorname(i), entry_from(peer))
            .admitted());
    });
    assert_eq!(queues.pending_count_for_sender(&peer), 100);

    // Remove the first 40 keys explicitly.
    (0..40u32).for_each(|i| {
        assert!(queues.remove_pending(&unique_xorname(i)).is_some());
    });
    assert_eq!(
        queues.pending_count_for_sender(&peer),
        60,
        "remove_pending decrements the per-source counter in lockstep"
    );

    // Evict with a zero-length age threshold; the test expects this to
    // clear every remaining entry.
    queues.evict_stale(std::time::Duration::ZERO);
    assert_eq!(queues.pending_count(), 0, "all entries evicted as stale");
    assert_eq!(
        queues.pending_count_for_sender(&peer),
        0,
        "evict_stale releases per-source slots; the freed quota is reusable \
         and the per-sender map is pruned (no leak/desync)"
    );

    // The same sender can be admitted again after eviction.
    assert!(queues
        .add_pending_verify(unique_xorname(999), entry_from(peer))
        .admitted());
    assert_eq!(queues.pending_count_for_sender(&peer), 1);
}
/// Inserts 1_000 unique entries from one peer (all must be admitted), then
/// re-submits an already-present key and asserts it is not admitted and that
/// neither the global nor the per-sender count changes.
#[test]
fn poc_d1_bound_preserves_legitimate_entries() {
    let mut queues = ReplicationQueues::new();
    let peer = peer_id_from_byte(0xDD);

    (0..1_000u32).for_each(|i| {
        assert!(
            queues
                .add_pending_verify(unique_xorname(i), entry_from(peer))
                .admitted(),
            "legitimate entries well under both caps are always admitted"
        );
    });
    assert_eq!(queues.pending_count(), 1_000);

    // Key 0 is already queued, so this submission is a duplicate.
    assert!(!queues
        .add_pending_verify(unique_xorname(0), entry_from(peer))
        .admitted());

    assert_eq!(
        queues.pending_count(),
        1_000,
        "no spurious growth from dedup"
    );
    assert_eq!(
        queues.pending_count_for_sender(&peer),
        1_000,
        "dedup must not double-count the per-source quota"
    );
}
/// Inserts one entry, changes its state to `QuorumVerified` via
/// `set_pending_state`, and asserts the per-sender count stays at 1 until the
/// entry is removed, after which it drops to 0.
#[test]
fn poc_d1_set_pending_state_keeps_counter_consistent() {
    let mut queues = ReplicationQueues::new();
    let peer = peer_id_from_byte(0xEE);
    let key = unique_xorname(1);

    assert!(queues.add_pending_verify(key, entry_from(peer)).admitted());
    assert_eq!(queues.pending_count_for_sender(&peer), 1);

    // The state change returns the entry's pipeline, which must still be
    // the `Replica` value set by `entry_from`.
    let pipeline = queues
        .set_pending_state(&key, VerificationState::QuorumVerified)
        .expect("entry must be present");
    assert_eq!(pipeline, HintPipeline::Replica, "pipeline preserved");
    assert_eq!(
        queues.pending_count_for_sender(&peer),
        1,
        "state change must not touch the per-source counter"
    );

    assert!(queues.remove_pending(&key).is_some());
    assert_eq!(
        queues.pending_count_for_sender(&peer),
        0,
        "removal after a state mutation releases the slot exactly once"
    );
}