use std::collections::BTreeSet;
use rustc_hash::FxHashMap;
use std::net::SocketAddr;
use irontide_storage::Bitfield;
#[cfg(test)]
use crate::piece_selector::InFlightPiece;
/// Outstanding requests grouped by peer: each element pairs a peer address
/// with that peer's `(index, begin, length)` request triples.
type PendingBlocks = Vec<(SocketAddr, Vec<(u32, u32, u32)>)>;
/// Bookkeeping for end-game mode: whether the mode is active and which peers
/// have been recorded as requesting each tracked block.
pub(crate) struct EndGame {
/// Set by the activation methods and cleared by `deactivate`.
active: bool,
/// Tracked blocks keyed by `(index, begin)`; `index` is the piece index
/// (see `remove_piece`), `begin` is presumably the block's offset in it.
blocks: FxHashMap<(u32, u32), BlockEntry>,
}
/// Per-block record stored in `EndGame::blocks`.
struct BlockEntry {
/// Length recorded for the block; handed back by the pick and
/// `block_received` paths.
length: u32,
/// Peers recorded as having requested this block.
peers: Vec<SocketAddr>,
}
impl EndGame {
    /// Creates an inactive tracker with an empty block table.
    pub fn new() -> Self {
        Self {
            active: false,
            blocks: FxHashMap::default(),
        }
    }

    /// Returns `true` after an activation call and until `deactivate` is called.
    pub fn is_active(&self) -> bool {
        self.active
    }

    /// Turns end-game mode on and rebuilds the block table from `pending`.
    ///
    /// Previously tracked blocks are discarded. Every `(index, begin, length)`
    /// triple becomes (or joins) the entry keyed by `(index, begin)`; the
    /// length of the first triple seen for a block is the one kept.
    ///
    /// A peer is recorded at most once per block even if `pending` lists the
    /// same request several times, matching `register_request`. Without this,
    /// `block_received` would report the same peer more than once.
    #[cfg_attr(not(test), allow(dead_code))]
    pub fn activate(&mut self, pending: &PendingBlocks) {
        self.active = true;
        self.blocks.clear();
        for (addr, requests) in pending {
            for &(index, begin, length) in requests {
                let entry = self
                    .blocks
                    .entry((index, begin))
                    .or_insert_with(|| BlockEntry {
                        length,
                        peers: Vec::new(),
                    });
                if !entry.peers.contains(addr) {
                    entry.peers.push(*addr);
                }
            }
        }
    }

    /// Turns end-game mode off and forgets every tracked block.
    pub fn deactivate(&mut self) {
        self.active = false;
        self.blocks.clear();
    }

    /// Returns the peers recorded for block `(index, begin)`, or an empty
    /// slice when the block is not tracked.
    #[cfg_attr(not(test), allow(dead_code))]
    pub fn block_requesters(&self, index: u32, begin: u32) -> &[SocketAddr] {
        self.blocks
            .get(&(index, begin))
            .map_or(&[], |e| e.peers.as_slice())
    }

    /// Number of blocks currently tracked.
    pub fn block_count(&self) -> usize {
        self.blocks.len()
    }

    /// Picks a tracked block that `peer_addr` could additionally request.
    ///
    /// A block qualifies when `peer_has` reports its piece and `peer_addr` is
    /// not already recorded for it. Returns `(index, begin, length)` of the
    /// first qualifying block in the table's iteration order, which for a hash
    /// map is unspecified, or `None` when nothing qualifies.
    pub fn pick_block(
        &self,
        peer_addr: SocketAddr,
        peer_has: &Bitfield,
    ) -> Option<(u32, u32, u32)> {
        self.blocks.iter().find_map(|(&(index, begin), entry)| {
            let eligible = peer_has.get(index) && !entry.peers.contains(&peer_addr);
            eligible.then_some((index, begin, entry.length))
        })
    }

    /// Like `pick_block`, but refuses to pick while `uncovered_pieces` is
    /// non-empty.
    ///
    /// The contents of `uncovered_pieces` are not inspected; only whether the
    /// slice is empty matters.
    pub fn pick_block_strict(
        &self,
        peer_addr: SocketAddr,
        peer_has: &Bitfield,
        uncovered_pieces: &[u32],
    ) -> Option<(u32, u32, u32)> {
        if !uncovered_pieces.is_empty() {
            return None;
        }
        self.pick_block(peer_addr, peer_has)
    }

    /// Records that block `(index, begin)` arrived from `from_peer` and stops
    /// tracking it.
    ///
    /// Returns one `(peer, index, begin, length)` tuple for every other peer
    /// recorded for the block (the tests treat these as cancel targets).
    /// Returns an empty vector when the block is not tracked.
    pub fn block_received(
        &mut self,
        index: u32,
        begin: u32,
        from_peer: SocketAddr,
    ) -> Vec<(SocketAddr, u32, u32, u32)> {
        let Some(entry) = self.blocks.remove(&(index, begin)) else {
            return Vec::new();
        };
        // Split the entry up front so the closure below only needs the copied
        // length, not a borrow of a partially moved struct.
        let BlockEntry { length, peers } = entry;
        peers
            .into_iter()
            .filter(|&addr| addr != from_peer)
            .map(|addr| (addr, index, begin, length))
            .collect()
    }

    /// Records `peer_addr` as a requester of block `(index, begin)`.
    ///
    /// Does nothing when the block is not tracked or the peer is already
    /// recorded for it.
    pub fn register_request(&mut self, index: u32, begin: u32, peer_addr: SocketAddr) {
        let Some(entry) = self.blocks.get_mut(&(index, begin)) else {
            return;
        };
        if !entry.peers.contains(&peer_addr) {
            entry.peers.push(peer_addr);
        }
    }

    /// Removes `addr` from every block's requester list.
    ///
    /// Blocks stay tracked even if this leaves them without any requester.
    pub fn peer_disconnected(&mut self, addr: SocketAddr) {
        for entry in self.blocks.values_mut() {
            entry.peers.retain(|&a| a != addr);
        }
    }

    /// Stops tracking every block whose piece index is `index`.
    pub fn remove_piece(&mut self, index: u32) {
        self.blocks.retain(|&(pi, _), _| pi != index);
    }

    /// Picks a block for `peer_addr`, preferring pieces in `streaming_pieces`.
    ///
    /// Streaming pieces are visited in ascending order. The first one that
    /// `peer_has` reports and that still has a block not yet requested by
    /// `peer_addr` supplies the result. If no streaming piece yields a block,
    /// this falls back to `pick_block`.
    pub fn pick_block_streaming(
        &self,
        peer_addr: SocketAddr,
        peer_has: &Bitfield,
        streaming_pieces: &BTreeSet<u32>,
    ) -> Option<(u32, u32, u32)> {
        streaming_pieces
            .iter()
            .copied()
            .filter(|&piece| peer_has.get(piece))
            .find_map(|piece| {
                self.blocks.iter().find_map(|(&(idx, begin), entry)| {
                    let eligible = idx == piece && !entry.peers.contains(&peer_addr);
                    eligible.then_some((idx, begin, entry.length))
                })
            })
            .or_else(|| self.pick_block(peer_addr, peer_has))
    }
}
#[cfg(test)]
impl EndGame {
    /// Turns end-game mode on and rebuilds the block table from both the
    /// in-flight piece map and the per-peer `pending` requests.
    ///
    /// Previously tracked blocks are discarded. The in-flight map only yields
    /// `(index, begin) -> peer`, with no length, so blocks seeded from it start
    /// with `irontide_core::DEFAULT_CHUNK_SIZE` as a placeholder. A `pending`
    /// request for the same block carries an explicit length, which replaces
    /// the placeholder; this matters for blocks shorter than the default.
    ///
    /// A peer is recorded at most once per block.
    // Test-only helper; allowed to go unused so it does not warn in test
    // builds that never call it.
    #[allow(dead_code)]
    pub fn activate_with_inflight(
        &mut self,
        in_flight_pieces: &FxHashMap<u32, InFlightPiece>,
        pending: &PendingBlocks,
    ) {
        self.active = true;
        self.blocks.clear();
        for ifp in in_flight_pieces.values() {
            for (&(index, begin), &addr) in &ifp.assigned_blocks {
                let entry = self
                    .blocks
                    .entry((index, begin))
                    .or_insert_with(|| BlockEntry {
                        length: irontide_core::DEFAULT_CHUNK_SIZE,
                        peers: Vec::new(),
                    });
                if !entry.peers.contains(&addr) {
                    entry.peers.push(addr);
                }
            }
        }
        for (addr, requests) in pending {
            for &(index, begin, length) in requests {
                let entry = self
                    .blocks
                    .entry((index, begin))
                    .or_insert_with(|| BlockEntry {
                        length,
                        peers: Vec::new(),
                    });
                // The request's own length wins over the placeholder guessed
                // during the in-flight pass.
                entry.length = length;
                if !entry.peers.contains(addr) {
                    entry.peers.push(*addr);
                }
            }
        }
    }
}
#[cfg(test)]
mod tests {
    //! Unit tests for `EndGame`: activation, block picking, requester
    //! bookkeeping and the tuples reported when a block arrives.

    use super::*;
    use std::collections::BTreeSet;
    use std::net::{IpAddr, Ipv4Addr, SocketAddr};

    /// Builds a loopback address that differs only by `port`, so each port
    /// stands for a distinct peer.
    fn addr(port: u16) -> SocketAddr {
        SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port)
    }

    #[test]
    fn new_is_inactive() {
        let eg = EndGame::new();
        assert!(!eg.is_active());
    }

    #[test]
    fn activate_populates_blocks_from_pending() {
        let mut eg = EndGame::new();
        let peer_a = addr(1);
        let peer_b = addr(2);
        let pending = vec![
            (peer_a, vec![(0, 0, 16384), (0, 16384, 16384)]),
            (peer_b, vec![(1, 0, 16384), (1, 16384, 16384)]),
        ];
        eg.activate(&pending);
        assert!(eg.is_active());
        assert_eq!(eg.block_requesters(0, 0).len(), 1);
        assert!(eg.block_requesters(0, 0).contains(&peer_a));
        assert_eq!(eg.block_requesters(1, 0).len(), 1);
        assert!(eg.block_requesters(1, 0).contains(&peer_b));
    }

    #[test]
    fn deactivate_clears_state() {
        let mut eg = EndGame::new();
        let pending = vec![(addr(1), vec![(0, 0, 16384)])];
        eg.activate(&pending);
        assert!(eg.is_active());
        eg.deactivate();
        assert!(!eg.is_active());
        assert_eq!(eg.block_requesters(0, 0).len(), 0);
    }

    #[test]
    fn block_count_tracks_block_table() {
        let mut eg = EndGame::new();
        assert_eq!(eg.block_count(), 0);
        let pending = vec![(
            addr(1),
            vec![(0, 0, 16384), (0, 16384, 16384), (1, 0, 16384)],
        )];
        eg.activate(&pending);
        assert_eq!(eg.block_count(), 3);
        eg.remove_piece(0);
        assert_eq!(eg.block_count(), 1);
        eg.deactivate();
        assert_eq!(eg.block_count(), 0);
    }

    #[test]
    fn pick_block_returns_unassigned_block() {
        let mut eg = EndGame::new();
        let peer_a = addr(1);
        let peer_b = addr(2);
        let pending = vec![(peer_a, vec![(0, 0, 16384), (0, 16384, 16384)])];
        eg.activate(&pending);
        let mut peer_b_has = Bitfield::new(4);
        peer_b_has.set(0);
        let block = eg.pick_block(peer_b, &peer_b_has);
        assert!(block.is_some());
        let (idx, begin, len) = block.unwrap();
        assert_eq!(idx, 0);
        assert_eq!(len, 16384);
        // Hash-map iteration order is unspecified, so either block is valid.
        assert!(begin == 0 || begin == 16384);
    }

    #[test]
    fn pick_block_skips_already_assigned_peer() {
        let mut eg = EndGame::new();
        let peer_a = addr(1);
        let pending = vec![(peer_a, vec![(0, 0, 16384)])];
        eg.activate(&pending);
        let mut peer_a_has = Bitfield::new(4);
        peer_a_has.set(0);
        let block = eg.pick_block(peer_a, &peer_a_has);
        assert!(block.is_none());
    }

    #[test]
    fn pick_block_skips_piece_peer_lacks() {
        let mut eg = EndGame::new();
        let peer_a = addr(1);
        let peer_b = addr(2);
        let pending = vec![(peer_a, vec![(0, 0, 16384)])];
        eg.activate(&pending);
        let peer_b_has = Bitfield::new(4);
        let block = eg.pick_block(peer_b, &peer_b_has);
        assert!(block.is_none());
    }

    #[test]
    fn pick_block_strict_blocks_when_piece_uncovered() {
        let mut eg = EndGame::new();
        let peer_a = addr(1);
        let pending = vec![(peer_a, vec![(0, 0, 16384)])];
        eg.activate(&pending);
        let peer_b = addr(2);
        let mut peer_b_has = Bitfield::new(4);
        peer_b_has.set(0);
        let block = eg.pick_block_strict(peer_b, &peer_b_has, &[1u32]);
        assert!(block.is_none());
        let block = eg.pick_block_strict(peer_b, &peer_b_has, &[]);
        assert!(block.is_some());
    }

    #[test]
    fn pick_block_streaming_prefers_streaming_piece() {
        let mut eg = EndGame::new();
        let peer_a = addr(1);
        let peer_b = addr(2);
        let pending = vec![(peer_a, vec![(0, 0, 16384), (1, 0, 16384)])];
        eg.activate(&pending);
        let mut peer_b_has = Bitfield::new(4);
        peer_b_has.set(0);
        peer_b_has.set(1);
        let streaming: BTreeSet<u32> = BTreeSet::from([1u32]);
        let block = eg.pick_block_streaming(peer_b, &peer_b_has, &streaming);
        assert_eq!(block, Some((1, 0, 16384)));
    }

    #[test]
    fn pick_block_streaming_falls_back_without_streaming_block() {
        let mut eg = EndGame::new();
        let peer_a = addr(1);
        let peer_b = addr(2);
        let pending = vec![(peer_a, vec![(0, 0, 16384)])];
        eg.activate(&pending);
        let mut peer_b_has = Bitfield::new(4);
        peer_b_has.set(0);
        // Piece 2 is neither held by the peer nor tracked, so the generic
        // picker decides.
        let streaming: BTreeSet<u32> = BTreeSet::from([2u32]);
        let block = eg.pick_block_streaming(peer_b, &peer_b_has, &streaming);
        assert_eq!(block, Some((0, 0, 16384)));
    }

    #[test]
    fn block_received_returns_cancel_targets() {
        let mut eg = EndGame::new();
        let peer_a = addr(1);
        let peer_b = addr(2);
        let peer_c = addr(3);
        let pending = vec![
            (peer_a, vec![(0, 0, 16384)]),
            (peer_b, vec![(0, 0, 16384)]),
            (peer_c, vec![(0, 0, 16384)]),
        ];
        eg.activate(&pending);
        let cancels = eg.block_received(0, 0, peer_a);
        assert_eq!(cancels.len(), 2);
        assert!(cancels.contains(&(peer_b, 0, 0, 16384)));
        assert!(cancels.contains(&(peer_c, 0, 0, 16384)));
    }

    #[test]
    fn block_received_removes_entry() {
        let mut eg = EndGame::new();
        let peer_a = addr(1);
        let pending = vec![(peer_a, vec![(0, 0, 16384)])];
        eg.activate(&pending);
        let _ = eg.block_received(0, 0, peer_a);
        assert_eq!(eg.block_requesters(0, 0).len(), 0);
    }

    #[test]
    fn block_received_unknown_block_is_noop() {
        let mut eg = EndGame::new();
        let peer_a = addr(1);
        let pending = vec![(peer_a, vec![(0, 0, 16384)])];
        eg.activate(&pending);
        let cancels = eg.block_received(5, 0, peer_a);
        assert!(cancels.is_empty());
        assert_eq!(eg.block_count(), 1);
        assert_eq!(eg.block_requesters(0, 0).len(), 1);
    }

    #[test]
    fn peer_disconnected_removes_from_all_entries() {
        let mut eg = EndGame::new();
        let peer_a = addr(1);
        let peer_b = addr(2);
        let pending = vec![(peer_a, vec![(0, 0, 16384)]), (peer_b, vec![(0, 0, 16384)])];
        eg.activate(&pending);
        assert_eq!(eg.block_requesters(0, 0).len(), 2);
        eg.peer_disconnected(peer_a);
        assert_eq!(eg.block_requesters(0, 0).len(), 1);
        assert!(eg.block_requesters(0, 0).contains(&peer_b));
    }

    #[test]
    fn peer_disconnected_keeps_block_pickable() {
        let mut eg = EndGame::new();
        let peer_a = addr(1);
        let pending = vec![(peer_a, vec![(0, 0, 16384)])];
        eg.activate(&pending);
        eg.peer_disconnected(peer_a);
        // The block stays tracked with no requesters and can be handed out.
        assert_eq!(eg.block_count(), 1);
        assert!(eg.block_requesters(0, 0).is_empty());
        let mut peer_a_has = Bitfield::new(4);
        peer_a_has.set(0);
        assert_eq!(eg.pick_block(peer_a, &peer_a_has), Some((0, 0, 16384)));
    }

    #[test]
    fn remove_piece_clears_all_blocks_for_piece() {
        let mut eg = EndGame::new();
        let peer_a = addr(1);
        let pending = vec![(
            peer_a,
            vec![(0, 0, 16384), (0, 16384, 16384), (1, 0, 16384)],
        )];
        eg.activate(&pending);
        assert_eq!(eg.block_requesters(0, 0).len(), 1);
        assert_eq!(eg.block_requesters(1, 0).len(), 1);
        eg.remove_piece(0);
        assert_eq!(eg.block_requesters(0, 0).len(), 0);
        assert_eq!(eg.block_requesters(0, 16384).len(), 0);
        assert_eq!(eg.block_requesters(1, 0).len(), 1);
    }

    #[test]
    fn register_request_adds_peer() {
        let mut eg = EndGame::new();
        let peer_a = addr(1);
        let peer_b = addr(2);
        let pending = vec![(peer_a, vec![(0, 0, 16384)])];
        eg.activate(&pending);
        eg.register_request(0, 0, peer_b);
        assert_eq!(eg.block_requesters(0, 0).len(), 2);
        assert!(eg.block_requesters(0, 0).contains(&peer_b));
        // Registering the same peer again must not add a second record.
        eg.register_request(0, 0, peer_b);
        assert_eq!(eg.block_requesters(0, 0).len(), 2);
    }

    #[test]
    fn register_request_ignores_unknown_block() {
        let mut eg = EndGame::new();
        let peer_a = addr(1);
        let pending = vec![(peer_a, vec![(0, 0, 16384)])];
        eg.activate(&pending);
        eg.register_request(9, 0, addr(2));
        assert_eq!(eg.block_count(), 1);
        assert!(eg.block_requesters(9, 0).is_empty());
    }
}