pub mod stats;
use std::collections::HashSet;
use std::net::SocketAddr;
use std::sync::{Arc, atomic::Ordering};
use librqbit_core::hash_id::Id20;
use librqbit_core::lengths::{ChunkInfo, ValidPieceIndex};
use peer_binary_protocol::{Message, Request};
use tokio::sync::{
Notify,
mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel},
};
use tracing::debug;
use crate::peer_connection::WriterRequest;
use crate::stream_connect::ConnectionKind;
use crate::type_aliases::BF;
use super::PeerStates;
pub(crate) type InflightRequest = ChunkInfo;
pub(crate) type PeerRx = UnboundedReceiver<WriterRequest>;
pub(crate) type PeerTx = UnboundedSender<WriterRequest>;
#[derive(Debug)]
pub(crate) struct Peer {
pub addr: SocketAddr,
state: PeerState,
pub stats: stats::atomic::PeerStats,
pub outgoing_address: Option<SocketAddr>,
}
impl Peer {
pub fn new_live_for_incoming_connection(
addr: SocketAddr,
peer_id: Id20,
tx: PeerTx,
counters: &PeerStates,
connection_kind: ConnectionKind,
) -> Self {
let state = PeerState::Live(LivePeerState::new(peer_id, tx, true, connection_kind));
for counter in [&counters.session_stats, &counters.stats] {
counter.inc(&state);
}
Self {
addr,
state,
stats: Default::default(),
outgoing_address: None,
}
}
pub fn new_with_outgoing_address(addr: SocketAddr) -> Self {
Self {
addr,
outgoing_address: Some(addr),
stats: Default::default(),
state: Default::default(),
}
}
pub(crate) fn reconnect_not_needed_peer(
&mut self,
counters: &PeerStates,
) -> Option<SocketAddr> {
if let PeerState::NotNeeded = self.get_state() {
match self.outgoing_address {
None => None,
Some(socket_addr) if self.addr == socket_addr => {
self.set_state(PeerState::Queued, counters);
Some(socket_addr)
}
Some(socket_addr) => {
debug!(
peer = %self.addr,
outgoing_addr = %socket_addr,
"peer will by retried on different address",
);
Some(socket_addr)
}
}
} else {
None
}
}
}
#[derive(Debug, Default)]
pub(crate) enum PeerState {
#[default]
Queued,
Connecting(PeerTx),
Live(LivePeerState),
Dead,
NotNeeded,
}
impl std::fmt::Display for PeerState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.name())
}
}
impl PeerState {
pub fn name(&self) -> &'static str {
match self {
PeerState::Queued => "queued",
PeerState::Connecting(_) => "connecting",
PeerState::Live(_) => "live",
PeerState::Dead => "dead",
PeerState::NotNeeded => "not needed",
}
}
pub fn take_live_no_counters(self) -> Option<LivePeerState> {
match self {
PeerState::Live(l) => Some(l),
_ => None,
}
}
}
#[derive(Debug, thiserror::Error)]
pub enum IncomingConnectionResult {
#[error("peer already active")]
AlreadyActive,
}
impl Peer {
pub fn get_state(&self) -> &PeerState {
&self.state
}
pub fn take_state(&mut self, counters: &PeerStates) -> PeerState {
self.set_state(Default::default(), counters)
}
pub fn destroy(self, counters: &PeerStates) {
for counter in [&counters.session_stats, &counters.stats] {
counter.dec(&self.state);
}
if let (Some(addr), PeerState::Live(..)) = (self.outgoing_address, &self.state) {
counters.live_outgoing_peers.write().remove(&addr);
}
}
pub fn set_state(&mut self, new: PeerState, counters: &PeerStates) -> PeerState {
for counter in [&counters.session_stats, &counters.stats] {
counter.incdec(&self.state, &new);
}
if let Some(addr) = self.outgoing_address {
if matches!(&self.state, PeerState::Live(..)) {
counters.live_outgoing_peers.write().remove(&addr);
}
if matches!(&new, PeerState::Live(..))
&& self
.stats
.counters
.outgoing_connections
.load(Ordering::Relaxed)
> 0
{
counters.live_outgoing_peers.write().insert(addr);
}
}
std::mem::replace(&mut self.state, new)
}
pub fn get_live(&self) -> Option<&LivePeerState> {
match &self.state {
PeerState::Live(l) => Some(l),
_ => None,
}
}
pub fn get_live_mut(&mut self) -> Option<&mut LivePeerState> {
match &mut self.state {
PeerState::Live(l) => Some(l),
_ => None,
}
}
pub fn idle_to_connecting(&mut self, counters: &PeerStates) -> Option<(PeerRx, PeerTx)> {
match &self.state {
PeerState::Queued | PeerState::NotNeeded => {
let (tx, rx) = unbounded_channel();
let tx_2 = tx.clone();
self.set_state(PeerState::Connecting(tx), counters);
Some((rx, tx_2))
}
_ => None,
}
}
pub fn incoming_connection(
&mut self,
peer_id: Id20,
tx: PeerTx,
counters: &PeerStates,
connection_kind: ConnectionKind,
) -> Result<(), IncomingConnectionResult> {
if matches!(&self.state, PeerState::Connecting(..) | PeerState::Live(..)) {
return Err(IncomingConnectionResult::AlreadyActive);
}
match self.take_state(counters) {
PeerState::Queued | PeerState::Dead | PeerState::NotNeeded => {
self.set_state(
PeerState::Live(LivePeerState::new(peer_id, tx, true, connection_kind)),
counters,
);
}
PeerState::Connecting(..) | PeerState::Live(..) => unreachable!(),
}
Ok(())
}
pub fn connecting_to_live(
&mut self,
peer_id: Id20,
counters: &PeerStates,
conn_kind: ConnectionKind,
) -> Option<&mut LivePeerState> {
if let PeerState::Connecting(_) = &self.state {
let tx = match self.take_state(counters) {
PeerState::Connecting(tx) => tx,
_ => unreachable!(),
};
self.set_state(
PeerState::Live(LivePeerState::new(peer_id, tx, false, conn_kind)),
counters,
);
self.get_live_mut()
} else {
None
}
}
pub fn set_not_needed(&mut self, counters: &PeerStates) -> PeerState {
self.set_state(PeerState::NotNeeded, counters)
}
}
#[derive(Debug)]
pub(crate) struct LivePeerState {
pub peer_interested: bool,
pub bitfield: BF,
inflight_requests: HashSet<InflightRequest>,
late_cancelled_request_tolerance: u32,
request_slots_changed: Arc<Notify>,
pub tx: PeerTx,
pub connection_kind: ConnectionKind,
}
impl LivePeerState {
pub fn new(
_peer_id: Id20,
tx: PeerTx,
initial_interested: bool,
connection_kind: ConnectionKind,
) -> Self {
LivePeerState {
peer_interested: initial_interested,
bitfield: BF::default(),
inflight_requests: Default::default(),
late_cancelled_request_tolerance: 0,
request_slots_changed: Default::default(),
tx,
connection_kind,
}
}
pub fn has_full_torrent(&self, total_pieces: usize) -> bool {
self.bitfield.get(0..total_pieces).is_some_and(|s| s.all())
}
pub fn request_slots_changed(&self) -> Arc<Notify> {
self.request_slots_changed.clone()
}
pub fn requested_inflight_count(&self) -> usize {
self.inflight_requests.len()
}
pub fn add_inflight_request(&mut self, chunk: ChunkInfo) -> bool {
self.inflight_requests.insert(chunk)
}
pub fn remove_inflight_request(&mut self, chunk: &ChunkInfo) -> RemoveInflightRequestResult {
if self.inflight_requests.remove(chunk) {
self.request_slots_changed.notify_waiters();
return RemoveInflightRequestResult::Expected;
}
if self.late_cancelled_request_tolerance > 0 {
self.late_cancelled_request_tolerance -= 1;
RemoveInflightRequestResult::LateCanceled
} else {
RemoveInflightRequestResult::Unexpected
}
}
pub fn cancel_inflight_requests_for_piece(&mut self, piece: ValidPieceIndex) {
let tx = &self.tx;
let late_cancelled_request_tolerance = &mut self.late_cancelled_request_tolerance;
let before = self.inflight_requests.len();
self.inflight_requests.retain(|req| {
if req.piece_index == piece {
let _ = tx.send(WriterRequest::Message(Message::Cancel(Request {
index: piece.get(),
begin: req.offset,
length: req.size,
})));
*late_cancelled_request_tolerance += 1;
false
} else {
true
}
});
if self.inflight_requests.len() != before {
self.request_slots_changed.notify_waiters();
}
}
pub fn inflight_requests(&self) -> impl Iterator<Item = &InflightRequest> {
self.inflight_requests.iter()
}
pub fn inflight_requests_debug(&self) -> &HashSet<InflightRequest> {
&self.inflight_requests
}
}
pub(crate) enum RemoveInflightRequestResult {
Expected,
LateCanceled,
Unexpected,
}