use std::{
collections::{HashMap, HashSet},
time::{Duration, Instant},
};
use buffers::ByteBuf;
use librqbit_core::lengths::ValidPieceIndex;
use peer_binary_protocol::Piece;
use crate::{
chunk_tracker::{ChunkMarkingResult, ChunkTracker},
file_info::FileInfo,
type_aliases::{FileInfos, FilePriorities, PeerHandle},
};
#[derive(Debug, Clone)]
pub struct InflightPiece {
pub peer: PeerHandle,
pub started: Instant,
}
#[derive(Debug)]
pub enum AcquireResult {
Reserved(ValidPieceIndex),
Stolen {
piece: ValidPieceIndex,
from_peer: PeerHandle,
},
NoneAvailable,
}
pub struct AcquireRequest<'a, I, P, S>
where
I: Iterator<Item = ValidPieceIndex>,
P: Fn(ValidPieceIndex) -> bool,
S: Fn(ValidPieceIndex) -> bool,
{
pub peer: PeerHandle,
pub peer_avg_time: Option<Duration>,
pub priority_pieces: I,
pub file_priorities: &'a FilePriorities,
pub file_infos: &'a FileInfos,
pub peer_has_piece: P,
pub can_steal: S,
}
pub struct PieceTracker {
chunks: ChunkTracker,
inflight: HashMap<ValidPieceIndex, InflightPiece>,
}
impl PieceTracker {
pub fn new(chunks: ChunkTracker) -> Self {
Self {
chunks,
inflight: HashMap::new(),
}
}
pub fn chunks(&self) -> &ChunkTracker {
&self.chunks
}
pub fn into_chunks(mut self) -> ChunkTracker {
for piece in self.inflight.into_keys() {
self.chunks.mark_piece_broken_if_not_have(piece);
}
self.chunks
}
pub fn acquire_piece<I, P, S>(&mut self, mut req: AcquireRequest<I, P, S>) -> AcquireResult
where
I: Iterator<Item = ValidPieceIndex>,
P: Fn(ValidPieceIndex) -> bool,
S: Fn(ValidPieceIndex) -> bool,
{
if let Some(result) = self.try_steal(&req, 10.0) {
return result;
}
for piece in &mut req.priority_pieces {
if !self.chunks.is_piece_have(piece)
&& !self.inflight.contains_key(&piece)
&& (req.peer_has_piece)(piece)
{
return self.reserve_piece(piece, req.peer);
}
}
let queued: Vec<_> = self
.chunks
.iter_queued_pieces(req.file_priorities, req.file_infos)
.collect();
for piece in queued {
if (req.peer_has_piece)(piece) {
return self.reserve_piece(piece, req.peer);
}
}
if let Some(result) = self.try_steal(&req, 3.0) {
return result;
}
AcquireResult::NoneAvailable
}
fn reserve_piece(&mut self, piece: ValidPieceIndex, peer: PeerHandle) -> AcquireResult {
self.chunks.reserve_needed_piece(piece);
self.inflight.insert(
piece,
InflightPiece {
peer,
started: Instant::now(),
},
);
AcquireResult::Reserved(piece)
}
fn try_steal<I, P, S>(
&mut self,
req: &AcquireRequest<I, P, S>,
threshold: f64,
) -> Option<AcquireResult>
where
I: Iterator<Item = ValidPieceIndex>,
P: Fn(ValidPieceIndex) -> bool,
S: Fn(ValidPieceIndex) -> bool,
{
let my_avg = req.peer_avg_time?;
let min_elapsed = Duration::from_secs_f64(my_avg.as_secs_f64() * threshold);
let (piece, old_peer, _) = self
.inflight
.iter()
.filter(|(_, info)| info.peer != req.peer)
.filter(|(p, _)| (req.peer_has_piece)(**p))
.map(|(p, info)| (*p, info.peer, info.started.elapsed()))
.filter(|(_, _, elapsed)| *elapsed >= min_elapsed)
.max_by_key(|(_, _, elapsed)| *elapsed)?;
if !(req.can_steal)(piece) {
return None;
}
let info = self.inflight.get_mut(&piece)?;
info.peer = req.peer;
info.started = Instant::now();
Some(AcquireResult::Stolen {
piece,
from_peer: old_peer,
})
}
pub fn take_inflight(&mut self, piece: ValidPieceIndex) -> Option<Duration> {
let inflight = self.inflight.remove(&piece)?;
Some(inflight.started.elapsed())
}
pub fn mark_piece_hash_ok(&mut self, piece: ValidPieceIndex) {
self.chunks.mark_piece_downloaded(piece);
}
pub fn mark_piece_hash_failed(&mut self, piece: ValidPieceIndex) {
self.chunks.mark_piece_broken_if_not_have(piece);
}
pub fn release_pieces_owned_by(&mut self, peer: PeerHandle) -> usize {
let pieces_to_release: Vec<_> = self
.inflight
.iter()
.filter(|(_, info)| info.peer == peer)
.map(|(p, _)| *p)
.collect();
let count = pieces_to_release.len();
for piece in pieces_to_release {
self.inflight.remove(&piece);
self.chunks.mark_piece_broken_if_not_have(piece);
}
count
}
pub fn get_inflight(&self, piece: ValidPieceIndex) -> Option<&InflightPiece> {
self.inflight.get(&piece)
}
#[allow(dead_code)]
pub fn is_inflight(&self, piece: ValidPieceIndex) -> bool {
self.inflight.contains_key(&piece)
}
#[allow(dead_code)]
pub fn inflight_count(&self) -> usize {
self.inflight.len()
}
pub fn mark_chunk_downloaded(
&mut self,
piece: &Piece<ByteBuf<'_>>,
) -> Option<ChunkMarkingResult> {
self.chunks.mark_chunk_downloaded(piece)
}
pub fn update_only_files(
&mut self,
file_infos: &FileInfos,
new_only_files: &HashSet<usize>,
) -> anyhow::Result<crate::chunk_tracker::HaveNeededSelected> {
self.chunks.update_only_files(file_infos, new_only_files)
}
pub fn update_file_have_on_piece_completed(
&mut self,
piece_id: ValidPieceIndex,
file_id: usize,
file_info: &FileInfo,
) -> u64 {
self.chunks
.update_file_have_on_piece_completed(piece_id, file_id, file_info)
}
pub fn flush_have_pieces(&mut self, flush_async: bool) -> anyhow::Result<()> {
self.chunks.get_have_pieces_mut().flush(flush_async)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{bitv::BitV as BitVTrait, type_aliases::BF};
use librqbit_core::lengths::Lengths;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
fn peer(id: u8) -> PeerHandle {
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, id)), 6881)
}
fn make_test_chunk_tracker(num_pieces: u32) -> ChunkTracker {
let piece_length = 16384u32; let total_length = piece_length as u64 * num_pieces as u64;
let lengths = Lengths::new(total_length, piece_length).unwrap();
let bf_len = lengths.piece_bitfield_bytes();
let have = BF::from_boxed_slice(vec![0u8; bf_len].into_boxed_slice());
let mut selected = BF::from_boxed_slice(vec![0u8; bf_len].into_boxed_slice());
for i in 0..num_pieces as usize {
selected.set(i, true);
}
let file_infos = vec![crate::file_info::FileInfo {
relative_filename: "test.dat".into(),
offset_in_torrent: 0,
len: total_length,
piece_range: 0..num_pieces,
attrs: Default::default(),
}];
ChunkTracker::new(have.into_dyn(), selected, lengths, &file_infos).unwrap()
}
fn make_test_file_infos(num_pieces: u32) -> FileInfos {
let piece_length = 16384u64;
vec![crate::file_info::FileInfo {
relative_filename: "test.dat".into(),
offset_in_torrent: 0,
len: piece_length * num_pieces as u64,
piece_range: 0..num_pieces,
attrs: Default::default(),
}]
}
fn make_default_file_priorities(file_infos: &FileInfos) -> FilePriorities {
(0..file_infos.len()).collect()
}
#[test]
fn test_new_piece_tracker() {
let chunks = make_test_chunk_tracker(10);
let tracker = PieceTracker::new(chunks);
assert_eq!(tracker.inflight_count(), 0);
}
#[test]
fn test_reserve_piece_from_queue() {
let chunks = make_test_chunk_tracker(5);
let mut tracker = PieceTracker::new(chunks);
let file_infos = make_test_file_infos(5);
let file_priorities = make_default_file_priorities(&file_infos);
let result = tracker.acquire_piece(AcquireRequest {
peer: peer(1),
peer_avg_time: None,
priority_pieces: std::iter::empty(),
file_priorities: &file_priorities,
file_infos: &file_infos,
peer_has_piece: |_| true, can_steal: |_| true,
});
match result {
AcquireResult::Reserved(piece) => {
assert_eq!(piece.get(), 0);
assert!(tracker.is_inflight(piece));
assert_eq!(tracker.inflight_count(), 1);
}
_ => panic!("Expected Reserved, got {:?}", result),
}
}
#[test]
fn test_reserve_filters_by_peer_has_piece() {
let chunks = make_test_chunk_tracker(5);
let mut tracker = PieceTracker::new(chunks);
let file_infos = make_test_file_infos(5);
let file_priorities = make_default_file_priorities(&file_infos);
let result = tracker.acquire_piece(AcquireRequest {
peer: peer(1),
peer_avg_time: None,
priority_pieces: std::iter::empty(),
file_priorities: &file_priorities,
file_infos: &file_infos,
peer_has_piece: |p| p.get() >= 2,
can_steal: |_| true,
});
match result {
AcquireResult::Reserved(piece) => {
assert!(piece.get() >= 2, "Should have gotten a piece >= 2");
}
_ => panic!("Expected Reserved, got {:?}", result),
}
}
#[test]
fn test_complete_piece() {
let chunks = make_test_chunk_tracker(5);
let mut tracker = PieceTracker::new(chunks);
let file_infos = make_test_file_infos(5);
let file_priorities = make_default_file_priorities(&file_infos);
let result = tracker.acquire_piece(AcquireRequest {
peer: peer(1),
peer_avg_time: None,
priority_pieces: std::iter::empty(),
file_priorities: &file_priorities,
file_infos: &file_infos,
peer_has_piece: |_| true,
can_steal: |_| true,
});
let piece = match result {
AcquireResult::Reserved(p) => p,
_ => panic!("Expected Reserved"),
};
let duration = tracker.take_inflight(piece);
assert!(duration.is_some());
assert!(!tracker.is_inflight(piece));
tracker.mark_piece_hash_ok(piece);
assert!(tracker.chunks().is_piece_have(piece));
}
#[test]
fn test_fail_piece_requeues() {
let chunks = make_test_chunk_tracker(5);
let mut tracker = PieceTracker::new(chunks);
let file_infos = make_test_file_infos(5);
let file_priorities = make_default_file_priorities(&file_infos);
let result = tracker.acquire_piece(AcquireRequest {
peer: peer(1),
peer_avg_time: None,
priority_pieces: std::iter::empty(),
file_priorities: &file_priorities,
file_infos: &file_infos,
peer_has_piece: |_| true,
can_steal: |_| true,
});
let piece = match result {
AcquireResult::Reserved(p) => p,
_ => panic!("Expected Reserved"),
};
assert!(tracker.is_inflight(piece));
let duration = tracker.take_inflight(piece);
assert!(duration.is_some());
tracker.mark_piece_hash_failed(piece);
assert!(!tracker.is_inflight(piece));
assert!(!tracker.chunks().is_piece_have(piece));
let result2 = tracker.acquire_piece(AcquireRequest {
peer: peer(2),
peer_avg_time: None,
priority_pieces: std::iter::empty(),
file_priorities: &file_priorities,
file_infos: &file_infos,
peer_has_piece: |p| p == piece, can_steal: |_| true,
});
match result2 {
AcquireResult::Reserved(p) => assert_eq!(p, piece),
_ => panic!("Expected piece to be re-reservable after fail"),
}
}
#[test]
fn test_release_pieces_owned_by_peer() {
let chunks = make_test_chunk_tracker(5);
let mut tracker = PieceTracker::new(chunks);
let file_infos = make_test_file_infos(5);
let file_priorities = make_default_file_priorities(&file_infos);
let peer_a = peer(1);
let peer_b = peer(2);
let piece_a1 = match tracker.acquire_piece(AcquireRequest {
peer: peer_a,
peer_avg_time: None,
priority_pieces: std::iter::empty(),
file_priorities: &file_priorities,
file_infos: &file_infos,
peer_has_piece: |_| true,
can_steal: |_| true,
}) {
AcquireResult::Reserved(p) => p,
_ => panic!("Expected Reserved"),
};
let piece_a2 = match tracker.acquire_piece(AcquireRequest {
peer: peer_a,
peer_avg_time: None,
priority_pieces: std::iter::empty(),
file_priorities: &file_priorities,
file_infos: &file_infos,
peer_has_piece: |_| true,
can_steal: |_| true,
}) {
AcquireResult::Reserved(p) => p,
_ => panic!("Expected Reserved"),
};
let piece_b = match tracker.acquire_piece(AcquireRequest {
peer: peer_b,
peer_avg_time: None,
priority_pieces: std::iter::empty(),
file_priorities: &file_priorities,
file_infos: &file_infos,
peer_has_piece: |_| true,
can_steal: |_| true,
}) {
AcquireResult::Reserved(p) => p,
_ => panic!("Expected Reserved"),
};
assert_eq!(tracker.inflight_count(), 3);
assert!(tracker.is_inflight(piece_a1));
assert!(tracker.is_inflight(piece_a2));
assert!(tracker.is_inflight(piece_b));
let released = tracker.release_pieces_owned_by(peer_a);
assert_eq!(released, 2);
assert_eq!(tracker.inflight_count(), 1);
assert!(tracker.is_inflight(piece_b));
assert!(!tracker.is_inflight(piece_a1));
assert!(!tracker.is_inflight(piece_a2));
}
#[test]
fn test_into_chunks_requeues_inflight() {
let chunks = make_test_chunk_tracker(5);
let mut tracker = PieceTracker::new(chunks);
let file_infos = make_test_file_infos(5);
let file_priorities = make_default_file_priorities(&file_infos);
tracker.acquire_piece(AcquireRequest {
peer: peer(1),
peer_avg_time: None,
priority_pieces: std::iter::empty(),
file_priorities: &file_priorities,
file_infos: &file_infos,
peer_has_piece: |_| true,
can_steal: |_| true,
});
tracker.acquire_piece(AcquireRequest {
peer: peer(1),
peer_avg_time: None,
priority_pieces: std::iter::empty(),
file_priorities: &file_priorities,
file_infos: &file_infos,
peer_has_piece: |_| true,
can_steal: |_| true,
});
assert_eq!(tracker.inflight_count(), 2);
let chunks = tracker.into_chunks();
let mut new_tracker = PieceTracker::new(chunks);
let result = new_tracker.acquire_piece(AcquireRequest {
peer: peer(2),
peer_avg_time: None,
priority_pieces: std::iter::empty(),
file_priorities: &file_priorities,
file_infos: &file_infos,
peer_has_piece: |_| true,
can_steal: |_| true,
});
match result {
AcquireResult::Reserved(p) => assert_eq!(p.get(), 0),
_ => panic!("Expected to reserve piece 0 after into_chunks"),
}
}
#[test]
fn test_priority_pieces_checked_first() {
let chunks = make_test_chunk_tracker(5);
let mut tracker = PieceTracker::new(chunks);
let file_infos = make_test_file_infos(5);
let file_priorities = make_default_file_priorities(&file_infos);
let piece3 = tracker
.chunks()
.get_lengths()
.validate_piece_index(3)
.unwrap();
let piece2 = tracker
.chunks()
.get_lengths()
.validate_piece_index(2)
.unwrap();
let priority = vec![piece3, piece2];
let result = tracker.acquire_piece(AcquireRequest {
peer: peer(1),
peer_avg_time: None,
priority_pieces: priority.into_iter(),
file_priorities: &file_priorities,
file_infos: &file_infos,
peer_has_piece: |_| true,
can_steal: |_| true,
});
match result {
AcquireResult::Reserved(p) => assert_eq!(p.get(), 3),
_ => panic!("Expected Reserved(3), got {:?}", result),
}
}
#[test]
fn test_none_available_when_no_pieces() {
let chunks = make_test_chunk_tracker(5);
let mut tracker = PieceTracker::new(chunks);
let file_infos = make_test_file_infos(5);
let file_priorities = make_default_file_priorities(&file_infos);
let result = tracker.acquire_piece(AcquireRequest {
peer: peer(1),
peer_avg_time: None,
priority_pieces: std::iter::empty(),
file_priorities: &file_priorities,
file_infos: &file_infos,
peer_has_piece: |_| false, can_steal: |_| true,
});
match result {
AcquireResult::NoneAvailable => {}
_ => panic!("Expected NoneAvailable, got {:?}", result),
}
}
#[test]
fn test_take_inflight_nonexistent_piece_returns_none() {
let chunks = make_test_chunk_tracker(5);
let mut tracker = PieceTracker::new(chunks);
let piece = tracker
.chunks()
.get_lengths()
.validate_piece_index(0)
.unwrap();
let result = tracker.take_inflight(piece);
assert!(result.is_none());
}
#[test]
fn test_steal_only_pieces_peer_has() {
let chunks = make_test_chunk_tracker(5);
let mut tracker = PieceTracker::new(chunks);
let file_infos = make_test_file_infos(5);
let file_priorities = make_default_file_priorities(&file_infos);
let peer_a = peer(1);
let peer_b = peer(2);
let piece_0 = match tracker.acquire_piece(AcquireRequest {
peer: peer_a,
peer_avg_time: None,
priority_pieces: std::iter::empty(),
file_priorities: &file_priorities,
file_infos: &file_infos,
peer_has_piece: |_| true,
can_steal: |_| true,
}) {
AcquireResult::Reserved(p) => {
assert_eq!(p.get(), 0);
p
}
_ => panic!("Expected Reserved"),
};
let piece_4 = match tracker.acquire_piece(AcquireRequest {
peer: peer_a,
peer_avg_time: None,
priority_pieces: std::iter::empty(),
file_priorities: &file_priorities,
file_infos: &file_infos,
peer_has_piece: |_| true,
can_steal: |_| true,
}) {
AcquireResult::Reserved(p) => {
assert_eq!(p.get(), 4);
p
}
_ => panic!("Expected Reserved"),
};
std::thread::sleep(Duration::from_millis(5));
let result = tracker.acquire_piece(AcquireRequest {
peer: peer_b,
peer_avg_time: Some(Duration::from_millis(1)),
priority_pieces: std::iter::empty(),
file_priorities: &file_priorities,
file_infos: &file_infos,
peer_has_piece: |p| p.get() == 4, can_steal: |_| true,
});
match result {
AcquireResult::Stolen { piece, from_peer } => {
assert_eq!(piece, piece_4, "Should steal piece 4 (the one peer B has)");
assert_eq!(from_peer, peer_a);
assert_eq!(tracker.get_inflight(piece_0).unwrap().peer, peer_a);
}
_ => panic!("Expected Stolen, got {:?}", result),
}
}
}