mod config;
mod download;
mod peer_manager;
mod torrent;
mod uni_deque;
mod upload;
pub use config::{InfoHash, SessionConfig, TorrentState, TorrentStatus};
use std::collections::HashMap;
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
use std::path::{Path, PathBuf};
use std::str::FromStr as _;
use std::sync::Arc;
use tokio::sync::RwLock;
use crate::dht::{DhtNode, generate_node_id};
use crate::error::{Error, ErrorKind};
use crate::magnet::{MagnetUri, hex_encode};
use crate::metainfo::{Info, Metainfo, Mode, RawInfo};
use crate::spec::TorrentSpec;
use self::torrent::TorrentHandle;
/// A torrent session: owns the configuration, the registry of torrent handles,
/// and (optionally) the DHT node created for peer discovery.
pub struct Session {
/// Configuration the session was created with.
config: SessionConfig,
/// Torrent handles keyed by info hash, shared behind an async read/write lock.
torrents: Arc<RwLock<HashMap<InfoHash, TorrentHandle>>>,
/// DHT node created by `Session::new` when bootstrap nodes are configured; `None` otherwise.
// The field is stored but not read anywhere in the code visible here, hence the
// `dead_code` expectation.
#[expect(dead_code)]
dht_node: Option<Arc<DhtNode>>,
}
impl Session {
pub async fn new(config: SessionConfig) -> Result<Self, Error> {
let torrents: Arc<RwLock<HashMap<InfoHash, TorrentHandle>>> =
Arc::new(RwLock::new(HashMap::new()));
let dht_node = if let Some(ref bootstrap) = config.bootstrap_nodes {
if bootstrap.is_empty() {
None
} else {
let bind_addr = SocketAddr::V4(SocketAddrV4::new(
Ipv4Addr::UNSPECIFIED,
config.listen_port + 1,
));
let bootstrap_refs: Vec<(&str, u16)> = bootstrap
.iter()
.map(|n| (n.host.as_str(), n.port))
.collect();
let node_id = config.node_id.unwrap_or_else(generate_node_id);
let node = DhtNode::new(node_id, bind_addr, &bootstrap_refs).await?;
let (dht, t) = (node.clone(), torrents.clone());
let poll_interval = config.dht_poll_interval;
tokio::spawn(async move {
loop {
tokio::time::sleep(poll_interval).await;
let info_hashes: Vec<InfoHash> = t.read().await.keys().copied().collect();
for ih in info_hashes {
let peers = dht.get_peers(&ih).await;
if !peers.is_empty() {
if let Some(handle) = t.read().await.get(&ih) {
handle.peer_mgr.write().await.add_peers(peers);
}
}
}
}
});
Some(node)
}
} else {
None
};
Ok(Session {
config,
torrents,
dht_node,
})
}
/// Adds a torrent described by anything convertible into a [`TorrentSpec`].
///
/// Metainfo specs are forwarded to `add_metainfo` and magnet specs to `add_magnet`;
/// the result of that call (the torrent's info hash, or its error) is returned as is.
pub async fn add_torrent(
    &self,
    spec: impl Into<TorrentSpec>,
    download_dir: impl Into<PathBuf>,
) -> Result<InfoHash, Error> {
    let target_dir: PathBuf = download_dir.into();
    let resolved: TorrentSpec = spec.into();
    match resolved {
        TorrentSpec::Magnet(uri) => self.add_magnet(uri, target_dir.as_path()).await,
        TorrentSpec::Metainfo(meta) => self.add_metainfo(meta, target_dir.as_path()).await,
    }
}
pub async fn add_magnet_str(
&self, uri: impl AsRef<str>, download_dir: impl Into<PathBuf>,
) -> Result<InfoHash, Error> {
let magnet: MagnetUri = uri.as_ref().parse()?;
self.add_torrent(magnet, download_dir).await
}
/// Decodes `data` into a [`Metainfo`] and adds the resulting torrent.
///
/// # Errors
///
/// Returns the decoding error (converted into [`Error`]) when `data` cannot be turned
/// into a `Metainfo`, or whatever error `add_torrent` reports.
pub async fn add_torrent_bytes(
    &self,
    data: &[u8],
    download_dir: impl Into<PathBuf>,
) -> Result<InfoHash, Error> {
    let meta = Metainfo::try_from(data)?;
    self.add_torrent(meta, download_dir).await
}
/// Registers a torrent described by a complete [`Metainfo`].
///
/// After the capacity check, storage is created through the configured storage factory,
/// a [`TorrentHandle`] is built, and the handle is stored under the torrent's info hash.
/// If a handle was already registered under that hash it is replaced; the replaced
/// handle is then cancelled and its task awaited, the same shutdown `remove_torrent`
/// performs, so it is not silently dropped while its task keeps running.
///
/// # Errors
///
/// Returns the error from `check_capacity` when the session is full, or the error from
/// the storage factory when storage cannot be created.
async fn add_metainfo(&self, meta: Metainfo, download_dir: &Path) -> Result<InfoHash, Error> {
    self.check_capacity().await?;
    let info_hash = meta.info_hash();
    let storage = self
        .config
        .storage_factory
        .create(&meta.info, download_dir)
        .await?;
    let handle = TorrentHandle::new(meta, info_hash, storage, &self.config);

    // Bind the result first so the map's write lock is released before the (possibly
    // slow) shutdown of a replaced handle.
    let replaced = self.torrents.write().await.insert(info_hash, handle);
    if let Some(mut previous) = replaced {
        previous.cancel().await;
        // Best effort, as in `remove_torrent`: the task's outcome is not reported.
        let _ = previous.task.await;
    }
    Ok(info_hash)
}
/// Registers a torrent described only by a magnet link.
///
/// A placeholder [`Metainfo`] is built from the magnet data: the first tracker becomes
/// `announce`, any remaining trackers form a single `announce_list` tier, the piece
/// fields are empty, and the info section carries only the info hash. The name falls
/// back to the hex-encoded info hash and the length to `0` when the link omits them.
/// Peer addresses listed in the link that parse as socket addresses are handed to the
/// new torrent's peer manager; entries that do not parse are skipped.
///
/// If a handle was already registered under the same info hash it is replaced; the
/// replaced handle is then cancelled and its task awaited, the same shutdown
/// `remove_torrent` performs.
///
/// # Errors
///
/// Returns the error from `check_capacity` when the session is full, or the error from
/// the storage factory when storage cannot be created.
async fn add_magnet(&self, uri: MagnetUri, download_dir: &Path) -> Result<InfoHash, Error> {
    self.check_capacity().await?;
    let info_hash = *uri.primary_info_hash();
    let name = uri
        .display_name
        .clone()
        .unwrap_or_else(|| hex_encode(info_hash));
    let announce = uri.trackers.first().cloned().unwrap_or_default();
    let announce_list = if uri.trackers.len() > 1 {
        vec![uri.trackers[1..].to_vec()]
    } else {
        vec![]
    };
    let meta = Metainfo {
        announce,
        announce_list,
        info: Info {
            piece_length: 0,
            pieces: vec![],
            mode: Mode::Single {
                name,
                length: uri.exact_length.unwrap_or(0),
            },
            raw_info: RawInfo::Hash(info_hash),
        },
        creation_date: None,
        comment: None,
        created_by: None,
        encoding: None,
    };
    let storage = self
        .config
        .storage_factory
        .create(&meta.info, download_dir)
        .await?;
    let handle = TorrentHandle::new(meta, info_hash, storage, &self.config);

    // Peer hints are best effort: anything that is not a valid socket address is ignored.
    let peer_addrs: Vec<SocketAddr> = uri
        .peers
        .iter()
        .filter_map(|peer| SocketAddr::from_str(peer).ok())
        .collect();
    if !peer_addrs.is_empty() {
        handle.peer_mgr.write().await.add_peers(peer_addrs);
    }

    // Bind the result first so the map's write lock is released before the (possibly
    // slow) shutdown of a replaced handle.
    let replaced = self.torrents.write().await.insert(info_hash, handle);
    if let Some(mut previous) = replaced {
        previous.cancel().await;
        // Best effort, as in `remove_torrent`: the task's outcome is not reported.
        let _ = previous.task.await;
    }
    Ok(info_hash)
}
/// Removes the torrent registered under `info_hash`, if any, and shuts it down.
///
/// The handle is taken out of the registry first (releasing the write lock), then
/// cancelled, and finally its task is awaited; the task's outcome is ignored.
/// Always returns `Ok(())`, including when no torrent was registered under the hash.
pub async fn remove_torrent(&self, info_hash: &InfoHash) -> Result<(), Error> {
    let removed = {
        let mut registry = self.torrents.write().await;
        registry.remove(info_hash)
    };
    let Some(mut handle) = removed else {
        return Ok(());
    };
    handle.cancel().await;
    let _ = handle.task.await;
    Ok(())
}
pub async fn torrent_status(&self, info_hash: &InfoHash) -> Result<TorrentStatus, Error> {
let torrents = self.torrents.read().await;
let handle = torrents
.get(info_hash)
.ok_or(Error::new(ErrorKind::InvalidInput))?;
Ok(handle.status().await)
}
/// Returns the info hashes of all torrents currently registered in the session.
///
/// The order follows the registry map's iteration order.
pub async fn active_torrents(&self) -> Vec<InfoHash> {
    let registry = self.torrents.read().await;
    let mut hashes = Vec::with_capacity(registry.len());
    hashes.extend(registry.keys().copied());
    hashes
}
async fn check_capacity(&self) -> Result<(), Error> {
let limit = self.config.max_active_torrents;
if limit == 0 {
return Ok(());
}
let count = self.torrents.read().await.len();
if count >= limit {
return Err(Error::new(ErrorKind::InvalidInput));
}
Ok(())
}
}