pub(crate) mod http;
pub(crate) mod udp;
use self::http::*;
use self::udp::*;
use crate::asyncutil::ShutdownGroup;
use crate::consts::{LEFT, NUMWANT, TRACKER_TIMEOUT};
use crate::peer::Peer;
use crate::types::{InfoHash, Key, LocalPeer, PeerId};
use crate::util::{comma_list, ErrorChain};
use bytes::Bytes;
use std::fmt;
use std::str::FromStr;
use std::sync::Arc;
use thiserror::Error;
use tokio::time::timeout;
use url::Url;
#[derive(Clone, Debug, Eq, PartialEq)]
pub(crate) enum Tracker {
Http(HttpTracker),
Udp(UdpTracker),
}
impl Tracker {
pub(crate) fn url_string(&self) -> String {
match self {
Tracker::Http(tr) => tr.url_string(),
Tracker::Udp(tr) => tr.url_string(),
}
}
pub(crate) async fn get_peers(
&self,
info_hash: InfoHash,
local: LocalPeer,
shutdown_group: Arc<ShutdownGroup>,
) -> Result<Vec<Peer>, TrackerError> {
log::info!("Requesting peers for {info_hash} from {self}");
timeout(
TRACKER_TIMEOUT,
self.inner_get_peers(info_hash, local, shutdown_group),
)
.await
.unwrap_or(Err(TrackerError::Timeout))
}
async fn inner_get_peers(
&self,
info_hash: InfoHash,
local: LocalPeer,
shutdown_group: Arc<ShutdownGroup>,
) -> Result<Vec<Peer>, TrackerError> {
let mut s = self.connect(info_hash, local).await?;
let peers = s.start().await?.peers;
let display = self.to_string();
log::info!("{display} returned {} peers for {info_hash}", peers.len());
log::debug!(
"{display} returned peers for {info_hash}: {}",
comma_list(&peers)
);
shutdown_group.spawn(|token| async move {
tokio::select! {
() = token.cancelled() => log::trace!(r#""stopped" announcement to {display} for {info_hash} cancelled"#),
r = s.stop() => {
if let Err(e) = r {
log::warn!(
r#"failure sending "stopped" announcement to {display} for {info_hash}: {}"#,
ErrorChain(e)
);
}
}
}
});
Ok(peers)
}
async fn connect(
&self,
info_hash: InfoHash,
local: LocalPeer,
) -> Result<TrackerSession, TrackerError> {
let inner = match self {
Tracker::Http(t) => InnerTrackerSession::Http(t.connect()?),
Tracker::Udp(t) => InnerTrackerSession::Udp(t.connect().await?),
};
Ok(TrackerSession {
inner,
info_hash,
local,
})
}
}
impl fmt::Display for Tracker {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Tracker::Http(http) => write!(f, "{http}"),
Tracker::Udp(udp) => write!(f, "{udp}"),
}
}
}
impl FromStr for Tracker {
type Err = TrackerUrlError;
fn from_str(s: &str) -> Result<Tracker, TrackerUrlError> {
let url = Url::parse(s)?;
match url.scheme() {
"http" | "https" => Ok(Tracker::Http(HttpTracker::try_from(url)?)),
"udp" => Ok(Tracker::Udp(UdpTracker::try_from(url)?)),
sch => Err(TrackerUrlError::UnsupportedScheme(sch.into())),
}
}
}
struct TrackerSession {
inner: InnerTrackerSession,
info_hash: InfoHash,
local: LocalPeer,
}
enum InnerTrackerSession {
Http(HttpTrackerSession),
Udp(UdpTrackerSession),
}
impl TrackerSession {
fn tracker_display(&self) -> String {
match &self.inner {
InnerTrackerSession::Http(s) => s.tracker.to_string(),
InnerTrackerSession::Udp(s) => s.tracker.to_string(),
}
}
async fn start(&mut self) -> Result<AnnounceResponse, TrackerError> {
log::trace!(
r#"Sending "started" announcement to {} for {}"#,
self.tracker_display(),
self.info_hash
);
self.announce(Announcement {
info_hash: self.info_hash,
peer_id: self.local.id,
downloaded: 0,
left: LEFT,
uploaded: 0,
event: AnnounceEvent::Started,
key: self.local.key,
numwant: NUMWANT,
port: self.local.port,
})
.await
}
async fn stop(&mut self) -> Result<AnnounceResponse, TrackerError> {
log::trace!(
r#"Sending "stopped" announcement to {} for {}"#,
self.tracker_display(),
self.info_hash
);
self.announce(Announcement {
info_hash: self.info_hash,
peer_id: self.local.id,
downloaded: 0,
left: LEFT,
uploaded: 0,
event: AnnounceEvent::Stopped,
key: self.local.key,
numwant: NUMWANT,
port: self.local.port,
})
.await
}
async fn announce(
&mut self,
announcement: Announcement,
) -> Result<AnnounceResponse, TrackerError> {
let announcement = match &mut self.inner {
InnerTrackerSession::Http(s) => s.announce(announcement).await?,
InnerTrackerSession::Udp(s) => s.announce(announcement).await?,
};
if let Some(msg) = announcement.warning_message.as_ref() {
log::trace!(
"{} replied with warning in response to {} announcement: {:?}",
self.tracker_display(),
self.info_hash,
msg,
);
}
Ok(announcement)
}
}
#[derive(Clone, Debug, Error, Eq, PartialEq)]
pub(crate) enum TrackerUrlError {
#[error("invalid tracker URL")]
Url(#[from] url::ParseError),
#[error("unsupported tracker URL scheme: {0:?}")]
UnsupportedScheme(String),
#[error("no host in tracker URL")]
NoHost,
#[error("no port in UDP tracker URL")]
NoUdpPort,
}
#[derive(Debug, Error)]
pub(crate) enum TrackerError {
#[error("interactions with tracker did not complete in time")]
Timeout,
#[error("tracker replied with error message {0:?}")]
Failure(String),
#[error(transparent)]
Http(#[from] HttpTrackerError),
#[error(transparent)]
Udp(#[from] UdpTrackerError),
}
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
enum AnnounceEvent {
#[allow(dead_code)]
Announce,
#[allow(dead_code)]
Completed,
Started,
Stopped,
}
impl AnnounceEvent {
fn add_query_param(&self, url: &mut Url) {
let value = match self {
AnnounceEvent::Announce => return,
AnnounceEvent::Completed => "completed",
AnnounceEvent::Started => "started",
AnnounceEvent::Stopped => "stopped",
};
url.query_pairs_mut().append_pair("event", value);
}
fn for_udp(&self) -> u32 {
match self {
AnnounceEvent::Announce => 0,
AnnounceEvent::Completed => 1,
AnnounceEvent::Started => 2,
AnnounceEvent::Stopped => 3,
}
}
}
#[derive(Clone, Debug, Eq, PartialEq)]
struct Announcement {
info_hash: InfoHash,
peer_id: PeerId,
downloaded: u64,
left: u64,
uploaded: u64,
event: AnnounceEvent,
key: Key,
numwant: u32,
port: u16,
}
#[derive(Clone, Debug, Eq, PartialEq)]
struct AnnounceResponse {
interval: u32,
peers: Vec<Peer>,
warning_message: Option<String>,
min_interval: Option<u32>,
tracker_id: Option<Bytes>,
complete: Option<u32>,
incomplete: Option<u32>,
leechers: Option<u32>,
seeders: Option<u32>,
}