mod builder;
mod config;
mod download;
mod peer_manager;
mod torrent;
mod uni_deque;
mod upload;
pub use self::builder::TorrentBuilder;
pub use self::config::{InfoHash, SessionConfig, TorrentState, TorrentStatus};
use std::collections::HashMap;
use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
use std::str::FromStr as _;
use std::sync::{Arc, RwLock};
use std::time::Duration;
use crate::dht::{DhtNode, generate_node_id};
use crate::error::{Error, ErrorKind};
use crate::magnet::{MagnetUri, hex_encode};
use crate::metainfo::{Info, Metainfo, Mode, RawInfo};
use crate::spec::TorrentSpec;
use self::torrent::TorrentHandle;
/// A torrent session: one shared configuration plus the registry of torrents added to it.
pub struct Session {
/// Configuration passed to `Session::new`; it is handed to every `TorrentHandle::register` call.
config: SessionConfig,
/// Registered torrents keyed by info hash. The same map is handed to the DHT polling task
/// when one is spawned.
torrents: Arc<RwLock<HashMap<InfoHash, TorrentHandle>>>,
/// DHT node started by `Session::new`; `None` when no bootstrap nodes were configured or the
/// node could not be started.
// The field is never read in this file, hence the lint expectation.
// NOTE(review): presumably retained so the session keeps its own handle on the node — confirm.
#[expect(dead_code)]
dht_node: Option<Arc<DhtNode>>,
}
impl Session {
    /// Creates a session from `config`.
    ///
    /// When at least one IPv4 or IPv6 bootstrap node is configured, a DHT node is started on the
    /// unspecified address of each family, using `listen_port + 1` for IPv4 and
    /// `listen_port + 2` for IPv6, and a background task is spawned that polls it every
    /// `dht_poll_interval`.
    ///
    /// DHT is best-effort: if the derived ports do not fit in a `u16`, or the node fails to
    /// start, a warning is logged and the session runs without DHT.
    ///
    /// # Errors
    ///
    /// No error path exists at present; the `Result` is part of the public signature.
    pub async fn new(config: SessionConfig) -> Result<Self, Error> {
        let torrents: Arc<RwLock<HashMap<InfoHash, TorrentHandle>>> =
            Arc::new(RwLock::new(HashMap::new()));
        let node_id = config.node_id.unwrap_or_else(generate_node_id);
        // Scoped block: the bootstrap lists borrow from `config`, which is moved into the
        // returned `Session` afterwards.
        let dht_node = {
            let bootstrap_nodes = config.bootstrap_nodes.as_deref().unwrap_or(&[]);
            let v4 = bootstrap_nodes
                .iter()
                .map(|n| (n.host.as_str(), n.port))
                .collect::<Vec<_>>();
            let bootstrap_nodes_v6 = config.bootstrap_nodes_v6.as_deref().unwrap_or(&[]);
            let v6 = bootstrap_nodes_v6
                .iter()
                .map(|n| (n.host.as_str(), n.port))
                .collect::<Vec<_>>();
            // Checked arithmetic: a plain `+` would panic in debug builds and silently wrap in
            // release builds when the listen port is at the top of the port range.
            let listen_port = config.listen_port;
            let dht_ports = listen_port.checked_add(1).zip(listen_port.checked_add(2));
            if v4.is_empty() && v6.is_empty() {
                None
            } else if let Some((port_v4, port_v6)) = dht_ports {
                let bind_v4 = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, port_v4));
                let bind_v6 =
                    SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::UNSPECIFIED, port_v6, 0, 0));
                match DhtNode::new(node_id, bind_v4, bind_v6, &v4, &v6).await {
                    Ok(node) => {
                        spawn_dht_poll(
                            Arc::clone(&node),
                            Arc::clone(&torrents),
                            config.dht_poll_interval,
                        );
                        Some(node)
                    }
                    Err(e) => {
                        tracing::warn!("DHT init failed, DHT disabled: {e}");
                        None
                    }
                }
            } else {
                tracing::warn!(
                    "DHT ports derived from listen port {listen_port} exceed 65535, DHT disabled"
                );
                None
            }
        };
        Ok(Session {
            config,
            torrents,
            dht_node,
        })
    }

    /// Registers a torrent described by `spec` and returns a builder for it.
    ///
    /// For a magnet spec, the URI's `peers` entries are parsed as socket addresses and passed
    /// to the builder; entries that do not parse are skipped silently. The builder is told
    /// whether the metadata is already resolved, which is the case only for a metainfo spec.
    ///
    /// NOTE(review): inserting under an info hash that is already registered replaces the
    /// previous handle without calling `cancel()` on it — confirm this is intended.
    ///
    /// # Errors
    ///
    /// No error path exists at present; the `Result` is part of the public signature.
    ///
    /// # Panics
    ///
    /// Panics if the torrent registry lock is poisoned.
    pub fn add_torrent(&self, spec: impl Into<TorrentSpec>) -> Result<TorrentBuilder<'_>, Error> {
        let spec = spec.into();
        let magnet_peers: Vec<SocketAddr> = if let TorrentSpec::Magnet(ref uri) = spec {
            uri.peers
                .iter()
                .filter_map(|p| SocketAddr::from_str(p).ok())
                .collect()
        } else {
            Vec::new()
        };
        // Inspect the variant before `resolve_spec` consumes the spec, so the whole spec does
        // not have to be cloned just to look at it afterwards.
        let metadata_resolved = matches!(spec, TorrentSpec::Metainfo(_));
        let (meta, info_hash) = resolve_spec(spec);
        let handle = TorrentHandle::register(meta, info_hash, &self.config);
        self.torrents.write().unwrap().insert(info_hash, handle);
        Ok(TorrentBuilder::new(
            self,
            info_hash,
            metadata_resolved,
            magnet_peers,
        ))
    }

    /// Parses `data` as a metainfo file and registers the resulting torrent.
    ///
    /// # Errors
    ///
    /// Propagates the error from `Metainfo::try_from` when `data` cannot be parsed.
    pub fn add_torrent_bytes(&self, data: &[u8]) -> Result<TorrentBuilder<'_>, Error> {
        self.add_torrent(Metainfo::try_from(data)?)
    }

    /// Parses `uri` as a magnet URI and registers the resulting torrent.
    ///
    /// # Errors
    ///
    /// Propagates the parse error when `uri` is not a valid magnet URI.
    pub fn add_magnet_str(&self, uri: impl AsRef<str>) -> Result<TorrentBuilder<'_>, Error> {
        let magnet: MagnetUri = uri.as_ref().parse()?;
        self.add_torrent(magnet)
    }

    /// Returns the session configuration.
    pub(crate) fn config(&self) -> &SessionConfig {
        &self.config
    }

    /// Returns the shared registry of torrents keyed by info hash.
    pub(crate) fn torrents(&self) -> &Arc<RwLock<HashMap<InfoHash, TorrentHandle>>> {
        &self.torrents
    }

    /// Removes the torrent registered under `info_hash` and awaits its cancellation.
    ///
    /// Returns `Ok(())` even when no such torrent is registered.
    ///
    /// # Panics
    ///
    /// Panics if the torrent registry lock is poisoned.
    pub async fn remove_torrent(&self, info_hash: &InfoHash) -> Result<(), Error> {
        // The write guard is a temporary of this statement, so it is released before the
        // `.await` below.
        let removed = self.torrents.write().unwrap().remove(info_hash);
        if let Some(mut handle) = removed {
            handle.cancel().await;
        }
        Ok(())
    }

    /// Returns a snapshot of the status of the torrent registered under `info_hash`.
    ///
    /// # Errors
    ///
    /// Returns an `ErrorKind::InvalidInput` error when no torrent is registered under
    /// `info_hash`.
    ///
    /// # Panics
    ///
    /// Panics if the torrent registry lock is poisoned.
    pub async fn torrent_status(&self, info_hash: &InfoHash) -> Result<TorrentStatus, Error> {
        // Clone the shared status cell out of the registry so the registry guard is gone
        // before awaiting the status lock.
        let status = self
            .torrents
            .read()
            .unwrap()
            .get(info_hash)
            .map(|handle| handle.status.clone())
            .ok_or_else(|| Error::new(ErrorKind::InvalidInput))?;
        Ok(status.read().await.clone())
    }

    /// Returns the info hashes of all torrents currently registered, in no particular order.
    ///
    /// # Panics
    ///
    /// Panics if the torrent registry lock is poisoned.
    pub fn active_torrents(&self) -> Vec<InfoHash> {
        self.torrents.read().unwrap().keys().copied().collect()
    }
}
/// Converts a torrent spec into a `Metainfo` together with its 20-byte info hash.
///
/// A metainfo spec is returned unchanged alongside the hash it reports. A magnet spec yields a
/// placeholder `Metainfo` built from the URI alone:
///
/// * the name is the display name, or the hex-encoded info hash when none is given;
/// * the first tracker becomes `announce` and any remaining trackers form a single
///   `announce_list` tier;
/// * the length is the URI's exact length, or `0` when absent;
/// * piece data is empty and `raw_info` carries only the hash.
fn resolve_spec(spec: TorrentSpec) -> (Metainfo, [u8; 20]) {
    // Complete metadata needs no synthesis; hand it straight back.
    let uri = match spec {
        TorrentSpec::Metainfo(meta) => {
            let info_hash = meta.info_hash();
            return (meta, info_hash);
        }
        TorrentSpec::Magnet(uri) => uri,
    };

    let info_hash = *uri.primary_info_hash();
    let name = match &uri.display_name {
        Some(display_name) => display_name.clone(),
        None => hex_encode(info_hash),
    };
    let (announce, announce_list) = match uri.trackers.split_first() {
        Some((primary, rest)) if !rest.is_empty() => (primary.clone(), vec![rest.to_vec()]),
        Some((primary, _)) => (primary.clone(), Vec::new()),
        None => (Default::default(), Vec::new()),
    };
    let length = uri.exact_length.unwrap_or(0);

    let placeholder = Metainfo {
        announce,
        announce_list,
        info: Info {
            piece_length: 0,
            pieces: Vec::new(),
            mode: Mode::Single { name, length },
            raw_info: RawInfo::Hash(info_hash),
        },
        creation_date: None,
        comment: None,
        created_by: None,
        encoding: None,
    };
    (placeholder, info_hash)
}
fn spawn_dht_poll(
dht: Arc<DhtNode>, torrents: Arc<RwLock<HashMap<InfoHash, TorrentHandle>>>,
poll_interval: Duration,
) {
tokio::spawn(async move {
loop {
tokio::time::sleep(poll_interval).await;
let info_hashes: Vec<InfoHash> = torrents.read().unwrap().keys().copied().collect();
for ih in info_hashes {
let peers = dht.get_peers(&ih).await;
if !peers.is_empty() {
let peer_mgr = torrents
.read()
.unwrap()
.get(&ih)
.map(|h| h.peer_mgr.clone());
if let Some(peer_mgr) = peer_mgr {
peer_mgr.write().await.add_peers(peers);
}
}
}
}
});
}