pub mod peer;
pub mod peers;
pub mod stats;
use std::{
borrow::Cow,
collections::HashSet,
net::{IpAddr, SocketAddr},
num::NonZeroU32,
sync::{
Arc,
atomic::{AtomicBool, AtomicU64, Ordering},
},
time::{Duration, Instant},
};
use anyhow::{Context, bail};
use buffers::{ByteBuf, ByteBufOwned};
use clone_to_owned::CloneToOwned;
use librqbit_core::{
constants::CHUNK_SIZE,
hash_id::Id20,
lengths::{ChunkInfo, Lengths, ValidPieceIndex},
spawn_utils::spawn_with_cancel,
speed_estimator::SpeedEstimator,
torrent_metainfo::ValidatedTorrentMetaV1Info,
};
use parking_lot::{Mutex, MutexGuard, RwLock, RwLockReadGuard, RwLockWriteGuard};
use peer_binary_protocol::{
Handshake, Message, Piece, Request,
extended::{
self, ExtendedMessage,
handshake::ExtendedHandshake,
ut_metadata::{UtMetadata, UtMetadataData},
ut_pex::UtPex,
},
};
use tokio::sync::{
Notify, OwnedSemaphorePermit, Semaphore,
mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel},
};
use tokio_util::sync::CancellationToken;
use tracing::{Instrument, debug, debug_span, error, info, trace, warn};
use crate::{
Error,
chunk_tracker::{ChunkMarkingResult, ChunkTracker, HaveNeededSelected},
file_ops::FileOps,
limits::Limits,
peer_connection::{
PeerConnection, PeerConnectionHandler, PeerConnectionOptions, WriterRequest,
},
piece_tracker::{AcquireRequest, AcquireResult, PieceTracker},
session::CheckedIncomingConnection,
session_stats::SessionStats,
stream_connect::ConnectionKind,
torrent_state::{peer::Peer, utils::atomic_inc},
type_aliases::{BF, FilePriorities, FileStorage, PeerHandle},
};
use self::{
peer::{
PeerRx, PeerState, PeerTx, RemoveInflightRequestResult,
stats::{
atomic::PeerCountersAtomic as AtomicPeerCounters,
snapshot::{PeerStatsFilter, PeerStatsSnapshot},
},
},
peers::PeerStates,
stats::{atomic::AtomicStats, snapshot::StatsSnapshot},
};
use super::{
ManagedTorrentShared, TorrentMetadata,
paused::TorrentStatePaused,
streaming::TorrentStreams,
utils::{TimedExistence, timeit},
};
fn make_piece_bitfield(lengths: &Lengths) -> BF {
BF::from_boxed_slice(vec![0; lengths.piece_bitfield_bytes()].into_boxed_slice())
}
pub(crate) struct TorrentStateLocked {
pub(crate) pieces: Option<PieceTracker>,
file_priorities: FilePriorities,
fatal_errors_tx: Option<tokio::sync::oneshot::Sender<anyhow::Error>>,
unflushed_bitv_bytes: u64,
}
impl TorrentStateLocked {
pub(crate) fn get_chunks(&self) -> crate::Result<&ChunkTracker> {
self.pieces
.as_ref()
.map(|p| p.chunks())
.ok_or(Error::ChunkTrackerEmpty)
}
pub(crate) fn get_pieces(&self) -> crate::Result<&PieceTracker> {
self.pieces.as_ref().ok_or(Error::ChunkTrackerEmpty)
}
pub(crate) fn get_pieces_mut(&mut self) -> crate::Result<&mut PieceTracker> {
self.pieces.as_mut().ok_or(Error::ChunkTrackerEmpty)
}
fn try_flush_bitv(&mut self, shared: &ManagedTorrentShared, flush_async: bool) {
if self.unflushed_bitv_bytes == 0 {
return;
}
trace!("trying to flush bitfield");
if let Some(Err(e)) = self
.pieces
.as_mut()
.map(|pt| pt.flush_have_pieces(flush_async))
{
warn!(id=?shared.id, info_hash = ?shared.info_hash, "error flushing bitfield: {e:#}");
} else {
trace!("flushed bitfield");
self.unflushed_bitv_bytes = 0;
}
}
}
const FLUSH_BITV_EVERY_BYTES: u64 = 16 * 1024 * 1024;
pub enum AddIncomingPeerResult {
Added,
AlreadyActive,
ConcurrencyLimitReached,
}
pub struct TorrentStateLive {
peers: PeerStates,
pub(crate) shared: Arc<ManagedTorrentShared>,
metadata: Arc<TorrentMetadata>,
_locked: RwLock<TorrentStateLocked>,
pub(crate) files: FileStorage,
per_piece_locks: Vec<RwLock<()>>,
stats: AtomicStats,
lengths: Lengths,
peer_semaphore: Arc<Semaphore>,
peer_queue_tx: UnboundedSender<SocketAddr>,
finished_notify: Notify,
new_pieces_notify: Notify,
down_speed_estimator: SpeedEstimator,
up_speed_estimator: SpeedEstimator,
cancellation_token: CancellationToken,
session_stats: Arc<SessionStats>,
pub(crate) streams: Arc<TorrentStreams>,
have_broadcast_tx: tokio::sync::broadcast::Sender<ValidPieceIndex>,
ratelimit_upload_tx: tokio::sync::mpsc::UnboundedSender<(
tokio::sync::mpsc::UnboundedSender<WriterRequest>,
ChunkInfo,
)>,
ratelimits: Limits,
}
impl TorrentStateLive {
pub(crate) fn new(
paused: TorrentStatePaused,
fatal_errors_tx: tokio::sync::oneshot::Sender<anyhow::Error>,
cancellation_token: CancellationToken,
) -> anyhow::Result<Arc<Self>> {
let (peer_queue_tx, peer_queue_rx) = unbounded_channel();
let session = paused
.shared
.session
.upgrade()
.context("session is dead, cannot start torrent")?;
let session_stats = session.stats.clone();
let down_speed_estimator = SpeedEstimator::default();
let up_speed_estimator = SpeedEstimator::default();
let have_bytes = paused.chunk_tracker.get_hns().have_bytes;
let lengths = *paused.chunk_tracker.get_lengths();
let file_priorities = {
let mut pri = (0..paused.metadata.file_infos.len()).collect::<Vec<usize>>();
pri.sort_unstable_by_key(|id| {
paused
.metadata
.file_infos
.get(*id)
.map(|fi| fi.relative_filename.as_path())
});
pri
};
let (have_broadcast_tx, _) = tokio::sync::broadcast::channel(128);
let (ratelimit_upload_tx, ratelimit_upload_rx) = tokio::sync::mpsc::unbounded_channel::<(
tokio::sync::mpsc::UnboundedSender<WriterRequest>,
ChunkInfo,
)>();
let ratelimits = Limits::new(paused.shared.options.ratelimits);
let state = Arc::new(TorrentStateLive {
shared: paused.shared.clone(),
metadata: paused.metadata.clone(),
peers: PeerStates {
session_stats: session_stats.peers.clone(),
stats: Default::default(),
states: Default::default(),
live_outgoing_peers: Default::default(),
},
_locked: RwLock::new(TorrentStateLocked {
pieces: Some(PieceTracker::new(paused.chunk_tracker)),
file_priorities,
fatal_errors_tx: Some(fatal_errors_tx),
unflushed_bitv_bytes: 0,
}),
files: paused.files,
stats: AtomicStats {
have_bytes: AtomicU64::new(have_bytes),
..Default::default()
},
lengths,
peer_semaphore: Arc::new(Semaphore::new(
paused.shared.options.peer_limit.unwrap_or(128),
)),
new_pieces_notify: Notify::new(),
peer_queue_tx,
finished_notify: Notify::new(),
down_speed_estimator,
up_speed_estimator,
cancellation_token,
have_broadcast_tx,
session_stats,
streams: paused.streams,
per_piece_locks: (0..lengths.total_pieces())
.map(|_| RwLock::new(()))
.collect(),
ratelimit_upload_tx,
ratelimits,
});
state.spawn(
debug_span!(parent: state.shared.span.clone(), "speed_estimator_updater"),
format!("[{}]speed_estimator_updater", state.shared.id),
{
let state = Arc::downgrade(&state);
async move {
loop {
let state = match state.upgrade() {
Some(state) => state,
None => return Ok(()),
};
let now = Instant::now();
let stats = state.stats_snapshot();
let fetched = stats.fetched_bytes;
let remaining = state
.lock_read("get_remaining_bytes")
.get_chunks()?
.get_remaining_bytes();
state
.down_speed_estimator
.add_snapshot(fetched, Some(remaining), now);
state
.up_speed_estimator
.add_snapshot(stats.uploaded_bytes, None, now);
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
},
);
state.spawn(
debug_span!(parent: state.shared.span.clone(), "peer_adder"),
format!("[{}]peer_adder", state.shared.id),
state.clone().task_peer_adder(peer_queue_rx),
);
state.spawn(
debug_span!(parent: state.shared.span.clone(), "upload_scheduler"),
format!("[{}]upload_scheduler", state.shared.id),
state.clone().task_upload_scheduler(ratelimit_upload_rx),
);
Ok(state)
}
#[track_caller]
pub(crate) fn spawn(
&self,
span: tracing::Span,
name: impl Into<Cow<'static, str>>,
fut: impl std::future::Future<Output = crate::Result<()>> + Send + 'static,
) {
spawn_with_cancel(span, name, self.cancellation_token.clone(), fut);
}
pub fn down_speed_estimator(&self) -> &SpeedEstimator {
&self.down_speed_estimator
}
pub fn up_speed_estimator(&self) -> &SpeedEstimator {
&self.up_speed_estimator
}
pub(crate) fn add_incoming_peer(
self: &Arc<Self>,
checked_peer: CheckedIncomingConnection,
) -> anyhow::Result<AddIncomingPeerResult> {
use dashmap::mapref::entry::Entry;
let (tx, rx) = unbounded_channel();
let permit = match self.peer_semaphore.clone().try_acquire_owned() {
Ok(permit) => permit,
Err(_) => {
debug!("limit of live peers reached, dropping incoming peer");
self.peers.with_peer(checked_peer.addr, |p| {
atomic_inc(&p.stats.counters.incoming_connections);
});
return Ok(AddIncomingPeerResult::ConcurrencyLimitReached);
}
};
let counters = match self.peers.states.entry(checked_peer.addr) {
Entry::Occupied(mut occ) => {
let peer = occ.get_mut();
if let Err(e) = peer.incoming_connection(
checked_peer.handshake.peer_id,
tx.clone(),
&self.peers,
checked_peer.kind,
) {
match e {
peer::IncomingConnectionResult::AlreadyActive => {
debug!(
addr = %checked_peer.addr,
kind = %checked_peer.kind,
"peer already active, ignoring incoming connection"
);
return Ok(AddIncomingPeerResult::AlreadyActive);
}
}
}
peer.stats.counters.clone()
}
Entry::Vacant(vac) => {
atomic_inc(&self.peers.stats.seen);
let peer = Peer::new_live_for_incoming_connection(
*vac.key(),
checked_peer.handshake.peer_id,
tx.clone(),
&self.peers,
checked_peer.kind,
);
let counters = peer.stats.counters.clone();
vac.insert(peer);
counters
}
};
atomic_inc(&counters.incoming_connections);
self.spawn(
debug_span!(
parent: self.shared.span.clone(),
"manage_incoming_peer",
addr = %checked_peer.addr
),
format!(
"[{}][addr={}]manage_incoming_peer",
self.shared.id, checked_peer.addr
),
aframe!(
self.clone()
.task_manage_incoming_peer(checked_peer, counters, tx, rx, permit)
),
);
Ok(AddIncomingPeerResult::Added)
}
async fn task_upload_scheduler(
self: Arc<Self>,
mut rx: tokio::sync::mpsc::UnboundedReceiver<(
tokio::sync::mpsc::UnboundedSender<WriterRequest>,
ChunkInfo,
)>,
) -> crate::Result<()> {
while let Some((tx, ci)) = rx.recv().await {
tokio::select! {
_ = tx.closed() => {
continue;
}
res = self.ratelimits.prepare_for_upload(NonZeroU32::new(ci.size).unwrap()) => {
res?;
}
};
if let Some(session) = self.shared.session.upgrade() {
tokio::select! {
_ = tx.closed() => {
continue;
}
res = session.ratelimits.prepare_for_upload(NonZeroU32::new(ci.size).unwrap()) => {
res?;
}
}
}
let _ = tx.send(WriterRequest::ReadChunkRequest(ci));
}
Ok(())
}
async fn task_manage_incoming_peer(
self: Arc<Self>,
checked_peer: CheckedIncomingConnection,
counters: Arc<AtomicPeerCounters>,
tx: PeerTx,
rx: PeerRx,
permit: OwnedSemaphorePermit,
) -> crate::Result<()> {
let handler = PeerHandler {
addr: checked_peer.addr,
incoming: true,
on_bitfield_notify: Default::default(),
flow_control: Mutex::new(PeerFlowControl::default()),
state: self.clone(),
tx,
counters,
first_message_received: AtomicBool::new(false),
cancel_token: self.cancellation_token.child_token(),
client_name_and_version: self.shared.client_name_and_version().to_owned(),
};
let _token_guard = handler.cancel_token.clone().drop_guard();
let options = PeerConnectionOptions {
connect_timeout: self.shared.options.peer_connect_timeout,
read_write_timeout: self.shared.options.peer_read_write_timeout,
..Default::default()
};
let peer_connection = PeerConnection::new(
checked_peer.addr,
self.shared.info_hash,
self.shared.peer_id,
&handler,
Some(options),
self.shared.spawner.clone(),
self.shared.connector.clone(),
);
let requester = handler.task_peer_chunk_requester();
let res = tokio::select! {
r = requester => {r}
r = peer_connection.manage_peer_incoming(
rx,
checked_peer,
self.have_broadcast_tx.subscribe()
) => {r}
};
match res {
Ok(()) => {
handler.on_peer_died(None)?;
}
Err(e) => {
debug!("error managing peer: {:#}", e);
handler.on_peer_died(Some(e))?;
}
};
drop(permit);
Ok(())
}
async fn task_manage_outgoing_peer(
self: Arc<Self>,
addr: SocketAddr,
permit: OwnedSemaphorePermit,
) -> crate::Result<()> {
let state = self;
let (rx, tx) = state.peers.mark_peer_connecting(addr)?;
let counters = state
.peers
.with_peer(addr, |p| p.stats.counters.clone())
.ok_or(Error::BugPeerNotFound)?;
let handler = PeerHandler {
addr,
incoming: false,
on_bitfield_notify: Default::default(),
flow_control: Mutex::new(PeerFlowControl::default()),
state: state.clone(),
tx,
counters,
first_message_received: AtomicBool::new(false),
cancel_token: state.cancellation_token.child_token(),
client_name_and_version: state.shared.client_name_and_version().to_owned(),
};
let _token_guard = handler.cancel_token.clone().drop_guard();
let options = PeerConnectionOptions {
connect_timeout: state.shared.options.peer_connect_timeout,
read_write_timeout: state.shared.options.peer_read_write_timeout,
..Default::default()
};
let peer_connection = PeerConnection::new(
addr,
state.shared.info_hash,
state.shared.peer_id,
&handler,
Some(options),
state.shared.spawner.clone(),
state.shared.connector.clone(),
);
let requester = aframe!(
handler
.task_peer_chunk_requester()
.instrument(debug_span!("chunk_requester"))
);
let conn_manager = aframe!(
peer_connection
.manage_peer_outgoing(rx, state.have_broadcast_tx.subscribe())
.instrument(debug_span!("peer_connection"))
);
handler
.counters
.outgoing_connection_attempts
.fetch_add(1, Ordering::Relaxed);
let res = tokio::select! {
r = requester => {r}
r = conn_manager => {r}
};
match res {
Ok(()) => {
handler.on_peer_died(None)?;
}
Err(e) => {
debug!("error managing peer: {:#}", e);
handler.on_peer_died(Some(e))?;
}
}
drop(permit);
Ok(())
}
async fn task_peer_adder(
self: Arc<Self>,
mut peer_queue_rx: UnboundedReceiver<SocketAddr>,
) -> crate::Result<()> {
let state = self;
loop {
let addr = peer_queue_rx.recv().await.ok_or(Error::TorrentIsNotLive)?;
if state.shared.options.disable_upload() && state.is_finished_and_no_active_streams() {
debug!(?addr, "ignoring peer as we are finished");
state.peers.mark_peer_not_needed(addr);
continue;
}
let session = state
.shared
.session
.upgrade()
.ok_or(Error::SessionDestroyed)?;
if session.ipv4_only && addr.is_ipv6() {
debug!(?addr, "skipping ipv6 peer (ipv4_only=true)");
continue;
}
if addr.port() == 0 {
debug!(?addr, "skipping peer with port 0");
continue;
}
if session.blocklist.has(addr.ip()) {
session
.stats
.counters
.blocked_outgoing
.fetch_add(1, Ordering::Relaxed);
debug!(?addr, "blocked outgoing connection (by the blacklist)");
continue;
}
if session
.allowlist
.as_ref()
.is_some_and(|l| !l.has(addr.ip()))
{
session
.stats
.counters
.blocked_outgoing
.fetch_add(1, Ordering::Relaxed);
debug!(?addr, "blocked outgoing connection (by the allowlist)");
continue;
}
let permit = state.peer_semaphore.clone().acquire_owned().await?;
state.spawn(
debug_span!(parent: state.shared.span.clone(), "manage_peer", peer = ?addr),
format!("[{}][addr={addr}]manage_peer", state.shared.id),
aframe!(state.clone().task_manage_outgoing_peer(addr, permit)),
);
}
}
pub fn torrent(&self) -> &ManagedTorrentShared {
&self.shared
}
pub fn info(&self) -> &ValidatedTorrentMetaV1Info<ByteBufOwned> {
&self.metadata.info
}
pub fn info_hash(&self) -> Id20 {
self.shared.info_hash
}
pub fn peer_id(&self) -> Id20 {
self.shared.peer_id
}
pub(crate) fn file_ops(&self) -> FileOps<'_> {
FileOps::new(&self.metadata.info, &*self.files, &self.metadata.file_infos)
}
pub(crate) fn lock_read(
&self,
reason: &'static str,
) -> TimedExistence<RwLockReadGuard<'_, TorrentStateLocked>> {
TimedExistence::new(timeit(reason, || self._locked.read()), reason)
}
pub(crate) fn lock_write(
&self,
reason: &'static str,
) -> TimedExistence<RwLockWriteGuard<'_, TorrentStateLocked>> {
TimedExistence::new(timeit(reason, || self._locked.write()), reason)
}
fn set_peer_live(&self, handle: PeerHandle, h: Handshake, connection_kind: ConnectionKind) {
self.peers.with_peer_mut(handle, "set_peer_live", |p| {
p.connecting_to_live(h.peer_id, &self.peers, connection_kind);
});
}
pub fn get_uploaded_bytes(&self) -> u64 {
self.stats.uploaded_bytes.load(Ordering::Relaxed)
}
pub fn get_downloaded_bytes(&self) -> u64 {
self.stats
.downloaded_and_checked_bytes
.load(Ordering::Acquire)
}
pub fn get_approx_have_bytes(&self) -> u64 {
self.stats.have_bytes.load(Ordering::Relaxed)
}
pub fn get_hns(&self) -> Option<HaveNeededSelected> {
self.lock_read("get_hns")
.get_chunks()
.ok()
.map(|c| *c.get_hns())
}
fn transmit_haves(&self, index: ValidPieceIndex) {
let _ = self.have_broadcast_tx.send(index);
}
pub(crate) fn add_peer_if_not_seen(&self, addr: SocketAddr) -> crate::Result<bool> {
match self.peers.add_if_not_seen(addr) {
Some(handle) => handle,
None => return Ok(false),
};
self.peer_queue_tx
.send(addr)
.ok()
.ok_or(Error::TorrentIsNotLive)?;
Ok(true)
}
pub fn stats_snapshot(&self) -> StatsSnapshot {
use Ordering::*;
let downloaded_bytes = self.stats.downloaded_and_checked_bytes.load(Relaxed);
StatsSnapshot {
downloaded_and_checked_bytes: downloaded_bytes,
downloaded_and_checked_pieces: self.stats.downloaded_and_checked_pieces.load(Relaxed),
fetched_bytes: self.stats.fetched_bytes.load(Relaxed),
uploaded_bytes: self.stats.uploaded_bytes.load(Relaxed),
total_piece_download_ms: self.stats.total_piece_download_ms.load(Relaxed),
peer_stats: self.peers.stats(),
}
}
pub fn per_peer_stats_snapshot(&self, filter: PeerStatsFilter) -> PeerStatsSnapshot {
PeerStatsSnapshot {
peers: self
.peers
.states
.iter()
.filter(|e| filter.state.matches(e.value().get_state()))
.map(|e| (e.key().to_string(), e.value().into()))
.collect(),
}
}
pub async fn wait_until_completed(&self) {
if self.is_finished() {
return;
}
self.finished_notify.notified().await;
}
pub fn pause(&self) -> anyhow::Result<TorrentStatePaused> {
self.cancellation_token.cancel();
let mut g = self.lock_write("pause");
g.fatal_errors_tx.take();
let piece_tracker = g
.pieces
.take()
.context("bug: pausing already paused torrent")?;
let chunk_tracker = piece_tracker.into_chunks();
Ok(TorrentStatePaused {
shared: self.shared.clone(),
metadata: self.metadata.clone(),
files: self.files.take()?,
chunk_tracker,
streams: self.streams.clone(),
})
}
fn on_fatal_error(&self, e: anyhow::Error) -> anyhow::Result<()> {
let mut g = self.lock_write("fatal_error");
let tx = g
.fatal_errors_tx
.take()
.context("fatal_errors_tx already taken")?;
let res = anyhow::anyhow!("fatal error: {:?}", e);
if tx.send(e).is_err() {
warn!(id=self.shared.id, info_hash=?self.shared.info_hash, "there's nowhere to send fatal error, receiver is dead");
}
Err(res)
}
pub(crate) fn update_only_files(&self, only_files: &HashSet<usize>) -> anyhow::Result<()> {
let mut g = self.lock_write("update_only_files");
let pt = g.get_pieces_mut()?;
let hns = pt.update_only_files(&self.metadata.file_infos, only_files)?;
if !hns.finished() {
self.reconnect_all_not_needed_peers();
}
Ok(())
}
pub(crate) fn is_finished(&self) -> bool {
self.get_hns().map(|h| h.finished()).unwrap_or_default()
}
fn has_active_streams_unfinished_files(&self, state: &TorrentStateLocked) -> bool {
let chunks = match state.get_chunks() {
Ok(c) => c,
Err(_) => return false,
};
self.streams
.streamed_file_ids()
.any(|file_id| !chunks.is_file_finished(&self.metadata.file_infos[file_id]))
}
fn is_finished_and_no_active_streams(&self) -> bool {
self.is_finished()
&& !self.has_active_streams_unfinished_files(
&self.lock_read("is_finished_and_dont_need_peers"),
)
}
fn on_piece_completed(&self, id: ValidPieceIndex) -> anyhow::Result<()> {
if let Err(e) = self.files.on_piece_completed(id) {
debug!(?id, "file storage errored in on_piece_completed(): {e:#}");
}
let mut g = self.lock_write("on_piece_completed");
let locked = &mut **g;
let pieces = locked.get_pieces_mut()?;
for (idx, file_info) in self
.metadata
.file_infos
.iter()
.enumerate()
.skip_while(|(_, fi)| !fi.piece_range.contains(&id.get()))
.take_while(|(_, fi)| fi.piece_range.contains(&id.get()))
{
let _remaining = pieces.update_file_have_on_piece_completed(id, idx, file_info);
}
self.streams
.wake_streams_on_piece_completed(id, self.metadata.lengths());
locked.unflushed_bitv_bytes += self.metadata.lengths().piece_length(id) as u64;
if locked.unflushed_bitv_bytes >= FLUSH_BITV_EVERY_BYTES {
locked.try_flush_bitv(&self.shared, true)
}
let chunks = locked.get_chunks()?;
if chunks.is_finished() {
if chunks.get_selected_pieces()[id.get_usize()] {
locked.try_flush_bitv(&self.shared, false);
info!(id=self.shared.id, info_hash=?self.shared.info_hash, "torrent finished downloading");
}
self.finished_notify.notify_waiters();
if !self.has_active_streams_unfinished_files(locked) {
drop(g);
self.disconnect_all_peers_that_have_full_torrent();
}
}
Ok(())
}
fn disconnect_all_peers_that_have_full_torrent(&self) {
for mut pe in self.peers.states.iter_mut() {
if let PeerState::Live(l) = pe.value().get_state()
&& l.has_full_torrent(self.lengths.total_pieces() as usize)
{
let prev = pe.value_mut().set_not_needed(&self.peers);
let _ = prev
.take_live_no_counters()
.unwrap()
.tx
.send(WriterRequest::Disconnect(Ok(())));
}
}
}
pub(crate) fn reconnect_all_not_needed_peers(&self) {
self.peers
.states
.iter_mut()
.filter_map(|mut p| p.value_mut().reconnect_not_needed_peer(&self.peers))
.map(|socket_addr| self.peer_queue_tx.send(socket_addr))
.take_while(|r| r.is_ok())
.last();
}
async fn task_send_pex_to_peer(
self: Arc<Self>,
this_peer_addr: SocketAddr,
tx: PeerTx,
) -> anyhow::Result<()> {
const MAX_SENT_PEERS: usize = 50;
const PEX_MESSAGE_INTERVAL: Duration = Duration::from_secs(60);
let mut connected = Vec::with_capacity(MAX_SENT_PEERS);
let mut dropped = Vec::with_capacity(MAX_SENT_PEERS);
let mut peer_view_of_live_peers = HashSet::new();
tokio::time::sleep(Duration::from_secs(10)).await;
let mut interval = tokio::time::interval(PEX_MESSAGE_INTERVAL);
loop {
interval.tick().await;
if tx.is_closed() {
return Ok(());
}
{
let live_peers = self.peers.live_outgoing_peers.read();
connected.clear();
dropped.clear();
connected.extend(
live_peers
.difference(&peer_view_of_live_peers)
.take(MAX_SENT_PEERS)
.copied(),
);
dropped.extend(
peer_view_of_live_peers
.difference(&live_peers)
.take(MAX_SENT_PEERS)
.copied(),
);
}
trace!(connected_len = connected.len(), dropped_len = dropped.len());
let peer_ip_non_local = match this_peer_addr.ip() {
IpAddr::V4(a) => !a.is_loopback() && !a.is_private(),
IpAddr::V6(a) => !a.is_loopback() && !a.is_unique_local(),
};
let other_ip_is_local = |addr: &IpAddr| match addr {
IpAddr::V4(a) => a.is_loopback() || a.is_private(),
IpAddr::V6(a) => {
a.is_loopback() || a.is_unicast_link_local() || a.is_unique_local()
}
};
let filter = |addr: &SocketAddr| !(peer_ip_non_local && other_ip_is_local(&addr.ip()));
if !connected.is_empty() || !dropped.is_empty() {
let pex_msg = extended::ut_pex::UtPex::from_addrs(
connected.iter().copied(),
dropped.iter().copied(),
);
if tx.send(WriterRequest::UtPex(pex_msg)).is_err() {
return Ok(()); }
for addr in &dropped {
peer_view_of_live_peers.remove(addr);
}
peer_view_of_live_peers.extend(connected.iter().filter(|a| filter(a)).copied());
}
}
}
}
const DEFAULT_PEER_REQUEST_WINDOW: usize = 128;
struct PeerFlowControl {
i_am_choked: bool,
request_window: usize,
}
impl Default for PeerFlowControl {
fn default() -> Self {
Self {
i_am_choked: true,
request_window: DEFAULT_PEER_REQUEST_WINDOW,
}
}
}
struct PeerHandler {
state: Arc<TorrentStateLive>,
counters: Arc<AtomicPeerCounters>,
flow_control: Mutex<PeerFlowControl>,
on_bitfield_notify: Notify,
addr: SocketAddr,
incoming: bool,
tx: PeerTx,
first_message_received: AtomicBool,
cancel_token: CancellationToken,
client_name_and_version: String,
}
impl PeerConnectionHandler for &'_ PeerHandler {
fn on_connected(&self, connection_time: Duration) {
self.counters
.outgoing_connections
.fetch_add(1, Ordering::Relaxed);
#[allow(clippy::cast_possible_truncation)]
self.counters
.total_time_connecting_ms
.fetch_add(connection_time.as_millis() as u64, Ordering::Relaxed);
}
async fn on_received_message(&self, message: Message<'_>) -> anyhow::Result<()> {
if !matches!(&message, Message::Bitfield(..))
&& !self.first_message_received.swap(true, Ordering::Relaxed)
{
self.on_bitfield_notify.notify_waiters();
}
match message {
Message::Request(request) => {
self.on_download_request(request)
.context("on_download_request")?;
}
Message::Bitfield(b) => self
.on_bitfield(b.clone_to_owned(None))
.context("on_bitfield")?,
Message::Choke => self.on_i_am_choked(),
Message::Unchoke => self.on_i_am_unchoked(),
Message::Interested => self.on_peer_interested(),
Message::Piece(piece) => self
.on_received_piece(piece)
.await
.context("on_received_piece")?,
Message::KeepAlive => {
trace!("keepalive received");
}
Message::Have(h) => self.on_have(h),
Message::NotInterested => {
trace!("received \"not interested\", but we don't process it yet")
}
Message::Cancel(_) => {
trace!("received \"cancel\", but we don't process it yet")
}
Message::Extended(ExtendedMessage::UtMetadata(UtMetadata::Request(
metadata_piece_id,
))) => {
if self.state.metadata.info.info().private {
warn!(
id = self.state.shared.id,
info_hash = ?self.state.shared.info_hash,
"received noncompliant ut_metadata message from {}, ignoring",
self.addr
);
} else {
self.send_metadata_piece(metadata_piece_id)
.with_context(|| {
format!("error sending metadata piece {metadata_piece_id}")
})?;
}
}
Message::Extended(ExtendedMessage::UtPex(pex)) => {
if self.state.metadata.info.info().private {
warn!(
id = self.state.shared.id,
info_hash = ?self.state.shared.info_hash,
"received noncompliant PEX message from {}, ignoring",
self.addr
);
} else {
self.on_pex_message(pex);
}
}
message => {
warn!(
id = self.state.shared.id,
info_hash = ?self.state.shared.info_hash,
"received unsupported message {:?}, ignoring", message
);
}
};
Ok(())
}
fn serialize_bitfield_message_to_buf(&self, buf: &mut [u8]) -> anyhow::Result<usize> {
let g = self.state.lock_read("serialize_bitfield_message_to_buf");
let msg = Message::Bitfield(ByteBuf(g.get_chunks()?.get_have_pieces().as_bytes()));
let len = msg.serialize(buf, &Default::default)?;
trace!("sending: {:?}, length={}", &msg, len);
Ok(len)
}
fn on_handshake(&self, handshake: Handshake, ckind: ConnectionKind) -> anyhow::Result<()> {
self.state.set_peer_live(self.addr, handshake, ckind);
Ok(())
}
fn on_uploaded_bytes(&self, bytes: u32) {
self.counters
.uploaded_bytes
.fetch_add(bytes as u64, Ordering::Relaxed);
self.state
.stats
.uploaded_bytes
.fetch_add(bytes as u64, Ordering::Relaxed);
self.state
.session_stats
.counters
.uploaded_bytes
.fetch_add(bytes as u64, Ordering::Relaxed);
}
fn read_chunk(&self, chunk: &ChunkInfo, buf: &mut [u8]) -> anyhow::Result<()> {
self.state.file_ops().read_chunk(self.addr, chunk, buf)
}
fn on_extended_handshake(&self, hs: &ExtendedHandshake<ByteBuf>) -> anyhow::Result<()> {
if let Some(reqq) = hs.reqq.and_then(|reqq| usize::try_from(reqq).ok())
&& reqq > 0
{
let request_window = reqq.min(DEFAULT_PEER_REQUEST_WINDOW);
let mut flow = self.lock_flow_control("update request window");
if flow.request_window != request_window {
debug!(
reqq,
request_window, "updated peer request window from extended handshake"
);
flow.request_window = request_window;
self.notify_request_slots_changed();
}
}
if !self.state.metadata.info.info().private && hs.m.ut_pex.is_some() {
spawn_with_cancel(
debug_span!(
parent: self.state.shared.span.clone(),
"sending_pex_to_peer",
peer = ?self.addr,
),
format!(
"[{}][addr={}]sending_pex_to_peer",
self.state.shared.id, self.addr
),
self.cancel_token.clone(),
self.state
.clone()
.task_send_pex_to_peer(self.addr, self.tx.clone()),
);
}
if self.incoming
&& let Some(port) = hs.port()
{
let peer_ip = hs.ip_addr().unwrap_or(self.addr.ip());
let outgoing_addr = SocketAddr::new(peer_ip, port);
self.state
.peers
.with_peer_mut(self.addr, "update outgoing addr", |peer| {
peer.outgoing_address = Some(outgoing_addr)
});
}
Ok(())
}
fn should_send_bitfield(&self) -> bool {
if self.state.torrent().options.disable_upload() {
return false;
}
self.state.get_approx_have_bytes() > 0
}
fn should_transmit_have(&self, id: ValidPieceIndex) -> bool {
if self.state.shared.options.disable_upload() {
return false;
}
let have = self
.state
.peers
.with_live(self.addr, |l| {
l.bitfield.get(id.get_usize()).map(|p| *p).unwrap_or(true)
})
.unwrap_or(true);
!have
}
fn update_my_extended_handshake(
&self,
handshake: &mut ExtendedHandshake<ByteBuf>,
) -> anyhow::Result<()> {
let info_bytes = &self.state.metadata.info_bytes;
if !info_bytes.is_empty()
&& let Ok(len) = info_bytes.len().try_into()
{
handshake.metadata_size = Some(len);
}
Ok(())
}
fn client_name_and_version(&self) -> &str {
&self.client_name_and_version
}
}
impl PeerHandler {
fn on_peer_died(self, error: Option<crate::Error>) -> crate::Result<()> {
let peers = &self.state.peers;
let handle = self.addr;
let mut pe = match peers.states.get_mut(&handle) {
Some(peer) => TimedExistence::new(peer, "on_peer_died"),
None => {
warn!(
id = self.state.shared.id,
info_hash = ?self.state.shared.info_hash,
addr=?handle,
"bug: peer not found in table. Forgetting it forever"
);
return Ok(());
}
};
let prev = pe.value_mut().take_state(peers);
match prev {
PeerState::Connecting(_) => {}
PeerState::Live(live) => {
let mut g = self.state.lock_write("mark_chunk_requests_canceled");
let released = g.get_pieces_mut()?.release_pieces_owned_by(self.addr);
if released > 0 {
trace!(
"peer dead, released {} in-flight pieces back to queue",
released
);
}
let mut had_inflight = false;
for req in live.inflight_requests() {
had_inflight = true;
trace!(
"peer dead, marking chunk request cancelled, index={}, chunk={}",
req.piece_index.get(),
req.chunk_index
);
}
if released > 0 || had_inflight {
self.state.new_pieces_notify.notify_waiters();
}
}
PeerState::NotNeeded => {
pe.value_mut().set_state(PeerState::NotNeeded, peers);
return Ok(());
}
s @ PeerState::Queued | s @ PeerState::Dead => {
warn!(
id = self.state.shared.id,
info_hash = ?self.state.shared.info_hash,
addr = ?handle,
"bug: peer was in a wrong state {s:?}, ignoring it forever"
);
drop(pe);
self.state.peers.drop_peer(handle);
return Ok(());
}
};
let _error = match error {
Some(e) => e,
None => {
trace!("peer died without errors, not re-queueing");
pe.value_mut().set_state(PeerState::NotNeeded, peers);
return Ok(());
}
};
self.counters.errors.fetch_add(1, Ordering::Relaxed);
if self.state.is_finished_and_no_active_streams() {
debug!("torrent finished, not re-queueing");
pe.value_mut().set_state(PeerState::NotNeeded, peers);
return Ok(());
}
pe.value_mut().set_state(PeerState::Dead, peers);
if self.incoming {
debug!(
peer = handle.to_string(),
"incoming peer died, not re-queueing"
);
return Ok(());
}
let backoff = pe.value_mut().stats.backoff.next();
drop(pe);
if let Some(dur) = backoff {
if cfg!(feature = "_disable_reconnect_test") {
return Ok(());
}
self.state.clone().spawn(
debug_span!(
parent: self.state.shared.span.clone(),
"wait_for_peer",
peer = ?handle,
duration = format!("{dur:?}")
),
format!("[{}][addr={}]wait_for_peer", self.state.shared.id, handle),
async move {
trace!("waiting to reconnect again");
tokio::time::sleep(dur).await;
trace!("finished waiting");
let should_requeue = self
.state
.peers
.with_peer_mut(handle, "dead_to_queued", |peer| {
match peer.get_state() {
PeerState::Dead => {
peer.set_state(PeerState::Queued, &self.state.peers);
true
}
PeerState::Live(_)
| PeerState::Connecting(_)
| PeerState::Queued => {
trace!(
state = peer.get_state().name(),
"peer is no longer dead, skipping requeue"
);
false
}
PeerState::NotNeeded => false,
}
})
.unwrap_or(false);
if should_requeue {
self.state
.peer_queue_tx
.send(handle)
.ok()
.ok_or(Error::TorrentIsNotLive)?;
}
Ok::<_, Error>(())
},
);
} else {
debug!("dropping peer, backoff exhausted");
self.state.peers.drop_peer(handle);
};
Ok(())
}
fn acquire_next_piece(&self) -> crate::Result<Option<ValidPieceIndex>> {
if self.is_choked() {
debug!("we are choked, can't acquire piece");
return Ok(None);
}
let mut steal_info: Option<(SocketAddr, ValidPieceIndex)> = None;
let result = self
.state
.peers
.with_live_mut(self.addr, "acquire_next_piece", |live| {
let mut g = self.state.lock_write("acquire_next_piece");
let bf = &live.bitfield;
let TorrentStateLocked {
pieces,
file_priorities,
..
} = &mut **g;
let pieces = pieces.as_mut().ok_or(Error::ChunkTrackerEmpty)?;
let result = pieces.acquire_piece(AcquireRequest {
peer: self.addr,
peer_avg_time: self.counters.average_piece_download_time(),
priority_pieces: self.state.streams.iter_next_pieces(&self.state.lengths),
file_priorities,
file_infos: &self.state.metadata.file_infos,
peer_has_piece: |p| bf.get(p.get() as usize).map(|v| *v) == Some(true),
can_steal: |p| {
self.state.per_piece_locks[p.get_usize()]
.try_write()
.is_some()
},
});
match result {
AcquireResult::Reserved(piece) => {
trace!("reserved piece {}", piece);
Ok(Some(piece))
}
AcquireResult::Stolen { piece, from_peer } => {
debug!("stole piece {} from {}", piece, from_peer);
steal_info = Some((from_peer, piece));
Ok(Some(piece))
}
AcquireResult::NoneAvailable => Ok(None),
}
})
.transpose()
.map(|r| r.flatten());
if let Some((from_peer, piece)) = steal_info {
self.state.peers.on_steal(from_peer, self.addr, piece);
}
result
}
fn on_download_request(&self, request: Request) -> anyhow::Result<()> {
if self.state.torrent().options.disable_upload() {
anyhow::bail!("upload disabled, but peer requested a piece")
}
let piece_index = match self.state.lengths.validate_piece_index(request.index) {
Some(p) => p,
None => {
anyhow::bail!(
"received {:?}, but it is not a valid chunk request (piece index is invalid). Ignoring.",
request
);
}
};
let chunk_info = match self.state.lengths.chunk_info_from_received_data(
piece_index,
request.begin,
request.length,
) {
Some(d) => d,
None => {
anyhow::bail!(
"received {:?}, but it is not a valid chunk request (chunk data is invalid). Ignoring.",
request
);
}
};
if !self
.state
.lock_read("is_chunk_ready_to_upload")
.get_chunks()?
.is_chunk_ready_to_upload(&chunk_info)
{
anyhow::bail!(
"got request for a chunk that is not ready to upload. chunk {:?}",
&chunk_info
);
}
self.state
.ratelimit_upload_tx
.send((self.tx.clone(), chunk_info))?;
Ok(())
}
fn on_have(&self, have: u32) {
self.state
.peers
.with_live_mut(self.addr, "on_have", |live| {
if live.bitfield.is_empty() {
live.bitfield = make_piece_bitfield(&self.state.lengths);
}
match live.bitfield.get_mut(have as usize) {
Some(mut v) => *v = true,
None => {
warn!(
id = self.state.shared.id,
info_hash = ?self.state.shared.info_hash,
addr = ?self.addr,
"received have {} out of range",
have
);
return;
}
};
trace!("updated bitfield with have={}", have);
if let Some(true) = live
.bitfield
.get(..self.state.lengths.total_pieces() as usize)
.map(|s| s.all())
{
debug!("peer has full torrent");
}
});
self.on_bitfield_notify.notify_waiters();
}
fn on_bitfield(&self, bitfield: ByteBufOwned) -> anyhow::Result<()> {
if bitfield.as_ref().len() != self.state.lengths.piece_bitfield_bytes() {
anyhow::bail!(
"dropping peer as its bitfield has unexpected size. Got {}, expected {}",
bitfield.as_ref().len(),
self.state.lengths.piece_bitfield_bytes(),
);
}
let bf = BF::from_boxed_slice(bitfield.0.to_vec().into_boxed_slice());
if let Some(true) = bf
.get(..self.state.lengths.total_pieces() as usize)
.map(|s| s.all())
{
debug!("peer has full torrent");
}
self.state.peers.update_bitfield(self.addr, bf);
self.on_bitfield_notify.notify_waiters();
Ok(())
}
async fn wait_for_any_notify(&self, notify: &Notify, check: impl Fn() -> bool) {
loop {
let notified = notify.notified();
if check() {
return;
}
notified.await;
}
}
async fn wait_for_bitfield(&self) {
self.wait_for_any_notify(&self.on_bitfield_notify, || {
self.state
.peers
.with_live(self.addr, |live| !live.bitfield.is_empty())
.unwrap_or_default()
})
.await;
}
async fn wait_for_request_slot(&self) {
loop {
let Some(notify) = self.request_slots_changed() else {
return;
};
let notified = notify.notified();
if self.can_send_request() {
return;
}
notified.await;
}
}
async fn task_peer_chunk_requester(&self) -> crate::Result<()> {
let handle = self.addr;
self.wait_for_bitfield().await;
let mut update_interest = {
let mut current = false;
move |h: &PeerHandler, new_value: bool| -> crate::Result<()> {
if new_value != current {
h.tx.send(if new_value {
WriterRequest::Message(Message::Interested)
} else {
WriterRequest::Message(Message::NotInterested)
})
.ok()
.ok_or(Error::PeerTaskDead)?;
current = new_value;
}
Ok(())
}
};
loop {
if self.state.is_finished_and_no_active_streams() {
update_interest(self, false)?;
if self
.state
.peers
.is_peer_not_interested_and_has_full_torrent(
self.addr,
self.state.lengths.total_pieces() as usize,
)
{
debug!("nothing left to do, neither of us is interested, disconnecting peer");
self.tx
.send(WriterRequest::Disconnect(Ok(())))
.ok()
.ok_or(Error::PeerTaskDead)?;
tokio::time::sleep(Duration::from_millis(100)).await;
return Ok(());
} else {
tokio::time::sleep(Duration::from_secs(5)).await;
continue;
}
}
update_interest(self, true)?;
aframe!(self.wait_for_request_slot()).await;
let new_piece_notify = self.state.new_pieces_notify.notified();
let next = match self.acquire_next_piece()? {
Some(next) => next,
None => {
debug!("no pieces to request");
match aframe!(tokio::time::timeout(
Duration::from_secs(5),
new_piece_notify
))
.await
{
Ok(()) => debug!("woken up, new pieces might be available"),
Err(_) => debug!("woken up by sleep timer"),
}
continue;
}
};
for chunk in self.state.lengths.iter_chunk_infos(next) {
let request = Request {
index: next.get(),
begin: chunk.offset,
length: chunk.size,
};
aframe!(self.wait_for_request_slot()).await;
self.state
.ratelimits
.prepare_for_download(NonZeroU32::new(request.length).unwrap())
.await?;
if let Some(session) = self.state.torrent().session.upgrade() {
session
.ratelimits
.prepare_for_download(NonZeroU32::new(request.length).unwrap())
.await?;
}
aframe!(self.wait_for_request_slot()).await;
match self
.state
.peers
.with_live_mut(handle, "add chunk request", |live| {
live.add_inflight_request(chunk)
}) {
Some(true) => {}
Some(false) => {
warn!(
id = self.state.shared.id,
info_hash = ?self.state.shared.info_hash,
addr = ?self.addr,
"we already requested {:?} previously",
chunk
);
continue;
}
None => return Ok(()),
};
if self
.tx
.send(WriterRequest::Message(Message::Request(request)))
.is_err()
{
return Ok(());
}
}
}
}
fn on_i_am_choked(&self) {
self.lock_flow_control("i_am_choked = true").i_am_choked = true;
self.notify_request_slots_changed();
}
fn on_peer_interested(&self) {
trace!("peer is interested");
self.state.peers.mark_peer_interested(self.addr, true);
}
fn on_i_am_unchoked(&self) {
trace!("we are unchoked");
self.lock_flow_control("i_am_choked = false").i_am_choked = false;
self.notify_request_slots_changed();
}
async fn on_received_piece(&self, piece: Piece<ByteBuf<'_>>) -> anyhow::Result<()> {
let piece_index = self
.state
.lengths
.validate_piece_index(piece.index)
.with_context(|| format!("peer sent an invalid piece {}", piece.index))?;
let chunk_info = match self.state.lengths.chunk_info_from_received_data(
piece_index,
piece.begin,
piece.len().try_into().context("bug")?,
) {
Some(i) => i,
None => {
anyhow::bail!("peer sent us an invalid piece {:?}", &piece,);
}
};
self.counters
.fetched_bytes
.fetch_add(piece.len() as u64, Ordering::Relaxed);
self.counters.fetched_chunks.fetch_add(1, Ordering::Relaxed);
let should_process = self
.state
.peers
.with_live_mut(self.addr, "inflight_requests.remove", |h| {
match h.remove_inflight_request(&chunk_info) {
RemoveInflightRequestResult::Expected => Ok(true),
RemoveInflightRequestResult::LateCanceled => {
trace!(?piece, "peer sent us a chunk we did not ask for");
Ok(false)
}
RemoveInflightRequestResult::Unexpected => anyhow::bail!(
"peer sent us a piece we did not ask. Inflight requests: {:?}. Got: {:?}",
h.inflight_requests_debug(),
&piece,
),
}
})
.context("peer not found")??;
if !should_process {
return Ok(());
}
self.state
.stats
.fetched_bytes
.fetch_add(piece.len() as u64, Ordering::Relaxed);
self.state
.session_stats
.counters
.fetched_bytes
.fetch_add(piece.len() as u64, Ordering::Relaxed);
fn write_to_disk(
state: &TorrentStateLive,
addr: PeerHandle,
counters: &AtomicPeerCounters,
piece: &Piece<ByteBuf<'_>>,
chunk_info: &ChunkInfo,
) -> anyhow::Result<()> {
let index = piece.index;
let ppl_guard = {
let g = state.lock_read("check_steal");
let ppl = state
.per_piece_locks
.get(piece.index as usize)
.map(|l| l.read());
match g.get_pieces()?.get_inflight(chunk_info.piece_index) {
Some(inflight) if inflight.peer == addr => {}
Some(inflight) => {
debug!(
"in-flight piece {} was stolen by {}, ignoring",
chunk_info.piece_index, inflight.peer
);
return Ok(());
}
None => {
debug!(
"in-flight piece {} not found. it was probably completed by someone else",
chunk_info.piece_index
);
return Ok(());
}
};
ppl
};
if !cfg!(feature = "_disable_disk_write_net_benchmark") {
match state.file_ops().write_chunk(addr, piece, chunk_info) {
Ok(()) => {}
Err(e) => {
error!(
id = state.shared.id,
info_hash = ?state.shared.info_hash,
"FATAL: error writing chunk to disk: {e:#}"
);
return state.on_fatal_error(e);
}
};
}
let full_piece_download_time = {
let mut g = state.lock_write("mark_chunk_downloaded");
let chunk_marking_result = g.get_pieces_mut()?.mark_chunk_downloaded(piece);
trace!(?piece, chunk_marking_result=?chunk_marking_result);
match chunk_marking_result {
Some(ChunkMarkingResult::Completed) => {
trace!("piece={} done, will write and checksum", piece.index);
g.get_pieces_mut()?.take_inflight(chunk_info.piece_index)
}
Some(ChunkMarkingResult::PreviouslyCompleted) => {
debug!("piece={} was done by someone else, ignoring", piece.index);
return Ok(());
}
Some(ChunkMarkingResult::NotCompleted) => None,
None => {
anyhow::bail!(
"bogus data received: {:?}, cannot map this to a chunk, dropping peer",
piece
);
}
}
};
drop(ppl_guard);
let full_piece_download_time = match full_piece_download_time {
Some(t) => t,
None => return Ok(()),
};
match state
.file_ops()
.check_piece(chunk_info.piece_index)
.with_context(|| format!("error checking piece={index}"))?
{
true => {
{
let mut g = state.lock_write("mark_piece_downloaded");
g.get_pieces_mut()?
.mark_piece_hash_ok(chunk_info.piece_index);
}
let piece_len = state.lengths.piece_length(chunk_info.piece_index) as u64;
state
.stats
.downloaded_and_checked_bytes
.fetch_add(piece_len, Ordering::Release);
state
.stats
.downloaded_and_checked_pieces
.fetch_add(1, Ordering::Release);
state
.stats
.have_bytes
.fetch_add(piece_len, Ordering::Relaxed);
#[allow(clippy::cast_possible_truncation)]
state.stats.total_piece_download_ms.fetch_add(
full_piece_download_time.as_millis() as u64,
Ordering::Relaxed,
);
counters.on_piece_completed(piece_len, full_piece_download_time);
state.peers.reset_peer_backoff(addr);
trace!(piece = index, "successfully downloaded and verified");
state.on_piece_completed(chunk_info.piece_index)?;
state.transmit_haves(chunk_info.piece_index);
}
false => {
warn!(
id = state.shared.id,
info_hash = ?state.shared.info_hash,
?addr,
"checksum for piece={} did not validate. disconnecting peer.", index
);
state
.lock_write("mark_piece_broken")
.get_pieces_mut()?
.mark_piece_hash_failed(chunk_info.piece_index);
state.new_pieces_notify.notify_waiters();
anyhow::bail!("i am probably a bogus peer. dying.")
}
};
Ok(())
}
self.state
.shared
.spawner
.block_in_place_with_semaphore(|| {
write_to_disk(&self.state, self.addr, &self.counters, &piece, &chunk_info)
})
.await
.with_context(|| format!("error processing received chunk {chunk_info:?}"))?;
Ok(())
}
fn send_metadata_piece(&self, piece_id: u32) -> anyhow::Result<()> {
let data = &self.state.metadata.info_bytes;
let metadata_size = data.len();
if metadata_size == 0 {
anyhow::bail!("peer requested for info metadata but we don't have it")
}
let total_pieces: usize = (metadata_size as u64)
.div_ceil(CHUNK_SIZE as u64)
.try_into()?;
if piece_id as usize > total_pieces {
bail!("piece out of bounds")
}
let offset = piece_id * CHUNK_SIZE;
let end = (offset + CHUNK_SIZE).min(data.len().try_into()?);
let total_size: u32 = data
.len()
.try_into()
.context("can't send metadata: len doesn't fit into u32")?;
let data = data.slice(offset as usize..end as usize);
self.tx
.send(WriterRequest::UtMetadata(UtMetadata::Data(
UtMetadataData::from_bytes(piece_id, total_size, data.into()),
)))
.context("error sending UtMetadata: channel closed")?;
Ok(())
}
fn on_pex_message(&self, msg: UtPex<ByteBuf<'_>>) {
msg.dropped_peers()
.chain(msg.added_peers())
.for_each(|peer| {
self.state
.add_peer_if_not_seen(peer.addr)
.map_err(|error| {
warn!(
id = self.state.shared.id,
info_hash = ?self.state.shared.info_hash,
?peer,
"failed to add peer: {error:#}"
);
error
})
.ok();
});
}
fn is_choked(&self) -> bool {
self.lock_flow_control("is_choked").i_am_choked
}
fn requested_inflight_count(&self) -> Option<usize> {
self.state
.peers
.with_live(self.addr, |live| live.requested_inflight_count())
}
fn can_send_request(&self) -> bool {
let (i_am_choked, request_window) = {
let flow = self.lock_flow_control("can_send_request");
(flow.i_am_choked, flow.request_window)
};
if i_am_choked {
return false;
}
self.requested_inflight_count()
.is_some_and(|requested| requested < request_window)
}
fn lock_flow_control(
&self,
reason: &'static str,
) -> TimedExistence<MutexGuard<'_, PeerFlowControl>> {
TimedExistence::new(timeit(reason, || self.flow_control.lock()), reason)
}
fn request_slots_changed(&self) -> Option<Arc<Notify>> {
self.state
.peers
.with_live(self.addr, |live| live.request_slots_changed())
}
fn notify_request_slots_changed(&self) {
if let Some(notify) = self.request_slots_changed() {
notify.notify_waiters();
}
}
}