mod download;
mod peer_manager;
mod torrent;
mod uni_deque;
mod upload;
use std::collections::HashMap;
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
use std::path::PathBuf;
use std::str::FromStr as _;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::RwLock;
use crate::dht::{DhtNode, generate_node_id};
use crate::error::{Error, ErrorKind};
use crate::magnet::{MagnetUri, hex_encode};
use crate::metainfo::{Info, Metainfo, Mode, RawInfo};
use crate::spec::TorrentSpec;
use crate::storage::FileStorage;
use self::torrent::TorrentHandle;
pub type InfoHash = [u8; 20];
pub struct Session {
config: SessionConfig,
torrents: Arc<RwLock<HashMap<InfoHash, TorrentHandle>>>,
#[expect(dead_code)]
dht_node: Option<Arc<DhtNode>>,
}
#[derive(Debug, Clone)]
pub struct SessionConfig {
pub listen_port: u16,
pub max_connections: u32,
pub max_uploads: u32,
pub download_dir: PathBuf,
pub enable_dht: bool,
pub node_id: Option<[u8; 20]>,
}
impl Default for SessionConfig {
fn default() -> Self {
SessionConfig {
listen_port: 6881,
max_connections: 50,
max_uploads: 8,
download_dir: PathBuf::from("downloads"),
enable_dht: true,
node_id: None,
}
}
}
#[derive(Debug, Clone)]
pub struct TorrentStatus {
pub info_hash: InfoHash,
pub name: String,
pub progress: f64,
pub download_rate: f64,
pub upload_rate: f64,
pub num_peers: usize,
pub num_seeds: usize,
pub state: TorrentState,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TorrentState {
Queued,
Downloading,
Seeding,
Paused,
Error,
}
impl Session {
pub async fn new(config: SessionConfig) -> Result<Self, Error> {
let torrents: Arc<RwLock<HashMap<InfoHash, TorrentHandle>>> =
Arc::new(RwLock::new(HashMap::new()));
let dht_node = if config.enable_dht {
let bind_addr = SocketAddr::V4(SocketAddrV4::new(
Ipv4Addr::UNSPECIFIED,
config.listen_port + 1,
));
let bootstrap = &[
("router.bittorrent.com", 6881),
("dht.transmissionbt.com", 6881),
];
let node_id = config.node_id.unwrap_or_else(generate_node_id);
let node = DhtNode::new(node_id, bind_addr, bootstrap).await?;
let (dht, t) = (node.clone(), torrents.clone());
tokio::spawn(async move {
loop {
tokio::time::sleep(Duration::from_secs(30)).await;
let info_hashes: Vec<InfoHash> = t.read().await.keys().copied().collect();
for ih in info_hashes {
let peers = dht.get_peers(&ih).await;
if !peers.is_empty() {
if let Some(handle) = t.read().await.get(&ih) {
handle.peer_mgr.write().await.add_peers(peers);
}
}
}
}
});
Some(node)
} else {
None
};
Ok(Session {
config,
torrents,
dht_node,
})
}
pub async fn add_torrent(&self, spec: impl Into<TorrentSpec>) -> Result<InfoHash, Error> {
match spec.into() {
TorrentSpec::Metainfo(meta) => self.add_metainfo(meta).await,
TorrentSpec::Magnet(uri) => self.add_magnet(uri).await,
}
}
pub async fn add_magnet_str(&self, uri: impl AsRef<str>) -> Result<InfoHash, Error> {
let magnet: MagnetUri = uri.as_ref().parse()?;
self.add_torrent(magnet).await
}
pub async fn add_torrent_bytes(&self, data: &[u8]) -> Result<InfoHash, Error> {
self.add_torrent(Metainfo::try_from(data)?).await
}
async fn add_metainfo(&self, meta: Metainfo) -> Result<InfoHash, Error> {
let info_hash = meta.info_hash();
let _name = match &meta.info.mode {
Mode::Single { name, .. } | Mode::Multiple { name, .. } => name.clone(),
};
let storage = Arc::new(FileStorage::new(&meta.info, &self.config.download_dir).await?);
let handle = TorrentHandle::new(meta, info_hash, storage, &self.config);
self.torrents.write().await.insert(info_hash, handle);
Ok(info_hash)
}
async fn add_magnet(&self, uri: MagnetUri) -> Result<InfoHash, Error> {
let info_hash = *uri.primary_info_hash();
let name = uri
.display_name
.clone()
.unwrap_or_else(|| hex_encode(info_hash));
let announce = uri.trackers.first().cloned().unwrap_or_default();
let announce_list = if uri.trackers.len() > 1 {
vec![uri.trackers[1..].to_vec()]
} else {
vec![]
};
let meta = Metainfo {
announce,
announce_list,
info: Info {
piece_length: 0,
pieces: vec![],
mode: Mode::Single {
name: name.clone(),
length: uri.exact_length.unwrap_or(0),
},
raw_info: RawInfo::Hash(info_hash),
},
creation_date: None,
comment: None,
created_by: None,
encoding: None,
};
let storage = Arc::new(FileStorage::new(&meta.info, &self.config.download_dir).await?);
let handle = TorrentHandle::new(meta, info_hash, storage, &self.config);
if !uri.peers.is_empty() {
let mut peer_addrs = Vec::with_capacity(uri.peers.len());
for peer_str in &uri.peers {
if let Ok(addr) = SocketAddr::from_str(peer_str) {
peer_addrs.push(addr);
}
}
if !peer_addrs.is_empty() {
handle.peer_mgr.write().await.add_peers(peer_addrs);
}
}
self.torrents.write().await.insert(info_hash, handle);
Ok(info_hash)
}
pub async fn remove_torrent(&self, info_hash: &InfoHash) -> Result<(), Error> {
let handle = self.torrents.write().await.remove(info_hash);
if let Some(mut h) = handle {
h.cancel().await;
}
Ok(())
}
pub async fn torrent_status(&self, info_hash: &InfoHash) -> Result<TorrentStatus, Error> {
let torrents = self.torrents.read().await;
let handle = torrents
.get(info_hash)
.ok_or(Error::new(ErrorKind::InvalidInput))?;
Ok(handle.status().await)
}
pub async fn active_torrents(&self) -> Vec<InfoHash> {
self.torrents.read().await.keys().copied().collect()
}
}