use std::collections::{BinaryHeap, HashMap, HashSet};
use std::time::{Duration, Instant};
use crate::logging::debug;
use crate::ant_protocol::XorName;
use crate::replication::types::{FetchCandidate, VerificationEntry};
use saorsa_core::identity::PeerId;
/// Bookkeeping for a single fetch that has been dispatched to a peer and is
/// awaiting completion (see `ReplicationQueues::start_fetch` /
/// `complete_fetch` / `retry_fetch`).
#[derive(Debug, Clone)]
pub struct InFlightEntry {
    /// Key being fetched.
    pub key: XorName,
    /// Peer the fetch is currently directed at; updated on each retry.
    pub source: PeerId,
    /// Instant the fetch was started (set in `start_fetch`).
    pub started_at: Instant,
    /// Every known source holding this key; retries walk this list.
    pub all_sources: Vec<PeerId>,
    /// Sources already attempted — always includes the current `source`.
    pub tried: HashSet<PeerId>,
}
/// The three stages of the replication pipeline: keys pending verification,
/// keys queued for fetching, and fetches currently in flight. A key lives in
/// at most one stage at a time (enforced by `contains_key` checks on entry).
pub struct ReplicationQueues {
    // Keys whose data still needs verification, with per-key state.
    pending_verify: HashMap<XorName, VerificationEntry>,
    // Priority queue of fetch candidates. Pop order follows FetchCandidate's
    // Ord impl (defined in replication::types); tests expect the nearest
    // distance to pop first.
    fetch_queue: BinaryHeap<FetchCandidate>,
    // Mirror of the keys inside `fetch_queue`, kept for O(1) dedup checks
    // (BinaryHeap has no contains).
    fetch_queue_keys: HashSet<XorName>,
    // Dispatched fetches awaiting a response, keyed by content key.
    in_flight_fetch: HashMap<XorName, InFlightEntry>,
}
impl Default for ReplicationQueues {
fn default() -> Self {
Self::new()
}
}
impl ReplicationQueues {
    /// Creates a fresh pipeline with nothing pending, queued, or in flight.
    #[must_use]
    pub fn new() -> Self {
        Self {
            in_flight_fetch: HashMap::new(),
            fetch_queue_keys: HashSet::new(),
            fetch_queue: BinaryHeap::new(),
            pending_verify: HashMap::new(),
        }
    }

    /// Admits `key` into the pending-verification set.
    ///
    /// Returns `false` (dropping `entry`) when the key is already anywhere in
    /// the pipeline — pending, queued for fetch, or in flight.
    pub fn add_pending_verify(&mut self, key: XorName, entry: VerificationEntry) -> bool {
        let is_new = !self.contains_key(&key);
        if is_new {
            self.pending_verify.insert(key, entry);
        }
        is_new
    }

    /// Borrows the pending-verification entry for `key`, if present.
    #[must_use]
    pub fn get_pending(&self, key: &XorName) -> Option<&VerificationEntry> {
        self.pending_verify.get(key)
    }

    /// Mutably borrows the pending-verification entry for `key`, if present.
    pub fn get_pending_mut(&mut self, key: &XorName) -> Option<&mut VerificationEntry> {
        self.pending_verify.get_mut(key)
    }

    /// Removes and returns the pending-verification entry for `key`.
    pub fn remove_pending(&mut self, key: &XorName) -> Option<VerificationEntry> {
        self.pending_verify.remove(key)
    }

    /// Snapshot of every key awaiting verification (arbitrary map order).
    #[must_use]
    pub fn pending_keys(&self) -> Vec<XorName> {
        let mut keys = Vec::with_capacity(self.pending_verify.len());
        keys.extend(self.pending_verify.keys().copied());
        keys
    }

    /// Number of keys awaiting verification.
    #[must_use]
    pub fn pending_count(&self) -> usize {
        self.pending_verify.len()
    }

    /// Queues `key` for fetching. A key already anywhere in the pipeline is
    /// silently ignored, so duplicate hints cannot double-queue.
    pub fn enqueue_fetch(&mut self, key: XorName, distance: XorName, sources: Vec<PeerId>) {
        // Same three-set membership test the rest of the pipeline uses.
        if self.contains_key(&key) {
            return;
        }
        self.fetch_queue_keys.insert(key);
        self.fetch_queue.push(FetchCandidate {
            key,
            distance,
            sources,
        });
    }

    /// Pops the highest-priority fetch candidate, discarding any key that has
    /// meanwhile gone in flight. Returns `None` once the queue drains.
    pub fn dequeue_fetch(&mut self) -> Option<FetchCandidate> {
        loop {
            let candidate = self.fetch_queue.pop()?;
            // Keep the key-mirror consistent with the heap.
            self.fetch_queue_keys.remove(&candidate.key);
            if self.in_flight_fetch.contains_key(&candidate.key) {
                continue;
            }
            return Some(candidate);
        }
    }

    /// Number of candidates waiting in the fetch queue.
    #[must_use]
    pub fn fetch_queue_count(&self) -> usize {
        self.fetch_queue.len()
    }

    /// Marks `key` as in flight toward `source`, remembering `all_sources`
    /// so `retry_fetch` can rotate to peers not yet tried.
    pub fn start_fetch(&mut self, key: XorName, source: PeerId, all_sources: Vec<PeerId>) {
        // The initial source counts as tried from the start.
        let tried: HashSet<PeerId> = std::iter::once(source).collect();
        let entry = InFlightEntry {
            key,
            source,
            started_at: Instant::now(),
            all_sources,
            tried,
        };
        self.in_flight_fetch.insert(key, entry);
    }

    /// Clears the in-flight record for `key`, returning it if one existed.
    pub fn complete_fetch(&mut self, key: &XorName) -> Option<InFlightEntry> {
        self.in_flight_fetch.remove(key)
    }

    /// Rotates an in-flight fetch to the next untried source.
    ///
    /// Returns that peer (now recorded as both current and tried), or `None`
    /// when the key is not in flight or every source is exhausted.
    pub fn retry_fetch(&mut self, key: &XorName) -> Option<PeerId> {
        let entry = self.in_flight_fetch.get_mut(key)?;
        let current = entry.source;
        entry.tried.insert(current);
        let untried = entry
            .all_sources
            .iter()
            .copied()
            .find(|candidate| !entry.tried.contains(candidate));
        match untried {
            Some(peer) => {
                entry.source = peer;
                entry.tried.insert(peer);
                Some(peer)
            }
            None => None,
        }
    }

    /// Number of fetches currently in flight.
    #[must_use]
    pub fn in_flight_count(&self) -> usize {
        self.in_flight_fetch.len()
    }

    /// `true` when `key` is anywhere in the pipeline: awaiting verification,
    /// queued for fetching, or in flight.
    #[must_use]
    pub fn contains_key(&self, key: &XorName) -> bool {
        let pending = self.pending_verify.contains_key(key);
        let queued = self.fetch_queue_keys.contains(key);
        let fetching = self.in_flight_fetch.contains_key(key);
        pending || queued || fetching
    }

    /// `true` once none of `bootstrap_keys` is still being processed.
    #[must_use]
    pub fn is_bootstrap_work_empty(&self, bootstrap_keys: &HashSet<XorName>) -> bool {
        bootstrap_keys.iter().all(|key| !self.contains_key(key))
    }

    /// Drops pending-verification entries older than `max_age` and logs how
    /// many were evicted.
    pub fn evict_stale(&mut self, max_age: Duration) {
        let now = Instant::now();
        let before = self.pending_verify.len();
        self.pending_verify
            .retain(|_, entry| now.duration_since(entry.created_at) < max_age);
        let evicted = before.saturating_sub(self.pending_verify.len());
        if evicted > 0 {
            debug!("Evicted {evicted} stale pending-verification entries");
        }
    }
}
// Unit tests covering the full pipeline lifecycle: admission/dedup into
// PendingVerify, fetch-queue priority and dedup, in-flight tracking with
// source rotation, stale eviction, and two end-to-end scenario walks.
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use std::collections::HashSet;
    use std::time::{Duration, Instant};

    use super::*;
    use crate::replication::types::{HintPipeline, VerificationState};

    /// Deterministic `PeerId` whose first byte is `b` (rest zeroed).
    fn peer_id_from_byte(b: u8) -> PeerId {
        let mut bytes = [0u8; 32];
        bytes[0] = b;
        PeerId::from_bytes(bytes)
    }

    /// Deterministic `XorName` with every byte set to `b`.
    fn xor_name_from_byte(b: u8) -> XorName {
        [b; 32]
    }

    /// Fresh entry in the initial `PendingVerify` state; the hint sender is
    /// derived from `sender_byte` so tests can distinguish entries.
    fn test_entry(sender_byte: u8) -> VerificationEntry {
        VerificationEntry {
            state: VerificationState::PendingVerify,
            pipeline: HintPipeline::Replica,
            verified_sources: Vec::new(),
            tried_sources: HashSet::new(),
            created_at: Instant::now(),
            hint_sender: peer_id_from_byte(sender_byte),
        }
    }

    #[test]
    fn add_pending_verify_new_key_succeeds() {
        let mut queues = ReplicationQueues::new();
        let key = xor_name_from_byte(0x01);
        assert!(queues.add_pending_verify(key, test_entry(1)));
        assert_eq!(queues.pending_count(), 1);
    }

    #[test]
    fn add_pending_verify_duplicate_rejected() {
        let mut queues = ReplicationQueues::new();
        let key = xor_name_from_byte(0x01);
        assert!(queues.add_pending_verify(key, test_entry(1)));
        // Second add with the same key must be a no-op regardless of entry.
        assert!(!queues.add_pending_verify(key, test_entry(2)));
        assert_eq!(queues.pending_count(), 1);
    }

    #[test]
    fn add_pending_verify_rejected_if_in_fetch_queue() {
        let mut queues = ReplicationQueues::new();
        let key = xor_name_from_byte(0x02);
        let distance = xor_name_from_byte(0x10);
        queues.enqueue_fetch(key, distance, vec![peer_id_from_byte(1)]);
        assert!(
            !queues.add_pending_verify(key, test_entry(1)),
            "should reject key already in fetch queue"
        );
    }

    #[test]
    fn add_pending_verify_rejected_if_in_flight() {
        let mut queues = ReplicationQueues::new();
        let key = xor_name_from_byte(0x03);
        let source = peer_id_from_byte(1);
        queues.start_fetch(key, source, vec![source]);
        assert!(
            !queues.add_pending_verify(key, test_entry(1)),
            "should reject key already in-flight"
        );
    }

    #[test]
    fn dequeue_returns_nearest_first() {
        let mut queues = ReplicationQueues::new();
        let near_key = xor_name_from_byte(0x01);
        let far_key = xor_name_from_byte(0x02);
        // Pop order is driven by FetchCandidate's Ord (replication::types);
        // this test pins that the smaller distance dequeues first.
        let near_dist = [0x00; 32];
        let far_dist = [0xFF; 32];
        queues.enqueue_fetch(far_key, far_dist, vec![peer_id_from_byte(1)]);
        queues.enqueue_fetch(near_key, near_dist, vec![peer_id_from_byte(2)]);
        let first = queues.dequeue_fetch().expect("should dequeue");
        assert_eq!(first.key, near_key, "nearest key should dequeue first");
        let second = queues.dequeue_fetch().expect("should dequeue");
        assert_eq!(second.key, far_key, "farthest key should dequeue second");
    }

    #[test]
    fn enqueue_dedup_prevents_duplicates() {
        let mut queues = ReplicationQueues::new();
        let key = xor_name_from_byte(0x01);
        queues.enqueue_fetch(key, [0x10; 32], vec![peer_id_from_byte(1)]);
        queues.enqueue_fetch(key, [0x10; 32], vec![peer_id_from_byte(2)]);
        assert_eq!(
            queues.fetch_queue_count(),
            1,
            "duplicate enqueue should be ignored"
        );
    }

    #[test]
    fn start_and_complete_fetch() {
        let mut queues = ReplicationQueues::new();
        let key = xor_name_from_byte(0x01);
        let source = peer_id_from_byte(1);
        queues.start_fetch(key, source, vec![source]);
        assert_eq!(queues.in_flight_count(), 1);
        let completed = queues.complete_fetch(&key);
        assert!(completed.is_some());
        assert_eq!(queues.in_flight_count(), 0);
    }

    #[test]
    fn complete_nonexistent_returns_none() {
        let mut queues = ReplicationQueues::new();
        let key = xor_name_from_byte(0x99);
        assert!(queues.complete_fetch(&key).is_none());
    }

    #[test]
    fn retry_fetch_returns_next_untried_source() {
        let mut queues = ReplicationQueues::new();
        let key = xor_name_from_byte(0x01);
        let source_a = peer_id_from_byte(1);
        let source_b = peer_id_from_byte(2);
        let source_c = peer_id_from_byte(3);
        // source_a is the initial (already-tried) source; retries must walk
        // b then c, then report exhaustion.
        queues.start_fetch(key, source_a, vec![source_a, source_b, source_c]);
        let next = queues.retry_fetch(&key);
        assert_eq!(next, Some(source_b));
        let next = queues.retry_fetch(&key);
        assert_eq!(next, Some(source_c));
        let next = queues.retry_fetch(&key);
        assert!(next.is_none(), "all sources exhausted");
    }

    #[test]
    fn retry_fetch_nonexistent_returns_none() {
        let mut queues = ReplicationQueues::new();
        assert!(queues.retry_fetch(&xor_name_from_byte(0xFF)).is_none());
    }

    #[test]
    fn contains_key_in_pending() {
        let mut queues = ReplicationQueues::new();
        let key = xor_name_from_byte(0x01);
        queues.add_pending_verify(key, test_entry(1));
        assert!(queues.contains_key(&key));
    }

    #[test]
    fn contains_key_in_fetch_queue() {
        let mut queues = ReplicationQueues::new();
        let key = xor_name_from_byte(0x02);
        queues.enqueue_fetch(key, [0x10; 32], vec![peer_id_from_byte(1)]);
        assert!(queues.contains_key(&key));
    }

    #[test]
    fn contains_key_in_flight() {
        let mut queues = ReplicationQueues::new();
        let key = xor_name_from_byte(0x03);
        queues.start_fetch(key, peer_id_from_byte(1), vec![]);
        assert!(queues.contains_key(&key));
    }

    #[test]
    fn contains_key_absent() {
        let queues = ReplicationQueues::new();
        assert!(!queues.contains_key(&xor_name_from_byte(0xFF)));
    }

    #[test]
    fn bootstrap_work_empty_when_no_keys_present() {
        let queues = ReplicationQueues::new();
        let bootstrap_keys: HashSet<XorName> = [xor_name_from_byte(0x01), xor_name_from_byte(0x02)]
            .into_iter()
            .collect();
        assert!(queues.is_bootstrap_work_empty(&bootstrap_keys));
    }

    #[test]
    fn bootstrap_work_not_empty_when_key_in_pending() {
        let mut queues = ReplicationQueues::new();
        let key = xor_name_from_byte(0x01);
        queues.add_pending_verify(key, test_entry(1));
        let bootstrap_keys: HashSet<XorName> = std::iter::once(key).collect();
        assert!(!queues.is_bootstrap_work_empty(&bootstrap_keys));
    }

    #[test]
    fn evict_stale_removes_old_entries() {
        let mut queues = ReplicationQueues::new();
        let key = xor_name_from_byte(0x01);
        let mut entry = test_entry(1);
        // Backdate by 2s; falls back to now if Instant cannot represent a
        // point 2s in the past (in which case the eviction assert would fail
        // — acceptable, as that fallback should never trigger in practice).
        entry.created_at = Instant::now()
            .checked_sub(Duration::from_secs(2))
            .unwrap_or_else(Instant::now);
        // Insert directly into the private map to bypass admission checks.
        queues.pending_verify.insert(key, entry);
        assert_eq!(queues.pending_count(), 1);
        queues.evict_stale(Duration::from_secs(1));
        assert_eq!(
            queues.pending_count(),
            0,
            "entry older than max_age should be evicted"
        );
    }

    #[test]
    fn evict_stale_keeps_fresh_entries() {
        let mut queues = ReplicationQueues::new();
        let key = xor_name_from_byte(0x01);
        queues.add_pending_verify(key, test_entry(1));
        queues.evict_stale(Duration::from_secs(3600));
        assert_eq!(
            queues.pending_count(),
            1,
            "fresh entry should not be evicted"
        );
    }

    #[test]
    fn remove_pending_returns_entry() {
        let mut queues = ReplicationQueues::new();
        let key = xor_name_from_byte(0x01);
        queues.add_pending_verify(key, test_entry(1));
        let removed = queues.remove_pending(&key);
        assert!(removed.is_some());
        assert_eq!(queues.pending_count(), 0);
    }

    #[test]
    fn remove_pending_nonexistent_returns_none() {
        let mut queues = ReplicationQueues::new();
        assert!(queues.remove_pending(&xor_name_from_byte(0xFF)).is_none());
    }

    // Scenario 8: a duplicate hint for the same key must never occupy two
    // pipeline stages at once, whichever stage currently holds the key.
    #[test]
    fn scenario_8_duplicate_key_not_double_queued() {
        let mut queues = ReplicationQueues::new();
        let key = xor_name_from_byte(0xE0);
        let distance = xor_name_from_byte(0x10);
        assert!(
            queues.add_pending_verify(key, test_entry(1)),
            "first add to PendingVerify should succeed"
        );
        assert!(
            queues.contains_key(&key),
            "key should be present in pipeline"
        );
        // Enqueue while pending: must be a no-op.
        queues.enqueue_fetch(key, distance, vec![peer_id_from_byte(2)]);
        assert!(queues.contains_key(&key), "key should still be in pipeline");
        queues.remove_pending(&key);
        // Now the key can enter the fetch queue.
        queues.enqueue_fetch(key, distance, vec![peer_id_from_byte(3)]);
        assert_eq!(queues.fetch_queue_count(), 1);
        assert!(
            !queues.add_pending_verify(key, test_entry(4)),
            "key in FetchQueue should be rejected from PendingVerify"
        );
        let candidate = queues.dequeue_fetch().expect("should dequeue");
        queues.start_fetch(
            candidate.key,
            candidate.sources[0],
            candidate.sources.clone(),
        );
        assert!(
            !queues.add_pending_verify(key, test_entry(5)),
            "key in-flight should be rejected from PendingVerify"
        );
        queues.enqueue_fetch(key, distance, vec![peer_id_from_byte(6)]);
        assert_eq!(
            queues.fetch_queue_count(),
            0,
            "enqueue_fetch should be no-op for in-flight key"
        );
    }

    // Scenario 8 (pipeline collapse): when a key arrives via both the
    // replica and paid hint paths, the first admission wins and the entry
    // stays on the Replica pipeline.
    #[test]
    fn scenario_8_replica_and_paid_hint_collapses_to_replica() {
        let mut queues = ReplicationQueues::new();
        let key = xor_name_from_byte(0xE1);
        let entry = VerificationEntry {
            state: VerificationState::PendingVerify,
            pipeline: HintPipeline::Replica,
            verified_sources: Vec::new(),
            tried_sources: HashSet::new(),
            created_at: Instant::now(),
            hint_sender: peer_id_from_byte(1),
        };
        assert!(queues.add_pending_verify(key, entry));
        let pending = queues.get_pending(&key).expect("should be pending");
        assert_eq!(
            pending.pipeline,
            HintPipeline::Replica,
            "key in both hint sets should be Replica pipeline"
        );
        let paid_entry = VerificationEntry {
            state: VerificationState::PendingVerify,
            pipeline: HintPipeline::PaidOnly,
            verified_sources: Vec::new(),
            tried_sources: HashSet::new(),
            created_at: Instant::now(),
            hint_sender: peer_id_from_byte(2),
        };
        assert!(
            !queues.add_pending_verify(key, paid_entry),
            "duplicate key should be rejected regardless of pipeline"
        );
        let pending = queues.get_pending(&key).expect("should still be pending");
        assert_eq!(
            pending.pipeline,
            HintPipeline::Replica,
            "pipeline should remain Replica after duplicate rejection"
        );
    }

    // Scenario 3: happy-path walk of a single key through every stage —
    // PendingVerify -> FetchQueue -> in-flight -> completed and gone.
    #[test]
    fn scenario_3_neighbor_sync_quorum_pass_full_pipeline() {
        let mut queues = ReplicationQueues::new();
        let key = xor_name_from_byte(0x03);
        let distance = xor_name_from_byte(0x01);
        let source_a = peer_id_from_byte(1);
        let source_b = peer_id_from_byte(2);
        let hint_sender = peer_id_from_byte(3);
        let entry = VerificationEntry {
            state: VerificationState::PendingVerify,
            pipeline: HintPipeline::Replica,
            verified_sources: Vec::new(),
            tried_sources: HashSet::new(),
            created_at: Instant::now(),
            hint_sender,
        };
        assert!(
            queues.add_pending_verify(key, entry),
            "new key should be admitted to PendingVerify"
        );
        assert!(queues.contains_key(&key));
        assert_eq!(queues.pending_count(), 1);
        let removed = queues.remove_pending(&key);
        assert!(removed.is_some(), "key should exist in pending");
        assert_eq!(queues.pending_count(), 0);
        queues.enqueue_fetch(key, distance, vec![source_a, source_b]);
        assert_eq!(queues.fetch_queue_count(), 1);
        assert!(
            queues.contains_key(&key),
            "key should be in pipeline (fetch queue)"
        );
        let candidate = queues.dequeue_fetch().expect("should dequeue");
        assert_eq!(candidate.key, key);
        assert_eq!(candidate.sources.len(), 2);
        queues.start_fetch(key, source_a, candidate.sources);
        assert_eq!(queues.in_flight_count(), 1);
        assert_eq!(queues.fetch_queue_count(), 0);
        assert!(
            queues.contains_key(&key),
            "key should be in pipeline (in-flight)"
        );
        let completed = queues.complete_fetch(&key);
        assert!(
            completed.is_some(),
            "should have in-flight entry to complete"
        );
        assert_eq!(queues.in_flight_count(), 0);
        assert!(
            !queues.contains_key(&key),
            "key should be fully processed out of pipeline"
        );
    }
}