use std::collections::HashMap;
use std::net::{IpAddr, SocketAddr};
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};
use dashmap::DashMap;
use tokio::sync::{broadcast, mpsc, oneshot};
use tracing::{debug, info, warn};
use irontide_core::{DEFAULT_CHUNK_SIZE, Id20, Lengths, Magnet, TorrentMetaV1};
use irontide_dht::DhtHandle;
use irontide_storage::TorrentStorage;
use crate::alert::{Alert, AlertCategory, AlertKind, AlertStream, post_alert};
use crate::settings::Settings;
use crate::torrent::TorrentHandle;
use crate::types::{
FileInfo, SessionStats, TorrentConfig, TorrentInfo, TorrentState, TorrentStats, TorrentSummary,
};
/// A rate-limiter `TokenBucket` guarded by a `parking_lot::Mutex` and shared via `Arc`.
type SharedBucket = Arc<parking_lot::Mutex<crate::rate_limiter::TokenBucket>>;
/// Signature of a queue-reordering function: it receives the mutable queue entries and
/// an info hash, and returns a list of `(Id20, i32, i32)` tuples.
/// NOTE(review): the tuple presumably means (info hash, old position, new position) — confirm.
type QueueMoveFn = fn(&mut [crate::queue::QueueEntry], Id20) -> Vec<(Id20, i32, i32)>;
/// The ban manager guarded by a `parking_lot::RwLock` and shared via `Arc`.
pub(crate) type SharedBanManager = Arc<parking_lot::RwLock<crate::ban::BanManager>>;
/// The IP filter guarded by a `parking_lot::RwLock` and shared via `Arc`.
pub(crate) type SharedIpFilter = Arc<parking_lot::RwLock<crate::ip_filter::IpFilter>>;
/// Counters returned by `SessionHandle::load_resume_state`.
///
/// NOTE(review): the exact criteria for each bucket are decided by the session actor,
/// which is not visible here — the field descriptions below are inferred from the names.
#[derive(Debug, Clone)]
pub struct ResumeLoadResult {
/// Presumably the number of torrents successfully restored — confirm.
pub restored: usize,
/// Presumably the number of entries intentionally skipped — confirm.
pub skipped: usize,
/// Presumably the number of entries that could not be restored — confirm.
pub failed: usize,
}
/// Per-torrent bookkeeping record pairing a `TorrentHandle` with session-level state.
struct TorrentEntry {
// Handle used to talk to the torrent itself.
handle: TorrentHandle,
// v1 metadata, if available; `None` otherwise (presumably e.g. a magnet link whose
// metadata has not been fetched yet — confirm).
meta: Option<TorrentMetaV1>,
// Position in the session queue.
// NOTE(review): negative values presumably mean "not queued" — confirm.
queue_position: i32,
// Whether the torrent is auto-managed.
auto_managed: bool,
// Tokio instant at which the torrent was started, if it has been.
started_at: Option<tokio::time::Instant>,
// Previously observed byte totals.
// NOTE(review): presumably used to compute deltas between stats samples — confirm.
prev_downloaded: u64,
prev_uploaded: u64,
}
impl TorrentEntry {
    /// Reports whether the stored v1 metadata has its `private` field set to `Some(1)`.
    ///
    /// Returns `false` when no metadata is stored.
    fn is_private(&self) -> bool {
        match &self.meta {
            Some(meta) => meta.info.private == Some(1),
            None => false,
        }
    }
}
/// Messages sent from `SessionHandle` to the session actor over the command channel.
///
/// Almost every variant carries a `reply` oneshot sender through which the caller
/// receives the result; `PostSessionStats` and `Shutdown` carry no reply channel.
/// Variants with an `info_hash` field target a single torrent.
enum SessionCommand {
// --- Adding / removing / pausing torrents ---
AddTorrent {
// Boxed so the metadata is stored behind a pointer inside the enum.
meta: Box<irontide_core::TorrentMeta>,
storage: Option<Arc<dyn TorrentStorage>>,
download_dir: Option<PathBuf>,
reply: oneshot::Sender<crate::Result<Id20>>,
},
AddMagnet {
magnet: Magnet,
download_dir: Option<PathBuf>,
reply: oneshot::Sender<crate::Result<Id20>>,
},
RemoveTorrent {
info_hash: Id20,
reply: oneshot::Sender<crate::Result<()>>,
},
PauseTorrent {
info_hash: Id20,
reply: oneshot::Sender<crate::Result<()>>,
},
ResumeTorrent {
info_hash: Id20,
reply: oneshot::Sender<crate::Result<()>>,
},
// --- Stats and listing ---
TorrentStats {
info_hash: Id20,
reply: oneshot::Sender<crate::Result<TorrentStats>>,
},
TorrentInfo {
info_hash: Id20,
reply: oneshot::Sender<crate::Result<TorrentInfo>>,
},
ListTorrents {
reply: oneshot::Sender<Vec<Id20>>,
},
SessionStats {
reply: oneshot::Sender<SessionStats>,
},
// --- Resume data / session persistence ---
SaveTorrentResumeData {
info_hash: Id20,
reply: oneshot::Sender<crate::Result<irontide_core::FastResumeData>>,
},
SaveSessionState {
reply: oneshot::Sender<crate::Result<crate::persistence::SessionState>>,
},
LoadResumeState {
reply: oneshot::Sender<crate::Result<ResumeLoadResult>>,
},
// --- Queue position management ---
QueuePosition {
info_hash: Id20,
reply: oneshot::Sender<crate::Result<i32>>,
},
SetQueuePosition {
info_hash: Id20,
pos: i32,
reply: oneshot::Sender<crate::Result<()>>,
},
QueuePositionUp {
info_hash: Id20,
reply: oneshot::Sender<crate::Result<()>>,
},
QueuePositionDown {
info_hash: Id20,
reply: oneshot::Sender<crate::Result<()>>,
},
QueuePositionTop {
info_hash: Id20,
reply: oneshot::Sender<crate::Result<()>>,
},
QueuePositionBottom {
info_hash: Id20,
reply: oneshot::Sender<crate::Result<()>>,
},
// --- Peer bans and IP filtering ---
BanPeer {
ip: IpAddr,
reply: oneshot::Sender<()>,
},
UnbanPeer {
ip: IpAddr,
// NOTE(review): the bool presumably reports whether the IP was previously banned — confirm.
reply: oneshot::Sender<bool>,
},
BannedPeers {
reply: oneshot::Sender<Vec<IpAddr>>,
},
SetIpFilter {
filter: crate::ip_filter::IpFilter,
reply: oneshot::Sender<()>,
},
GetIpFilter {
reply: oneshot::Sender<crate::ip_filter::IpFilter>,
},
// --- Session settings ---
GetSettings {
reply: oneshot::Sender<Settings>,
},
ApplySettings {
// Boxed so the settings are stored behind a pointer inside the enum.
settings: Box<Settings>,
reply: oneshot::Sender<crate::Result<()>>,
},
// --- Per-torrent operations ---
MoveTorrentStorage {
info_hash: Id20,
new_path: std::path::PathBuf,
reply: oneshot::Sender<crate::Result<()>>,
},
AddPeers {
info_hash: Id20,
peers: Vec<SocketAddr>,
source: crate::peer_state::PeerSource,
reply: oneshot::Sender<crate::Result<()>>,
},
OpenFile {
info_hash: Id20,
file_index: usize,
reply: oneshot::Sender<crate::Result<crate::streaming::FileStream>>,
},
ForceReannounce {
info_hash: Id20,
reply: oneshot::Sender<crate::Result<()>>,
},
TrackerList {
info_hash: Id20,
reply: oneshot::Sender<crate::Result<Vec<crate::tracker_manager::TrackerInfo>>>,
},
Scrape {
info_hash: Id20,
// NOTE(review): the String is presumably the tracker URL the scrape came from — confirm.
reply: oneshot::Sender<crate::Result<Option<(String, irontide_tracker::ScrapeInfo)>>>,
},
SetFilePriority {
info_hash: Id20,
index: usize,
priority: irontide_core::FilePriority,
reply: oneshot::Sender<crate::Result<()>>,
},
FilePriorities {
info_hash: Id20,
reply: oneshot::Sender<crate::Result<Vec<irontide_core::FilePriority>>>,
},
SetDownloadLimit {
info_hash: Id20,
bytes_per_sec: u64,
reply: oneshot::Sender<crate::Result<()>>,
},
SetUploadLimit {
info_hash: Id20,
bytes_per_sec: u64,
reply: oneshot::Sender<crate::Result<()>>,
},
DownloadLimit {
info_hash: Id20,
reply: oneshot::Sender<crate::Result<u64>>,
},
UploadLimit {
info_hash: Id20,
reply: oneshot::Sender<crate::Result<u64>>,
},
SetSequentialDownload {
info_hash: Id20,
enabled: bool,
reply: oneshot::Sender<crate::Result<()>>,
},
IsSequentialDownload {
info_hash: Id20,
reply: oneshot::Sender<crate::Result<bool>>,
},
SetSuperSeeding {
info_hash: Id20,
enabled: bool,
reply: oneshot::Sender<crate::Result<()>>,
},
IsSuperSeeding {
info_hash: Id20,
reply: oneshot::Sender<crate::Result<bool>>,
},
SetSeedMode {
info_hash: Id20,
enabled: bool,
reply: oneshot::Sender<crate::Result<()>>,
},
AddTracker {
info_hash: Id20,
url: String,
reply: oneshot::Sender<crate::Result<()>>,
},
ReplaceTrackers {
info_hash: Id20,
urls: Vec<String>,
reply: oneshot::Sender<crate::Result<()>>,
},
ForceRecheck {
info_hash: Id20,
reply: oneshot::Sender<crate::Result<()>>,
},
RenameFile {
info_hash: Id20,
file_index: usize,
new_name: String,
reply: oneshot::Sender<crate::Result<()>>,
},
SetMaxConnections {
info_hash: Id20,
limit: usize,
reply: oneshot::Sender<crate::Result<()>>,
},
MaxConnections {
info_hash: Id20,
reply: oneshot::Sender<crate::Result<usize>>,
},
SetMaxUploads {
info_hash: Id20,
limit: usize,
reply: oneshot::Sender<crate::Result<()>>,
},
MaxUploads {
info_hash: Id20,
reply: oneshot::Sender<crate::Result<usize>>,
},
// --- Per-torrent introspection ---
GetPeerInfo {
info_hash: Id20,
reply: oneshot::Sender<crate::Result<Vec<crate::types::PeerInfo>>>,
},
GetDownloadQueue {
info_hash: Id20,
reply: oneshot::Sender<crate::Result<Vec<crate::types::PartialPieceInfo>>>,
},
HavePiece {
info_hash: Id20,
index: u32,
reply: oneshot::Sender<crate::Result<bool>>,
},
PieceAvailability {
info_hash: Id20,
reply: oneshot::Sender<crate::Result<Vec<u32>>>,
},
FileProgress {
info_hash: Id20,
reply: oneshot::Sender<crate::Result<Vec<u64>>>,
},
InfoHashesQuery {
info_hash: Id20,
reply: oneshot::Sender<crate::Result<irontide_core::InfoHashes>>,
},
TorrentFile {
info_hash: Id20,
reply: oneshot::Sender<crate::Result<Option<irontide_core::TorrentMetaV1>>>,
},
TorrentFileV2 {
info_hash: Id20,
reply: oneshot::Sender<crate::Result<Option<irontide_core::TorrentMetaV2>>>,
},
ForceDhtAnnounce {
info_hash: Id20,
reply: oneshot::Sender<crate::Result<()>>,
},
ForceLsdAnnounce {
info_hash: Id20,
reply: oneshot::Sender<crate::Result<()>>,
},
ReadPiece {
info_hash: Id20,
index: u32,
reply: oneshot::Sender<crate::Result<bytes::Bytes>>,
},
FlushCache {
info_hash: Id20,
reply: oneshot::Sender<crate::Result<()>>,
},
IsValid {
info_hash: Id20,
reply: oneshot::Sender<bool>,
},
ClearError {
info_hash: Id20,
reply: oneshot::Sender<crate::Result<()>>,
},
FileStatus {
info_hash: Id20,
reply: oneshot::Sender<crate::Result<Vec<crate::types::FileStatus>>>,
},
Flags {
info_hash: Id20,
reply: oneshot::Sender<crate::Result<crate::types::TorrentFlags>>,
},
SetFlags {
info_hash: Id20,
flags: crate::types::TorrentFlags,
reply: oneshot::Sender<crate::Result<()>>,
},
UnsetFlags {
info_hash: Id20,
flags: crate::types::TorrentFlags,
reply: oneshot::Sender<crate::Result<()>>,
},
ConnectPeer {
info_hash: Id20,
addr: SocketAddr,
reply: oneshot::Sender<crate::Result<()>>,
},
// --- DHT item storage (immutable and mutable items) ---
DhtPutImmutable {
value: Vec<u8>,
// Replies with an `Id20` (presumably the target hash of the stored item — confirm).
reply: oneshot::Sender<crate::Result<Id20>>,
},
DhtGetImmutable {
target: Id20,
reply: oneshot::Sender<crate::Result<Option<Vec<u8>>>>,
},
DhtPutMutable {
keypair_bytes: [u8; 32],
value: Vec<u8>,
seq: i64,
salt: Vec<u8>,
reply: oneshot::Sender<crate::Result<Id20>>,
},
// The nested reply type below trips clippy's type-complexity lint, hence the allow.
#[allow(clippy::type_complexity)]
DhtGetMutable {
public_key: [u8; 32],
salt: Vec<u8>,
// NOTE(review): the tuple is presumably (value, sequence number) — confirm.
reply: oneshot::Sender<crate::Result<Option<(Vec<u8>, i64)>>>,
},
SaveResumeState {
reply: oneshot::Sender<crate::Result<usize>>,
},
// Fire-and-forget commands: no reply channel.
PostSessionStats,
Shutdown,
}
/// Cloneable handle used to send commands to the session actor and to observe alerts.
///
/// Created by the `start*` constructors; every clone shares the same command channel,
/// alert broadcast channel, alert mask and counters.
#[derive(Clone)]
pub struct SessionHandle {
/// Sending half of the bounded command channel consumed by the session actor.
cmd_tx: mpsc::Sender<SessionCommand>,
/// Broadcast sender used to create new alert subscriptions.
alert_tx: broadcast::Sender<Alert>,
/// Alert category bitmask, shared with the session actor.
alert_mask: Arc<AtomicU32>,
/// Session counters, shared with the session actor.
counters: Arc<crate::stats::SessionCounters>,
/// Network factory passed to `start_full`; not read through the handle in the code
/// visible here, hence the `dead_code` allow.
#[allow(dead_code)]
factory: Arc<crate::transport::NetworkFactory>,
}
impl SessionHandle {
/// Starts a session with the given settings and an empty plugin list.
///
/// Delegates to `start_with_plugins`.
pub async fn start(settings: Settings) -> crate::Result<Self> {
    let no_plugins = Arc::new(Vec::new());
    Self::start_with_plugins(settings, no_plugins).await
}
/// Starts a session with a caller-supplied disk I/O backend and an empty plugin list.
///
/// Delegates to `start_with_plugins_and_backend`.
pub async fn start_with_backend(
    settings: Settings,
    backend: Arc<dyn crate::disk_backend::DiskIoBackend>,
) -> crate::Result<Self> {
    let no_plugins = Arc::new(Vec::new());
    Self::start_with_plugins_and_backend(settings, no_plugins, backend).await
}
/// Starts a session with the given extension plugins.
///
/// The disk backend is created from the `DiskConfig` derived from `settings`, then
/// everything is handed to `start_with_plugins_and_backend`.
pub async fn start_with_plugins(
    settings: Settings,
    plugins: Arc<Vec<Box<dyn crate::extension::ExtensionPlugin>>>,
) -> crate::Result<Self> {
    let backend = {
        let disk_config = crate::disk::DiskConfig::from(&settings);
        crate::disk_backend::create_backend_from_config(&disk_config)
    };
    Self::start_with_plugins_and_backend(settings, plugins, backend).await
}
/// Starts a session with the given plugins and disk backend, using the network
/// factory returned by `NetworkFactory::tokio()`.
///
/// Delegates to `start_full`.
pub async fn start_with_plugins_and_backend(
    settings: Settings,
    plugins: Arc<Vec<Box<dyn crate::extension::ExtensionPlugin>>>,
    backend: Arc<dyn crate::disk_backend::DiskIoBackend>,
) -> crate::Result<Self> {
    let factory = Arc::new(crate::transport::NetworkFactory::tokio());
    Self::start_full(settings, plugins, backend, factory).await
}
/// Starts a session with a caller-supplied network factory and an empty plugin list.
///
/// The disk backend is created from the `DiskConfig` derived from `settings`, then
/// everything is handed to `start_full`.
pub async fn start_with_transport(
    settings: Settings,
    factory: Arc<crate::transport::NetworkFactory>,
) -> crate::Result<Self> {
    let backend = {
        let disk_config = crate::disk::DiskConfig::from(&settings);
        crate::disk_backend::create_backend_from_config(&disk_config)
    };
    let no_plugins = Arc::new(Vec::new());
    Self::start_full(settings, no_plugins, backend, factory).await
}
/// Fully parameterised constructor: brings up every subsystem, spawns the listener
/// task and the session actor, and returns a handle to the actor.
///
/// # Errors
/// Returns `Error::Config` when `force_proxy` is set but no proxy type is configured.
/// Failures to start LSD, uTP, I2P, SSL, the TCP/SSL listeners or DHT are only logged;
/// the corresponding subsystem is then left as `None`.
pub async fn start_full(
settings: Settings,
plugins: Arc<Vec<Box<dyn crate::extension::ExtensionPlugin>>>,
backend: Arc<dyn crate::disk_backend::DiskIoBackend>,
factory: Arc<crate::transport::NetworkFactory>,
) -> crate::Result<Self> {
let mut settings = settings;
// force_proxy: require a configured proxy, then switch off UPnP, NAT-PMP, DHT and LSD.
if settings.force_proxy {
if settings.proxy.proxy_type == crate::proxy::ProxyType::None {
return Err(crate::Error::Config(
"force_proxy requires a proxy to be configured".into(),
));
}
settings.enable_upnp = false;
settings.enable_natpmp = false;
settings.enable_dht = false;
settings.enable_lsd = false;
}
// anonymous_mode switches off the same four discovery / port-mapping features.
if settings.anonymous_mode {
settings.enable_dht = false;
settings.enable_lsd = false;
settings.enable_upnp = false;
settings.enable_natpmp = false;
}
// Bounded command channel (256 slots) between handles and the actor.
let (cmd_tx, cmd_rx) = mpsc::channel(256);
// The initial broadcast receiver is dropped; subscribers are created later from `alert_tx`.
let (alert_tx, _) = broadcast::channel(settings.alert_channel_size);
let alert_mask = Arc::new(AtomicU32::new(settings.alert_mask.bits()));
// Local service discovery (optional, non-fatal on failure).
let (lsd, lsd_peers_rx) = if settings.enable_lsd {
match crate::lsd::LsdHandle::start(settings.listen_port).await {
Ok((handle, rx)) => (Some(handle), Some(rx)),
Err(e) => {
warn!("LSD unavailable (port 6771): {e}");
(None, None)
}
}
} else {
(None, None)
};
// Session-wide upload / download token buckets.
let global_upload_bucket = Arc::new(parking_lot::Mutex::new(
crate::rate_limiter::TokenBucket::new(settings.upload_rate_limit),
));
let global_download_bucket = Arc::new(parking_lot::Mutex::new(
crate::rate_limiter::TokenBucket::new(settings.download_rate_limit),
));
// uTP socket on the listen port (optional, non-fatal on failure).
let (utp_socket, utp_listener) = if settings.enable_utp {
match irontide_utp::UtpSocket::bind(settings.to_utp_config(settings.listen_port)).await
{
Ok((socket, listener)) => (Some(socket), Some(listener)),
Err(e) => {
warn!("uTP bind failed: {e}");
(None, None)
}
}
} else {
(None, None)
};
// IPv6 uTP socket; a bind failure here is logged at debug level only.
let (utp_socket_v6, utp_listener_v6) = if settings.enable_utp && settings.enable_ipv6 {
match irontide_utp::UtpSocket::bind(settings.to_utp_config_v6(settings.listen_port))
.await
{
Ok((socket, listener)) => (Some(socket), Some(listener)),
Err(e) => {
debug!("uTP IPv6 bind failed (non-fatal): {e}");
(None, None)
}
}
} else {
(None, None)
};
// NAT port mapping: started when either UPnP or NAT-PMP is enabled.
let (nat, nat_events_rx) = if settings.enable_upnp || settings.enable_natpmp {
let nat_config = settings.to_nat_config();
let (handle, events_rx) = irontide_nat::NatHandle::start(nat_config);
// A UDP port is only passed to `map_ports` when uTP is enabled.
let udp_port = if settings.enable_utp {
Some(settings.listen_port)
} else {
None
};
handle.map_ports(settings.listen_port, udp_port).await;
(Some(handle), Some(events_rx))
} else {
(None, None)
};
// I2P SAM session (optional); success and failure are both reported as alerts.
let sam_session = if settings.enable_i2p {
let tunnel_config = settings.to_sam_tunnel_config();
match crate::i2p::SamSession::create(
&settings.i2p_hostname,
settings.i2p_port,
"torrent",
tunnel_config,
)
.await
{
Ok(session) => {
let b32 = session.destination().to_b32_address();
info!("I2P SAM session created: {}", b32);
post_alert(
&alert_tx,
&alert_mask,
AlertKind::I2pSessionCreated { b32_address: b32 },
);
Some(Arc::new(session))
}
Err(e) => {
warn!("I2P SAM session failed: {e}");
post_alert(
&alert_tx,
&alert_mask,
AlertKind::I2pError {
message: format!("SAM session creation failed: {e}"),
},
);
None
}
}
} else {
None
};
// SSL manager: created when an SSL port or a certificate path is configured.
let ssl_manager = if settings.ssl_listen_port != 0 || settings.ssl_cert_path.is_some() {
match crate::ssl_manager::SslManager::new(&settings) {
Ok(mgr) => {
info!("SSL manager initialized");
Some(Arc::new(mgr))
}
Err(e) => {
warn!(error = %e, "SSL manager initialization failed");
None
}
}
} else {
None
};
// Plain TCP listener on 0.0.0.0:listen_port; the session continues without it on failure.
let tcp_listener: Option<Box<dyn crate::transport::TransportListener>> = match factory
.bind_tcp(SocketAddr::from(([0, 0, 0, 0], settings.listen_port)))
.await
{
Ok(l) => {
info!(port = settings.listen_port, "TCP listener started");
Some(l)
}
Err(e) => {
warn!(port = settings.listen_port, error = %e, "TCP listener bind failed");
None
}
};
// SSL listener on 0.0.0.0:ssl_listen_port, only when that port is non-zero.
let ssl_listener: Option<Box<dyn crate::transport::TransportListener>> = if settings
.ssl_listen_port
!= 0
{
match factory
.bind_tcp(SocketAddr::from(([0, 0, 0, 0], settings.ssl_listen_port)))
.await
{
Ok(l) => {
info!(port = settings.ssl_listen_port, "SSL listener started");
Some(l)
}
Err(e) => {
warn!(port = settings.ssl_listen_port, error = %e, "SSL listener bind failed");
None
}
}
} else {
None
};
// DHT over IPv4 (optional, non-fatal on failure).
let (dht_v4, dht_v4_ip_rx) = if settings.enable_dht {
match DhtHandle::start(settings.to_dht_config()).await {
Ok((handle, ip_rx)) => {
info!("DHT v4 started");
(Some(handle), Some(ip_rx))
}
Err(e) => {
warn!("DHT v4 start failed: {e}");
(None, None)
}
}
} else {
(None, None)
};
// DHT over IPv6; a failure here is logged at debug level only.
let (dht_v6, dht_v6_ip_rx) = if settings.enable_dht && settings.enable_ipv6 {
match DhtHandle::start(settings.to_dht_config_v6()).await {
Ok((handle, ip_rx)) => {
info!("DHT v6 started");
(Some(handle), Some(ip_rx))
}
Err(e) => {
debug!("DHT v6 start failed (non-fatal): {e}");
(None, None)
}
}
} else {
(None, None)
};
// Shared ban manager and (initially empty) IP filter.
let ban_config = crate::ban::BanConfig::from(&settings);
let ban_manager: SharedBanManager = Arc::new(parking_lot::RwLock::new(
crate::ban::BanManager::new(ban_config),
));
let ip_filter: SharedIpFilter =
Arc::new(parking_lot::RwLock::new(crate::ip_filter::IpFilter::new()));
// Disk manager built on the caller-supplied backend and a blocking-task spawner.
let disk_config = crate::disk::DiskConfig::from(&settings);
let spawner = crate::blocking_spawner::BlockingSpawner::new(settings.max_blocking_threads);
let (disk_manager, disk_actor_handle) =
crate::disk::DiskManagerHandle::new_with_backend(disk_config, backend, spawner);
let counters = Arc::new(crate::stats::SessionCounters::new());
let hash_pool = std::sync::Arc::new(crate::hash_pool::HashPool::new(
settings.hashing_threads,
64,
));
// The listener task shares the info-hash registry with the actor and hands
// validated connections to it over a 64-slot channel.
let info_hash_registry = Arc::new(DashMap::new());
let (validated_tx, validated_conn_rx) = mpsc::channel(64);
let listener_task = crate::listener::ListenerTask::new(
tcp_listener,
utp_listener,
utp_listener_v6,
Arc::clone(&info_hash_registry),
validated_tx,
);
let _listener_task = tokio::spawn(listener_task.run());
// Read before `settings` is moved into the actor below.
let external_ip = settings.external_ip;
let actor = SessionActor {
settings,
torrents: HashMap::new(),
dht_v4,
dht_v6,
lsd,
lsd_peers_rx,
cmd_rx,
alert_tx: alert_tx.clone(),
alert_mask: Arc::clone(&alert_mask),
global_upload_bucket,
global_download_bucket,
utp_socket,
utp_socket_v6,
nat,
nat_events_rx,
ban_manager,
ip_filter,
disk_manager,
disk_actor_handle,
external_ip,
dht_v4_ip_rx,
dht_v6_ip_rx,
plugins,
sam_session,
ssl_manager,
ssl_listener,
validated_conn_rx,
info_hash_registry,
_listener_task,
counters: Arc::clone(&counters),
factory: Arc::clone(&factory),
hash_pool,
};
let join_handle = tokio::spawn(actor.run());
// Watchdog task: awaits the actor's join handle and logs how it ended
// (clean exit, panic with its payload message, or another join error).
tokio::spawn(async move {
match join_handle.await {
Ok(()) => {
tracing::warn!("session actor exited cleanly");
}
Err(e) if e.is_panic() => {
let panic_payload = e.into_panic();
// Panic payloads are checked for `&str` first, then `String`.
let msg = if let Some(s) = panic_payload.downcast_ref::<&str>() {
(*s).to_string()
} else if let Some(s) = panic_payload.downcast_ref::<String>() {
s.clone()
} else {
"unknown panic payload".to_string()
};
tracing::error!("session actor PANICKED: {msg}");
}
Err(e) => {
tracing::error!("session actor task error: {e}");
}
}
});
Ok(SessionHandle {
cmd_tx,
alert_tx,
alert_mask,
counters,
factory,
})
}
pub async fn add_torrent(
&self,
meta: irontide_core::TorrentMeta,
storage: Option<Arc<dyn TorrentStorage>>,
) -> crate::Result<Id20> {
self.add_torrent_with_dir(meta, storage, None).await
}
pub async fn add_torrent_with_dir(
&self,
meta: irontide_core::TorrentMeta,
storage: Option<Arc<dyn TorrentStorage>>,
download_dir: Option<PathBuf>,
) -> crate::Result<Id20> {
let (tx, rx) = oneshot::channel();
self.cmd_tx
.send(SessionCommand::AddTorrent {
meta: Box::new(meta),
storage,
download_dir,
reply: tx,
})
.await
.map_err(|_| crate::Error::Shutdown)?;
rx.await.map_err(|_| crate::Error::Shutdown)?
}
pub async fn add_magnet(&self, magnet: Magnet) -> crate::Result<Id20> {
self.add_magnet_with_dir(magnet, None).await
}
pub async fn add_magnet_with_dir(
&self,
magnet: Magnet,
download_dir: Option<PathBuf>,
) -> crate::Result<Id20> {
let (tx, rx) = oneshot::channel();
self.cmd_tx
.send(SessionCommand::AddMagnet {
magnet,
download_dir,
reply: tx,
})
.await
.map_err(|_| crate::Error::Shutdown)?;
rx.await.map_err(|_| crate::Error::Shutdown)?
}
pub async fn remove_torrent(&self, info_hash: Id20) -> crate::Result<()> {
let (tx, rx) = oneshot::channel();
self.cmd_tx
.send(SessionCommand::RemoveTorrent {
info_hash,
reply: tx,
})
.await
.map_err(|_| crate::Error::Shutdown)?;
rx.await.map_err(|_| crate::Error::Shutdown)?
}
pub async fn pause_torrent(&self, info_hash: Id20) -> crate::Result<()> {
let (tx, rx) = oneshot::channel();
self.cmd_tx
.send(SessionCommand::PauseTorrent {
info_hash,
reply: tx,
})
.await
.map_err(|_| crate::Error::Shutdown)?;
rx.await.map_err(|_| crate::Error::Shutdown)?
}
pub async fn resume_torrent(&self, info_hash: Id20) -> crate::Result<()> {
let (tx, rx) = oneshot::channel();
self.cmd_tx
.send(SessionCommand::ResumeTorrent {
info_hash,
reply: tx,
})
.await
.map_err(|_| crate::Error::Shutdown)?;
rx.await.map_err(|_| crate::Error::Shutdown)?
}
pub async fn torrent_stats(&self, info_hash: Id20) -> crate::Result<TorrentStats> {
let (tx, rx) = oneshot::channel();
self.cmd_tx
.send(SessionCommand::TorrentStats {
info_hash,
reply: tx,
})
.await
.map_err(|_| crate::Error::Shutdown)?;
rx.await.map_err(|_| crate::Error::Shutdown)?
}
pub async fn torrent_info(&self, info_hash: Id20) -> crate::Result<TorrentInfo> {
let (tx, rx) = oneshot::channel();
self.cmd_tx
.send(SessionCommand::TorrentInfo {
info_hash,
reply: tx,
})
.await
.map_err(|_| crate::Error::Shutdown)?;
rx.await.map_err(|_| crate::Error::Shutdown)?
}
pub async fn list_torrents(&self) -> crate::Result<Vec<Id20>> {
let (tx, rx) = oneshot::channel();
self.cmd_tx
.send(SessionCommand::ListTorrents { reply: tx })
.await
.map_err(|_| crate::Error::Shutdown)?;
rx.await.map_err(|_| crate::Error::Shutdown)
}
pub async fn session_stats(&self) -> crate::Result<SessionStats> {
let (tx, rx) = oneshot::channel();
self.cmd_tx
.send(SessionCommand::SessionStats { reply: tx })
.await
.map_err(|_| crate::Error::Shutdown)?;
rx.await.map_err(|_| crate::Error::Shutdown)
}
/// Creates a new receiver on the alert broadcast channel.
pub fn subscribe(&self) -> broadcast::Receiver<Alert> {
    broadcast::Sender::subscribe(&self.alert_tx)
}
/// Creates a new alert receiver and wraps it in an `AlertStream` built with `filter`.
pub fn subscribe_filtered(&self, filter: AlertCategory) -> AlertStream {
    let receiver = self.alert_tx.subscribe();
    AlertStream::new(receiver, filter)
}
pub async fn post_session_stats(&self) -> crate::Result<()> {
self.cmd_tx
.send(SessionCommand::PostSessionStats)
.await
.map_err(|_| crate::Error::Shutdown)
}
pub fn counters(&self) -> &Arc<crate::stats::SessionCounters> {
&self.counters
}
/// Stores the bits of `mask` into the shared alert mask (relaxed ordering).
pub fn set_alert_mask(&self, mask: AlertCategory) {
    let bits = mask.bits();
    self.alert_mask.store(bits, Ordering::Relaxed);
}
/// Loads the shared alert mask (relaxed ordering) and converts it to an
/// `AlertCategory` via `from_bits_truncate`.
pub fn alert_mask(&self) -> AlertCategory {
    let raw_bits = self.alert_mask.load(Ordering::Relaxed);
    AlertCategory::from_bits_truncate(raw_bits)
}
pub async fn add_peers(
&self,
info_hash: Id20,
peers: Vec<SocketAddr>,
source: crate::peer_state::PeerSource,
) -> crate::Result<()> {
let (tx, rx) = oneshot::channel();
self.cmd_tx
.send(SessionCommand::AddPeers {
info_hash,
peers,
source,
reply: tx,
})
.await
.map_err(|_| crate::Error::Shutdown)?;
rx.await.map_err(|_| crate::Error::Shutdown)?
}
pub async fn shutdown(&self) -> crate::Result<()> {
let _ = tokio::time::timeout(
std::time::Duration::from_secs(10),
self.cmd_tx.send(SessionCommand::Shutdown),
)
.await;
Ok(())
}
pub async fn save_torrent_resume_data(
&self,
info_hash: Id20,
) -> crate::Result<irontide_core::FastResumeData> {
let (tx, rx) = oneshot::channel();
self.cmd_tx
.send(SessionCommand::SaveTorrentResumeData {
info_hash,
reply: tx,
})
.await
.map_err(|_| crate::Error::Shutdown)?;
rx.await.map_err(|_| crate::Error::Shutdown)?
}
pub async fn save_session_state(&self) -> crate::Result<crate::persistence::SessionState> {
let (tx, rx) = oneshot::channel();
self.cmd_tx
.send(SessionCommand::SaveSessionState { reply: tx })
.await
.map_err(|_| crate::Error::Shutdown)?;
rx.await.map_err(|_| crate::Error::Shutdown)?
}
pub async fn load_resume_state(&self) -> crate::Result<ResumeLoadResult> {
let (tx, rx) = oneshot::channel();
self.cmd_tx
.send(SessionCommand::LoadResumeState { reply: tx })
.await
.map_err(|_| crate::Error::Shutdown)?;
rx.await.map_err(|_| crate::Error::Shutdown)?
}
pub async fn save_resume_state(&self) -> crate::Result<usize> {
let (tx, rx) = oneshot::channel();
self.cmd_tx
.send(SessionCommand::SaveResumeState { reply: tx })
.await
.map_err(|_| crate::Error::Shutdown)?;
rx.await.map_err(|_| crate::Error::Shutdown)?
}
pub async fn queue_position(&self, info_hash: Id20) -> crate::Result<i32> {
let (tx, rx) = oneshot::channel();
self.cmd_tx
.send(SessionCommand::QueuePosition {
info_hash,
reply: tx,
})
.await
.map_err(|_| crate::Error::Shutdown)?;
rx.await.map_err(|_| crate::Error::Shutdown)?
}
pub async fn set_queue_position(&self, info_hash: Id20, pos: i32) -> crate::Result<()> {
let (tx, rx) = oneshot::channel();
self.cmd_tx
.send(SessionCommand::SetQueuePosition {
info_hash,
pos,
reply: tx,
})
.await
.map_err(|_| crate::Error::Shutdown)?;
rx.await.map_err(|_| crate::Error::Shutdown)?
}
pub async fn queue_position_up(&self, info_hash: Id20) -> crate::Result<()> {
let (tx, rx) = oneshot::channel();
self.cmd_tx
.send(SessionCommand::QueuePositionUp {
info_hash,
reply: tx,
})
.await
.map_err(|_| crate::Error::Shutdown)?;
rx.await.map_err(|_| crate::Error::Shutdown)?
}
pub async fn queue_position_down(&self, info_hash: Id20) -> crate::Result<()> {
let (tx, rx) = oneshot::channel();
self.cmd_tx
.send(SessionCommand::QueuePositionDown {
info_hash,
reply: tx,
})
.await
.map_err(|_| crate::Error::Shutdown)?;
rx.await.map_err(|_| crate::Error::Shutdown)?
}
pub async fn queue_position_top(&self, info_hash: Id20) -> crate::Result<()> {
let (tx, rx) = oneshot::channel();
self.cmd_tx
.send(SessionCommand::QueuePositionTop {
info_hash,
reply: tx,
})
.await
.map_err(|_| crate::Error::Shutdown)?;
rx.await.map_err(|_| crate::Error::Shutdown)?
}
pub async fn queue_position_bottom(&self, info_hash: Id20) -> crate::Result<()> {
let (tx, rx) = oneshot::channel();
self.cmd_tx
.send(SessionCommand::QueuePositionBottom {
info_hash,
reply: tx,
})
.await
.map_err(|_| crate::Error::Shutdown)?;
rx.await.map_err(|_| crate::Error::Shutdown)?
}
pub async fn ban_peer(&self, ip: IpAddr) -> crate::Result<()> {
let (tx, rx) = oneshot::channel();
self.cmd_tx
.send(SessionCommand::BanPeer { ip, reply: tx })
.await
.map_err(|_| crate::Error::Shutdown)?;
rx.await.map_err(|_| crate::Error::Shutdown)
}
pub async fn unban_peer(&self, ip: IpAddr) -> crate::Result<bool> {
let (tx, rx) = oneshot::channel();
self.cmd_tx
.send(SessionCommand::UnbanPeer { ip, reply: tx })
.await
.map_err(|_| crate::Error::Shutdown)?;
rx.await.map_err(|_| crate::Error::Shutdown)
}
pub async fn set_ip_filter(&self, filter: crate::ip_filter::IpFilter) -> crate::Result<()> {
let (tx, rx) = oneshot::channel();
self.cmd_tx
.send(SessionCommand::SetIpFilter { filter, reply: tx })
.await
.map_err(|_| crate::Error::Shutdown)?;
rx.await.map_err(|_| crate::Error::Shutdown)
}
pub async fn ip_filter(&self) -> crate::Result<crate::ip_filter::IpFilter> {
let (tx, rx) = oneshot::channel();
self.cmd_tx
.send(SessionCommand::GetIpFilter { reply: tx })
.await
.map_err(|_| crate::Error::Shutdown)?;
rx.await.map_err(|_| crate::Error::Shutdown)
}
pub async fn settings(&self) -> crate::Result<Settings> {
let (tx, rx) = oneshot::channel();
self.cmd_tx
.send(SessionCommand::GetSettings { reply: tx })
.await
.map_err(|_| crate::Error::Shutdown)?;
rx.await.map_err(|_| crate::Error::Shutdown)
}
pub async fn apply_settings(&self, settings: Settings) -> crate::Result<()> {
let (tx, rx) = oneshot::channel();
self.cmd_tx
.send(SessionCommand::ApplySettings {
settings: Box::new(settings),
reply: tx,
})
.await
.map_err(|_| crate::Error::Shutdown)?;
rx.await.map_err(|_| crate::Error::Shutdown)?
}
pub async fn banned_peers(&self) -> crate::Result<Vec<IpAddr>> {
let (tx, rx) = oneshot::channel();
self.cmd_tx
.send(SessionCommand::BannedPeers { reply: tx })
.await
.map_err(|_| crate::Error::Shutdown)?;
rx.await.map_err(|_| crate::Error::Shutdown)
}
pub async fn move_torrent_storage(
&self,
info_hash: Id20,
new_path: std::path::PathBuf,
) -> crate::Result<()> {
let (tx, rx) = oneshot::channel();
self.cmd_tx
.send(SessionCommand::MoveTorrentStorage {
info_hash,
new_path,
reply: tx,
})
.await
.map_err(|_| crate::Error::Shutdown)?;
rx.await.map_err(|_| crate::Error::Shutdown)?
}
pub async fn open_file(
&self,
info_hash: Id20,
file_index: usize,
) -> crate::Result<crate::streaming::FileStream> {
let (tx, rx) = oneshot::channel();
self.cmd_tx
.send(SessionCommand::OpenFile {
info_hash,
file_index,
reply: tx,
})
.await
.map_err(|_| crate::Error::Shutdown)?;
rx.await.map_err(|_| crate::Error::Shutdown)?
}
pub async fn force_reannounce(&self, info_hash: Id20) -> crate::Result<()> {
let (tx, rx) = oneshot::channel();
self.cmd_tx
.send(SessionCommand::ForceReannounce {
info_hash,
reply: tx,
})
.await
.map_err(|_| crate::Error::Shutdown)?;
rx.await.map_err(|_| crate::Error::Shutdown)?
}
pub async fn tracker_list(
&self,
info_hash: Id20,
) -> crate::Result<Vec<crate::tracker_manager::TrackerInfo>> {
let (tx, rx) = oneshot::channel();
self.cmd_tx
.send(SessionCommand::TrackerList {
info_hash,
reply: tx,
})
.await
.map_err(|_| crate::Error::Shutdown)?;
rx.await.map_err(|_| crate::Error::Shutdown)?
}
pub async fn scrape(
&self,
info_hash: Id20,
) -> crate::Result<Option<(String, irontide_tracker::ScrapeInfo)>> {
let (tx, rx) = oneshot::channel();
self.cmd_tx
.send(SessionCommand::Scrape {
info_hash,
reply: tx,
})
.await
.map_err(|_| crate::Error::Shutdown)?;
rx.await.map_err(|_| crate::Error::Shutdown)?
}
pub async fn set_file_priority(
&self,
info_hash: Id20,
index: usize,
priority: irontide_core::FilePriority,
) -> crate::Result<()> {
let (tx, rx) = oneshot::channel();
self.cmd_tx
.send(SessionCommand::SetFilePriority {
info_hash,
index,
priority,
reply: tx,
})
.await
.map_err(|_| crate::Error::Shutdown)?;
rx.await.map_err(|_| crate::Error::Shutdown)?
}
pub async fn file_priorities(
&self,
info_hash: Id20,
) -> crate::Result<Vec<irontide_core::FilePriority>> {
let (tx, rx) = oneshot::channel();
self.cmd_tx
.send(SessionCommand::FilePriorities {
info_hash,
reply: tx,
})
.await
.map_err(|_| crate::Error::Shutdown)?;
rx.await.map_err(|_| crate::Error::Shutdown)?
}
pub async fn set_download_limit(
&self,
info_hash: Id20,
bytes_per_sec: u64,
) -> crate::Result<()> {
let (tx, rx) = oneshot::channel();
self.cmd_tx
.send(SessionCommand::SetDownloadLimit {
info_hash,
bytes_per_sec,
reply: tx,
})
.await
.map_err(|_| crate::Error::Shutdown)?;
rx.await.map_err(|_| crate::Error::Shutdown)?
}
pub async fn set_upload_limit(&self, info_hash: Id20, bytes_per_sec: u64) -> crate::Result<()> {
let (tx, rx) = oneshot::channel();
self.cmd_tx
.send(SessionCommand::SetUploadLimit {
info_hash,
bytes_per_sec,
reply: tx,
})
.await
.map_err(|_| crate::Error::Shutdown)?;
rx.await.map_err(|_| crate::Error::Shutdown)?
}
pub async fn download_limit(&self, info_hash: Id20) -> crate::Result<u64> {
let (tx, rx) = oneshot::channel();
self.cmd_tx
.send(SessionCommand::DownloadLimit {
info_hash,
reply: tx,
})
.await
.map_err(|_| crate::Error::Shutdown)?;
rx.await.map_err(|_| crate::Error::Shutdown)?
}
pub async fn upload_limit(&self, info_hash: Id20) -> crate::Result<u64> {
let (tx, rx) = oneshot::channel();
self.cmd_tx
.send(SessionCommand::UploadLimit {
info_hash,
reply: tx,
})
.await
.map_err(|_| crate::Error::Shutdown)?;
rx.await.map_err(|_| crate::Error::Shutdown)?
}
pub async fn set_sequential_download(
&self,
info_hash: Id20,
enabled: bool,
) -> crate::Result<()> {
let (tx, rx) = oneshot::channel();
self.cmd_tx
.send(SessionCommand::SetSequentialDownload {
info_hash,
enabled,
reply: tx,
})
.await
.map_err(|_| crate::Error::Shutdown)?;
rx.await.map_err(|_| crate::Error::Shutdown)?
}
pub async fn is_sequential_download(&self, info_hash: Id20) -> crate::Result<bool> {
let (tx, rx) = oneshot::channel();
self.cmd_tx
.send(SessionCommand::IsSequentialDownload {
info_hash,
reply: tx,
})
.await
.map_err(|_| crate::Error::Shutdown)?;
rx.await.map_err(|_| crate::Error::Shutdown)?
}
pub async fn set_super_seeding(&self, info_hash: Id20, enabled: bool) -> crate::Result<()> {
let (tx, rx) = oneshot::channel();
self.cmd_tx
.send(SessionCommand::SetSuperSeeding {
info_hash,
enabled,
reply: tx,
})
.await
.map_err(|_| crate::Error::Shutdown)?;
rx.await.map_err(|_| crate::Error::Shutdown)?
}
pub async fn is_super_seeding(&self, info_hash: Id20) -> crate::Result<bool> {
let (tx, rx) = oneshot::channel();
self.cmd_tx
.send(SessionCommand::IsSuperSeeding {
info_hash,
reply: tx,
})
.await
.map_err(|_| crate::Error::Shutdown)?;
rx.await.map_err(|_| crate::Error::Shutdown)?
}
pub async fn set_seed_mode(&self, info_hash: Id20, enabled: bool) -> crate::Result<()> {
let (tx, rx) = oneshot::channel();
self.cmd_tx
.send(SessionCommand::SetSeedMode {
info_hash,
enabled,
reply: tx,
})
.await
.map_err(|_| crate::Error::Shutdown)?;
rx.await.map_err(|_| crate::Error::Shutdown)?
}
pub async fn add_tracker(&self, info_hash: Id20, url: String) -> crate::Result<()> {
let (tx, rx) = oneshot::channel();
self.cmd_tx
.send(SessionCommand::AddTracker {
info_hash,
url,
reply: tx,
})
.await
.map_err(|_| crate::Error::Shutdown)?;
rx.await.map_err(|_| crate::Error::Shutdown)?
}
pub async fn replace_trackers(&self, info_hash: Id20, urls: Vec<String>) -> crate::Result<()> {
let (tx, rx) = oneshot::channel();
self.cmd_tx
.send(SessionCommand::ReplaceTrackers {
info_hash,
urls,
reply: tx,
})
.await
.map_err(|_| crate::Error::Shutdown)?;
rx.await.map_err(|_| crate::Error::Shutdown)?
}
pub async fn force_recheck(&self, info_hash: Id20) -> crate::Result<()> {
let (tx, rx) = oneshot::channel();
self.cmd_tx
.send(SessionCommand::ForceRecheck {
info_hash,
reply: tx,
})
.await
.map_err(|_| crate::Error::Shutdown)?;
rx.await.map_err(|_| crate::Error::Shutdown)?
}
pub async fn rename_file(
&self,
info_hash: Id20,
file_index: usize,
new_name: String,
) -> crate::Result<()> {
let (tx, rx) = oneshot::channel();
self.cmd_tx
.send(SessionCommand::RenameFile {
info_hash,
file_index,
new_name,
reply: tx,
})
.await
.map_err(|_| crate::Error::Shutdown)?;
rx.await.map_err(|_| crate::Error::Shutdown)?
}
pub async fn set_max_connections(&self, info_hash: Id20, limit: usize) -> crate::Result<()> {
let (tx, rx) = oneshot::channel();
self.cmd_tx
.send(SessionCommand::SetMaxConnections {
info_hash,
limit,
reply: tx,
})
.await
.map_err(|_| crate::Error::Shutdown)?;
rx.await.map_err(|_| crate::Error::Shutdown)?
}
pub async fn max_connections(&self, info_hash: Id20) -> crate::Result<usize> {
let (tx, rx) = oneshot::channel();
self.cmd_tx
.send(SessionCommand::MaxConnections {
info_hash,
reply: tx,
})
.await
.map_err(|_| crate::Error::Shutdown)?;
rx.await.map_err(|_| crate::Error::Shutdown)?
}
pub async fn set_max_uploads(&self, info_hash: Id20, limit: usize) -> crate::Result<()> {
let (tx, rx) = oneshot::channel();
self.cmd_tx
.send(SessionCommand::SetMaxUploads {
info_hash,
limit,
reply: tx,
})
.await
.map_err(|_| crate::Error::Shutdown)?;
rx.await.map_err(|_| crate::Error::Shutdown)?
}
pub async fn max_uploads(&self, info_hash: Id20) -> crate::Result<usize> {
let (tx, rx) = oneshot::channel();
self.cmd_tx
.send(SessionCommand::MaxUploads {
info_hash,
reply: tx,
})
.await
.map_err(|_| crate::Error::Shutdown)?;
rx.await.map_err(|_| crate::Error::Shutdown)?
}
pub async fn get_peer_info(
&self,
info_hash: Id20,
) -> crate::Result<Vec<crate::types::PeerInfo>> {
let (tx, rx) = oneshot::channel();
self.cmd_tx
.send(SessionCommand::GetPeerInfo {
info_hash,
reply: tx,
})
.await
.map_err(|_| crate::Error::Shutdown)?;
rx.await.map_err(|_| crate::Error::Shutdown)?
}
pub async fn get_download_queue(
&self,
info_hash: Id20,
) -> crate::Result<Vec<crate::types::PartialPieceInfo>> {
let (tx, rx) = oneshot::channel();
self.cmd_tx
.send(SessionCommand::GetDownloadQueue {
info_hash,
reply: tx,
})
.await
.map_err(|_| crate::Error::Shutdown)?;
rx.await.map_err(|_| crate::Error::Shutdown)?
}
pub async fn have_piece(&self, info_hash: Id20, index: u32) -> crate::Result<bool> {
let (tx, rx) = oneshot::channel();
self.cmd_tx
.send(SessionCommand::HavePiece {
info_hash,
index,
reply: tx,
})
.await
.map_err(|_| crate::Error::Shutdown)?;
rx.await.map_err(|_| crate::Error::Shutdown)?
}
pub async fn piece_availability(&self, info_hash: Id20) -> crate::Result<Vec<u32>> {
let (tx, rx) = oneshot::channel();
self.cmd_tx
.send(SessionCommand::PieceAvailability {
info_hash,
reply: tx,
})
.await
.map_err(|_| crate::Error::Shutdown)?;
rx.await.map_err(|_| crate::Error::Shutdown)?
}
pub async fn file_progress(&self, info_hash: Id20) -> crate::Result<Vec<u64>> {
let (tx, rx) = oneshot::channel();
self.cmd_tx
.send(SessionCommand::FileProgress {
info_hash,
reply: tx,
})
.await
.map_err(|_| crate::Error::Shutdown)?;
rx.await.map_err(|_| crate::Error::Shutdown)?
}
pub async fn info_hashes(&self, info_hash: Id20) -> crate::Result<irontide_core::InfoHashes> {
let (tx, rx) = oneshot::channel();
self.cmd_tx
.send(SessionCommand::InfoHashesQuery {
info_hash,
reply: tx,
})
.await
.map_err(|_| crate::Error::Shutdown)?;
rx.await.map_err(|_| crate::Error::Shutdown)?
}
pub async fn torrent_file(
&self,
info_hash: Id20,
) -> crate::Result<Option<irontide_core::TorrentMetaV1>> {
let (tx, rx) = oneshot::channel();
self.cmd_tx
.send(SessionCommand::TorrentFile {
info_hash,
reply: tx,
})
.await
.map_err(|_| crate::Error::Shutdown)?;
rx.await.map_err(|_| crate::Error::Shutdown)?
}
pub async fn torrent_file_v2(
&self,
info_hash: Id20,
) -> crate::Result<Option<irontide_core::TorrentMetaV2>> {
let (tx, rx) = oneshot::channel();
self.cmd_tx
.send(SessionCommand::TorrentFileV2 {
info_hash,
reply: tx,
})
.await
.map_err(|_| crate::Error::Shutdown)?;
rx.await.map_err(|_| crate::Error::Shutdown)?
}
pub async fn force_dht_announce(&self, info_hash: Id20) -> crate::Result<()> {
let (tx, rx) = oneshot::channel();
self.cmd_tx
.send(SessionCommand::ForceDhtAnnounce {
info_hash,
reply: tx,
})
.await
.map_err(|_| crate::Error::Shutdown)?;
rx.await.map_err(|_| crate::Error::Shutdown)?
}
pub async fn force_lsd_announce(&self, info_hash: Id20) -> crate::Result<()> {
let (tx, rx) = oneshot::channel();
self.cmd_tx
.send(SessionCommand::ForceLsdAnnounce {
info_hash,
reply: tx,
})
.await
.map_err(|_| crate::Error::Shutdown)?;
rx.await.map_err(|_| crate::Error::Shutdown)?
}
pub async fn read_piece(&self, info_hash: Id20, index: u32) -> crate::Result<bytes::Bytes> {
let (tx, rx) = oneshot::channel();
self.cmd_tx
.send(SessionCommand::ReadPiece {
info_hash,
index,
reply: tx,
})
.await
.map_err(|_| crate::Error::Shutdown)?;
rx.await.map_err(|_| crate::Error::Shutdown)?
}
pub async fn flush_cache(&self, info_hash: Id20) -> crate::Result<()> {
let (tx, rx) = oneshot::channel();
self.cmd_tx
.send(SessionCommand::FlushCache {
info_hash,
reply: tx,
})
.await
.map_err(|_| crate::Error::Shutdown)?;
rx.await.map_err(|_| crate::Error::Shutdown)?
}
/// Sends an `IsValid` query for `info_hash`.
///
/// Returns the torrent handle's `is_valid()` answer when the session knows
/// the torrent. Every failure mode collapses to `false`: unknown torrent,
/// closed command channel, or a dropped reply.
pub async fn is_valid(&self, info_hash: Id20) -> bool {
    let (reply, response) = oneshot::channel();
    let command = SessionCommand::IsValid { info_hash, reply };
    match self.cmd_tx.send(command).await {
        Ok(()) => response.await.unwrap_or(false),
        Err(_) => false,
    }
}
pub async fn clear_error(&self, info_hash: Id20) -> crate::Result<()> {
let (tx, rx) = oneshot::channel();
self.cmd_tx
.send(SessionCommand::ClearError {
info_hash,
reply: tx,
})
.await
.map_err(|_| crate::Error::Shutdown)?;
rx.await.map_err(|_| crate::Error::Shutdown)?
}
pub async fn file_status(
&self,
info_hash: Id20,
) -> crate::Result<Vec<crate::types::FileStatus>> {
let (tx, rx) = oneshot::channel();
self.cmd_tx
.send(SessionCommand::FileStatus {
info_hash,
reply: tx,
})
.await
.map_err(|_| crate::Error::Shutdown)?;
rx.await.map_err(|_| crate::Error::Shutdown)?
}
pub async fn flags(&self, info_hash: Id20) -> crate::Result<crate::types::TorrentFlags> {
let (tx, rx) = oneshot::channel();
self.cmd_tx
.send(SessionCommand::Flags {
info_hash,
reply: tx,
})
.await
.map_err(|_| crate::Error::Shutdown)?;
rx.await.map_err(|_| crate::Error::Shutdown)?
}
pub async fn set_flags(
&self,
info_hash: Id20,
flags: crate::types::TorrentFlags,
) -> crate::Result<()> {
let (tx, rx) = oneshot::channel();
self.cmd_tx
.send(SessionCommand::SetFlags {
info_hash,
flags,
reply: tx,
})
.await
.map_err(|_| crate::Error::Shutdown)?;
rx.await.map_err(|_| crate::Error::Shutdown)?
}
pub async fn unset_flags(
&self,
info_hash: Id20,
flags: crate::types::TorrentFlags,
) -> crate::Result<()> {
let (tx, rx) = oneshot::channel();
self.cmd_tx
.send(SessionCommand::UnsetFlags {
info_hash,
flags,
reply: tx,
})
.await
.map_err(|_| crate::Error::Shutdown)?;
rx.await.map_err(|_| crate::Error::Shutdown)?
}
pub async fn connect_peer(&self, info_hash: Id20, addr: SocketAddr) -> crate::Result<()> {
let (tx, rx) = oneshot::channel();
self.cmd_tx
.send(SessionCommand::ConnectPeer {
info_hash,
addr,
reply: tx,
})
.await
.map_err(|_| crate::Error::Shutdown)?;
rx.await.map_err(|_| crate::Error::Shutdown)?
}
pub async fn dht_put_immutable(&self, value: Vec<u8>) -> crate::Result<Id20> {
let (tx, rx) = oneshot::channel();
self.cmd_tx
.send(SessionCommand::DhtPutImmutable { value, reply: tx })
.await
.map_err(|_| crate::Error::Shutdown)?;
rx.await.map_err(|_| crate::Error::Shutdown)?
}
pub async fn dht_get_immutable(&self, target: Id20) -> crate::Result<Option<Vec<u8>>> {
let (tx, rx) = oneshot::channel();
self.cmd_tx
.send(SessionCommand::DhtGetImmutable { target, reply: tx })
.await
.map_err(|_| crate::Error::Shutdown)?;
rx.await.map_err(|_| crate::Error::Shutdown)?
}
pub async fn dht_put_mutable(
&self,
keypair_bytes: [u8; 32],
value: Vec<u8>,
seq: i64,
salt: Vec<u8>,
) -> crate::Result<Id20> {
let (tx, rx) = oneshot::channel();
self.cmd_tx
.send(SessionCommand::DhtPutMutable {
keypair_bytes,
value,
seq,
salt,
reply: tx,
})
.await
.map_err(|_| crate::Error::Shutdown)?;
rx.await.map_err(|_| crate::Error::Shutdown)?
}
pub async fn dht_get_mutable(
&self,
public_key: [u8; 32],
salt: Vec<u8>,
) -> crate::Result<Option<(Vec<u8>, i64)>> {
let (tx, rx) = oneshot::channel();
self.cmd_tx
.send(SessionCommand::DhtGetMutable {
public_key,
salt,
reply: tx,
})
.await
.map_err(|_| crate::Error::Shutdown)?;
rx.await.map_err(|_| crate::Error::Shutdown)?
}
/// Builds a [`TorrentSummary`] for every torrent returned by
/// `list_torrents`, querying `torrent_stats` once per torrent.
///
/// Torrents whose stats query fails are left out of the result rather than
/// failing the whole call.
///
/// # Errors
/// Only a failure of the initial `list_torrents` call is returned.
pub async fn list_torrent_summaries(&self) -> crate::Result<Vec<TorrentSummary>> {
    let info_hashes = self.list_torrents().await?;
    let mut collected = Vec::with_capacity(info_hashes.len());
    for info_hash in info_hashes {
        match self.torrent_stats(info_hash).await {
            Ok(stats) => collected.push(TorrentSummary::from(&stats)),
            // Per-torrent failures are deliberately ignored.
            Err(_) => continue,
        }
    }
    Ok(collected)
}
pub async fn add_magnet_uri(&self, uri: &str) -> crate::Result<irontide_core::InfoHashes> {
let magnet = irontide_core::Magnet::parse(uri)?;
let info_hashes = magnet.info_hashes.clone();
self.add_magnet(magnet).await?;
Ok(info_hashes)
}
/// Decodes `bytes` with `torrent_from_bytes_any`, adds the torrent through
/// `add_torrent` (passing `None` as its second argument), and returns the
/// decoded metadata's info hashes.
///
/// # Errors
/// Returns the decode error when `bytes` cannot be parsed, or whatever
/// error `add_torrent` reports.
pub async fn add_torrent_bytes(
    &self,
    bytes: &[u8],
) -> crate::Result<irontide_core::InfoHashes> {
    let decoded = irontide_core::torrent_from_bytes_any(bytes)?;
    // Read the hashes before the metadata is moved into `add_torrent`.
    let hashes = decoded.info_hashes();
    let _ = self.add_torrent(decoded, None).await?;
    Ok(hashes)
}
}
struct SessionActor {
settings: Settings,
torrents: HashMap<Id20, TorrentEntry>,
dht_v4: Option<DhtHandle>,
dht_v6: Option<DhtHandle>,
lsd: Option<crate::lsd::LsdHandle>,
lsd_peers_rx: Option<mpsc::Receiver<(Id20, SocketAddr)>>,
cmd_rx: mpsc::Receiver<SessionCommand>,
alert_tx: broadcast::Sender<Alert>,
alert_mask: Arc<AtomicU32>,
global_upload_bucket: SharedBucket,
global_download_bucket: SharedBucket,
utp_socket: Option<irontide_utp::UtpSocket>,
utp_socket_v6: Option<irontide_utp::UtpSocket>,
nat: Option<irontide_nat::NatHandle>,
nat_events_rx: Option<mpsc::Receiver<irontide_nat::NatEvent>>,
ban_manager: SharedBanManager,
ip_filter: SharedIpFilter,
disk_manager: crate::disk::DiskManagerHandle,
#[allow(dead_code)]
disk_actor_handle: tokio::task::JoinHandle<()>,
external_ip: Option<std::net::IpAddr>,
dht_v4_ip_rx: Option<mpsc::Receiver<std::net::IpAddr>>,
dht_v6_ip_rx: Option<mpsc::Receiver<std::net::IpAddr>>,
plugins: Arc<Vec<Box<dyn crate::extension::ExtensionPlugin>>>,
sam_session: Option<Arc<crate::i2p::SamSession>>,
ssl_manager: Option<Arc<crate::ssl_manager::SslManager>>,
ssl_listener: Option<Box<dyn crate::transport::TransportListener>>,
validated_conn_rx: mpsc::Receiver<crate::listener::IdentifiedConnection>,
info_hash_registry: Arc<DashMap<Id20, ()>>,
#[allow(dead_code)]
_listener_task: tokio::task::JoinHandle<()>,
counters: Arc<crate::stats::SessionCounters>,
factory: Arc<crate::transport::NetworkFactory>,
hash_pool: std::sync::Arc<crate::hash_pool::HashPool>,
}
impl SessionActor {
/// Runs the session event loop until a `Shutdown` command arrives or the
/// command channel closes, then calls `shutdown_all` and returns.
///
/// Before entering the loop it sets up the periodic timers and, when the
/// resume directory contains files, restores torrents from it and removes
/// resume files that do not correspond to an active torrent.
///
/// Each loop iteration services exactly one of: a session command, an LSD
/// peer, an identified inbound connection, an SSL accept, a rate-limiter
/// refill, queue evaluation, a NAT event, a DHT external-IP report, or one
/// of the optional stats / infohash-sampling / resume-save timers.
async fn run(mut self) {
    // A tokio interval completes its first tick immediately; every timer
    // below consumes that tick up front so the loop only sees real periods.
    let mut refill_interval = tokio::time::interval(std::time::Duration::from_millis(100));
    refill_interval.tick().await;
    // Clamp to at least one second: `interval` panics on a zero period.
    let auto_manage_secs = self.settings.auto_manage_interval.max(1);
    let mut auto_manage_interval =
        tokio::time::interval(std::time::Duration::from_secs(auto_manage_secs));
    auto_manage_interval.tick().await;
    // The three timers below are optional: a zero setting disables them.
    let stats_interval_ms = self.settings.stats_report_interval;
    let mut stats_timer = if stats_interval_ms > 0 {
        Some(tokio::time::interval(std::time::Duration::from_millis(
            stats_interval_ms,
        )))
    } else {
        None
    };
    if let Some(ref mut t) = stats_timer {
        t.tick().await;
    }
    let sample_interval_secs = self.settings.dht_sample_infohashes_interval;
    let mut sample_timer = if sample_interval_secs > 0 {
        Some(tokio::time::interval(std::time::Duration::from_secs(
            sample_interval_secs,
        )))
    } else {
        None
    };
    if let Some(ref mut t) = sample_timer {
        t.tick().await;
    }
    let mut resume_save_interval = if self.settings.save_resume_interval_secs > 0 {
        Some(tokio::time::interval(std::time::Duration::from_secs(
            self.settings.save_resume_interval_secs,
        )))
    } else {
        None
    };
    if let Some(ref mut t) = resume_save_interval {
        t.tick().await;
    }
    // Startup auto-restore followed by an orphan sweep of the resume dir.
    {
        let resume_dir = self.effective_resume_dir();
        let resume_files = crate::resume_file::scan_resume_dir(&resume_dir);
        if !resume_files.is_empty() {
            match self.handle_load_resume_state().await {
                Ok(result) => {
                    info!(
                        restored = result.restored,
                        skipped = result.skipped,
                        failed = result.failed,
                        "auto-restored torrents on startup"
                    );
                }
                Err(e) => {
                    warn!("auto-restore on startup failed: {e}");
                }
            }
            // NOTE(review): the sweep below also runs when the load above
            // returned `Err`, in which case every resume file whose stem is
            // not an active torrent is deleted — confirm this is intended.
            let active_hashes: std::collections::HashSet<String> = self
                .torrents
                .keys()
                .map(|h| hex::encode(h.as_bytes()))
                .collect();
            // Re-scan: the restore step may have changed the directory.
            let current_files = crate::resume_file::scan_resume_dir(&resume_dir);
            for path in &current_files {
                // A resume file is an orphan when its stem (hex info hash)
                // matches no torrent currently in the session.
                if let Some(stem) = path.file_stem().and_then(|s| s.to_str())
                    && !active_hashes.contains(stem)
                {
                    if let Err(e) = std::fs::remove_file(path) {
                        warn!(path = %path.display(), "failed to remove orphan resume file: {e}");
                    } else {
                        debug!(path = %path.display(), "removed orphan resume file");
                    }
                }
            }
        }
    }
    loop {
        tokio::select! {
            // --- Session commands -------------------------------------
            // Reply-send failures are ignored throughout: they only mean
            // the caller stopped waiting.
            cmd = self.cmd_rx.recv() => {
                match cmd {
                    Some(SessionCommand::AddTorrent {
                        meta,
                        storage,
                        download_dir,
                        reply,
                    }) => {
                        let result = self.handle_add_torrent(*meta, storage, download_dir).await;
                        let _ = reply.send(result);
                    }
                    Some(SessionCommand::AddMagnet { magnet, download_dir, reply }) => {
                        let result = self.handle_add_magnet(magnet, download_dir).await;
                        let _ = reply.send(result);
                    }
                    Some(SessionCommand::RemoveTorrent { info_hash, reply }) => {
                        let result = self.handle_remove_torrent(info_hash).await;
                        let _ = reply.send(result);
                    }
                    Some(SessionCommand::PauseTorrent { info_hash, reply }) => {
                        let result = self.handle_pause_torrent(info_hash).await;
                        let _ = reply.send(result);
                    }
                    Some(SessionCommand::ResumeTorrent { info_hash, reply }) => {
                        let result = self.handle_resume_torrent(info_hash).await;
                        let _ = reply.send(result);
                    }
                    Some(SessionCommand::TorrentStats { info_hash, reply }) => {
                        let result = self.handle_torrent_stats(info_hash).await;
                        let _ = reply.send(result);
                    }
                    Some(SessionCommand::TorrentInfo { info_hash, reply }) => {
                        let result = self.handle_torrent_info(info_hash);
                        let _ = reply.send(result);
                    }
                    Some(SessionCommand::ListTorrents { reply }) => {
                        let list: Vec<Id20> = self.torrents.keys().copied().collect();
                        let _ = reply.send(list);
                    }
                    Some(SessionCommand::SessionStats { reply }) => {
                        let stats = self.make_session_stats().await;
                        let _ = reply.send(stats);
                    }
                    Some(SessionCommand::SaveTorrentResumeData { info_hash, reply }) => {
                        let result = self.handle_save_torrent_resume(info_hash).await;
                        let _ = reply.send(result);
                    }
                    Some(SessionCommand::SaveSessionState { reply }) => {
                        let result = self.handle_save_session_state().await;
                        let _ = reply.send(result);
                    }
                    Some(SessionCommand::LoadResumeState { reply }) => {
                        let result = self.handle_load_resume_state().await;
                        let _ = reply.send(result);
                    }
                    Some(SessionCommand::QueuePosition { info_hash, reply }) => {
                        let result = match self.torrents.get(&info_hash) {
                            Some(entry) => Ok(entry.queue_position),
                            None => Err(crate::Error::TorrentNotFound(info_hash)),
                        };
                        let _ = reply.send(result);
                    }
                    Some(SessionCommand::SetQueuePosition { info_hash, pos, reply }) => {
                        let result = self.handle_set_queue_position(info_hash, pos);
                        let _ = reply.send(result);
                    }
                    Some(SessionCommand::QueuePositionUp { info_hash, reply }) => {
                        let result = self.handle_queue_move(info_hash, crate::queue::move_up);
                        let _ = reply.send(result);
                    }
                    Some(SessionCommand::QueuePositionDown { info_hash, reply }) => {
                        let result = self.handle_queue_move(info_hash, crate::queue::move_down);
                        let _ = reply.send(result);
                    }
                    Some(SessionCommand::QueuePositionTop { info_hash, reply }) => {
                        let result = self.handle_queue_move(info_hash, crate::queue::move_top);
                        let _ = reply.send(result);
                    }
                    Some(SessionCommand::QueuePositionBottom { info_hash, reply }) => {
                        let result = self.handle_queue_move(info_hash, crate::queue::move_bottom);
                        let _ = reply.send(result);
                    }
                    Some(SessionCommand::BanPeer { ip, reply }) => {
                        self.ban_manager.write().ban(ip);
                        let _ = reply.send(());
                    }
                    Some(SessionCommand::UnbanPeer { ip, reply }) => {
                        let was_banned = self.ban_manager.write().unban(&ip);
                        let _ = reply.send(was_banned);
                    }
                    Some(SessionCommand::BannedPeers { reply }) => {
                        let list: Vec<IpAddr> = self.ban_manager.read()
                            .banned_list().iter().copied().collect();
                        let _ = reply.send(list);
                    }
                    Some(SessionCommand::SetIpFilter { filter, reply }) => {
                        *self.ip_filter.write() = filter;
                        let _ = reply.send(());
                    }
                    Some(SessionCommand::GetIpFilter { reply }) => {
                        let filter = self.ip_filter.read().clone();
                        let _ = reply.send(filter);
                    }
                    Some(SessionCommand::GetSettings { reply }) => {
                        let _ = reply.send(self.settings.clone());
                    }
                    Some(SessionCommand::ApplySettings { settings, reply }) => {
                        let result = self.handle_apply_settings(*settings);
                        let _ = reply.send(result);
                    }
                    Some(SessionCommand::MoveTorrentStorage { info_hash, new_path, reply }) => {
                        let result = self.handle_move_torrent_storage(info_hash, new_path).await;
                        let _ = reply.send(result);
                    }
                    // The arms from here to `ConnectPeer` share one shape:
                    // look the torrent up, forward to its handle, or reply
                    // `TorrentNotFound`.
                    Some(SessionCommand::AddPeers { info_hash, peers, source, reply }) => {
                        let result = if let Some(entry) = self.torrents.get(&info_hash) {
                            entry.handle.add_peers(peers, source).await
                        } else {
                            Err(crate::Error::TorrentNotFound(info_hash))
                        };
                        let _ = reply.send(result);
                    }
                    Some(SessionCommand::OpenFile { info_hash, file_index, reply }) => {
                        let result = if let Some(entry) = self.torrents.get(&info_hash) {
                            entry.handle.open_file(file_index).await
                        } else {
                            Err(crate::Error::TorrentNotFound(info_hash))
                        };
                        let _ = reply.send(result);
                    }
                    Some(SessionCommand::ForceReannounce { info_hash, reply }) => {
                        let result = match self.torrents.get(&info_hash) {
                            Some(entry) => {
                                entry.handle.force_reannounce().await
                            }
                            None => Err(crate::Error::TorrentNotFound(info_hash)),
                        };
                        let _ = reply.send(result);
                    }
                    Some(SessionCommand::TrackerList { info_hash, reply }) => {
                        let result = if let Some(entry) = self.torrents.get(&info_hash) {
                            entry.handle.tracker_list().await
                        } else {
                            Err(crate::Error::TorrentNotFound(info_hash))
                        };
                        let _ = reply.send(result);
                    }
                    Some(SessionCommand::Scrape { info_hash, reply }) => {
                        let result = if let Some(entry) = self.torrents.get(&info_hash) {
                            entry.handle.scrape().await
                        } else {
                            Err(crate::Error::TorrentNotFound(info_hash))
                        };
                        let _ = reply.send(result);
                    }
                    Some(SessionCommand::SetFilePriority { info_hash, index, priority, reply }) => {
                        let result = if let Some(entry) = self.torrents.get(&info_hash) {
                            entry.handle.set_file_priority(index, priority).await
                        } else {
                            Err(crate::Error::TorrentNotFound(info_hash))
                        };
                        let _ = reply.send(result);
                    }
                    Some(SessionCommand::FilePriorities { info_hash, reply }) => {
                        let result = if let Some(entry) = self.torrents.get(&info_hash) {
                            entry.handle.file_priorities().await
                        } else {
                            Err(crate::Error::TorrentNotFound(info_hash))
                        };
                        let _ = reply.send(result);
                    }
                    Some(SessionCommand::SetDownloadLimit { info_hash, bytes_per_sec, reply }) => {
                        let result = if let Some(entry) = self.torrents.get(&info_hash) {
                            entry.handle.set_download_limit(bytes_per_sec).await
                        } else {
                            Err(crate::Error::TorrentNotFound(info_hash))
                        };
                        let _ = reply.send(result);
                    }
                    Some(SessionCommand::SetUploadLimit { info_hash, bytes_per_sec, reply }) => {
                        let result = if let Some(entry) = self.torrents.get(&info_hash) {
                            entry.handle.set_upload_limit(bytes_per_sec).await
                        } else {
                            Err(crate::Error::TorrentNotFound(info_hash))
                        };
                        let _ = reply.send(result);
                    }
                    Some(SessionCommand::DownloadLimit { info_hash, reply }) => {
                        let result = if let Some(entry) = self.torrents.get(&info_hash) {
                            entry.handle.download_limit().await
                        } else {
                            Err(crate::Error::TorrentNotFound(info_hash))
                        };
                        let _ = reply.send(result);
                    }
                    Some(SessionCommand::UploadLimit { info_hash, reply }) => {
                        let result = if let Some(entry) = self.torrents.get(&info_hash) {
                            entry.handle.upload_limit().await
                        } else {
                            Err(crate::Error::TorrentNotFound(info_hash))
                        };
                        let _ = reply.send(result);
                    }
                    Some(SessionCommand::SetSequentialDownload { info_hash, enabled, reply }) => {
                        let result = if let Some(entry) = self.torrents.get(&info_hash) {
                            entry.handle.set_sequential_download(enabled).await
                        } else {
                            Err(crate::Error::TorrentNotFound(info_hash))
                        };
                        let _ = reply.send(result);
                    }
                    Some(SessionCommand::IsSequentialDownload { info_hash, reply }) => {
                        let result = if let Some(entry) = self.torrents.get(&info_hash) {
                            entry.handle.is_sequential_download().await
                        } else {
                            Err(crate::Error::TorrentNotFound(info_hash))
                        };
                        let _ = reply.send(result);
                    }
                    Some(SessionCommand::SetSuperSeeding { info_hash, enabled, reply }) => {
                        let result = if let Some(entry) = self.torrents.get(&info_hash) {
                            entry.handle.set_super_seeding(enabled).await
                        } else {
                            Err(crate::Error::TorrentNotFound(info_hash))
                        };
                        let _ = reply.send(result);
                    }
                    Some(SessionCommand::IsSuperSeeding { info_hash, reply }) => {
                        let result = if let Some(entry) = self.torrents.get(&info_hash) {
                            entry.handle.is_super_seeding().await
                        } else {
                            Err(crate::Error::TorrentNotFound(info_hash))
                        };
                        let _ = reply.send(result);
                    }
                    Some(SessionCommand::SetSeedMode { info_hash, enabled, reply }) => {
                        let result = if let Some(entry) = self.torrents.get(&info_hash) {
                            entry.handle.set_seed_mode(enabled).await
                        } else {
                            Err(crate::Error::TorrentNotFound(info_hash))
                        };
                        let _ = reply.send(result);
                    }
                    Some(SessionCommand::AddTracker { info_hash, url, reply }) => {
                        let result = if let Some(entry) = self.torrents.get(&info_hash) {
                            entry.handle.add_tracker(url).await
                        } else {
                            Err(crate::Error::TorrentNotFound(info_hash))
                        };
                        let _ = reply.send(result);
                    }
                    Some(SessionCommand::ReplaceTrackers { info_hash, urls, reply }) => {
                        let result = if let Some(entry) = self.torrents.get(&info_hash) {
                            entry.handle.replace_trackers(urls).await
                        } else {
                            Err(crate::Error::TorrentNotFound(info_hash))
                        };
                        let _ = reply.send(result);
                    }
                    Some(SessionCommand::ForceRecheck { info_hash, reply }) => {
                        let result = if let Some(entry) = self.torrents.get(&info_hash) {
                            entry.handle.force_recheck().await
                        } else {
                            Err(crate::Error::TorrentNotFound(info_hash))
                        };
                        let _ = reply.send(result);
                    }
                    Some(SessionCommand::RenameFile { info_hash, file_index, new_name, reply }) => {
                        let result = if let Some(entry) = self.torrents.get(&info_hash) {
                            entry.handle.rename_file(file_index, new_name).await
                        } else {
                            Err(crate::Error::TorrentNotFound(info_hash))
                        };
                        let _ = reply.send(result);
                    }
                    Some(SessionCommand::SetMaxConnections { info_hash, limit, reply }) => {
                        let result = if let Some(entry) = self.torrents.get(&info_hash) {
                            entry.handle.set_max_connections(limit).await
                        } else {
                            Err(crate::Error::TorrentNotFound(info_hash))
                        };
                        let _ = reply.send(result);
                    }
                    Some(SessionCommand::MaxConnections { info_hash, reply }) => {
                        let result = if let Some(entry) = self.torrents.get(&info_hash) {
                            entry.handle.max_connections().await
                        } else {
                            Err(crate::Error::TorrentNotFound(info_hash))
                        };
                        let _ = reply.send(result);
                    }
                    Some(SessionCommand::SetMaxUploads { info_hash, limit, reply }) => {
                        let result = if let Some(entry) = self.torrents.get(&info_hash) {
                            entry.handle.set_max_uploads(limit).await
                        } else {
                            Err(crate::Error::TorrentNotFound(info_hash))
                        };
                        let _ = reply.send(result);
                    }
                    Some(SessionCommand::MaxUploads { info_hash, reply }) => {
                        let result = if let Some(entry) = self.torrents.get(&info_hash) {
                            entry.handle.max_uploads().await
                        } else {
                            Err(crate::Error::TorrentNotFound(info_hash))
                        };
                        let _ = reply.send(result);
                    }
                    Some(SessionCommand::GetPeerInfo { info_hash, reply }) => {
                        let result = if let Some(entry) = self.torrents.get(&info_hash) {
                            entry.handle.get_peer_info().await
                        } else {
                            Err(crate::Error::TorrentNotFound(info_hash))
                        };
                        let _ = reply.send(result);
                    }
                    Some(SessionCommand::GetDownloadQueue { info_hash, reply }) => {
                        let result = if let Some(entry) = self.torrents.get(&info_hash) {
                            entry.handle.get_download_queue().await
                        } else {
                            Err(crate::Error::TorrentNotFound(info_hash))
                        };
                        let _ = reply.send(result);
                    }
                    Some(SessionCommand::HavePiece { info_hash, index, reply }) => {
                        let result = if let Some(entry) = self.torrents.get(&info_hash) {
                            entry.handle.have_piece(index).await
                        } else {
                            Err(crate::Error::TorrentNotFound(info_hash))
                        };
                        let _ = reply.send(result);
                    }
                    Some(SessionCommand::PieceAvailability { info_hash, reply }) => {
                        let result = if let Some(entry) = self.torrents.get(&info_hash) {
                            entry.handle.piece_availability().await
                        } else {
                            Err(crate::Error::TorrentNotFound(info_hash))
                        };
                        let _ = reply.send(result);
                    }
                    Some(SessionCommand::FileProgress { info_hash, reply }) => {
                        let result = if let Some(entry) = self.torrents.get(&info_hash) {
                            entry.handle.file_progress().await
                        } else {
                            Err(crate::Error::TorrentNotFound(info_hash))
                        };
                        let _ = reply.send(result);
                    }
                    Some(SessionCommand::InfoHashesQuery { info_hash, reply }) => {
                        let result = if let Some(entry) = self.torrents.get(&info_hash) {
                            entry.handle.info_hashes().await
                        } else {
                            Err(crate::Error::TorrentNotFound(info_hash))
                        };
                        let _ = reply.send(result);
                    }
                    Some(SessionCommand::TorrentFile { info_hash, reply }) => {
                        let result = if let Some(entry) = self.torrents.get(&info_hash) {
                            entry.handle.torrent_file().await
                        } else {
                            Err(crate::Error::TorrentNotFound(info_hash))
                        };
                        let _ = reply.send(result);
                    }
                    Some(SessionCommand::TorrentFileV2 { info_hash, reply }) => {
                        let result = if let Some(entry) = self.torrents.get(&info_hash) {
                            entry.handle.torrent_file_v2().await
                        } else {
                            Err(crate::Error::TorrentNotFound(info_hash))
                        };
                        let _ = reply.send(result);
                    }
                    Some(SessionCommand::ForceDhtAnnounce { info_hash, reply }) => {
                        let result = match self.torrents.get(&info_hash) {
                            Some(entry) => {
                                entry.handle.force_dht_announce().await
                            }
                            None => Err(crate::Error::TorrentNotFound(info_hash)),
                        };
                        let _ = reply.send(result);
                    }
                    Some(SessionCommand::ForceLsdAnnounce { info_hash, reply }) => {
                        let result = match self.torrents.get(&info_hash) {
                            // Private torrents are never announced over LSD.
                            Some(entry) if entry.is_private() => {
                                Err(crate::Error::InvalidSettings(
                                    "LSD disabled for private torrent".into(),
                                ))
                            }
                            Some(_) => {
                                // Without an LSD handle this is a no-op
                                // that still reports success.
                                if let Some(ref lsd) = self.lsd {
                                    lsd.announce(vec![info_hash]).await;
                                }
                                Ok(())
                            }
                            None => Err(crate::Error::TorrentNotFound(info_hash)),
                        };
                        let _ = reply.send(result);
                    }
                    Some(SessionCommand::ReadPiece { info_hash, index, reply }) => {
                        let result = if let Some(entry) = self.torrents.get(&info_hash) {
                            entry.handle.read_piece(index).await
                        } else {
                            Err(crate::Error::TorrentNotFound(info_hash))
                        };
                        let _ = reply.send(result);
                    }
                    Some(SessionCommand::FlushCache { info_hash, reply }) => {
                        let result = if let Some(entry) = self.torrents.get(&info_hash) {
                            entry.handle.flush_cache().await
                        } else {
                            Err(crate::Error::TorrentNotFound(info_hash))
                        };
                        let _ = reply.send(result);
                    }
                    Some(SessionCommand::IsValid { info_hash, reply }) => {
                        let valid = self.torrents.get(&info_hash)
                            .map(|e| e.handle.is_valid())
                            .unwrap_or(false);
                        let _ = reply.send(valid);
                    }
                    Some(SessionCommand::ClearError { info_hash, reply }) => {
                        let result = if let Some(entry) = self.torrents.get(&info_hash) {
                            entry.handle.clear_error().await
                        } else {
                            Err(crate::Error::TorrentNotFound(info_hash))
                        };
                        let _ = reply.send(result);
                    }
                    Some(SessionCommand::FileStatus { info_hash, reply }) => {
                        let result = if let Some(entry) = self.torrents.get(&info_hash) {
                            entry.handle.file_status().await
                        } else {
                            Err(crate::Error::TorrentNotFound(info_hash))
                        };
                        let _ = reply.send(result);
                    }
                    Some(SessionCommand::Flags { info_hash, reply }) => {
                        let result = if let Some(entry) = self.torrents.get(&info_hash) {
                            entry.handle.flags().await
                        } else {
                            Err(crate::Error::TorrentNotFound(info_hash))
                        };
                        let _ = reply.send(result);
                    }
                    Some(SessionCommand::SetFlags { info_hash, flags, reply }) => {
                        let result = if let Some(entry) = self.torrents.get(&info_hash) {
                            entry.handle.set_flags(flags).await
                        } else {
                            Err(crate::Error::TorrentNotFound(info_hash))
                        };
                        let _ = reply.send(result);
                    }
                    Some(SessionCommand::UnsetFlags { info_hash, flags, reply }) => {
                        let result = if let Some(entry) = self.torrents.get(&info_hash) {
                            entry.handle.unset_flags(flags).await
                        } else {
                            Err(crate::Error::TorrentNotFound(info_hash))
                        };
                        let _ = reply.send(result);
                    }
                    Some(SessionCommand::ConnectPeer { info_hash, addr, reply }) => {
                        let result = if let Some(entry) = self.torrents.get(&info_hash) {
                            entry.handle.connect_peer(addr).await
                        } else {
                            Err(crate::Error::TorrentNotFound(info_hash))
                        };
                        let _ = reply.send(result);
                    }
                    Some(SessionCommand::DhtPutImmutable { value, reply }) => {
                        let result = self.handle_dht_put_immutable(value).await;
                        let _ = reply.send(result);
                    }
                    Some(SessionCommand::DhtGetImmutable { target, reply }) => {
                        let result = self.handle_dht_get_immutable(target).await;
                        let _ = reply.send(result);
                    }
                    Some(SessionCommand::DhtPutMutable { keypair_bytes, value, seq, salt, reply }) => {
                        let result = self.handle_dht_put_mutable(keypair_bytes, value, seq, salt).await;
                        let _ = reply.send(result);
                    }
                    Some(SessionCommand::DhtGetMutable { public_key, salt, reply }) => {
                        let result = self.handle_dht_get_mutable(public_key, salt).await;
                        let _ = reply.send(result);
                    }
                    Some(SessionCommand::PostSessionStats) => {
                        self.fire_stats_alert();
                    }
                    Some(SessionCommand::SaveResumeState { reply }) => {
                        let count = self.save_dirty_resume_files().await;
                        let _ = reply.send(Ok(count));
                    }
                    // An explicit shutdown and a closed command channel
                    // (every sender dropped) end the loop the same way.
                    Some(SessionCommand::Shutdown) | None => {
                        self.shutdown_all().await;
                        return;
                    }
                }
            }
            // --- LSD-discovered peers ---------------------------------
            result = async {
                match &mut self.lsd_peers_rx {
                    Some(rx) => rx.recv().await,
                    None => std::future::pending().await,
                }
            } => {
                match result {
                    Some((info_hash, peer_addr)) => {
                        // Private torrents ignore LSD-sourced peers.
                        if let Some(entry) = self.torrents.get(&info_hash)
                            && !entry.is_private()
                        {
                            let _ = entry.handle.add_peers(vec![peer_addr], crate::peer_state::PeerSource::Lsd).await;
                        }
                    }
                    None => {
                        // Channel closed: `recv` would now resolve to
                        // `None` on every poll and spin this loop. Dropping
                        // the receiver parks the branch on `pending()`.
                        self.lsd_peers_rx = None;
                    }
                }
            }
            // --- Identified inbound connections -----------------------
            Some(conn) = self.validated_conn_rx.recv() => {
                self.handle_identified_inbound(conn);
            }
            // --- Incoming SSL connections -----------------------------
            result = async {
                if let Some(ref mut listener) = self.ssl_listener {
                    listener.accept().await
                } else {
                    std::future::pending().await
                }
            } => {
                // Accept errors are dropped; the listener is polled again
                // on the next iteration.
                if let Ok((stream, addr)) = result {
                    self.handle_ssl_incoming(stream, addr).await;
                }
            }
            // --- Global rate-limiter refill ---------------------------
            _ = refill_interval.tick() => {
                // Refill by the nominal tick period, not measured time.
                let elapsed = std::time::Duration::from_millis(100);
                self.global_upload_bucket.lock().refill(elapsed);
                self.global_download_bucket.lock().refill(elapsed);
            }
            // --- Queue evaluation -------------------------------------
            _ = auto_manage_interval.tick() => {
                self.evaluate_queue().await;
            }
            // --- NAT traversal events ---------------------------------
            event = recv_nat_event(&mut self.nat_events_rx) => {
                match event {
                    irontide_nat::NatEvent::MappingSucceeded { port, protocol } => {
                        info!(port, %protocol, "port mapping succeeded");
                        post_alert(
                            &self.alert_tx,
                            &self.alert_mask,
                            AlertKind::PortMappingSucceeded { port, protocol },
                        );
                    }
                    irontide_nat::NatEvent::MappingFailed { port, message } => {
                        warn!(port, %message, "port mapping failed");
                        post_alert(
                            &self.alert_tx,
                            &self.alert_mask,
                            AlertKind::PortMappingFailed { port, message },
                        );
                    }
                    irontide_nat::NatEvent::ExternalIpDiscovered { ip } => {
                        info!(%ip, "external IP discovered via NAT traversal");
                        self.external_ip = Some(ip);
                        // Propagate to every torrent and to both DHTs.
                        for entry in self.torrents.values() {
                            let _ = entry.handle.update_external_ip(ip).await;
                        }
                        if let Some(dht) = &self.dht_v4 {
                            let _ = dht.update_external_ip(ip, irontide_dht::IpVoteSource::Nat).await;
                        }
                        if let Some(dht) = &self.dht_v6 {
                            let _ = dht.update_external_ip(ip, irontide_dht::IpVoteSource::Nat).await;
                        }
                    }
                }
            }
            // --- External IP reported by the DHTs ---------------------
            Some(ip) = recv_dht_ip(&mut self.dht_v4_ip_rx) => {
                info!(%ip, "external IP discovered via DHT v4 (BEP 42)");
                self.external_ip = Some(ip);
                for entry in self.torrents.values() {
                    let _ = entry.handle.update_external_ip(ip).await;
                }
            }
            Some(ip) = recv_dht_ip(&mut self.dht_v6_ip_rx) => {
                info!(%ip, "external IP discovered via DHT v6 (BEP 42)");
                self.external_ip = Some(ip);
                for entry in self.torrents.values() {
                    let _ = entry.handle.update_external_ip(ip).await;
                }
            }
            // --- Optional timers (never fire when disabled) -----------
            _ = async {
                match &mut stats_timer {
                    Some(t) => t.tick().await,
                    None => std::future::pending().await,
                }
            } => {
                self.fire_stats_alert();
            }
            _ = async {
                match &mut sample_timer {
                    Some(t) => t.tick().await,
                    None => std::future::pending().await,
                }
            } => {
                self.fire_sample_infohashes().await;
            }
            _ = async {
                match &mut resume_save_interval {
                    Some(t) => t.tick().await,
                    None => std::future::pending().await,
                }
            } => {
                let count = self.save_dirty_resume_files().await;
                if count > 0 {
                    info!(count, "periodic resume save completed");
                }
            }
        }
    }
}
/// Returns `(upload, download)` clones of the global token buckets.
///
/// Each side is `Some` only when the matching rate-limit setting is greater
/// than zero; otherwise it is `None`.
fn global_buckets_if_limited(&self) -> (Option<SharedBucket>, Option<SharedBucket>) {
    let upload = (self.settings.upload_rate_limit > 0)
        .then(|| Arc::clone(&self.global_upload_bucket));
    let download = (self.settings.download_rate_limit > 0)
        .then(|| Arc::clone(&self.global_download_bucket));
    (upload, download)
}
/// Builds a `SlotTuner` from the current settings.
///
/// With `auto_upload_slots` enabled the tuner is created from `4` plus the
/// configured min/max; otherwise a disabled tuner is created with `4`.
// NOTE(review): `4` is presumably the initial/fixed upload slot count —
// confirm against `SlotTuner`.
fn make_slot_tuner(&self) -> crate::slot_tuner::SlotTuner {
    let settings = &self.settings;
    if !settings.auto_upload_slots {
        return crate::slot_tuner::SlotTuner::disabled(4);
    }
    crate::slot_tuner::SlotTuner::new(
        4,
        settings.auto_upload_slots_min,
        settings.auto_upload_slots_max,
    )
}
/// Derives a per-torrent configuration from the current session settings
/// via `TorrentConfig`'s `From<&Settings>` conversion.
fn make_torrent_config(&self) -> TorrentConfig {
    let settings = &self.settings;
    TorrentConfig::from(settings)
}
/// Returns the queue position that follows the highest one currently held by an
/// auto-managed torrent, or `0` when there are no auto-managed torrents.
fn next_queue_position(&self) -> i32 {
    let highest = self
        .torrents
        .values()
        .filter_map(|entry| entry.auto_managed.then_some(entry.queue_position))
        .max();
    match highest {
        Some(position) => position + 1,
        None => 0,
    }
}
/// Adds a torrent with complete metadata to the session and returns its info hash.
///
/// A v1 view of the metadata is used directly when available; otherwise one is
/// synthesized from the v2 metadata. When `storage` is `None`, filesystem storage is
/// created under the effective download directory (`download_dir` overrides the
/// value from the session settings), falling back to in-memory storage if that fails.
/// After insertion the torrent receives the next queue position, a `TorrentAdded`
/// alert is posted, and the info hash is announced via LSD unless the torrent is
/// private.
///
/// # Errors
///
/// Returns `DuplicateTorrent` if the info hash is already present and
/// `SessionAtCapacity` once `max_torrents` is reached. Failures from v1 synthesis or
/// from `TorrentHandle::from_torrent` are propagated; in the latter case the storage
/// registered with the disk manager is unregistered again before returning.
async fn handle_add_torrent(
    &mut self,
    torrent_meta: irontide_core::TorrentMeta,
    storage: Option<Arc<dyn TorrentStorage>>,
    download_dir: Option<PathBuf>,
) -> crate::Result<Id20> {
    let version = torrent_meta.version();
    let meta_v2 = torrent_meta.as_v2().cloned();
    let meta = match torrent_meta.as_v1() {
        Some(v1) => v1.clone(),
        None => {
            let v2 = torrent_meta
                .as_v2()
                .expect("torrent metadata without a v1 view must expose a v2 view");
            synthesize_v1_from_v2(v2)?
        }
    };
    let info_hash = meta.info_hash;
    if self.torrents.contains_key(&info_hash) {
        return Err(crate::Error::DuplicateTorrent(info_hash));
    }
    if self.torrents.len() >= self.settings.max_torrents {
        return Err(crate::Error::SessionAtCapacity(self.settings.max_torrents));
    }
    let mut torrent_config = self.make_torrent_config();
    if let Some(dir) = download_dir {
        torrent_config.download_dir = dir;
    }
    let storage: Arc<dyn TorrentStorage> = match storage {
        Some(s) => s,
        None => {
            let lengths = Lengths::new(
                meta.info.total_length(),
                meta.info.piece_length,
                DEFAULT_CHUNK_SIZE,
            );
            let files = meta.info.files();
            let file_paths: Vec<PathBuf> = files
                .iter()
                .map(|f| f.path.iter().collect::<PathBuf>())
                .collect();
            let file_lengths: Vec<u64> = files.iter().map(|f| f.length).collect();
            // An explicit preallocate mode wins; otherwise it is derived from whether
            // the storage mode is `Full`.
            let prealloc_mode = torrent_config.preallocate_mode.unwrap_or_else(|| {
                irontide_storage::PreallocateMode::from(
                    torrent_config.storage_mode == irontide_core::StorageMode::Full,
                )
            });
            match irontide_storage::FilesystemStorage::new(
                &torrent_config.download_dir,
                file_paths,
                file_lengths,
                lengths.clone(),
                None,
                prealloc_mode,
                torrent_config.filesystem_direct_io,
            ) {
                Ok(s) => Arc::new(s),
                Err(e) => {
                    warn!(
                        "failed to create filesystem storage: {e}, falling back to memory"
                    );
                    Arc::new(irontide_storage::MemoryStorage::new(lengths))
                }
            }
        }
    };
    let disk_handle = self.disk_manager.register_torrent(info_hash, storage).await;
    let (global_up, global_down) = self.global_buckets_if_limited();
    let slot_tuner = self.make_slot_tuner();
    let handle = match TorrentHandle::from_torrent(
        meta.clone(),
        version,
        meta_v2,
        disk_handle,
        self.disk_manager.clone(),
        torrent_config,
        self.dht_v4.clone(),
        self.dht_v6.clone(),
        global_up,
        global_down,
        slot_tuner,
        self.alert_tx.clone(),
        Arc::clone(&self.alert_mask),
        self.utp_socket.clone(),
        self.utp_socket_v6.clone(),
        Arc::clone(&self.ban_manager),
        Arc::clone(&self.ip_filter),
        Arc::clone(&self.plugins),
        self.sam_session.clone(),
        self.ssl_manager.clone(),
        Arc::clone(&self.factory),
        Some(Arc::clone(&self.hash_pool)),
    )
    .await
    {
        Ok(handle) => handle,
        Err(e) => {
            // The torrent never made it into the session, so undo the disk
            // registration instead of leaving it behind.
            self.disk_manager.unregister_torrent(info_hash).await;
            return Err(e.into());
        }
    };
    let name = meta.info.name.clone();
    // Same test as `TorrentEntry::is_private`, taken before `meta` is moved.
    let is_private = meta.info.private == Some(1);
    self.torrents.insert(
        info_hash,
        TorrentEntry {
            handle,
            meta: Some(meta),
            queue_position: -1,
            auto_managed: true,
            started_at: Some(tokio::time::Instant::now()),
            prev_downloaded: 0,
            prev_uploaded: 0,
        },
    );
    self.info_hash_registry.insert(info_hash, ());
    // The position is computed after insertion, while the new entry still holds -1.
    let pos = self.next_queue_position();
    if let Some(entry) = self.torrents.get_mut(&info_hash)
        && entry.auto_managed
    {
        entry.queue_position = pos;
    }
    info!(%info_hash, "torrent added to session");
    post_alert(
        &self.alert_tx,
        &self.alert_mask,
        AlertKind::TorrentAdded { info_hash, name },
    );
    if let Some(ref lsd) = self.lsd
        && !is_private
    {
        lsd.announce(vec![info_hash]).await;
    }
    Ok(info_hash)
}
async fn handle_add_magnet(
&mut self,
magnet: Magnet,
download_dir: Option<PathBuf>,
) -> crate::Result<Id20> {
let info_hash = magnet.info_hash();
let display_name = magnet.display_name.clone().unwrap_or_default();
if self.torrents.contains_key(&info_hash) {
return Err(crate::Error::DuplicateTorrent(info_hash));
}
if self.torrents.len() >= self.settings.max_torrents {
return Err(crate::Error::SessionAtCapacity(self.settings.max_torrents));
}
let mut config = self.make_torrent_config();
if let Some(dir) = download_dir {
config.download_dir = dir;
}
let (global_up, global_down) = self.global_buckets_if_limited();
let slot_tuner = self.make_slot_tuner();
let handle = TorrentHandle::from_magnet(
magnet,
self.disk_manager.clone(),
config,
self.dht_v4.clone(),
self.dht_v6.clone(),
global_up,
global_down,
slot_tuner,
self.alert_tx.clone(),
Arc::clone(&self.alert_mask),
self.utp_socket.clone(),
self.utp_socket_v6.clone(),
Arc::clone(&self.ban_manager),
Arc::clone(&self.ip_filter),
Arc::clone(&self.plugins),
self.sam_session.clone(),
self.ssl_manager.clone(),
Arc::clone(&self.factory),
Some(Arc::clone(&self.hash_pool)),
)
.await?;
self.spawn_metadata_resolver(info_hash, &handle);
self.torrents.insert(
info_hash,
TorrentEntry {
handle,
meta: None,
queue_position: -1,
auto_managed: true,
started_at: Some(tokio::time::Instant::now()),
prev_downloaded: 0,
prev_uploaded: 0,
},
);
self.info_hash_registry.insert(info_hash, ());
let pos = self.next_queue_position();
if let Some(entry) = self.torrents.get_mut(&info_hash)
&& entry.auto_managed
{
entry.queue_position = pos;
}
info!(%info_hash, "magnet torrent added to session");
post_alert(
&self.alert_tx,
&self.alert_mask,
AlertKind::TorrentAdded {
info_hash,
name: display_name,
},
);
if let Some(ref lsd) = self.lsd {
lsd.announce(vec![info_hash]).await;
}
Ok(info_hash)
}
fn spawn_metadata_resolver(&self, info_hash: Id20, torrent_handle: &TorrentHandle) {
let dht = match self.dht_v4 {
Some(ref dht) => dht.clone(),
None => return, };
let factory = Arc::clone(&self.factory);
let connect_timeout = std::time::Duration::from_secs(self.settings.peer_connect_timeout);
let handle = torrent_handle.clone();
tokio::spawn(async move {
let peer_rx = match dht.get_peers(info_hash).await {
Ok(rx) => rx,
Err(e) => {
debug!(
%info_hash,
"metadata resolver: failed to start DHT get_peers: {e}"
);
return;
}
};
let peer_id = irontide_core::PeerId::generate().0;
match crate::metadata_resolver::resolve_metadata(
info_hash,
peer_id,
peer_rx,
factory,
connect_timeout,
crate::metadata_resolver::DEFAULT_MAX_CONCURRENT,
)
.await
{
Ok((meta, peers)) => {
let info_bytes = if let Some(b) = meta.info_bytes {
b.to_vec()
} else {
match irontide_bencode::to_bytes(&meta.info) {
Ok(bytes) => bytes,
Err(e) => {
debug!(
%info_hash,
"metadata resolver: failed to re-encode info dict: {e}"
);
return;
}
}
};
debug!(
%info_hash,
num_peers = peers.len(),
"metadata resolver: pre-resolved metadata, sending to torrent actor"
);
handle.send_pre_resolved_metadata(info_bytes, peers);
}
Err(e) => {
debug!(
%info_hash,
"metadata resolver: failed to resolve metadata: {e}"
);
}
}
});
}
async fn handle_remove_torrent(&mut self, info_hash: Id20) -> crate::Result<()> {
let entry = self
.torrents
.remove(&info_hash)
.ok_or(crate::Error::TorrentNotFound(info_hash))?;
self.info_hash_registry.remove(&info_hash);
let was_auto_managed = entry.auto_managed;
let removed_position = entry.queue_position;
entry.handle.shutdown().await?;
self.disk_manager.unregister_torrent(info_hash).await;
if was_auto_managed && removed_position >= 0 {
let mut entries = self.queue_entries();
let changed = crate::queue::remove_position(&mut entries, removed_position);
self.apply_queue_changes(&changed);
}
let resume_dir = self.effective_resume_dir();
if let Err(e) = crate::resume_file::delete_resume_file(&resume_dir, &info_hash) {
if e.kind() != std::io::ErrorKind::NotFound {
warn!(%info_hash, "failed to delete resume file on removal: {e}");
}
}
info!(%info_hash, "torrent removed from session");
post_alert(
&self.alert_tx,
&self.alert_mask,
AlertKind::TorrentRemoved { info_hash },
);
Ok(())
}
async fn handle_pause_torrent(&mut self, info_hash: Id20) -> crate::Result<()> {
let entry = self
.torrents
.get(&info_hash)
.ok_or(crate::Error::TorrentNotFound(info_hash))?;
entry.handle.pause().await
}
async fn handle_resume_torrent(&mut self, info_hash: Id20) -> crate::Result<()> {
let entry = self
.torrents
.get(&info_hash)
.ok_or(crate::Error::TorrentNotFound(info_hash))?;
entry.handle.resume().await
}
async fn handle_move_torrent_storage(
&self,
info_hash: Id20,
new_path: std::path::PathBuf,
) -> crate::Result<()> {
let entry = self
.torrents
.get(&info_hash)
.ok_or(crate::Error::TorrentNotFound(info_hash))?;
entry.handle.move_storage(new_path).await
}
async fn handle_torrent_stats(&self, info_hash: Id20) -> crate::Result<TorrentStats> {
let entry = self
.torrents
.get(&info_hash)
.ok_or(crate::Error::TorrentNotFound(info_hash))?;
let mut stats = entry.handle.stats().await?;
stats.queue_position = entry.queue_position;
stats.auto_managed = entry.auto_managed;
Ok(stats)
}
fn handle_torrent_info(&self, info_hash: Id20) -> crate::Result<TorrentInfo> {
let entry = self
.torrents
.get(&info_hash)
.ok_or(crate::Error::TorrentNotFound(info_hash))?;
let meta = entry
.meta
.as_ref()
.ok_or(crate::Error::MetadataNotReady(info_hash))?;
let files: Vec<FileInfo> = if let Some(ref file_list) = meta.info.files {
file_list
.iter()
.map(|f| FileInfo {
path: f.path.iter().collect::<PathBuf>(),
length: f.length,
})
.collect()
} else {
vec![FileInfo {
path: PathBuf::from(&meta.info.name),
length: meta.info.total_length(),
}]
};
Ok(TorrentInfo {
info_hash,
name: meta.info.name.clone(),
total_length: meta.info.total_length(),
piece_length: meta.info.piece_length,
num_pieces: meta.info.num_pieces() as u32,
files,
private: meta.info.private == Some(1),
})
}
/// Refreshes the session-level gauge counters.
///
/// Both the total and the "active" torrent gauges are set to the number of torrents
/// in the session. The DHT gauges reflect which DHT handles (v4/v6) are present, not
/// routing-table sizes. The banned-peer gauge is the length of the ban list.
fn update_session_gauges(&self) {
    use crate::stats::*;

    let torrent_count = self.torrents.len() as i64;
    let dht_v4_present = i64::from(self.dht_v4.is_some());
    let dht_v6_present = i64::from(self.dht_v6.is_some());

    let gauges = &self.counters;
    gauges.set(SES_NUM_TORRENTS, torrent_count);
    gauges.set(SES_ACTIVE_TORRENTS, torrent_count);
    gauges.set(DHT_NODES, dht_v4_present + dht_v6_present);
    gauges.set(DHT_NODES_V4, dht_v4_present);
    gauges.set(DHT_NODES_V6, dht_v6_present);

    let banned = self.ban_manager.read().banned_list().len() as i64;
    gauges.set(PEER_NUM_BANNED, banned);
}
fn fire_stats_alert(&self) {
self.update_session_gauges();
let values = self.counters.snapshot();
crate::alert::post_alert(
&self.alert_tx,
&self.alert_mask,
crate::alert::AlertKind::SessionStatsAlert { values },
);
}
/// Issues a `sample_infohashes` DHT query against a random target and posts the
/// outcome as a `DhtSampleInfohashes` alert.
///
/// The v4 DHT handle is preferred, the v6 handle is used when v4 is absent, and the
/// call is a no-op when neither exists. Query failures are logged at debug level.
async fn fire_sample_infohashes(&self) {
    let Some(dht) = self.dht_v4.as_ref().or(self.dht_v6.as_ref()) else {
        return;
    };

    let mut random_target = [0u8; 20];
    irontide_core::random_bytes(&mut random_target);

    match dht.sample_infohashes(Id20::from(random_target)).await {
        Err(e) => {
            debug!("sample_infohashes failed: {e}");
        }
        Ok(result) => {
            let alert = AlertKind::DhtSampleInfohashes {
                num_samples: result.samples.len(),
                total_estimate: result.num,
            };
            post_alert(&self.alert_tx, &self.alert_mask, alert);
        }
    }
}
/// Builds a `SessionStats` summary for the whole session.
///
/// Session gauges are refreshed first. Byte totals are summed over every torrent
/// whose stats query succeeds; torrents whose query fails contribute nothing.
/// `active_torrents` is the number of torrents in the session and `dht_nodes` counts
/// the DHT handles (v4/v6) that are present.
async fn make_session_stats(&self) -> SessionStats {
    self.update_session_gauges();

    let (mut downloaded_sum, mut uploaded_sum) = (0u64, 0u64);
    for entry in self.torrents.values() {
        let Ok(stats) = entry.handle.stats().await else {
            continue;
        };
        downloaded_sum += stats.downloaded;
        uploaded_sum += stats.uploaded;
    }

    let dht_handles = usize::from(self.dht_v4.is_some()) + usize::from(self.dht_v6.is_some());
    SessionStats {
        active_torrents: self.torrents.len(),
        total_downloaded: downloaded_sum,
        total_uploaded: uploaded_sum,
        dht_nodes: dht_handles,
    }
}
async fn handle_save_torrent_resume(
&self,
info_hash: Id20,
) -> crate::Result<irontide_core::FastResumeData> {
let entry = self
.torrents
.get(&info_hash)
.ok_or(crate::Error::TorrentNotFound(info_hash))?;
let mut resume = entry.handle.save_resume_data().await?;
resume.queue_position = entry.queue_position as i64;
resume.auto_managed = if entry.auto_managed { 1 } else { 0 };
Ok(resume)
}
/// Captures the persistent state of the whole session.
///
/// The result contains resume data for every torrent whose `save_resume_data` call
/// succeeds (failures are logged and skipped), the banned peers and per-peer strike
/// counts from the ban manager, the v4 DHT node id when its stats are available, and
/// the routing nodes of the v4 and v6 DHTs.
///
/// Each torrent's resume data is stamped with the session-owned `queue_position`
/// and `auto_managed` values, matching `handle_save_torrent_resume`; these values
/// are tracked in the session's entry, not in the data returned by the handle.
async fn handle_save_session_state(&self) -> crate::Result<crate::persistence::SessionState> {
    use crate::persistence::SessionState;
    let mut torrents = Vec::with_capacity(self.torrents.len());
    for (info_hash, entry) in &self.torrents {
        match entry.handle.save_resume_data().await {
            Ok(mut rd) => {
                rd.queue_position = entry.queue_position as i64;
                rd.auto_managed = if entry.auto_managed { 1 } else { 0 };
                torrents.push(rd);
            }
            Err(e) => {
                warn!(%info_hash, "failed to save resume data: {e}");
            }
        }
    }
    // The read guard is confined to this block so it is released before the
    // DHT awaits below.
    let (banned_peers, peer_strikes) = {
        let ban_mgr = self.ban_manager.read();
        let banned_peers: Vec<String> = ban_mgr
            .banned_list()
            .iter()
            .map(|ip| ip.to_string())
            .collect();
        let peer_strikes: Vec<crate::persistence::PeerStrikeEntry> = ban_mgr
            .strikes_map()
            .iter()
            .map(|(ip, &count)| crate::persistence::PeerStrikeEntry {
                ip: ip.to_string(),
                count: count as i64,
            })
            .collect();
        (banned_peers, peer_strikes)
    };
    let mut dht_entries = Vec::new();
    let mut dht_node_id = None;
    if let Some(ref dht) = self.dht_v4 {
        if let Ok(stats) = dht.stats().await {
            dht_node_id = Some(stats.node_id.to_hex());
        }
        for (_id, addr) in dht.get_routing_nodes().await {
            dht_entries.push(crate::persistence::DhtNodeEntry {
                host: addr.ip().to_string(),
                port: addr.port() as i64,
            });
        }
    }
    if let Some(ref dht) = self.dht_v6 {
        for (_id, addr) in dht.get_routing_nodes().await {
            dht_entries.push(crate::persistence::DhtNodeEntry {
                host: addr.ip().to_string(),
                port: addr.port() as i64,
            });
        }
    }
    Ok(SessionState {
        dht_nodes: dht_entries,
        dht_node_id,
        torrents,
        banned_peers,
        peer_strikes,
    })
}
/// Returns the directory used for resume files: the configured `resume_data_dir`
/// when set, otherwise the default from `resume_file::default_resume_dir`.
fn effective_resume_dir(&self) -> PathBuf {
    match &self.settings.resume_data_dir {
        Some(configured) => configured.clone(),
        None => crate::resume_file::default_resume_dir(),
    }
}
/// Restores torrents from the resume files found in the effective resume directory.
///
/// Each file is read and deserialized. If full torrent metadata can be reconstructed
/// the torrent is re-added (and its piece bitmap restored when one was saved);
/// otherwise a magnet is reconstructed and re-added. A non-empty `save_path` in the
/// resume data is used as the download directory. Duplicates count as skipped;
/// unreadable, undecodable or otherwise unusable files count as failed.
///
/// Always returns `Ok` with the per-outcome counters.
async fn handle_load_resume_state(&mut self) -> crate::Result<ResumeLoadResult> {
    let resume_dir = self.effective_resume_dir();
    let paths = crate::resume_file::scan_resume_dir(&resume_dir);
    let mut tally = ResumeLoadResult {
        restored: 0,
        skipped: 0,
        failed: 0,
    };

    for path in &paths {
        let file_name = path
            .file_name()
            .and_then(|name| name.to_str())
            .unwrap_or("<unknown>");

        let bytes = match std::fs::read(path) {
            Ok(contents) => contents,
            Err(e) => {
                warn!(file = %file_name, "failed to read resume file: {e}");
                tally.failed = tally.failed.saturating_add(1);
                continue;
            }
        };
        let rd = match crate::resume_file::deserialize_resume(&bytes) {
            Ok(decoded) => decoded,
            Err(e) => {
                warn!(file = %file_name, "failed to deserialize resume file: {e}");
                tally.failed = tally.failed.saturating_add(1);
                continue;
            }
        };

        // An empty save path means "use the session's default download directory".
        let restore_dir = (!rd.save_path.is_empty()).then(|| PathBuf::from(&rd.save_path));

        if let Some(meta) = crate::resume_file::reconstruct_torrent_meta(&rd) {
            let info_hash = meta.info_hash;
            let pieces = rd.pieces.clone();
            let added = self
                .handle_add_torrent(irontide_core::TorrentMeta::V1(meta), None, restore_dir)
                .await;
            match added {
                Ok(added_hash) => {
                    if !pieces.is_empty()
                        && let Some(entry) = self.torrents.get(&added_hash)
                        && let Err(e) = entry.handle.restore_resume_bitmap(pieces).await
                    {
                        warn!(
                            %info_hash,
                            "failed to restore piece bitmap, torrent will recheck: {e}"
                        );
                    }
                    info!(%info_hash, "restored torrent from resume file");
                    tally.restored = tally.restored.saturating_add(1);
                }
                Err(crate::Error::DuplicateTorrent(_)) => {
                    debug!(%info_hash, "skipped duplicate torrent from resume");
                    tally.skipped = tally.skipped.saturating_add(1);
                }
                Err(e) => {
                    warn!(%info_hash, "failed to add restored torrent: {e}");
                    tally.failed = tally.failed.saturating_add(1);
                }
            }
            continue;
        }

        let Some(magnet) = crate::resume_file::reconstruct_magnet(&rd) else {
            warn!(file = %file_name, "resume file has no valid info dict and no valid info hash");
            tally.failed = tally.failed.saturating_add(1);
            continue;
        };
        let info_hash = magnet.info_hash();
        match self.handle_add_magnet(magnet, restore_dir).await {
            Ok(_) => {
                info!(%info_hash, "restored magnet from resume file");
                tally.restored = tally.restored.saturating_add(1);
            }
            Err(crate::Error::DuplicateTorrent(_)) => {
                debug!(%info_hash, "skipped duplicate magnet from resume");
                tally.skipped = tally.skipped.saturating_add(1);
            }
            Err(e) => {
                warn!(%info_hash, "failed to add restored magnet: {e}");
                tally.failed = tally.failed.saturating_add(1);
            }
        }
    }

    info!(
        restored = tally.restored,
        skipped = tally.skipped,
        failed = tally.failed,
        "resume state loaded"
    );
    Ok(tally)
}
async fn save_dirty_resume_files(&mut self) -> usize {
let resume_dir = self.effective_resume_dir();
if let Err(e) = std::fs::create_dir_all(resume_dir.join("torrents")) {
warn!("failed to create resume dir: {e}");
return 0;
}
let mut saved = 0usize;
let info_hashes: Vec<Id20> = self.torrents.keys().copied().collect();
for info_hash in &info_hashes {
let entry = match self.torrents.get(info_hash) {
Some(e) => e,
None => continue,
};
let needs_save = match entry.handle.stats().await {
Ok(stats) => stats.need_save_resume,
Err(_) => continue,
};
if !needs_save {
continue;
}
let rd = match entry.handle.save_resume_data().await {
Ok(rd) => rd,
Err(e) => {
warn!(%info_hash, "failed to build resume data: {e}");
continue;
}
};
let bytes = match crate::resume_file::serialize_resume(&rd) {
Ok(b) => b,
Err(e) => {
warn!(%info_hash, "failed to serialize resume data: {e}");
continue;
}
};
let path = crate::resume_file::resume_file_path(&resume_dir, info_hash);
if let Err(e) = crate::resume_file::atomic_write(&path, &bytes) {
warn!(%info_hash, "failed to write resume file: {e}");
continue;
}
if let Err(e) = entry.handle.clear_save_resume_flag().await {
warn!(%info_hash, "failed to clear save_resume flag: {e}");
}
saved = saved.saturating_add(1);
}
saved
}
/// Validates and installs a new settings object.
///
/// Global rate-limit buckets and the alert mask are updated only when their
/// respective values differ from the current settings. A `SettingsChanged` alert is
/// posted after the new settings are stored.
///
/// # Errors
///
/// Returns the validation error from `Settings::validate`; nothing is changed in
/// that case.
fn handle_apply_settings(&mut self, new: Settings) -> crate::Result<()> {
    new.validate()?;

    let current = &self.settings;
    let upload_changed = new.upload_rate_limit != current.upload_rate_limit;
    let download_changed = new.download_rate_limit != current.download_rate_limit;
    let mask_changed = new.alert_mask != current.alert_mask;

    if upload_changed {
        let mut bucket = self.global_upload_bucket.lock();
        bucket.set_rate(new.upload_rate_limit);
    }
    if download_changed {
        let mut bucket = self.global_download_bucket.lock();
        bucket.set_rate(new.download_rate_limit);
    }
    if mask_changed {
        self.alert_mask
            .store(new.alert_mask.bits(), Ordering::Relaxed);
    }

    self.settings = new;
    post_alert(&self.alert_tx, &self.alert_mask, AlertKind::SettingsChanged);
    Ok(())
}
/// Collects a `QueueEntry` (info hash + current position) for every auto-managed
/// torrent in the session.
fn queue_entries(&self) -> Vec<crate::queue::QueueEntry> {
    self.torrents
        .iter()
        .filter_map(|(hash, entry)| {
            entry.auto_managed.then(|| crate::queue::QueueEntry {
                info_hash: *hash,
                position: entry.queue_position,
            })
        })
        .collect()
}
fn handle_set_queue_position(&mut self, info_hash: Id20, pos: i32) -> crate::Result<()> {
if !self.torrents.contains_key(&info_hash) {
return Err(crate::Error::TorrentNotFound(info_hash));
}
let mut entries = self.queue_entries();
let changed = crate::queue::set_position(&mut entries, info_hash, pos);
self.apply_queue_changes(&changed);
Ok(())
}
fn handle_queue_move(&mut self, info_hash: Id20, op: QueueMoveFn) -> crate::Result<()> {
if !self.torrents.contains_key(&info_hash) {
return Err(crate::Error::TorrentNotFound(info_hash));
}
let mut entries = self.queue_entries();
let changed = op(&mut entries, info_hash);
self.apply_queue_changes(&changed);
Ok(())
}
fn apply_queue_changes(&mut self, changed: &[(Id20, i32, i32)]) {
for &(hash, old_pos, new_pos) in changed {
if let Some(entry) = self.torrents.get_mut(&hash) {
entry.queue_position = new_pos;
}
crate::alert::post_alert(
&self.alert_tx,
&self.alert_mask,
crate::alert::AlertKind::TorrentQueuePositionChanged {
info_hash: hash,
old_pos,
new_pos,
},
);
}
}
async fn evaluate_queue(&mut self) {
let now = tokio::time::Instant::now();
let startup_duration = std::time::Duration::from_secs(self.settings.auto_manage_startup);
let auto_manage_secs = self.settings.auto_manage_interval.max(1);
let mut candidates = Vec::new();
let hashes: Vec<Id20> = self.torrents.keys().copied().collect();
for &info_hash in &hashes {
let (auto_managed, queue_position, started_at, prev_downloaded, prev_uploaded) = {
let entry = match self.torrents.get(&info_hash) {
Some(e) => e,
None => continue,
};
if !entry.auto_managed {
continue;
}
(
entry.auto_managed,
entry.queue_position,
entry.started_at,
entry.prev_downloaded,
entry.prev_uploaded,
)
};
let _ = auto_managed;
let stats = match self.torrents.get(&info_hash) {
Some(entry) => match entry.handle.stats().await {
Ok(s) => s,
Err(_) => continue,
},
None => continue,
};
let category = match stats.state {
TorrentState::Downloading
| TorrentState::FetchingMetadata
| TorrentState::Checking => crate::queue::QueueCategory::Downloading,
TorrentState::Seeding | TorrentState::Complete => {
crate::queue::QueueCategory::Seeding
}
TorrentState::Paused => {
if stats.pieces_have >= stats.pieces_total && stats.pieces_total > 0 {
crate::queue::QueueCategory::Seeding
} else {
crate::queue::QueueCategory::Downloading
}
}
TorrentState::Stopped | TorrentState::Sharing => continue,
};
let is_active = stats.state != TorrentState::Paused;
let download_rate = stats.downloaded.saturating_sub(prev_downloaded) / auto_manage_secs;
let upload_rate = stats.uploaded.saturating_sub(prev_uploaded) / auto_manage_secs;
let past_startup = started_at
.map(|t| now.duration_since(t) > startup_duration)
.unwrap_or(true);
let is_inactive = past_startup
&& match category {
crate::queue::QueueCategory::Downloading => {
download_rate < self.settings.inactive_down_rate
}
crate::queue::QueueCategory::Seeding => {
upload_rate < self.settings.inactive_up_rate
}
};
candidates.push(crate::queue::QueueCandidate {
info_hash,
position: queue_position,
category,
is_active,
is_inactive,
});
}
for &hash in &hashes {
if let Some(entry) = self.torrents.get(&hash)
&& let Ok(stats) = entry.handle.stats().await
&& let Some(entry) = self.torrents.get_mut(&hash)
{
entry.prev_downloaded = stats.downloaded;
entry.prev_uploaded = stats.uploaded;
}
}
let decision = crate::queue::evaluate(
&candidates,
self.settings.active_downloads,
self.settings.active_seeds,
self.settings.active_limit,
self.settings.dont_count_slow_torrents,
self.settings.auto_manage_prefer_seeds,
);
for hash in &decision.to_pause {
if let Some(entry) = self.torrents.get(hash) {
let _ = entry.handle.pause().await;
}
post_alert(
&self.alert_tx,
&self.alert_mask,
AlertKind::TorrentAutoManaged {
info_hash: *hash,
paused: true,
},
);
}
for hash in &decision.to_resume {
if let Some(entry) = self.torrents.get_mut(hash) {
let _ = entry.handle.resume().await;
entry.started_at = Some(tokio::time::Instant::now());
}
post_alert(
&self.alert_tx,
&self.alert_mask,
AlertKind::TorrentAutoManaged {
info_hash: *hash,
paused: false,
},
);
}
}
/// Routes an inbound connection whose info hash has already been identified to the
/// matching torrent.
///
/// The hand-off runs in a spawned task and its result is ignored. Connections for
/// info hashes not present in the session are dropped with a debug log.
fn handle_identified_inbound(&self, conn: crate::listener::IdentifiedConnection) {
    let Some(entry) = self.torrents.get(&conn.info_hash) else {
        debug!(%conn.addr, %conn.info_hash, "validated peer for removed torrent, dropping");
        return;
    };
    debug!(%conn.addr, %conn.info_hash, "routing validated inbound peer");
    let torrent = entry.handle.clone();
    tokio::spawn(async move {
        let _ = torrent.send_incoming_peer(conn.stream, conn.addr).await;
    });
}
/// Accepts an inbound TLS connection for an SSL torrent and hands the encrypted
/// stream to that torrent.
///
/// The ClientHello is read first; its SNI must be the hex info hash of a torrent in
/// this session whose metadata carries an SSL certificate. A server config built from
/// that certificate completes the handshake. Any failing step drops the connection
/// with a log line; a failed handshake additionally posts an `SslTorrentError` alert.
async fn handle_ssl_incoming(
    &mut self,
    stream: crate::transport::BoxedStream,
    addr: std::net::SocketAddr,
) {
    use tokio_rustls::LazyConfigAcceptor;

    let hello_reader = LazyConfigAcceptor::new(rustls::server::Acceptor::default(), stream);
    let start_handshake = match hello_reader.await {
        Ok(handshake) => handshake,
        Err(e) => {
            debug!(%addr, error = %e, "SSL ClientHello read failed");
            return;
        }
    };

    let client_hello = start_handshake.client_hello();
    let Some(sni) = client_hello.server_name().map(|name| name.to_string()) else {
        debug!(%addr, "SSL connection missing SNI");
        return;
    };
    let Ok(info_hash) = Id20::from_hex(&sni) else {
        debug!(%addr, sni = %sni, "SSL SNI is not a valid info hash");
        return;
    };

    let Some(torrent) = self.torrents.get(&info_hash) else {
        debug!(%addr, %info_hash, "SSL connection for unknown torrent");
        return;
    };
    let certificate = torrent
        .meta
        .as_ref()
        .and_then(|meta| meta.ssl_cert.as_ref())
        .cloned();
    let Some(ssl_cert) = certificate else {
        debug!(%addr, %info_hash, "SSL connection for non-SSL torrent");
        return;
    };

    let Some(manager) = self.ssl_manager.as_ref() else {
        debug!(%addr, "SSL manager not initialized");
        return;
    };
    let server_config = match manager.server_config(&ssl_cert) {
        Ok(config) => config,
        Err(e) => {
            warn!(%addr, %info_hash, error = %e, "failed to build SSL server config");
            return;
        }
    };

    match start_handshake.into_stream(server_config).await {
        Ok(tls_stream) => {
            let _ = torrent.handle.spawn_ssl_peer(addr, tls_stream).await;
        }
        Err(e) => {
            warn!(%addr, %info_hash, error = %e, "SSL handshake failed");
            post_alert(
                &self.alert_tx,
                &self.alert_mask,
                AlertKind::SslTorrentError {
                    info_hash,
                    message: format!("inbound TLS handshake from {addr}: {e}"),
                },
            );
        }
    }
}
async fn handle_dht_put_immutable(&self, value: Vec<u8>) -> crate::Result<Id20> {
let dht = self.dht_v4.as_ref().ok_or(crate::Error::DhtDisabled)?;
match dht.put_immutable(value.clone()).await {
Ok(target) => {
post_alert(
&self.alert_tx,
&self.alert_mask,
AlertKind::DhtPutComplete { target },
);
Ok(target)
}
Err(e) => {
let target = irontide_core::sha1(&value);
post_alert(
&self.alert_tx,
&self.alert_mask,
AlertKind::DhtItemError {
target,
message: e.to_string(),
},
);
Err(crate::Error::Dht(e))
}
}
}
async fn handle_dht_get_immutable(&self, target: Id20) -> crate::Result<Option<Vec<u8>>> {
let dht = self.dht_v4.as_ref().ok_or(crate::Error::DhtDisabled)?;
match dht.get_immutable(target).await {
Ok(value) => {
post_alert(
&self.alert_tx,
&self.alert_mask,
AlertKind::DhtGetResult {
target,
value: value.clone(),
},
);
Ok(value)
}
Err(e) => {
post_alert(
&self.alert_tx,
&self.alert_mask,
AlertKind::DhtItemError {
target,
message: e.to_string(),
},
);
Err(crate::Error::Dht(e))
}
}
}
async fn handle_dht_put_mutable(
&self,
keypair_bytes: [u8; 32],
value: Vec<u8>,
seq: i64,
salt: Vec<u8>,
) -> crate::Result<Id20> {
let dht = self.dht_v4.as_ref().ok_or(crate::Error::DhtDisabled)?;
match dht.put_mutable(keypair_bytes, value, seq, salt).await {
Ok(target) => {
post_alert(
&self.alert_tx,
&self.alert_mask,
AlertKind::DhtMutablePutComplete { target, seq },
);
Ok(target)
}
Err(e) => {
post_alert(
&self.alert_tx,
&self.alert_mask,
AlertKind::DhtItemError {
target: Id20::from([0u8; 20]),
message: e.to_string(),
},
);
Err(crate::Error::Dht(e))
}
}
}
async fn handle_dht_get_mutable(
&self,
public_key: [u8; 32],
salt: Vec<u8>,
) -> crate::Result<Option<(Vec<u8>, i64)>> {
let dht = self.dht_v4.as_ref().ok_or(crate::Error::DhtDisabled)?;
let target = irontide_dht::compute_mutable_target(&public_key, &salt);
match dht.get_mutable(public_key, salt).await {
Ok(result) => {
let (value, seq) = match &result {
Some((v, s)) => (Some(v.clone()), Some(*s)),
None => (None, None),
};
post_alert(
&self.alert_tx,
&self.alert_mask,
AlertKind::DhtMutableGetResult {
target,
value,
seq,
public_key,
},
);
Ok(result)
}
Err(e) => {
post_alert(
&self.alert_tx,
&self.alert_mask,
AlertKind::DhtItemError {
target,
message: e.to_string(),
},
);
Err(crate::Error::Dht(e))
}
}
}
async fn shutdown_all(&mut self) {
let save_count = self.save_dirty_resume_files().await;
if save_count > 0 {
info!(save_count, "saved resume files on shutdown");
}
for (info_hash, entry) in self.torrents.drain() {
debug!(%info_hash, "shutting down torrent");
let _ = entry.handle.shutdown().await;
}
if let Some(ref dht) = self.dht_v4 {
let _ = dht.shutdown().await;
}
if let Some(ref dht) = self.dht_v6 {
let _ = dht.shutdown().await;
}
if let Some(ref nat) = self.nat {
nat.shutdown().await;
}
if let Some(ref lsd) = self.lsd {
lsd.shutdown().await;
}
if let Some(ref socket) = self.utp_socket
&& let Err(e) = socket.shutdown().await
{
debug!(error = %e, "uTP socket shutdown error");
}
if let Some(ref socket) = self.utp_socket_v6
&& let Err(e) = socket.shutdown().await
{
debug!(error = %e, "uTP v6 socket shutdown error");
}
self.disk_manager.shutdown().await;
}
}
async fn recv_nat_event(
rx: &mut Option<mpsc::Receiver<irontide_nat::NatEvent>>,
) -> irontide_nat::NatEvent {
match rx {
Some(r) => match r.recv().await {
Some(event) => event,
None => std::future::pending().await,
},
None => std::future::pending().await,
}
}
/// Waits for the next external-IP notification on an optional receiver.
///
/// Never resolves when the receiver is absent. When a receiver is present, the
/// result of its `recv` is returned as-is, so a closed channel yields `None`.
async fn recv_dht_ip(
    rx: &mut Option<mpsc::Receiver<std::net::IpAddr>>,
) -> Option<std::net::IpAddr> {
    let Some(receiver) = rx.as_mut() else {
        return std::future::pending().await;
    };
    receiver.recv().await
}
/// Builds a v1-shaped `TorrentMetaV1` from v2 metadata.
///
/// The info hash comes from `info_hashes.best_v1()`. File entries copy path and
/// length from the v2 file list. The `pieces` field is filled with zero bytes
/// (20 per piece). Exactly one file yields single-file form (`length` set), more than
/// one yields multi-file form (`files` set); with no files both stay `None`.
/// `private`, `source` and the raw info bytes are left unset.
///
/// NOTE(review): `private` is always `None` here, so a private flag on the v2
/// torrent (if it has one) is not reflected in the result. Confirm whether that is
/// intended.
///
/// This function currently always returns `Ok`.
fn synthesize_v1_from_v2(
    v2: &irontide_core::TorrentMetaV2,
) -> crate::Result<irontide_core::TorrentMetaV1> {
    use irontide_core::{FileEntry, InfoDict};

    let file_entries: Vec<FileEntry> = v2
        .info
        .files()
        .iter()
        .map(|file| FileEntry {
            length: file.attr.length,
            path: file.path.clone(),
            attr: None,
            mtime: None,
            symlink_path: None,
        })
        .collect();

    let piece_count = v2.info.num_pieces() as usize;
    let placeholder_pieces = vec![0u8; piece_count * 20];

    let (length, files) = match file_entries.len() {
        0 => (None, None),
        1 => (Some(file_entries[0].length), None),
        _ => (None, Some(file_entries)),
    };

    let info = InfoDict {
        name: v2.info.name.clone(),
        piece_length: v2.info.piece_length,
        pieces: placeholder_pieces,
        length,
        files,
        private: None,
        source: None,
        ssl_cert: v2.ssl_cert.clone(),
        similar: Vec::new(),
        collections: Vec::new(),
    };

    Ok(irontide_core::TorrentMetaV1 {
        info_hash: v2.info_hashes.best_v1(),
        announce: v2.announce.clone(),
        announce_list: v2.announce_list.clone(),
        comment: v2.comment.clone(),
        created_by: v2.created_by.clone(),
        creation_date: v2.creation_date,
        info,
        info_bytes: None,
        url_list: Vec::new(),
        httpseeds: Vec::new(),
        ssl_cert: v2.ssl_cert.clone(),
    })
}
#[cfg(test)]
mod tests {
use super::*;
use crate::types::TorrentState;
use irontide_core::{DEFAULT_CHUNK_SIZE, Lengths, torrent_from_bytes};
use irontide_storage::MemoryStorage;
use std::time::Duration;
/// Builds a single-file v1 torrent named `"test"` for `data`, hashing it in
/// `piece_length`-sized pieces, by bencoding a minimal torrent and parsing it back.
fn make_test_torrent(data: &[u8], piece_length: u64) -> TorrentMetaV1 {
    use serde::Serialize;

    #[derive(Serialize)]
    struct Info<'a> {
        length: u64,
        name: &'a str,
        #[serde(rename = "piece length")]
        piece_length: u64,
        #[serde(with = "serde_bytes")]
        pieces: &'a [u8],
    }
    #[derive(Serialize)]
    struct Torrent<'a> {
        info: Info<'a>,
    }

    // Concatenated SHA-1 of every piece; the last piece may be shorter.
    let mut piece_hashes = Vec::new();
    let mut remaining = data;
    while !remaining.is_empty() {
        let take = (piece_length as usize).min(remaining.len());
        let (piece, rest) = remaining.split_at(take);
        piece_hashes.extend_from_slice(irontide_core::sha1(piece).as_bytes());
        remaining = rest;
    }

    let torrent = Torrent {
        info: Info {
            length: data.len() as u64,
            name: "test",
            piece_length,
            pieces: &piece_hashes,
        },
    };
    let encoded = irontide_bencode::to_bytes(&torrent).unwrap();
    torrent_from_bytes(&encoded).unwrap()
}
/// Creates in-memory storage sized for `data` with the given piece length and the
/// default chunk size.
fn make_storage(data: &[u8], piece_length: u64) -> Arc<MemoryStorage> {
    let total_len = data.len() as u64;
    let storage = MemoryStorage::new(Lengths::new(total_len, piece_length, DEFAULT_CHUNK_SIZE));
    Arc::new(storage)
}
/// Settings for tests: port 0, `/tmp` as download directory, at most 10 torrents,
/// and DHT, PEX, LSD, fast extension, uTP, UPnP, NAT-PMP and IPv6 all disabled.
/// Everything not listed comes from `Settings::default()`.
fn test_settings() -> Settings {
    let defaults = Settings::default();
    Settings {
        // Listening and storage.
        listen_port: 0,
        download_dir: PathBuf::from("/tmp"),
        max_torrents: 10,
        // Feature toggles, all switched off.
        enable_dht: false,
        enable_pex: false,
        enable_lsd: false,
        enable_fast_extension: false,
        enable_utp: false,
        enable_upnp: false,
        enable_natpmp: false,
        enable_ipv6: false,
        // Small resource budgets.
        alert_channel_size: 64,
        disk_io_threads: 2,
        storage_mode: irontide_core::StorageMode::Sparse,
        disk_cache_size: 1024 * 1024,
        ..defaults
    }
}
/// A fresh session reports zero active torrents and shuts down cleanly.
#[tokio::test]
async fn session_start_and_shutdown() {
    let session = SessionHandle::start(test_settings()).await.unwrap();

    let session_stats = session.session_stats().await.unwrap();
    assert_eq!(session_stats.active_torrents, 0);

    session.shutdown().await.unwrap();
}
/// Adding a torrent returns its info hash and makes it appear in the torrent list.
#[tokio::test]
async fn add_and_list_torrent() {
    let session = SessionHandle::start(test_settings()).await.unwrap();

    let payload = vec![0xAB; 16384];
    let meta = make_test_torrent(&payload, 16384);
    let expected_hash = meta.info_hash;
    let storage = make_storage(&payload, 16384);

    let info_hash = session
        .add_torrent(meta.into(), Some(storage))
        .await
        .unwrap();
    assert_eq!(info_hash, expected_hash);

    let listed = session.list_torrents().await.unwrap();
    assert_eq!(listed.len(), 1);
    assert!(listed.contains(&info_hash));

    session.shutdown().await.unwrap();
}
/// A removed torrent no longer shows up in the torrent list.
#[tokio::test]
async fn remove_torrent() {
    let session = SessionHandle::start(test_settings()).await.unwrap();

    let payload = vec![0xAB; 16384];
    let meta = make_test_torrent(&payload, 16384);
    let storage = make_storage(&payload, 16384);
    let info_hash = session
        .add_torrent(meta.into(), Some(storage))
        .await
        .unwrap();

    session.remove_torrent(info_hash).await.unwrap();
    // Short pause before listing again.
    tokio::time::sleep(Duration::from_millis(50)).await;

    let listed = session.list_torrents().await.unwrap();
    assert!(listed.is_empty());

    session.shutdown().await.unwrap();
}
/// Adding the same torrent twice fails with an error mentioning "duplicate".
#[tokio::test]
async fn duplicate_torrent_rejected() {
    let session = SessionHandle::start(test_settings()).await.unwrap();

    let payload = vec![0xAB; 16384];
    let meta = make_test_torrent(&payload, 16384);
    let first_storage = make_storage(&payload, 16384);
    let second_storage = make_storage(&payload, 16384);

    session
        .add_torrent(meta.clone().into(), Some(first_storage))
        .await
        .unwrap();

    let second_attempt = session.add_torrent(meta.into(), Some(second_storage)).await;
    assert!(second_attempt.is_err());
    let message = second_attempt.unwrap_err().to_string();
    assert!(message.contains("duplicate"));

    session.shutdown().await.unwrap();
}
#[tokio::test]
async fn session_at_capacity() {
let mut config = test_settings();
config.max_torrents = 1;
let session = SessionHandle::start(config).await.unwrap();
let data1 = vec![0xAA; 16384];
let meta1 = make_test_torrent(&data1, 16384);
let storage1 = make_storage(&data1, 16384);
session
.add_torrent(meta1.into(), Some(storage1))
.await
.unwrap();
let data2 = vec![0xBB; 16384];
let meta2 = make_test_torrent(&data2, 16384);
let storage2 = make_storage(&data2, 16384);
let result = session.add_torrent(meta2.into(), Some(storage2)).await;
assert!(result.is_err());
assert!(result.unwrap_err().to_string().contains("capacity"));
session.shutdown().await.unwrap();
}
/// Stats for a 32768-byte torrent with 16384-byte pieces report the
/// `Downloading` state and two pieces in total.
#[tokio::test]
async fn torrent_stats_via_session() {
    let session = SessionHandle::start(test_settings()).await.unwrap();

    let payload = vec![0xAB; 32768];
    let torrent = make_test_torrent(&payload, 16384);
    let backing = make_storage(&payload, 16384);
    let hash = session
        .add_torrent(torrent.into(), Some(backing))
        .await
        .unwrap();

    let report = session.torrent_stats(hash).await.unwrap();
    assert_eq!(report.state, TorrentState::Downloading);
    assert_eq!(report.pieces_total, 2);

    session.shutdown().await.unwrap();
}
/// `torrent_info` for a freshly added single-file torrent echoes the hash,
/// name, total length, piece count, privacy flag and file list.
#[tokio::test]
async fn torrent_info_via_session() {
    let session = SessionHandle::start(test_settings()).await.unwrap();

    let payload = vec![0xAB; 32768];
    let torrent = make_test_torrent(&payload, 16384);
    let backing = make_storage(&payload, 16384);
    let hash = session
        .add_torrent(torrent.into(), Some(backing))
        .await
        .unwrap();

    let described = session.torrent_info(hash).await.unwrap();

    // Identity and sizing.
    assert_eq!(described.info_hash, hash);
    assert_eq!(described.name, "test");
    assert_eq!(described.total_length, 32768);
    assert_eq!(described.num_pieces, 2);
    assert!(!described.private);

    // Exactly one file covering the whole payload.
    assert_eq!(described.files.len(), 1);
    assert_eq!(described.files[0].length, 32768);

    session.shutdown().await.unwrap();
}
/// Pausing moves the torrent to `Paused`; resuming brings it back to
/// `Downloading`. Each check follows a 50 ms pause.
#[tokio::test]
async fn pause_resume_via_session() {
    let session = SessionHandle::start(test_settings()).await.unwrap();

    let payload = vec![0xAB; 16384];
    let torrent = make_test_torrent(&payload, 16384);
    let backing = make_storage(&payload, 16384);
    let hash = session
        .add_torrent(torrent.into(), Some(backing))
        .await
        .unwrap();

    session.pause_torrent(hash).await.unwrap();
    tokio::time::sleep(Duration::from_millis(50)).await;
    let after_pause = session.torrent_stats(hash).await.unwrap();
    assert_eq!(after_pause.state, TorrentState::Paused);

    session.resume_torrent(hash).await.unwrap();
    tokio::time::sleep(Duration::from_millis(50)).await;
    let after_resume = session.torrent_stats(hash).await.unwrap();
    assert_eq!(after_resume.state, TorrentState::Downloading);

    session.shutdown().await.unwrap();
}
/// Stats, info, pause, resume and remove all return an error for an info hash
/// that was never added to the session.
#[tokio::test]
async fn not_found_errors() {
    let session = SessionHandle::start(test_settings()).await.unwrap();
    let unknown = Id20::from_hex("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();

    // Read-only queries.
    assert!(session.torrent_stats(unknown).await.is_err());
    assert!(session.torrent_info(unknown).await.is_err());

    // State-changing commands.
    assert!(session.pause_torrent(unknown).await.is_err());
    assert!(session.resume_torrent(unknown).await.is_err());
    assert!(session.remove_torrent(unknown).await.is_err());

    session.shutdown().await.unwrap();
}
/// After two distinct torrents are added, session stats report two active torrents.
#[tokio::test]
async fn session_stats_aggregate() {
    let session = SessionHandle::start(test_settings()).await.unwrap();

    let payload_a = vec![0xAA; 16384];
    let torrent_a = make_test_torrent(&payload_a, 16384);
    let backing_a = make_storage(&payload_a, 16384);
    session
        .add_torrent(torrent_a.into(), Some(backing_a))
        .await
        .unwrap();

    let payload_b = vec![0xBB; 16384];
    let torrent_b = make_test_torrent(&payload_b, 16384);
    let backing_b = make_storage(&payload_b, 16384);
    session
        .add_torrent(torrent_b.into(), Some(backing_b))
        .await
        .unwrap();

    let snapshot = session.session_stats().await.unwrap();
    assert_eq!(snapshot.active_torrents, 2);

    session.shutdown().await.unwrap();
}
/// A magnet added to the session is listed under its info hash, while
/// `torrent_info` fails with "metadata not yet available".
#[tokio::test]
async fn add_magnet_and_list() {
    use irontide_core::Magnet;

    let session = SessionHandle::start(test_settings()).await.unwrap();

    let v1_hash = Id20::from_hex("aaf4c61ddcc5e8a2dabede0f3b482cd9aea9434d").unwrap();
    let link = Magnet {
        info_hashes: irontide_core::InfoHashes::v1_only(v1_hash),
        display_name: Some("test-magnet".into()),
        trackers: vec![],
        peers: vec![],
        selected_files: None,
    };
    let want = link.info_hash();

    let got = session.add_magnet(link).await.unwrap();
    assert_eq!(got, want);

    let listed = session.list_torrents().await.unwrap();
    assert_eq!(listed.len(), 1);
    assert!(listed.contains(&got));

    // Querying info for the magnet right after adding it must fail with this message.
    let info_err = session.torrent_info(got).await.unwrap_err();
    assert!(info_err.to_string().contains("metadata not yet available"));

    session.shutdown().await.unwrap();
}
/// Adding the same magnet twice fails the second time with an error whose
/// text contains "duplicate".
#[tokio::test]
async fn add_magnet_duplicate_rejected() {
    use irontide_core::Magnet;

    let session = SessionHandle::start(test_settings()).await.unwrap();

    let v1_hash = Id20::from_hex("aaf4c61ddcc5e8a2dabede0f3b482cd9aea9434d").unwrap();
    let link = Magnet {
        info_hashes: irontide_core::InfoHashes::v1_only(v1_hash),
        display_name: Some("test-magnet".into()),
        trackers: vec![],
        peers: vec![],
        selected_files: None,
    };

    session.add_magnet(link.clone()).await.unwrap();

    let second_attempt = session.add_magnet(link).await;
    assert!(second_attempt.is_err());
    let message = second_attempt.unwrap_err().to_string();
    assert!(message.contains("duplicate"));

    session.shutdown().await.unwrap();
}
#[tokio::test]
async fn session_with_lsd_enabled() {
use irontide_core::Magnet;
let mut config = test_settings();
config.enable_lsd = true;
let session = SessionHandle::start(config).await.unwrap();
let data = vec![0xAB; 16384];
let meta = make_test_torrent(&data, 16384);
let storage = make_storage(&data, 16384);
session
.add_torrent(meta.into(), Some(storage))
.await
.unwrap();
let magnet = Magnet {
info_hashes: irontide_core::InfoHashes::v1_only(
Id20::from_hex("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb").unwrap(),
),
display_name: Some("lsd-test".into()),
trackers: vec![],
peers: vec![],
selected_files: None,
};
session.add_magnet(magnet).await.unwrap();
let list = session.list_torrents().await.unwrap();
assert_eq!(list.len(), 2);
session.shutdown().await.unwrap();
}
/// Builds a bencoded torrent whose info dict has only a `file tree` and
/// `meta version` 2, parses it, checks `is_v2()`, and verifies the session
/// accepts it without storage and lists its hash.
#[tokio::test]
async fn add_v2_only_torrent() {
    use irontide_bencode::BencodeValue;
    use std::collections::BTreeMap;

    type Dict = BTreeMap<Vec<u8>, BencodeValue>;

    let session = SessionHandle::start(test_settings()).await.unwrap();

    // Innermost first: attributes -> file node (empty key) -> file tree -> info -> root.
    let attrs: Dict = BTreeMap::from([(b"length".to_vec(), BencodeValue::Integer(16384))]);
    let file_entry: Dict = BTreeMap::from([(b"".to_vec(), BencodeValue::Dict(attrs))]);
    let file_tree: Dict =
        BTreeMap::from([(b"test.dat".to_vec(), BencodeValue::Dict(file_entry))]);
    let info: Dict = BTreeMap::from([
        (b"file tree".to_vec(), BencodeValue::Dict(file_tree)),
        (b"meta version".to_vec(), BencodeValue::Integer(2)),
        (b"name".to_vec(), BencodeValue::Bytes(b"v2test".to_vec())),
        (b"piece length".to_vec(), BencodeValue::Integer(16384)),
    ]);
    let root: Dict = BTreeMap::from([(b"info".to_vec(), BencodeValue::Dict(info))]);

    let encoded = irontide_bencode::to_bytes(&BencodeValue::Dict(root)).unwrap();
    let parsed = irontide_core::torrent_from_bytes_any(&encoded).unwrap();
    assert!(parsed.is_v2());

    let hash = session.add_torrent(parsed, None).await.unwrap();
    let listed = session.list_torrents().await.unwrap();
    assert!(listed.contains(&hash));

    session.shutdown().await.unwrap();
}
/// Resume data saved for a just-added torrent carries its hash bytes, name,
/// the libtorrent format markers, a non-empty piece field and `paused == 0`.
#[tokio::test]
async fn save_torrent_resume_data_via_session() {
    let session = SessionHandle::start(test_settings()).await.unwrap();

    let payload = vec![0xAB; 32768];
    let torrent = make_test_torrent(&payload, 16384);
    let hash = torrent.info_hash;
    let backing = make_storage(&payload, 16384);
    session
        .add_torrent(torrent.into(), Some(backing))
        .await
        .unwrap();

    let resume = session.save_torrent_resume_data(hash).await.unwrap();

    // Identity.
    assert_eq!(resume.info_hash, hash.as_bytes().as_slice());
    assert_eq!(resume.name, "test");

    // Format header.
    assert_eq!(resume.file_format, "libtorrent resume file");
    assert_eq!(resume.file_version, 1);

    // Progress / state fields.
    assert!(!resume.pieces.is_empty());
    assert_eq!(resume.paused, 0);

    session.shutdown().await.unwrap();
}
/// Saving session state with two torrents yields two resume entries, each
/// with the libtorrent format marker and a 20-byte info hash.
#[tokio::test]
async fn save_session_state_captures_all_torrents() {
    let session = SessionHandle::start(test_settings()).await.unwrap();

    let payload_a = vec![0xAA; 16384];
    let torrent_a = make_test_torrent(&payload_a, 16384);
    let backing_a = make_storage(&payload_a, 16384);
    session
        .add_torrent(torrent_a.into(), Some(backing_a))
        .await
        .unwrap();

    let payload_b = vec![0xBB; 16384];
    let torrent_b = make_test_torrent(&payload_b, 16384);
    let backing_b = make_storage(&payload_b, 16384);
    session
        .add_torrent(torrent_b.into(), Some(backing_b))
        .await
        .unwrap();

    let saved = session.save_session_state().await.unwrap();
    assert_eq!(saved.torrents.len(), 2);
    for entry in saved.torrents.iter() {
        assert_eq!(entry.file_format, "libtorrent resume file");
        assert_eq!(entry.info_hash.len(), 20);
    }

    session.shutdown().await.unwrap();
}
/// Requesting resume data for an unknown info hash fails with an error whose
/// text contains "not found".
#[tokio::test]
async fn save_resume_data_not_found() {
    let session = SessionHandle::start(test_settings()).await.unwrap();
    let unknown = Id20::from_hex("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();

    let outcome = session.save_torrent_resume_data(unknown).await;
    assert!(outcome.is_err());
    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("not found"));

    session.shutdown().await.unwrap();
}
/// The first alert a subscriber receives after a torrent is added (within
/// two seconds) is `TorrentAdded`.
#[tokio::test]
async fn subscribe_receives_torrent_added_alert() {
    use crate::alert::AlertKind;

    let session = SessionHandle::start(test_settings()).await.unwrap();
    let mut alerts = session.subscribe();

    let payload = vec![0xAB; 16384];
    let torrent = make_test_torrent(&payload, 16384);
    let backing = make_storage(&payload, 16384);
    let _hash = session
        .add_torrent(torrent.into(), Some(backing))
        .await
        .unwrap();

    // Outer unwrap: the timeout did not elapse. Inner unwrap: recv succeeded.
    let waited = tokio::time::timeout(Duration::from_secs(2), alerts.recv()).await;
    let first = waited.unwrap().unwrap();
    assert!(matches!(first.kind, AlertKind::TorrentAdded { .. }));

    session.shutdown().await.unwrap();
}
/// After a torrent is removed, a subscriber eventually receives a
/// `TorrentRemoved` alert; each wait for the next alert is capped at two seconds.
#[tokio::test]
async fn subscribe_receives_torrent_removed_alert() {
    use crate::alert::AlertKind;
    use crate::types::TorrentState;

    let session = SessionHandle::start(test_settings()).await.unwrap();
    let mut alerts = session.subscribe();

    let payload = vec![0xAB; 16384];
    let torrent = make_test_torrent(&payload, 16384);
    let backing = make_storage(&payload, 16384);
    let hash = session
        .add_torrent(torrent.into(), Some(backing))
        .await
        .unwrap();

    // Consume alerts until a transition into `Downloading` is seen. A timeout or
    // recv error also ends the loop, so this phase is best-effort, not asserted.
    loop {
        let Ok(Ok(seen)) = tokio::time::timeout(Duration::from_secs(1), alerts.recv()).await
        else {
            break;
        };
        if let AlertKind::StateChanged {
            new_state: TorrentState::Downloading,
            ..
        } = seen.kind
        {
            break;
        }
    }

    session.remove_torrent(hash).await.unwrap();

    // Every wait here must produce an alert (unwrap), until `TorrentRemoved` shows up.
    let mut removal_seen = false;
    while !removal_seen {
        let next = tokio::time::timeout(Duration::from_secs(2), alerts.recv())
            .await
            .unwrap()
            .unwrap();
        removal_seen = matches!(next.kind, AlertKind::TorrentRemoved { .. });
    }

    session.shutdown().await.unwrap();
}
/// Two independent subscribers each receive `TorrentAdded` as their first
/// alert after a torrent is added.
#[tokio::test]
async fn multiple_subscribers_each_receive_alerts() {
    use crate::alert::AlertKind;

    let session = SessionHandle::start(test_settings()).await.unwrap();
    let mut left = session.subscribe();
    let mut right = session.subscribe();

    let payload = vec![0xAB; 16384];
    let torrent = make_test_torrent(&payload, 16384);
    let backing = make_storage(&payload, 16384);
    session
        .add_torrent(torrent.into(), Some(backing))
        .await
        .unwrap();

    let left_first = tokio::time::timeout(Duration::from_secs(2), left.recv())
        .await
        .unwrap()
        .unwrap();
    let right_first = tokio::time::timeout(Duration::from_secs(2), right.recv())
        .await
        .unwrap()
        .unwrap();

    assert!(matches!(left_first.kind, AlertKind::TorrentAdded { .. }));
    assert!(matches!(right_first.kind, AlertKind::TorrentAdded { .. }));

    session.shutdown().await.unwrap();
}
/// Changing the alert mask at runtime takes effect: with an empty mask no
/// alert arrives within 200 ms of adding a torrent, and after switching to
/// `STATUS` the next add produces `TorrentAdded` again.
#[tokio::test]
async fn set_alert_mask_filters_at_runtime() {
    use crate::alert::{AlertCategory, AlertKind};

    let session = SessionHandle::start(test_settings()).await.unwrap();
    let mut alerts = session.subscribe();

    // Phase 1: before the mask is touched, the add is reported.
    let payload_a = vec![0xAB; 16384];
    let torrent_a = make_test_torrent(&payload_a, 16384);
    let backing_a = make_storage(&payload_a, 16384);
    session
        .add_torrent(torrent_a.into(), Some(backing_a))
        .await
        .unwrap();
    let first = tokio::time::timeout(Duration::from_secs(2), alerts.recv())
        .await
        .unwrap()
        .unwrap();
    assert!(matches!(first.kind, AlertKind::TorrentAdded { .. }));

    // Drain: keep receiving until a full 200 ms window passes with nothing delivered.
    loop {
        let drained = tokio::time::timeout(Duration::from_millis(200), alerts.recv()).await;
        if drained.is_err() {
            break;
        }
    }

    // Phase 2: empty mask, so the add must stay silent.
    session.set_alert_mask(AlertCategory::empty());
    let payload_b = vec![0xBB; 16384];
    let torrent_b = make_test_torrent(&payload_b, 16384);
    let backing_b = make_storage(&payload_b, 16384);
    session
        .add_torrent(torrent_b.into(), Some(backing_b))
        .await
        .unwrap();
    let silent = tokio::time::timeout(Duration::from_millis(200), alerts.recv()).await;
    assert!(silent.is_err(), "should have timed out with empty mask");

    // Phase 3: STATUS mask, so the add is reported again.
    session.set_alert_mask(AlertCategory::STATUS);
    let payload_c = vec![0xCC; 16384];
    let torrent_c = make_test_torrent(&payload_c, 16384);
    let backing_c = make_storage(&payload_c, 16384);
    session
        .add_torrent(torrent_c.into(), Some(backing_c))
        .await
        .unwrap();
    let restored = tokio::time::timeout(Duration::from_secs(2), alerts.recv())
        .await
        .unwrap()
        .unwrap();
    assert!(matches!(restored.kind, AlertKind::TorrentAdded { .. }));

    session.shutdown().await.unwrap();
}
/// A subscriber filtered on `STATUS` sees `TorrentAdded`, while one filtered
/// on `PEER` receives nothing within 200 ms of the same add.
#[tokio::test]
async fn alert_stream_filters_per_subscriber() {
    use crate::alert::{AlertCategory, AlertKind};

    let session = SessionHandle::start(test_settings()).await.unwrap();
    let mut status_only = session.subscribe_filtered(AlertCategory::STATUS);
    let mut peer_only = session.subscribe_filtered(AlertCategory::PEER);

    let payload = vec![0xAB; 16384];
    let torrent = make_test_torrent(&payload, 16384);
    let backing = make_storage(&payload, 16384);
    session
        .add_torrent(torrent.into(), Some(backing))
        .await
        .unwrap();

    let delivered = tokio::time::timeout(Duration::from_secs(2), status_only.recv())
        .await
        .unwrap()
        .unwrap();
    assert!(matches!(delivered.kind, AlertKind::TorrentAdded { .. }));

    let withheld = tokio::time::timeout(Duration::from_millis(200), peer_only.recv()).await;
    assert!(
        withheld.is_err(),
        "PEER subscriber should not get STATUS alerts"
    );

    session.shutdown().await.unwrap();
}
/// Pausing and then resuming a torrent produces `StateChanged` alerts for
/// Downloading→Paused and Paused→Downloading, together with at least one
/// `TorrentPaused` and one `TorrentResumed` alert.
#[tokio::test]
async fn state_changed_tracks_transitions() {
    use crate::alert::AlertKind;

    let session = SessionHandle::start(test_settings()).await.unwrap();
    let mut alerts = session.subscribe();

    let payload = vec![0xAB; 16384];
    let torrent = make_test_torrent(&payload, 16384);
    let backing = make_storage(&payload, 16384);
    let hash = session
        .add_torrent(torrent.into(), Some(backing))
        .await
        .unwrap();

    // Wait up to 1 s for one alert; only a timeout fails here, the payload is discarded.
    let _ = tokio::time::timeout(Duration::from_secs(1), alerts.recv())
        .await
        .unwrap();

    // ---- pause ----
    session.pause_torrent(hash).await.unwrap();
    tokio::time::sleep(Duration::from_millis(100)).await;

    let mut pause_transitions = Vec::new();
    let mut pause_notices = Vec::new();
    // Collect until a 200 ms window passes without an alert (or recv fails).
    while let Ok(Ok(alert)) =
        tokio::time::timeout(Duration::from_millis(200), alerts.recv()).await
    {
        match &alert.kind {
            AlertKind::StateChanged {
                prev_state,
                new_state,
                ..
            } => {
                pause_transitions.push((*prev_state, *new_state));
            }
            AlertKind::TorrentPaused { .. } => {
                pause_notices.push(alert);
            }
            _ => {}
        }
    }
    assert!(
        pause_transitions.contains(&(TorrentState::Downloading, TorrentState::Paused)),
        "expected Downloading→Paused, got: {pause_transitions:?}"
    );
    assert!(!pause_notices.is_empty(), "expected TorrentPaused alert");

    // ---- resume ----
    session.resume_torrent(hash).await.unwrap();
    tokio::time::sleep(Duration::from_millis(100)).await;

    let mut resume_transitions = Vec::new();
    let mut resume_notices = Vec::new();
    while let Ok(Ok(alert)) =
        tokio::time::timeout(Duration::from_millis(200), alerts.recv()).await
    {
        match &alert.kind {
            AlertKind::StateChanged {
                prev_state,
                new_state,
                ..
            } => {
                resume_transitions.push((*prev_state, *new_state));
            }
            AlertKind::TorrentResumed { .. } => {
                resume_notices.push(alert);
            }
            _ => {}
        }
    }
    assert!(
        resume_transitions.contains(&(TorrentState::Paused, TorrentState::Downloading)),
        "expected Paused→Downloading, got: {resume_transitions:?}"
    );
    assert!(!resume_notices.is_empty(), "expected TorrentResumed alert");

    session.shutdown().await.unwrap();
}
#[tokio::test]
async fn session_config_creates_utp_socket() {
let mut config = test_settings();
config.enable_utp = true;
let session = SessionHandle::start(config).await.unwrap();
let stats = session.session_stats().await.unwrap();
assert_eq!(stats.active_torrents, 0);
session.shutdown().await.unwrap();
}
/// `Settings::default()` has both NAT traversal switches (UPnP and NAT-PMP) enabled.
#[test]
fn settings_nat_defaults() {
    let defaults = Settings::default();

    assert!(defaults.enable_upnp, "enable_upnp should default to true");
    assert!(
        defaults.enable_natpmp,
        "enable_natpmp should default to true"
    );
}
/// The test settings have UPnP and NAT-PMP off; a session started from them
/// reports zero active torrents and shuts down without error.
#[tokio::test]
async fn session_with_nat_disabled() {
    let config = test_settings();

    // Precondition: both NAT switches are off in the test configuration.
    assert!(!config.enable_upnp);
    assert!(!config.enable_natpmp);

    let session = SessionHandle::start(config).await.unwrap();
    let snapshot = session.session_stats().await.unwrap();
    assert_eq!(snapshot.active_torrents, 0);

    session.shutdown().await.unwrap();
}
/// Spells out the expected anonymous-mode override: when `anonymous_mode` is
/// set, DHT, LSD, UPnP and NAT-PMP all end up disabled.
///
/// NOTE(review): the override is applied inline in this test rather than by
/// calling session code, so the assertions check the test's own logic.
#[test]
fn anonymous_mode_disables_discovery() {
    // Start with anonymous mode on and every discovery/NAT feature explicitly enabled.
    let mut config = Settings {
        anonymous_mode: true,
        enable_dht: true,
        enable_lsd: true,
        enable_upnp: true,
        enable_natpmp: true,
        ..test_settings()
    };

    if config.anonymous_mode {
        config.enable_dht = false;
        config.enable_lsd = false;
        config.enable_upnp = false;
        config.enable_natpmp = false;
    }

    assert!(!config.enable_dht);
    assert!(!config.enable_lsd);
    assert!(!config.enable_upnp);
    assert!(!config.enable_natpmp);
}
#[tokio::test]
async fn anonymous_mode_session_starts_with_discovery_disabled() {
let mut config = test_settings();
config.anonymous_mode = true;
config.enable_dht = true;
config.enable_lsd = true;
let session = SessionHandle::start(config).await.unwrap();
let stats = session.session_stats().await.unwrap();
assert_eq!(stats.active_torrents, 0);
session.shutdown().await.unwrap();
}
/// A default `ProxyConfig` has proxy type `None`; this test pairs it with
/// `force_proxy = true` and checks both field values.
#[test]
fn force_proxy_requires_proxy_configured() {
    let config = Settings {
        force_proxy: true,
        proxy: crate::proxy::ProxyConfig::default(),
        ..test_settings()
    };

    assert_eq!(config.proxy.proxy_type, crate::proxy::ProxyType::None);
    assert!(config.force_proxy);
}
#[tokio::test]
async fn force_proxy_errors_without_proxy() {
let mut config = test_settings();
config.force_proxy = true;
let result = SessionHandle::start(config).await;
assert!(result.is_err());
match result {
Err(e) => assert!(
e.to_string().contains("force_proxy"),
"error should mention force_proxy: {e}"
),
Ok(_) => panic!("expected error"),
}
}
/// Spells out the expected force-proxy override: with `force_proxy` set and a
/// SOCKS5 proxy configured, UPnP, NAT-PMP, DHT and LSD all end up disabled.
///
/// NOTE(review): the override is applied inline in this test rather than by
/// calling session code, so the assertions check the test's own logic.
#[test]
fn force_proxy_disables_features() {
    let mut config = Settings {
        force_proxy: true,
        proxy: crate::proxy::ProxyConfig {
            proxy_type: crate::proxy::ProxyType::Socks5,
            hostname: "proxy.example.com".into(),
            port: 1080,
            ..Default::default()
        },
        enable_dht: true,
        enable_lsd: true,
        enable_upnp: true,
        enable_natpmp: true,
        ..test_settings()
    };

    if config.force_proxy {
        config.enable_upnp = false;
        config.enable_natpmp = false;
        config.enable_dht = false;
        config.enable_lsd = false;
    }

    assert!(!config.enable_dht);
    assert!(!config.enable_lsd);
    assert!(!config.enable_upnp);
    assert!(!config.enable_natpmp);
}
/// Proxy-related fields placed into `Settings` read back unchanged, and
/// `ProxyConfig::to_url` renders the SOCKS5 credentials URL.
#[test]
fn proxy_config_round_trip() {
    let proxy = crate::proxy::ProxyConfig {
        proxy_type: crate::proxy::ProxyType::Socks5Password,
        hostname: "localhost".into(),
        port: 9050,
        username: Some("user".into()),
        password: Some("pass".into()),
        ..Default::default()
    };
    let settings = Settings {
        proxy,
        force_proxy: true,
        anonymous_mode: true,
        ..test_settings()
    };

    // Proxy sub-structure.
    assert_eq!(
        settings.proxy.proxy_type,
        crate::proxy::ProxyType::Socks5Password
    );
    assert_eq!(settings.proxy.hostname, "localhost");
    assert_eq!(settings.proxy.port, 9050);

    // Top-level flags.
    assert!(settings.force_proxy);
    assert!(settings.anonymous_mode);

    // Rendered URL.
    assert_eq!(
        settings.proxy.to_url(),
        "socks5://user:pass@localhost:9050"
    );
}
#[tokio::test]
async fn apply_settings_runtime() {
let session = SessionHandle::start(test_settings()).await.unwrap();
let original = session.settings().await.unwrap();
assert_eq!(original.max_torrents, 10);
let mut new = original.clone();
new.max_torrents = 200;
new.upload_rate_limit = 1_000_000;
session.apply_settings(new).await.unwrap();
let updated = session.settings().await.unwrap();
assert_eq!(updated.max_torrents, 200);
assert_eq!(updated.upload_rate_limit, 1_000_000);
session.shutdown().await.unwrap();
}
#[tokio::test]
async fn apply_settings_validation_error() {
let session = SessionHandle::start(test_settings()).await.unwrap();
let mut bad = Settings::default();
bad.force_proxy = true;
let result = session.apply_settings(bad).await;
assert!(result.is_err());
let current = session.settings().await.unwrap();
assert!(!current.force_proxy);
session.shutdown().await.unwrap();
}
/// The session exposes its counters: the uptime can be read and the counter
/// set has `NUM_METRICS` entries.
#[tokio::test]
async fn session_stats_counters_accessible() {
    let session = SessionHandle::start(test_settings()).await.unwrap();

    let metrics = session.counters();
    // NOTE(review): if `uptime_secs()` returns an unsigned integer this comparison is
    // always true and only checks that the call works — confirm the return type.
    assert!(metrics.uptime_secs() >= 0);
    assert_eq!(metrics.len(), crate::stats::NUM_METRICS);

    session.shutdown().await.unwrap();
}
/// `post_session_stats` delivers a `SessionStatsAlert` carrying `NUM_METRICS`
/// values to a STATS-filtered subscriber within two seconds.
#[tokio::test]
async fn post_session_stats_fires_alert() {
    use crate::alert::{AlertCategory, AlertKind};

    let session = SessionHandle::start(test_settings()).await.unwrap();
    let mut stats_feed = session.subscribe_filtered(AlertCategory::STATS);

    session.post_session_stats().await.unwrap();

    let received = tokio::time::timeout(Duration::from_secs(2), stats_feed.recv())
        .await
        .expect("timed out waiting for SessionStatsAlert")
        .expect("recv error");

    assert!(
        matches!(
            received.kind,
            AlertKind::SessionStatsAlert { ref values }
                if values.len() == crate::stats::NUM_METRICS
        ),
        "expected SessionStatsAlert with {} values, got {:?}",
        crate::stats::NUM_METRICS,
        received.kind,
    );

    session.shutdown().await.unwrap();
}
/// After one torrent is added, the posted session stats carry a positive
/// value in the `SES_NUM_TORRENTS` slot.
#[tokio::test]
async fn session_stats_include_torrent_count() {
    use crate::alert::{AlertCategory, AlertKind};

    let session = SessionHandle::start(test_settings()).await.unwrap();
    let mut stats_feed = session.subscribe_filtered(AlertCategory::STATS);

    let payload = vec![0xAB; 16384];
    let torrent = make_test_torrent(&payload, 16384);
    let backing = make_storage(&payload, 16384);
    session
        .add_torrent(torrent.into(), Some(backing))
        .await
        .unwrap();

    session.post_session_stats().await.unwrap();

    let received = tokio::time::timeout(Duration::from_secs(2), stats_feed.recv())
        .await
        .expect("timed out waiting for SessionStatsAlert")
        .expect("recv error");

    match received.kind {
        AlertKind::SessionStatsAlert { values } => {
            let torrent_count = values[crate::stats::SES_NUM_TORRENTS];
            assert!(
                torrent_count > 0,
                "SES_NUM_TORRENTS should be > 0 after adding a torrent, got {}",
                torrent_count,
            );
        }
        other => panic!("expected SessionStatsAlert, got {other:?}"),
    }

    session.shutdown().await.unwrap();
}
/// With `stats_report_interval` set to 0, a STATS-filtered subscriber receives
/// no alert within 200 ms of session start.
#[tokio::test]
async fn stats_timer_disabled_when_zero() {
    // Only the category is needed; this test never inspects an alert's kind.
    use crate::alert::AlertCategory;

    let mut config = test_settings();
    config.stats_report_interval = 0;
    let session = SessionHandle::start(config).await.unwrap();
    let mut stats_sub = session.subscribe_filtered(AlertCategory::STATS);

    // A timeout (Err) is the success case: nothing was delivered in the window.
    let result = tokio::time::timeout(Duration::from_millis(200), stats_sub.recv()).await;
    assert!(
        result.is_err(),
        "no SessionStatsAlert should fire when stats_report_interval is 0"
    );

    session.shutdown().await.unwrap();
}
/// With `dht_sample_infohashes_interval` set to 0, a DHT-filtered subscriber
/// receives no alert within 200 ms of session start.
///
/// NOTE(review): the test configuration appears to leave `enable_dht` off, so
/// presumably no DHT alert would fire whatever the interval — confirm this
/// test actually exercises the sampling timer.
#[tokio::test]
async fn sample_infohashes_timer_disabled_when_zero() {
    // Only the category is needed; this test never inspects an alert's kind.
    use crate::alert::AlertCategory;

    let mut config = test_settings();
    config.dht_sample_infohashes_interval = 0;
    let session = SessionHandle::start(config).await.unwrap();
    let mut dht_sub = session.subscribe_filtered(AlertCategory::DHT);

    // A timeout (Err) is the success case: nothing was delivered in the window.
    let result = tokio::time::timeout(Duration::from_millis(200), dht_sub.recv()).await;
    assert!(
        result.is_err(),
        "no DhtSampleInfohashes alert should fire when interval is 0"
    );

    session.shutdown().await.unwrap();
}
/// Opening a file on an unknown info hash fails with an error whose text
/// contains "not found".
#[tokio::test]
async fn open_file_not_found() {
    let session = SessionHandle::start(test_settings()).await.unwrap();
    let unknown = Id20::from_hex("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();

    let outcome = session.open_file(unknown, 0).await;
    assert!(outcome.is_err());

    let failure = outcome.err().unwrap();
    assert!(failure.to_string().contains("not found"));

    session.shutdown().await.unwrap();
}
/// `open_file` succeeds for file index 0 of an added torrent and fails for
/// index 999.
#[tokio::test]
async fn open_file_routes_to_torrent() {
    let session = SessionHandle::start(test_settings()).await.unwrap();

    let payload = vec![0xAB; 32768];
    let torrent = make_test_torrent(&payload, 16384);
    let backing = make_storage(&payload, 16384);
    let hash = session
        .add_torrent(torrent.into(), Some(backing))
        .await
        .unwrap();

    // Bound to a local so the opened stream lives until the end of the test, as before.
    let opened = session.open_file(hash, 0).await;
    assert!(opened.is_ok(), "open_file should succeed for file_index 0");

    let out_of_range = session.open_file(hash, 999).await;
    assert!(
        out_of_range.is_err(),
        "open_file should fail for invalid file_index"
    );

    session.shutdown().await.unwrap();
}
/// `force_reannounce` succeeds for an added torrent and errors for an unknown hash.
#[tokio::test]
async fn session_force_reannounce() {
    let session = SessionHandle::start(test_settings()).await.unwrap();

    let payload = vec![0xAB; 16384];
    let torrent = make_test_torrent(&payload, 16384);
    let backing = make_storage(&payload, 16384);
    let hash = session
        .add_torrent(torrent.into(), Some(backing))
        .await
        .unwrap();

    let result = session.force_reannounce(hash).await;
    assert!(
        result.is_ok(),
        "force_reannounce should succeed: {result:?}"
    );

    let unknown = Id20::from_hex("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
    assert!(session.force_reannounce(unknown).await.is_err());

    session.shutdown().await.unwrap();
}
/// The tracker list of the test torrent is empty, and asking for the list of
/// an unknown hash errors.
#[tokio::test]
async fn session_tracker_list() {
    let session = SessionHandle::start(test_settings()).await.unwrap();

    let payload = vec![0xAB; 16384];
    let torrent = make_test_torrent(&payload, 16384);
    let backing = make_storage(&payload, 16384);
    let hash = session
        .add_torrent(torrent.into(), Some(backing))
        .await
        .unwrap();

    let announced_to = session.tracker_list(hash).await.unwrap();
    assert!(announced_to.is_empty(), "test torrent has no trackers");

    let unknown = Id20::from_hex("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
    assert!(session.tracker_list(unknown).await.is_err());

    session.shutdown().await.unwrap();
}
/// Scraping the test torrent yields `None`, and scraping an unknown hash errors.
#[tokio::test]
async fn session_scrape() {
    let session = SessionHandle::start(test_settings()).await.unwrap();

    let payload = vec![0xAB; 16384];
    let torrent = make_test_torrent(&payload, 16384);
    let backing = make_storage(&payload, 16384);
    let hash = session
        .add_torrent(torrent.into(), Some(backing))
        .await
        .unwrap();

    let scraped = session.scrape(hash).await.unwrap();
    assert!(scraped.is_none(), "test torrent has no trackers to scrape");

    let unknown = Id20::from_hex("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
    assert!(session.scrape(unknown).await.is_err());

    session.shutdown().await.unwrap();
}
/// Setting the priority of file 0 succeeds for an added torrent and errors
/// for an unknown hash.
#[tokio::test]
async fn session_set_file_priority() {
    let session = SessionHandle::start(test_settings()).await.unwrap();

    let payload = vec![0xAB; 16384];
    let torrent = make_test_torrent(&payload, 16384);
    let backing = make_storage(&payload, 16384);
    let hash = session
        .add_torrent(torrent.into(), Some(backing))
        .await
        .unwrap();

    let result = session
        .set_file_priority(hash, 0, irontide_core::FilePriority::Normal)
        .await;
    assert!(
        result.is_ok(),
        "set_file_priority should succeed: {result:?}"
    );

    let unknown = Id20::from_hex("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
    let on_unknown = session
        .set_file_priority(unknown, 0, irontide_core::FilePriority::Normal)
        .await;
    assert!(on_unknown.is_err());

    session.shutdown().await.unwrap();
}
/// A single-file torrent reports exactly one file priority, `Normal`;
/// querying an unknown hash errors.
#[tokio::test]
async fn session_file_priorities() {
    let session = SessionHandle::start(test_settings()).await.unwrap();

    let payload = vec![0xAB; 16384];
    let torrent = make_test_torrent(&payload, 16384);
    let backing = make_storage(&payload, 16384);
    let hash = session
        .add_torrent(torrent.into(), Some(backing))
        .await
        .unwrap();

    let per_file = session.file_priorities(hash).await.unwrap();
    assert_eq!(
        per_file.len(),
        1,
        "single-file torrent should have 1 file priority"
    );
    assert_eq!(per_file[0], irontide_core::FilePriority::Normal);

    let unknown = Id20::from_hex("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
    assert!(session.file_priorities(unknown).await.is_err());

    session.shutdown().await.unwrap();
}
/// Setting the download limit to 50_000 and then to 0 leaves the reported limit at 0.
#[tokio::test]
async fn set_download_limit_zero_means_unlimited() {
    let session = SessionHandle::start(test_settings()).await.unwrap();

    let payload = vec![0xAB; 16384];
    let torrent = make_test_torrent(&payload, 16384);
    let backing = make_storage(&payload, 16384);
    let hash = session
        .add_torrent(torrent.into(), Some(backing))
        .await
        .unwrap();

    // Apply a cap first, then clear it with 0.
    session.set_download_limit(hash, 50_000).await.unwrap();
    session.set_download_limit(hash, 0).await.unwrap();

    let reported = session.download_limit(hash).await.unwrap();
    assert_eq!(reported, 0, "0 means unlimited");

    session.shutdown().await.unwrap();
}
/// An upload limit of 100_000 set on a torrent is read back unchanged.
#[tokio::test]
async fn set_upload_limit_persists() {
    let session = SessionHandle::start(test_settings()).await.unwrap();

    let payload = vec![0xAB; 16384];
    let torrent = make_test_torrent(&payload, 16384);
    let backing = make_storage(&payload, 16384);
    let hash = session
        .add_torrent(torrent.into(), Some(backing))
        .await
        .unwrap();

    session.set_upload_limit(hash, 100_000).await.unwrap();

    let reported = session.upload_limit(hash).await.unwrap();
    assert_eq!(reported, 100_000);

    session.shutdown().await.unwrap();
}
/// A freshly added torrent reports a download limit of 0.
#[tokio::test]
async fn download_limit_default_is_zero() {
    let session = SessionHandle::start(test_settings()).await.unwrap();

    let payload = vec![0xAB; 16384];
    let torrent = make_test_torrent(&payload, 16384);
    let backing = make_storage(&payload, 16384);
    let hash = session
        .add_torrent(torrent.into(), Some(backing))
        .await
        .unwrap();

    let reported = session.download_limit(hash).await.unwrap();
    assert_eq!(
        reported, 0,
        "default download limit should be 0 (unlimited)"
    );

    session.shutdown().await.unwrap();
}
/// Download and upload limits read back as set, a later download limit
/// replaces the earlier one, and all four limit calls error for an unknown hash.
#[tokio::test]
async fn rate_limit_round_trip() {
    let session = SessionHandle::start(test_settings()).await.unwrap();

    let payload = vec![0xAB; 16384];
    let torrent = make_test_torrent(&payload, 16384);
    let backing = make_storage(&payload, 16384);
    let hash = session
        .add_torrent(torrent.into(), Some(backing))
        .await
        .unwrap();

    // Initial limits in both directions.
    session.set_download_limit(hash, 1_000_000).await.unwrap();
    session.set_upload_limit(hash, 500_000).await.unwrap();
    let down = session.download_limit(hash).await.unwrap();
    let up = session.upload_limit(hash).await.unwrap();
    assert_eq!(down, 1_000_000);
    assert_eq!(up, 500_000);

    // Overwrite the download limit.
    session.set_download_limit(hash, 2_000_000).await.unwrap();
    let down = session.download_limit(hash).await.unwrap();
    assert_eq!(down, 2_000_000);

    // Unknown torrent: getters and setters all fail.
    let unknown = Id20::from_hex("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
    assert!(session.download_limit(unknown).await.is_err());
    assert!(session.upload_limit(unknown).await.is_err());
    assert!(session.set_download_limit(unknown, 100).await.is_err());
    assert!(session.set_upload_limit(unknown, 100).await.is_err());

    session.shutdown().await.unwrap();
}
/// Sequential download can be switched on and off, and the query reflects
/// each change.
#[tokio::test]
async fn sequential_download_toggle() {
    let session = SessionHandle::start(test_settings()).await.unwrap();

    let payload = vec![0xAB; 16384];
    let torrent = make_test_torrent(&payload, 16384);
    let backing = make_storage(&payload, 16384);
    let hash = session
        .add_torrent(torrent.into(), Some(backing))
        .await
        .unwrap();

    session.set_sequential_download(hash, true).await.unwrap();
    let enabled = session.is_sequential_download(hash).await.unwrap();
    assert!(enabled);

    session.set_sequential_download(hash, false).await.unwrap();
    let still_enabled = session.is_sequential_download(hash).await.unwrap();
    assert!(!still_enabled);

    session.shutdown().await.unwrap();
}
/// Super seeding can be switched on and off, and the query reflects each change.
#[tokio::test]
async fn super_seeding_toggle() {
    let session = SessionHandle::start(test_settings()).await.unwrap();

    let payload = vec![0xAB; 16384];
    let torrent = make_test_torrent(&payload, 16384);
    let backing = make_storage(&payload, 16384);
    let hash = session
        .add_torrent(torrent.into(), Some(backing))
        .await
        .unwrap();

    session.set_super_seeding(hash, true).await.unwrap();
    let enabled = session.is_super_seeding(hash).await.unwrap();
    assert!(enabled);

    session.set_super_seeding(hash, false).await.unwrap();
    let still_enabled = session.is_super_seeding(hash).await.unwrap();
    assert!(!still_enabled);

    session.shutdown().await.unwrap();
}
/// A freshly added torrent does not have sequential download enabled.
#[tokio::test]
async fn sequential_download_default_false() {
    let session = SessionHandle::start(test_settings()).await.unwrap();

    let payload = vec![0xAB; 16384];
    let torrent = make_test_torrent(&payload, 16384);
    let backing = make_storage(&payload, 16384);
    let hash = session
        .add_torrent(torrent.into(), Some(backing))
        .await
        .unwrap();

    let enabled = session.is_sequential_download(hash).await.unwrap();
    assert!(!enabled);

    session.shutdown().await.unwrap();
}
/// A freshly added torrent reports super seeding as disabled.
#[tokio::test]
async fn super_seeding_default_false() {
    let session = SessionHandle::start(test_settings()).await.unwrap();
    let payload = vec![0xAB; 16384];
    let torrent = make_test_torrent(&payload, 16384);
    let backing = make_storage(&payload, 16384);
    let id = session
        .add_torrent(torrent.into(), Some(backing))
        .await
        .unwrap();

    let super_seeding = session.is_super_seeding(id).await.unwrap();
    assert!(!super_seeding);

    session.shutdown().await.unwrap();
}
/// `set_seed_mode` must be mirrored by `user_seed_mode` in the torrent stats:
/// off initially, on after enabling, off again after disabling.
#[tokio::test]
async fn seed_mode_flips_user_flag() {
    let session = SessionHandle::start(test_settings()).await.unwrap();
    let payload = vec![0xAB; 16384];
    let torrent = make_test_torrent(&payload, 16384);
    let backing = make_storage(&payload, 16384);
    let id = session
        .add_torrent(torrent.into(), Some(backing))
        .await
        .unwrap();

    // Baseline, before any call to `set_seed_mode`.
    let initial = session.torrent_stats(id).await.unwrap();
    assert!(
        !initial.user_seed_mode,
        "new torrent should not start in user seed mode"
    );

    session.set_seed_mode(id, true).await.unwrap();
    let enabled = session.torrent_stats(id).await.unwrap();
    assert!(
        enabled.user_seed_mode,
        "stats should reflect user_seed_mode=true after enabling"
    );

    session.set_seed_mode(id, false).await.unwrap();
    let disabled = session.torrent_stats(id).await.unwrap();
    assert!(
        !disabled.user_seed_mode,
        "stats should reflect user_seed_mode=false after disabling"
    );

    session.shutdown().await.unwrap();
}
/// Runs a fixed sequence of seed-mode toggles (including a repeated value) and
/// checks that the stats track every step.
#[tokio::test]
async fn seed_mode_round_trip() {
    let session = SessionHandle::start(test_settings()).await.unwrap();
    let payload = vec![0xAB; 16384];
    let torrent = make_test_torrent(&payload, 16384);
    let backing = make_storage(&payload, 16384);
    let id = session
        .add_torrent(torrent.into(), Some(backing))
        .await
        .unwrap();

    let sequence = [true, false, true, true, false];
    for (i, &enabled) in sequence.iter().enumerate() {
        session.set_seed_mode(id, enabled).await.unwrap();
        let snapshot = session.torrent_stats(id).await.unwrap();
        assert_eq!(
            snapshot.user_seed_mode, enabled,
            "iteration {i}: stats.user_seed_mode should track the toggle"
        );
    }

    session.shutdown().await.unwrap();
}
#[tokio::test]
async fn seed_mode_missing_info_hash_errors() {
let session = SessionHandle::start(test_settings()).await.unwrap();
let fake =
irontide_core::Id20::from_hex("ffffffffffffffffffffffffffffffffffffffff").unwrap();
let err = session
.set_seed_mode(fake, true)
.await
.expect_err("set_seed_mode on unknown info hash must return an error");
match err {
crate::Error::TorrentNotFound(h) => assert_eq!(h, fake),
other => panic!("expected TorrentNotFound, got {other:?}"),
}
session.shutdown().await.unwrap();
}
/// Setting seed mode to the value it already has must succeed and leave the
/// reported flag unchanged.
#[tokio::test]
async fn seed_mode_idempotent() {
    let session = SessionHandle::start(test_settings()).await.unwrap();
    let payload = vec![0xAB; 16384];
    let torrent = make_test_torrent(&payload, 16384);
    let backing = make_storage(&payload, 16384);
    let id = session
        .add_torrent(torrent.into(), Some(backing))
        .await
        .unwrap();

    // Enable twice in a row.
    session.set_seed_mode(id, true).await.unwrap();
    session.set_seed_mode(id, true).await.unwrap();
    let after_enable = session.torrent_stats(id).await.unwrap();
    assert!(after_enable.user_seed_mode);

    // Disable twice in a row.
    session.set_seed_mode(id, false).await.unwrap();
    session.set_seed_mode(id, false).await.unwrap();
    let after_disable = session.torrent_stats(id).await.unwrap();
    assert!(!after_disable.user_seed_mode);

    session.shutdown().await.unwrap();
}
/// Adding one tracker to a torrent with an empty tracker list yields exactly
/// that tracker in `tracker_list`.
#[tokio::test]
async fn add_tracker_increases_count() {
    let session = SessionHandle::start(test_settings()).await.unwrap();
    let payload = vec![0xAB; 16384];
    let torrent = make_test_torrent(&payload, 16384);
    let backing = make_storage(&payload, 16384);
    let id = session
        .add_torrent(torrent.into(), Some(backing))
        .await
        .unwrap();

    let initial = session.tracker_list(id).await.unwrap();
    assert!(initial.is_empty());

    let url = "udp://tracker.example.com:6969/announce";
    session.add_tracker(id, url.into()).await.unwrap();

    let listed = session.tracker_list(id).await.unwrap();
    assert_eq!(listed.len(), 1);
    assert_eq!(listed[0].url, url);

    session.shutdown().await.unwrap();
}
/// `replace_trackers` discards the previously added trackers and leaves only
/// the replacement list.
#[tokio::test]
async fn replace_trackers_replaces_all() {
    let session = SessionHandle::start(test_settings()).await.unwrap();
    let payload = vec![0xAB; 16384];
    let torrent = make_test_torrent(&payload, 16384);
    let backing = make_storage(&payload, 16384);
    let id = session
        .add_torrent(torrent.into(), Some(backing))
        .await
        .unwrap();

    // Seed the list with two distinct trackers.
    session
        .add_tracker(id, "udp://tracker1.example.com:6969/announce".into())
        .await
        .unwrap();
    session
        .add_tracker(id, "http://tracker2.example.com/announce".into())
        .await
        .unwrap();
    let seeded = session.tracker_list(id).await.unwrap();
    assert_eq!(seeded.len(), 2);

    let replacement = "http://replacement.example.com/announce";
    session
        .replace_trackers(id, vec![replacement.into()])
        .await
        .unwrap();

    let listed = session.tracker_list(id).await.unwrap();
    assert_eq!(listed.len(), 1);
    assert_eq!(listed[0].url, replacement);

    session.shutdown().await.unwrap();
}
/// Adding the same tracker URL twice results in a single entry.
#[tokio::test]
async fn add_tracker_deduplicates() {
    let session = SessionHandle::start(test_settings()).await.unwrap();
    let payload = vec![0xAB; 16384];
    let torrent = make_test_torrent(&payload, 16384);
    let backing = make_storage(&payload, 16384);
    let id = session
        .add_torrent(torrent.into(), Some(backing))
        .await
        .unwrap();

    let url = "udp://tracker.example.com:6969/announce";
    session.add_tracker(id, url.into()).await.unwrap();
    session.add_tracker(id, url.into()).await.unwrap();

    let listed = session.tracker_list(id).await.unwrap();
    assert_eq!(listed.len(), 1);

    session.shutdown().await.unwrap();
}
/// `info_hashes` for a torrent built by `make_test_torrent` reports the
/// metadata's v1 hash and no v2 hash.
#[tokio::test]
async fn info_hashes_matches_added_torrent() {
    let session = SessionHandle::start(test_settings()).await.unwrap();
    let payload = vec![0xAB; 16384];
    let torrent = make_test_torrent(&payload, 16384);
    // Capture the hash before the metadata is consumed by `add_torrent`.
    let expected_v1 = torrent.info_hash;
    let backing = make_storage(&payload, 16384);
    let id = session
        .add_torrent(torrent.into(), Some(backing))
        .await
        .unwrap();

    let reported = session.info_hashes(id).await.unwrap();
    assert_eq!(reported.v1, Some(expected_v1));
    assert!(reported.v2.is_none());

    session.shutdown().await.unwrap();
}
/// `torrent_file` returns the metadata of a torrent that was added with full
/// metadata: same info hash, name `"test"`, and a total length of 32768 bytes.
#[tokio::test]
async fn torrent_file_returns_meta() {
    let session = SessionHandle::start(test_settings()).await.unwrap();
    let data = vec![0xAB; 32768];
    let meta = make_test_torrent(&data, 16384);
    let storage = make_storage(&data, 16384);
    let info_hash = session
        .add_torrent(meta.into(), Some(storage))
        .await
        .unwrap();

    // A single `expect` replaces the former `is_some()` assert + `unwrap()` pair.
    let torrent = session
        .torrent_file(info_hash)
        .await
        .unwrap()
        .expect("torrent_file should return metadata for a torrent added with metadata");
    assert_eq!(torrent.info_hash, info_hash);
    assert_eq!(torrent.info.name, "test");
    assert_eq!(torrent.info.total_length(), 32768);

    session.shutdown().await.unwrap();
}
/// A torrent added from a magnet link has no torrent file available right
/// after being added.
#[tokio::test]
async fn torrent_file_none_before_metadata() {
    let session = SessionHandle::start(test_settings()).await.unwrap();

    let uri = "magnet:?xt=urn:btih:aaf4c61ddcc5e8a2dabede0f3b482cd9aea9434d&dn=test";
    let link = irontide_core::Magnet::parse(uri).unwrap();
    let id = session.add_magnet(link).await.unwrap();

    let file = session.torrent_file(id).await.unwrap();
    assert!(file.is_none());

    session.shutdown().await.unwrap();
}
/// `force_dht_announce` succeeds for a known torrent and fails for an unknown
/// info hash.
#[tokio::test]
async fn force_dht_announce_no_error() {
    let session = SessionHandle::start(test_settings()).await.unwrap();
    let payload = vec![0xAB; 16384];
    let torrent = make_test_torrent(&payload, 16384);
    let backing = make_storage(&payload, 16384);
    let id = session
        .add_torrent(torrent.into(), Some(backing))
        .await
        .unwrap();

    let result = session.force_dht_announce(id).await;
    assert!(
        result.is_ok(),
        "force_dht_announce should succeed: {result:?}"
    );

    // An info hash that was never added to the session.
    let unknown = Id20::from_hex("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
    let unknown_result = session.force_dht_announce(unknown).await;
    assert!(unknown_result.is_err());

    session.shutdown().await.unwrap();
}
/// `force_lsd_announce` succeeds for a known torrent and fails for an unknown
/// info hash.
#[tokio::test]
async fn force_lsd_announce_no_error() {
    let session = SessionHandle::start(test_settings()).await.unwrap();
    let payload = vec![0xAB; 16384];
    let torrent = make_test_torrent(&payload, 16384);
    let backing = make_storage(&payload, 16384);
    let id = session
        .add_torrent(torrent.into(), Some(backing))
        .await
        .unwrap();

    let result = session.force_lsd_announce(id).await;
    assert!(
        result.is_ok(),
        "force_lsd_announce should succeed: {result:?}"
    );

    // An info hash that was never added to the session.
    let unknown = Id20::from_hex("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
    let unknown_result = session.force_lsd_announce(unknown).await;
    assert!(unknown_result.is_err());

    session.shutdown().await.unwrap();
}
/// Pieces written to the storage before the torrent is added can be read back
/// through `read_piece`; an out-of-range piece index and an unknown info hash
/// both produce errors.
#[tokio::test]
async fn read_piece_after_download() {
    let payload = vec![0xCD; 32768];
    let torrent = make_test_torrent(&payload, 16384);

    // Pre-populate an in-memory storage with both 16 KiB pieces.
    let layout = Lengths::new(payload.len() as u64, 16384, DEFAULT_CHUNK_SIZE);
    let backing = Arc::new(MemoryStorage::new(layout));
    let (first_piece, second_piece) = payload.split_at(16384);
    backing.write_chunk(0, 0, first_piece).unwrap();
    backing.write_chunk(1, 0, second_piece).unwrap();

    let session = SessionHandle::start(test_settings()).await.unwrap();
    let id = session
        .add_torrent(torrent.into(), Some(backing))
        .await
        .unwrap();

    // Both pieces come back full-length and filled with the original byte.
    for piece in [0, 1] {
        let bytes = session.read_piece(id, piece).await.unwrap();
        assert_eq!(bytes.len(), 16384);
        assert!(bytes.iter().all(|&b| b == 0xCD));
    }

    let result = session.read_piece(id, 999).await;
    assert!(result.is_err(), "read_piece out of range should fail");

    let unknown = Id20::from_hex("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
    let unknown_result = session.read_piece(unknown, 0).await;
    assert!(unknown_result.is_err());

    session.shutdown().await.unwrap();
}
/// `flush_cache` succeeds for a known torrent and fails for an unknown info
/// hash.
#[tokio::test]
async fn flush_cache_completes() {
    let session = SessionHandle::start(test_settings()).await.unwrap();
    let payload = vec![0xAB; 16384];
    let torrent = make_test_torrent(&payload, 16384);
    let backing = make_storage(&payload, 16384);
    let id = session
        .add_torrent(torrent.into(), Some(backing))
        .await
        .unwrap();

    let result = session.flush_cache(id).await;
    assert!(result.is_ok(), "flush_cache should succeed: {result:?}");

    // An info hash that was never added to the session.
    let unknown = Id20::from_hex("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
    let unknown_result = session.flush_cache(unknown).await;
    assert!(unknown_result.is_err());

    session.shutdown().await.unwrap();
}
fn test_settings_with_dht() -> Settings {
let mut s = test_settings();
s.enable_dht = true;
s
}
/// With the base test settings, all four DHT item operations (immutable and
/// mutable put/get) fail with an error whose debug output mentions
/// `DhtDisabled`.
#[tokio::test]
async fn test_dht_disabled_returns_error() {
    /// Asserts that the error's debug representation names `DhtDisabled`.
    fn expect_dht_disabled<E: std::fmt::Debug>(err: E) {
        assert!(
            format!("{err:?}").contains("DhtDisabled"),
            "expected DhtDisabled, got {err:?}"
        );
    }

    let session = SessionHandle::start(test_settings()).await.unwrap();

    expect_dht_disabled(
        session
            .dht_put_immutable(b"test".to_vec())
            .await
            .unwrap_err(),
    );

    let target = Id20::from([0u8; 20]);
    expect_dht_disabled(session.dht_get_immutable(target).await.unwrap_err());

    expect_dht_disabled(
        session
            .dht_put_mutable([42u8; 32], b"val".to_vec(), 1, Vec::new())
            .await
            .unwrap_err(),
    );

    expect_dht_disabled(
        session
            .dht_get_mutable([42u8; 32], Vec::new())
            .await
            .unwrap_err(),
    );

    session.shutdown().await.unwrap();
}
/// With DHT enabled, an immutable item that was just stored can be fetched
/// back by the target returned from the put.
#[tokio::test]
async fn test_dht_put_get_immutable_round_trip() {
    let settings = test_settings_with_dht();
    let session = SessionHandle::start(settings).await.unwrap();

    let stored = b"hello BEP 44".to_vec();
    let target = session.dht_put_immutable(stored.clone()).await.unwrap();

    let fetched = session.dht_get_immutable(target).await.unwrap();
    assert_eq!(fetched, Some(stored));

    session.shutdown().await.unwrap();
}
/// Storing an immutable DHT item delivers a `DhtPutComplete` alert, carrying
/// the same target, to a subscriber filtered on the DHT category.
#[tokio::test]
async fn test_dht_put_immutable_fires_alert() {
    use crate::alert::{AlertCategory, AlertKind};

    let settings = test_settings_with_dht();
    let session = SessionHandle::start(settings).await.unwrap();

    // Subscribe before the put so the alert cannot be missed.
    let mut dht_alerts = session.subscribe_filtered(AlertCategory::DHT);
    let target = session
        .dht_put_immutable(b"alert test".to_vec())
        .await
        .unwrap();

    // Bound the wait so a missing alert fails the test instead of hanging it.
    let received = tokio::time::timeout(Duration::from_secs(5), dht_alerts.recv())
        .await
        .expect("timeout waiting for alert")
        .expect("alert channel closed");

    match received.kind {
        AlertKind::DhtPutComplete { target: reported } => assert_eq!(reported, target),
        other => panic!("expected DhtPutComplete, got {other:?}"),
    }

    session.shutdown().await.unwrap();
}
/// Builds a single-file v1 torrent named `"private-test"` with `private = 1`.
///
/// `data` is split into pieces of `piece_length` bytes (the final piece may be
/// shorter), each piece is hashed with `irontide_core::sha1`, and the
/// resulting info dictionary is bencoded and parsed back through
/// `torrent_from_bytes`.
///
/// # Panics
///
/// Panics if `piece_length` is zero or does not fit in `usize`, or if
/// encoding or parsing the generated torrent fails.
fn make_private_torrent(data: &[u8], piece_length: u64) -> TorrentMetaV1 {
    use serde::Serialize;

    #[derive(Serialize)]
    struct Info<'a> {
        length: u64,
        name: &'a str,
        #[serde(rename = "piece length")]
        piece_length: u64,
        #[serde(with = "serde_bytes")]
        pieces: &'a [u8],
        private: i64,
    }

    #[derive(Serialize)]
    struct Torrent<'a> {
        info: Info<'a>,
    }

    // Checked conversion instead of `as`, so an oversized value cannot be
    // silently truncated on narrower targets.
    let chunk_len = usize::try_from(piece_length).expect("piece_length must fit in usize");
    // A zero piece length previously made the hashing loop spin forever while
    // growing `pieces`; fail fast with a clear message instead.
    assert!(chunk_len > 0, "piece_length must be non-zero");

    // Concatenate the SHA-1 digest of every piece, in order.
    let mut pieces = Vec::new();
    for piece in data.chunks(chunk_len) {
        pieces.extend_from_slice(irontide_core::sha1(piece).as_bytes());
    }

    let t = Torrent {
        info: Info {
            length: data.len() as u64,
            name: "private-test",
            piece_length,
            pieces: &pieces,
            private: 1,
        },
    };
    let bytes = irontide_bencode::to_bytes(&t).unwrap();
    torrent_from_bytes(&bytes).unwrap()
}
/// Metadata produced by `make_private_torrent` parses with `private == Some(1)`.
#[test]
fn is_private_true_via_parsed_meta() {
    let payload = vec![0xAB; 16384];
    let parsed = make_private_torrent(&payload, 16384);
    let flag = parsed.info.private;
    assert_eq!(flag, Some(1), "private field should be Some(1)");
}
/// Metadata produced by `make_test_torrent` carries no `private` flag.
#[test]
fn is_private_false_for_public_torrent() {
    let payload = vec![0xAB; 16384];
    let parsed = make_test_torrent(&payload, 16384);
    let flag = parsed.info.private;
    assert_eq!(flag, None, "public torrent should have no private flag");
}
/// Checks that `TorrentConfig::default()` has LSD enabled.
///
/// NOTE(review): despite its name, this test only pins the default value; no
/// private torrent is constructed here and no override of `enable_lsd` is
/// exercised.
#[test]
fn private_torrent_config_disables_lsd() {
    let defaults = TorrentConfig::default();
    let lsd_enabled = defaults.enable_lsd;
    assert!(
        lsd_enabled,
        "default TorrentConfig should have LSD enabled"
    );
}
/// `force_lsd_announce` on a torrent built by `make_private_torrent` must
/// fail, and the error's debug output must mention `InvalidSettings` or
/// `LSD disabled`.
#[tokio::test]
async fn force_lsd_announce_private_torrent_returns_error() {
    let session = SessionHandle::start(test_settings()).await.unwrap();
    let payload = vec![0xAB; 16384];
    let torrent = make_private_torrent(&payload, 16384);
    let backing = make_storage(&payload, 16384);
    let id = session
        .add_torrent(torrent.into(), Some(backing))
        .await
        .unwrap();

    let result = session.force_lsd_announce(id).await;
    assert!(
        result.is_err(),
        "force_lsd_announce on private torrent should return error, got: {result:?}"
    );

    // Inspect the debug rendering rather than matching on a concrete variant.
    let err_str = format!("{:?}", result.unwrap_err());
    let mentions_expected =
        err_str.contains("InvalidSettings") || err_str.contains("LSD disabled");
    assert!(
        mentions_expected,
        "expected InvalidSettings error, got: {err_str}"
    );

    session.shutdown().await.unwrap();
}
/// Returns the base test settings with `resume_data_dir` pointing at `dir`
/// and `save_resume_interval_secs` set to 0.
fn resume_test_settings(dir: &std::path::Path) -> Settings {
    let mut settings = test_settings();
    settings.resume_data_dir = Some(dir.to_path_buf());
    settings.save_resume_interval_secs = 0;
    settings
}
/// Saving resume state on a session with no torrents reports zero files.
#[tokio::test]
async fn save_resume_state_empty_session_returns_zero() {
    let resume_dir = tempfile::TempDir::new().unwrap();
    let settings = resume_test_settings(resume_dir.path());
    let session = SessionHandle::start(settings).await.unwrap();

    let saved = session.save_resume_state().await.unwrap();
    assert_eq!(saved, 0, "empty session should save 0 resume files");

    session.shutdown().await.unwrap();
}
/// After adding two torrents, `save_resume_state` reports at most two saved
/// files, and that count equals the number of resume files found on disk.
#[tokio::test]
async fn save_resume_state_saves_dirty_torrents() {
    let resume_dir = tempfile::TempDir::new().unwrap();
    let settings = resume_test_settings(resume_dir.path());
    let session = SessionHandle::start(settings).await.unwrap();

    let payload_a = vec![0xAA; 16384];
    let torrent_a = make_test_torrent(&payload_a, 16384);
    let hash_a = torrent_a.info_hash;
    let backing_a = make_storage(&payload_a, 16384);
    session
        .add_torrent(torrent_a.into(), Some(backing_a))
        .await
        .unwrap();

    let payload_b = vec![0xBB; 16384];
    let torrent_b = make_test_torrent(&payload_b, 16384);
    let hash_b = torrent_b.info_hash;
    let backing_b = make_storage(&payload_b, 16384);
    session
        .add_torrent(torrent_b.into(), Some(backing_b))
        .await
        .unwrap();

    // Short pause before saving; presumably lets the torrents settle.
    tokio::time::sleep(Duration::from_millis(50)).await;

    let saved = session.save_resume_state().await.unwrap();
    assert!(saved <= 2, "should save at most 2 resume files");

    // The directory is only required to exist when something was written.
    let torrents_dir = resume_dir.path().join("torrents");
    if saved > 0 {
        assert!(torrents_dir.exists(), "torrents/ directory should exist");
    }

    let expected_paths = [
        crate::resume_file::resume_file_path(resume_dir.path(), &hash_a),
        crate::resume_file::resume_file_path(resume_dir.path(), &hash_b),
    ];
    let on_disk = expected_paths.iter().filter(|path| path.exists()).count();
    assert_eq!(
        on_disk, saved,
        "number of files on disk should match returned count"
    );

    session.shutdown().await.unwrap();
}
/// When a resume file is written for the torrent, it deserializes back to the
/// same info hash and the name `"test"`.
#[tokio::test]
async fn save_resume_state_round_trip() {
    let resume_dir = tempfile::TempDir::new().unwrap();
    let settings = resume_test_settings(resume_dir.path());
    let session = SessionHandle::start(settings).await.unwrap();

    let payload = vec![0xCD; 32768];
    let torrent = make_test_torrent(&payload, 16384);
    let id = torrent.info_hash;
    let backing = make_storage(&payload, 16384);
    session
        .add_torrent(torrent.into(), Some(backing))
        .await
        .unwrap();

    tokio::time::sleep(Duration::from_millis(50)).await;
    let saved = session.save_resume_state().await.unwrap();

    // The file contents are only inspected when the save reported a write.
    if saved > 0 {
        let resume_path = crate::resume_file::resume_file_path(resume_dir.path(), &id);
        assert!(resume_path.exists(), "resume file should exist after save");

        let raw = std::fs::read(&resume_path).unwrap();
        let decoded = crate::resume_file::deserialize_resume(&raw).unwrap();
        assert_eq!(
            decoded.info_hash,
            id.as_bytes().to_vec(),
            "deserialized info_hash should match"
        );
        assert_eq!(decoded.name, "test", "deserialized name should match");
    }

    session.shutdown().await.unwrap();
}
/// Two back-to-back saves with one torrent: the second must report zero saved
/// files.
#[tokio::test]
async fn save_resume_state_clears_dirty_flag() {
    let resume_dir = tempfile::TempDir::new().unwrap();
    let settings = resume_test_settings(resume_dir.path());
    let session = SessionHandle::start(settings).await.unwrap();

    let payload = vec![0xEE; 16384];
    let torrent = make_test_torrent(&payload, 16384);
    let backing = make_storage(&payload, 16384);
    session
        .add_torrent(torrent.into(), Some(backing))
        .await
        .unwrap();

    tokio::time::sleep(Duration::from_millis(50)).await;

    // The first count is only used for the diagnostic message below.
    let first_count = session.save_resume_state().await.unwrap();
    let second_count = session.save_resume_state().await.unwrap();
    assert_eq!(
        second_count, 0,
        "second save should return 0 after dirty flag cleared (first saved {first_count})"
    );

    session.shutdown().await.unwrap();
}
/// Two back-to-back saves with two torrents: the second must report zero
/// saved files.
#[tokio::test]
async fn save_resume_state_second_save_skips_clean() {
    let resume_dir = tempfile::TempDir::new().unwrap();
    let settings = resume_test_settings(resume_dir.path());
    let session = SessionHandle::start(settings).await.unwrap();

    let payload_a = vec![0xAA; 16384];
    let torrent_a = make_test_torrent(&payload_a, 16384);
    let backing_a = make_storage(&payload_a, 16384);
    session
        .add_torrent(torrent_a.into(), Some(backing_a))
        .await
        .unwrap();

    let payload_b = vec![0xBB; 16384];
    let torrent_b = make_test_torrent(&payload_b, 16384);
    let backing_b = make_storage(&payload_b, 16384);
    session
        .add_torrent(torrent_b.into(), Some(backing_b))
        .await
        .unwrap();

    tokio::time::sleep(Duration::from_millis(50)).await;

    // The first count is only used for the diagnostic message below.
    let first = session.save_resume_state().await.unwrap();
    let second = session.save_resume_state().await.unwrap();
    assert_eq!(
        second, 0,
        "second save should skip all clean torrents (first saved {first})"
    );

    session.shutdown().await.unwrap();
}
/// Loading resume state from an empty directory reports zero restored,
/// skipped, and failed entries.
#[tokio::test]
async fn load_resume_empty_dir_returns_zeros() {
    let resume_dir = tempfile::TempDir::new().unwrap();
    let mut settings = test_settings();
    settings.resume_data_dir = Some(resume_dir.path().to_path_buf());
    let session = SessionHandle::start(settings).await.unwrap();

    let outcome = session.load_resume_state().await.unwrap();
    assert_eq!(outcome.restored, 0);
    assert_eq!(outcome.skipped, 0);
    assert_eq!(outcome.failed, 0);

    session.shutdown().await.unwrap();
}
/// A `.resume` file with invalid contents, written after the session has
/// started, is counted as one failure by `load_resume_state`.
#[tokio::test]
async fn load_resume_corrupt_file_counted_as_failed() {
    let resume_dir = tempfile::TempDir::new().unwrap();
    let torrents_dir = resume_dir.path().join("torrents");
    std::fs::create_dir_all(&torrents_dir).unwrap();

    let mut settings = test_settings();
    settings.resume_data_dir = Some(resume_dir.path().to_path_buf());
    let session = SessionHandle::start(settings).await.unwrap();
    tokio::time::sleep(Duration::from_millis(50)).await;

    // Written only after startup, so the explicit load below is what sees it.
    let corrupt_file = torrents_dir.join("deadbeefdeadbeefdeadbeefdeadbeefdeadbeef.resume");
    std::fs::write(corrupt_file, b"this is not valid bencode").unwrap();

    let outcome = session.load_resume_state().await.unwrap();
    assert_eq!(outcome.restored, 0);
    assert_eq!(outcome.skipped, 0);
    assert_eq!(outcome.failed, 1);

    session.shutdown().await.unwrap();
}
/// Loading resume state for a torrent that is already in the session counts
/// it as skipped and leaves the existing torrent in place.
#[tokio::test]
async fn load_resume_duplicate_skipped() {
    let resume_dir = tempfile::TempDir::new().unwrap();
    let mut settings = test_settings();
    settings.resume_data_dir = Some(resume_dir.path().to_path_buf());
    let session = SessionHandle::start(settings).await.unwrap();

    let payload = vec![0xAB; 16384];
    let torrent = make_test_torrent(&payload, 16384);
    let id = torrent.info_hash;
    let backing = make_storage(&payload, 16384);
    session
        .add_torrent(torrent.into(), Some(backing))
        .await
        .unwrap();

    tokio::time::sleep(Duration::from_millis(50)).await;
    // The save result is deliberately ignored; the load outcome is what is checked.
    let _ = session.save_resume_state().await;

    let outcome = session.load_resume_state().await.unwrap();
    let listed = session.list_torrents().await.unwrap();
    assert!(
        listed.contains(&id),
        "original torrent should still exist"
    );
    assert_eq!(outcome.skipped, 1, "duplicate should be skipped");
    assert_eq!(outcome.failed, 0);

    session.shutdown().await.unwrap();
}
/// `reconstruct_torrent_meta` rebuilds metadata from resume data that carries
/// the bencoded info dictionary, trackers, URL seeds, and HTTP seeds; the
/// comment, creator, and creation date come back as `None`.
#[test]
fn reconstruct_torrent_meta_returns_some_with_correct_fields() {
    use crate::resume_file::reconstruct_torrent_meta;
    use irontide_core::FastResumeData;

    let payload = vec![0xAB; 16384];
    let original = make_test_torrent(&payload, 16384);
    let id = original.info_hash;
    let encoded_info = irontide_bencode::to_bytes(&original.info).unwrap();

    let mut resume = FastResumeData::new(
        id.as_bytes().to_vec(),
        "test-torrent".into(),
        "/downloads".into(),
    );
    resume.info = Some(encoded_info);
    // Two tiers with one tracker each.
    resume.trackers = vec![
        vec!["http://tracker1.example.com/announce".into()],
        vec!["http://tracker2.example.com/announce".into()],
    ];
    resume.url_seeds = vec!["http://seed.example.com/".into()];
    resume.http_seeds = vec!["http://httpseed.example.com/".into()];

    let rebuilt = reconstruct_torrent_meta(&resume).expect("should reconstruct");

    assert_eq!(rebuilt.info_hash, id);
    // The primary announce URL is the first tracker of the first tier.
    assert_eq!(
        rebuilt.announce.as_deref(),
        Some("http://tracker1.example.com/announce")
    );
    assert_eq!(
        rebuilt.announce_list.as_ref().map(|tiers| tiers.len()),
        Some(2)
    );
    assert_eq!(
        rebuilt.url_list,
        vec!["http://seed.example.com/".to_string()]
    );
    assert_eq!(
        rebuilt.httpseeds,
        vec!["http://httpseed.example.com/".to_string()]
    );
    assert!(rebuilt.info_bytes.is_some());
    assert!(rebuilt.comment.is_none());
    assert!(rebuilt.created_by.is_none());
    assert!(rebuilt.creation_date.is_none());
}
/// Resume data without an info dictionary cannot be turned into torrent
/// metadata.
#[test]
fn reconstruct_torrent_meta_returns_none_without_info() {
    use crate::resume_file::reconstruct_torrent_meta;
    use irontide_core::FastResumeData;

    let resume = FastResumeData::new(vec![0xAB; 20], "magnet".into(), "/tmp".into());
    // Precondition: freshly constructed resume data carries no info bytes.
    assert!(resume.info.is_none());

    let rebuilt = reconstruct_torrent_meta(&resume);
    assert!(rebuilt.is_none());
}
/// `reconstruct_magnet` produces a v1-only magnet with the resume data's name
/// and all three trackers from both tiers, and with no peers or file selection.
#[test]
fn reconstruct_magnet_returns_some_with_correct_fields() {
    use crate::resume_file::reconstruct_magnet;
    use irontide_core::FastResumeData;

    let mut resume =
        FastResumeData::new(vec![0xCC; 20], "my-torrent".into(), "/downloads".into());
    // One single-tracker tier followed by a two-tracker tier: three in total.
    resume.trackers = vec![
        vec!["http://tracker1.com/announce".into()],
        vec![
            "http://tracker2.com/announce".into(),
            "http://tracker3.com/announce".into(),
        ],
    ];

    let link = reconstruct_magnet(&resume).expect("should reconstruct magnet");

    assert!(link.info_hashes.v1.is_some());
    assert!(link.info_hashes.v2.is_none());
    assert_eq!(link.display_name.as_deref(), Some("my-torrent"));
    assert_eq!(link.trackers.len(), 3);
    assert!(link.peers.is_empty());
    assert!(link.selected_files.is_none());
}
/// A 32-byte `info_hash2` in the resume data is carried over as the magnet's
/// v2 hash, alongside the v1 hash.
#[test]
fn reconstruct_magnet_preserves_info_hash2() {
    use crate::resume_file::reconstruct_magnet;
    use irontide_core::FastResumeData;

    let mut resume = FastResumeData::new(vec![0xDD; 20], "v2-magnet".into(), "/tmp".into());
    resume.info_hash2 = Some(vec![0xEE; 32]);

    let link = reconstruct_magnet(&resume).expect("should reconstruct");

    assert!(link.info_hashes.v1.is_some());
    assert!(link.info_hashes.v2.is_some());
    let v2_hash = link.info_hashes.v2.unwrap();
    assert_eq!(v2_hash.as_bytes(), &[0xEE; 32]);
}
/// An empty name in the resume data becomes a missing display name on the
/// magnet.
#[test]
fn reconstruct_magnet_empty_name_is_none() {
    use crate::resume_file::reconstruct_magnet;
    use irontide_core::FastResumeData;

    let resume = FastResumeData::new(vec![0xFF; 20], String::new(), "/tmp".into());
    let link = reconstruct_magnet(&resume).expect("should reconstruct");

    let display_name = link.display_name;
    assert!(display_name.is_none(), "empty name should map to None");
}
/// After a pause/resume cycle and a session shutdown, a resume file for the
/// torrent exists on disk.
#[tokio::test]
async fn shutdown_saves_resume_files() {
    let resume_dir = tempfile::TempDir::new().unwrap();
    let settings = resume_test_settings(resume_dir.path());
    let session = SessionHandle::start(settings).await.unwrap();

    let payload = vec![0xAB; 16384];
    let torrent = make_test_torrent(&payload, 16384);
    let id = torrent.info_hash;
    let backing = make_storage(&payload, 16384);
    session
        .add_torrent(torrent.into(), Some(backing))
        .await
        .unwrap();

    // Pause and resume the torrent, with a short pause after each step.
    session.pause_torrent(id).await.unwrap();
    tokio::time::sleep(Duration::from_millis(50)).await;
    session.resume_torrent(id).await.unwrap();
    tokio::time::sleep(Duration::from_millis(50)).await;

    let resume_path = crate::resume_file::resume_file_path(resume_dir.path(), &id);
    session.shutdown().await.unwrap();

    // Wait a little after shutdown before checking the filesystem.
    tokio::time::sleep(Duration::from_millis(200)).await;
    assert!(
        resume_path.exists(),
        "resume file should exist after shutdown"
    );
}
/// A torrent saved by one session is listed by a second session started on the
/// same resume directory, without any explicit load call.
#[tokio::test]
async fn auto_restore_on_startup() {
    let resume_dir = tempfile::TempDir::new().unwrap();
    let id;

    // First session: add the torrent, save resume data, shut down.
    {
        let settings = resume_test_settings(resume_dir.path());
        let first = SessionHandle::start(settings).await.unwrap();

        let payload = vec![0xAB; 16384];
        let torrent = make_test_torrent(&payload, 16384);
        id = torrent.info_hash;
        let backing = make_storage(&payload, 16384);
        first
            .add_torrent(torrent.into(), Some(backing))
            .await
            .unwrap();

        tokio::time::sleep(Duration::from_millis(50)).await;
        // The save result is ignored; the file's existence is asserted below.
        let _ = first.save_resume_state().await;
        first.shutdown().await.unwrap();
    }

    let resume_path = crate::resume_file::resume_file_path(resume_dir.path(), &id);
    assert!(
        resume_path.exists(),
        "resume file should exist before restart"
    );

    // Second session: only start it, wait briefly, and list the torrents.
    {
        let settings = resume_test_settings(resume_dir.path());
        let second = SessionHandle::start(settings).await.unwrap();
        tokio::time::sleep(Duration::from_millis(100)).await;

        let listed = second.list_torrents().await.unwrap();
        assert!(
            listed.contains(&id),
            "torrent should be auto-restored on startup"
        );
        second.shutdown().await.unwrap();
    }
}
/// Shutdown must still return `Ok` when the configured resume directory is
/// `/proc/irontide-test-nonexistent`.
///
/// NOTE(review): the path is presumably unwritable on Linux; on platforms
/// without `/proc` it may behave differently — confirm the intended coverage.
#[tokio::test]
async fn shutdown_with_readonly_resume_dir_completes() {
    // The previously created (and never used) temporary directory was removed:
    // the resume directory under test is the fixed path below.
    let readonly_dir = PathBuf::from("/proc/irontide-test-nonexistent");
    let mut settings = test_settings();
    settings.resume_data_dir = Some(readonly_dir);
    let session = SessionHandle::start(settings).await.unwrap();

    let data = vec![0xAB; 16384];
    let meta = make_test_torrent(&data, 16384);
    let storage = make_storage(&data, 16384);
    session
        .add_torrent(meta.into(), Some(storage))
        .await
        .unwrap();

    tokio::time::sleep(Duration::from_millis(50)).await;
    // The only assertion: shutdown completes without an error.
    session.shutdown().await.unwrap();
}
/// A `.resume` file with invalid contents that exists before the session
/// starts is gone shortly after startup.
#[tokio::test]
async fn orphan_resume_file_deleted_on_startup() {
    let resume_dir = tempfile::TempDir::new().unwrap();
    let torrents_dir = resume_dir.path().join("torrents");
    std::fs::create_dir_all(&torrents_dir).unwrap();

    // Plant the file before the session exists.
    let orphan = torrents_dir.join("deadbeefdeadbeefdeadbeefdeadbeefdeadbeef.resume");
    std::fs::write(&orphan, b"not valid bencode").unwrap();
    assert!(orphan.exists(), "orphan file should exist before test");

    let settings = resume_test_settings(resume_dir.path());
    let session = SessionHandle::start(settings).await.unwrap();
    tokio::time::sleep(Duration::from_millis(100)).await;

    let still_present = orphan.exists();
    assert!(
        !still_present,
        "orphan resume file should be deleted on startup"
    );

    session.shutdown().await.unwrap();
}
/// Three torrents saved by one session are all written as resume files and
/// are all listed by a second session started on the same directory.
#[tokio::test]
async fn multi_torrent_save_load_round_trip() {
    let resume_dir = tempfile::TempDir::new().unwrap();
    // One fill byte per torrent, so each torrent gets distinct content.
    let fill_bytes: [u8; 3] = [0xAA, 0xBB, 0xCC];
    let mut hashes = Vec::with_capacity(fill_bytes.len());

    // First session: add, save, and verify the files on disk.
    {
        let settings = resume_test_settings(resume_dir.path());
        let first = SessionHandle::start(settings).await.unwrap();

        for fill in fill_bytes {
            let payload = vec![fill; 16384];
            let torrent = make_test_torrent(&payload, 16384);
            let id = torrent.info_hash;
            let backing = make_storage(&payload, 16384);
            first
                .add_torrent(torrent.into(), Some(backing))
                .await
                .unwrap();
            hashes.push(id);
        }

        tokio::time::sleep(Duration::from_millis(100)).await;
        let saved = first.save_resume_state().await.unwrap();
        assert_eq!(saved, 3, "all 3 torrents should be saved");

        let found = crate::resume_file::scan_resume_dir(resume_dir.path());
        assert_eq!(found.len(), 3, "3 .resume files should be on disk");

        for hash in &hashes {
            let resume_path = crate::resume_file::resume_file_path(resume_dir.path(), hash);
            assert!(
                resume_path.exists(),
                "resume file for {} should exist",
                hex::encode(hash.as_bytes())
            );
        }

        first.shutdown().await.unwrap();
    }

    // Second session: nothing is added explicitly; the list must contain all three.
    {
        let settings = resume_test_settings(resume_dir.path());
        let second = SessionHandle::start(settings).await.unwrap();
        tokio::time::sleep(Duration::from_millis(200)).await;

        let listed = second.list_torrents().await.unwrap();
        assert_eq!(listed.len(), 3, "all 3 torrents should be auto-restored");
        for hash in &hashes {
            assert!(
                listed.contains(hash),
                "torrent {} should be present after restore",
                hex::encode(hash.as_bytes())
            );
        }

        second.shutdown().await.unwrap();
    }
}
/// With three saved torrents and the middle resume file overwritten with
/// garbage, a restarted session lists only the other two and the corrupt file
/// is removed from disk.
#[tokio::test]
async fn corrupt_one_of_three_resume_files() {
    let resume_dir = tempfile::TempDir::new().unwrap();
    // One fill byte per torrent, so each torrent gets distinct content.
    let fill_bytes: [u8; 3] = [0xDD, 0xEE, 0xFF];
    let mut hashes = Vec::with_capacity(fill_bytes.len());

    // First session: add all three torrents and save their resume data.
    {
        let settings = resume_test_settings(resume_dir.path());
        let first = SessionHandle::start(settings).await.unwrap();

        for fill in fill_bytes {
            let payload = vec![fill; 16384];
            let torrent = make_test_torrent(&payload, 16384);
            let id = torrent.info_hash;
            let backing = make_storage(&payload, 16384);
            first
                .add_torrent(torrent.into(), Some(backing))
                .await
                .unwrap();
            hashes.push(id);
        }

        tokio::time::sleep(Duration::from_millis(100)).await;
        let saved = first.save_resume_state().await.unwrap();
        assert_eq!(saved, 3, "all 3 torrents should be saved");
        first.shutdown().await.unwrap();
    }

    // Overwrite the second torrent's resume file between the two sessions.
    let corrupt_path = crate::resume_file::resume_file_path(resume_dir.path(), &hashes[1]);
    assert!(
        corrupt_path.exists(),
        "file to corrupt must exist before overwrite"
    );
    std::fs::write(&corrupt_path, b"CORRUPTED GARBAGE DATA 0xDEAD").unwrap();

    // Second session: only the two intact torrents may come back.
    {
        let settings = resume_test_settings(resume_dir.path());
        let second = SessionHandle::start(settings).await.unwrap();
        tokio::time::sleep(Duration::from_millis(200)).await;

        let listed = second.list_torrents().await.unwrap();
        assert_eq!(
            listed.len(),
            2,
            "2 torrents should be restored (1 corrupt skipped)"
        );
        assert!(
            listed.contains(&hashes[0]),
            "first torrent should be restored"
        );
        assert!(
            listed.contains(&hashes[2]),
            "third torrent should be restored"
        );
        assert!(
            !listed.contains(&hashes[1]),
            "corrupted torrent should not be restored"
        );
        assert!(
            !corrupt_path.exists(),
            "corrupt resume file should be deleted by orphan cleanup"
        );

        second.shutdown().await.unwrap();
    }
}
/// Removing a torrent whose resume data has been saved drops it from the
/// session and deletes its resume file, leaving the resume directory empty.
#[tokio::test]
async fn remove_torrent_deletes_resume_file() {
    let resume_dir = tempfile::TempDir::new().unwrap();
    let payload = vec![0x42; 16384];
    let torrent = make_test_torrent(&payload, 16384);
    let id = torrent.info_hash;
    let backing = make_storage(&payload, 16384);

    let settings = resume_test_settings(resume_dir.path());
    let session = SessionHandle::start(settings).await.unwrap();
    session
        .add_torrent(torrent.into(), Some(backing))
        .await
        .unwrap();

    // Make sure a resume file exists before the removal.
    tokio::time::sleep(Duration::from_millis(100)).await;
    let saved = session.save_resume_state().await.unwrap();
    assert!(saved > 0, "torrent should be saved to a resume file");
    let resume_path = crate::resume_file::resume_file_path(resume_dir.path(), &id);
    assert!(resume_path.exists(), "resume file should exist after save");

    session.remove_torrent(id).await.unwrap();
    tokio::time::sleep(Duration::from_millis(50)).await;

    let listed = session.list_torrents().await.unwrap();
    assert!(
        !listed.contains(&id),
        "torrent should be gone from session after removal"
    );
    assert!(
        !resume_path.exists(),
        "resume file should be deleted when torrent is removed"
    );

    let leftovers = crate::resume_file::scan_resume_dir(resume_dir.path());
    assert!(
        leftovers.is_empty(),
        "no resume files should remain after removing the only torrent"
    );

    session.shutdown().await.unwrap();
}
}