use std::{
collections::HashMap,
net::SocketAddr,
sync::Arc,
time::{Duration, Instant},
};
use futures::{
select,
stream::{Fuse, StreamExt},
};
use tokio::{
net::{TcpListener, TcpStream},
sync::{
mpsc::{self, UnboundedReceiver, UnboundedSender},
RwLock,
},
task, time,
};
use crate::{
alert::{Alert, AlertSender},
conf::TorrentConf,
counter::ThruputCounters,
disk::{
self,
error::{ReadError, WriteError},
},
download::PieceDownload,
error::Error,
peer::{self, ConnectionState, PeerSession, SessionState, SessionTick},
piece_picker::PiecePicker,
storage_info::StorageInfo,
tracker::{Announce, Event, Tracker},
Bitfield, BlockInfo, PeerId, PieceIndex, Sha1Hash, TorrentId,
};
use error::*;
use stats::{Peers, PieceStats, ThruputStats, TorrentStats};
pub mod error;
pub mod stats;
/// The channel for communicating with torrent.
pub(crate) type Sender = UnboundedSender<Command>;
/// The type of channel on which a torrent can listen for block write
/// completions.
pub(crate) type Receiver = UnboundedReceiver<Command>;
/// The types of messages that the torrent can receive from other parts of the
/// engine.
#[derive(Debug)]
pub(crate) enum Command {
/// Sent when some blocks were written to disk or an error ocurred while
/// writing.
PieceCompletion(Result<PieceCompletion, WriteError>),
/// There was an error reading a block.
ReadError {
block_info: BlockInfo,
error: ReadError,
},
/// A message sent only once, after the peer has been connected.
PeerConnected { addr: SocketAddr, id: PeerId },
/// Peer sessions periodically send this message when they have a state
/// change.
PeerState { addr: SocketAddr, info: SessionTick },
/// Gracefully shut down the torrent.
///
/// This command tells all active peer sessions of torrent to do the same,
/// waits for them and announces to trackers our exit.
Shutdown,
}
/// The type returned on completing a piece.
#[derive(Debug)]
pub(crate) struct PieceCompletion {
/// The index of the piece.
pub index: PieceIndex,
/// Whether the piece is valid. If it's not, it's not written to disk.
pub is_valid: bool,
}
/// Information and methods shared with peer sessions in the torrent.
///
/// This type contains fields that need to be read or updated by peer sessions.
/// Fields expected to be mutated are thus secured for inter-task access with
/// various synchronization primitives.
pub(crate) struct TorrentContext {
/// The torrent ID, unique in this engine.
pub id: TorrentId,
/// The info hash of the torrent, derived from its metainfo. This is used to
/// identify the torrent with other peers and trackers.
pub info_hash: Sha1Hash,
/// The arbitrary client id, chosen by the user of this library. This is
/// advertised to peers and trackers.
pub client_id: PeerId,
/// A copy of the torrent channel sender. This is not used by torrent iself,
/// but by the peer session tasks to which an arc copy of this torrent
/// context is given.
pub cmd_tx: Sender,
/// The piece picker picks the next most optimal piece to download and is
/// shared by all peers in a torrent.
pub piece_picker: Arc<RwLock<PiecePicker>>,
/// These are the active piece downloads in which the peer sessions in this
/// torrent are participating.
///
/// They are stored and synchronized in this object to download a piece from
/// multiple peers, which helps us to have fewer incomplete pieces.
///
/// Peer sessions may be run on different threads, any of which may read and
/// write to this map and to the pieces in the map. Thus we need a read
/// write lock on both.
// TODO: Benchmark whether using the nested locking approach isn't too slow.
// For mvp it should do.
pub downloads: RwLock<HashMap<PieceIndex, RwLock<PieceDownload>>>,
/// The channel on which to post alerts to user.
pub alert_tx: AlertSender,
/// The handle to the disk IO task, used to issue commands on it. A copy of
/// this handle is passed down to each peer session.
pub disk_tx: disk::Sender,
/// Info about the torrent's storage (piece length, download length, etc).
pub storage: StorageInfo,
}
/// Parameters for the torrent constructor.
pub(crate) struct Params {
pub id: TorrentId,
pub disk_tx: disk::Sender,
pub info_hash: Sha1Hash,
pub storage_info: StorageInfo,
pub own_pieces: Bitfield,
pub trackers: Vec<Tracker>,
pub client_id: PeerId,
pub listen_addr: SocketAddr,
pub conf: TorrentConf,
pub alert_tx: AlertSender,
}
/// Represents a torrent upload or download.
///
/// This is the main entity responsible for the high-level management of
/// a torrent download or upload. It starts and stops connections with peers
/// ([`PeerSession`](crate::peer::PeerSession) instances) and stores metadata
/// about the torrent.
pub(crate) struct Torrent {
/// The peers in this torrent.
peers: HashMap<SocketAddr, PeerSessionEntry>,
/// The peers returned by tracker to which we can connect.
available_peers: Vec<SocketAddr>,
/// Information that is shared with peer sessions.
ctx: Arc<TorrentContext>,
/// The port on which other entities in the engine send this torrent
/// messages.
///
/// The channel has to be wrapped in a `stream::Fuse` so that we can
/// `select!` on it in the torrent event loop.
cmd_rx: Fuse<Receiver>,
/// The trackers we can announce to.
trackers: Vec<TrackerEntry>,
/// The address on which torrent should listen for new peers.
listen_addr: SocketAddr,
/// The time the torrent was first started.
start_time: Option<Instant>,
/// The total time the torrent has been running.
///
/// This is a separate field as `Instant::now() - start_time` cannot be
/// relied upon due to the fact that it is possible to pause a torrent, in
/// which case we don't want to record the run time.
// TODO: pausing a torrent is not actually at this point, but this is done
// in expectation of that feature
run_duration: Duration,
/// In the last part of the download the torrent is in what's called the
/// endgame. This is the stage when all pieces have been picked but not all
/// have been received. There is a tendency for a piece to be mostly
/// downloaded by one peer, but when only a few pieces are left to complete
/// the torrent this could defer completion because some of these last
/// pieces may end up with slower peers. So when endgame is active, we let
/// all peers finish the remaining pieces and cancel pending requests from
/// the slower peers.
in_endgame: bool,
/// Measures various transfer statistics.
counters: ThruputCounters,
/// The configuration of this particular torrent.
conf: TorrentConf,
/// If `TorrentAlertConf::latest_completed_pieces` alert type is set, each
/// round the torrent collects the pieces that were downloaded, sends them
/// to peer as an alert, and resets the list.
///
/// This is set to some if the configuration is enabled, and set to none if
/// disabled.
completed_pieces: Option<Vec<PieceIndex>>,
}
impl Torrent {
/// Creates a new `Torrent` instance for downloading or seeding a torrent.
///
/// # Important
///
/// This constructor only initializes the torrent components but does not
/// actually start it. See [`Self::start`].
pub fn new(params: Params) -> (Self, Sender) {
let Params {
id,
disk_tx,
info_hash,
storage_info,
own_pieces,
trackers,
client_id,
listen_addr,
conf,
alert_tx,
} = params;
let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
let piece_picker = PiecePicker::new(own_pieces);
let cmd_rx = cmd_rx.fuse();
let trackers = trackers.into_iter().map(TrackerEntry::new).collect();
let completed_pieces = if conf.alerts.completed_pieces {
Some(Vec::new())
} else {
None
};
(
Self {
peers: HashMap::new(),
available_peers: Vec::new(),
ctx: Arc::new(TorrentContext {
id,
cmd_tx: cmd_tx.clone(),
piece_picker: Arc::new(RwLock::new(piece_picker)),
downloads: RwLock::new(HashMap::new()),
info_hash,
client_id,
alert_tx,
disk_tx,
storage: storage_info,
}),
start_time: None,
run_duration: Duration::default(),
cmd_rx,
trackers,
in_endgame: false,
counters: Default::default(),
listen_addr,
conf,
completed_pieces,
},
cmd_tx,
)
}
/// Starts the torrent and runs until an error is encountered.
pub async fn start(&mut self, peers: &[SocketAddr]) -> Result<()> {
log::info!("Starting torrent");
self.available_peers.extend_from_slice(peers);
// record the torrent starttime
self.start_time = Some(Instant::now());
// if the torrent is a seed, don't send the started event, just an
// empty announce
let tracker_event =
if self.ctx.piece_picker.read().await.missing_piece_count() == 0 {
None
} else {
Some(Event::Started)
};
if let Err(e) = self
.announce_to_trackers(Instant::now(), tracker_event)
.await
{
// this is a torrent error, not a tracker error, as that is handled
// inside the function
self.ctx
.alert_tx
.send(Alert::Error(Error::Torrent {
id: self.ctx.id,
error: e,
}))
.ok();
}
if let Err(e) = self.run().await {
// send alert of torrent failure to user
self.ctx
.alert_tx
.send(Alert::Error(Error::Torrent {
id: self.ctx.id,
error: e,
}))
.ok();
}
Ok(())
}
/// Starts the torrent and runs until an error is encountered.
async fn run(&mut self) -> Result<()> {
let mut tick_timer = time::interval(Duration::from_secs(1)).fuse();
let mut last_tick_time = None;
let mut listener = TcpListener::bind(&self.listen_addr).await?;
// the bind port may have been 0, so we need to get the actual port in
// use
self.listen_addr = listener.local_addr()?;
let mut incoming = listener.incoming().fuse();
// the torrent loop is triggered every second by the loop timer and by
// disk IO events
loop {
select! {
tick_time = tick_timer.select_next_some() => {
self.tick(&mut last_tick_time, tick_time.into_std()).await?;
}
peer_conn_result = incoming.select_next_some() => {
let socket = match peer_conn_result {
Ok(socket) => socket,
Err(e) => {
log::info!("Error accepting peer connection: {}", e);
continue;
}
};
let addr = match socket.peer_addr() {
Ok(addr) => addr,
Err(e) => {
log::info!("Error getting socket address of peer: {}", e);
continue;
}
};
log::info!("New connection {:?}", addr);
// start inbound session
let (session, tx) = PeerSession::new(
Arc::clone(&self.ctx),
addr,
);
self.peers.insert(addr, PeerSessionEntry::start_inbound(socket, session, tx));
}
cmd = self.cmd_rx.select_next_some() => {
match cmd {
Command::PeerConnected { addr, id } => {
if let Some(peer) = self.peers.get_mut(&addr) {
log::debug!(
"Peer {} connected with client '{}', \
updating state",
addr, String::from_utf8_lossy(&id)
);
peer.id = Some(id);
}
}
Command::PeerState { addr, info } => {
self.handle_peer_state_change(addr, info);
}
Command::PieceCompletion(write_result) => {
log::debug!("Disk write result {:?}", write_result);
match write_result {
Ok(piece) => {
self.handle_piece_completion(piece).await?;
}
Err(e) => {
log::error!(
"Failed to write piece to disk: {}",
e
);
}
}
}
Command::ReadError { block_info, error } => {
log::error!(
"Failed to read from disk {}: {}",
block_info,
error
);
// TODO: For now we just log for simplicity's sake, but in the
// future we'll need error recovery mechanisms here.
// For instance, it may be that the torrent file got moved while
// the torrent was still seeding. In this case we'd need to stop
// torrent and send an alert to the API consumer.
}
Command::Shutdown => {
self.shutdown().await?;
break;
}
}
}
}
}
Ok(())
}
/// The torrent tick, as in "the tick of a clock", which runs every second
/// to perform periodic updates.
///
/// This is when we update statistics and report them to the user, when new
/// peers are connected, and when perioric announces are made.
async fn tick(
&mut self,
last_tick_time: &mut Option<Instant>,
now: Instant,
) -> Result<()> {
// calculate how long torrent has been running
let elapsed_since_last_tick = last_tick_time
.or(self.start_time)
.map(|t| now.saturating_duration_since(t))
.unwrap_or_default();
self.run_duration += elapsed_since_last_tick;
*last_tick_time = Some(now);
// check if we can connect some peers
// NOTE: do this before announcing as we don't want to block new
// connections with the potentially long running announce requests
self.connect_peers();
// check if we need to announce to some trackers
let event = None;
self.announce_to_trackers(now, event).await?;
log::debug!(
"Stats: \
elapsed {} s, \
download: {} b/s (peak: {} b/s, total: {} b) wasted: {} b \
upload: {} b/s (peak: {} b/s, total: {} b)",
self.run_duration.as_secs(),
self.counters.payload.down.avg(),
self.counters.payload.down.peak(),
self.counters.payload.down.total(),
self.counters.waste.total(),
self.counters.payload.up.avg(),
self.counters.payload.up.peak(),
self.counters.payload.up.total(),
);
// TODO: consider removing this check, it's expensive, or caching it in
// piece picker
if log::log_enabled!(log::Level::Debug) {
let piece_picker_guard = self.ctx.piece_picker.read().await;
let unavailable_piece_count =
piece_picker_guard.pieces().iter().fold(0, |acc, piece| {
if piece.frequency == 0 {
acc + 1
} else {
acc
}
});
if unavailable_piece_count > 0 {
log::debug!(
"Torrent swarm doesn't have all pieces (missing: {})",
unavailable_piece_count
);
}
}
// send periodic stats update to api user
let stats = self.build_stats().await;
self.ctx
.alert_tx
.send(Alert::TorrentStats {
id: self.ctx.id,
stats: Box::new(stats),
})
.ok();
self.counters.reset();
Ok(())
}
/// Attempts to connect available peers, if we have any.
fn connect_peers(&mut self) {
let connect_count = self
.conf
.max_connected_peer_count
.saturating_sub(self.peers.len())
.min(self.available_peers.len());
if connect_count == 0 {
log::trace!("Cannot connect to peers");
return;
}
log::debug!("Connecting {} peer(s)", connect_count);
for addr in self.available_peers.drain(0..connect_count) {
log::info!("Connecting to peer {}", addr);
let (session, tx) = PeerSession::new(Arc::clone(&self.ctx), addr);
self.peers
.insert(addr, PeerSessionEntry::start_outbound(session, tx));
}
}
/// Chacks whether we need to announce to any trackers of if we need to request
/// peers.
async fn announce_to_trackers(
&mut self,
now: Instant,
event: Option<Event>,
) -> Result<()> {
// calculate transfer statistics in advance
let uploaded = self.counters.payload.up.total();
let downloaded = self.counters.payload.down.total();
let left = self.ctx.storage.download_len - downloaded;
// skip trackers that errored too often
// TODO: introduce a retry timeout
let tracker_error_threshold = self.conf.tracker_error_threshold;
for tracker in self
.trackers
.iter_mut()
.filter(|t| t.error_count < tracker_error_threshold)
{
// Check if the torrent's peer count has fallen below the minimum.
// But don't request new peers otherwise or if we're about to stop
// torrent.
let peer_count = self.peers.len() + self.available_peers.len();
let needed_peer_count = if peer_count
>= self.conf.min_requested_peer_count
|| event == Some(Event::Stopped)
{
None
} else {
debug_assert!(self.conf.max_connected_peer_count >= peer_count);
let needed = self.conf.max_connected_peer_count - peer_count;
// Download at least this numbe of peers, even if we don't need
// as many. This is because later we may be able to connect to
// more peers and in that case we don't want to wait till the
// next tracker request.
Some(self.conf.min_requested_peer_count.max(needed))
};
// we can override the normal annoucne interval if we need peers or
// if we have an event to announce
if event.is_some()
|| (needed_peer_count > Some(0)
&& tracker.can_announce(now, self.conf.announce_interval))
|| tracker.should_announce(now, self.conf.announce_interval)
{
let params = Announce {
tracker_id: tracker.id.clone(),
info_hash: self.ctx.info_hash,
peer_id: self.ctx.client_id,
port: self.listen_addr.port(),
peer_count: needed_peer_count,
uploaded,
downloaded,
left,
ip: None,
event,
};
// TODO: We probably don't want to block the torrent event loop
// here waiting on the tracker response. Instead, poll the
// future in the event loop select call, or spawn the tracker
// announce on a separate task and return the result as
// an mpsc message.
match tracker.client.announce(params).await {
Ok(resp) => {
log::info!(
"Announced to tracker {}, response: {:?}",
tracker.client,
resp
);
if let Some(tracker_id) = resp.tracker_id {
tracker.id = Some(tracker_id);
}
if let Some(failure_reason) = resp.failure_reason {
log::warn!(
"Error contacting tracker {}: {}",
tracker.client,
failure_reason
);
}
if let Some(warning_message) = resp.warning_message {
log::warn!(
"Warning from tracker {}: {}",
tracker.client,
warning_message
);
}
if let Some(interval) = resp.interval {
log::info!(
"Tracker {} interval: {} s",
tracker.client,
interval.as_secs()
);
tracker.interval = Some(interval);
}
if let Some(min_interval) = resp.min_interval {
log::info!(
"Tracker {} min min_interval: {} s",
tracker.client,
min_interval.as_secs()
);
tracker.min_interval = Some(min_interval);
}
if let (Some(seeder_count), Some(leecher_count)) =
(resp.seeder_count, resp.leecher_count)
{
log::debug!(
"Torrent seeds: {} and leeches: {}",
seeder_count,
leecher_count
);
}
if !resp.peers.is_empty() {
log::debug!(
"Received peers from tracker {}: {:?}",
tracker.client,
resp.peers
);
self.available_peers.extend(resp.peers.into_iter());
}
}
Err(e) => {
log::warn!(
"Error announcing to tracker {}: {}",
tracker.client,
e
);
tracker.error_count += 1;
self.ctx.alert_tx.send(Alert::Error(
Error::Tracker {
id: self.ctx.id,
error: e,
},
))?;
}
}
tracker.last_announce_time = Some(now);
}
}
Ok(())
}
/// Returns high-level statistics about the torrent for sending to the user.
async fn build_stats(&mut self) -> TorrentStats {
let missing_piece_count =
self.ctx.piece_picker.read().await.missing_piece_count();
let piece_count = self.ctx.storage.piece_count;
let completed_pieces = self
.completed_pieces
.as_mut()
.map(|p| std::mem::replace(p, Vec::new()));
let peers = if self.conf.alerts.peers {
let peers = self
.peers
.iter()
.map(|(addr, entry)| stats::PeerSessionStats {
addr: *addr,
id: entry.id,
state: entry.state,
piece_count: entry.piece_count,
thruput: entry.thruput,
})
.collect();
Peers::Full(peers)
} else {
Peers::Count(self.peers.len())
};
TorrentStats {
start_time: self.start_time,
run_duration: self.run_duration,
pieces: PieceStats {
total: piece_count,
complete: piece_count - missing_piece_count,
pending: self.ctx.downloads.read().await.len(),
latest_completed: completed_pieces,
},
thruput: ThruputStats::from(&self.counters),
peers,
}
}
/// Handles the message that peer sessions send to torrent when their state
/// changed.
///
/// It simply updates the minimum copy of the peer's state that is kept in
/// torrent in order to perform various pieces of logic (the choke
/// algorithm and detailed reporting to user, neither of which is done at
/// the moment).
fn handle_peer_state_change(
&mut self,
addr: SocketAddr,
info: SessionTick,
) {
if let Some(peer) = self.peers.get_mut(&addr) {
log::debug!("Updating peer {} state", addr);
peer.state = info.state;
peer.piece_count = info.piece_count;
peer.thruput = ThruputStats::from(&info.counters);
// update torrent thruput stats
self.counters += &info.counters;
// if we disconnected peer, remove it
if peer.state.connection == ConnectionState::Disconnected {
self.peers.remove(&addr);
}
} else {
log::debug!("Tried updating non-existent peer {}", addr);
}
}
/// Does some bookkeeping to mark the piece as finished. All peer sessions
/// are notified of the newly downloaded piece.
async fn handle_piece_completion(
&mut self,
piece: PieceCompletion,
) -> Result<()> {
// if this write completed a piece, check torrent
// completion
if piece.is_valid {
// remove download entry
self.ctx.downloads.write().await.remove(&piece.index);
// register piece in piece picker
let mut piece_picker_write_guard =
self.ctx.piece_picker.write().await;
piece_picker_write_guard.received_piece(piece.index);
let missing_piece_count =
piece_picker_write_guard.missing_piece_count();
// Even if we don't have all pieces, they may all have already
// been picked. In this case we need to enter endgame mode, if not
// already in it.
if !self.in_endgame
&& missing_piece_count > 0
&& piece_picker_write_guard.all_pieces_picked()
{
log::info!("Torrent entering endgame");
self.in_endgame = true;
}
// we don't need the lock anymore
drop(piece_picker_write_guard);
log::info!(
"Downloaded piece {} (left: {})",
piece.index,
missing_piece_count
);
if let Some(latest_completed_pieces) = &mut self.completed_pieces {
latest_completed_pieces.push(piece.index);
}
// tell all sessions that we got a new piece so that they can send
// a "have(piece)" message to their peers or cancel potential
// duplicate requests for the same piece
for peer in self.peers.values() {
if let Some(tx) = &peer.tx {
// this may be after the peer session had already stopped
// but before the torrent tick ran and got a chance to reap
// the dead session
tx.send(peer::Command::PieceCompletion {
index: piece.index,
in_endgame: self.in_endgame,
})
.ok();
}
}
// if the torrent is fully downloaded, stop the download loop
if missing_piece_count == 0 {
log::info!(
"Finished torrent download, exiting. \
Peak download rate: {} b/s, wasted: {} b",
self.counters.payload.down.peak(),
self.counters.waste.total(),
);
// notify user of torrent completion
self.ctx
.alert_tx
.send(Alert::TorrentComplete(self.ctx.id))
.ok();
// tell trackers we've finished
self.announce_to_trackers(
Instant::now(),
Some(Event::Completed),
)
.await?;
}
} else {
// TODO(https://github.com/mandreyel/cratetorrent/issues/61):
// implement parole mode for the peers that sent corrupt data
log::warn!("Piece {} is invalid", piece.index);
// mark all blocks free to be requested in piece
if let Some(piece) =
self.ctx.downloads.read().await.get(&piece.index)
{
piece.write().await.free_all_blocks();
}
}
Ok(())
}
/// Shuts down torrent and all peer sessions, and also announces torrent's
/// exit to tracker.
async fn shutdown(&mut self) -> Result<()> {
// send shutdown command to all connected peers
for peer in self.peers.values() {
if let Some(tx) = &peer.tx {
// we don't particularly care if we weren't successful
// in sending the command (for now)
tx.send(peer::Command::Shutdown).ok();
}
}
for peer in self.peers.values_mut() {
if let Err(e) = peer
.join_handle
.take()
.expect("peer join handle missing")
.await
.expect("task error")
{
log::error!("Peer session error: {}", e);
}
}
// tell trackers we're leaving
self.announce_to_trackers(Instant::now(), Some(Event::Stopped))
.await
}
}
/// A peer in the torrent. Contains additional metadata needed by torrent to
/// manage the peer.
struct PeerSessionEntry {
/// The channel on which to communicate with the peer session.
///
/// This is set when the session is started.
tx: Option<peer::Sender>,
/// Peer's 20 byte BitTorrent id. Updated when the peer sends us its peer
/// id, in the handshake.
id: Option<PeerId>,
/// Cached information about the session state. Updated every time peer
/// updates us.
state: SessionState,
/// The number of pieces that the peer has available.
piece_count: usize,
/// Most recent throughput statistics of this peer.
thruput: ThruputStats,
/// The peer session task's join handle, used during shutdown.
join_handle: Option<task::JoinHandle<peer::error::Result<()>>>,
}
impl PeerSessionEntry {
fn start_outbound(mut session: PeerSession, tx: peer::Sender) -> Self {
let join_handle =
task::spawn(async move { session.start_outbound().await });
Self::new(tx, join_handle)
}
fn start_inbound(
socket: TcpStream,
mut session: PeerSession,
tx: peer::Sender,
) -> Self {
let join_handle =
task::spawn(async move { session.start_inbound(socket).await });
Self::new(tx, join_handle)
}
fn new(
tx: peer::Sender,
join_handle: task::JoinHandle<peer::error::Result<()>>,
) -> Self {
Self {
tx: Some(tx),
id: None,
state: SessionState {
connection: ConnectionState::Connecting,
..Default::default()
},
piece_count: 0,
thruput: Default::default(),
join_handle: Some(join_handle),
}
}
}
/// Contains the tracker client as well as additional metadata about the
/// tracker.
struct TrackerEntry {
client: Tracker,
/// If a previous announce contained a tracker_id, it should be included in
/// next announces. Therefore it is cached here.
id: Option<String>,
/// The last announce time is kept here so that we don't request too often.
last_announce_time: Option<Instant>,
/// The interval at which we should update the tracker of our progress.
/// This is set after the first announce request.
interval: Option<Duration>,
/// The absolute minimum interval at which we can contact tracker.
/// This is set after the first announce request.
min_interval: Option<Duration>,
/// Each time we fail to requet from tracker, this counter is incremented.
/// If it fails too often, we stop requesting from tracker.
error_count: usize,
}
impl TrackerEntry {
fn new(client: Tracker) -> Self {
Self {
client,
id: None,
last_announce_time: None,
interval: None,
min_interval: None,
error_count: 0,
}
}
/// Determines whether we should announce to the tracker at the given time,
/// based on when we last announced.
///
/// Later this function should take into consideration the client's minimum
/// announce frequency settings.
fn should_announce(
&self,
t: Instant,
default_announce_interval: Duration,
) -> bool {
if let Some(last_announce_time) = self.last_announce_time {
let min_next_announce_time = last_announce_time
+ self.interval.unwrap_or(default_announce_interval);
t > min_next_announce_time
} else {
true
}
}
/// Determines whether we're allowed to announce at the given time.
///
/// We may need peers before the next step in the announce interval.
/// However, we can't do this too often, so we need to check our last
/// announce time first.
fn can_announce(
&self,
t: Instant,
default_announce_interval: Duration,
) -> bool {
if let Some(last_announce_time) = self.last_announce_time {
let min_next_announce_time = last_announce_time
+ self
.min_interval
.or(self.min_interval)
.unwrap_or(default_announce_interval);
t > min_next_announce_time
} else {
true
}
}
}