#![allow(
clippy::cast_possible_truncation,
clippy::cast_possible_wrap,
clippy::cast_sign_loss,
reason = "M175: piece reservation — collection-length casts bounded by num_pieces (u32 by construction in Lengths::new)"
)]
use parking_lot::Mutex;
use std::collections::VecDeque;
use std::net::SocketAddr;
use std::sync::Arc;
use std::sync::atomic::{AtomicU8, AtomicU32, AtomicU64, Ordering};
use irontide_core::{FilePriority, Lengths};
use irontide_storage::Bitfield;
use rustc_hash::FxHashMap;
use std::time::{Duration, Instant};
/// Identifies one block to request: the piece it belongs to plus the
/// `(begin, length)` pair that the dispatch states in this file obtain from
/// `Lengths::chunk_info`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) struct BlockRequest {
/// Index of the piece that contains the block.
pub piece: u32,
/// First value of the `chunk_info` pair (presumably the byte offset of the
/// block inside the piece — confirm against `Lengths`).
pub begin: u32,
/// Second value of the `chunk_info` pair (presumably the block length in
/// bytes — confirm against `Lengths`).
pub length: u32,
}
/// Lifecycle state of a single piece, stored as a raw `u8` discriminant
/// inside `AtomicPieceStates`.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum PieceState {
    /// Unclaimed; `AtomicPieceStates::try_reserve` can move it to `Reserved`.
    Available = 0,
    /// Claimed by a single reservation.
    Reserved = 1,
    /// Initial state of pieces we already have; also set by `mark_complete`.
    Complete = 2,
    /// Initial state of pieces that are not wanted; also set by `mark_unwanted`.
    Unwanted = 3,
    /// Set by `transition_to_endgame`; `try_reserve` keeps succeeding in this state.
    Endgame = 4,
}

impl PieceState {
    /// Decodes a raw discriminant back into a `PieceState`.
    ///
    /// Every value without a dedicated variant (`0` and anything `>= 5`)
    /// decodes to [`PieceState::Available`].
    fn from_u8(v: u8) -> Self {
        match v {
            1 => Self::Reserved,
            2 => Self::Complete,
            3 => Self::Unwanted,
            4 => Self::Endgame,
            _ => Self::Available,
        }
    }
}
/// Table of per-piece states, one `AtomicU8` per piece, so that reservation
/// can be attempted through a shared reference.
#[derive(Debug)]
pub(crate) struct AtomicPieceStates {
    /// Raw `PieceState` discriminants indexed by piece number.
    states: Vec<AtomicU8>,
}

#[allow(dead_code)]
impl AtomicPieceStates {
    /// Builds the table for `num_pieces` pieces.
    ///
    /// A piece starts `Complete` when `we_have` contains it, otherwise
    /// `Unwanted` when `wanted` does not contain it, otherwise `Available`.
    pub fn new(num_pieces: u32, we_have: &Bitfield, wanted: &Bitfield) -> Self {
        let mut states = Vec::with_capacity(num_pieces as usize);
        for piece in 0..num_pieces {
            let initial = if we_have.get(piece) {
                PieceState::Complete
            } else if wanted.get(piece) {
                PieceState::Available
            } else {
                PieceState::Unwanted
            };
            states.push(AtomicU8::new(initial as u8));
        }
        Self { states }
    }

    /// Returns the atomic cell for `index`; panics when `index` is out of range.
    fn cell(&self, index: u32) -> &AtomicU8 {
        &self.states[index as usize]
    }

    /// Unconditionally overwrites the state of `index`.
    fn put(&self, index: u32, state: PieceState) {
        self.cell(index).store(state as u8, Ordering::Relaxed);
    }

    /// Tries to claim piece `index`.
    ///
    /// Returns `true` when the piece was swapped from `Available` to
    /// `Reserved`, or when it is observed in `Endgame` after the swap failed
    /// (the state is left untouched in that case).
    ///
    /// # Panics
    /// Panics when `index` is out of range.
    pub fn try_reserve(&self, index: u32) -> bool {
        let cell = self.cell(index);
        let swapped = cell.compare_exchange(
            PieceState::Available as u8,
            PieceState::Reserved as u8,
            Ordering::Relaxed,
            Ordering::Relaxed,
        );
        match swapped {
            Ok(_) => true,
            // Re-read after the failed swap: endgame pieces may be shared.
            Err(_) => cell.load(Ordering::Relaxed) == PieceState::Endgame as u8,
        }
    }

    /// Stores `Complete` for `index`, whatever the previous state was.
    pub fn mark_complete(&self, index: u32) {
        self.put(index, PieceState::Complete);
    }

    /// Stores `Available` for `index`, whatever the previous state was.
    pub fn release(&self, index: u32) {
        self.put(index, PieceState::Available);
    }

    /// Stores `Endgame` for `index`, whatever the previous state was.
    pub fn transition_to_endgame(&self, index: u32) {
        self.put(index, PieceState::Endgame);
    }

    /// Stores `Unwanted` for `index`, whatever the previous state was.
    pub fn mark_unwanted(&self, index: u32) {
        self.put(index, PieceState::Unwanted);
    }

    /// Moves `index` from `Unwanted` to `Available`; any other state is kept.
    pub fn mark_available(&self, index: u32) {
        // A failed swap just means the piece was not `Unwanted`; nothing to do.
        let _outcome = self.cell(index).compare_exchange(
            PieceState::Unwanted as u8,
            PieceState::Available as u8,
            Ordering::Relaxed,
            Ordering::Relaxed,
        );
    }

    /// Reads the current state of `index`.
    pub fn get(&self, index: u32) -> PieceState {
        let raw = self.cell(index).load(Ordering::Relaxed);
        PieceState::from_u8(raw)
    }

    /// Stores `Reserved` for `index`, whatever the previous state was.
    pub fn force_reserved(&self, index: u32) {
        self.put(index, PieceState::Reserved);
    }

    /// Number of pieces tracked.
    pub fn len(&self) -> u32 {
        self.states.len() as u32
    }

    /// Counts the pieces currently in `Reserved` or `Endgame`.
    pub fn in_flight_count(&self) -> u32 {
        let busy = self
            .states
            .iter()
            .map(|cell| PieceState::from_u8(cell.load(Ordering::Relaxed)))
            .filter(|state| matches!(state, PieceState::Reserved | PieceState::Endgame))
            .count();
        busy as u32
    }

    /// Copies the table into one byte per piece: `2` for `Complete`, `1` for
    /// `Reserved`/`Endgame`, `0` for `Available`/`Unwanted`.
    pub fn snapshot(&self) -> Vec<u8> {
        let mut codes = Vec::with_capacity(self.states.len());
        for cell in &self.states {
            let code = match PieceState::from_u8(cell.load(Ordering::Relaxed)) {
                PieceState::Available | PieceState::Unwanted => 0,
                PieceState::Reserved | PieceState::Endgame => 1,
                PieceState::Complete => 2,
            };
            codes.push(code);
        }
        codes
    }
}
/// Progress through the piece that `DirectDispatchState` is currently
/// handing out block by block.
struct CurrentPiece {
/// Index of the piece being dispatched.
piece: u32,
/// Index of the next block to hand out; starts at 0 and is incremented per request.
next_block: u32,
/// Block count supplied by the caller of `set_current_piece`.
total_blocks: u32,
}
use slab::Slab;
/// Bidirectional mapping between peer addresses and compact `u16` slots.
///
/// `inner` owns the addresses keyed by slot; `addr_to_slot` is the reverse
/// index. The two are kept in lock-step: every occupied slot has exactly one
/// reverse entry and vice versa.
pub(crate) struct PeerSlab {
    inner: Slab<SocketAddr>,
    addr_to_slot: FxHashMap<SocketAddr, u16>,
}

#[allow(dead_code)]
impl PeerSlab {
    /// Creates an empty slab with room for 64 peers before reallocating.
    pub fn new() -> Self {
        Self {
            inner: Slab::with_capacity(64),
            addr_to_slot: FxHashMap::default(),
        }
    }

    /// Registers `addr` and returns its slot.
    ///
    /// Inserting an address that is already registered returns the existing
    /// slot instead of allocating a second one. Allocating twice would
    /// orphan the first slot and let a later `remove` of it delete the
    /// reverse entry that belongs to the newer slot.
    ///
    /// # Panics
    /// Panics when the slab key does not fit in a `u16`. The entry is taken
    /// back out first so the slab and the reverse index stay consistent.
    pub fn insert(&mut self, addr: SocketAddr) -> u16 {
        if let Some(&existing) = self.addr_to_slot.get(&addr) {
            return existing;
        }
        let key = self.inner.insert(addr);
        let Ok(slot) = u16::try_from(key) else {
            self.inner.remove(key);
            panic!("peer slab overflow: {key} > u16::MAX");
        };
        self.addr_to_slot.insert(addr, slot);
        slot
    }

    /// Removes the peer stored in `slot` and returns its address.
    ///
    /// # Panics
    /// Panics when `slot` is vacant (propagated from `Slab::remove`).
    pub fn remove(&mut self, slot: u16) -> SocketAddr {
        let addr = self.inner.remove(usize::from(slot));
        self.addr_to_slot.remove(&addr);
        addr
    }

    /// Removes the peer registered under `addr`, returning the slot it held,
    /// or `None` when the address is unknown.
    pub fn remove_by_addr(&mut self, addr: &SocketAddr) -> Option<u16> {
        let slot = self.addr_to_slot.remove(addr)?;
        self.inner.remove(usize::from(slot));
        Some(slot)
    }

    /// Address stored in `slot`, if the slot is occupied.
    pub fn get(&self, slot: u16) -> Option<&SocketAddr> {
        self.inner.get(usize::from(slot))
    }

    /// Slot currently assigned to `addr`, if any.
    pub fn slot_of(&self, addr: &SocketAddr) -> Option<u16> {
        self.addr_to_slot.get(addr).copied()
    }

    /// Whether `slot` is occupied.
    pub fn contains(&self, slot: u16) -> bool {
        self.inner.contains(usize::from(slot))
    }

    /// Number of registered peers.
    pub fn len(&self) -> usize {
        self.inner.len()
    }
}
/// Per-block "requested" and "received" bitmaps for every piece.
///
/// Each piece owns `words_per_piece` consecutive `AtomicU64` words in both
/// vectors; within that run, bit `b % 64` of word `b / 64` tracks block `b`.
#[derive(Debug)]
pub(crate) struct BlockMaps {
    /// Bit set once a block has been requested from some peer.
    requested: Vec<AtomicU64>,
    /// Bit set once a block has been received.
    received: Vec<AtomicU64>,
    /// Number of 64-bit words reserved per piece in both vectors.
    words_per_piece: u32,
}

#[allow(dead_code)]
impl BlockMaps {
    /// Allocates zeroed bitmaps for `num_pieces` pieces.
    ///
    /// The per-piece word count is derived from the block count of piece 0
    /// (this assumes no other piece has more blocks than piece 0 — confirm
    /// against `Lengths`).
    pub fn new(num_pieces: u32, lengths: &Lengths) -> Self {
        let max_blocks = if num_pieces == 0 {
            0
        } else {
            lengths.chunks_in_piece(0)
        };
        // `div_ceil` rounds up without the overflow/saturation corner case of
        // `(n + 63) / 64`.
        let words_per_piece = max_blocks.div_ceil(64);
        let total_words = (num_pieces as usize).saturating_mul(words_per_piece as usize);
        let zeroed = || -> Vec<AtomicU64> {
            std::iter::repeat_with(|| AtomicU64::new(0))
                .take(total_words)
                .collect()
        };
        Self {
            requested: zeroed(),
            received: zeroed(),
            words_per_piece,
        }
    }

    /// Marks `block` of `piece` as requested.
    ///
    /// Returns `true` when the bit was ALREADY set (someone else claimed the
    /// block first) and `false` when this call claimed it.
    ///
    /// # Panics
    /// Panics when the computed word index is outside the bitmap.
    pub fn mark_requested(&self, piece: u32, block: u32) -> bool {
        let (word_idx, bit_mask) = self.word_and_mask(piece, block);
        let old = self.requested[word_idx].fetch_or(bit_mask, Ordering::Relaxed);
        old & bit_mask != 0
    }

    /// Marks `block` of `piece` as received.
    ///
    /// # Panics
    /// Panics when the computed word index is outside the bitmap.
    pub fn mark_received(&self, piece: u32, block: u32) {
        let (word_idx, bit_mask) = self.word_and_mask(piece, block);
        self.received[word_idx].fetch_or(bit_mask, Ordering::Relaxed);
    }

    /// Returns the lowest block index below `total_blocks` whose "requested"
    /// bit is clear, or `None` when all are set or an index falls outside
    /// the bitmap.
    pub fn next_unrequested(&self, piece: u32, total_blocks: u32) -> Option<u32> {
        let base = (piece as usize).checked_mul(self.words_per_piece as usize)?;
        for block in 0..total_blocks {
            let word_idx = base.checked_add((block / 64) as usize)?;
            let word = self.requested.get(word_idx)?.load(Ordering::Relaxed);
            if word & (1u64 << (block % 64)) == 0 {
                return Some(block);
            }
        }
        None
    }

    /// Returns `true` when every block below `total_blocks` has its
    /// "received" bit set. Indices that fall outside the bitmap count as
    /// "not received".
    pub fn all_received(&self, piece: u32, total_blocks: u32) -> bool {
        let Some(base) = (piece as usize).checked_mul(self.words_per_piece as usize) else {
            return total_blocks == 0;
        };
        (0..total_blocks).all(|block| {
            base.checked_add((block / 64) as usize)
                .and_then(|idx| self.received.get(idx))
                .is_some_and(|atom| atom.load(Ordering::Relaxed) & (1u64 << (block % 64)) != 0)
        })
    }

    /// Resets both bitmaps for `piece`.
    ///
    /// `total_blocks` selects how many words to zero, but the count is capped
    /// at `words_per_piece`, so an oversized value can never wipe the state
    /// of the following pieces.
    pub fn clear(&self, piece: u32, total_blocks: u32) {
        let Some(base) = (piece as usize).checked_mul(self.words_per_piece as usize) else {
            return;
        };
        let num_words = total_blocks.div_ceil(64).min(self.words_per_piece) as usize;
        for offset in 0..num_words {
            let Some(idx) = base.checked_add(offset) else {
                break;
            };
            if let Some(atom) = self.requested.get(idx) {
                atom.store(0, Ordering::Relaxed);
            }
            if let Some(atom) = self.received.get(idx) {
                atom.store(0, Ordering::Relaxed);
            }
        }
    }

    /// Translates `(piece, block)` into a word index and a single-bit mask.
    /// Debug builds assert that the index is inside the bitmap.
    fn word_and_mask(&self, piece: u32, block: u32) -> (usize, u64) {
        let base = (piece as usize).saturating_mul(self.words_per_piece as usize);
        let idx = base.saturating_add((block / 64) as usize);
        debug_assert!(
            idx < self.requested.len(),
            "BlockMaps: piece={piece} block={block} out of range (idx={idx}, len={})",
            self.requested.len()
        );
        (idx, 1u64 << (block % 64))
    }
}
/// Mutex-protected FIFO of piece indices that may be stolen from.
///
/// Every method takes the lock once, wrapped in a `TimedGuard` labelled
/// `"steal_candidates"`.
#[derive(Debug)]
pub(crate) struct StealCandidates {
    /// Queue of candidate pieces; `push`/`push_batch` never add a duplicate.
    inner: Mutex<VecDeque<u32>>,
    /// Settings handed to every `TimedGuard` created by this type.
    timing: crate::timed_lock::LockTimingSettings,
}

#[allow(dead_code)]
impl StealCandidates {
    /// Creates an empty queue with default lock-timing settings.
    pub fn new() -> Self {
        let inner = Mutex::new(VecDeque::new());
        let timing = crate::timed_lock::LockTimingSettings::default();
        Self { inner, timing }
    }

    /// Appends `piece` unless it is already queued.
    pub fn push(&self, piece: u32) {
        // A one-element batch takes the lock once and applies the same
        // duplicate check as a dedicated single push would.
        self.push_batch(std::slice::from_ref(&piece));
    }

    /// Appends every piece of `pieces` that is not already queued, in order,
    /// under a single lock acquisition.
    pub fn push_batch(&self, pieces: &[u32]) {
        let mut queue =
            crate::timed_lock::TimedGuard::new(self.inner.lock(), &self.timing, "steal_candidates");
        for candidate in pieces.iter().copied() {
            if queue.contains(&candidate) {
                continue;
            }
            queue.push_back(candidate);
        }
    }

    /// Removes and returns the piece at the front of the queue.
    pub fn pop(&self) -> Option<u32> {
        let mut queue =
            crate::timed_lock::TimedGuard::new(self.inner.lock(), &self.timing, "steal_candidates");
        queue.pop_front()
    }

    /// Removes the first occurrence of `piece`, if it is queued.
    pub fn remove(&self, piece: u32) {
        let mut queue =
            crate::timed_lock::TimedGuard::new(self.inner.lock(), &self.timing, "steal_candidates");
        let found = queue.iter().position(|&queued| queued == piece);
        if let Some(index) = found {
            queue.remove(index);
        }
    }
}
/// One in-progress-writer counter per piece.
#[derive(Debug)]
pub(crate) struct PieceWriteGuards {
    /// `writers[p]` is the number of live `WriteGuard`s for piece `p`.
    writers: Vec<AtomicU32>,
}

impl PieceWriteGuards {
    /// Creates counters for `num_pieces` pieces, all starting at zero.
    pub fn new(num_pieces: u32) -> Self {
        let mut writers = Vec::new();
        writers.resize_with(num_pieces as usize, || AtomicU32::new(0));
        Self { writers }
    }

    /// Registers a writer for `piece`.
    ///
    /// Returns a guard that unregisters the writer when dropped, or `None`
    /// when `piece` is out of range.
    #[inline]
    pub fn begin_write(&self, piece: u32) -> Option<WriteGuard<'_>> {
        self.writers.get(piece as usize).map(|counter| {
            counter.fetch_add(1, Ordering::Relaxed);
            WriteGuard { counter }
        })
    }

    /// Returns `true` when `piece` is in range and has no live write guard.
    /// Out-of-range pieces are reported as not idle.
    #[allow(dead_code)]
    #[inline]
    pub fn is_idle(&self, piece: u32) -> bool {
        match self.writers.get(piece as usize) {
            Some(counter) => counter.load(Ordering::Acquire) == 0,
            None => false,
        }
    }
}

/// Handle for one registered writer; dropping it decrements the piece's
/// counter.
#[derive(Debug)]
pub(crate) struct WriteGuard<'a> {
    counter: &'a AtomicU32,
}

impl Drop for WriteGuard<'_> {
    /// Decrements the counter with `Release` ordering, pairing with the
    /// `Acquire` load in `PieceWriteGuards::is_idle`.
    fn drop(&mut self) {
        let _previous = self.counter.fetch_sub(1, Ordering::Release);
    }
}
/// Bookkeeping for a piece that `PieceTracker` has handed to a peer.
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub(crate) struct InflightPiece {
/// Peer that currently owns the piece; rewritten when the piece is stolen.
pub peer: SocketAddr,
/// When the current owner got the piece (`Instant::now()` at reservation or steal time).
pub started: Instant,
}
/// Outcome of `PieceTracker::acquire_piece` / `PieceTracker::try_steal`.
#[derive(Debug)]
#[allow(dead_code)]
pub(crate) enum AcquireResult {
/// The piece with this index was taken from the queue and is now in flight for the caller.
Reserved(u32),
/// An in-flight piece was reassigned to the caller; `from_peer` is the previous owner.
Stolen { piece: u32, from_peer: SocketAddr },
/// Nothing could be reserved or stolen for this peer.
NoneAvailable,
}
/// Reply describing the result of a piece acquisition.
///
/// NOTE(review): nothing in the visible part of this file constructs this
/// type; the field meanings below are taken from the names only.
#[derive(Debug)]
pub(crate) enum AcquireResponse {
/// A piece was acquired; `total_blocks` is presumably its block count — confirm at the construction site.
Acquired { piece: u32, total_blocks: u32 },
/// No piece could be acquired.
NoneAvailable,
}
/// Download order of piece indices, tagged with a generation counter so
/// consumers can detect that the order was rebuilt.
#[derive(Debug, Clone)]
pub(crate) struct PieceOrderMap {
    /// Piece indices in the order they should be tried; each appears at most once.
    pub order: Vec<u32>,
    /// Caller-supplied generation; consumers reset their cursors when it changes.
    pub generation: u64,
}

impl PieceOrderMap {
    /// Builds the piece order from per-file priorities.
    ///
    /// * `file_priorities[i]` is the priority of file `i`; files with
    ///   `FilePriority::Skip` contribute nothing.
    /// * `file_piece_ranges[i]` is the inclusive `(first, last)` piece range
    ///   of file `i`. Ranges with `first > last` are ignored.
    /// * Files are visited from highest to lowest priority (stable with
    ///   respect to file order). Within a file the first piece comes first,
    ///   then the last piece, then the pieces in between in ascending order.
    /// * A piece shared by several files is emitted only once, at its first
    ///   occurrence.
    ///
    /// Ranges are clipped to `0..num_pieces`: a range that starts past the
    /// last piece is ignored and a range that ends past it is truncated, so
    /// `order` never contains an index `>= num_pieces`.
    ///
    /// # Panics
    /// Panics when the two slices have different lengths.
    #[must_use]
    pub fn build(
        file_priorities: &[FilePriority],
        file_piece_ranges: &[(u32, u32)],
        num_pieces: u32,
        generation: u64,
    ) -> Self {
        assert_eq!(file_priorities.len(), file_piece_ranges.len());
        if num_pieces == 0 {
            return Self {
                order: Vec::new(),
                generation,
            };
        }
        let max_piece = num_pieces - 1;
        let mut order = Vec::with_capacity(num_pieces as usize);
        let mut seen = Bitfield::new(num_pieces);
        let mut file_indices: Vec<usize> = (0..file_priorities.len())
            .filter(|&i| file_priorities[i] != FilePriority::Skip)
            .collect();
        // Stable sort, highest priority first; ties keep file order.
        file_indices.sort_by(|&a, &b| file_priorities[b].cmp(&file_priorities[a]));
        for file_idx in file_indices {
            let (first, last) = file_piece_ranges[file_idx];
            if first > last || first > max_piece {
                continue;
            }
            Self::emit_file_pieces(first, last.min(max_piece), &mut seen, &mut order);
        }
        Self { order, generation }
    }

    /// Appends the not-yet-seen pieces of the inclusive range `first..=last`
    /// to `order`: first piece, last piece, then the interior ascending.
    /// Callers must pass `first <= last`.
    fn emit_file_pieces(first: u32, last: u32, seen: &mut Bitfield, order: &mut Vec<u32>) {
        let mut claim = |piece: u32| {
            if !seen.get(piece) {
                seen.set(piece);
                order.push(piece);
            }
        };
        claim(first);
        if last > first {
            claim(last);
            // `first < last` guarantees `first + 1` cannot overflow.
            for piece in (first + 1)..last {
                claim(piece);
            }
        }
    }

    /// An order map with no pieces and generation 0.
    #[must_use]
    pub fn empty() -> Self {
        Self {
            order: Vec::new(),
            generation: 0,
        }
    }
}
/// Alias of `PieceOrderMap`; `PeerDispatchState` stores its shared snapshot under this name.
pub(crate) type AvailabilitySnapshot = PieceOrderMap;
/// Per-peer scan position into a `PieceOrderMap`, kept by `PieceTracker`.
struct PeerCursorState {
/// Index into `PieceOrderMap::order` where the next scan for this peer starts.
cursor: usize,
/// Generation of the order map the cursor refers to; a mismatch resets `cursor` to 0.
order_gen: u64,
}
/// Tracks which pieces are waiting to be downloaded and which are in flight,
/// and hands pieces out to peers.
pub(crate) struct PieceTracker {
    /// Pieces that are wanted, not yet owned and not currently in flight.
    queue_pieces: Bitfield,
    /// Pieces currently assigned to a peer, keyed by piece index.
    inflight: FxHashMap<u32, InflightPiece>,
    /// Per-peer scan position into the order map.
    cursors: FxHashMap<SocketAddr, PeerCursorState>,
    /// `true` when the most recent `acquire_piece` call started its queue
    /// scan from a non-zero saved cursor; `false` otherwise (including when
    /// the call was satisfied by a steal before any scan).
    pub(crate) last_cursor_resumed: bool,
}

#[allow(dead_code)]
impl PieceTracker {
    /// Creates a tracker whose queue holds every piece that is in `wanted`
    /// but not in `we_have`.
    pub fn new(num_pieces: u32, we_have: &Bitfield, wanted: &Bitfield) -> Self {
        let mut queue_pieces = Bitfield::new(num_pieces);
        for piece in 0..num_pieces {
            if wanted.get(piece) && !we_have.get(piece) {
                queue_pieces.set(piece);
            }
        }
        Self {
            queue_pieces,
            inflight: FxHashMap::default(),
            cursors: FxHashMap::default(),
            last_cursor_resumed: false,
        }
    }

    /// Records that `peer` now owns `piece`: the piece leaves the queue and
    /// an in-flight entry stamped with the current time replaces any
    /// previous one.
    pub fn record_reservation(&mut self, piece: u32, peer: SocketAddr) {
        self.mark_inflight(piece, peer);
    }

    /// Shared bookkeeping for `record_reservation` and `reserve_piece`.
    fn mark_inflight(&mut self, piece: u32, peer: SocketAddr) {
        self.queue_pieces.clear(piece);
        self.inflight.insert(
            piece,
            InflightPiece {
                peer,
                started: Instant::now(),
            },
        );
    }

    /// Whether at least one queued piece is present in `peer_bitfield`.
    #[must_use]
    pub fn queue_intersects(&self, peer_bitfield: &Bitfield) -> bool {
        self.queue_pieces.intersects(peer_bitfield)
    }

    /// Picks a piece for `peer`.
    ///
    /// Order of attempts:
    /// 1. steal an in-flight piece that has been held for at least 10× the
    ///    peer's average piece time;
    /// 2. reserve the next queued piece the peer has, scanning `order_map`
    ///    from the peer's saved cursor and wrapping around once;
    /// 3. steal with a 3× threshold;
    /// 4. otherwise `AcquireResult::NoneAvailable`.
    ///
    /// The peer's cursor restarts at 0 whenever `order_map.generation`
    /// differs from the generation it was saved under.
    pub fn acquire_piece(
        &mut self,
        peer: SocketAddr,
        peer_bitfield: &Bitfield,
        peer_avg_time: Option<Duration>,
        order_map: &PieceOrderMap,
        can_steal: impl Fn(u32) -> bool,
    ) -> AcquireResult {
        const EAGER_STEAL_THRESHOLD: f64 = 10.0;
        const FALLBACK_STEAL_THRESHOLD: f64 = 3.0;

        // Reset up front so an early return through the eager steal does not
        // leave the flag of a previous call behind.
        self.last_cursor_resumed = false;

        if let Some(stolen) = self.try_steal(
            peer,
            peer_avg_time,
            EAGER_STEAL_THRESHOLD,
            |p| peer_bitfield.get(p),
            &can_steal,
        ) {
            return stolen;
        }

        if self.queue_pieces.intersects(peer_bitfield) {
            let cs = self.cursors.entry(peer).or_insert(PeerCursorState {
                cursor: 0,
                order_gen: order_map.generation,
            });
            if cs.order_gen != order_map.generation {
                cs.cursor = 0;
                cs.order_gen = order_map.generation;
            }
            let len = order_map.order.len();
            if cs.cursor >= len {
                cs.cursor = 0;
            }
            let start = cs.cursor;
            if start > 0 {
                self.last_cursor_resumed = true;
            }
            // Scan from the cursor to the end, then wrap to the beginning.
            for i in (start..len).chain(0..start) {
                let piece = order_map.order[i];
                if !self.queue_pieces.get(piece) || !peer_bitfield.get(piece) {
                    continue;
                }
                cs.cursor = if i + 1 >= len { 0 } else { i + 1 };
                return self.reserve_piece(piece, peer);
            }
            cs.cursor = 0;
        }

        self.try_steal(
            peer,
            peer_avg_time,
            FALLBACK_STEAL_THRESHOLD,
            |p| peer_bitfield.get(p),
            &can_steal,
        )
        .unwrap_or(AcquireResult::NoneAvailable)
    }

    /// Moves `piece` from the queue to the in-flight map for `peer`.
    fn reserve_piece(&mut self, piece: u32, peer: SocketAddr) -> AcquireResult {
        self.mark_inflight(piece, peer);
        AcquireResult::Reserved(piece)
    }

    /// Tries to reassign an in-flight piece to `peer`.
    ///
    /// Candidates are in-flight pieces owned by another peer, for which
    /// `peer_has_piece` returns `true`, and that have been held for at least
    /// `peer_avg_time * threshold`. The candidate held longest is chosen; if
    /// `can_steal` rejects it, nothing is stolen (NOTE(review): younger
    /// candidates are not tried — confirm this is intended).
    ///
    /// Returns `None` when `peer_avg_time` is `None`, when there is no
    /// candidate, or when the chosen candidate is rejected. A product that
    /// cannot be represented as a `Duration` (overflow, negative or NaN) is
    /// treated as an unreachable threshold instead of panicking.
    pub fn try_steal(
        &mut self,
        peer: SocketAddr,
        peer_avg_time: Option<Duration>,
        threshold: f64,
        peer_has_piece: impl Fn(u32) -> bool,
        can_steal: impl Fn(u32) -> bool,
    ) -> Option<AcquireResult> {
        let my_avg = peer_avg_time?;
        let min_elapsed = Duration::try_from_secs_f64(my_avg.as_secs_f64() * threshold)
            .unwrap_or(Duration::MAX);
        let (piece, old_peer) = self
            .inflight
            .iter()
            .filter(|(candidate, info)| info.peer != peer && peer_has_piece(**candidate))
            .map(|(candidate, info)| (*candidate, info.peer, info.started.elapsed()))
            .filter(|&(_, _, elapsed)| elapsed >= min_elapsed)
            .max_by_key(|&(_, _, elapsed)| elapsed)
            .map(|(candidate, owner, _)| (candidate, owner))?;
        if !can_steal(piece) {
            return None;
        }
        let info = self.inflight.get_mut(&piece)?;
        info.peer = peer;
        info.started = Instant::now();
        Some(AcquireResult::Stolen {
            piece,
            from_peer: old_peer,
        })
    }

    /// Drops the in-flight entry of a piece whose hash check succeeded.
    pub fn mark_piece_hash_ok(&mut self, piece: u32) {
        self.inflight.remove(&piece);
    }

    /// Drops the in-flight entry of a piece whose hash check failed and puts
    /// the piece back in the queue.
    pub fn mark_piece_hash_failed(&mut self, piece: u32) {
        self.return_to_queue(piece);
    }

    /// Drops the in-flight entry of `piece` (if any) and queues it again.
    pub fn return_to_queue(&mut self, piece: u32) {
        self.inflight.remove(&piece);
        self.queue_pieces.set(piece);
    }

    /// Re-queues every in-flight piece owned by `peer` and returns how many
    /// pieces were released.
    pub fn release_pieces_owned_by(&mut self, peer: &SocketAddr) -> usize {
        let Self {
            inflight,
            queue_pieces,
            ..
        } = self;
        let before = inflight.len();
        inflight.retain(|&piece, info| {
            if info.peer == *peer {
                queue_pieces.set(piece);
                false
            } else {
                true
            }
        });
        before - inflight.len()
    }

    /// Removes the in-flight entry of `piece` and returns how long it had
    /// been held, or `None` when the piece was not in flight.
    pub fn take_inflight(&mut self, piece: u32) -> Option<Duration> {
        self.inflight
            .remove(&piece)
            .map(|info| info.started.elapsed())
    }

    /// Whether `piece` is waiting in the queue.
    pub fn is_queued(&self, piece: u32) -> bool {
        self.queue_pieces.get(piece)
    }

    /// Number of queued pieces.
    pub fn queue_count(&self) -> u32 {
        self.queue_pieces.count_ones()
    }

    /// Number of in-flight pieces.
    pub fn inflight_count(&self) -> usize {
        self.inflight.len()
    }

    /// Queues `piece` unless it is currently in flight.
    pub fn mark_wanted(&mut self, piece: u32) {
        if !self.inflight.contains_key(&piece) {
            self.queue_pieces.set(piece);
        }
    }

    /// Removes `piece` from the queue; an in-flight entry is left untouched.
    pub fn mark_unwanted(&mut self, piece: u32) {
        self.queue_pieces.clear(piece);
    }

    /// Forgets the saved scan cursor of `peer`.
    pub fn remove_peer_cursor(&mut self, peer: &SocketAddr) {
        self.cursors.remove(peer);
    }

    /// Test helper: marks `piece` in flight for `peer` with an explicit start time.
    #[cfg(test)]
    pub fn insert_inflight_at(&mut self, piece: u32, peer: SocketAddr, started: Instant) {
        self.queue_pieces.clear(piece);
        self.inflight.insert(piece, InflightPiece { peer, started });
    }
}
/// Hands out the blocks of one externally assigned piece, in order.
pub(crate) struct DirectDispatchState {
    /// Piece being dispatched, if any.
    current_piece: Option<CurrentPiece>,
    /// Source of the `(begin, length)` pair for each block.
    lengths: Lengths,
}

#[allow(dead_code)]
impl DirectDispatchState {
    /// Creates a state with no current piece.
    pub fn new(lengths: Lengths) -> Self {
        Self {
            lengths,
            current_piece: None,
        }
    }

    /// Starts dispatching `piece` from block 0; `total_blocks` is the number
    /// of blocks to hand out.
    pub fn set_current_piece(&mut self, piece: u32, total_blocks: u32) {
        let fresh = CurrentPiece {
            piece,
            next_block: 0,
            total_blocks,
        };
        self.current_piece = Some(fresh);
    }

    /// Whether the current piece still has blocks that were not handed out.
    pub fn has_blocks(&self) -> bool {
        matches!(&self.current_piece, Some(cp) if cp.next_block < cp.total_blocks)
    }

    /// Returns the next block of the current piece.
    ///
    /// Returns `None` when there is no current piece. When the current piece
    /// is exhausted it is dropped and `None` is returned.
    ///
    /// # Panics
    /// Panics when `Lengths::chunk_info` has no entry for the block.
    pub fn next_block(&mut self) -> Option<BlockRequest> {
        let cp = self.current_piece.as_mut()?;
        if cp.next_block < cp.total_blocks {
            let (piece, block_idx) = (cp.piece, cp.next_block);
            cp.next_block += 1;
            return Some(self.make_block_request(piece, block_idx));
        }
        self.current_piece = None;
        None
    }

    /// Builds the request for block `block_idx` of `piece` from `chunk_info`.
    fn make_block_request(&self, piece: u32, block_idx: u32) -> BlockRequest {
        let chunk = self.lengths.chunk_info(piece, block_idx);
        let (begin, length) = chunk.expect("block_idx out of range for piece");
        BlockRequest {
            piece,
            begin,
            length,
        }
    }

    /// Index of the current piece, if one is set.
    pub fn current_piece_index(&self) -> Option<u32> {
        match &self.current_piece {
            Some(cp) => Some(cp.piece),
            None => None,
        }
    }

    /// Forgets the current piece without handing out its remaining blocks.
    pub fn clear_current_piece(&mut self) {
        self.current_piece = None;
    }
}
/// Progress through the piece that a `PeerDispatchState` reserved via
/// `AtomicPieceStates::try_reserve`.
struct CasCurrentPiece {
/// Index of the reserved piece.
piece: u32,
/// Index of the next block to hand out.
next_block: u32,
/// Block count of the piece, taken from `Lengths::chunks_in_piece` at reservation time.
total_blocks: u32,
}
/// Per-peer block dispatcher: walks a shared piece order, reserves pieces
/// through `AtomicPieceStates`, and falls back to taking single blocks from
/// the shared steal-candidate queue.
pub(crate) struct PeerDispatchState {
// Shared piece order currently being walked.
snapshot: Arc<AvailabilitySnapshot>,
// Index into `snapshot.order` of the next piece to consider.
cursor: usize,
// Piece this peer reserved and is still handing out blocks for.
current_piece: Option<CasCurrentPiece>,
// Source of block counts and `(begin, length)` pairs.
lengths: Lengths,
// Optional shared per-block bitmaps; when absent, requested blocks are not recorded.
block_maps: Option<Arc<BlockMaps>>,
// Optional shared steal queue; stealing needs both this and `block_maps`.
steal_candidates: Option<Arc<StealCandidates>>,
// Optional writer counters; when present, pieces with live writers are not stolen from.
piece_write_guards: Option<Arc<PieceWriteGuards>>,
}
#[allow(dead_code)]
impl PeerDispatchState {
/// Creates a dispatcher at the start of `snapshot` with no current piece
/// and none of the optional shared structures attached.
pub fn new(snapshot: Arc<AvailabilitySnapshot>, lengths: Lengths) -> Self {
Self {
snapshot,
cursor: 0,
current_piece: None,
lengths,
block_maps: None,
steal_candidates: None,
piece_write_guards: None,
}
}
/// Attaches the shared block bitmaps.
pub fn set_block_maps(&mut self, bm: Arc<BlockMaps>) {
self.block_maps = Some(bm);
}
/// Attaches the shared steal-candidate queue.
pub fn set_steal_candidates(&mut self, sc: Arc<StealCandidates>) {
self.steal_candidates = Some(sc);
}
/// Attaches the shared per-piece writer counters.
pub fn set_piece_write_guards(&mut self, pwg: Arc<PieceWriteGuards>) {
self.piece_write_guards = Some(pwg);
}
/// Replaces the snapshot; the cursor restarts at 0 only when the
/// generation changed.
pub fn update_snapshot(&mut self, snapshot: Arc<AvailabilitySnapshot>) {
if snapshot.generation != self.snapshot.generation {
self.cursor = 0;
}
self.snapshot = snapshot;
}
/// Produces the next block to request from this peer, or `None` when
/// nothing is available.
///
/// Three phases, in order:
/// 1. continue the current piece;
/// 2. walk `snapshot.order` from the cursor and reserve the first piece
///    the peer has and `try_reserve` grants;
/// 3. take a single unrequested block from a steal candidate (at most
///    `MAX_STEAL_ATTEMPTS` queue pops per call).
///
/// Panics if `Lengths::chunk_info` has no entry for the chosen block.
pub fn next_block(
&mut self,
peer_bitfield: &Bitfield,
atomic_states: &AtomicPieceStates,
) -> Option<BlockRequest> {
const MAX_STEAL_ATTEMPTS: usize = 32;
// Phase 1: keep handing out blocks of the piece we already hold.
if let Some(ref mut cp) = self.current_piece {
if cp.next_block < cp.total_blocks {
let piece = cp.piece;
let block_idx = cp.next_block;
cp.next_block += 1;
// The "already set" result is ignored: the block is requested either way.
if let Some(ref bm) = self.block_maps {
bm.mark_requested(piece, block_idx);
}
return Some(self.make_block_request(piece, block_idx));
}
self.current_piece = None;
}
// Phase 2: advance through the order; the cursor never moves backwards here.
while self.cursor < self.snapshot.order.len() {
let piece = self.snapshot.order[self.cursor];
self.cursor += 1;
if !peer_bitfield.get(piece) {
continue;
}
// `try_reserve` also succeeds for pieces in the `Endgame` state.
if atomic_states.try_reserve(piece) {
let total_blocks = self.lengths.chunks_in_piece(piece);
// NOTE(review): a piece that was just moved to `Reserved` stays
// reserved when it is skipped here — confirm zero-block pieces cannot occur.
if total_blocks == 0 {
continue;
}
if let Some(ref bm) = self.block_maps {
bm.mark_requested(piece, 0);
}
// Block 0 is returned now, so the saved position starts at 1.
self.current_piece = Some(CasCurrentPiece {
piece,
next_block: 1,
total_blocks,
});
return Some(self.make_block_request(piece, 0));
}
}
// Phase 3: block-level stealing; requires both the bitmaps and the queue.
if let (Some(bm), Some(sc)) = (&self.block_maps, &self.steal_candidates) {
for _ in 0..MAX_STEAL_ATTEMPTS {
let Some(piece) = sc.pop() else { break };
// Peer lacks the piece: put it back for other peers.
if !peer_bitfield.get(piece) {
sc.push(piece); continue;
}
let state = atomic_states.get(piece);
// Finished or unwanted pieces are dropped from the queue for good.
if state == PieceState::Complete || state == PieceState::Unwanted {
continue; }
// Pieces with a live writer are put back and skipped for now.
if let Some(ref pwg) = self.piece_write_guards
&& !pwg.is_idle(piece)
{
sc.push(piece); continue;
}
let total_blocks = self.lengths.chunks_in_piece(piece);
if let Some(block_idx) = bm.next_unrequested(piece, total_blocks) {
// `mark_requested` returns true when another caller set the bit first.
let was_already_set = bm.mark_requested(piece, block_idx);
if was_already_set {
sc.push(piece);
continue;
}
// Keep the piece queued while it still has unrequested blocks.
if bm.next_unrequested(piece, total_blocks).is_some() {
sc.push(piece); }
return Some(self.make_block_request(piece, block_idx));
}
// Falling through: every block is already requested, so the piece is not re-queued.
}
}
None }
/// Builds the request for block `block_idx` of `piece` from `chunk_info`.
fn make_block_request(&self, piece: u32, block_idx: u32) -> BlockRequest {
let (begin, length) = self
.lengths
.chunk_info(piece, block_idx)
.expect("block_idx out of range for piece");
BlockRequest {
piece,
begin,
length,
}
}
/// Index of the piece currently held by this peer, if any.
pub fn current_piece_index(&self) -> Option<u32> {
self.current_piece.as_ref().map(|cp| cp.piece)
}
/// Forgets the current piece; the shared piece state is not touched.
pub fn clear_current_piece(&mut self) {
self.current_piece = None;
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::Arc;
fn test_lengths() -> Lengths {
Lengths::new(128 * 1024, 32768, 16384)
}
fn test_lengths_4blocks() -> Lengths {
Lengths::new(256 * 1024, 64 * 1024, 16384)
}
fn all_pieces_wanted(num_pieces: u32) -> Bitfield {
let mut bf = Bitfield::new(num_pieces);
for i in 0..num_pieces {
bf.set(i);
}
bf
}
fn peer_has_all(num_pieces: u32) -> Bitfield {
all_pieces_wanted(num_pieces)
}
fn addr(port: u16) -> SocketAddr {
use std::net::{IpAddr, Ipv4Addr};
SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port)
}
#[test]
fn atomic_reserve_release_cycle() {
let num_pieces = 8u32;
let we_have = Bitfield::new(num_pieces);
let wanted = all_pieces_wanted(num_pieces);
let states = AtomicPieceStates::new(num_pieces, &we_have, &wanted);
for i in 0..num_pieces {
assert_eq!(states.get(i), PieceState::Available);
}
assert!(states.try_reserve(3));
assert_eq!(states.get(3), PieceState::Reserved);
assert!(!states.try_reserve(3));
states.release(3);
assert_eq!(states.get(3), PieceState::Available);
assert!(states.try_reserve(3));
assert_eq!(states.get(3), PieceState::Reserved);
states.mark_complete(3);
assert_eq!(states.get(3), PieceState::Complete);
assert!(!states.try_reserve(3));
states.force_reserved(3);
assert_eq!(states.get(3), PieceState::Reserved);
}
#[test]
fn atomic_unwanted_transitions() {
let num_pieces = 4u32;
let we_have = Bitfield::new(num_pieces);
let mut wanted = Bitfield::new(num_pieces);
wanted.set(0);
wanted.set(1);
let states = AtomicPieceStates::new(num_pieces, &we_have, &wanted);
assert_eq!(states.get(0), PieceState::Available);
assert_eq!(states.get(1), PieceState::Available);
assert_eq!(states.get(2), PieceState::Unwanted);
assert_eq!(states.get(3), PieceState::Unwanted);
assert!(!states.try_reserve(2));
states.mark_available(2);
assert_eq!(states.get(2), PieceState::Available);
assert!(states.try_reserve(2));
states.mark_available(2);
assert_eq!(states.get(2), PieceState::Reserved);
}
#[test]
fn atomic_we_have_initialized_complete() {
let num_pieces = 4u32;
let mut we_have = Bitfield::new(num_pieces);
we_have.set(0);
we_have.set(2);
let wanted = all_pieces_wanted(num_pieces);
let states = AtomicPieceStates::new(num_pieces, &we_have, &wanted);
assert_eq!(states.get(0), PieceState::Complete);
assert_eq!(states.get(1), PieceState::Available);
assert_eq!(states.get(2), PieceState::Complete);
assert_eq!(states.get(3), PieceState::Available);
assert!(!states.try_reserve(0));
assert!(states.try_reserve(1));
}
#[test]
fn atomic_len() {
let states = AtomicPieceStates::new(42, &Bitfield::new(42), &all_pieces_wanted(42));
assert_eq!(states.len(), 42);
}
#[test]
fn atomic_snapshot_maps_all_variants_to_qbt_codes() {
let num_pieces = 5u32;
let mut we_have = Bitfield::new(num_pieces);
we_have.set(0);
let mut wanted = Bitfield::new(num_pieces);
wanted.set(0);
wanted.set(1);
wanted.set(2);
wanted.set(3);
let states = AtomicPieceStates::new(num_pieces, &we_have, &wanted);
assert!(states.try_reserve(2));
states.transition_to_endgame(3);
let snap = states.snapshot();
assert_eq!(snap.len(), 5);
assert_eq!(snap[0], 2, "Complete maps to 2");
assert_eq!(snap[1], 0, "Available maps to 0");
assert_eq!(snap[2], 1, "Reserved maps to 1");
assert_eq!(snap[3], 1, "Endgame maps to 1");
assert_eq!(snap[4], 0, "Unwanted maps to 0");
}
#[test]
fn atomic_snapshot_empty() {
let states = AtomicPieceStates::new(0, &Bitfield::new(0), &all_pieces_wanted(0));
assert!(states.snapshot().is_empty());
}
#[test]
fn atomic_cas_conflict_two_threads() {
use std::sync::Arc;
let states = Arc::new(AtomicPieceStates::new(
1,
&Bitfield::new(1),
&all_pieces_wanted(1),
));
let results: Vec<bool> = std::thread::scope(|s| {
let handles: Vec<_> = (0..2)
.map(|_| {
let st = Arc::clone(&states);
s.spawn(move || st.try_reserve(0))
})
.collect();
handles.into_iter().map(|h| h.join().unwrap()).collect()
});
let wins: usize = results.iter().filter(|&&r| r).count();
assert_eq!(wins, 1, "exactly one CAS should succeed, got {wins}");
assert_eq!(states.get(0), PieceState::Reserved);
}
#[test]
fn atomic_endgame_allows_multiple_reservations() {
let states = AtomicPieceStates::new(4, &Bitfield::new(4), &all_pieces_wanted(4));
assert!(states.try_reserve(1));
assert_eq!(states.get(1), PieceState::Reserved);
states.transition_to_endgame(1);
assert_eq!(states.get(1), PieceState::Endgame);
assert!(states.try_reserve(1));
assert!(states.try_reserve(1));
assert!(states.try_reserve(1));
assert_eq!(states.get(1), PieceState::Endgame);
}
#[test]
fn atomic_endgame_state_transitions() {
let states = AtomicPieceStates::new(4, &Bitfield::new(4), &all_pieces_wanted(4));
assert!(states.try_reserve(0));
assert!(states.try_reserve(1));
states.transition_to_endgame(0);
states.transition_to_endgame(1);
states.force_reserved(0);
assert_eq!(states.get(0), PieceState::Reserved);
states.release(1);
assert_eq!(states.get(1), PieceState::Available);
}
#[test]
fn peer_slab_insert_remove_lookup() {
let mut slab = PeerSlab::new();
let a1 = addr(1000);
let a2 = addr(2000);
let a3 = addr(3000);
let s1 = slab.insert(a1);
let s2 = slab.insert(a2);
let s3 = slab.insert(a3);
assert_eq!(slab.len(), 3);
assert_eq!(slab.get(s1), Some(&a1));
assert_eq!(slab.get(s2), Some(&a2));
assert_eq!(slab.slot_of(&a2), Some(s2));
assert!(slab.contains(s1));
let removed = slab.remove(s2);
assert_eq!(removed, a2);
assert_eq!(slab.len(), 2);
assert_eq!(slab.get(s2), None);
assert_eq!(slab.slot_of(&a2), None);
let slot = slab.remove_by_addr(&a3);
assert_eq!(slot, Some(s3));
assert_eq!(slab.len(), 1);
assert_eq!(slab.remove_by_addr(&a3), None);
}
#[test]
fn peer_slab_slot_reuse() {
let mut slab = PeerSlab::new();
let a1 = addr(1000);
let a2 = addr(2000);
let s1 = slab.insert(a1);
slab.remove(s1);
let s2 = slab.insert(a2);
assert_eq!(s2, s1); assert_eq!(slab.get(s2), Some(&a2));
}
#[test]
fn block_maps_mark_requested() {
let lengths = test_lengths(); let bm = BlockMaps::new(4, &lengths);
let was_set = bm.mark_requested(0, 0);
assert!(
!was_set,
"first mark_requested should return false (we claimed it)"
);
let was_set = bm.mark_requested(0, 0);
assert!(
was_set,
"second mark_requested should return true (already set)"
);
}
#[test]
fn block_maps_mark_received() {
let lengths = test_lengths(); let bm = BlockMaps::new(4, &lengths);
assert!(!bm.all_received(0, 2));
bm.mark_received(0, 0);
assert!(!bm.all_received(0, 2));
bm.mark_received(0, 1);
assert!(bm.all_received(0, 2));
}
#[test]
fn block_maps_next_unrequested() {
let lengths = test_lengths(); let bm = BlockMaps::new(4, &lengths);
assert_eq!(bm.next_unrequested(1, 2), Some(0));
bm.mark_requested(1, 0);
assert_eq!(bm.next_unrequested(1, 2), Some(1));
bm.mark_requested(1, 1);
assert_eq!(bm.next_unrequested(1, 2), None);
}
#[test]
fn block_maps_all_requested() {
let lengths = test_lengths(); let bm = BlockMaps::new(4, &lengths);
bm.mark_requested(2, 0);
bm.mark_requested(2, 1);
assert_eq!(bm.next_unrequested(2, 2), None);
}
#[test]
fn block_maps_all_received() {
let lengths = test_lengths(); let bm = BlockMaps::new(4, &lengths);
bm.mark_received(3, 0);
bm.mark_received(3, 1);
assert!(bm.all_received(3, 2));
}
#[test]
fn block_maps_partial_received() {
let lengths = test_lengths(); let bm = BlockMaps::new(4, &lengths);
bm.mark_received(1, 0);
assert!(
!bm.all_received(1, 2),
"should be false with only 1 of 2 blocks received"
);
}
#[test]
fn block_maps_clear_resets_bits() {
let lengths = test_lengths(); let bm = BlockMaps::new(4, &lengths);
bm.mark_requested(0, 0);
bm.mark_requested(0, 1);
bm.mark_received(0, 0);
bm.clear(0, 2);
assert_eq!(bm.next_unrequested(0, 2), Some(0));
assert!(!bm.all_received(0, 2));
assert!(!bm.mark_requested(0, 0));
}
#[test]
fn block_maps_independent_pieces() {
let lengths = test_lengths(); let bm = BlockMaps::new(4, &lengths);
bm.mark_requested(0, 0);
bm.mark_requested(0, 1);
assert_eq!(bm.next_unrequested(0, 2), None);
assert_eq!(bm.next_unrequested(1, 2), Some(0));
}
#[test]
fn steal_handles_duplicate_claim() {
let lengths = test_lengths_4blocks(); let num_pieces = 4u32;
let bm = Arc::new(BlockMaps::new(num_pieces, &lengths));
bm.mark_requested(0, 0);
bm.mark_requested(0, 1);
bm.mark_requested(0, 2);
let results: Vec<bool> = std::thread::scope(|s| {
let handles: Vec<_> = (0..2)
.map(|_| {
let bm_ref = Arc::clone(&bm);
s.spawn(move || bm_ref.mark_requested(0, 3))
})
.collect();
handles
.into_iter()
.map(|h| h.join().expect("thread panicked"))
.collect()
});
let winners = results.iter().filter(|&&was_set| !was_set).count();
let losers = results.iter().filter(|&&was_set| was_set).count();
assert_eq!(winners, 1, "exactly one thread should claim the block");
assert_eq!(
losers, 1,
"exactly one thread should see it already claimed"
);
assert!(
bm.mark_requested(0, 3),
"block 3 should be marked after the race"
);
}
#[test]
fn block_maps_concurrent_access() {
let lengths = test_lengths_4blocks(); let num_pieces = 4u32;
let total_blocks = lengths.chunks_in_piece(0);
let bm = Arc::new(BlockMaps::new(num_pieces, &lengths));
let claimed: Vec<Option<u32>> = std::thread::scope(|s| {
let handles: Vec<_> = (0..8)
.map(|_| {
let bm_ref = Arc::clone(&bm);
s.spawn(move || {
if let Some(block_idx) = bm_ref.next_unrequested(0, total_blocks) {
let was_set = bm_ref.mark_requested(0, block_idx);
if !was_set {
return Some(block_idx);
}
}
None
})
})
.collect();
handles
.into_iter()
.map(|h| h.join().expect("thread panicked"))
.collect()
});
let successful: Vec<u32> = claimed.into_iter().flatten().collect();
assert!(
successful.len() <= total_blocks as usize,
"cannot claim more blocks than exist: got {} claims for {} blocks",
successful.len(),
total_blocks
);
let unique: std::collections::HashSet<u32> = successful.iter().copied().collect();
assert_eq!(
unique.len(),
successful.len(),
"all claimed blocks must be unique"
);
for &idx in &successful {
assert!(
idx < total_blocks,
"claimed block index {idx} out of range (total={total_blocks})"
);
}
}
#[test]
fn write_guard_increments_counter() {
let guards = PieceWriteGuards::new(4);
assert!(guards.is_idle(0));
let _g = guards.begin_write(0);
assert!(!guards.is_idle(0));
}
#[test]
fn write_guard_drop_decrements() {
let guards = PieceWriteGuards::new(4);
{
let _g = guards.begin_write(0);
}
assert!(guards.is_idle(0));
}
#[test]
fn concurrent_writers_not_idle() {
let guards = PieceWriteGuards::new(4);
let g1 = guards.begin_write(0);
let _g2 = guards.begin_write(0);
drop(g1);
assert!(!guards.is_idle(0), "still one writer active");
}
#[test]
fn concurrent_writers_all_dropped_idle() {
let guards = PieceWriteGuards::new(4);
let g1 = guards.begin_write(0);
let g2 = guards.begin_write(0);
drop(g1);
drop(g2);
assert!(guards.is_idle(0));
}
#[test]
fn order_map_single_file_first_last_middle() {
let priorities = [FilePriority::Normal];
let ranges = [(0, 4)]; let map = PieceOrderMap::build(&priorities, &ranges, 5, 1);
assert_eq!(map.order, vec![0, 4, 1, 2, 3]);
assert_eq!(map.generation, 1);
}
#[test]
fn order_map_single_piece_file() {
let priorities = [FilePriority::Normal];
let ranges = [(0, 0)];
let map = PieceOrderMap::build(&priorities, &ranges, 1, 0);
assert_eq!(map.order, vec![0]);
}
#[test]
fn order_map_two_piece_file() {
let priorities = [FilePriority::Normal];
let ranges = [(0, 1)];
let map = PieceOrderMap::build(&priorities, &ranges, 2, 0);
assert_eq!(map.order, vec![0, 1]);
}
#[test]
fn order_map_high_before_normal() {
let priorities = [FilePriority::Normal, FilePriority::High];
let ranges = [(0, 2), (3, 5)];
let map = PieceOrderMap::build(&priorities, &ranges, 6, 0);
assert_eq!(map.order, vec![3, 5, 4, 0, 2, 1]);
}
#[test]
fn order_map_skip_excluded() {
let priorities = [
FilePriority::Normal,
FilePriority::Skip,
FilePriority::Normal,
];
let ranges = [(0, 1), (2, 3), (4, 5)];
let map = PieceOrderMap::build(&priorities, &ranges, 6, 0);
assert_eq!(map.order, vec![0, 1, 4, 5]);
}
#[test]
fn order_map_dedup_shared_pieces() {
let priorities = [FilePriority::High, FilePriority::Normal];
let ranges = [(0, 3), (2, 5)]; let map = PieceOrderMap::build(&priorities, &ranges, 6, 0);
assert_eq!(map.order, vec![0, 3, 1, 2, 5, 4]);
}
/// Building from no files and zero pieces produces an empty order.
#[test]
fn order_map_empty_torrent() {
    // Explicit element types are needed because the empty literals carry no type information.
    let no_priorities: [FilePriority; 0] = [];
    let no_ranges: [(u32, u32); 0] = [];
    let order_map = PieceOrderMap::build(&no_priorities, &no_ranges, 0, 0);
    assert!(order_map.order.is_empty());
}
/// When every file has Skip priority the resulting order is empty.
#[test]
fn order_map_all_files_skipped() {
    let order_map = PieceOrderMap::build(
        &[FilePriority::Skip, FilePriority::Skip],
        &[(0, 2), (3, 5)],
        6,
        0,
    );
    assert!(order_map.order.is_empty());
}
/// Two Normal files keep their file order; each contributes first, last, then middle pieces.
#[test]
fn order_map_multi_file_same_priority() {
    let file_priorities = [FilePriority::Normal; 2];
    let piece_ranges = [(0, 2), (3, 5)];
    let order_map = PieceOrderMap::build(&file_priorities, &piece_ranges, 6, 0);
    assert_eq!(order_map.order, [0, 2, 1, 3, 5, 4]);
}
/// With Low, High and Normal files, the expected order is High pieces, then Normal, then Low.
#[test]
fn order_map_all_priorities() {
    let order_map = PieceOrderMap::build(
        &[FilePriority::Low, FilePriority::High, FilePriority::Normal],
        &[(0, 1), (2, 3), (4, 5)],
        6,
        0,
    );
    let expected = [2, 3, 4, 5, 0, 1];
    assert_eq!(order_map.order, expected);
}
/// A tracker built from a fresh `have` bitfield and an all-wanted bitfield queues all 4 pieces.
#[test]
fn tracker_new_queue_matches_wanted() {
    let owned = Bitfield::new(4);
    let desired = all_pieces_wanted(4);
    let tracker = PieceTracker::new(4, &owned, &desired);
    assert_eq!(tracker.queue_count(), 4);
    (0..4).for_each(|piece| assert!(tracker.is_queued(piece)));
}
/// Pieces set in the `have` bitfield (1 and 3) are left out of the initial queue.
#[test]
fn tracker_new_excludes_have() {
    let mut owned = Bitfield::new(4);
    for piece in [1, 3] {
        owned.set(piece);
    }
    let desired = all_pieces_wanted(4);
    let tracker = PieceTracker::new(4, &owned, &desired);
    assert_eq!(tracker.queue_count(), 2);
    for missing in [0, 2] {
        assert!(tracker.is_queued(missing));
    }
    for present in [1, 3] {
        assert!(!tracker.is_queued(present));
    }
}
/// Recording a reservation for piece 2 takes it out of the queue and counts it as in flight.
#[test]
fn tracker_record_reservation() {
    let owned = Bitfield::new(4);
    let desired = all_pieces_wanted(4);
    let mut tracker = PieceTracker::new(4, &owned, &desired);
    let owner = addr(1000);
    tracker.record_reservation(2, owner);
    assert!(!tracker.is_queued(2));
    assert_eq!(tracker.inflight_count(), 1);
    assert_eq!(tracker.queue_count(), 3);
}
/// With every piece queued and a peer that has all pieces, the first acquisition reserves piece 0
/// and moves it from the queue to the in-flight set.
#[test]
fn tracker_acquire_reserves_from_queue() {
    let mut tracker = PieceTracker::new(4, &Bitfield::new(4), &all_pieces_wanted(4));
    let peer_pieces = peer_has_all(4);
    let order_map = PieceOrderMap::build(&[FilePriority::Normal], &[(0, 3)], 4, 0);
    let outcome = tracker.acquire_piece(addr(1000), &peer_pieces, None, &order_map, |_| true);
    let AcquireResult::Reserved(piece) = outcome else {
        panic!("expected Reserved")
    };
    assert_eq!(piece, 0);
    assert!(!tracker.is_queued(0));
    assert_eq!(tracker.inflight_count(), 1);
}
/// A peer advertising only piece 2 is handed piece 2, not an earlier piece it does not have.
#[test]
fn tracker_acquire_skips_pieces_peer_lacks() {
    let mut tracker = PieceTracker::new(4, &Bitfield::new(4), &all_pieces_wanted(4));
    let mut peer_pieces = Bitfield::new(4);
    peer_pieces.set(2);
    let order_map = PieceOrderMap::build(&[FilePriority::Normal], &[(0, 3)], 4, 0);
    let outcome = tracker.acquire_piece(addr(1000), &peer_pieces, None, &order_map, |_| true);
    let AcquireResult::Reserved(piece) = outcome else {
        panic!("expected Reserved(2)")
    };
    assert_eq!(piece, 2);
}
/// A peer holding queued piece 5 intersects the queue.
#[test]
fn queue_intersects_true_when_peer_has_wanted_piece() {
    let owned = Bitfield::new(8);
    let desired = all_pieces_wanted(8);
    let tracker = PieceTracker::new(8, &owned, &desired);
    let mut peer_pieces = Bitfield::new(8);
    peer_pieces.set(5);
    let overlaps = tracker.queue_intersects(&peer_pieces);
    assert!(overlaps);
}
/// When all 8 pieces are already held, neither a fresh peer bitfield nor one with only piece 0
/// set intersects the queue.
#[test]
fn queue_intersects_false_when_peer_has_no_wanted_piece() {
    let mut owned = Bitfield::new(8);
    (0..8).for_each(|piece| {
        owned.set(piece);
    });
    let mut desired = Bitfield::new(8);
    desired.set(3);
    let tracker = PieceTracker::new(8, &owned, &desired);

    let blank_peer = Bitfield::new(8);
    assert!(!tracker.queue_intersects(&blank_peer));

    let mut peer_with_zero = Bitfield::new(8);
    peer_with_zero.set(0);
    assert!(!tracker.queue_intersects(&peer_with_zero));
}
/// After pieces 1..4 are marked unwanted, piece 0 is the peer's only match; reserving it must
/// make the intersection check fail.
#[test]
fn queue_intersects_false_after_reservation_drains_match() {
    let mut tracker = PieceTracker::new(4, &Bitfield::new(4), &all_pieces_wanted(4));
    (1..4).for_each(|piece| {
        tracker.mark_unwanted(piece);
    });
    let mut peer_pieces = Bitfield::new(4);
    peer_pieces.set(0);
    assert!(tracker.queue_intersects(&peer_pieces));

    tracker.record_reservation(0, addr(1000));
    let still_overlaps = tracker.queue_intersects(&peer_pieces);
    assert!(
        !still_overlaps,
        "queue should be empty after reservation drained the only match"
    );
}
/// The queue holds a single piece while the peer only has piece 5, which we already hold;
/// with no intersection and no average time, acquisition reports `NoneAvailable`.
#[test]
fn acquire_phase2_short_circuits_without_intersect() {
    let mut owned = Bitfield::new(8);
    owned.set(5);
    let desired = all_pieces_wanted(8);
    let mut tracker = PieceTracker::new(8, &owned, &desired);
    (1..8).for_each(|piece| {
        tracker.mark_unwanted(piece);
    });
    assert_eq!(tracker.queue_count(), 1);

    let mut peer_pieces = Bitfield::new(8);
    peer_pieces.set(5);
    let order_map = PieceOrderMap::build(&[FilePriority::Normal], &[(0, 7)], 8, 0);
    assert!(!tracker.queue_intersects(&peer_pieces));

    let outcome = tracker.acquire_piece(addr(2000), &peer_pieces, None, &order_map, |_| true);
    if !matches!(outcome, AcquireResult::NoneAvailable) {
        let other = outcome;
        panic!("expected NoneAvailable when peer bitfield has no intersect, got {other:?}")
    }
}
/// When the peer's bitfield includes queued piece 0 (alongside unwanted piece 5), acquisition
/// reserves piece 0.
#[test]
fn acquire_phase2_runs_when_intersect_present() {
    let mut tracker = PieceTracker::new(8, &Bitfield::new(8), &all_pieces_wanted(8));
    (1..8).for_each(|piece| {
        tracker.mark_unwanted(piece);
    });
    let mut peer_pieces = Bitfield::new(8);
    for piece in [0, 5] {
        peer_pieces.set(piece);
    }
    let order_map = PieceOrderMap::build(&[FilePriority::Normal], &[(0, 7)], 8, 0);
    assert!(tracker.queue_intersects(&peer_pieces));

    let outcome = tracker.acquire_piece(addr(3000), &peer_pieces, None, &order_map, |_| true);
    let AcquireResult::Reserved(piece) = outcome else {
        let other = outcome;
        panic!("expected Reserved(0) when peer has piece 0 in queue, got {other:?}")
    };
    assert_eq!(piece, 0);
}
/// With an empty queue and no queue intersection, a piece held for 20s by a slow peer is still
/// stolen by a peer whose average piece time is 1s.
///
/// NOTE(review): the test name mentions phase 3 while the failure message says "Phase 1" —
/// confirm which phase is intended.
#[test]
fn acquire_phase3_still_runs_when_no_intersect() {
    let mut tracker = PieceTracker::new(4, &Bitfield::new(4), &all_pieces_wanted(4));
    (0..4).for_each(|piece| {
        tracker.mark_unwanted(piece);
    });
    assert_eq!(tracker.queue_count(), 0);

    let (slow, fast) = (addr(1000), addr(2000));
    let stale_start = Instant::now().checked_sub(Duration::from_secs(20)).unwrap();
    tracker.insert_inflight_at(1, slow, stale_start);

    let mut peer_pieces = Bitfield::new(4);
    peer_pieces.set(1);
    let order_map = PieceOrderMap::empty();
    assert!(!tracker.queue_intersects(&peer_pieces));

    let average = Some(Duration::from_secs(1));
    let outcome = tracker.acquire_piece(fast, &peer_pieces, average, &order_map, |_| true);
    let AcquireResult::Stolen { piece, from_peer } = outcome else {
        let other = outcome;
        panic!(
            "expected Stolen (Phase 1) even with empty queue / no intersect, got {other:?}"
        )
    };
    assert_eq!(piece, 1);
    assert_eq!(from_peer, slow);
}
/// A piece in flight for 20s against a 1s average is stolen from the slow peer, even though
/// other pieces are still queued.
#[test]
fn tracker_steal_10x_threshold() {
    let mut tracker = PieceTracker::new(4, &Bitfield::new(4), &all_pieces_wanted(4));
    let (slow, fast) = (addr(1000), addr(2000));
    let stale_start = Instant::now().checked_sub(Duration::from_secs(20)).unwrap();
    tracker.insert_inflight_at(0, slow, stale_start);

    let peer_pieces = peer_has_all(4);
    let order_map = PieceOrderMap::build(&[FilePriority::Normal], &[(0, 3)], 4, 0);
    let average = Some(Duration::from_secs(1));
    let outcome = tracker.acquire_piece(fast, &peer_pieces, average, &order_map, |_| true);
    let AcquireResult::Stolen { piece, from_peer } = outcome else {
        panic!("expected Stolen")
    };
    assert_eq!(piece, 0);
    assert_eq!(from_peer, slow);
}
/// With both pieces in flight (5s and 1s old) and a 1s average, the 5s-old piece is stolen.
#[test]
fn tracker_steal_3x_threshold() {
    // Start instant lying `secs` seconds in the past.
    let ago = |secs: u64| Instant::now().checked_sub(Duration::from_secs(secs)).unwrap();

    let mut tracker = PieceTracker::new(2, &Bitfield::new(2), &all_pieces_wanted(2));
    let (slow, fast) = (addr(1000), addr(2000));
    tracker.insert_inflight_at(0, slow, ago(5));
    tracker.insert_inflight_at(1, slow, ago(1));

    let peer_pieces = peer_has_all(2);
    let order_map = PieceOrderMap::empty();
    let average = Some(Duration::from_secs(1));
    let outcome = tracker.acquire_piece(fast, &peer_pieces, average, &order_map, |_| true);
    let AcquireResult::Stolen { piece, from_peer } = outcome else {
        panic!("expected Stolen from 3x threshold")
    };
    assert_eq!(piece, 0);
    assert_eq!(from_peer, slow);
}
/// A piece only 5s old against a 1s average is not stolen while other pieces remain queued:
/// the result is a reservation of some piece other than 0.
#[test]
fn tracker_reserve_preferred_over_3x_steal() {
    let mut tracker = PieceTracker::new(4, &Bitfield::new(4), &all_pieces_wanted(4));
    let (slow, fast) = (addr(1000), addr(2000));
    let start = Instant::now().checked_sub(Duration::from_secs(5)).unwrap();
    tracker.insert_inflight_at(0, slow, start);

    let peer_pieces = peer_has_all(4);
    let order_map = PieceOrderMap::build(&[FilePriority::Normal], &[(0, 3)], 4, 0);
    let average = Some(Duration::from_secs(1));
    let outcome = tracker.acquire_piece(fast, &peer_pieces, average, &order_map, |_| true);
    let AcquireResult::Reserved(piece) = outcome else {
        panic!("expected Reserved (queue), not Stolen")
    };
    assert_ne!(piece, 0);
}
/// Releasing everything owned by a peer returns its two reserved pieces to the queue and
/// reports how many were released.
#[test]
fn tracker_release_on_disconnect() {
    let mut tracker = PieceTracker::new(4, &Bitfield::new(4), &all_pieces_wanted(4));
    let owner = addr(1000);
    for piece in [0, 2] {
        tracker.record_reservation(piece, owner);
    }
    assert_eq!(tracker.queue_count(), 2);
    assert_eq!(tracker.inflight_count(), 2);

    let released_count = tracker.release_pieces_owned_by(&owner);
    assert_eq!(released_count, 2);
    assert_eq!(tracker.queue_count(), 4);
    assert_eq!(tracker.inflight_count(), 0);
    for piece in [0, 2] {
        assert!(tracker.is_queued(piece));
    }
}
/// A piece whose hash check succeeds leaves the in-flight set and is not re-queued.
#[test]
fn tracker_hash_ok_removes_from_inflight() {
    let owned = Bitfield::new(4);
    let desired = all_pieces_wanted(4);
    let mut tracker = PieceTracker::new(4, &owned, &desired);
    tracker.record_reservation(1, addr(1000));
    tracker.mark_piece_hash_ok(1);
    assert_eq!(tracker.inflight_count(), 0);
    assert!(!tracker.is_queued(1));
}
/// A piece whose hash check fails leaves the in-flight set and goes back into the queue.
#[test]
fn tracker_hash_failed_returns_to_queue() {
    let owned = Bitfield::new(4);
    let desired = all_pieces_wanted(4);
    let mut tracker = PieceTracker::new(4, &owned, &desired);
    tracker.record_reservation(1, addr(1000));
    assert!(!tracker.is_queued(1));

    tracker.mark_piece_hash_failed(1);
    assert_eq!(tracker.inflight_count(), 0);
    assert!(tracker.is_queued(1));
}
/// Even with two 20s-old in-flight pieces, no steal happens when the final predicate passed to
/// `try_steal` rejects every piece.
#[test]
fn tracker_steal_respects_can_steal() {
    // Start instant lying `secs` seconds in the past.
    let ago = |secs: u64| Instant::now().checked_sub(Duration::from_secs(secs)).unwrap();

    let mut tracker = PieceTracker::new(2, &Bitfield::new(2), &all_pieces_wanted(2));
    tracker.insert_inflight_at(0, addr(1000), ago(20));
    tracker.insert_inflight_at(1, addr(1000), ago(20));

    let peer_pieces = peer_has_all(2);
    let average = Some(Duration::from_secs(1));
    let stolen = tracker.try_steal(
        addr(2000),
        average,
        10.0,
        |piece| peer_pieces.get(piece),
        |_| false,
    );
    assert!(stolen.is_none());
}
/// Among in-flight pieces aged 5s, 15s and 10s, the steal targets the 15s-old piece and names
/// its owner as the victim.
#[test]
fn tracker_steal_picks_slowest() {
    // Start instant lying `secs` seconds in the past.
    let ago = |secs: u64| Instant::now().checked_sub(Duration::from_secs(secs)).unwrap();

    let mut tracker = PieceTracker::new(4, &Bitfield::new(4), &all_pieces_wanted(4));
    let (slow1, slow2, fast) = (addr(1000), addr(2000), addr(3000));
    tracker.insert_inflight_at(0, slow1, ago(5));
    tracker.insert_inflight_at(1, slow2, ago(15));
    tracker.insert_inflight_at(2, slow1, ago(10));

    let peer_pieces = peer_has_all(4);
    let average = Some(Duration::from_secs(1));
    let stolen = tracker.try_steal(fast, average, 3.0, |piece| peer_pieces.get(piece), |_| true);
    let Some(AcquireResult::Stolen { piece, from_peer }) = stolen else {
        panic!("expected Stolen")
    };
    assert_eq!(piece, 1);
    assert_eq!(from_peer, slow2);
}
/// A peer is never offered a steal of a piece it already owns, however old that piece is.
#[test]
fn tracker_no_steal_from_self() {
    let mut tracker = PieceTracker::new(2, &Bitfield::new(2), &all_pieces_wanted(2));
    let owner = addr(1000);
    let stale_start = Instant::now().checked_sub(Duration::from_secs(20)).unwrap();
    tracker.insert_inflight_at(0, owner, stale_start);

    let peer_pieces = peer_has_all(2);
    let average = Some(Duration::from_secs(1));
    let stolen = tracker.try_steal(owner, average, 3.0, |piece| peer_pieces.get(piece), |_| true);
    assert!(stolen.is_none());
}
/// Without an average piece time (`None`), `try_steal` yields nothing even for a 20s-old piece.
#[test]
fn tracker_no_steal_without_avg_time() {
    let mut tracker = PieceTracker::new(2, &Bitfield::new(2), &all_pieces_wanted(2));
    let stale_start = Instant::now().checked_sub(Duration::from_secs(20)).unwrap();
    tracker.insert_inflight_at(0, addr(1000), stale_start);

    let stolen = tracker.try_steal(addr(2000), None, 3.0, |_| true, |_| true);
    assert!(stolen.is_none());
}
/// Taking a freshly reserved piece yields an elapsed time under one second and empties the
/// in-flight set.
#[test]
fn tracker_take_inflight_returns_duration() {
    let owned = Bitfield::new(4);
    let desired = all_pieces_wanted(4);
    let mut tracker = PieceTracker::new(4, &owned, &desired);
    tracker.record_reservation(2, addr(1000));

    let elapsed = tracker.take_inflight(2);
    assert!(elapsed.is_some());
    assert!(elapsed.unwrap() < Duration::from_secs(1));
    assert_eq!(tracker.inflight_count(), 0);
}
/// Taking a piece that was never reserved yields `None` and leaves the in-flight set empty.
///
/// The earlier version only checked that the map had no entry for piece 0 and never called
/// `take_inflight`, so the behaviour named by this test was not exercised.
#[test]
fn tracker_take_inflight_nonexistent() {
    let mut tracker = PieceTracker::new(4, &Bitfield::new(4), &all_pieces_wanted(4));
    // Precondition: nothing has been reserved, so piece 0 has no in-flight entry.
    assert!(!tracker.inflight.contains_key(&0));
    assert!(tracker.take_inflight(0).is_none());
    assert_eq!(tracker.inflight_count(), 0);
}
/// Marking piece 2 unwanted removes it from the queue; marking it wanted again restores it.
#[test]
fn tracker_mark_wanted_unwanted() {
    let owned = Bitfield::new(4);
    let desired = all_pieces_wanted(4);
    let mut tracker = PieceTracker::new(4, &owned, &desired);
    assert!(tracker.is_queued(2));

    tracker.mark_unwanted(2);
    assert!(!tracker.is_queued(2));

    tracker.mark_wanted(2);
    assert!(tracker.is_queued(2));
}
/// Marking an in-flight piece as wanted must not put it back into the queue.
#[test]
fn tracker_mark_wanted_no_override_inflight() {
    let owned = Bitfield::new(4);
    let desired = all_pieces_wanted(4);
    let mut tracker = PieceTracker::new(4, &owned, &desired);
    tracker.record_reservation(2, addr(1000));
    tracker.mark_wanted(2);
    let queued = tracker.is_queued(2);
    assert!(!queued);
}
/// After `set_current_piece(0, 2)` the dispatcher yields two 16384-byte blocks at offsets 0 and
/// 16384, then `None`; switching to piece 1 starts again at offset 0.
#[test]
fn dispatch_hot_path_sequential_blocks() {
    // Expected request for a 16384-byte block at `begin` within `piece`.
    let expect_block = |piece: u32, begin: u32| {
        Some(BlockRequest {
            piece,
            begin,
            length: 16384,
        })
    };

    let mut dispatch = DirectDispatchState::new(test_lengths());
    dispatch.set_current_piece(0, 2);
    assert_eq!(dispatch.next_block(), expect_block(0, 0));
    assert_eq!(dispatch.next_block(), expect_block(0, 16384));
    assert_eq!(dispatch.next_block(), None);

    dispatch.set_current_piece(1, 2);
    assert_eq!(dispatch.next_block(), expect_block(1, 0));
}
/// A freshly constructed dispatcher with no current piece has no block to hand out.
#[test]
fn dispatch_no_piece_returns_none() {
    let mut dispatch = DirectDispatchState::new(test_lengths());
    let first = dispatch.next_block();
    assert_eq!(first, None);
}
/// `has_blocks` is false before a piece is set, true while blocks remain, and false again once
/// both blocks of the piece have been taken.
#[test]
fn dispatch_has_blocks_tracks_state() {
    let mut dispatch = DirectDispatchState::new(test_lengths());
    assert!(!dispatch.has_blocks());

    dispatch.set_current_piece(0, 2);
    assert!(dispatch.has_blocks());

    // One block taken: one still pending.
    let _ = dispatch.next_block();
    assert!(dispatch.has_blocks());

    // Second block taken: nothing left.
    let _ = dispatch.next_block();
    assert!(!dispatch.has_blocks());
}
/// Clearing the current piece resets the index and stops block dispatch; a new piece can then
/// be set and dispatched normally.
#[test]
fn dispatch_clear_current_piece() {
    let mut dispatch = DirectDispatchState::new(test_lengths());
    dispatch.set_current_piece(0, 2);
    assert_eq!(dispatch.current_piece_index(), Some(0));

    dispatch.clear_current_piece();
    assert_eq!(dispatch.current_piece_index(), None);
    assert_eq!(dispatch.next_block(), None);

    dispatch.set_current_piece(1, 2);
    let request = dispatch.next_block();
    assert!(request.is_some());
    assert_eq!(request.unwrap().piece, 1);
}
/// Setting a new current piece mid-way through another discards the old progress: the next
/// block is the first block of the new piece.
#[test]
fn dispatch_set_current_piece_replaces_previous() {
    let mut dispatch = DirectDispatchState::new(test_lengths());
    dispatch.set_current_piece(0, 2);
    let _ = dispatch.next_block();

    dispatch.set_current_piece(2, 2);
    let expected = Some(BlockRequest {
        piece: 2,
        begin: 0,
        length: 16384,
    });
    assert_eq!(dispatch.next_block(), expected);
}
/// A piece handed back with `return_to_queue` leaves the in-flight set, is queued again, and
/// can be reserved by a different peer.
#[test]
fn return_to_queue_makes_piece_acquirable() {
    let num = 4u32;
    let mut pt = PieceTracker::new(num, &Bitfield::new(num), &all_pieces_wanted(num));
    let order = PieceOrderMap {
        generation: 0,
        order: vec![0, 1, 2, 3],
    };
    let peer_pieces = peer_has_all(num);
    let (first_peer, second_peer) = (addr(1), addr(2));

    let initial = pt.acquire_piece(first_peer, &peer_pieces, None, &order, |_| true);
    assert!(matches!(initial, AcquireResult::Reserved(0)));
    assert!(!pt.is_queued(0));
    assert_eq!(pt.inflight_count(), 1);

    pt.return_to_queue(0);
    assert!(pt.is_queued(0));
    assert_eq!(pt.inflight_count(), 0);

    let retry = pt.acquire_piece(second_peer, &peer_pieces, None, &order, |_| true);
    assert!(matches!(retry, AcquireResult::Reserved(0)));
}
/// With a 2s average and two pieces in flight for a minute, acquisition steals from the slow
/// peer (which piece is stolen is not checked).
#[test]
fn steal_fires_with_real_avg_time() {
    let num = 4u32;
    let mut pt = PieceTracker::new(num, &Bitfield::new(num), &all_pieces_wanted(num));
    let order = PieceOrderMap {
        generation: 0,
        order: vec![0, 1, 2, 3],
    };
    let peer_pieces = peer_has_all(num);
    let (slow, fast) = (addr(1), addr(2));
    let a_minute_ago = Instant::now().checked_sub(Duration::from_mins(1)).unwrap();
    for piece in [0, 1] {
        pt.insert_inflight_at(piece, slow, a_minute_ago);
    }

    let average = Duration::from_secs(2);
    let outcome = pt.acquire_piece(fast, &peer_pieces, Some(average), &order, |_| true);
    assert!(matches!(outcome, AcquireResult::Stolen { from_peer, .. } if from_peer == slow));
}
/// With every piece in flight for a minute but no average time supplied, nothing is stolen and
/// acquisition reports `NoneAvailable`.
#[test]
fn steal_disabled_with_none_avg() {
    let num = 4u32;
    let mut pt = PieceTracker::new(num, &Bitfield::new(num), &all_pieces_wanted(num));
    let peer_pieces = peer_has_all(num);
    let (slow, fast) = (addr(1), addr(2));
    let a_minute_ago = Instant::now().checked_sub(Duration::from_mins(1)).unwrap();
    for piece in 0..num {
        pt.insert_inflight_at(piece, slow, a_minute_ago);
    }
    let order = PieceOrderMap {
        generation: 0,
        order: vec![0, 1, 2, 3],
    };

    let outcome = pt.acquire_piece(fast, &peer_pieces, None, &order, |_| true);
    assert!(matches!(outcome, AcquireResult::NoneAvailable));
}
/// Successive acquisitions by the same peer reserve pieces 0, 1, 2 in turn;
/// `last_cursor_resumed` is false on the first call and true on the following ones.
#[test]
fn cursor_resumes_from_last_position() {
    let num = 4u32;
    let mut pt = PieceTracker::new(num, &Bitfield::new(num), &all_pieces_wanted(num));
    let order = PieceOrderMap {
        generation: 0,
        order: vec![0, 1, 2, 3],
    };
    let peer_pieces = peer_has_all(num);
    let peer = addr(1);

    let first = pt.acquire_piece(peer, &peer_pieces, None, &order, |_| true);
    assert!(matches!(first, AcquireResult::Reserved(0)));
    assert!(!pt.last_cursor_resumed);

    let second = pt.acquire_piece(peer, &peer_pieces, None, &order, |_| true);
    assert!(matches!(second, AcquireResult::Reserved(1)));
    assert!(pt.last_cursor_resumed);

    let third = pt.acquire_piece(peer, &peer_pieces, None, &order, |_| true);
    assert!(matches!(third, AcquireResult::Reserved(2)));
    assert!(pt.last_cursor_resumed);
}
/// With only pieces 0 and 3 wanted, the second acquisition resumes from the cursor and
/// reserves piece 3.
#[test]
fn cursor_wraps_around() {
    let num = 4u32;
    let owned = Bitfield::new(num);
    let mut desired = Bitfield::new(num);
    for piece in [0, 3] {
        desired.set(piece);
    }
    let mut pt = PieceTracker::new(num, &owned, &desired);
    let order = PieceOrderMap {
        generation: 0,
        order: vec![0, 1, 2, 3],
    };
    let peer_pieces = peer_has_all(num);
    let peer = addr(1);

    let first = pt.acquire_piece(peer, &peer_pieces, None, &order, |_| true);
    assert!(matches!(first, AcquireResult::Reserved(0)));

    let second = pt.acquire_piece(peer, &peer_pieces, None, &order, |_| true);
    assert!(matches!(second, AcquireResult::Reserved(3)));
    assert!(pt.last_cursor_resumed);
}
/// After reserving piece 0, an acquisition with a peer bitfield that has no pieces set finds
/// nothing and reports `NoneAvailable`.
#[test]
fn cursor_resets_to_zero_after_full_cycle_no_match() {
    let num = 4u32;
    let mut pt = PieceTracker::new(num, &Bitfield::new(num), &all_pieces_wanted(num));
    let order = PieceOrderMap {
        generation: 0,
        order: vec![0, 1, 2, 3],
    };
    let peer = addr(1);

    let mut only_piece_zero = Bitfield::new(num);
    only_piece_zero.set(0);
    let first = pt.acquire_piece(peer, &only_piece_zero, None, &order, |_| true);
    assert!(matches!(first, AcquireResult::Reserved(0)));

    let no_pieces = Bitfield::new(num);
    let second = pt.acquire_piece(peer, &no_pieces, None, &order, |_| true);
    assert!(matches!(second, AcquireResult::NoneAvailable));
}
/// Acquiring against an order map with a different generation does not resume the old cursor:
/// the first entry of the new order (piece 1) is reserved and `last_cursor_resumed` is false.
#[test]
fn order_gen_change_resets_cursor() {
    let num = 4u32;
    let mut pt = PieceTracker::new(num, &Bitfield::new(num), &all_pieces_wanted(num));
    let peer_pieces = peer_has_all(num);
    let peer = addr(1);

    let generation_zero = PieceOrderMap {
        generation: 0,
        order: vec![0, 1, 2, 3],
    };
    let first = pt.acquire_piece(peer, &peer_pieces, None, &generation_zero, |_| true);
    assert!(matches!(first, AcquireResult::Reserved(0)));

    let generation_one = PieceOrderMap {
        generation: 1,
        order: vec![1, 2, 3],
    };
    let second = pt.acquire_piece(peer, &peer_pieces, None, &generation_one, |_| true);
    assert!(matches!(second, AcquireResult::Reserved(1)));
    assert!(!pt.last_cursor_resumed);
}
/// After `remove_peer_cursor`, the peer's next acquisition still reserves piece 1 but reports
/// `last_cursor_resumed == false`.
#[test]
fn remove_peer_cursor_cleanup() {
    let num = 4u32;
    let mut pt = PieceTracker::new(num, &Bitfield::new(num), &all_pieces_wanted(num));
    let order = PieceOrderMap {
        generation: 0,
        order: vec![0, 1, 2, 3],
    };
    let peer_pieces = peer_has_all(num);
    let peer = addr(1);

    let first = pt.acquire_piece(peer, &peer_pieces, None, &order, |_| true);
    assert!(matches!(first, AcquireResult::Reserved(0)));

    pt.remove_peer_cursor(&peer);

    let second = pt.acquire_piece(peer, &peer_pieces, None, &order, |_| true);
    assert!(matches!(second, AcquireResult::Reserved(1)));
    assert!(!pt.last_cursor_resumed);
}
/// Ten peers taking turns over 100 rounds must, between them, reserve every one of the 20
/// pieces.
#[test]
fn cursor_fairness_distribution() {
    let num = 20u32;
    let mut pt = PieceTracker::new(num, &Bitfield::new(num), &all_pieces_wanted(num));
    let order = PieceOrderMap {
        generation: 0,
        order: (0..num).collect(),
    };
    let peer_pieces = peer_has_all(num);
    let peers: Vec<SocketAddr> = (1..=10).map(addr).collect();

    let mut reserved_pieces = std::collections::HashSet::<u32>::new();
    for _round in 0..100 {
        for peer in peers.iter().copied() {
            let outcome = pt.acquire_piece(peer, &peer_pieces, None, &order, |_| true);
            if let AcquireResult::Reserved(piece) = outcome {
                reserved_pieces.insert(piece);
            }
        }
    }
    assert_eq!(
        reserved_pieces.len(),
        num as usize,
        "all 20 pieces should be acquired"
    );
}
/// With every piece already in flight for a minute and a 2s average, acquisition ends in a
/// steal.
#[test]
fn phase3_steal_still_runs_after_cursor_walk_miss() {
    let num = 4u32;
    let mut pt = PieceTracker::new(num, &Bitfield::new(num), &all_pieces_wanted(num));
    let (slow, fast) = (addr(1), addr(2));
    let a_minute_ago = Instant::now().checked_sub(Duration::from_mins(1)).unwrap();
    for piece in 0..num {
        pt.insert_inflight_at(piece, slow, a_minute_ago);
    }
    let order = PieceOrderMap {
        generation: 0,
        order: vec![0, 1, 2, 3],
    };
    let peer_pieces = peer_has_all(num);

    let average = Duration::from_secs(2);
    let outcome = pt.acquire_piece(fast, &peer_pieces, Some(average), &order, |_| true);
    assert!(matches!(outcome, AcquireResult::Stolen { .. }));
}
/// A stored cursor position of 10 combined with an empty order map must not panic; the
/// acquisition simply reports `NoneAvailable`.
#[test]
fn empty_order_map_no_panic() {
    let num = 4u32;
    let mut pt = PieceTracker::new(num, &Bitfield::new(num), &all_pieces_wanted(num));
    let peer = addr(1);
    // Plant a cursor that lies beyond the end of the (empty) order.
    let planted = PeerCursorState {
        cursor: 10,
        order_gen: 0,
    };
    pt.cursors.insert(peer, planted);

    let empty_order = PieceOrderMap {
        generation: 0,
        order: Vec::new(),
    };
    let peer_pieces = peer_has_all(num);
    let outcome = pt.acquire_piece(addr(1), &peer_pieces, None, &empty_order, |_| true);
    assert!(matches!(outcome, AcquireResult::NoneAvailable));
}
}