1use std::collections::HashMap;
7use std::net::{IpAddr, SocketAddr};
8use std::path::PathBuf;
9use std::sync::Arc;
10use std::sync::atomic::{AtomicU32, Ordering};
11
12use dashmap::DashMap;
13use tokio::sync::{broadcast, mpsc, oneshot};
14
15use tracing::{debug, info, warn};
16
17use irontide_core::{DEFAULT_CHUNK_SIZE, Id20, Lengths, Magnet, TorrentMetaV1};
18use irontide_dht::DhtHandle;
19use irontide_storage::TorrentStorage;
20
21use crate::alert::{Alert, AlertCategory, AlertKind, AlertStream, post_alert};
22use crate::settings::Settings;
23use crate::torrent::TorrentHandle;
24use crate::types::{
25 FileInfo, SessionStats, TorrentConfig, TorrentInfo, TorrentState, TorrentStats, TorrentSummary,
26};
27
/// A rate-limiter [`TokenBucket`](crate::rate_limiter::TokenBucket) wrapped in a
/// `parking_lot::Mutex` and an `Arc` so that it can be shared between owners.
type SharedBucket = Arc<parking_lot::Mutex<crate::rate_limiter::TokenBucket>>;
30
/// Function-pointer type for queue-move operations: takes the mutable queue
/// entries and the target torrent's `Id20`, and returns a list of
/// `(Id20, i32, i32)` tuples.
// NOTE(review): the tuple presumably means (info hash, old position, new position)
// — confirm against `crate::queue`.
type QueueMoveFn = fn(&mut [crate::queue::QueueEntry], Id20) -> Vec<(Id20, i32, i32)>;
33
/// A [`BanManager`](crate::ban::BanManager) behind an `Arc<parking_lot::RwLock<_>>`
/// so it can be shared across the crate.
pub(crate) type SharedBanManager = Arc<parking_lot::RwLock<crate::ban::BanManager>>;
36
/// An [`IpFilter`](crate::ip_filter::IpFilter) behind an `Arc<parking_lot::RwLock<_>>`
/// so it can be shared across the crate.
pub(crate) type SharedIpFilter = Arc<parking_lot::RwLock<crate::ip_filter::IpFilter>>;
39
/// Outcome counters replied to a `SessionCommand::LoadResumeState` request.
///
/// `Default` yields all-zero counters, and `PartialEq`/`Eq` let callers and tests
/// compare results directly.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ResumeLoadResult {
    /// Number of entries counted as restored.
    pub restored: usize,
    /// Number of entries counted as skipped.
    pub skipped: usize,
    /// Number of entries counted as failed.
    pub failed: usize,
}
50
/// Per-torrent bookkeeping record kept alongside a torrent's handle.
struct TorrentEntry {
    /// Handle used to talk to the torrent.
    handle: TorrentHandle,
    /// Parsed v1 metadata, when available; `None` is treated as "not private"
    /// by `is_private`.
    meta: Option<TorrentMetaV1>,
    /// Position of this torrent in the queue.
    // NOTE(review): meaning of negative values is not visible here — confirm.
    queue_position: i32,
    /// Auto-management flag.
    // NOTE(review): presumably whether the queue logic may start/stop this torrent — confirm.
    auto_managed: bool,
    /// Optional start timestamp.
    // NOTE(review): presumably set when the torrent was last started — confirm.
    started_at: Option<tokio::time::Instant>,
    /// Previously recorded downloaded counter.
    // NOTE(review): presumably a byte total used to compute deltas — confirm.
    prev_downloaded: u64,
    /// Previously recorded uploaded counter.
    // NOTE(review): presumably a byte total used to compute deltas — confirm.
    prev_uploaded: u64,
}
66
67impl TorrentEntry {
68 fn is_private(&self) -> bool {
70 self.meta
71 .as_ref()
72 .is_some_and(|m| m.info.private == Some(1))
73 }
74}
75
/// Commands sent from [`SessionHandle`] to the session actor over the command channel.
///
/// Most variants carry a `reply` oneshot sender on which the result is sent back
/// to the caller. The per-variant notes below are derived from the variant and
/// field names; the code that handles each command is outside this excerpt.
enum SessionCommand {
    /// Add a torrent from parsed metadata, optionally with explicit storage and
    /// download directory.
    AddTorrent {
        meta: Box<irontide_core::TorrentMeta>,
        storage: Option<Arc<dyn TorrentStorage>>,
        download_dir: Option<PathBuf>,
        reply: oneshot::Sender<crate::Result<Id20>>,
    },
    /// Add a torrent from a magnet link, optionally with an explicit download directory.
    AddMagnet {
        magnet: Magnet,
        download_dir: Option<PathBuf>,
        reply: oneshot::Sender<crate::Result<Id20>>,
    },
    /// Remove the torrent identified by `info_hash`.
    RemoveTorrent {
        info_hash: Id20,
        reply: oneshot::Sender<crate::Result<()>>,
    },
    /// Pause the torrent identified by `info_hash`.
    PauseTorrent {
        info_hash: Id20,
        reply: oneshot::Sender<crate::Result<()>>,
    },
    /// Resume the torrent identified by `info_hash`.
    ResumeTorrent {
        info_hash: Id20,
        reply: oneshot::Sender<crate::Result<()>>,
    },
    /// Query the statistics of one torrent.
    TorrentStats {
        info_hash: Id20,
        reply: oneshot::Sender<crate::Result<TorrentStats>>,
    },
    /// Query the descriptive info of one torrent.
    TorrentInfo {
        info_hash: Id20,
        reply: oneshot::Sender<crate::Result<TorrentInfo>>,
    },
    /// List torrents as a vector of `Id20`.
    ListTorrents {
        reply: oneshot::Sender<Vec<Id20>>,
    },
    /// Query session-wide statistics.
    SessionStats {
        reply: oneshot::Sender<SessionStats>,
    },
    /// Produce fast-resume data for one torrent.
    SaveTorrentResumeData {
        info_hash: Id20,
        reply: oneshot::Sender<crate::Result<irontide_core::FastResumeData>>,
    },
    /// Produce a snapshot of the session state for persistence.
    SaveSessionState {
        reply: oneshot::Sender<crate::Result<crate::persistence::SessionState>>,
    },
    /// Load resume state; replies with a [`ResumeLoadResult`].
    LoadResumeState {
        reply: oneshot::Sender<crate::Result<ResumeLoadResult>>,
    },
    /// Query the queue position of one torrent.
    QueuePosition {
        info_hash: Id20,
        reply: oneshot::Sender<crate::Result<i32>>,
    },
    /// Set the queue position of one torrent to `pos`.
    SetQueuePosition {
        info_hash: Id20,
        pos: i32,
        reply: oneshot::Sender<crate::Result<()>>,
    },
    /// Move one torrent up in the queue.
    QueuePositionUp {
        info_hash: Id20,
        reply: oneshot::Sender<crate::Result<()>>,
    },
    /// Move one torrent down in the queue.
    QueuePositionDown {
        info_hash: Id20,
        reply: oneshot::Sender<crate::Result<()>>,
    },
    /// Move one torrent to the top of the queue.
    QueuePositionTop {
        info_hash: Id20,
        reply: oneshot::Sender<crate::Result<()>>,
    },
    /// Move one torrent to the bottom of the queue.
    QueuePositionBottom {
        info_hash: Id20,
        reply: oneshot::Sender<crate::Result<()>>,
    },
    /// Ban the peer address `ip`.
    BanPeer {
        ip: IpAddr,
        reply: oneshot::Sender<()>,
    },
    /// Unban the peer address `ip`; replies with a `bool`.
    // NOTE(review): the bool presumably reports whether the address was banned — confirm.
    UnbanPeer {
        ip: IpAddr,
        reply: oneshot::Sender<bool>,
    },
    /// List banned peer addresses.
    BannedPeers {
        reply: oneshot::Sender<Vec<IpAddr>>,
    },
    /// Install `filter` as the IP filter.
    SetIpFilter {
        filter: crate::ip_filter::IpFilter,
        reply: oneshot::Sender<()>,
    },
    /// Fetch the current IP filter.
    GetIpFilter {
        reply: oneshot::Sender<crate::ip_filter::IpFilter>,
    },
    /// Fetch the current settings.
    GetSettings {
        reply: oneshot::Sender<Settings>,
    },
    /// Apply new settings (boxed to keep the enum small).
    ApplySettings {
        settings: Box<Settings>,
        reply: oneshot::Sender<crate::Result<()>>,
    },
    /// Move one torrent's storage to `new_path`.
    MoveTorrentStorage {
        info_hash: Id20,
        new_path: std::path::PathBuf,
        reply: oneshot::Sender<crate::Result<()>>,
    },
    /// Hand a list of peer addresses, tagged with their `source`, to one torrent.
    AddPeers {
        info_hash: Id20,
        peers: Vec<SocketAddr>,
        source: crate::peer_state::PeerSource,
        reply: oneshot::Sender<crate::Result<()>>,
    },
    /// Open file `file_index` of one torrent as a [`FileStream`](crate::streaming::FileStream).
    OpenFile {
        info_hash: Id20,
        file_index: usize,
        reply: oneshot::Sender<crate::Result<crate::streaming::FileStream>>,
    },
    /// Force a tracker re-announce for one torrent.
    ForceReannounce {
        info_hash: Id20,
        reply: oneshot::Sender<crate::Result<()>>,
    },
    /// List the trackers of one torrent.
    TrackerList {
        info_hash: Id20,
        reply: oneshot::Sender<crate::Result<Vec<crate::tracker_manager::TrackerInfo>>>,
    },
    /// Scrape one torrent; replies with an optional `(String, ScrapeInfo)` pair.
    // NOTE(review): the String is presumably the tracker URL — confirm.
    Scrape {
        info_hash: Id20,
        reply: oneshot::Sender<crate::Result<Option<(String, irontide_tracker::ScrapeInfo)>>>,
    },
    /// Set the priority of file `index` in one torrent.
    SetFilePriority {
        info_hash: Id20,
        index: usize,
        priority: irontide_core::FilePriority,
        reply: oneshot::Sender<crate::Result<()>>,
    },
    /// Query the file priorities of one torrent.
    FilePriorities {
        info_hash: Id20,
        reply: oneshot::Sender<crate::Result<Vec<irontide_core::FilePriority>>>,
    },
    /// Set the per-torrent download limit (`bytes_per_sec`).
    SetDownloadLimit {
        info_hash: Id20,
        bytes_per_sec: u64,
        reply: oneshot::Sender<crate::Result<()>>,
    },
    /// Set the per-torrent upload limit (`bytes_per_sec`).
    SetUploadLimit {
        info_hash: Id20,
        bytes_per_sec: u64,
        reply: oneshot::Sender<crate::Result<()>>,
    },
    /// Query the per-torrent download limit.
    DownloadLimit {
        info_hash: Id20,
        reply: oneshot::Sender<crate::Result<u64>>,
    },
    /// Query the per-torrent upload limit.
    UploadLimit {
        info_hash: Id20,
        reply: oneshot::Sender<crate::Result<u64>>,
    },
    /// Enable or disable sequential download for one torrent.
    SetSequentialDownload {
        info_hash: Id20,
        enabled: bool,
        reply: oneshot::Sender<crate::Result<()>>,
    },
    /// Query whether sequential download is enabled for one torrent.
    IsSequentialDownload {
        info_hash: Id20,
        reply: oneshot::Sender<crate::Result<bool>>,
    },
    /// Enable or disable super seeding for one torrent.
    SetSuperSeeding {
        info_hash: Id20,
        enabled: bool,
        reply: oneshot::Sender<crate::Result<()>>,
    },
    /// Query whether super seeding is enabled for one torrent.
    IsSuperSeeding {
        info_hash: Id20,
        reply: oneshot::Sender<crate::Result<bool>>,
    },
    /// Enable or disable seed mode for one torrent.
    SetSeedMode {
        info_hash: Id20,
        enabled: bool,
        reply: oneshot::Sender<crate::Result<()>>,
    },
    /// Add the tracker `url` to one torrent.
    AddTracker {
        info_hash: Id20,
        url: String,
        reply: oneshot::Sender<crate::Result<()>>,
    },
    /// Replace one torrent's trackers with `urls`.
    ReplaceTrackers {
        info_hash: Id20,
        urls: Vec<String>,
        reply: oneshot::Sender<crate::Result<()>>,
    },
    /// Force a recheck of one torrent.
    ForceRecheck {
        info_hash: Id20,
        reply: oneshot::Sender<crate::Result<()>>,
    },
    /// Rename file `file_index` of one torrent to `new_name`.
    RenameFile {
        info_hash: Id20,
        file_index: usize,
        new_name: String,
        reply: oneshot::Sender<crate::Result<()>>,
    },
    /// Set the connection limit of one torrent.
    SetMaxConnections {
        info_hash: Id20,
        limit: usize,
        reply: oneshot::Sender<crate::Result<()>>,
    },
    /// Query the connection limit of one torrent.
    MaxConnections {
        info_hash: Id20,
        reply: oneshot::Sender<crate::Result<usize>>,
    },
    /// Set the upload-slot limit of one torrent.
    SetMaxUploads {
        info_hash: Id20,
        limit: usize,
        reply: oneshot::Sender<crate::Result<()>>,
    },
    /// Query the upload-slot limit of one torrent.
    MaxUploads {
        info_hash: Id20,
        reply: oneshot::Sender<crate::Result<usize>>,
    },
    /// Query per-peer information for one torrent.
    GetPeerInfo {
        info_hash: Id20,
        reply: oneshot::Sender<crate::Result<Vec<crate::types::PeerInfo>>>,
    },
    /// Query the partially downloaded pieces of one torrent.
    GetDownloadQueue {
        info_hash: Id20,
        reply: oneshot::Sender<crate::Result<Vec<crate::types::PartialPieceInfo>>>,
    },
    /// Query whether one torrent has piece `index`.
    HavePiece {
        info_hash: Id20,
        index: u32,
        reply: oneshot::Sender<crate::Result<bool>>,
    },
    /// Query piece availability for one torrent as a vector of `u32`.
    // NOTE(review): presumably one availability count per piece — confirm.
    PieceAvailability {
        info_hash: Id20,
        reply: oneshot::Sender<crate::Result<Vec<u32>>>,
    },
    /// Query file progress for one torrent as a vector of `u64`.
    // NOTE(review): presumably bytes completed per file — confirm.
    FileProgress {
        info_hash: Id20,
        reply: oneshot::Sender<crate::Result<Vec<u64>>>,
    },
    /// Query the [`InfoHashes`](irontide_core::InfoHashes) of one torrent.
    InfoHashesQuery {
        info_hash: Id20,
        reply: oneshot::Sender<crate::Result<irontide_core::InfoHashes>>,
    },
    /// Fetch the v1 metadata of one torrent, if any.
    TorrentFile {
        info_hash: Id20,
        reply: oneshot::Sender<crate::Result<Option<irontide_core::TorrentMetaV1>>>,
    },
    /// Fetch the v2 metadata of one torrent, if any.
    TorrentFileV2 {
        info_hash: Id20,
        reply: oneshot::Sender<crate::Result<Option<irontide_core::TorrentMetaV2>>>,
    },
    /// Force a DHT announce for one torrent.
    ForceDhtAnnounce {
        info_hash: Id20,
        reply: oneshot::Sender<crate::Result<()>>,
    },
    /// Force an LSD announce for one torrent.
    ForceLsdAnnounce {
        info_hash: Id20,
        reply: oneshot::Sender<crate::Result<()>>,
    },
    /// Read piece `index` of one torrent as raw bytes.
    ReadPiece {
        info_hash: Id20,
        index: u32,
        reply: oneshot::Sender<crate::Result<bytes::Bytes>>,
    },
    /// Flush the cache of one torrent.
    FlushCache {
        info_hash: Id20,
        reply: oneshot::Sender<crate::Result<()>>,
    },
    /// Query whether `info_hash` refers to a valid torrent; replies with a plain `bool`.
    IsValid {
        info_hash: Id20,
        reply: oneshot::Sender<bool>,
    },
    /// Clear the error state of one torrent.
    ClearError {
        info_hash: Id20,
        reply: oneshot::Sender<crate::Result<()>>,
    },
    /// Query per-file status for one torrent.
    FileStatus {
        info_hash: Id20,
        reply: oneshot::Sender<crate::Result<Vec<crate::types::FileStatus>>>,
    },
    /// Query the flags of one torrent.
    Flags {
        info_hash: Id20,
        reply: oneshot::Sender<crate::Result<crate::types::TorrentFlags>>,
    },
    /// Set the given flags on one torrent.
    SetFlags {
        info_hash: Id20,
        flags: crate::types::TorrentFlags,
        reply: oneshot::Sender<crate::Result<()>>,
    },
    /// Unset the given flags on one torrent.
    UnsetFlags {
        info_hash: Id20,
        flags: crate::types::TorrentFlags,
        reply: oneshot::Sender<crate::Result<()>>,
    },
    /// Connect one torrent to the peer at `addr`.
    ConnectPeer {
        info_hash: Id20,
        addr: SocketAddr,
        reply: oneshot::Sender<crate::Result<()>>,
    },
    /// Store an immutable value in the DHT; replies with an `Id20`.
    // NOTE(review): the Id20 is presumably the item's target hash — confirm.
    DhtPutImmutable {
        value: Vec<u8>,
        reply: oneshot::Sender<crate::Result<Id20>>,
    },
    /// Fetch an immutable value from the DHT by `target`.
    DhtGetImmutable {
        target: Id20,
        reply: oneshot::Sender<crate::Result<Option<Vec<u8>>>>,
    },
    /// Store a mutable value in the DHT using a 32-byte keypair seed, sequence
    /// number and salt; replies with an `Id20`.
    DhtPutMutable {
        keypair_bytes: [u8; 32],
        value: Vec<u8>,
        seq: i64,
        salt: Vec<u8>,
        reply: oneshot::Sender<crate::Result<Id20>>,
    },
    /// Fetch a mutable value from the DHT by public key and salt; replies with
    /// an optional `(value, i64)` pair.
    // NOTE(review): the i64 is presumably the item's sequence number — confirm.
    // The nested reply type trips clippy's type-complexity lint, hence the allow.
    #[allow(clippy::type_complexity)]
    DhtGetMutable {
        public_key: [u8; 32],
        salt: Vec<u8>,
        reply: oneshot::Sender<crate::Result<Option<(Vec<u8>, i64)>>>,
    },
    /// Save resume state; replies with a `usize`.
    // NOTE(review): presumably the number of torrents saved — confirm.
    SaveResumeState {
        reply: oneshot::Sender<crate::Result<usize>>,
    },
    /// Request that session statistics be posted; carries no reply channel.
    PostSessionStats,
    /// Request that the session shut down; carries no reply channel.
    Shutdown,
}
429
/// Cloneable handle used to talk to a running session.
///
/// Clones share the same command channel, alert broadcast sender, alert mask and
/// counters, because each field is either a channel sender or an `Arc`.
#[derive(Clone)]
pub struct SessionHandle {
    /// Sender half of the command channel consumed by the session actor.
    cmd_tx: mpsc::Sender<SessionCommand>,
    /// Broadcast sender from which alert subscriptions are created.
    alert_tx: broadcast::Sender<Alert>,
    /// Alert category bitmask, stored atomically so it can be updated through `&self`.
    alert_mask: Arc<AtomicU32>,
    /// Session counters, shared with the actor.
    counters: Arc<crate::stats::SessionCounters>,
    /// Network transport factory the session was started with.
    // Not read through the handle in the code visible here, hence the lint allowance.
    #[allow(dead_code)]
    factory: Arc<crate::transport::NetworkFactory>,
}
441
442impl SessionHandle {
443 pub async fn start(settings: Settings) -> crate::Result<Self> {
445 Self::start_with_plugins(settings, Arc::new(Vec::new())).await
446 }
447
448 pub async fn start_with_backend(
450 settings: Settings,
451 backend: Arc<dyn crate::disk_backend::DiskIoBackend>,
452 ) -> crate::Result<Self> {
453 Self::start_with_plugins_and_backend(settings, Arc::new(Vec::new()), backend).await
454 }
455
456 pub async fn start_with_plugins(
458 settings: Settings,
459 plugins: Arc<Vec<Box<dyn crate::extension::ExtensionPlugin>>>,
460 ) -> crate::Result<Self> {
461 let disk_config = crate::disk::DiskConfig::from(&settings);
462 let backend = crate::disk_backend::create_backend_from_config(&disk_config);
463 Self::start_with_plugins_and_backend(settings, plugins, backend).await
464 }
465
466 pub async fn start_with_plugins_and_backend(
469 settings: Settings,
470 plugins: Arc<Vec<Box<dyn crate::extension::ExtensionPlugin>>>,
471 backend: Arc<dyn crate::disk_backend::DiskIoBackend>,
472 ) -> crate::Result<Self> {
473 Self::start_full(
474 settings,
475 plugins,
476 backend,
477 Arc::new(crate::transport::NetworkFactory::tokio()),
478 )
479 .await
480 }
481
482 pub async fn start_with_transport(
486 settings: Settings,
487 factory: Arc<crate::transport::NetworkFactory>,
488 ) -> crate::Result<Self> {
489 let disk_config = crate::disk::DiskConfig::from(&settings);
490 let backend = crate::disk_backend::create_backend_from_config(&disk_config);
491 Self::start_full(settings, Arc::new(Vec::new()), backend, factory).await
492 }
493
    /// Starts a session with every dependency supplied explicitly and returns a
    /// handle to it.
    ///
    /// The function adjusts `settings` for `force_proxy` / `anonymous_mode`, creates
    /// the command and alert channels, brings up the optional subsystems (LSD, uTP,
    /// NAT port mapping, I2P, SSL, TCP/SSL listeners, DHT), builds the shared
    /// managers, spawns the listener task and the session actor, and finally returns
    /// the `SessionHandle`.
    ///
    /// # Errors
    /// Returns `Error::Config` when `force_proxy` is set but no proxy type is
    /// configured. Failures to start the optional subsystems are only logged (and,
    /// for I2P, posted as an alert); they do not fail startup.
    pub async fn start_full(
        settings: Settings,
        plugins: Arc<Vec<Box<dyn crate::extension::ExtensionPlugin>>>,
        backend: Arc<dyn crate::disk_backend::DiskIoBackend>,
        factory: Arc<crate::transport::NetworkFactory>,
    ) -> crate::Result<Self> {
        // Local mutable copy: the privacy modes below override several flags.
        let mut settings = settings;

        if settings.force_proxy {
            // force_proxy is meaningless without a proxy to force traffic through.
            if settings.proxy.proxy_type == crate::proxy::ProxyType::None {
                return Err(crate::Error::Config(
                    "force_proxy requires a proxy to be configured".into(),
                ));
            }
            // Turn off the port-mapping and discovery subsystems in this mode.
            settings.enable_upnp = false;
            settings.enable_natpmp = false;
            settings.enable_dht = false;
            settings.enable_lsd = false;
        }

        if settings.anonymous_mode {
            // Anonymous mode disables the same four subsystems.
            settings.enable_dht = false;
            settings.enable_lsd = false;
            settings.enable_upnp = false;
            settings.enable_natpmp = false;
        }

        // Command channel between handles and the actor (capacity 256).
        let (cmd_tx, cmd_rx) = mpsc::channel(256);

        // Alert broadcast channel; the initial receiver is dropped, subscribers
        // are created later from the sender.
        let (alert_tx, _) = broadcast::channel(settings.alert_channel_size);
        let alert_mask = Arc::new(AtomicU32::new(settings.alert_mask.bits()));

        // Local service discovery: optional, and a start failure is non-fatal.
        let (lsd, lsd_peers_rx) = if settings.enable_lsd {
            match crate::lsd::LsdHandle::start(settings.listen_port).await {
                Ok((handle, rx)) => (Some(handle), Some(rx)),
                Err(e) => {
                    warn!("LSD unavailable (port 6771): {e}");
                    (None, None)
                }
            }
        } else {
            (None, None)
        };

        // Global rate-limit buckets, initialised from the configured limits.
        let global_upload_bucket = Arc::new(parking_lot::Mutex::new(
            crate::rate_limiter::TokenBucket::new(settings.upload_rate_limit),
        ));
        let global_download_bucket = Arc::new(parking_lot::Mutex::new(
            crate::rate_limiter::TokenBucket::new(settings.download_rate_limit),
        ));

        // uTP socket + listener on the listen port; a bind failure is non-fatal.
        let (utp_socket, utp_listener) = if settings.enable_utp {
            match irontide_utp::UtpSocket::bind(settings.to_utp_config(settings.listen_port)).await
            {
                Ok((socket, listener)) => (Some(socket), Some(listener)),
                Err(e) => {
                    warn!("uTP bind failed: {e}");
                    (None, None)
                }
            }
        } else {
            (None, None)
        };

        // Second uTP socket for IPv6, only when both uTP and IPv6 are enabled.
        // Failure is logged at debug level only.
        let (utp_socket_v6, utp_listener_v6) = if settings.enable_utp && settings.enable_ipv6 {
            match irontide_utp::UtpSocket::bind(settings.to_utp_config_v6(settings.listen_port))
                .await
            {
                Ok((socket, listener)) => (Some(socket), Some(listener)),
                Err(e) => {
                    debug!("uTP IPv6 bind failed (non-fatal): {e}");
                    (None, None)
                }
            }
        } else {
            (None, None)
        };

        // NAT port mapping when UPnP or NAT-PMP is enabled.
        let (nat, nat_events_rx) = if settings.enable_upnp || settings.enable_natpmp {
            let nat_config = settings.to_nat_config();
            let (handle, events_rx) = irontide_nat::NatHandle::start(nat_config);
            // A UDP mapping is requested only when uTP is enabled.
            let udp_port = if settings.enable_utp {
                Some(settings.listen_port)
            } else {
                None
            };
            handle.map_ports(settings.listen_port, udp_port).await;
            (Some(handle), Some(events_rx))
        } else {
            (None, None)
        };

        // Optional I2P SAM session; success and failure are both reported as alerts.
        let sam_session = if settings.enable_i2p {
            let tunnel_config = settings.to_sam_tunnel_config();
            match crate::i2p::SamSession::create(
                &settings.i2p_hostname,
                settings.i2p_port,
                "torrent",
                tunnel_config,
            )
            .await
            {
                Ok(session) => {
                    let b32 = session.destination().to_b32_address();
                    info!("I2P SAM session created: {}", b32);
                    post_alert(
                        &alert_tx,
                        &alert_mask,
                        AlertKind::I2pSessionCreated { b32_address: b32 },
                    );
                    Some(Arc::new(session))
                }
                Err(e) => {
                    warn!("I2P SAM session failed: {e}");
                    post_alert(
                        &alert_tx,
                        &alert_mask,
                        AlertKind::I2pError {
                            message: format!("SAM session creation failed: {e}"),
                        },
                    );
                    None
                }
            }
        } else {
            None
        };

        // SSL manager is created when an SSL port or a certificate path is configured.
        let ssl_manager = if settings.ssl_listen_port != 0 || settings.ssl_cert_path.is_some() {
            match crate::ssl_manager::SslManager::new(&settings) {
                Ok(mgr) => {
                    info!("SSL manager initialized");
                    Some(Arc::new(mgr))
                }
                Err(e) => {
                    warn!(error = %e, "SSL manager initialization failed");
                    None
                }
            }
        } else {
            None
        };

        // Plain TCP listener on 0.0.0.0:<listen_port>; a bind failure is logged
        // and startup continues without it.
        let tcp_listener: Option<Box<dyn crate::transport::TransportListener>> = match factory
            .bind_tcp(SocketAddr::from(([0, 0, 0, 0], settings.listen_port)))
            .await
        {
            Ok(l) => {
                info!(port = settings.listen_port, "TCP listener started");
                Some(l)
            }
            Err(e) => {
                warn!(port = settings.listen_port, error = %e, "TCP listener bind failed");
                None
            }
        };

        // Separate listener for SSL connections, bound only when an SSL port is set.
        let ssl_listener: Option<Box<dyn crate::transport::TransportListener>> = if settings
            .ssl_listen_port
            != 0
        {
            match factory
                .bind_tcp(SocketAddr::from(([0, 0, 0, 0], settings.ssl_listen_port)))
                .await
            {
                Ok(l) => {
                    info!(port = settings.ssl_listen_port, "SSL listener started");
                    Some(l)
                }
                Err(e) => {
                    warn!(port = settings.ssl_listen_port, error = %e, "SSL listener bind failed");
                    None
                }
            }
        } else {
            None
        };

        // DHT over IPv4; a start failure is non-fatal.
        let (dht_v4, dht_v4_ip_rx) = if settings.enable_dht {
            match DhtHandle::start(settings.to_dht_config()).await {
                Ok((handle, ip_rx)) => {
                    info!("DHT v4 started");
                    (Some(handle), Some(ip_rx))
                }
                Err(e) => {
                    warn!("DHT v4 start failed: {e}");
                    (None, None)
                }
            }
        } else {
            (None, None)
        };

        // DHT over IPv6, only when both DHT and IPv6 are enabled.
        let (dht_v6, dht_v6_ip_rx) = if settings.enable_dht && settings.enable_ipv6 {
            match DhtHandle::start(settings.to_dht_config_v6()).await {
                Ok((handle, ip_rx)) => {
                    info!("DHT v6 started");
                    (Some(handle), Some(ip_rx))
                }
                Err(e) => {
                    debug!("DHT v6 start failed (non-fatal): {e}");
                    (None, None)
                }
            }
        } else {
            (None, None)
        };

        // Shared ban manager, configured from the settings.
        let ban_config = crate::ban::BanConfig::from(&settings);
        let ban_manager: SharedBanManager = Arc::new(parking_lot::RwLock::new(
            crate::ban::BanManager::new(ban_config),
        ));

        // Shared IP filter, starting from `IpFilter::new()`.
        let ip_filter: SharedIpFilter =
            Arc::new(parking_lot::RwLock::new(crate::ip_filter::IpFilter::new()));

        // Disk manager on top of the supplied backend, with its blocking spawner.
        let disk_config = crate::disk::DiskConfig::from(&settings);
        let spawner = crate::blocking_spawner::BlockingSpawner::new(settings.max_blocking_threads);
        let (disk_manager, disk_actor_handle) =
            crate::disk::DiskManagerHandle::new_with_backend(disk_config, backend, spawner);

        // Counters shared between the actor and every handle.
        let counters = Arc::new(crate::stats::SessionCounters::new());

        // NOTE(review): the meaning of the second argument (64) is defined by
        // `HashPool::new`, which is not visible here.
        let hash_pool = std::sync::Arc::new(crate::hash_pool::HashPool::new(
            settings.hashing_threads,
            64,
        ));

        // The listener task shares the info-hash registry with the actor and sends
        // its output to the actor through `validated_tx` (channel capacity 64).
        let info_hash_registry = Arc::new(DashMap::new());
        let (validated_tx, validated_conn_rx) = mpsc::channel(64);
        let listener_task = crate::listener::ListenerTask::new(
            tcp_listener,
            utp_listener,
            utp_listener_v6,
            Arc::clone(&info_hash_registry),
            validated_tx,
        );
        let _listener_task = tokio::spawn(listener_task.run());

        // Read before `settings` is moved into the actor below.
        let external_ip = settings.external_ip;
        let actor = SessionActor {
            settings,
            torrents: HashMap::new(),
            dht_v4,
            dht_v6,
            lsd,
            lsd_peers_rx,
            cmd_rx,
            alert_tx: alert_tx.clone(),
            alert_mask: Arc::clone(&alert_mask),
            global_upload_bucket,
            global_download_bucket,
            utp_socket,
            utp_socket_v6,
            nat,
            nat_events_rx,
            ban_manager,
            ip_filter,
            disk_manager,
            disk_actor_handle,
            external_ip,
            dht_v4_ip_rx,
            dht_v6_ip_rx,
            plugins,
            sam_session,
            ssl_manager,
            ssl_listener,
            validated_conn_rx,
            info_hash_registry,
            _listener_task,
            counters: Arc::clone(&counters),
            factory: Arc::clone(&factory),
            hash_pool,
        };

        // Run the actor on its own task, and watch that task from a second one so
        // that an exit or a panic of the actor is always logged.
        let join_handle = tokio::spawn(actor.run());
        tokio::spawn(async move {
            match join_handle.await {
                Ok(()) => {
                    tracing::warn!("session actor exited cleanly");
                }
                Err(e) if e.is_panic() => {
                    // Recover a readable message from the two common panic payload types.
                    let panic_payload = e.into_panic();
                    let msg = if let Some(s) = panic_payload.downcast_ref::<&str>() {
                        (*s).to_string()
                    } else if let Some(s) = panic_payload.downcast_ref::<String>() {
                        s.clone()
                    } else {
                        "unknown panic payload".to_string()
                    };
                    tracing::error!("session actor PANICKED: {msg}");
                }
                Err(e) => {
                    tracing::error!("session actor task error: {e}");
                }
            }
        });
        Ok(SessionHandle {
            cmd_tx,
            alert_tx,
            alert_mask,
            counters,
            factory,
        })
    }
818
819 pub async fn add_torrent(
821 &self,
822 meta: irontide_core::TorrentMeta,
823 storage: Option<Arc<dyn TorrentStorage>>,
824 ) -> crate::Result<Id20> {
825 self.add_torrent_with_dir(meta, storage, None).await
826 }
827
828 pub async fn add_torrent_with_dir(
830 &self,
831 meta: irontide_core::TorrentMeta,
832 storage: Option<Arc<dyn TorrentStorage>>,
833 download_dir: Option<PathBuf>,
834 ) -> crate::Result<Id20> {
835 let (tx, rx) = oneshot::channel();
836 self.cmd_tx
837 .send(SessionCommand::AddTorrent {
838 meta: Box::new(meta),
839 storage,
840 download_dir,
841 reply: tx,
842 })
843 .await
844 .map_err(|_| crate::Error::Shutdown)?;
845 rx.await.map_err(|_| crate::Error::Shutdown)?
846 }
847
848 pub async fn add_magnet(&self, magnet: Magnet) -> crate::Result<Id20> {
850 self.add_magnet_with_dir(magnet, None).await
851 }
852
853 pub async fn add_magnet_with_dir(
855 &self,
856 magnet: Magnet,
857 download_dir: Option<PathBuf>,
858 ) -> crate::Result<Id20> {
859 let (tx, rx) = oneshot::channel();
860 self.cmd_tx
861 .send(SessionCommand::AddMagnet {
862 magnet,
863 download_dir,
864 reply: tx,
865 })
866 .await
867 .map_err(|_| crate::Error::Shutdown)?;
868 rx.await.map_err(|_| crate::Error::Shutdown)?
869 }
870
871 pub async fn remove_torrent(&self, info_hash: Id20) -> crate::Result<()> {
873 let (tx, rx) = oneshot::channel();
874 self.cmd_tx
875 .send(SessionCommand::RemoveTorrent {
876 info_hash,
877 reply: tx,
878 })
879 .await
880 .map_err(|_| crate::Error::Shutdown)?;
881 rx.await.map_err(|_| crate::Error::Shutdown)?
882 }
883
884 pub async fn pause_torrent(&self, info_hash: Id20) -> crate::Result<()> {
886 let (tx, rx) = oneshot::channel();
887 self.cmd_tx
888 .send(SessionCommand::PauseTorrent {
889 info_hash,
890 reply: tx,
891 })
892 .await
893 .map_err(|_| crate::Error::Shutdown)?;
894 rx.await.map_err(|_| crate::Error::Shutdown)?
895 }
896
897 pub async fn resume_torrent(&self, info_hash: Id20) -> crate::Result<()> {
899 let (tx, rx) = oneshot::channel();
900 self.cmd_tx
901 .send(SessionCommand::ResumeTorrent {
902 info_hash,
903 reply: tx,
904 })
905 .await
906 .map_err(|_| crate::Error::Shutdown)?;
907 rx.await.map_err(|_| crate::Error::Shutdown)?
908 }
909
910 pub async fn torrent_stats(&self, info_hash: Id20) -> crate::Result<TorrentStats> {
912 let (tx, rx) = oneshot::channel();
913 self.cmd_tx
914 .send(SessionCommand::TorrentStats {
915 info_hash,
916 reply: tx,
917 })
918 .await
919 .map_err(|_| crate::Error::Shutdown)?;
920 rx.await.map_err(|_| crate::Error::Shutdown)?
921 }
922
923 pub async fn torrent_info(&self, info_hash: Id20) -> crate::Result<TorrentInfo> {
925 let (tx, rx) = oneshot::channel();
926 self.cmd_tx
927 .send(SessionCommand::TorrentInfo {
928 info_hash,
929 reply: tx,
930 })
931 .await
932 .map_err(|_| crate::Error::Shutdown)?;
933 rx.await.map_err(|_| crate::Error::Shutdown)?
934 }
935
936 pub async fn list_torrents(&self) -> crate::Result<Vec<Id20>> {
938 let (tx, rx) = oneshot::channel();
939 self.cmd_tx
940 .send(SessionCommand::ListTorrents { reply: tx })
941 .await
942 .map_err(|_| crate::Error::Shutdown)?;
943 rx.await.map_err(|_| crate::Error::Shutdown)
944 }
945
946 pub async fn session_stats(&self) -> crate::Result<SessionStats> {
948 let (tx, rx) = oneshot::channel();
949 self.cmd_tx
950 .send(SessionCommand::SessionStats { reply: tx })
951 .await
952 .map_err(|_| crate::Error::Shutdown)?;
953 rx.await.map_err(|_| crate::Error::Shutdown)
954 }
955
956 pub fn subscribe(&self) -> broadcast::Receiver<Alert> {
958 self.alert_tx.subscribe()
959 }
960
961 pub fn subscribe_filtered(&self, filter: AlertCategory) -> AlertStream {
963 AlertStream::new(self.alert_tx.subscribe(), filter)
964 }
965
966 pub async fn post_session_stats(&self) -> crate::Result<()> {
968 self.cmd_tx
969 .send(SessionCommand::PostSessionStats)
970 .await
971 .map_err(|_| crate::Error::Shutdown)
972 }
973
974 pub fn counters(&self) -> &Arc<crate::stats::SessionCounters> {
976 &self.counters
977 }
978
979 pub fn set_alert_mask(&self, mask: AlertCategory) {
981 self.alert_mask.store(mask.bits(), Ordering::Relaxed);
982 }
983
984 pub fn alert_mask(&self) -> AlertCategory {
986 AlertCategory::from_bits_truncate(self.alert_mask.load(Ordering::Relaxed))
987 }
988
989 pub async fn add_peers(
991 &self,
992 info_hash: Id20,
993 peers: Vec<SocketAddr>,
994 source: crate::peer_state::PeerSource,
995 ) -> crate::Result<()> {
996 let (tx, rx) = oneshot::channel();
997 self.cmd_tx
998 .send(SessionCommand::AddPeers {
999 info_hash,
1000 peers,
1001 source,
1002 reply: tx,
1003 })
1004 .await
1005 .map_err(|_| crate::Error::Shutdown)?;
1006 rx.await.map_err(|_| crate::Error::Shutdown)?
1007 }
1008
1009 pub async fn shutdown(&self) -> crate::Result<()> {
1011 let _ = tokio::time::timeout(
1013 std::time::Duration::from_secs(10),
1014 self.cmd_tx.send(SessionCommand::Shutdown),
1015 )
1016 .await;
1017 Ok(())
1018 }
1019
1020 pub async fn save_torrent_resume_data(
1022 &self,
1023 info_hash: Id20,
1024 ) -> crate::Result<irontide_core::FastResumeData> {
1025 let (tx, rx) = oneshot::channel();
1026 self.cmd_tx
1027 .send(SessionCommand::SaveTorrentResumeData {
1028 info_hash,
1029 reply: tx,
1030 })
1031 .await
1032 .map_err(|_| crate::Error::Shutdown)?;
1033 rx.await.map_err(|_| crate::Error::Shutdown)?
1034 }
1035
1036 pub async fn save_session_state(&self) -> crate::Result<crate::persistence::SessionState> {
1038 let (tx, rx) = oneshot::channel();
1039 self.cmd_tx
1040 .send(SessionCommand::SaveSessionState { reply: tx })
1041 .await
1042 .map_err(|_| crate::Error::Shutdown)?;
1043 rx.await.map_err(|_| crate::Error::Shutdown)?
1044 }
1045
1046 pub async fn load_resume_state(&self) -> crate::Result<ResumeLoadResult> {
1057 let (tx, rx) = oneshot::channel();
1058 self.cmd_tx
1059 .send(SessionCommand::LoadResumeState { reply: tx })
1060 .await
1061 .map_err(|_| crate::Error::Shutdown)?;
1062 rx.await.map_err(|_| crate::Error::Shutdown)?
1063 }
1064
1065 pub async fn save_resume_state(&self) -> crate::Result<usize> {
1075 let (tx, rx) = oneshot::channel();
1076 self.cmd_tx
1077 .send(SessionCommand::SaveResumeState { reply: tx })
1078 .await
1079 .map_err(|_| crate::Error::Shutdown)?;
1080 rx.await.map_err(|_| crate::Error::Shutdown)?
1081 }
1082
1083 pub async fn queue_position(&self, info_hash: Id20) -> crate::Result<i32> {
1085 let (tx, rx) = oneshot::channel();
1086 self.cmd_tx
1087 .send(SessionCommand::QueuePosition {
1088 info_hash,
1089 reply: tx,
1090 })
1091 .await
1092 .map_err(|_| crate::Error::Shutdown)?;
1093 rx.await.map_err(|_| crate::Error::Shutdown)?
1094 }
1095
1096 pub async fn set_queue_position(&self, info_hash: Id20, pos: i32) -> crate::Result<()> {
1098 let (tx, rx) = oneshot::channel();
1099 self.cmd_tx
1100 .send(SessionCommand::SetQueuePosition {
1101 info_hash,
1102 pos,
1103 reply: tx,
1104 })
1105 .await
1106 .map_err(|_| crate::Error::Shutdown)?;
1107 rx.await.map_err(|_| crate::Error::Shutdown)?
1108 }
1109
1110 pub async fn queue_position_up(&self, info_hash: Id20) -> crate::Result<()> {
1112 let (tx, rx) = oneshot::channel();
1113 self.cmd_tx
1114 .send(SessionCommand::QueuePositionUp {
1115 info_hash,
1116 reply: tx,
1117 })
1118 .await
1119 .map_err(|_| crate::Error::Shutdown)?;
1120 rx.await.map_err(|_| crate::Error::Shutdown)?
1121 }
1122
1123 pub async fn queue_position_down(&self, info_hash: Id20) -> crate::Result<()> {
1125 let (tx, rx) = oneshot::channel();
1126 self.cmd_tx
1127 .send(SessionCommand::QueuePositionDown {
1128 info_hash,
1129 reply: tx,
1130 })
1131 .await
1132 .map_err(|_| crate::Error::Shutdown)?;
1133 rx.await.map_err(|_| crate::Error::Shutdown)?
1134 }
1135
1136 pub async fn queue_position_top(&self, info_hash: Id20) -> crate::Result<()> {
1138 let (tx, rx) = oneshot::channel();
1139 self.cmd_tx
1140 .send(SessionCommand::QueuePositionTop {
1141 info_hash,
1142 reply: tx,
1143 })
1144 .await
1145 .map_err(|_| crate::Error::Shutdown)?;
1146 rx.await.map_err(|_| crate::Error::Shutdown)?
1147 }
1148
1149 pub async fn queue_position_bottom(&self, info_hash: Id20) -> crate::Result<()> {
1151 let (tx, rx) = oneshot::channel();
1152 self.cmd_tx
1153 .send(SessionCommand::QueuePositionBottom {
1154 info_hash,
1155 reply: tx,
1156 })
1157 .await
1158 .map_err(|_| crate::Error::Shutdown)?;
1159 rx.await.map_err(|_| crate::Error::Shutdown)?
1160 }
1161
1162 pub async fn ban_peer(&self, ip: IpAddr) -> crate::Result<()> {
1164 let (tx, rx) = oneshot::channel();
1165 self.cmd_tx
1166 .send(SessionCommand::BanPeer { ip, reply: tx })
1167 .await
1168 .map_err(|_| crate::Error::Shutdown)?;
1169 rx.await.map_err(|_| crate::Error::Shutdown)
1170 }
1171
1172 pub async fn unban_peer(&self, ip: IpAddr) -> crate::Result<bool> {
1174 let (tx, rx) = oneshot::channel();
1175 self.cmd_tx
1176 .send(SessionCommand::UnbanPeer { ip, reply: tx })
1177 .await
1178 .map_err(|_| crate::Error::Shutdown)?;
1179 rx.await.map_err(|_| crate::Error::Shutdown)
1180 }
1181
1182 pub async fn set_ip_filter(&self, filter: crate::ip_filter::IpFilter) -> crate::Result<()> {
1185 let (tx, rx) = oneshot::channel();
1186 self.cmd_tx
1187 .send(SessionCommand::SetIpFilter { filter, reply: tx })
1188 .await
1189 .map_err(|_| crate::Error::Shutdown)?;
1190 rx.await.map_err(|_| crate::Error::Shutdown)
1191 }
1192
1193 pub async fn ip_filter(&self) -> crate::Result<crate::ip_filter::IpFilter> {
1195 let (tx, rx) = oneshot::channel();
1196 self.cmd_tx
1197 .send(SessionCommand::GetIpFilter { reply: tx })
1198 .await
1199 .map_err(|_| crate::Error::Shutdown)?;
1200 rx.await.map_err(|_| crate::Error::Shutdown)
1201 }
1202
1203 pub async fn settings(&self) -> crate::Result<Settings> {
1205 let (tx, rx) = oneshot::channel();
1206 self.cmd_tx
1207 .send(SessionCommand::GetSettings { reply: tx })
1208 .await
1209 .map_err(|_| crate::Error::Shutdown)?;
1210 rx.await.map_err(|_| crate::Error::Shutdown)
1211 }
1212
1213 pub async fn apply_settings(&self, settings: Settings) -> crate::Result<()> {
1219 let (tx, rx) = oneshot::channel();
1220 self.cmd_tx
1221 .send(SessionCommand::ApplySettings {
1222 settings: Box::new(settings),
1223 reply: tx,
1224 })
1225 .await
1226 .map_err(|_| crate::Error::Shutdown)?;
1227 rx.await.map_err(|_| crate::Error::Shutdown)?
1228 }
1229
1230 pub async fn banned_peers(&self) -> crate::Result<Vec<IpAddr>> {
1232 let (tx, rx) = oneshot::channel();
1233 self.cmd_tx
1234 .send(SessionCommand::BannedPeers { reply: tx })
1235 .await
1236 .map_err(|_| crate::Error::Shutdown)?;
1237 rx.await.map_err(|_| crate::Error::Shutdown)
1238 }
1239
1240 pub async fn move_torrent_storage(
1242 &self,
1243 info_hash: Id20,
1244 new_path: std::path::PathBuf,
1245 ) -> crate::Result<()> {
1246 let (tx, rx) = oneshot::channel();
1247 self.cmd_tx
1248 .send(SessionCommand::MoveTorrentStorage {
1249 info_hash,
1250 new_path,
1251 reply: tx,
1252 })
1253 .await
1254 .map_err(|_| crate::Error::Shutdown)?;
1255 rx.await.map_err(|_| crate::Error::Shutdown)?
1256 }
1257
1258 pub async fn open_file(
1264 &self,
1265 info_hash: Id20,
1266 file_index: usize,
1267 ) -> crate::Result<crate::streaming::FileStream> {
1268 let (tx, rx) = oneshot::channel();
1269 self.cmd_tx
1270 .send(SessionCommand::OpenFile {
1271 info_hash,
1272 file_index,
1273 reply: tx,
1274 })
1275 .await
1276 .map_err(|_| crate::Error::Shutdown)?;
1277 rx.await.map_err(|_| crate::Error::Shutdown)?
1278 }
1279
1280 pub async fn force_reannounce(&self, info_hash: Id20) -> crate::Result<()> {
1282 let (tx, rx) = oneshot::channel();
1283 self.cmd_tx
1284 .send(SessionCommand::ForceReannounce {
1285 info_hash,
1286 reply: tx,
1287 })
1288 .await
1289 .map_err(|_| crate::Error::Shutdown)?;
1290 rx.await.map_err(|_| crate::Error::Shutdown)?
1291 }
1292
1293 pub async fn tracker_list(
1295 &self,
1296 info_hash: Id20,
1297 ) -> crate::Result<Vec<crate::tracker_manager::TrackerInfo>> {
1298 let (tx, rx) = oneshot::channel();
1299 self.cmd_tx
1300 .send(SessionCommand::TrackerList {
1301 info_hash,
1302 reply: tx,
1303 })
1304 .await
1305 .map_err(|_| crate::Error::Shutdown)?;
1306 rx.await.map_err(|_| crate::Error::Shutdown)?
1307 }
1308
1309 pub async fn scrape(
1311 &self,
1312 info_hash: Id20,
1313 ) -> crate::Result<Option<(String, irontide_tracker::ScrapeInfo)>> {
1314 let (tx, rx) = oneshot::channel();
1315 self.cmd_tx
1316 .send(SessionCommand::Scrape {
1317 info_hash,
1318 reply: tx,
1319 })
1320 .await
1321 .map_err(|_| crate::Error::Shutdown)?;
1322 rx.await.map_err(|_| crate::Error::Shutdown)?
1323 }
1324
1325 pub async fn set_file_priority(
1327 &self,
1328 info_hash: Id20,
1329 index: usize,
1330 priority: irontide_core::FilePriority,
1331 ) -> crate::Result<()> {
1332 let (tx, rx) = oneshot::channel();
1333 self.cmd_tx
1334 .send(SessionCommand::SetFilePriority {
1335 info_hash,
1336 index,
1337 priority,
1338 reply: tx,
1339 })
1340 .await
1341 .map_err(|_| crate::Error::Shutdown)?;
1342 rx.await.map_err(|_| crate::Error::Shutdown)?
1343 }
1344
1345 pub async fn file_priorities(
1347 &self,
1348 info_hash: Id20,
1349 ) -> crate::Result<Vec<irontide_core::FilePriority>> {
1350 let (tx, rx) = oneshot::channel();
1351 self.cmd_tx
1352 .send(SessionCommand::FilePriorities {
1353 info_hash,
1354 reply: tx,
1355 })
1356 .await
1357 .map_err(|_| crate::Error::Shutdown)?;
1358 rx.await.map_err(|_| crate::Error::Shutdown)?
1359 }
1360
1361 pub async fn set_download_limit(
1363 &self,
1364 info_hash: Id20,
1365 bytes_per_sec: u64,
1366 ) -> crate::Result<()> {
1367 let (tx, rx) = oneshot::channel();
1368 self.cmd_tx
1369 .send(SessionCommand::SetDownloadLimit {
1370 info_hash,
1371 bytes_per_sec,
1372 reply: tx,
1373 })
1374 .await
1375 .map_err(|_| crate::Error::Shutdown)?;
1376 rx.await.map_err(|_| crate::Error::Shutdown)?
1377 }
1378
1379 pub async fn set_upload_limit(&self, info_hash: Id20, bytes_per_sec: u64) -> crate::Result<()> {
1381 let (tx, rx) = oneshot::channel();
1382 self.cmd_tx
1383 .send(SessionCommand::SetUploadLimit {
1384 info_hash,
1385 bytes_per_sec,
1386 reply: tx,
1387 })
1388 .await
1389 .map_err(|_| crate::Error::Shutdown)?;
1390 rx.await.map_err(|_| crate::Error::Shutdown)?
1391 }
1392
1393 pub async fn download_limit(&self, info_hash: Id20) -> crate::Result<u64> {
1395 let (tx, rx) = oneshot::channel();
1396 self.cmd_tx
1397 .send(SessionCommand::DownloadLimit {
1398 info_hash,
1399 reply: tx,
1400 })
1401 .await
1402 .map_err(|_| crate::Error::Shutdown)?;
1403 rx.await.map_err(|_| crate::Error::Shutdown)?
1404 }
1405
1406 pub async fn upload_limit(&self, info_hash: Id20) -> crate::Result<u64> {
1408 let (tx, rx) = oneshot::channel();
1409 self.cmd_tx
1410 .send(SessionCommand::UploadLimit {
1411 info_hash,
1412 reply: tx,
1413 })
1414 .await
1415 .map_err(|_| crate::Error::Shutdown)?;
1416 rx.await.map_err(|_| crate::Error::Shutdown)?
1417 }
1418
1419 pub async fn set_sequential_download(
1421 &self,
1422 info_hash: Id20,
1423 enabled: bool,
1424 ) -> crate::Result<()> {
1425 let (tx, rx) = oneshot::channel();
1426 self.cmd_tx
1427 .send(SessionCommand::SetSequentialDownload {
1428 info_hash,
1429 enabled,
1430 reply: tx,
1431 })
1432 .await
1433 .map_err(|_| crate::Error::Shutdown)?;
1434 rx.await.map_err(|_| crate::Error::Shutdown)?
1435 }
1436
1437 pub async fn is_sequential_download(&self, info_hash: Id20) -> crate::Result<bool> {
1439 let (tx, rx) = oneshot::channel();
1440 self.cmd_tx
1441 .send(SessionCommand::IsSequentialDownload {
1442 info_hash,
1443 reply: tx,
1444 })
1445 .await
1446 .map_err(|_| crate::Error::Shutdown)?;
1447 rx.await.map_err(|_| crate::Error::Shutdown)?
1448 }
1449
1450 pub async fn set_super_seeding(&self, info_hash: Id20, enabled: bool) -> crate::Result<()> {
1452 let (tx, rx) = oneshot::channel();
1453 self.cmd_tx
1454 .send(SessionCommand::SetSuperSeeding {
1455 info_hash,
1456 enabled,
1457 reply: tx,
1458 })
1459 .await
1460 .map_err(|_| crate::Error::Shutdown)?;
1461 rx.await.map_err(|_| crate::Error::Shutdown)?
1462 }
1463
1464 pub async fn is_super_seeding(&self, info_hash: Id20) -> crate::Result<bool> {
1466 let (tx, rx) = oneshot::channel();
1467 self.cmd_tx
1468 .send(SessionCommand::IsSuperSeeding {
1469 info_hash,
1470 reply: tx,
1471 })
1472 .await
1473 .map_err(|_| crate::Error::Shutdown)?;
1474 rx.await.map_err(|_| crate::Error::Shutdown)?
1475 }
1476
1477 pub async fn set_seed_mode(&self, info_hash: Id20, enabled: bool) -> crate::Result<()> {
1493 let (tx, rx) = oneshot::channel();
1494 self.cmd_tx
1495 .send(SessionCommand::SetSeedMode {
1496 info_hash,
1497 enabled,
1498 reply: tx,
1499 })
1500 .await
1501 .map_err(|_| crate::Error::Shutdown)?;
1502 rx.await.map_err(|_| crate::Error::Shutdown)?
1503 }
1504
1505 pub async fn add_tracker(&self, info_hash: Id20, url: String) -> crate::Result<()> {
1509 let (tx, rx) = oneshot::channel();
1510 self.cmd_tx
1511 .send(SessionCommand::AddTracker {
1512 info_hash,
1513 url,
1514 reply: tx,
1515 })
1516 .await
1517 .map_err(|_| crate::Error::Shutdown)?;
1518 rx.await.map_err(|_| crate::Error::Shutdown)?
1519 }
1520
1521 pub async fn replace_trackers(&self, info_hash: Id20, urls: Vec<String>) -> crate::Result<()> {
1523 let (tx, rx) = oneshot::channel();
1524 self.cmd_tx
1525 .send(SessionCommand::ReplaceTrackers {
1526 info_hash,
1527 urls,
1528 reply: tx,
1529 })
1530 .await
1531 .map_err(|_| crate::Error::Shutdown)?;
1532 rx.await.map_err(|_| crate::Error::Shutdown)?
1533 }
1534
1535 pub async fn force_recheck(&self, info_hash: Id20) -> crate::Result<()> {
1541 let (tx, rx) = oneshot::channel();
1542 self.cmd_tx
1543 .send(SessionCommand::ForceRecheck {
1544 info_hash,
1545 reply: tx,
1546 })
1547 .await
1548 .map_err(|_| crate::Error::Shutdown)?;
1549 rx.await.map_err(|_| crate::Error::Shutdown)?
1550 }
1551
1552 pub async fn rename_file(
1558 &self,
1559 info_hash: Id20,
1560 file_index: usize,
1561 new_name: String,
1562 ) -> crate::Result<()> {
1563 let (tx, rx) = oneshot::channel();
1564 self.cmd_tx
1565 .send(SessionCommand::RenameFile {
1566 info_hash,
1567 file_index,
1568 new_name,
1569 reply: tx,
1570 })
1571 .await
1572 .map_err(|_| crate::Error::Shutdown)?;
1573 rx.await.map_err(|_| crate::Error::Shutdown)?
1574 }
1575
1576 pub async fn set_max_connections(&self, info_hash: Id20, limit: usize) -> crate::Result<()> {
1578 let (tx, rx) = oneshot::channel();
1579 self.cmd_tx
1580 .send(SessionCommand::SetMaxConnections {
1581 info_hash,
1582 limit,
1583 reply: tx,
1584 })
1585 .await
1586 .map_err(|_| crate::Error::Shutdown)?;
1587 rx.await.map_err(|_| crate::Error::Shutdown)?
1588 }
1589
1590 pub async fn max_connections(&self, info_hash: Id20) -> crate::Result<usize> {
1592 let (tx, rx) = oneshot::channel();
1593 self.cmd_tx
1594 .send(SessionCommand::MaxConnections {
1595 info_hash,
1596 reply: tx,
1597 })
1598 .await
1599 .map_err(|_| crate::Error::Shutdown)?;
1600 rx.await.map_err(|_| crate::Error::Shutdown)?
1601 }
1602
1603 pub async fn set_max_uploads(&self, info_hash: Id20, limit: usize) -> crate::Result<()> {
1605 let (tx, rx) = oneshot::channel();
1606 self.cmd_tx
1607 .send(SessionCommand::SetMaxUploads {
1608 info_hash,
1609 limit,
1610 reply: tx,
1611 })
1612 .await
1613 .map_err(|_| crate::Error::Shutdown)?;
1614 rx.await.map_err(|_| crate::Error::Shutdown)?
1615 }
1616
1617 pub async fn max_uploads(&self, info_hash: Id20) -> crate::Result<usize> {
1619 let (tx, rx) = oneshot::channel();
1620 self.cmd_tx
1621 .send(SessionCommand::MaxUploads {
1622 info_hash,
1623 reply: tx,
1624 })
1625 .await
1626 .map_err(|_| crate::Error::Shutdown)?;
1627 rx.await.map_err(|_| crate::Error::Shutdown)?
1628 }
1629
1630 pub async fn get_peer_info(
1632 &self,
1633 info_hash: Id20,
1634 ) -> crate::Result<Vec<crate::types::PeerInfo>> {
1635 let (tx, rx) = oneshot::channel();
1636 self.cmd_tx
1637 .send(SessionCommand::GetPeerInfo {
1638 info_hash,
1639 reply: tx,
1640 })
1641 .await
1642 .map_err(|_| crate::Error::Shutdown)?;
1643 rx.await.map_err(|_| crate::Error::Shutdown)?
1644 }
1645
1646 pub async fn get_download_queue(
1648 &self,
1649 info_hash: Id20,
1650 ) -> crate::Result<Vec<crate::types::PartialPieceInfo>> {
1651 let (tx, rx) = oneshot::channel();
1652 self.cmd_tx
1653 .send(SessionCommand::GetDownloadQueue {
1654 info_hash,
1655 reply: tx,
1656 })
1657 .await
1658 .map_err(|_| crate::Error::Shutdown)?;
1659 rx.await.map_err(|_| crate::Error::Shutdown)?
1660 }
1661
1662 pub async fn have_piece(&self, info_hash: Id20, index: u32) -> crate::Result<bool> {
1664 let (tx, rx) = oneshot::channel();
1665 self.cmd_tx
1666 .send(SessionCommand::HavePiece {
1667 info_hash,
1668 index,
1669 reply: tx,
1670 })
1671 .await
1672 .map_err(|_| crate::Error::Shutdown)?;
1673 rx.await.map_err(|_| crate::Error::Shutdown)?
1674 }
1675
1676 pub async fn piece_availability(&self, info_hash: Id20) -> crate::Result<Vec<u32>> {
1678 let (tx, rx) = oneshot::channel();
1679 self.cmd_tx
1680 .send(SessionCommand::PieceAvailability {
1681 info_hash,
1682 reply: tx,
1683 })
1684 .await
1685 .map_err(|_| crate::Error::Shutdown)?;
1686 rx.await.map_err(|_| crate::Error::Shutdown)?
1687 }
1688
1689 pub async fn file_progress(&self, info_hash: Id20) -> crate::Result<Vec<u64>> {
1691 let (tx, rx) = oneshot::channel();
1692 self.cmd_tx
1693 .send(SessionCommand::FileProgress {
1694 info_hash,
1695 reply: tx,
1696 })
1697 .await
1698 .map_err(|_| crate::Error::Shutdown)?;
1699 rx.await.map_err(|_| crate::Error::Shutdown)?
1700 }
1701
1702 pub async fn info_hashes(&self, info_hash: Id20) -> crate::Result<irontide_core::InfoHashes> {
1704 let (tx, rx) = oneshot::channel();
1705 self.cmd_tx
1706 .send(SessionCommand::InfoHashesQuery {
1707 info_hash,
1708 reply: tx,
1709 })
1710 .await
1711 .map_err(|_| crate::Error::Shutdown)?;
1712 rx.await.map_err(|_| crate::Error::Shutdown)?
1713 }
1714
1715 pub async fn torrent_file(
1719 &self,
1720 info_hash: Id20,
1721 ) -> crate::Result<Option<irontide_core::TorrentMetaV1>> {
1722 let (tx, rx) = oneshot::channel();
1723 self.cmd_tx
1724 .send(SessionCommand::TorrentFile {
1725 info_hash,
1726 reply: tx,
1727 })
1728 .await
1729 .map_err(|_| crate::Error::Shutdown)?;
1730 rx.await.map_err(|_| crate::Error::Shutdown)?
1731 }
1732
1733 pub async fn torrent_file_v2(
1738 &self,
1739 info_hash: Id20,
1740 ) -> crate::Result<Option<irontide_core::TorrentMetaV2>> {
1741 let (tx, rx) = oneshot::channel();
1742 self.cmd_tx
1743 .send(SessionCommand::TorrentFileV2 {
1744 info_hash,
1745 reply: tx,
1746 })
1747 .await
1748 .map_err(|_| crate::Error::Shutdown)?;
1749 rx.await.map_err(|_| crate::Error::Shutdown)?
1750 }
1751
1752 pub async fn force_dht_announce(&self, info_hash: Id20) -> crate::Result<()> {
1754 let (tx, rx) = oneshot::channel();
1755 self.cmd_tx
1756 .send(SessionCommand::ForceDhtAnnounce {
1757 info_hash,
1758 reply: tx,
1759 })
1760 .await
1761 .map_err(|_| crate::Error::Shutdown)?;
1762 rx.await.map_err(|_| crate::Error::Shutdown)?
1763 }
1764
1765 pub async fn force_lsd_announce(&self, info_hash: Id20) -> crate::Result<()> {
1769 let (tx, rx) = oneshot::channel();
1770 self.cmd_tx
1771 .send(SessionCommand::ForceLsdAnnounce {
1772 info_hash,
1773 reply: tx,
1774 })
1775 .await
1776 .map_err(|_| crate::Error::Shutdown)?;
1777 rx.await.map_err(|_| crate::Error::Shutdown)?
1778 }
1779
1780 pub async fn read_piece(&self, info_hash: Id20, index: u32) -> crate::Result<bytes::Bytes> {
1782 let (tx, rx) = oneshot::channel();
1783 self.cmd_tx
1784 .send(SessionCommand::ReadPiece {
1785 info_hash,
1786 index,
1787 reply: tx,
1788 })
1789 .await
1790 .map_err(|_| crate::Error::Shutdown)?;
1791 rx.await.map_err(|_| crate::Error::Shutdown)?
1792 }
1793
1794 pub async fn flush_cache(&self, info_hash: Id20) -> crate::Result<()> {
1796 let (tx, rx) = oneshot::channel();
1797 self.cmd_tx
1798 .send(SessionCommand::FlushCache {
1799 info_hash,
1800 reply: tx,
1801 })
1802 .await
1803 .map_err(|_| crate::Error::Shutdown)?;
1804 rx.await.map_err(|_| crate::Error::Shutdown)?
1805 }
1806
1807 pub async fn is_valid(&self, info_hash: Id20) -> bool {
1809 let (tx, rx) = oneshot::channel();
1810 if self
1811 .cmd_tx
1812 .send(SessionCommand::IsValid {
1813 info_hash,
1814 reply: tx,
1815 })
1816 .await
1817 .is_err()
1818 {
1819 return false;
1820 }
1821 rx.await.unwrap_or(false)
1822 }
1823
1824 pub async fn clear_error(&self, info_hash: Id20) -> crate::Result<()> {
1826 let (tx, rx) = oneshot::channel();
1827 self.cmd_tx
1828 .send(SessionCommand::ClearError {
1829 info_hash,
1830 reply: tx,
1831 })
1832 .await
1833 .map_err(|_| crate::Error::Shutdown)?;
1834 rx.await.map_err(|_| crate::Error::Shutdown)?
1835 }
1836
1837 pub async fn file_status(
1839 &self,
1840 info_hash: Id20,
1841 ) -> crate::Result<Vec<crate::types::FileStatus>> {
1842 let (tx, rx) = oneshot::channel();
1843 self.cmd_tx
1844 .send(SessionCommand::FileStatus {
1845 info_hash,
1846 reply: tx,
1847 })
1848 .await
1849 .map_err(|_| crate::Error::Shutdown)?;
1850 rx.await.map_err(|_| crate::Error::Shutdown)?
1851 }
1852
1853 pub async fn flags(&self, info_hash: Id20) -> crate::Result<crate::types::TorrentFlags> {
1855 let (tx, rx) = oneshot::channel();
1856 self.cmd_tx
1857 .send(SessionCommand::Flags {
1858 info_hash,
1859 reply: tx,
1860 })
1861 .await
1862 .map_err(|_| crate::Error::Shutdown)?;
1863 rx.await.map_err(|_| crate::Error::Shutdown)?
1864 }
1865
1866 pub async fn set_flags(
1868 &self,
1869 info_hash: Id20,
1870 flags: crate::types::TorrentFlags,
1871 ) -> crate::Result<()> {
1872 let (tx, rx) = oneshot::channel();
1873 self.cmd_tx
1874 .send(SessionCommand::SetFlags {
1875 info_hash,
1876 flags,
1877 reply: tx,
1878 })
1879 .await
1880 .map_err(|_| crate::Error::Shutdown)?;
1881 rx.await.map_err(|_| crate::Error::Shutdown)?
1882 }
1883
1884 pub async fn unset_flags(
1886 &self,
1887 info_hash: Id20,
1888 flags: crate::types::TorrentFlags,
1889 ) -> crate::Result<()> {
1890 let (tx, rx) = oneshot::channel();
1891 self.cmd_tx
1892 .send(SessionCommand::UnsetFlags {
1893 info_hash,
1894 flags,
1895 reply: tx,
1896 })
1897 .await
1898 .map_err(|_| crate::Error::Shutdown)?;
1899 rx.await.map_err(|_| crate::Error::Shutdown)?
1900 }
1901
1902 pub async fn connect_peer(&self, info_hash: Id20, addr: SocketAddr) -> crate::Result<()> {
1904 let (tx, rx) = oneshot::channel();
1905 self.cmd_tx
1906 .send(SessionCommand::ConnectPeer {
1907 info_hash,
1908 addr,
1909 reply: tx,
1910 })
1911 .await
1912 .map_err(|_| crate::Error::Shutdown)?;
1913 rx.await.map_err(|_| crate::Error::Shutdown)?
1914 }
1915
1916 pub async fn dht_put_immutable(&self, value: Vec<u8>) -> crate::Result<Id20> {
1920 let (tx, rx) = oneshot::channel();
1921 self.cmd_tx
1922 .send(SessionCommand::DhtPutImmutable { value, reply: tx })
1923 .await
1924 .map_err(|_| crate::Error::Shutdown)?;
1925 rx.await.map_err(|_| crate::Error::Shutdown)?
1926 }
1927
1928 pub async fn dht_get_immutable(&self, target: Id20) -> crate::Result<Option<Vec<u8>>> {
1932 let (tx, rx) = oneshot::channel();
1933 self.cmd_tx
1934 .send(SessionCommand::DhtGetImmutable { target, reply: tx })
1935 .await
1936 .map_err(|_| crate::Error::Shutdown)?;
1937 rx.await.map_err(|_| crate::Error::Shutdown)?
1938 }
1939
1940 pub async fn dht_put_mutable(
1944 &self,
1945 keypair_bytes: [u8; 32],
1946 value: Vec<u8>,
1947 seq: i64,
1948 salt: Vec<u8>,
1949 ) -> crate::Result<Id20> {
1950 let (tx, rx) = oneshot::channel();
1951 self.cmd_tx
1952 .send(SessionCommand::DhtPutMutable {
1953 keypair_bytes,
1954 value,
1955 seq,
1956 salt,
1957 reply: tx,
1958 })
1959 .await
1960 .map_err(|_| crate::Error::Shutdown)?;
1961 rx.await.map_err(|_| crate::Error::Shutdown)?
1962 }
1963
1964 pub async fn dht_get_mutable(
1968 &self,
1969 public_key: [u8; 32],
1970 salt: Vec<u8>,
1971 ) -> crate::Result<Option<(Vec<u8>, i64)>> {
1972 let (tx, rx) = oneshot::channel();
1973 self.cmd_tx
1974 .send(SessionCommand::DhtGetMutable {
1975 public_key,
1976 salt,
1977 reply: tx,
1978 })
1979 .await
1980 .map_err(|_| crate::Error::Shutdown)?;
1981 rx.await.map_err(|_| crate::Error::Shutdown)?
1982 }
1983
1984 pub async fn list_torrent_summaries(&self) -> crate::Result<Vec<TorrentSummary>> {
1991 let ids = self.list_torrents().await?;
1992 let mut summaries = Vec::with_capacity(ids.len());
1993 for id in ids {
1994 if let Ok(stats) = self.torrent_stats(id).await {
1995 summaries.push(TorrentSummary::from(&stats));
1996 }
1997 }
1998 Ok(summaries)
1999 }
2000
2001 pub async fn add_magnet_uri(&self, uri: &str) -> crate::Result<irontide_core::InfoHashes> {
2006 let magnet = irontide_core::Magnet::parse(uri)?;
2007 let info_hashes = magnet.info_hashes.clone();
2008 self.add_magnet(magnet).await?;
2009 Ok(info_hashes)
2010 }
2011
2012 pub async fn add_torrent_bytes(
2017 &self,
2018 bytes: &[u8],
2019 ) -> crate::Result<irontide_core::InfoHashes> {
2020 let meta = irontide_core::torrent_from_bytes_any(bytes)?;
2021 let info_hashes = meta.info_hashes();
2022 self.add_torrent(meta, None).await?;
2023 Ok(info_hashes)
2024 }
2025}
2026
/// State owned by the session's actor task.
///
/// `SessionActor::run` takes the actor by value and services
/// `SessionCommand`s received on `cmd_rx`, replying through the oneshot
/// sender carried by each command.
struct SessionActor {
    /// Session settings; cloned for `GetSettings` replies and read by `run`
    /// to configure its periodic timers.
    settings: Settings,
    /// Torrents registered with the session, keyed by info hash.
    torrents: HashMap<Id20, TorrentEntry>,
    /// Optional DHT handle — presumably the IPv4 instance (inferred from the name; confirm).
    dht_v4: Option<DhtHandle>,
    /// Optional DHT handle — presumably the IPv6 instance (inferred from the name; confirm).
    dht_v6: Option<DhtHandle>,
    /// Optional LSD handle.
    lsd: Option<crate::lsd::LsdHandle>,
    /// Optional receiver of `(info hash, peer address)` pairs — presumably peers found via LSD.
    lsd_peers_rx: Option<mpsc::Receiver<(Id20, SocketAddr)>>,
    /// Incoming command channel polled by `run`.
    cmd_rx: mpsc::Receiver<SessionCommand>,
    /// Broadcast sender for alerts.
    alert_tx: broadcast::Sender<Alert>,
    /// Shared atomic alert mask — presumably a bitmask of enabled alert categories (confirm).
    alert_mask: Arc<AtomicU32>,
    /// Shared token bucket — presumably the session-wide upload rate limiter.
    global_upload_bucket: SharedBucket,
    /// Shared token bucket — presumably the session-wide download rate limiter.
    global_download_bucket: SharedBucket,
    /// Optional uTP socket.
    utp_socket: Option<irontide_utp::UtpSocket>,
    /// Optional second uTP socket — presumably for IPv6 (inferred from the name; confirm).
    utp_socket_v6: Option<irontide_utp::UtpSocket>,
    /// Optional NAT handle.
    nat: Option<irontide_nat::NatHandle>,
    /// Optional receiver of NAT events.
    nat_events_rx: Option<mpsc::Receiver<irontide_nat::NatEvent>>,
    /// Shared ban manager; `run` mutates it for `BanPeer`/`UnbanPeer` and
    /// reads it for `BannedPeers`.
    ban_manager: SharedBanManager,
    /// Shared IP filter; replaced wholesale on `SetIpFilter` and cloned on
    /// `GetIpFilter`.
    ip_filter: SharedIpFilter,
    /// Handle to the disk manager.
    disk_manager: crate::disk::DiskManagerHandle,
    // NOTE(review): `dead_code` is allowed here, so the handle is presumably
    // held only to keep ownership of the disk task — confirm.
    #[allow(dead_code)]
    disk_actor_handle: tokio::task::JoinHandle<()>,
    /// External IP address, when one is known.
    external_ip: Option<std::net::IpAddr>,
    /// Optional receiver of IP addresses — presumably external-IP reports from the v4 DHT.
    dht_v4_ip_rx: Option<mpsc::Receiver<std::net::IpAddr>>,
    /// Optional receiver of IP addresses — presumably external-IP reports from the v6 DHT.
    dht_v6_ip_rx: Option<mpsc::Receiver<std::net::IpAddr>>,
    /// Shared list of extension plugins.
    plugins: Arc<Vec<Box<dyn crate::extension::ExtensionPlugin>>>,
    /// Optional shared I2P SAM session.
    sam_session: Option<Arc<crate::i2p::SamSession>>,
    /// Optional shared SSL manager.
    ssl_manager: Option<Arc<crate::ssl_manager::SslManager>>,
    /// Optional transport listener — presumably for SSL connections (inferred from the name).
    ssl_listener: Option<Box<dyn crate::transport::TransportListener>>,
    /// Receiver of connections already identified by the listener side.
    validated_conn_rx: mpsc::Receiver<crate::listener::IdentifiedConnection>,
    /// Shared concurrent set of info hashes (a map with unit values).
    info_hash_registry: Arc<DashMap<Id20, ()>>,
    // NOTE(review): kept but never read (`dead_code` allowed, underscore
    // prefix); presumably held to own the listener task — confirm.
    #[allow(dead_code)]
    _listener_task: tokio::task::JoinHandle<()>,
    /// Shared session-level counters.
    counters: Arc<crate::stats::SessionCounters>,
    /// Shared network factory from the transport module.
    factory: Arc<crate::transport::NetworkFactory>,
    /// Shared hash pool.
    hash_pool: std::sync::Arc<crate::hash_pool::HashPool>,
}
2083
2084impl SessionActor {
2085 async fn run(mut self) {
2086 let mut refill_interval = tokio::time::interval(std::time::Duration::from_millis(100));
2087 refill_interval.tick().await; let auto_manage_secs = self.settings.auto_manage_interval.max(1);
2090 let mut auto_manage_interval =
2091 tokio::time::interval(std::time::Duration::from_secs(auto_manage_secs));
2092 auto_manage_interval.tick().await; let stats_interval_ms = self.settings.stats_report_interval;
2096 let mut stats_timer = if stats_interval_ms > 0 {
2097 Some(tokio::time::interval(std::time::Duration::from_millis(
2098 stats_interval_ms,
2099 )))
2100 } else {
2101 None
2102 };
2103 if let Some(ref mut t) = stats_timer {
2104 t.tick().await; }
2106
2107 let sample_interval_secs = self.settings.dht_sample_infohashes_interval;
2109 let mut sample_timer = if sample_interval_secs > 0 {
2110 Some(tokio::time::interval(std::time::Duration::from_secs(
2111 sample_interval_secs,
2112 )))
2113 } else {
2114 None
2115 };
2116 if let Some(ref mut t) = sample_timer {
2117 t.tick().await; }
2119
2120 let mut resume_save_interval = if self.settings.save_resume_interval_secs > 0 {
2122 Some(tokio::time::interval(std::time::Duration::from_secs(
2123 self.settings.save_resume_interval_secs,
2124 )))
2125 } else {
2126 None
2127 };
2128 if let Some(ref mut t) = resume_save_interval {
2129 t.tick().await; }
2131
2132 {
2134 let resume_dir = self.effective_resume_dir();
2135 let resume_files = crate::resume_file::scan_resume_dir(&resume_dir);
2136 if !resume_files.is_empty() {
2137 match self.handle_load_resume_state().await {
2139 Ok(result) => {
2140 info!(
2141 restored = result.restored,
2142 skipped = result.skipped,
2143 failed = result.failed,
2144 "auto-restored torrents on startup"
2145 );
2146 }
2147 Err(e) => {
2148 warn!("auto-restore on startup failed: {e}");
2149 }
2150 }
2151
2152 let active_hashes: std::collections::HashSet<String> = self
2155 .torrents
2156 .keys()
2157 .map(|h| hex::encode(h.as_bytes()))
2158 .collect();
2159
2160 let current_files = crate::resume_file::scan_resume_dir(&resume_dir);
2162 for path in ¤t_files {
2163 if let Some(stem) = path.file_stem().and_then(|s| s.to_str())
2164 && !active_hashes.contains(stem)
2165 {
2166 if let Err(e) = std::fs::remove_file(path) {
2167 warn!(path = %path.display(), "failed to remove orphan resume file: {e}");
2168 } else {
2169 debug!(path = %path.display(), "removed orphan resume file");
2170 }
2171 }
2172 }
2173 }
2174 }
2175
2176 loop {
2177 tokio::select! {
2178 cmd = self.cmd_rx.recv() => {
2179 match cmd {
2180 Some(SessionCommand::AddTorrent {
2181 meta,
2182 storage,
2183 download_dir,
2184 reply,
2185 }) => {
2186 let result = self.handle_add_torrent(*meta, storage, download_dir).await;
2187 let _ = reply.send(result);
2188 }
2189 Some(SessionCommand::AddMagnet { magnet, download_dir, reply }) => {
2190 let result = self.handle_add_magnet(magnet, download_dir).await;
2191 let _ = reply.send(result);
2192 }
2193 Some(SessionCommand::RemoveTorrent { info_hash, reply }) => {
2194 let result = self.handle_remove_torrent(info_hash).await;
2195 let _ = reply.send(result);
2196 }
2197 Some(SessionCommand::PauseTorrent { info_hash, reply }) => {
2198 let result = self.handle_pause_torrent(info_hash).await;
2199 let _ = reply.send(result);
2200 }
2201 Some(SessionCommand::ResumeTorrent { info_hash, reply }) => {
2202 let result = self.handle_resume_torrent(info_hash).await;
2203 let _ = reply.send(result);
2204 }
2205 Some(SessionCommand::TorrentStats { info_hash, reply }) => {
2206 let result = self.handle_torrent_stats(info_hash).await;
2207 let _ = reply.send(result);
2208 }
2209 Some(SessionCommand::TorrentInfo { info_hash, reply }) => {
2210 let result = self.handle_torrent_info(info_hash);
2211 let _ = reply.send(result);
2212 }
2213 Some(SessionCommand::ListTorrents { reply }) => {
2214 let list: Vec<Id20> = self.torrents.keys().copied().collect();
2215 let _ = reply.send(list);
2216 }
2217 Some(SessionCommand::SessionStats { reply }) => {
2218 let stats = self.make_session_stats().await;
2219 let _ = reply.send(stats);
2220 }
2221 Some(SessionCommand::SaveTorrentResumeData { info_hash, reply }) => {
2222 let result = self.handle_save_torrent_resume(info_hash).await;
2223 let _ = reply.send(result);
2224 }
2225 Some(SessionCommand::SaveSessionState { reply }) => {
2226 let result = self.handle_save_session_state().await;
2227 let _ = reply.send(result);
2228 }
2229 Some(SessionCommand::LoadResumeState { reply }) => {
2230 let result = self.handle_load_resume_state().await;
2231 let _ = reply.send(result);
2232 }
2233 Some(SessionCommand::QueuePosition { info_hash, reply }) => {
2234 let result = match self.torrents.get(&info_hash) {
2235 Some(entry) => Ok(entry.queue_position),
2236 None => Err(crate::Error::TorrentNotFound(info_hash)),
2237 };
2238 let _ = reply.send(result);
2239 }
2240 Some(SessionCommand::SetQueuePosition { info_hash, pos, reply }) => {
2241 let result = self.handle_set_queue_position(info_hash, pos);
2242 let _ = reply.send(result);
2243 }
2244 Some(SessionCommand::QueuePositionUp { info_hash, reply }) => {
2245 let result = self.handle_queue_move(info_hash, crate::queue::move_up);
2246 let _ = reply.send(result);
2247 }
2248 Some(SessionCommand::QueuePositionDown { info_hash, reply }) => {
2249 let result = self.handle_queue_move(info_hash, crate::queue::move_down);
2250 let _ = reply.send(result);
2251 }
2252 Some(SessionCommand::QueuePositionTop { info_hash, reply }) => {
2253 let result = self.handle_queue_move(info_hash, crate::queue::move_top);
2254 let _ = reply.send(result);
2255 }
2256 Some(SessionCommand::QueuePositionBottom { info_hash, reply }) => {
2257 let result = self.handle_queue_move(info_hash, crate::queue::move_bottom);
2258 let _ = reply.send(result);
2259 }
2260 Some(SessionCommand::BanPeer { ip, reply }) => {
2261 self.ban_manager.write().ban(ip);
2262 let _ = reply.send(());
2263 }
2264 Some(SessionCommand::UnbanPeer { ip, reply }) => {
2265 let was_banned = self.ban_manager.write().unban(&ip);
2266 let _ = reply.send(was_banned);
2267 }
2268 Some(SessionCommand::BannedPeers { reply }) => {
2269 let list: Vec<IpAddr> = self.ban_manager.read()
2270 .banned_list().iter().copied().collect();
2271 let _ = reply.send(list);
2272 }
2273 Some(SessionCommand::SetIpFilter { filter, reply }) => {
2274 *self.ip_filter.write() = filter;
2275 let _ = reply.send(());
2276 }
2277 Some(SessionCommand::GetIpFilter { reply }) => {
2278 let filter = self.ip_filter.read().clone();
2279 let _ = reply.send(filter);
2280 }
2281 Some(SessionCommand::GetSettings { reply }) => {
2282 let _ = reply.send(self.settings.clone());
2283 }
2284 Some(SessionCommand::ApplySettings { settings, reply }) => {
2285 let result = self.handle_apply_settings(*settings);
2286 let _ = reply.send(result);
2287 }
2288 Some(SessionCommand::MoveTorrentStorage { info_hash, new_path, reply }) => {
2289 let result = self.handle_move_torrent_storage(info_hash, new_path).await;
2290 let _ = reply.send(result);
2291 }
2292 Some(SessionCommand::AddPeers { info_hash, peers, source, reply }) => {
2293 let result = if let Some(entry) = self.torrents.get(&info_hash) {
2294 entry.handle.add_peers(peers, source).await
2295 } else {
2296 Err(crate::Error::TorrentNotFound(info_hash))
2297 };
2298 let _ = reply.send(result);
2299 }
2300 Some(SessionCommand::OpenFile { info_hash, file_index, reply }) => {
2301 let result = if let Some(entry) = self.torrents.get(&info_hash) {
2302 entry.handle.open_file(file_index).await
2303 } else {
2304 Err(crate::Error::TorrentNotFound(info_hash))
2305 };
2306 let _ = reply.send(result);
2307 }
2308 Some(SessionCommand::ForceReannounce { info_hash, reply }) => {
2309 let result = match self.torrents.get(&info_hash) {
2310 Some(entry) => {
2311 entry.handle.force_reannounce().await
2312 }
2313 None => Err(crate::Error::TorrentNotFound(info_hash)),
2314 };
2315 let _ = reply.send(result);
2316 }
2317 Some(SessionCommand::TrackerList { info_hash, reply }) => {
2318 let result = if let Some(entry) = self.torrents.get(&info_hash) {
2319 entry.handle.tracker_list().await
2320 } else {
2321 Err(crate::Error::TorrentNotFound(info_hash))
2322 };
2323 let _ = reply.send(result);
2324 }
2325 Some(SessionCommand::Scrape { info_hash, reply }) => {
2326 let result = if let Some(entry) = self.torrents.get(&info_hash) {
2327 entry.handle.scrape().await
2328 } else {
2329 Err(crate::Error::TorrentNotFound(info_hash))
2330 };
2331 let _ = reply.send(result);
2332 }
2333 Some(SessionCommand::SetFilePriority { info_hash, index, priority, reply }) => {
2334 let result = if let Some(entry) = self.torrents.get(&info_hash) {
2335 entry.handle.set_file_priority(index, priority).await
2336 } else {
2337 Err(crate::Error::TorrentNotFound(info_hash))
2338 };
2339 let _ = reply.send(result);
2340 }
2341 Some(SessionCommand::FilePriorities { info_hash, reply }) => {
2342 let result = if let Some(entry) = self.torrents.get(&info_hash) {
2343 entry.handle.file_priorities().await
2344 } else {
2345 Err(crate::Error::TorrentNotFound(info_hash))
2346 };
2347 let _ = reply.send(result);
2348 }
2349 Some(SessionCommand::SetDownloadLimit { info_hash, bytes_per_sec, reply }) => {
2350 let result = if let Some(entry) = self.torrents.get(&info_hash) {
2351 entry.handle.set_download_limit(bytes_per_sec).await
2352 } else {
2353 Err(crate::Error::TorrentNotFound(info_hash))
2354 };
2355 let _ = reply.send(result);
2356 }
2357 Some(SessionCommand::SetUploadLimit { info_hash, bytes_per_sec, reply }) => {
2358 let result = if let Some(entry) = self.torrents.get(&info_hash) {
2359 entry.handle.set_upload_limit(bytes_per_sec).await
2360 } else {
2361 Err(crate::Error::TorrentNotFound(info_hash))
2362 };
2363 let _ = reply.send(result);
2364 }
2365 Some(SessionCommand::DownloadLimit { info_hash, reply }) => {
2366 let result = if let Some(entry) = self.torrents.get(&info_hash) {
2367 entry.handle.download_limit().await
2368 } else {
2369 Err(crate::Error::TorrentNotFound(info_hash))
2370 };
2371 let _ = reply.send(result);
2372 }
2373 Some(SessionCommand::UploadLimit { info_hash, reply }) => {
2374 let result = if let Some(entry) = self.torrents.get(&info_hash) {
2375 entry.handle.upload_limit().await
2376 } else {
2377 Err(crate::Error::TorrentNotFound(info_hash))
2378 };
2379 let _ = reply.send(result);
2380 }
2381 Some(SessionCommand::SetSequentialDownload { info_hash, enabled, reply }) => {
2382 let result = if let Some(entry) = self.torrents.get(&info_hash) {
2383 entry.handle.set_sequential_download(enabled).await
2384 } else {
2385 Err(crate::Error::TorrentNotFound(info_hash))
2386 };
2387 let _ = reply.send(result);
2388 }
2389 Some(SessionCommand::IsSequentialDownload { info_hash, reply }) => {
2390 let result = if let Some(entry) = self.torrents.get(&info_hash) {
2391 entry.handle.is_sequential_download().await
2392 } else {
2393 Err(crate::Error::TorrentNotFound(info_hash))
2394 };
2395 let _ = reply.send(result);
2396 }
2397 Some(SessionCommand::SetSuperSeeding { info_hash, enabled, reply }) => {
2398 let result = if let Some(entry) = self.torrents.get(&info_hash) {
2399 entry.handle.set_super_seeding(enabled).await
2400 } else {
2401 Err(crate::Error::TorrentNotFound(info_hash))
2402 };
2403 let _ = reply.send(result);
2404 }
2405 Some(SessionCommand::IsSuperSeeding { info_hash, reply }) => {
2406 let result = if let Some(entry) = self.torrents.get(&info_hash) {
2407 entry.handle.is_super_seeding().await
2408 } else {
2409 Err(crate::Error::TorrentNotFound(info_hash))
2410 };
2411 let _ = reply.send(result);
2412 }
2413 Some(SessionCommand::SetSeedMode { info_hash, enabled, reply }) => {
2414 let result = if let Some(entry) = self.torrents.get(&info_hash) {
2415 entry.handle.set_seed_mode(enabled).await
2416 } else {
2417 Err(crate::Error::TorrentNotFound(info_hash))
2418 };
2419 let _ = reply.send(result);
2420 }
2421 Some(SessionCommand::AddTracker { info_hash, url, reply }) => {
2422 let result = if let Some(entry) = self.torrents.get(&info_hash) {
2423 entry.handle.add_tracker(url).await
2424 } else {
2425 Err(crate::Error::TorrentNotFound(info_hash))
2426 };
2427 let _ = reply.send(result);
2428 }
2429 Some(SessionCommand::ReplaceTrackers { info_hash, urls, reply }) => {
2430 let result = if let Some(entry) = self.torrents.get(&info_hash) {
2431 entry.handle.replace_trackers(urls).await
2432 } else {
2433 Err(crate::Error::TorrentNotFound(info_hash))
2434 };
2435 let _ = reply.send(result);
2436 }
2437 Some(SessionCommand::ForceRecheck { info_hash, reply }) => {
2438 let result = if let Some(entry) = self.torrents.get(&info_hash) {
2439 entry.handle.force_recheck().await
2440 } else {
2441 Err(crate::Error::TorrentNotFound(info_hash))
2442 };
2443 let _ = reply.send(result);
2444 }
2445 Some(SessionCommand::RenameFile { info_hash, file_index, new_name, reply }) => {
2446 let result = if let Some(entry) = self.torrents.get(&info_hash) {
2447 entry.handle.rename_file(file_index, new_name).await
2448 } else {
2449 Err(crate::Error::TorrentNotFound(info_hash))
2450 };
2451 let _ = reply.send(result);
2452 }
2453 Some(SessionCommand::SetMaxConnections { info_hash, limit, reply }) => {
2454 let result = if let Some(entry) = self.torrents.get(&info_hash) {
2455 entry.handle.set_max_connections(limit).await
2456 } else {
2457 Err(crate::Error::TorrentNotFound(info_hash))
2458 };
2459 let _ = reply.send(result);
2460 }
2461 Some(SessionCommand::MaxConnections { info_hash, reply }) => {
2462 let result = if let Some(entry) = self.torrents.get(&info_hash) {
2463 entry.handle.max_connections().await
2464 } else {
2465 Err(crate::Error::TorrentNotFound(info_hash))
2466 };
2467 let _ = reply.send(result);
2468 }
2469 Some(SessionCommand::SetMaxUploads { info_hash, limit, reply }) => {
2470 let result = if let Some(entry) = self.torrents.get(&info_hash) {
2471 entry.handle.set_max_uploads(limit).await
2472 } else {
2473 Err(crate::Error::TorrentNotFound(info_hash))
2474 };
2475 let _ = reply.send(result);
2476 }
2477 Some(SessionCommand::MaxUploads { info_hash, reply }) => {
2478 let result = if let Some(entry) = self.torrents.get(&info_hash) {
2479 entry.handle.max_uploads().await
2480 } else {
2481 Err(crate::Error::TorrentNotFound(info_hash))
2482 };
2483 let _ = reply.send(result);
2484 }
2485 Some(SessionCommand::GetPeerInfo { info_hash, reply }) => {
2486 let result = if let Some(entry) = self.torrents.get(&info_hash) {
2487 entry.handle.get_peer_info().await
2488 } else {
2489 Err(crate::Error::TorrentNotFound(info_hash))
2490 };
2491 let _ = reply.send(result);
2492 }
2493 Some(SessionCommand::GetDownloadQueue { info_hash, reply }) => {
2494 let result = if let Some(entry) = self.torrents.get(&info_hash) {
2495 entry.handle.get_download_queue().await
2496 } else {
2497 Err(crate::Error::TorrentNotFound(info_hash))
2498 };
2499 let _ = reply.send(result);
2500 }
2501 Some(SessionCommand::HavePiece { info_hash, index, reply }) => {
2502 let result = if let Some(entry) = self.torrents.get(&info_hash) {
2503 entry.handle.have_piece(index).await
2504 } else {
2505 Err(crate::Error::TorrentNotFound(info_hash))
2506 };
2507 let _ = reply.send(result);
2508 }
2509 Some(SessionCommand::PieceAvailability { info_hash, reply }) => {
2510 let result = if let Some(entry) = self.torrents.get(&info_hash) {
2511 entry.handle.piece_availability().await
2512 } else {
2513 Err(crate::Error::TorrentNotFound(info_hash))
2514 };
2515 let _ = reply.send(result);
2516 }
2517 Some(SessionCommand::FileProgress { info_hash, reply }) => {
2518 let result = if let Some(entry) = self.torrents.get(&info_hash) {
2519 entry.handle.file_progress().await
2520 } else {
2521 Err(crate::Error::TorrentNotFound(info_hash))
2522 };
2523 let _ = reply.send(result);
2524 }
2525 Some(SessionCommand::InfoHashesQuery { info_hash, reply }) => {
2526 let result = if let Some(entry) = self.torrents.get(&info_hash) {
2527 entry.handle.info_hashes().await
2528 } else {
2529 Err(crate::Error::TorrentNotFound(info_hash))
2530 };
2531 let _ = reply.send(result);
2532 }
2533 Some(SessionCommand::TorrentFile { info_hash, reply }) => {
2534 let result = if let Some(entry) = self.torrents.get(&info_hash) {
2535 entry.handle.torrent_file().await
2536 } else {
2537 Err(crate::Error::TorrentNotFound(info_hash))
2538 };
2539 let _ = reply.send(result);
2540 }
2541 Some(SessionCommand::TorrentFileV2 { info_hash, reply }) => {
2542 let result = if let Some(entry) = self.torrents.get(&info_hash) {
2543 entry.handle.torrent_file_v2().await
2544 } else {
2545 Err(crate::Error::TorrentNotFound(info_hash))
2546 };
2547 let _ = reply.send(result);
2548 }
2549 Some(SessionCommand::ForceDhtAnnounce { info_hash, reply }) => {
2550 let result = match self.torrents.get(&info_hash) {
2551 Some(entry) => {
2552 entry.handle.force_dht_announce().await
2553 }
2554 None => Err(crate::Error::TorrentNotFound(info_hash)),
2555 };
2556 let _ = reply.send(result);
2557 }
2558 Some(SessionCommand::ForceLsdAnnounce { info_hash, reply }) => {
2559 let result = match self.torrents.get(&info_hash) {
2561 Some(entry) if entry.is_private() => {
2562 Err(crate::Error::InvalidSettings(
2564 "LSD disabled for private torrent".into(),
2565 ))
2566 }
2567 Some(_) => {
2568 if let Some(ref lsd) = self.lsd {
2569 lsd.announce(vec![info_hash]).await;
2570 }
2571 Ok(())
2572 }
2573 None => Err(crate::Error::TorrentNotFound(info_hash)),
2574 };
2575 let _ = reply.send(result);
2576 }
2577 Some(SessionCommand::ReadPiece { info_hash, index, reply }) => {
2578 let result = if let Some(entry) = self.torrents.get(&info_hash) {
2579 entry.handle.read_piece(index).await
2580 } else {
2581 Err(crate::Error::TorrentNotFound(info_hash))
2582 };
2583 let _ = reply.send(result);
2584 }
2585 Some(SessionCommand::FlushCache { info_hash, reply }) => {
2586 let result = if let Some(entry) = self.torrents.get(&info_hash) {
2587 entry.handle.flush_cache().await
2588 } else {
2589 Err(crate::Error::TorrentNotFound(info_hash))
2590 };
2591 let _ = reply.send(result);
2592 }
2593 Some(SessionCommand::IsValid { info_hash, reply }) => {
2594 let valid = self.torrents.get(&info_hash)
2595 .map(|e| e.handle.is_valid())
2596 .unwrap_or(false);
2597 let _ = reply.send(valid);
2598 }
2599 Some(SessionCommand::ClearError { info_hash, reply }) => {
2600 let result = if let Some(entry) = self.torrents.get(&info_hash) {
2601 entry.handle.clear_error().await
2602 } else {
2603 Err(crate::Error::TorrentNotFound(info_hash))
2604 };
2605 let _ = reply.send(result);
2606 }
2607 Some(SessionCommand::FileStatus { info_hash, reply }) => {
2608 let result = if let Some(entry) = self.torrents.get(&info_hash) {
2609 entry.handle.file_status().await
2610 } else {
2611 Err(crate::Error::TorrentNotFound(info_hash))
2612 };
2613 let _ = reply.send(result);
2614 }
2615 Some(SessionCommand::Flags { info_hash, reply }) => {
2616 let result = if let Some(entry) = self.torrents.get(&info_hash) {
2617 entry.handle.flags().await
2618 } else {
2619 Err(crate::Error::TorrentNotFound(info_hash))
2620 };
2621 let _ = reply.send(result);
2622 }
2623 Some(SessionCommand::SetFlags { info_hash, flags, reply }) => {
2624 let result = if let Some(entry) = self.torrents.get(&info_hash) {
2625 entry.handle.set_flags(flags).await
2626 } else {
2627 Err(crate::Error::TorrentNotFound(info_hash))
2628 };
2629 let _ = reply.send(result);
2630 }
2631 Some(SessionCommand::UnsetFlags { info_hash, flags, reply }) => {
2632 let result = if let Some(entry) = self.torrents.get(&info_hash) {
2633 entry.handle.unset_flags(flags).await
2634 } else {
2635 Err(crate::Error::TorrentNotFound(info_hash))
2636 };
2637 let _ = reply.send(result);
2638 }
2639 Some(SessionCommand::ConnectPeer { info_hash, addr, reply }) => {
2640 let result = if let Some(entry) = self.torrents.get(&info_hash) {
2641 entry.handle.connect_peer(addr).await
2642 } else {
2643 Err(crate::Error::TorrentNotFound(info_hash))
2644 };
2645 let _ = reply.send(result);
2646 }
2647 Some(SessionCommand::DhtPutImmutable { value, reply }) => {
2648 let result = self.handle_dht_put_immutable(value).await;
2649 let _ = reply.send(result);
2650 }
2651 Some(SessionCommand::DhtGetImmutable { target, reply }) => {
2652 let result = self.handle_dht_get_immutable(target).await;
2653 let _ = reply.send(result);
2654 }
2655 Some(SessionCommand::DhtPutMutable { keypair_bytes, value, seq, salt, reply }) => {
2656 let result = self.handle_dht_put_mutable(keypair_bytes, value, seq, salt).await;
2657 let _ = reply.send(result);
2658 }
2659 Some(SessionCommand::DhtGetMutable { public_key, salt, reply }) => {
2660 let result = self.handle_dht_get_mutable(public_key, salt).await;
2661 let _ = reply.send(result);
2662 }
2663 Some(SessionCommand::PostSessionStats) => {
2664 self.fire_stats_alert();
2665 }
2666 Some(SessionCommand::SaveResumeState { reply }) => {
2667 let count = self.save_dirty_resume_files().await;
2668 let _ = reply.send(Ok(count));
2669 }
2670 Some(SessionCommand::Shutdown) | None => {
2671 self.shutdown_all().await;
2672 return;
2673 }
2674 }
2675 }
2676 result = async {
2677 match &mut self.lsd_peers_rx {
2678 Some(rx) => rx.recv().await,
2679 None => std::future::pending().await,
2680 }
2681 } => {
2682 if let Some((info_hash, peer_addr)) = result
2683 && let Some(entry) = self.torrents.get(&info_hash)
2684 && !entry.is_private() {
2686 let _ = entry.handle.add_peers(vec![peer_addr], crate::peer_state::PeerSource::Lsd).await;
2687 }
2688 }
2689 Some(conn) = self.validated_conn_rx.recv() => {
2691 self.handle_identified_inbound(conn);
2692 }
2693 result = async {
2695 if let Some(ref mut listener) = self.ssl_listener {
2696 listener.accept().await
2697 } else {
2698 std::future::pending().await
2699 }
2700 } => {
2701 if let Ok((stream, addr)) = result {
2702 self.handle_ssl_incoming(stream, addr).await;
2703 }
2704 }
2705 _ = refill_interval.tick() => {
2707 let elapsed = std::time::Duration::from_millis(100);
2708 self.global_upload_bucket.lock().refill(elapsed);
2709 self.global_download_bucket.lock().refill(elapsed);
2710 }
2711 _ = auto_manage_interval.tick() => {
2713 self.evaluate_queue().await;
2714 }
2715 event = recv_nat_event(&mut self.nat_events_rx) => {
2717 match event {
2718 irontide_nat::NatEvent::MappingSucceeded { port, protocol } => {
2719 info!(port, %protocol, "port mapping succeeded");
2720 post_alert(
2721 &self.alert_tx,
2722 &self.alert_mask,
2723 AlertKind::PortMappingSucceeded { port, protocol },
2724 );
2725 }
2726 irontide_nat::NatEvent::MappingFailed { port, message } => {
2727 warn!(port, %message, "port mapping failed");
2728 post_alert(
2729 &self.alert_tx,
2730 &self.alert_mask,
2731 AlertKind::PortMappingFailed { port, message },
2732 );
2733 }
2734 irontide_nat::NatEvent::ExternalIpDiscovered { ip } => {
2735 info!(%ip, "external IP discovered via NAT traversal");
2736 self.external_ip = Some(ip);
2737 for entry in self.torrents.values() {
2739 let _ = entry.handle.update_external_ip(ip).await;
2740 }
2741 if let Some(dht) = &self.dht_v4 {
2743 let _ = dht.update_external_ip(ip, irontide_dht::IpVoteSource::Nat).await;
2744 }
2745 if let Some(dht) = &self.dht_v6 {
2746 let _ = dht.update_external_ip(ip, irontide_dht::IpVoteSource::Nat).await;
2747 }
2748 }
2749 }
2750 }
2751 Some(ip) = recv_dht_ip(&mut self.dht_v4_ip_rx) => {
2753 info!(%ip, "external IP discovered via DHT v4 (BEP 42)");
2754 self.external_ip = Some(ip);
2755 for entry in self.torrents.values() {
2756 let _ = entry.handle.update_external_ip(ip).await;
2757 }
2758 }
2759 Some(ip) = recv_dht_ip(&mut self.dht_v6_ip_rx) => {
2761 info!(%ip, "external IP discovered via DHT v6 (BEP 42)");
2762 self.external_ip = Some(ip);
2763 for entry in self.torrents.values() {
2764 let _ = entry.handle.update_external_ip(ip).await;
2765 }
2766 }
2767 _ = async {
2769 match &mut stats_timer {
2770 Some(t) => t.tick().await,
2771 None => std::future::pending().await,
2772 }
2773 } => {
2774 self.fire_stats_alert();
2775 }
2776 _ = async {
2778 match &mut sample_timer {
2779 Some(t) => t.tick().await,
2780 None => std::future::pending().await,
2781 }
2782 } => {
2783 self.fire_sample_infohashes().await;
2784 }
2785 _ = async {
2787 match &mut resume_save_interval {
2788 Some(t) => t.tick().await,
2789 None => std::future::pending().await,
2790 }
2791 } => {
2792 let count = self.save_dirty_resume_files().await;
2793 if count > 0 {
2794 info!(count, "periodic resume save completed");
2795 }
2796 }
2797 }
2798 }
2799 }
2800
2801 fn global_buckets_if_limited(&self) -> (Option<SharedBucket>, Option<SharedBucket>) {
2803 let up = if self.settings.upload_rate_limit > 0 {
2804 Some(Arc::clone(&self.global_upload_bucket))
2805 } else {
2806 None
2807 };
2808 let down = if self.settings.download_rate_limit > 0 {
2809 Some(Arc::clone(&self.global_download_bucket))
2810 } else {
2811 None
2812 };
2813 (up, down)
2814 }
2815
2816 fn make_slot_tuner(&self) -> crate::slot_tuner::SlotTuner {
2817 if self.settings.auto_upload_slots {
2818 crate::slot_tuner::SlotTuner::new(
2819 4, self.settings.auto_upload_slots_min,
2821 self.settings.auto_upload_slots_max,
2822 )
2823 } else {
2824 crate::slot_tuner::SlotTuner::disabled(4)
2825 }
2826 }
2827
2828 fn make_torrent_config(&self) -> TorrentConfig {
2829 TorrentConfig::from(&self.settings)
2830 }
2831
2832 fn next_queue_position(&self) -> i32 {
2834 self.torrents
2835 .values()
2836 .filter(|e| e.auto_managed)
2837 .map(|e| e.queue_position)
2838 .max()
2839 .map(|m| m + 1)
2840 .unwrap_or(0)
2841 }
2842
2843 async fn handle_add_torrent(
2844 &mut self,
2845 torrent_meta: irontide_core::TorrentMeta,
2846 storage: Option<Arc<dyn TorrentStorage>>,
2847 download_dir: Option<PathBuf>,
2848 ) -> crate::Result<Id20> {
2849 let version = torrent_meta.version();
2850 let meta_v2 = torrent_meta.as_v2().cloned();
2851
2852 let meta = match torrent_meta.as_v1() {
2856 Some(v1) => v1.clone(),
2857 None => {
2858 let v2 = torrent_meta.as_v2().unwrap();
2859 synthesize_v1_from_v2(v2)?
2860 }
2861 };
2862
2863 let info_hash = meta.info_hash;
2864
2865 if self.torrents.contains_key(&info_hash) {
2866 return Err(crate::Error::DuplicateTorrent(info_hash));
2867 }
2868
2869 if self.torrents.len() >= self.settings.max_torrents {
2870 return Err(crate::Error::SessionAtCapacity(self.settings.max_torrents));
2871 }
2872
2873 let mut torrent_config = self.make_torrent_config();
2874 if let Some(dir) = download_dir {
2875 torrent_config.download_dir = dir;
2876 }
2877
2878 let storage: Arc<dyn TorrentStorage> = match storage {
2880 Some(s) => s,
2881 None => {
2882 let lengths = Lengths::new(
2883 meta.info.total_length(),
2884 meta.info.piece_length,
2885 DEFAULT_CHUNK_SIZE,
2886 );
2887 let files = meta.info.files();
2888 let file_paths: Vec<PathBuf> = files
2889 .iter()
2890 .map(|f| f.path.iter().collect::<PathBuf>())
2891 .collect();
2892 let file_lengths: Vec<u64> = files.iter().map(|f| f.length).collect();
2893 let prealloc_mode = torrent_config.preallocate_mode.unwrap_or_else(|| {
2894 irontide_storage::PreallocateMode::from(
2895 torrent_config.storage_mode == irontide_core::StorageMode::Full,
2896 )
2897 });
2898 match irontide_storage::FilesystemStorage::new(
2899 &torrent_config.download_dir,
2900 file_paths,
2901 file_lengths,
2902 lengths.clone(),
2903 None,
2904 prealloc_mode,
2905 torrent_config.filesystem_direct_io,
2906 ) {
2907 Ok(s) => Arc::new(s),
2908 Err(e) => {
2909 warn!(
2910 "failed to create filesystem storage: {e}, falling back to memory"
2911 );
2912 Arc::new(irontide_storage::MemoryStorage::new(lengths))
2913 }
2914 }
2915 }
2916 };
2917 let disk_handle = self.disk_manager.register_torrent(info_hash, storage).await;
2918
2919 let (global_up, global_down) = self.global_buckets_if_limited();
2920 let slot_tuner = self.make_slot_tuner();
2921
2922 let handle = TorrentHandle::from_torrent(
2923 meta.clone(),
2924 version,
2925 meta_v2,
2926 disk_handle,
2927 self.disk_manager.clone(),
2928 torrent_config,
2929 self.dht_v4.clone(),
2930 self.dht_v6.clone(),
2931 global_up,
2932 global_down,
2933 slot_tuner,
2934 self.alert_tx.clone(),
2935 Arc::clone(&self.alert_mask),
2936 self.utp_socket.clone(),
2937 self.utp_socket_v6.clone(),
2938 Arc::clone(&self.ban_manager),
2939 Arc::clone(&self.ip_filter),
2940 Arc::clone(&self.plugins),
2941 self.sam_session.clone(),
2942 self.ssl_manager.clone(),
2943 Arc::clone(&self.factory),
2944 Some(Arc::clone(&self.hash_pool)),
2945 )
2946 .await?;
2947
2948 let name = meta.info.name.clone();
2949 self.torrents.insert(
2950 info_hash,
2951 TorrentEntry {
2952 handle,
2953 meta: Some(meta),
2954 queue_position: -1,
2955 auto_managed: true,
2956 started_at: Some(tokio::time::Instant::now()),
2957 prev_downloaded: 0,
2958 prev_uploaded: 0,
2959 },
2960 );
2961 self.info_hash_registry.insert(info_hash, ());
2962
2963 let pos = self.next_queue_position();
2965 if let Some(entry) = self.torrents.get_mut(&info_hash)
2966 && entry.auto_managed
2967 {
2968 entry.queue_position = pos;
2969 }
2970
2971 info!(%info_hash, "torrent added to session");
2972 post_alert(
2973 &self.alert_tx,
2974 &self.alert_mask,
2975 AlertKind::TorrentAdded { info_hash, name },
2976 );
2977 let is_private = self
2979 .torrents
2980 .get(&info_hash)
2981 .is_some_and(|e| e.is_private());
2982 if let Some(ref lsd) = self.lsd
2983 && !is_private
2984 {
2985 lsd.announce(vec![info_hash]).await;
2986 }
2987 Ok(info_hash)
2988 }
2989
2990 async fn handle_add_magnet(
2991 &mut self,
2992 magnet: Magnet,
2993 download_dir: Option<PathBuf>,
2994 ) -> crate::Result<Id20> {
2995 let info_hash = magnet.info_hash();
2996 let display_name = magnet.display_name.clone().unwrap_or_default();
2997 if self.torrents.contains_key(&info_hash) {
2998 return Err(crate::Error::DuplicateTorrent(info_hash));
2999 }
3000 if self.torrents.len() >= self.settings.max_torrents {
3001 return Err(crate::Error::SessionAtCapacity(self.settings.max_torrents));
3002 }
3003 let mut config = self.make_torrent_config();
3004 if let Some(dir) = download_dir {
3005 config.download_dir = dir;
3006 }
3007 let (global_up, global_down) = self.global_buckets_if_limited();
3008 let slot_tuner = self.make_slot_tuner();
3009 let handle = TorrentHandle::from_magnet(
3010 magnet,
3011 self.disk_manager.clone(),
3012 config,
3013 self.dht_v4.clone(),
3014 self.dht_v6.clone(),
3015 global_up,
3016 global_down,
3017 slot_tuner,
3018 self.alert_tx.clone(),
3019 Arc::clone(&self.alert_mask),
3020 self.utp_socket.clone(),
3021 self.utp_socket_v6.clone(),
3022 Arc::clone(&self.ban_manager),
3023 Arc::clone(&self.ip_filter),
3024 Arc::clone(&self.plugins),
3025 self.sam_session.clone(),
3026 self.ssl_manager.clone(),
3027 Arc::clone(&self.factory),
3028 Some(Arc::clone(&self.hash_pool)),
3029 )
3030 .await?;
3031 self.spawn_metadata_resolver(info_hash, &handle);
3035
3036 self.torrents.insert(
3037 info_hash,
3038 TorrentEntry {
3039 handle,
3040 meta: None,
3041 queue_position: -1,
3042 auto_managed: true,
3043 started_at: Some(tokio::time::Instant::now()),
3044 prev_downloaded: 0,
3045 prev_uploaded: 0,
3046 },
3047 );
3048 self.info_hash_registry.insert(info_hash, ());
3049
3050 let pos = self.next_queue_position();
3052 if let Some(entry) = self.torrents.get_mut(&info_hash)
3053 && entry.auto_managed
3054 {
3055 entry.queue_position = pos;
3056 }
3057
3058 info!(%info_hash, "magnet torrent added to session");
3059 post_alert(
3060 &self.alert_tx,
3061 &self.alert_mask,
3062 AlertKind::TorrentAdded {
3063 info_hash,
3064 name: display_name,
3065 },
3066 );
3067 if let Some(ref lsd) = self.lsd {
3071 lsd.announce(vec![info_hash]).await;
3072 }
3073 Ok(info_hash)
3074 }
3075
3076 fn spawn_metadata_resolver(&self, info_hash: Id20, torrent_handle: &TorrentHandle) {
3083 let dht = match self.dht_v4 {
3084 Some(ref dht) => dht.clone(),
3085 None => return, };
3087 let factory = Arc::clone(&self.factory);
3088 let connect_timeout = std::time::Duration::from_secs(self.settings.peer_connect_timeout);
3089 let handle = torrent_handle.clone();
3090
3091 tokio::spawn(async move {
3092 let peer_rx = match dht.get_peers(info_hash).await {
3093 Ok(rx) => rx,
3094 Err(e) => {
3095 debug!(
3096 %info_hash,
3097 "metadata resolver: failed to start DHT get_peers: {e}"
3098 );
3099 return;
3100 }
3101 };
3102
3103 let peer_id = irontide_core::PeerId::generate().0;
3104 match crate::metadata_resolver::resolve_metadata(
3105 info_hash,
3106 peer_id,
3107 peer_rx,
3108 factory,
3109 connect_timeout,
3110 crate::metadata_resolver::DEFAULT_MAX_CONCURRENT,
3111 )
3112 .await
3113 {
3114 Ok((meta, peers)) => {
3115 let info_bytes = if let Some(b) = meta.info_bytes {
3116 b.to_vec()
3117 } else {
3118 match irontide_bencode::to_bytes(&meta.info) {
3119 Ok(bytes) => bytes,
3120 Err(e) => {
3121 debug!(
3122 %info_hash,
3123 "metadata resolver: failed to re-encode info dict: {e}"
3124 );
3125 return;
3126 }
3127 }
3128 };
3129 debug!(
3130 %info_hash,
3131 num_peers = peers.len(),
3132 "metadata resolver: pre-resolved metadata, sending to torrent actor"
3133 );
3134 handle.send_pre_resolved_metadata(info_bytes, peers);
3135 }
3136 Err(e) => {
3137 debug!(
3138 %info_hash,
3139 "metadata resolver: failed to resolve metadata: {e}"
3140 );
3141 }
3142 }
3143 });
3144 }
3145
3146 async fn handle_remove_torrent(&mut self, info_hash: Id20) -> crate::Result<()> {
3147 let entry = self
3148 .torrents
3149 .remove(&info_hash)
3150 .ok_or(crate::Error::TorrentNotFound(info_hash))?;
3151 self.info_hash_registry.remove(&info_hash);
3152 let was_auto_managed = entry.auto_managed;
3153 let removed_position = entry.queue_position;
3154 entry.handle.shutdown().await?;
3155 self.disk_manager.unregister_torrent(info_hash).await;
3156
3157 if was_auto_managed && removed_position >= 0 {
3159 let mut entries = self.queue_entries();
3160 let changed = crate::queue::remove_position(&mut entries, removed_position);
3161 self.apply_queue_changes(&changed);
3162 }
3163
3164 let resume_dir = self.effective_resume_dir();
3168 if let Err(e) = crate::resume_file::delete_resume_file(&resume_dir, &info_hash) {
3169 if e.kind() != std::io::ErrorKind::NotFound {
3171 warn!(%info_hash, "failed to delete resume file on removal: {e}");
3172 }
3173 }
3174
3175 info!(%info_hash, "torrent removed from session");
3176 post_alert(
3177 &self.alert_tx,
3178 &self.alert_mask,
3179 AlertKind::TorrentRemoved { info_hash },
3180 );
3181 Ok(())
3182 }
3183
3184 async fn handle_pause_torrent(&mut self, info_hash: Id20) -> crate::Result<()> {
3185 let entry = self
3186 .torrents
3187 .get(&info_hash)
3188 .ok_or(crate::Error::TorrentNotFound(info_hash))?;
3189 entry.handle.pause().await
3190 }
3191
3192 async fn handle_resume_torrent(&mut self, info_hash: Id20) -> crate::Result<()> {
3193 let entry = self
3194 .torrents
3195 .get(&info_hash)
3196 .ok_or(crate::Error::TorrentNotFound(info_hash))?;
3197 entry.handle.resume().await
3198 }
3199
3200 async fn handle_move_torrent_storage(
3201 &self,
3202 info_hash: Id20,
3203 new_path: std::path::PathBuf,
3204 ) -> crate::Result<()> {
3205 let entry = self
3206 .torrents
3207 .get(&info_hash)
3208 .ok_or(crate::Error::TorrentNotFound(info_hash))?;
3209 entry.handle.move_storage(new_path).await
3210 }
3211
3212 async fn handle_torrent_stats(&self, info_hash: Id20) -> crate::Result<TorrentStats> {
3213 let entry = self
3214 .torrents
3215 .get(&info_hash)
3216 .ok_or(crate::Error::TorrentNotFound(info_hash))?;
3217 let mut stats = entry.handle.stats().await?;
3218 stats.queue_position = entry.queue_position;
3220 stats.auto_managed = entry.auto_managed;
3221 Ok(stats)
3222 }
3223
3224 fn handle_torrent_info(&self, info_hash: Id20) -> crate::Result<TorrentInfo> {
3225 let entry = self
3226 .torrents
3227 .get(&info_hash)
3228 .ok_or(crate::Error::TorrentNotFound(info_hash))?;
3229
3230 let meta = entry
3231 .meta
3232 .as_ref()
3233 .ok_or(crate::Error::MetadataNotReady(info_hash))?;
3234 let files: Vec<FileInfo> = if let Some(ref file_list) = meta.info.files {
3235 file_list
3236 .iter()
3237 .map(|f| FileInfo {
3238 path: f.path.iter().collect::<PathBuf>(),
3239 length: f.length,
3240 })
3241 .collect()
3242 } else {
3243 vec![FileInfo {
3244 path: PathBuf::from(&meta.info.name),
3245 length: meta.info.total_length(),
3246 }]
3247 };
3248
3249 Ok(TorrentInfo {
3250 info_hash,
3251 name: meta.info.name.clone(),
3252 total_length: meta.info.total_length(),
3253 piece_length: meta.info.piece_length,
3254 num_pieces: meta.info.num_pieces() as u32,
3255 files,
3256 private: meta.info.private == Some(1),
3257 })
3258 }
3259
3260 fn update_session_gauges(&self) {
3262 use crate::stats::*;
3263 let c = &self.counters;
3264 c.set(SES_NUM_TORRENTS, self.torrents.len() as i64);
3265 c.set(SES_ACTIVE_TORRENTS, self.torrents.len() as i64);
3266
3267 let dht_nodes = self.dht_v4.is_some() as i64 + self.dht_v6.is_some() as i64;
3269 c.set(DHT_NODES, dht_nodes);
3270 c.set(DHT_NODES_V4, self.dht_v4.is_some() as i64);
3271 c.set(DHT_NODES_V6, self.dht_v6.is_some() as i64);
3272
3273 let ban_count = self.ban_manager.read().banned_list().len() as i64;
3275 c.set(PEER_NUM_BANNED, ban_count);
3276 }
3277
3278 fn fire_stats_alert(&self) {
3280 self.update_session_gauges();
3281 let values = self.counters.snapshot();
3282 crate::alert::post_alert(
3283 &self.alert_tx,
3284 &self.alert_mask,
3285 crate::alert::AlertKind::SessionStatsAlert { values },
3286 );
3287 }
3288
3289 async fn fire_sample_infohashes(&self) {
3291 let dht = match (&self.dht_v4, &self.dht_v6) {
3292 (Some(d), _) | (_, Some(d)) => d,
3293 _ => return,
3294 };
3295 let mut buf = [0u8; 20];
3296 irontide_core::random_bytes(&mut buf);
3297 let target = Id20::from(buf);
3298 match dht.sample_infohashes(target).await {
3299 Ok(result) => {
3300 post_alert(
3301 &self.alert_tx,
3302 &self.alert_mask,
3303 AlertKind::DhtSampleInfohashes {
3304 num_samples: result.samples.len(),
3305 total_estimate: result.num,
3306 },
3307 );
3308 }
3309 Err(e) => {
3310 debug!("sample_infohashes failed: {e}");
3311 }
3312 }
3313 }
3314
3315 async fn make_session_stats(&self) -> SessionStats {
3316 self.update_session_gauges();
3317
3318 let mut total_downloaded = 0u64;
3319 let mut total_uploaded = 0u64;
3320
3321 for entry in self.torrents.values() {
3322 if let Ok(stats) = entry.handle.stats().await {
3323 total_downloaded += stats.downloaded;
3324 total_uploaded += stats.uploaded;
3325 }
3326 }
3327
3328 SessionStats {
3329 active_torrents: self.torrents.len(),
3330 total_downloaded,
3331 total_uploaded,
3332 dht_nodes: self.dht_v4.is_some() as usize + self.dht_v6.is_some() as usize,
3333 }
3334 }
3335
3336 async fn handle_save_torrent_resume(
3337 &self,
3338 info_hash: Id20,
3339 ) -> crate::Result<irontide_core::FastResumeData> {
3340 let entry = self
3341 .torrents
3342 .get(&info_hash)
3343 .ok_or(crate::Error::TorrentNotFound(info_hash))?;
3344 let mut resume = entry.handle.save_resume_data().await?;
3345 resume.queue_position = entry.queue_position as i64;
3348 resume.auto_managed = if entry.auto_managed { 1 } else { 0 };
3349 Ok(resume)
3350 }
3351
3352 async fn handle_save_session_state(&self) -> crate::Result<crate::persistence::SessionState> {
3353 use crate::persistence::SessionState;
3354
3355 let mut torrents = Vec::new();
3356 for (info_hash, entry) in &self.torrents {
3357 match entry.handle.save_resume_data().await {
3358 Ok(rd) => torrents.push(rd),
3359 Err(e) => {
3360 warn!(%info_hash, "failed to save resume data: {e}");
3361 }
3362 }
3363 }
3364
3365 let (banned_peers, peer_strikes) = {
3367 let ban_mgr = self.ban_manager.read();
3368 let banned_peers: Vec<String> = ban_mgr
3369 .banned_list()
3370 .iter()
3371 .map(|ip| ip.to_string())
3372 .collect();
3373 let peer_strikes: Vec<crate::persistence::PeerStrikeEntry> = ban_mgr
3374 .strikes_map()
3375 .iter()
3376 .map(|(ip, &count)| crate::persistence::PeerStrikeEntry {
3377 ip: ip.to_string(),
3378 count: count as i64,
3379 })
3380 .collect();
3381 (banned_peers, peer_strikes)
3382 };
3383
3384 let mut dht_entries = Vec::new();
3385 let mut dht_node_id = None;
3386 if let Some(ref dht) = self.dht_v4 {
3387 if let Ok(stats) = dht.stats().await {
3389 dht_node_id = Some(stats.node_id.to_hex());
3390 }
3391 for (_id, addr) in dht.get_routing_nodes().await {
3392 dht_entries.push(crate::persistence::DhtNodeEntry {
3393 host: addr.ip().to_string(),
3394 port: addr.port() as i64,
3395 });
3396 }
3397 }
3398 if let Some(ref dht) = self.dht_v6 {
3399 for (_id, addr) in dht.get_routing_nodes().await {
3400 dht_entries.push(crate::persistence::DhtNodeEntry {
3401 host: addr.ip().to_string(),
3402 port: addr.port() as i64,
3403 });
3404 }
3405 }
3406
3407 Ok(SessionState {
3408 dht_nodes: dht_entries,
3409 dht_node_id,
3410 torrents,
3411 banned_peers,
3412 peer_strikes,
3413 })
3414 }
3415
3416 fn effective_resume_dir(&self) -> PathBuf {
3418 self.settings
3419 .resume_data_dir
3420 .clone()
3421 .unwrap_or_else(crate::resume_file::default_resume_dir)
3422 }
3423
3424 async fn handle_load_resume_state(&mut self) -> crate::Result<ResumeLoadResult> {
3430 let resume_dir = self.effective_resume_dir();
3431 let paths = crate::resume_file::scan_resume_dir(&resume_dir);
3432
3433 let mut restored = 0usize;
3434 let mut skipped = 0usize;
3435 let mut failed = 0usize;
3436
3437 for path in &paths {
3438 let file_name = path
3439 .file_name()
3440 .and_then(|n| n.to_str())
3441 .unwrap_or("<unknown>");
3442
3443 let bytes = match std::fs::read(path) {
3445 Ok(b) => b,
3446 Err(e) => {
3447 warn!(file = %file_name, "failed to read resume file: {e}");
3448 failed = failed.saturating_add(1);
3449 continue;
3450 }
3451 };
3452
3453 let rd = match crate::resume_file::deserialize_resume(&bytes) {
3454 Ok(rd) => rd,
3455 Err(e) => {
3456 warn!(file = %file_name, "failed to deserialize resume file: {e}");
3457 failed = failed.saturating_add(1);
3458 continue;
3459 }
3460 };
3461
3462 if let Some(meta) = crate::resume_file::reconstruct_torrent_meta(&rd) {
3464 let info_hash = meta.info_hash;
3465 let pieces = rd.pieces.clone();
3466 let torrent_meta = irontide_core::TorrentMeta::V1(meta);
3467
3468 let restore_dir = if rd.save_path.is_empty() {
3470 None
3471 } else {
3472 Some(PathBuf::from(&rd.save_path))
3473 };
3474 match self.handle_add_torrent(torrent_meta, None, restore_dir).await {
3475 Ok(added_hash) => {
3476 if !pieces.is_empty()
3478 && let Some(entry) = self.torrents.get(&added_hash)
3479 && let Err(e) = entry.handle.restore_resume_bitmap(pieces).await
3480 {
3481 warn!(
3482 %info_hash,
3483 "failed to restore piece bitmap, torrent will recheck: {e}"
3484 );
3485 }
3486 info!(%info_hash, "restored torrent from resume file");
3487 restored = restored.saturating_add(1);
3488 }
3489 Err(crate::Error::DuplicateTorrent(_)) => {
3490 debug!(%info_hash, "skipped duplicate torrent from resume");
3491 skipped = skipped.saturating_add(1);
3492 }
3493 Err(e) => {
3494 warn!(%info_hash, "failed to add restored torrent: {e}");
3495 failed = failed.saturating_add(1);
3496 }
3497 }
3498 } else if let Some(magnet) = crate::resume_file::reconstruct_magnet(&rd) {
3499 let info_hash = magnet.info_hash();
3501 let restore_dir = if rd.save_path.is_empty() {
3502 None
3503 } else {
3504 Some(PathBuf::from(&rd.save_path))
3505 };
3506 match self.handle_add_magnet(magnet, restore_dir).await {
3507 Ok(_) => {
3508 info!(%info_hash, "restored magnet from resume file");
3509 restored = restored.saturating_add(1);
3510 }
3511 Err(crate::Error::DuplicateTorrent(_)) => {
3512 debug!(%info_hash, "skipped duplicate magnet from resume");
3513 skipped = skipped.saturating_add(1);
3514 }
3515 Err(e) => {
3516 warn!(%info_hash, "failed to add restored magnet: {e}");
3517 failed = failed.saturating_add(1);
3518 }
3519 }
3520 } else {
3521 warn!(file = %file_name, "resume file has no valid info dict and no valid info hash");
3522 failed = failed.saturating_add(1);
3523 }
3524 }
3525
3526 info!(restored, skipped, failed, "resume state loaded");
3527 Ok(ResumeLoadResult {
3528 restored,
3529 skipped,
3530 failed,
3531 })
3532 }
3533
3534 async fn save_dirty_resume_files(&mut self) -> usize {
3538 let resume_dir = self.effective_resume_dir();
3539
3540 if let Err(e) = std::fs::create_dir_all(resume_dir.join("torrents")) {
3541 warn!("failed to create resume dir: {e}");
3542 return 0;
3543 }
3544
3545 let mut saved = 0usize;
3546 let info_hashes: Vec<Id20> = self.torrents.keys().copied().collect();
3548
3549 for info_hash in &info_hashes {
3550 let entry = match self.torrents.get(info_hash) {
3551 Some(e) => e,
3552 None => continue,
3553 };
3554
3555 let needs_save = match entry.handle.stats().await {
3557 Ok(stats) => stats.need_save_resume,
3558 Err(_) => continue,
3559 };
3560 if !needs_save {
3561 continue;
3562 }
3563
3564 let rd = match entry.handle.save_resume_data().await {
3566 Ok(rd) => rd,
3567 Err(e) => {
3568 warn!(%info_hash, "failed to build resume data: {e}");
3569 continue;
3570 }
3571 };
3572
3573 let bytes = match crate::resume_file::serialize_resume(&rd) {
3575 Ok(b) => b,
3576 Err(e) => {
3577 warn!(%info_hash, "failed to serialize resume data: {e}");
3578 continue;
3579 }
3580 };
3581
3582 let path = crate::resume_file::resume_file_path(&resume_dir, info_hash);
3584 if let Err(e) = crate::resume_file::atomic_write(&path, &bytes) {
3585 warn!(%info_hash, "failed to write resume file: {e}");
3586 continue;
3587 }
3588
3589 if let Err(e) = entry.handle.clear_save_resume_flag().await {
3591 warn!(%info_hash, "failed to clear save_resume flag: {e}");
3592 }
3593
3594 saved = saved.saturating_add(1);
3595 }
3596 saved
3597 }
3598
3599 fn handle_apply_settings(&mut self, new: Settings) -> crate::Result<()> {
3604 new.validate()?;
3605
3606 if new.upload_rate_limit != self.settings.upload_rate_limit {
3608 self.global_upload_bucket
3609 .lock()
3610 .set_rate(new.upload_rate_limit);
3611 }
3612 if new.download_rate_limit != self.settings.download_rate_limit {
3613 self.global_download_bucket
3614 .lock()
3615 .set_rate(new.download_rate_limit);
3616 }
3617
3618 if new.alert_mask != self.settings.alert_mask {
3620 self.alert_mask
3621 .store(new.alert_mask.bits(), Ordering::Relaxed);
3622 }
3623
3624 self.settings = new;
3626
3627 post_alert(&self.alert_tx, &self.alert_mask, AlertKind::SettingsChanged);
3629
3630 Ok(())
3631 }
3632
3633 fn queue_entries(&self) -> Vec<crate::queue::QueueEntry> {
3635 self.torrents
3636 .iter()
3637 .filter(|(_, e)| e.auto_managed)
3638 .map(|(&hash, e)| crate::queue::QueueEntry {
3639 info_hash: hash,
3640 position: e.queue_position,
3641 })
3642 .collect()
3643 }
3644
3645 fn handle_set_queue_position(&mut self, info_hash: Id20, pos: i32) -> crate::Result<()> {
3646 if !self.torrents.contains_key(&info_hash) {
3647 return Err(crate::Error::TorrentNotFound(info_hash));
3648 }
3649 let mut entries = self.queue_entries();
3650 let changed = crate::queue::set_position(&mut entries, info_hash, pos);
3651 self.apply_queue_changes(&changed);
3652 Ok(())
3653 }
3654
3655 fn handle_queue_move(&mut self, info_hash: Id20, op: QueueMoveFn) -> crate::Result<()> {
3656 if !self.torrents.contains_key(&info_hash) {
3657 return Err(crate::Error::TorrentNotFound(info_hash));
3658 }
3659 let mut entries = self.queue_entries();
3660 let changed = op(&mut entries, info_hash);
3661 self.apply_queue_changes(&changed);
3662 Ok(())
3663 }
3664
3665 fn apply_queue_changes(&mut self, changed: &[(Id20, i32, i32)]) {
3667 for &(hash, old_pos, new_pos) in changed {
3668 if let Some(entry) = self.torrents.get_mut(&hash) {
3669 entry.queue_position = new_pos;
3670 }
3671 crate::alert::post_alert(
3672 &self.alert_tx,
3673 &self.alert_mask,
3674 crate::alert::AlertKind::TorrentQueuePositionChanged {
3675 info_hash: hash,
3676 old_pos,
3677 new_pos,
3678 },
3679 );
3680 }
3681 }
3682
3683 async fn evaluate_queue(&mut self) {
3684 let now = tokio::time::Instant::now();
3685 let startup_duration = std::time::Duration::from_secs(self.settings.auto_manage_startup);
3686 let auto_manage_secs = self.settings.auto_manage_interval.max(1);
3687 let mut candidates = Vec::new();
3688
3689 let hashes: Vec<Id20> = self.torrents.keys().copied().collect();
3691
3692 for &info_hash in &hashes {
3693 let (auto_managed, queue_position, started_at, prev_downloaded, prev_uploaded) = {
3694 let entry = match self.torrents.get(&info_hash) {
3695 Some(e) => e,
3696 None => continue,
3697 };
3698 if !entry.auto_managed {
3699 continue;
3700 }
3701 (
3702 entry.auto_managed,
3703 entry.queue_position,
3704 entry.started_at,
3705 entry.prev_downloaded,
3706 entry.prev_uploaded,
3707 )
3708 };
3709
3710 let _ = auto_managed; let stats = match self.torrents.get(&info_hash) {
3714 Some(entry) => match entry.handle.stats().await {
3715 Ok(s) => s,
3716 Err(_) => continue,
3717 },
3718 None => continue,
3719 };
3720
3721 let category = match stats.state {
3722 TorrentState::Downloading
3723 | TorrentState::FetchingMetadata
3724 | TorrentState::Checking => crate::queue::QueueCategory::Downloading,
3725 TorrentState::Seeding | TorrentState::Complete => {
3726 crate::queue::QueueCategory::Seeding
3727 }
3728 TorrentState::Paused => {
3729 if stats.pieces_have >= stats.pieces_total && stats.pieces_total > 0 {
3731 crate::queue::QueueCategory::Seeding
3732 } else {
3733 crate::queue::QueueCategory::Downloading
3734 }
3735 }
3736 TorrentState::Stopped | TorrentState::Sharing => continue,
3737 };
3738
3739 let is_active = stats.state != TorrentState::Paused;
3740
3741 let download_rate = stats.downloaded.saturating_sub(prev_downloaded) / auto_manage_secs;
3743 let upload_rate = stats.uploaded.saturating_sub(prev_uploaded) / auto_manage_secs;
3744
3745 let past_startup = started_at
3746 .map(|t| now.duration_since(t) > startup_duration)
3747 .unwrap_or(true);
3748
3749 let is_inactive = past_startup
3750 && match category {
3751 crate::queue::QueueCategory::Downloading => {
3752 download_rate < self.settings.inactive_down_rate
3753 }
3754 crate::queue::QueueCategory::Seeding => {
3755 upload_rate < self.settings.inactive_up_rate
3756 }
3757 };
3758
3759 candidates.push(crate::queue::QueueCandidate {
3760 info_hash,
3761 position: queue_position,
3762 category,
3763 is_active,
3764 is_inactive,
3765 });
3766 }
3767
3768 for &hash in &hashes {
3770 if let Some(entry) = self.torrents.get(&hash)
3771 && let Ok(stats) = entry.handle.stats().await
3772 && let Some(entry) = self.torrents.get_mut(&hash)
3773 {
3774 entry.prev_downloaded = stats.downloaded;
3775 entry.prev_uploaded = stats.uploaded;
3776 }
3777 }
3778
3779 let decision = crate::queue::evaluate(
3780 &candidates,
3781 self.settings.active_downloads,
3782 self.settings.active_seeds,
3783 self.settings.active_limit,
3784 self.settings.dont_count_slow_torrents,
3785 self.settings.auto_manage_prefer_seeds,
3786 );
3787
3788 for hash in &decision.to_pause {
3790 if let Some(entry) = self.torrents.get(hash) {
3791 let _ = entry.handle.pause().await;
3792 }
3793 post_alert(
3794 &self.alert_tx,
3795 &self.alert_mask,
3796 AlertKind::TorrentAutoManaged {
3797 info_hash: *hash,
3798 paused: true,
3799 },
3800 );
3801 }
3802
3803 for hash in &decision.to_resume {
3804 if let Some(entry) = self.torrents.get_mut(hash) {
3805 let _ = entry.handle.resume().await;
3806 entry.started_at = Some(tokio::time::Instant::now());
3807 }
3808 post_alert(
3809 &self.alert_tx,
3810 &self.alert_mask,
3811 AlertKind::TorrentAutoManaged {
3812 info_hash: *hash,
3813 paused: false,
3814 },
3815 );
3816 }
3817 }
3818
3819 fn handle_identified_inbound(&self, conn: crate::listener::IdentifiedConnection) {
3821 if let Some(entry) = self.torrents.get(&conn.info_hash) {
3822 debug!(%conn.addr, %conn.info_hash, "routing validated inbound peer");
3823 let handle = entry.handle.clone();
3824 tokio::spawn(async move {
3825 let _ = handle.send_incoming_peer(conn.stream, conn.addr).await;
3826 });
3827 } else {
3828 debug!(%conn.addr, %conn.info_hash, "validated peer for removed torrent, dropping");
3830 }
3831 }
3832
3833 async fn handle_ssl_incoming(
3840 &mut self,
3841 stream: crate::transport::BoxedStream,
3842 addr: std::net::SocketAddr,
3843 ) {
3844 use tokio_rustls::LazyConfigAcceptor;
3845
3846 let acceptor = LazyConfigAcceptor::new(rustls::server::Acceptor::default(), stream);
3847
3848 let start_handshake = match acceptor.await {
3849 Ok(sh) => sh,
3850 Err(e) => {
3851 debug!(%addr, error = %e, "SSL ClientHello read failed");
3852 return;
3853 }
3854 };
3855
3856 let client_hello = start_handshake.client_hello();
3858 let sni = match client_hello.server_name() {
3859 Some(name) => name.to_string(),
3860 None => {
3861 debug!(%addr, "SSL connection missing SNI");
3862 return;
3863 }
3864 };
3865
3866 let info_hash = match Id20::from_hex(&sni) {
3868 Ok(h) => h,
3869 Err(_) => {
3870 debug!(%addr, sni = %sni, "SSL SNI is not a valid info hash");
3871 return;
3872 }
3873 };
3874
3875 let torrent = match self.torrents.get(&info_hash) {
3877 Some(t) => t,
3878 None => {
3879 debug!(%addr, %info_hash, "SSL connection for unknown torrent");
3880 return;
3881 }
3882 };
3883
3884 let ssl_cert = match torrent.meta.as_ref().and_then(|m| m.ssl_cert.as_ref()) {
3886 Some(cert) => cert.clone(),
3887 None => {
3888 debug!(%addr, %info_hash, "SSL connection for non-SSL torrent");
3889 return;
3890 }
3891 };
3892
3893 let server_config = match self.ssl_manager.as_ref() {
3895 Some(mgr) => match mgr.server_config(&ssl_cert) {
3896 Ok(cfg) => cfg,
3897 Err(e) => {
3898 warn!(%addr, %info_hash, error = %e, "failed to build SSL server config");
3899 return;
3900 }
3901 },
3902 None => {
3903 debug!(%addr, "SSL manager not initialized");
3904 return;
3905 }
3906 };
3907
3908 let tls_stream = match start_handshake.into_stream(server_config).await {
3910 Ok(s) => s,
3911 Err(e) => {
3912 warn!(%addr, %info_hash, error = %e, "SSL handshake failed");
3913 post_alert(
3914 &self.alert_tx,
3915 &self.alert_mask,
3916 AlertKind::SslTorrentError {
3917 info_hash,
3918 message: format!("inbound TLS handshake from {addr}: {e}"),
3919 },
3920 );
3921 return;
3922 }
3923 };
3924
3925 let _ = torrent.handle.spawn_ssl_peer(addr, tls_stream).await;
3927 }
3928
3929 async fn handle_dht_put_immutable(&self, value: Vec<u8>) -> crate::Result<Id20> {
3930 let dht = self.dht_v4.as_ref().ok_or(crate::Error::DhtDisabled)?;
3931 match dht.put_immutable(value.clone()).await {
3932 Ok(target) => {
3933 post_alert(
3934 &self.alert_tx,
3935 &self.alert_mask,
3936 AlertKind::DhtPutComplete { target },
3937 );
3938 Ok(target)
3939 }
3940 Err(e) => {
3941 let target = irontide_core::sha1(&value);
3942 post_alert(
3943 &self.alert_tx,
3944 &self.alert_mask,
3945 AlertKind::DhtItemError {
3946 target,
3947 message: e.to_string(),
3948 },
3949 );
3950 Err(crate::Error::Dht(e))
3951 }
3952 }
3953 }
3954
3955 async fn handle_dht_get_immutable(&self, target: Id20) -> crate::Result<Option<Vec<u8>>> {
3956 let dht = self.dht_v4.as_ref().ok_or(crate::Error::DhtDisabled)?;
3957 match dht.get_immutable(target).await {
3958 Ok(value) => {
3959 post_alert(
3960 &self.alert_tx,
3961 &self.alert_mask,
3962 AlertKind::DhtGetResult {
3963 target,
3964 value: value.clone(),
3965 },
3966 );
3967 Ok(value)
3968 }
3969 Err(e) => {
3970 post_alert(
3971 &self.alert_tx,
3972 &self.alert_mask,
3973 AlertKind::DhtItemError {
3974 target,
3975 message: e.to_string(),
3976 },
3977 );
3978 Err(crate::Error::Dht(e))
3979 }
3980 }
3981 }
3982
3983 async fn handle_dht_put_mutable(
3984 &self,
3985 keypair_bytes: [u8; 32],
3986 value: Vec<u8>,
3987 seq: i64,
3988 salt: Vec<u8>,
3989 ) -> crate::Result<Id20> {
3990 let dht = self.dht_v4.as_ref().ok_or(crate::Error::DhtDisabled)?;
3991 match dht.put_mutable(keypair_bytes, value, seq, salt).await {
3992 Ok(target) => {
3993 post_alert(
3994 &self.alert_tx,
3995 &self.alert_mask,
3996 AlertKind::DhtMutablePutComplete { target, seq },
3997 );
3998 Ok(target)
3999 }
4000 Err(e) => {
4001 post_alert(
4002 &self.alert_tx,
4003 &self.alert_mask,
4004 AlertKind::DhtItemError {
4005 target: Id20::from([0u8; 20]),
4006 message: e.to_string(),
4007 },
4008 );
4009 Err(crate::Error::Dht(e))
4010 }
4011 }
4012 }
4013
4014 async fn handle_dht_get_mutable(
4015 &self,
4016 public_key: [u8; 32],
4017 salt: Vec<u8>,
4018 ) -> crate::Result<Option<(Vec<u8>, i64)>> {
4019 let dht = self.dht_v4.as_ref().ok_or(crate::Error::DhtDisabled)?;
4020 let target = irontide_dht::compute_mutable_target(&public_key, &salt);
4021 match dht.get_mutable(public_key, salt).await {
4022 Ok(result) => {
4023 let (value, seq) = match &result {
4024 Some((v, s)) => (Some(v.clone()), Some(*s)),
4025 None => (None, None),
4026 };
4027 post_alert(
4028 &self.alert_tx,
4029 &self.alert_mask,
4030 AlertKind::DhtMutableGetResult {
4031 target,
4032 value,
4033 seq,
4034 public_key,
4035 },
4036 );
4037 Ok(result)
4038 }
4039 Err(e) => {
4040 post_alert(
4041 &self.alert_tx,
4042 &self.alert_mask,
4043 AlertKind::DhtItemError {
4044 target,
4045 message: e.to_string(),
4046 },
4047 );
4048 Err(crate::Error::Dht(e))
4049 }
4050 }
4051 }
4052
4053 async fn shutdown_all(&mut self) {
4054 let save_count = self.save_dirty_resume_files().await;
4056 if save_count > 0 {
4057 info!(save_count, "saved resume files on shutdown");
4058 }
4059
4060 for (info_hash, entry) in self.torrents.drain() {
4061 debug!(%info_hash, "shutting down torrent");
4062 let _ = entry.handle.shutdown().await;
4063 }
4064 if let Some(ref dht) = self.dht_v4 {
4065 let _ = dht.shutdown().await;
4066 }
4067 if let Some(ref dht) = self.dht_v6 {
4068 let _ = dht.shutdown().await;
4069 }
4070 if let Some(ref nat) = self.nat {
4071 nat.shutdown().await;
4072 }
4073 if let Some(ref lsd) = self.lsd {
4074 lsd.shutdown().await;
4075 }
4076 if let Some(ref socket) = self.utp_socket
4077 && let Err(e) = socket.shutdown().await
4078 {
4079 debug!(error = %e, "uTP socket shutdown error");
4080 }
4081 if let Some(ref socket) = self.utp_socket_v6
4082 && let Err(e) = socket.shutdown().await
4083 {
4084 debug!(error = %e, "uTP v6 socket shutdown error");
4085 }
4086 self.disk_manager.shutdown().await;
4087 }
4088}
4089
4090async fn recv_nat_event(
4093 rx: &mut Option<mpsc::Receiver<irontide_nat::NatEvent>>,
4094) -> irontide_nat::NatEvent {
4095 match rx {
4096 Some(r) => match r.recv().await {
4097 Some(event) => event,
4098 None => std::future::pending().await,
4099 },
4100 None => std::future::pending().await,
4101 }
4102}
4103
4104async fn recv_dht_ip(
4106 rx: &mut Option<mpsc::Receiver<std::net::IpAddr>>,
4107) -> Option<std::net::IpAddr> {
4108 match rx {
4109 Some(r) => r.recv().await,
4110 None => std::future::pending().await,
4111 }
4112}
4113
4114fn synthesize_v1_from_v2(
4120 v2: &irontide_core::TorrentMetaV2,
4121) -> crate::Result<irontide_core::TorrentMetaV1> {
4122 use irontide_core::{FileEntry, InfoDict};
4123
4124 let info_hash = v2.info_hashes.best_v1();
4125
4126 let v2_files = v2.info.files();
4128 let file_entries: Vec<FileEntry> = v2_files
4129 .iter()
4130 .map(|f| FileEntry {
4131 length: f.attr.length,
4132 path: f.path.clone(),
4133 attr: None,
4134 mtime: None,
4135 symlink_path: None,
4136 })
4137 .collect();
4138
4139 let num_pieces = v2.info.num_pieces() as usize;
4142 let pieces = vec![0u8; num_pieces * 20];
4143
4144 let info = InfoDict {
4145 name: v2.info.name.clone(),
4146 piece_length: v2.info.piece_length,
4147 pieces,
4148 length: if file_entries.len() == 1 {
4149 Some(file_entries[0].length)
4150 } else {
4151 None
4152 },
4153 files: if file_entries.len() > 1 {
4154 Some(file_entries)
4155 } else {
4156 None
4157 },
4158 private: None,
4159 source: None,
4160 ssl_cert: v2.ssl_cert.clone(),
4161 similar: Vec::new(),
4162 collections: Vec::new(),
4163 };
4164
4165 Ok(irontide_core::TorrentMetaV1 {
4166 info_hash,
4167 announce: v2.announce.clone(),
4168 announce_list: v2.announce_list.clone(),
4169 comment: v2.comment.clone(),
4170 created_by: v2.created_by.clone(),
4171 creation_date: v2.creation_date,
4172 info,
4173 info_bytes: None,
4174 url_list: Vec::new(),
4175 httpseeds: Vec::new(),
4176 ssl_cert: v2.ssl_cert.clone(),
4177 })
4178}
4179
4180#[cfg(test)]
4181mod tests {
4182 use super::*;
4183 use crate::types::TorrentState;
4184 use irontide_core::{DEFAULT_CHUNK_SIZE, Lengths, torrent_from_bytes};
4185 use irontide_storage::MemoryStorage;
4186 use std::time::Duration;
4187
4188 fn make_test_torrent(data: &[u8], piece_length: u64) -> TorrentMetaV1 {
4189 use serde::Serialize;
4190
4191 let mut pieces = Vec::new();
4192 let mut offset = 0;
4193 while offset < data.len() {
4194 let end = (offset + piece_length as usize).min(data.len());
4195 let hash = irontide_core::sha1(&data[offset..end]);
4196 pieces.extend_from_slice(hash.as_bytes());
4197 offset = end;
4198 }
4199
4200 #[derive(Serialize)]
4201 struct Info<'a> {
4202 length: u64,
4203 name: &'a str,
4204 #[serde(rename = "piece length")]
4205 piece_length: u64,
4206 #[serde(with = "serde_bytes")]
4207 pieces: &'a [u8],
4208 }
4209
4210 #[derive(Serialize)]
4211 struct Torrent<'a> {
4212 info: Info<'a>,
4213 }
4214
4215 let t = Torrent {
4216 info: Info {
4217 length: data.len() as u64,
4218 name: "test",
4219 piece_length,
4220 pieces: &pieces,
4221 },
4222 };
4223
4224 let bytes = irontide_bencode::to_bytes(&t).unwrap();
4225 torrent_from_bytes(&bytes).unwrap()
4226 }
4227
4228 fn make_storage(data: &[u8], piece_length: u64) -> Arc<MemoryStorage> {
4229 let lengths = Lengths::new(data.len() as u64, piece_length, DEFAULT_CHUNK_SIZE);
4230 Arc::new(MemoryStorage::new(lengths))
4231 }
4232
4233 fn test_settings() -> Settings {
4234 Settings {
4235 listen_port: 0,
4236 download_dir: PathBuf::from("/tmp"),
4237 max_torrents: 10,
4238 enable_dht: false,
4239 enable_pex: false,
4240 enable_lsd: false,
4241 enable_fast_extension: false,
4242 enable_utp: false,
4243 enable_upnp: false,
4244 enable_natpmp: false,
4245 enable_ipv6: false,
4246 alert_channel_size: 64,
4247 disk_io_threads: 2,
4248 storage_mode: irontide_core::StorageMode::Sparse,
4249 disk_cache_size: 1024 * 1024,
4250 ..Settings::default()
4251 }
4252 }
4253
4254 #[tokio::test]
4257 async fn session_start_and_shutdown() {
4258 let session = SessionHandle::start(test_settings()).await.unwrap();
4259 let stats = session.session_stats().await.unwrap();
4260 assert_eq!(stats.active_torrents, 0);
4261 session.shutdown().await.unwrap();
4262 }
4263
4264 #[tokio::test]
4267 async fn add_and_list_torrent() {
4268 let session = SessionHandle::start(test_settings()).await.unwrap();
4269 let data = vec![0xAB; 16384];
4270 let meta = make_test_torrent(&data, 16384);
4271 let expected_hash = meta.info_hash;
4272
4273 let storage = make_storage(&data, 16384);
4274 let info_hash = session
4275 .add_torrent(meta.into(), Some(storage))
4276 .await
4277 .unwrap();
4278 assert_eq!(info_hash, expected_hash);
4279
4280 let list = session.list_torrents().await.unwrap();
4281 assert_eq!(list.len(), 1);
4282 assert!(list.contains(&info_hash));
4283
4284 session.shutdown().await.unwrap();
4285 }
4286
4287 #[tokio::test]
4290 async fn remove_torrent() {
4291 let session = SessionHandle::start(test_settings()).await.unwrap();
4292 let data = vec![0xAB; 16384];
4293 let meta = make_test_torrent(&data, 16384);
4294 let storage = make_storage(&data, 16384);
4295
4296 let info_hash = session
4297 .add_torrent(meta.into(), Some(storage))
4298 .await
4299 .unwrap();
4300 session.remove_torrent(info_hash).await.unwrap();
4301
4302 tokio::time::sleep(Duration::from_millis(50)).await;
4303
4304 let list = session.list_torrents().await.unwrap();
4305 assert!(list.is_empty());
4306
4307 session.shutdown().await.unwrap();
4308 }
4309
4310 #[tokio::test]
4313 async fn duplicate_torrent_rejected() {
4314 let session = SessionHandle::start(test_settings()).await.unwrap();
4315 let data = vec![0xAB; 16384];
4316 let meta = make_test_torrent(&data, 16384);
4317 let storage1 = make_storage(&data, 16384);
4318 let storage2 = make_storage(&data, 16384);
4319
4320 session
4321 .add_torrent(meta.clone().into(), Some(storage1))
4322 .await
4323 .unwrap();
4324 let result = session.add_torrent(meta.into(), Some(storage2)).await;
4325 assert!(result.is_err());
4326 assert!(result.unwrap_err().to_string().contains("duplicate"));
4327
4328 session.shutdown().await.unwrap();
4329 }
4330
4331 #[tokio::test]
4334 async fn session_at_capacity() {
4335 let mut config = test_settings();
4336 config.max_torrents = 1;
4337 let session = SessionHandle::start(config).await.unwrap();
4338
4339 let data1 = vec![0xAA; 16384];
4340 let meta1 = make_test_torrent(&data1, 16384);
4341 let storage1 = make_storage(&data1, 16384);
4342 session
4343 .add_torrent(meta1.into(), Some(storage1))
4344 .await
4345 .unwrap();
4346
4347 let data2 = vec![0xBB; 16384];
4348 let meta2 = make_test_torrent(&data2, 16384);
4349 let storage2 = make_storage(&data2, 16384);
4350 let result = session.add_torrent(meta2.into(), Some(storage2)).await;
4351 assert!(result.is_err());
4352 assert!(result.unwrap_err().to_string().contains("capacity"));
4353
4354 session.shutdown().await.unwrap();
4355 }
4356
4357 #[tokio::test]
4360 async fn torrent_stats_via_session() {
4361 let session = SessionHandle::start(test_settings()).await.unwrap();
4362 let data = vec![0xAB; 32768];
4363 let meta = make_test_torrent(&data, 16384);
4364 let storage = make_storage(&data, 16384);
4365
4366 let info_hash = session
4367 .add_torrent(meta.into(), Some(storage))
4368 .await
4369 .unwrap();
4370 let stats = session.torrent_stats(info_hash).await.unwrap();
4371 assert_eq!(stats.state, TorrentState::Downloading);
4372 assert_eq!(stats.pieces_total, 2);
4373
4374 session.shutdown().await.unwrap();
4375 }
4376
4377 #[tokio::test]
4380 async fn torrent_info_via_session() {
4381 let session = SessionHandle::start(test_settings()).await.unwrap();
4382 let data = vec![0xAB; 32768];
4383 let meta = make_test_torrent(&data, 16384);
4384 let storage = make_storage(&data, 16384);
4385
4386 let info_hash = session
4387 .add_torrent(meta.into(), Some(storage))
4388 .await
4389 .unwrap();
4390 let info = session.torrent_info(info_hash).await.unwrap();
4391 assert_eq!(info.info_hash, info_hash);
4392 assert_eq!(info.name, "test");
4393 assert_eq!(info.total_length, 32768);
4394 assert_eq!(info.num_pieces, 2);
4395 assert!(!info.private);
4396 assert_eq!(info.files.len(), 1);
4397 assert_eq!(info.files[0].length, 32768);
4398
4399 session.shutdown().await.unwrap();
4400 }
4401
4402 #[tokio::test]
4405 async fn pause_resume_via_session() {
4406 let session = SessionHandle::start(test_settings()).await.unwrap();
4407 let data = vec![0xAB; 16384];
4408 let meta = make_test_torrent(&data, 16384);
4409 let storage = make_storage(&data, 16384);
4410
4411 let info_hash = session
4412 .add_torrent(meta.into(), Some(storage))
4413 .await
4414 .unwrap();
4415
4416 session.pause_torrent(info_hash).await.unwrap();
4417 tokio::time::sleep(Duration::from_millis(50)).await;
4418 let stats = session.torrent_stats(info_hash).await.unwrap();
4419 assert_eq!(stats.state, TorrentState::Paused);
4420
4421 session.resume_torrent(info_hash).await.unwrap();
4422 tokio::time::sleep(Duration::from_millis(50)).await;
4423 let stats = session.torrent_stats(info_hash).await.unwrap();
4424 assert_eq!(stats.state, TorrentState::Downloading);
4425
4426 session.shutdown().await.unwrap();
4427 }
4428
4429 #[tokio::test]
4432 async fn not_found_errors() {
4433 let session = SessionHandle::start(test_settings()).await.unwrap();
4434 let fake_hash = Id20::from_hex("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
4435
4436 assert!(session.torrent_stats(fake_hash).await.is_err());
4437 assert!(session.torrent_info(fake_hash).await.is_err());
4438 assert!(session.pause_torrent(fake_hash).await.is_err());
4439 assert!(session.resume_torrent(fake_hash).await.is_err());
4440 assert!(session.remove_torrent(fake_hash).await.is_err());
4441
4442 session.shutdown().await.unwrap();
4443 }
4444
4445 #[tokio::test]
4448 async fn session_stats_aggregate() {
4449 let session = SessionHandle::start(test_settings()).await.unwrap();
4450
4451 let data1 = vec![0xAA; 16384];
4452 let meta1 = make_test_torrent(&data1, 16384);
4453 let storage1 = make_storage(&data1, 16384);
4454 session
4455 .add_torrent(meta1.into(), Some(storage1))
4456 .await
4457 .unwrap();
4458
4459 let data2 = vec![0xBB; 16384];
4460 let meta2 = make_test_torrent(&data2, 16384);
4461 let storage2 = make_storage(&data2, 16384);
4462 session
4463 .add_torrent(meta2.into(), Some(storage2))
4464 .await
4465 .unwrap();
4466
4467 let stats = session.session_stats().await.unwrap();
4468 assert_eq!(stats.active_torrents, 2);
4469
4470 session.shutdown().await.unwrap();
4471 }
4472
4473 #[tokio::test]
4476 async fn add_magnet_and_list() {
4477 use irontide_core::Magnet;
4478
4479 let session = SessionHandle::start(test_settings()).await.unwrap();
4480 let magnet = Magnet {
4481 info_hashes: irontide_core::InfoHashes::v1_only(
4482 Id20::from_hex("aaf4c61ddcc5e8a2dabede0f3b482cd9aea9434d").unwrap(),
4483 ),
4484 display_name: Some("test-magnet".into()),
4485 trackers: vec![],
4486 peers: vec![],
4487 selected_files: None,
4488 };
4489 let expected_hash = magnet.info_hash();
4490
4491 let info_hash = session.add_magnet(magnet).await.unwrap();
4492 assert_eq!(info_hash, expected_hash);
4493
4494 let list = session.list_torrents().await.unwrap();
4495 assert_eq!(list.len(), 1);
4496 assert!(list.contains(&info_hash));
4497
4498 let err = session.torrent_info(info_hash).await.unwrap_err();
4500 assert!(err.to_string().contains("metadata not yet available"));
4501
4502 session.shutdown().await.unwrap();
4503 }
4504
4505 #[tokio::test]
4508 async fn add_magnet_duplicate_rejected() {
4509 use irontide_core::Magnet;
4510
4511 let session = SessionHandle::start(test_settings()).await.unwrap();
4512 let magnet = Magnet {
4513 info_hashes: irontide_core::InfoHashes::v1_only(
4514 Id20::from_hex("aaf4c61ddcc5e8a2dabede0f3b482cd9aea9434d").unwrap(),
4515 ),
4516 display_name: Some("test-magnet".into()),
4517 trackers: vec![],
4518 peers: vec![],
4519 selected_files: None,
4520 };
4521
4522 session.add_magnet(magnet.clone()).await.unwrap();
4523 let result = session.add_magnet(magnet).await;
4524 assert!(result.is_err());
4525 assert!(result.unwrap_err().to_string().contains("duplicate"));
4526
4527 session.shutdown().await.unwrap();
4528 }
4529
4530 #[tokio::test]
4533 async fn session_with_lsd_enabled() {
4534 use irontide_core::Magnet;
4535
4536 let mut config = test_settings();
4538 config.enable_lsd = true;
4539
4540 let session = SessionHandle::start(config).await.unwrap();
4541
4542 let data = vec![0xAB; 16384];
4544 let meta = make_test_torrent(&data, 16384);
4545 let storage = make_storage(&data, 16384);
4546 session
4547 .add_torrent(meta.into(), Some(storage))
4548 .await
4549 .unwrap();
4550
4551 let magnet = Magnet {
4553 info_hashes: irontide_core::InfoHashes::v1_only(
4554 Id20::from_hex("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb").unwrap(),
4555 ),
4556 display_name: Some("lsd-test".into()),
4557 trackers: vec![],
4558 peers: vec![],
4559 selected_files: None,
4560 };
4561 session.add_magnet(magnet).await.unwrap();
4562
4563 let list = session.list_torrents().await.unwrap();
4564 assert_eq!(list.len(), 2);
4565
4566 session.shutdown().await.unwrap();
4567 }
4568
4569 #[tokio::test]
4572 async fn add_v2_only_torrent() {
4573 use irontide_bencode::BencodeValue;
4574 use std::collections::BTreeMap;
4575
4576 let session = SessionHandle::start(test_settings()).await.unwrap();
4577
4578 let mut attr_map: BTreeMap<Vec<u8>, BencodeValue> = BTreeMap::new();
4580 attr_map.insert(b"length".to_vec(), BencodeValue::Integer(16384));
4581 let mut file_node: BTreeMap<Vec<u8>, BencodeValue> = BTreeMap::new();
4582 file_node.insert(b"".to_vec(), BencodeValue::Dict(attr_map));
4583 let mut ft_map: BTreeMap<Vec<u8>, BencodeValue> = BTreeMap::new();
4584 ft_map.insert(b"test.dat".to_vec(), BencodeValue::Dict(file_node));
4585
4586 let mut info_map: BTreeMap<Vec<u8>, BencodeValue> = BTreeMap::new();
4587 info_map.insert(b"file tree".to_vec(), BencodeValue::Dict(ft_map));
4588 info_map.insert(b"meta version".to_vec(), BencodeValue::Integer(2));
4589 info_map.insert(b"name".to_vec(), BencodeValue::Bytes(b"v2test".to_vec()));
4590 info_map.insert(b"piece length".to_vec(), BencodeValue::Integer(16384));
4591
4592 let mut root_map: BTreeMap<Vec<u8>, BencodeValue> = BTreeMap::new();
4593 root_map.insert(b"info".to_vec(), BencodeValue::Dict(info_map));
4594
4595 let bytes = irontide_bencode::to_bytes(&BencodeValue::Dict(root_map)).unwrap();
4596 let meta = irontide_core::torrent_from_bytes_any(&bytes).unwrap();
4597 assert!(meta.is_v2());
4598
4599 let info_hash = session.add_torrent(meta, None).await.unwrap();
4601 let list = session.list_torrents().await.unwrap();
4602 assert!(list.contains(&info_hash));
4603
4604 session.shutdown().await.unwrap();
4605 }
4606
4607 #[tokio::test]
4610 async fn save_torrent_resume_data_via_session() {
4611 let session = SessionHandle::start(test_settings()).await.unwrap();
4612 let data = vec![0xAB; 32768];
4613 let meta = make_test_torrent(&data, 16384);
4614 let info_hash = meta.info_hash;
4615 let storage = make_storage(&data, 16384);
4616 session
4617 .add_torrent(meta.into(), Some(storage))
4618 .await
4619 .unwrap();
4620
4621 let rd = session.save_torrent_resume_data(info_hash).await.unwrap();
4622 assert_eq!(rd.info_hash, info_hash.as_bytes().as_slice());
4623 assert_eq!(rd.name, "test");
4624 assert_eq!(rd.file_format, "libtorrent resume file");
4625 assert_eq!(rd.file_version, 1);
4626 assert!(!rd.pieces.is_empty());
4627 assert_eq!(rd.paused, 0);
4628
4629 session.shutdown().await.unwrap();
4630 }
4631
4632 #[tokio::test]
4635 async fn save_session_state_captures_all_torrents() {
4636 let session = SessionHandle::start(test_settings()).await.unwrap();
4637
4638 let data1 = vec![0xAA; 16384];
4639 let meta1 = make_test_torrent(&data1, 16384);
4640 let storage1 = make_storage(&data1, 16384);
4641 session
4642 .add_torrent(meta1.into(), Some(storage1))
4643 .await
4644 .unwrap();
4645
4646 let data2 = vec![0xBB; 16384];
4647 let meta2 = make_test_torrent(&data2, 16384);
4648 let storage2 = make_storage(&data2, 16384);
4649 session
4650 .add_torrent(meta2.into(), Some(storage2))
4651 .await
4652 .unwrap();
4653
4654 let state = session.save_session_state().await.unwrap();
4655 assert_eq!(state.torrents.len(), 2);
4656
4657 for rd in &state.torrents {
4658 assert_eq!(rd.file_format, "libtorrent resume file");
4659 assert_eq!(rd.info_hash.len(), 20);
4660 }
4661
4662 session.shutdown().await.unwrap();
4663 }
4664
4665 #[tokio::test]
4668 async fn save_resume_data_not_found() {
4669 let session = SessionHandle::start(test_settings()).await.unwrap();
4670 let fake_hash = Id20::from_hex("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
4671 let result = session.save_torrent_resume_data(fake_hash).await;
4672 assert!(result.is_err());
4673 assert!(result.unwrap_err().to_string().contains("not found"));
4674 session.shutdown().await.unwrap();
4675 }
4676
4677 #[tokio::test]
4680 async fn subscribe_receives_torrent_added_alert() {
4681 use crate::alert::AlertKind;
4682
4683 let session = SessionHandle::start(test_settings()).await.unwrap();
4684 let mut alerts = session.subscribe();
4685
4686 let data = vec![0xAB; 16384];
4687 let meta = make_test_torrent(&data, 16384);
4688 let storage = make_storage(&data, 16384);
4689 let _info_hash = session
4690 .add_torrent(meta.into(), Some(storage))
4691 .await
4692 .unwrap();
4693
4694 let alert = tokio::time::timeout(Duration::from_secs(2), alerts.recv())
4695 .await
4696 .unwrap()
4697 .unwrap();
4698 assert!(matches!(alert.kind, AlertKind::TorrentAdded { .. }));
4699 session.shutdown().await.unwrap();
4700 }
4701
4702 #[tokio::test]
4705 async fn subscribe_receives_torrent_removed_alert() {
4706 use crate::alert::AlertKind;
4707 use crate::types::TorrentState;
4708
4709 let session = SessionHandle::start(test_settings()).await.unwrap();
4710 let mut alerts = session.subscribe();
4711
4712 let data = vec![0xAB; 16384];
4713 let meta = make_test_torrent(&data, 16384);
4714 let storage = make_storage(&data, 16384);
4715 let info_hash = session
4716 .add_torrent(meta.into(), Some(storage))
4717 .await
4718 .unwrap();
4719
4720 while let Ok(Ok(a)) = tokio::time::timeout(Duration::from_secs(1), alerts.recv()).await {
4722 if matches!(
4723 a.kind,
4724 AlertKind::StateChanged {
4725 new_state: TorrentState::Downloading,
4726 ..
4727 }
4728 ) {
4729 break;
4730 }
4731 }
4732
4733 session.remove_torrent(info_hash).await.unwrap();
4734
4735 loop {
4737 let alert = tokio::time::timeout(Duration::from_secs(2), alerts.recv())
4738 .await
4739 .unwrap()
4740 .unwrap();
4741 if matches!(alert.kind, AlertKind::TorrentRemoved { .. }) {
4742 break;
4743 }
4744 }
4745 session.shutdown().await.unwrap();
4746 }
4747
4748 #[tokio::test]
4751 async fn multiple_subscribers_each_receive_alerts() {
4752 use crate::alert::AlertKind;
4753
4754 let session = SessionHandle::start(test_settings()).await.unwrap();
4755 let mut sub1 = session.subscribe();
4756 let mut sub2 = session.subscribe();
4757
4758 let data = vec![0xAB; 16384];
4759 let meta = make_test_torrent(&data, 16384);
4760 let storage = make_storage(&data, 16384);
4761 session
4762 .add_torrent(meta.into(), Some(storage))
4763 .await
4764 .unwrap();
4765
4766 let a1 = tokio::time::timeout(Duration::from_secs(2), sub1.recv())
4767 .await
4768 .unwrap()
4769 .unwrap();
4770 let a2 = tokio::time::timeout(Duration::from_secs(2), sub2.recv())
4771 .await
4772 .unwrap()
4773 .unwrap();
4774
4775 assert!(matches!(a1.kind, AlertKind::TorrentAdded { .. }));
4776 assert!(matches!(a2.kind, AlertKind::TorrentAdded { .. }));
4777 session.shutdown().await.unwrap();
4778 }
4779
4780 #[tokio::test]
4783 async fn set_alert_mask_filters_at_runtime() {
4784 use crate::alert::{AlertCategory, AlertKind};
4785
4786 let session = SessionHandle::start(test_settings()).await.unwrap();
4787 let mut alerts = session.subscribe();
4788
4789 let data = vec![0xAB; 16384];
4791 let meta = make_test_torrent(&data, 16384);
4792 let storage = make_storage(&data, 16384);
4793 session
4794 .add_torrent(meta.into(), Some(storage))
4795 .await
4796 .unwrap();
4797
4798 let alert = tokio::time::timeout(Duration::from_secs(2), alerts.recv())
4799 .await
4800 .unwrap()
4801 .unwrap();
4802 assert!(matches!(alert.kind, AlertKind::TorrentAdded { .. }));
4803
4804 while tokio::time::timeout(Duration::from_millis(200), alerts.recv())
4806 .await
4807 .is_ok()
4808 {}
4809
4810 session.set_alert_mask(AlertCategory::empty());
4812
4813 let data2 = vec![0xBB; 16384];
4814 let meta2 = make_test_torrent(&data2, 16384);
4815 let storage2 = make_storage(&data2, 16384);
4816 session
4817 .add_torrent(meta2.into(), Some(storage2))
4818 .await
4819 .unwrap();
4820
4821 let result = tokio::time::timeout(Duration::from_millis(200), alerts.recv()).await;
4823 assert!(result.is_err(), "should have timed out with empty mask");
4824
4825 session.set_alert_mask(AlertCategory::STATUS);
4827
4828 let data3 = vec![0xCC; 16384];
4829 let meta3 = make_test_torrent(&data3, 16384);
4830 let storage3 = make_storage(&data3, 16384);
4831 session
4832 .add_torrent(meta3.into(), Some(storage3))
4833 .await
4834 .unwrap();
4835
4836 let alert = tokio::time::timeout(Duration::from_secs(2), alerts.recv())
4837 .await
4838 .unwrap()
4839 .unwrap();
4840 assert!(matches!(alert.kind, AlertKind::TorrentAdded { .. }));
4841
4842 session.shutdown().await.unwrap();
4843 }
4844
4845 #[tokio::test]
4848 async fn alert_stream_filters_per_subscriber() {
4849 use crate::alert::{AlertCategory, AlertKind};
4850
4851 let session = SessionHandle::start(test_settings()).await.unwrap();
4852
4853 let mut status_sub = session.subscribe_filtered(AlertCategory::STATUS);
4855 let mut peer_sub = session.subscribe_filtered(AlertCategory::PEER);
4857
4858 let data = vec![0xAB; 16384];
4859 let meta = make_test_torrent(&data, 16384);
4860 let storage = make_storage(&data, 16384);
4861 session
4862 .add_torrent(meta.into(), Some(storage))
4863 .await
4864 .unwrap();
4865
4866 let alert = tokio::time::timeout(Duration::from_secs(2), status_sub.recv())
4868 .await
4869 .unwrap()
4870 .unwrap();
4871 assert!(matches!(alert.kind, AlertKind::TorrentAdded { .. }));
4872
4873 let result = tokio::time::timeout(Duration::from_millis(200), peer_sub.recv()).await;
4875 assert!(
4876 result.is_err(),
4877 "PEER subscriber should not get STATUS alerts"
4878 );
4879
4880 session.shutdown().await.unwrap();
4881 }
4882
4883 #[tokio::test]
4886 async fn state_changed_tracks_transitions() {
4887 use crate::alert::AlertKind;
4888
4889 let session = SessionHandle::start(test_settings()).await.unwrap();
4890 let mut alerts = session.subscribe();
4891
4892 let data = vec![0xAB; 16384];
4893 let meta = make_test_torrent(&data, 16384);
4894 let storage = make_storage(&data, 16384);
4895 let info_hash = session
4896 .add_torrent(meta.into(), Some(storage))
4897 .await
4898 .unwrap();
4899
4900 let _ = tokio::time::timeout(Duration::from_secs(1), alerts.recv())
4902 .await
4903 .unwrap();
4904
4905 session.pause_torrent(info_hash).await.unwrap();
4907 tokio::time::sleep(Duration::from_millis(100)).await;
4908
4909 let mut state_changes = Vec::new();
4911 let mut paused_alerts = Vec::new();
4912 loop {
4913 match tokio::time::timeout(Duration::from_millis(200), alerts.recv()).await {
4914 Ok(Ok(a)) => match &a.kind {
4915 AlertKind::StateChanged {
4916 prev_state,
4917 new_state,
4918 ..
4919 } => {
4920 state_changes.push((*prev_state, *new_state));
4921 }
4922 AlertKind::TorrentPaused { .. } => {
4923 paused_alerts.push(a);
4924 }
4925 _ => {} },
4927 _ => break,
4928 }
4929 }
4930
4931 assert!(
4932 state_changes.contains(&(TorrentState::Downloading, TorrentState::Paused)),
4933 "expected Downloading→Paused, got: {state_changes:?}"
4934 );
4935 assert!(!paused_alerts.is_empty(), "expected TorrentPaused alert");
4936
4937 session.resume_torrent(info_hash).await.unwrap();
4939 tokio::time::sleep(Duration::from_millis(100)).await;
4940
4941 let mut resume_state_changes = Vec::new();
4942 let mut resumed_alerts = Vec::new();
4943 loop {
4944 match tokio::time::timeout(Duration::from_millis(200), alerts.recv()).await {
4945 Ok(Ok(a)) => match &a.kind {
4946 AlertKind::StateChanged {
4947 prev_state,
4948 new_state,
4949 ..
4950 } => {
4951 resume_state_changes.push((*prev_state, *new_state));
4952 }
4953 AlertKind::TorrentResumed { .. } => {
4954 resumed_alerts.push(a);
4955 }
4956 _ => {}
4957 },
4958 _ => break,
4959 }
4960 }
4961
4962 assert!(
4963 resume_state_changes.contains(&(TorrentState::Paused, TorrentState::Downloading)),
4964 "expected Paused→Downloading, got: {resume_state_changes:?}"
4965 );
4966 assert!(!resumed_alerts.is_empty(), "expected TorrentResumed alert");
4967
4968 session.shutdown().await.unwrap();
4969 }
4970
4971 #[tokio::test]
4972 async fn session_config_creates_utp_socket() {
4973 let mut config = test_settings();
4975 config.enable_utp = true;
4976 let session = SessionHandle::start(config).await.unwrap();
4977 let stats = session.session_stats().await.unwrap();
4978 assert_eq!(stats.active_torrents, 0);
4979 session.shutdown().await.unwrap();
4980 }
4981
/// `Settings::default()` must enable both UPnP and NAT-PMP.
#[test]
fn settings_nat_defaults() {
    let Settings {
        enable_upnp,
        enable_natpmp,
        ..
    } = Settings::default();
    assert!(enable_upnp, "enable_upnp should default to true");
    assert!(enable_natpmp, "enable_natpmp should default to true");
}
4988
4989 #[tokio::test]
4990 async fn session_with_nat_disabled() {
4991 let config = test_settings();
4992 assert!(!config.enable_upnp);
4994 assert!(!config.enable_natpmp);
4995 let session = SessionHandle::start(config).await.unwrap();
4996 let stats = session.session_stats().await.unwrap();
4997 assert_eq!(stats.active_torrents, 0);
4998 session.shutdown().await.unwrap();
4999 }
5000
/// Checks the flag-clearing rule for anonymous mode on a plain `Settings`
/// value.
///
/// Note that the clearing is performed inline by this test itself; no session
/// code is invoked here.
#[test]
fn anonymous_mode_disables_discovery() {
    let mut settings = Settings {
        anonymous_mode: true,
        enable_dht: true,
        enable_lsd: true,
        enable_upnp: true,
        enable_natpmp: true,
        ..test_settings()
    };

    // Locally applied rule: anonymous mode turns every discovery feature off.
    if settings.anonymous_mode {
        settings.enable_dht = false;
        settings.enable_lsd = false;
        settings.enable_upnp = false;
        settings.enable_natpmp = false;
    }

    assert!(!settings.enable_dht);
    assert!(!settings.enable_lsd);
    assert!(!settings.enable_upnp);
    assert!(!settings.enable_natpmp);
}
5026
5027 #[tokio::test]
5028 async fn anonymous_mode_session_starts_with_discovery_disabled() {
5029 let mut config = test_settings();
5030 config.anonymous_mode = true;
5031 config.enable_dht = true;
5033 config.enable_lsd = true;
5034
5035 let session = SessionHandle::start(config).await.unwrap();
5036 let stats = session.session_stats().await.unwrap();
5037 assert_eq!(stats.active_torrents, 0);
5038 session.shutdown().await.unwrap();
5039 }
5040
/// Builds settings with `force_proxy` on and a default proxy config, then
/// checks that the default proxy type is `ProxyType::None`.
#[test]
fn force_proxy_requires_proxy_configured() {
    let settings = Settings {
        force_proxy: true,
        proxy: crate::proxy::ProxyConfig::default(),
        ..test_settings()
    };
    assert_eq!(settings.proxy.proxy_type, crate::proxy::ProxyType::None);
    assert!(settings.force_proxy);
}
5052
5053 #[tokio::test]
5054 async fn force_proxy_errors_without_proxy() {
5055 let mut config = test_settings();
5056 config.force_proxy = true;
5057 let result = SessionHandle::start(config).await;
5060 assert!(result.is_err());
5061 match result {
5062 Err(e) => assert!(
5063 e.to_string().contains("force_proxy"),
5064 "error should mention force_proxy: {e}"
5065 ),
5066 Ok(_) => panic!("expected error"),
5067 }
5068 }
5069
/// Checks the flag-clearing rule for `force_proxy` on a plain `Settings`
/// value configured with a SOCKS5 proxy.
///
/// Note that the clearing is performed inline by this test itself; no session
/// code is invoked here.
#[test]
fn force_proxy_disables_features() {
    let socks = crate::proxy::ProxyConfig {
        proxy_type: crate::proxy::ProxyType::Socks5,
        hostname: "proxy.example.com".into(),
        port: 1080,
        ..Default::default()
    };
    let mut settings = test_settings();
    settings.force_proxy = true;
    settings.proxy = socks;
    settings.enable_dht = true;
    settings.enable_lsd = true;
    settings.enable_upnp = true;
    settings.enable_natpmp = true;

    // Locally applied rule: forcing the proxy turns NAT mapping and discovery off.
    if settings.force_proxy {
        settings.enable_upnp = false;
        settings.enable_natpmp = false;
        settings.enable_dht = false;
        settings.enable_lsd = false;
    }

    assert!(!settings.enable_dht);
    assert!(!settings.enable_lsd);
    assert!(!settings.enable_upnp);
    assert!(!settings.enable_natpmp);
}
5098
/// Builds `Settings` with an authenticated SOCKS5 proxy and checks that the
/// fields read back as written and that `to_url()` renders the expected URL.
#[test]
fn proxy_config_round_trip() {
    let proxy = crate::proxy::ProxyConfig {
        proxy_type: crate::proxy::ProxyType::Socks5Password,
        hostname: "localhost".into(),
        port: 9050,
        username: Some("user".into()),
        password: Some("pass".into()),
        ..Default::default()
    };
    let settings = Settings {
        proxy,
        force_proxy: true,
        anonymous_mode: true,
        ..test_settings()
    };

    let configured = &settings.proxy;
    assert_eq!(
        configured.proxy_type,
        crate::proxy::ProxyType::Socks5Password
    );
    assert_eq!(configured.hostname, "localhost");
    assert_eq!(configured.port, 9050);
    assert!(settings.force_proxy);
    assert!(settings.anonymous_mode);
    assert_eq!(configured.to_url(), "socks5://user:pass@localhost:9050");
}
5122
5123 #[tokio::test]
5124 async fn apply_settings_runtime() {
5125 let session = SessionHandle::start(test_settings()).await.unwrap();
5126 let original = session.settings().await.unwrap();
5127 assert_eq!(original.max_torrents, 10);
5128
5129 let mut new = original.clone();
5130 new.max_torrents = 200;
5131 new.upload_rate_limit = 1_000_000;
5132 session.apply_settings(new).await.unwrap();
5133
5134 let updated = session.settings().await.unwrap();
5135 assert_eq!(updated.max_torrents, 200);
5136 assert_eq!(updated.upload_rate_limit, 1_000_000);
5137
5138 session.shutdown().await.unwrap();
5139 }
5140
5141 #[tokio::test]
5142 async fn apply_settings_validation_error() {
5143 let session = SessionHandle::start(test_settings()).await.unwrap();
5144
5145 let mut bad = Settings::default();
5147 bad.force_proxy = true;
5148 let result = session.apply_settings(bad).await;
5149 assert!(result.is_err());
5150
5151 let current = session.settings().await.unwrap();
5153 assert!(!current.force_proxy);
5154
5155 session.shutdown().await.unwrap();
5156 }
5157
5158 #[tokio::test]
5161 async fn session_stats_counters_accessible() {
5162 let session = SessionHandle::start(test_settings()).await.unwrap();
5163 let counters = session.counters();
5164 assert!(counters.uptime_secs() >= 0);
5166 assert_eq!(counters.len(), crate::stats::NUM_METRICS);
5167 session.shutdown().await.unwrap();
5168 }
5169
5170 #[tokio::test]
5171 async fn post_session_stats_fires_alert() {
5172 use crate::alert::{AlertCategory, AlertKind};
5173
5174 let session = SessionHandle::start(test_settings()).await.unwrap();
5175 let mut stats_sub = session.subscribe_filtered(AlertCategory::STATS);
5176
5177 session.post_session_stats().await.unwrap();
5178
5179 let alert = tokio::time::timeout(Duration::from_secs(2), stats_sub.recv())
5180 .await
5181 .expect("timed out waiting for SessionStatsAlert")
5182 .expect("recv error");
5183 assert!(
5184 matches!(alert.kind, AlertKind::SessionStatsAlert { ref values } if values.len() == crate::stats::NUM_METRICS),
5185 "expected SessionStatsAlert with {} values, got {:?}",
5186 crate::stats::NUM_METRICS,
5187 alert.kind,
5188 );
5189 session.shutdown().await.unwrap();
5190 }
5191
5192 #[tokio::test]
5193 async fn session_stats_include_torrent_count() {
5194 use crate::alert::{AlertCategory, AlertKind};
5195
5196 let session = SessionHandle::start(test_settings()).await.unwrap();
5197 let mut stats_sub = session.subscribe_filtered(AlertCategory::STATS);
5198
5199 let data = vec![0xAB; 16384];
5201 let meta = make_test_torrent(&data, 16384);
5202 let storage = make_storage(&data, 16384);
5203 session
5204 .add_torrent(meta.into(), Some(storage))
5205 .await
5206 .unwrap();
5207
5208 session.post_session_stats().await.unwrap();
5209
5210 let alert = tokio::time::timeout(Duration::from_secs(2), stats_sub.recv())
5211 .await
5212 .expect("timed out waiting for SessionStatsAlert")
5213 .expect("recv error");
5214 match alert.kind {
5215 AlertKind::SessionStatsAlert { values } => {
5216 assert!(
5217 values[crate::stats::SES_NUM_TORRENTS] > 0,
5218 "SES_NUM_TORRENTS should be > 0 after adding a torrent, got {}",
5219 values[crate::stats::SES_NUM_TORRENTS],
5220 );
5221 }
5222 other => panic!("expected SessionStatsAlert, got {other:?}"),
5223 }
5224 session.shutdown().await.unwrap();
5225 }
5226
5227 #[tokio::test]
5228 async fn stats_timer_disabled_when_zero() {
5229 use crate::alert::{AlertCategory, AlertKind};
5230
5231 let mut config = test_settings();
5232 config.stats_report_interval = 0;
5233 let session = SessionHandle::start(config).await.unwrap();
5234 let mut stats_sub = session.subscribe_filtered(AlertCategory::STATS);
5235
5236 let result = tokio::time::timeout(Duration::from_millis(200), stats_sub.recv()).await;
5238 assert!(
5239 result.is_err(),
5240 "no SessionStatsAlert should fire when stats_report_interval is 0"
5241 );
5242 session.shutdown().await.unwrap();
5243 }
5244
5245 #[tokio::test]
5246 async fn sample_infohashes_timer_disabled_when_zero() {
5247 use crate::alert::{AlertCategory, AlertKind};
5248
5249 let mut config = test_settings();
5250 config.dht_sample_infohashes_interval = 0;
5251 let session = SessionHandle::start(config).await.unwrap();
5252 let mut dht_sub = session.subscribe_filtered(AlertCategory::DHT);
5253
5254 let result = tokio::time::timeout(Duration::from_millis(200), dht_sub.recv()).await;
5256 assert!(
5257 result.is_err(),
5258 "no DhtSampleInfohashes alert should fire when interval is 0"
5259 );
5260 session.shutdown().await.unwrap();
5261 }
5262
5263 #[tokio::test]
5266 async fn open_file_not_found() {
5267 let session = SessionHandle::start(test_settings()).await.unwrap();
5268 let fake_hash = Id20::from_hex("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
5269 let result = session.open_file(fake_hash, 0).await;
5270 assert!(result.is_err());
5271 let err = result.err().unwrap();
5272 assert!(err.to_string().contains("not found"));
5273 session.shutdown().await.unwrap();
5274 }
5275
5276 #[tokio::test]
5279 async fn open_file_routes_to_torrent() {
5280 let session = SessionHandle::start(test_settings()).await.unwrap();
5281 let data = vec![0xAB; 32768];
5282 let meta = make_test_torrent(&data, 16384);
5283 let storage = make_storage(&data, 16384);
5284
5285 let info_hash = session
5286 .add_torrent(meta.into(), Some(storage))
5287 .await
5288 .unwrap();
5289
5290 let stream = session.open_file(info_hash, 0).await;
5292 assert!(stream.is_ok(), "open_file should succeed for file_index 0");
5293
5294 let result = session.open_file(info_hash, 999).await;
5296 assert!(
5297 result.is_err(),
5298 "open_file should fail for invalid file_index"
5299 );
5300
5301 session.shutdown().await.unwrap();
5302 }
5303
5304 #[tokio::test]
5307 async fn session_force_reannounce() {
5308 let session = SessionHandle::start(test_settings()).await.unwrap();
5309 let data = vec![0xAB; 16384];
5310 let meta = make_test_torrent(&data, 16384);
5311 let storage = make_storage(&data, 16384);
5312 let info_hash = session
5313 .add_torrent(meta.into(), Some(storage))
5314 .await
5315 .unwrap();
5316
5317 let result = session.force_reannounce(info_hash).await;
5319 assert!(
5320 result.is_ok(),
5321 "force_reannounce should succeed: {result:?}"
5322 );
5323
5324 let fake = Id20::from_hex("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
5326 assert!(session.force_reannounce(fake).await.is_err());
5327
5328 session.shutdown().await.unwrap();
5329 }
5330
5331 #[tokio::test]
5334 async fn session_tracker_list() {
5335 let session = SessionHandle::start(test_settings()).await.unwrap();
5336 let data = vec![0xAB; 16384];
5337 let meta = make_test_torrent(&data, 16384);
5338 let storage = make_storage(&data, 16384);
5339 let info_hash = session
5340 .add_torrent(meta.into(), Some(storage))
5341 .await
5342 .unwrap();
5343
5344 let trackers = session.tracker_list(info_hash).await.unwrap();
5346 assert!(trackers.is_empty(), "test torrent has no trackers");
5347
5348 let fake = Id20::from_hex("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
5350 assert!(session.tracker_list(fake).await.is_err());
5351
5352 session.shutdown().await.unwrap();
5353 }
5354
5355 #[tokio::test]
5358 async fn session_scrape() {
5359 let session = SessionHandle::start(test_settings()).await.unwrap();
5360 let data = vec![0xAB; 16384];
5361 let meta = make_test_torrent(&data, 16384);
5362 let storage = make_storage(&data, 16384);
5363 let info_hash = session
5364 .add_torrent(meta.into(), Some(storage))
5365 .await
5366 .unwrap();
5367
5368 let scrape = session.scrape(info_hash).await.unwrap();
5370 assert!(scrape.is_none(), "test torrent has no trackers to scrape");
5371
5372 let fake = Id20::from_hex("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
5374 assert!(session.scrape(fake).await.is_err());
5375
5376 session.shutdown().await.unwrap();
5377 }
5378
5379 #[tokio::test]
5382 async fn session_set_file_priority() {
5383 let session = SessionHandle::start(test_settings()).await.unwrap();
5384 let data = vec![0xAB; 16384];
5385 let meta = make_test_torrent(&data, 16384);
5386 let storage = make_storage(&data, 16384);
5387 let info_hash = session
5388 .add_torrent(meta.into(), Some(storage))
5389 .await
5390 .unwrap();
5391
5392 let result = session
5394 .set_file_priority(info_hash, 0, irontide_core::FilePriority::Normal)
5395 .await;
5396 assert!(
5397 result.is_ok(),
5398 "set_file_priority should succeed: {result:?}"
5399 );
5400
5401 let fake = Id20::from_hex("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
5403 assert!(
5404 session
5405 .set_file_priority(fake, 0, irontide_core::FilePriority::Normal)
5406 .await
5407 .is_err()
5408 );
5409
5410 session.shutdown().await.unwrap();
5411 }
5412
5413 #[tokio::test]
5416 async fn session_file_priorities() {
5417 let session = SessionHandle::start(test_settings()).await.unwrap();
5418 let data = vec![0xAB; 16384];
5419 let meta = make_test_torrent(&data, 16384);
5420 let storage = make_storage(&data, 16384);
5421 let info_hash = session
5422 .add_torrent(meta.into(), Some(storage))
5423 .await
5424 .unwrap();
5425
5426 let priorities = session.file_priorities(info_hash).await.unwrap();
5428 assert_eq!(
5429 priorities.len(),
5430 1,
5431 "single-file torrent should have 1 file priority"
5432 );
5433 assert_eq!(priorities[0], irontide_core::FilePriority::Normal);
5434
5435 let fake = Id20::from_hex("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
5437 assert!(session.file_priorities(fake).await.is_err());
5438
5439 session.shutdown().await.unwrap();
5440 }
5441
5442 #[tokio::test]
5445 async fn set_download_limit_zero_means_unlimited() {
5446 let session = SessionHandle::start(test_settings()).await.unwrap();
5447 let data = vec![0xAB; 16384];
5448 let meta = make_test_torrent(&data, 16384);
5449 let storage = make_storage(&data, 16384);
5450 let info_hash = session
5451 .add_torrent(meta.into(), Some(storage))
5452 .await
5453 .unwrap();
5454
5455 session.set_download_limit(info_hash, 50_000).await.unwrap();
5457 session.set_download_limit(info_hash, 0).await.unwrap();
5458 let limit = session.download_limit(info_hash).await.unwrap();
5459 assert_eq!(limit, 0, "0 means unlimited");
5460
5461 session.shutdown().await.unwrap();
5462 }
5463
5464 #[tokio::test]
5467 async fn set_upload_limit_persists() {
5468 let session = SessionHandle::start(test_settings()).await.unwrap();
5469 let data = vec![0xAB; 16384];
5470 let meta = make_test_torrent(&data, 16384);
5471 let storage = make_storage(&data, 16384);
5472 let info_hash = session
5473 .add_torrent(meta.into(), Some(storage))
5474 .await
5475 .unwrap();
5476
5477 session.set_upload_limit(info_hash, 100_000).await.unwrap();
5478 let limit = session.upload_limit(info_hash).await.unwrap();
5479 assert_eq!(limit, 100_000);
5480
5481 session.shutdown().await.unwrap();
5482 }
5483
5484 #[tokio::test]
5487 async fn download_limit_default_is_zero() {
5488 let session = SessionHandle::start(test_settings()).await.unwrap();
5489 let data = vec![0xAB; 16384];
5490 let meta = make_test_torrent(&data, 16384);
5491 let storage = make_storage(&data, 16384);
5492 let info_hash = session
5493 .add_torrent(meta.into(), Some(storage))
5494 .await
5495 .unwrap();
5496
5497 let limit = session.download_limit(info_hash).await.unwrap();
5499 assert_eq!(limit, 0, "default download limit should be 0 (unlimited)");
5500
5501 session.shutdown().await.unwrap();
5502 }
5503
5504 #[tokio::test]
5507 async fn rate_limit_round_trip() {
5508 let session = SessionHandle::start(test_settings()).await.unwrap();
5509 let data = vec![0xAB; 16384];
5510 let meta = make_test_torrent(&data, 16384);
5511 let storage = make_storage(&data, 16384);
5512 let info_hash = session
5513 .add_torrent(meta.into(), Some(storage))
5514 .await
5515 .unwrap();
5516
5517 session
5519 .set_download_limit(info_hash, 1_000_000)
5520 .await
5521 .unwrap();
5522 session.set_upload_limit(info_hash, 500_000).await.unwrap();
5523
5524 let dl = session.download_limit(info_hash).await.unwrap();
5526 let ul = session.upload_limit(info_hash).await.unwrap();
5527 assert_eq!(dl, 1_000_000);
5528 assert_eq!(ul, 500_000);
5529
5530 session
5532 .set_download_limit(info_hash, 2_000_000)
5533 .await
5534 .unwrap();
5535 let dl = session.download_limit(info_hash).await.unwrap();
5536 assert_eq!(dl, 2_000_000);
5537
5538 let fake = Id20::from_hex("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
5540 assert!(session.download_limit(fake).await.is_err());
5541 assert!(session.upload_limit(fake).await.is_err());
5542 assert!(session.set_download_limit(fake, 100).await.is_err());
5543 assert!(session.set_upload_limit(fake, 100).await.is_err());
5544
5545 session.shutdown().await.unwrap();
5546 }
5547
5548 #[tokio::test]
5551 async fn sequential_download_toggle() {
5552 let session = SessionHandle::start(test_settings()).await.unwrap();
5553 let data = vec![0xAB; 16384];
5554 let meta = make_test_torrent(&data, 16384);
5555 let storage = make_storage(&data, 16384);
5556 let info_hash = session
5557 .add_torrent(meta.into(), Some(storage))
5558 .await
5559 .unwrap();
5560
5561 session
5563 .set_sequential_download(info_hash, true)
5564 .await
5565 .unwrap();
5566 assert!(session.is_sequential_download(info_hash).await.unwrap());
5567
5568 session
5570 .set_sequential_download(info_hash, false)
5571 .await
5572 .unwrap();
5573 assert!(!session.is_sequential_download(info_hash).await.unwrap());
5574
5575 session.shutdown().await.unwrap();
5576 }
5577
5578 #[tokio::test]
5581 async fn super_seeding_toggle() {
5582 let session = SessionHandle::start(test_settings()).await.unwrap();
5583 let data = vec![0xAB; 16384];
5584 let meta = make_test_torrent(&data, 16384);
5585 let storage = make_storage(&data, 16384);
5586 let info_hash = session
5587 .add_torrent(meta.into(), Some(storage))
5588 .await
5589 .unwrap();
5590
5591 session.set_super_seeding(info_hash, true).await.unwrap();
5593 assert!(session.is_super_seeding(info_hash).await.unwrap());
5594
5595 session.set_super_seeding(info_hash, false).await.unwrap();
5597 assert!(!session.is_super_seeding(info_hash).await.unwrap());
5598
5599 session.shutdown().await.unwrap();
5600 }
5601
5602 #[tokio::test]
5605 async fn sequential_download_default_false() {
5606 let session = SessionHandle::start(test_settings()).await.unwrap();
5607 let data = vec![0xAB; 16384];
5608 let meta = make_test_torrent(&data, 16384);
5609 let storage = make_storage(&data, 16384);
5610 let info_hash = session
5611 .add_torrent(meta.into(), Some(storage))
5612 .await
5613 .unwrap();
5614
5615 assert!(!session.is_sequential_download(info_hash).await.unwrap());
5617
5618 session.shutdown().await.unwrap();
5619 }
5620
5621 #[tokio::test]
5624 async fn super_seeding_default_false() {
5625 let session = SessionHandle::start(test_settings()).await.unwrap();
5626 let data = vec![0xAB; 16384];
5627 let meta = make_test_torrent(&data, 16384);
5628 let storage = make_storage(&data, 16384);
5629 let info_hash = session
5630 .add_torrent(meta.into(), Some(storage))
5631 .await
5632 .unwrap();
5633
5634 assert!(!session.is_super_seeding(info_hash).await.unwrap());
5636
5637 session.shutdown().await.unwrap();
5638 }
5639
5640 #[tokio::test]
5643 async fn seed_mode_flips_user_flag() {
5644 let session = SessionHandle::start(test_settings()).await.unwrap();
5645 let data = vec![0xAB; 16384];
5646 let meta = make_test_torrent(&data, 16384);
5647 let storage = make_storage(&data, 16384);
5648 let info_hash = session
5649 .add_torrent(meta.into(), Some(storage))
5650 .await
5651 .unwrap();
5652
5653 let stats_before = session.torrent_stats(info_hash).await.unwrap();
5655 assert!(
5656 !stats_before.user_seed_mode,
5657 "new torrent should not start in user seed mode"
5658 );
5659
5660 session.set_seed_mode(info_hash, true).await.unwrap();
5662 let stats_on = session.torrent_stats(info_hash).await.unwrap();
5663 assert!(
5664 stats_on.user_seed_mode,
5665 "stats should reflect user_seed_mode=true after enabling"
5666 );
5667
5668 session.set_seed_mode(info_hash, false).await.unwrap();
5670 let stats_off = session.torrent_stats(info_hash).await.unwrap();
5671 assert!(
5672 !stats_off.user_seed_mode,
5673 "stats should reflect user_seed_mode=false after disabling"
5674 );
5675
5676 session.shutdown().await.unwrap();
5677 }
5678
5679 #[tokio::test]
5682 async fn seed_mode_round_trip() {
5683 let session = SessionHandle::start(test_settings()).await.unwrap();
5687 let data = vec![0xAB; 16384];
5688 let meta = make_test_torrent(&data, 16384);
5689 let storage = make_storage(&data, 16384);
5690 let info_hash = session
5691 .add_torrent(meta.into(), Some(storage))
5692 .await
5693 .unwrap();
5694
5695 for (i, enabled) in [true, false, true, true, false].iter().enumerate() {
5696 session.set_seed_mode(info_hash, *enabled).await.unwrap();
5697 let stats = session.torrent_stats(info_hash).await.unwrap();
5698 assert_eq!(
5699 stats.user_seed_mode, *enabled,
5700 "iteration {i}: stats.user_seed_mode should track the toggle"
5701 );
5702 }
5703
5704 session.shutdown().await.unwrap();
5705 }
5706
5707 #[tokio::test]
5710 async fn seed_mode_missing_info_hash_errors() {
5711 let session = SessionHandle::start(test_settings()).await.unwrap();
5712 let fake =
5713 irontide_core::Id20::from_hex("ffffffffffffffffffffffffffffffffffffffff").unwrap();
5714 let err = session
5715 .set_seed_mode(fake, true)
5716 .await
5717 .expect_err("set_seed_mode on unknown info hash must return an error");
5718 match err {
5719 crate::Error::TorrentNotFound(h) => assert_eq!(h, fake),
5720 other => panic!("expected TorrentNotFound, got {other:?}"),
5721 }
5722 session.shutdown().await.unwrap();
5723 }
5724
5725 #[tokio::test]
5728 async fn seed_mode_idempotent() {
5729 let session = SessionHandle::start(test_settings()).await.unwrap();
5731 let data = vec![0xAB; 16384];
5732 let meta = make_test_torrent(&data, 16384);
5733 let storage = make_storage(&data, 16384);
5734 let info_hash = session
5735 .add_torrent(meta.into(), Some(storage))
5736 .await
5737 .unwrap();
5738
5739 session.set_seed_mode(info_hash, true).await.unwrap();
5741 session.set_seed_mode(info_hash, true).await.unwrap();
5742 assert!(
5743 session
5744 .torrent_stats(info_hash)
5745 .await
5746 .unwrap()
5747 .user_seed_mode
5748 );
5749
5750 session.set_seed_mode(info_hash, false).await.unwrap();
5752 session.set_seed_mode(info_hash, false).await.unwrap();
5753 assert!(
5754 !session
5755 .torrent_stats(info_hash)
5756 .await
5757 .unwrap()
5758 .user_seed_mode
5759 );
5760
5761 session.shutdown().await.unwrap();
5762 }
5763
5764 #[tokio::test]
5767 async fn add_tracker_increases_count() {
5768 let session = SessionHandle::start(test_settings()).await.unwrap();
5769 let data = vec![0xAB; 16384];
5770 let meta = make_test_torrent(&data, 16384);
5771 let storage = make_storage(&data, 16384);
5772 let info_hash = session
5773 .add_torrent(meta.into(), Some(storage))
5774 .await
5775 .unwrap();
5776
5777 let before = session.tracker_list(info_hash).await.unwrap();
5779 assert!(before.is_empty());
5780
5781 session
5783 .add_tracker(info_hash, "udp://tracker.example.com:6969/announce".into())
5784 .await
5785 .unwrap();
5786
5787 let after = session.tracker_list(info_hash).await.unwrap();
5788 assert_eq!(after.len(), 1);
5789 assert_eq!(after[0].url, "udp://tracker.example.com:6969/announce");
5790
5791 session.shutdown().await.unwrap();
5792 }
5793
5794 #[tokio::test]
5797 async fn replace_trackers_replaces_all() {
5798 let session = SessionHandle::start(test_settings()).await.unwrap();
5799 let data = vec![0xAB; 16384];
5800 let meta = make_test_torrent(&data, 16384);
5801 let storage = make_storage(&data, 16384);
5802 let info_hash = session
5803 .add_torrent(meta.into(), Some(storage))
5804 .await
5805 .unwrap();
5806
5807 session
5809 .add_tracker(info_hash, "udp://tracker1.example.com:6969/announce".into())
5810 .await
5811 .unwrap();
5812 session
5813 .add_tracker(info_hash, "http://tracker2.example.com/announce".into())
5814 .await
5815 .unwrap();
5816 assert_eq!(session.tracker_list(info_hash).await.unwrap().len(), 2);
5817
5818 session
5820 .replace_trackers(
5821 info_hash,
5822 vec!["http://replacement.example.com/announce".into()],
5823 )
5824 .await
5825 .unwrap();
5826
5827 let after = session.tracker_list(info_hash).await.unwrap();
5828 assert_eq!(after.len(), 1);
5829 assert_eq!(after[0].url, "http://replacement.example.com/announce");
5830
5831 session.shutdown().await.unwrap();
5832 }
5833
5834 #[tokio::test]
5837 async fn add_tracker_deduplicates() {
5838 let session = SessionHandle::start(test_settings()).await.unwrap();
5839 let data = vec![0xAB; 16384];
5840 let meta = make_test_torrent(&data, 16384);
5841 let storage = make_storage(&data, 16384);
5842 let info_hash = session
5843 .add_torrent(meta.into(), Some(storage))
5844 .await
5845 .unwrap();
5846
5847 session
5849 .add_tracker(info_hash, "udp://tracker.example.com:6969/announce".into())
5850 .await
5851 .unwrap();
5852 session
5853 .add_tracker(info_hash, "udp://tracker.example.com:6969/announce".into())
5854 .await
5855 .unwrap();
5856
5857 let trackers = session.tracker_list(info_hash).await.unwrap();
5859 assert_eq!(trackers.len(), 1);
5860
5861 session.shutdown().await.unwrap();
5862 }
5863
5864 #[tokio::test]
5867 async fn info_hashes_matches_added_torrent() {
5868 let session = SessionHandle::start(test_settings()).await.unwrap();
5869 let data = vec![0xAB; 16384];
5870 let meta = make_test_torrent(&data, 16384);
5871 let expected_v1 = meta.info_hash;
5872 let storage = make_storage(&data, 16384);
5873
5874 let info_hash = session
5875 .add_torrent(meta.into(), Some(storage))
5876 .await
5877 .unwrap();
5878 let hashes = session.info_hashes(info_hash).await.unwrap();
5879 assert_eq!(hashes.v1, Some(expected_v1));
5880 assert!(hashes.v2.is_none());
5882
5883 session.shutdown().await.unwrap();
5884 }
5885
5886 #[tokio::test]
5889 async fn torrent_file_returns_meta() {
5890 let session = SessionHandle::start(test_settings()).await.unwrap();
5891 let data = vec![0xAB; 32768];
5892 let meta = make_test_torrent(&data, 16384);
5893 let storage = make_storage(&data, 16384);
5894
5895 let info_hash = session
5896 .add_torrent(meta.into(), Some(storage))
5897 .await
5898 .unwrap();
5899 let torrent = session.torrent_file(info_hash).await.unwrap();
5900 assert!(torrent.is_some());
5901 let torrent = torrent.unwrap();
5902 assert_eq!(torrent.info_hash, info_hash);
5903 assert_eq!(torrent.info.name, "test");
5904 assert_eq!(torrent.info.total_length(), 32768);
5905
5906 session.shutdown().await.unwrap();
5907 }
5908
5909 #[tokio::test]
5912 async fn torrent_file_none_before_metadata() {
5913 let session = SessionHandle::start(test_settings()).await.unwrap();
5914 let magnet = irontide_core::Magnet::parse(
5915 "magnet:?xt=urn:btih:aaf4c61ddcc5e8a2dabede0f3b482cd9aea9434d&dn=test",
5916 )
5917 .unwrap();
5918
5919 let info_hash = session.add_magnet(magnet).await.unwrap();
5920 let torrent = session.torrent_file(info_hash).await.unwrap();
5921 assert!(torrent.is_none());
5923
5924 session.shutdown().await.unwrap();
5925 }
5926
5927 #[tokio::test]
5930 async fn force_dht_announce_no_error() {
5931 let session = SessionHandle::start(test_settings()).await.unwrap();
5932 let data = vec![0xAB; 16384];
5933 let meta = make_test_torrent(&data, 16384);
5934 let storage = make_storage(&data, 16384);
5935 let info_hash = session
5936 .add_torrent(meta.into(), Some(storage))
5937 .await
5938 .unwrap();
5939
5940 let result = session.force_dht_announce(info_hash).await;
5942 assert!(
5943 result.is_ok(),
5944 "force_dht_announce should succeed: {result:?}"
5945 );
5946
5947 let fake = Id20::from_hex("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
5949 assert!(session.force_dht_announce(fake).await.is_err());
5950
5951 session.shutdown().await.unwrap();
5952 }
5953
5954 #[tokio::test]
5957 async fn force_lsd_announce_no_error() {
5958 let session = SessionHandle::start(test_settings()).await.unwrap();
5959 let data = vec![0xAB; 16384];
5960 let meta = make_test_torrent(&data, 16384);
5961 let storage = make_storage(&data, 16384);
5962 let info_hash = session
5963 .add_torrent(meta.into(), Some(storage))
5964 .await
5965 .unwrap();
5966
5967 let result = session.force_lsd_announce(info_hash).await;
5969 assert!(
5970 result.is_ok(),
5971 "force_lsd_announce should succeed: {result:?}"
5972 );
5973
5974 let fake = Id20::from_hex("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
5976 assert!(session.force_lsd_announce(fake).await.is_err());
5977
5978 session.shutdown().await.unwrap();
5979 }
5980
5981 #[tokio::test]
5984 async fn read_piece_after_download() {
5985 let data = vec![0xCD; 32768]; let meta = make_test_torrent(&data, 16384);
5987 let lengths = Lengths::new(data.len() as u64, 16384, DEFAULT_CHUNK_SIZE);
5988 let storage = Arc::new(MemoryStorage::new(lengths));
5989 storage.write_chunk(0, 0, &data[..16384]).unwrap();
5991 storage.write_chunk(1, 0, &data[16384..]).unwrap();
5992
5993 let session = SessionHandle::start(test_settings()).await.unwrap();
5994 let info_hash = session
5995 .add_torrent(meta.into(), Some(storage))
5996 .await
5997 .unwrap();
5998
5999 let piece_data = session.read_piece(info_hash, 0).await.unwrap();
6001 assert_eq!(piece_data.len(), 16384);
6002 assert!(piece_data.iter().all(|&b| b == 0xCD));
6003
6004 let piece_data = session.read_piece(info_hash, 1).await.unwrap();
6006 assert_eq!(piece_data.len(), 16384);
6007 assert!(piece_data.iter().all(|&b| b == 0xCD));
6008
6009 let result = session.read_piece(info_hash, 999).await;
6011 assert!(result.is_err(), "read_piece out of range should fail");
6012
6013 let fake = Id20::from_hex("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
6015 assert!(session.read_piece(fake, 0).await.is_err());
6016
6017 session.shutdown().await.unwrap();
6018 }
6019
6020 #[tokio::test]
6023 async fn flush_cache_completes() {
6024 let session = SessionHandle::start(test_settings()).await.unwrap();
6025 let data = vec![0xAB; 16384];
6026 let meta = make_test_torrent(&data, 16384);
6027 let storage = make_storage(&data, 16384);
6028 let info_hash = session
6029 .add_torrent(meta.into(), Some(storage))
6030 .await
6031 .unwrap();
6032
6033 let result = session.flush_cache(info_hash).await;
6035 assert!(result.is_ok(), "flush_cache should succeed: {result:?}");
6036
6037 let fake = Id20::from_hex("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
6039 assert!(session.flush_cache(fake).await.is_err());
6040
6041 session.shutdown().await.unwrap();
6042 }
6043
6044 fn test_settings_with_dht() -> Settings {
6047 let mut s = test_settings();
6048 s.enable_dht = true;
6049 s
6050 }
6051
6052 #[tokio::test]
6053 async fn test_dht_disabled_returns_error() {
6054 let session = SessionHandle::start(test_settings()).await.unwrap();
6055
6056 let err = session
6058 .dht_put_immutable(b"test".to_vec())
6059 .await
6060 .unwrap_err();
6061 assert!(
6062 format!("{err:?}").contains("DhtDisabled"),
6063 "expected DhtDisabled, got {err:?}"
6064 );
6065
6066 let target = Id20::from([0u8; 20]);
6067 let err = session.dht_get_immutable(target).await.unwrap_err();
6068 assert!(
6069 format!("{err:?}").contains("DhtDisabled"),
6070 "expected DhtDisabled, got {err:?}"
6071 );
6072
6073 let err = session
6074 .dht_put_mutable([42u8; 32], b"val".to_vec(), 1, Vec::new())
6075 .await
6076 .unwrap_err();
6077 assert!(
6078 format!("{err:?}").contains("DhtDisabled"),
6079 "expected DhtDisabled, got {err:?}"
6080 );
6081
6082 let err = session
6083 .dht_get_mutable([42u8; 32], Vec::new())
6084 .await
6085 .unwrap_err();
6086 assert!(
6087 format!("{err:?}").contains("DhtDisabled"),
6088 "expected DhtDisabled, got {err:?}"
6089 );
6090
6091 session.shutdown().await.unwrap();
6092 }
6093
6094 #[tokio::test]
6095 async fn test_dht_put_get_immutable_round_trip() {
6096 let session = SessionHandle::start(test_settings_with_dht())
6097 .await
6098 .unwrap();
6099
6100 let value = b"hello BEP 44".to_vec();
6102 let target = session.dht_put_immutable(value.clone()).await.unwrap();
6103
6104 let got = session.dht_get_immutable(target).await.unwrap();
6107 assert_eq!(got, Some(value));
6108
6109 session.shutdown().await.unwrap();
6110 }
6111
6112 #[tokio::test]
6113 async fn test_dht_put_immutable_fires_alert() {
6114 use crate::alert::{AlertCategory, AlertKind};
6115
6116 let session = SessionHandle::start(test_settings_with_dht())
6117 .await
6118 .unwrap();
6119 let mut alerts = session.subscribe_filtered(AlertCategory::DHT);
6120
6121 let value = b"alert test".to_vec();
6122 let target = session.dht_put_immutable(value).await.unwrap();
6123
6124 let alert = tokio::time::timeout(Duration::from_secs(5), alerts.recv())
6126 .await
6127 .expect("timeout waiting for alert")
6128 .expect("alert channel closed");
6129
6130 match alert.kind {
6131 AlertKind::DhtPutComplete { target: t } => {
6132 assert_eq!(t, target);
6133 }
6134 other => panic!("expected DhtPutComplete, got {other:?}"),
6135 }
6136
6137 session.shutdown().await.unwrap();
6138 }
6139
6140 fn make_private_torrent(data: &[u8], piece_length: u64) -> TorrentMetaV1 {
6144 use serde::Serialize;
6145
6146 let mut pieces = Vec::new();
6147 let mut offset = 0;
6148 while offset < data.len() {
6149 let end = (offset + piece_length as usize).min(data.len());
6150 let hash = irontide_core::sha1(&data[offset..end]);
6151 pieces.extend_from_slice(hash.as_bytes());
6152 offset = end;
6153 }
6154
6155 #[derive(Serialize)]
6156 struct Info<'a> {
6157 length: u64,
6158 name: &'a str,
6159 #[serde(rename = "piece length")]
6160 piece_length: u64,
6161 #[serde(with = "serde_bytes")]
6162 pieces: &'a [u8],
6163 private: i64,
6164 }
6165
6166 #[derive(Serialize)]
6167 struct Torrent<'a> {
6168 info: Info<'a>,
6169 }
6170
6171 let t = Torrent {
6172 info: Info {
6173 length: data.len() as u64,
6174 name: "private-test",
6175 piece_length,
6176 pieces: &pieces,
6177 private: 1,
6178 },
6179 };
6180
6181 let bytes = irontide_bencode::to_bytes(&t).unwrap();
6182 torrent_from_bytes(&bytes).unwrap()
6183 }
6184
/// A torrent produced by `make_private_torrent` parses with
/// `info.private == Some(1)`.
#[test]
fn is_private_true_via_parsed_meta() {
    let payload = vec![0xAB; 16384];
    let parsed = make_private_torrent(&payload, 16384);

    let private_flag = parsed.info.private;
    assert_eq!(private_flag, Some(1), "private field should be Some(1)");
}
6196
/// A torrent produced by `make_test_torrent` parses with no `private` flag.
#[test]
fn is_private_false_for_public_torrent() {
    let payload = vec![0xAB; 16384];
    let parsed = make_test_torrent(&payload, 16384);

    let private_flag = parsed.info.private;
    assert_eq!(
        private_flag, None,
        "public torrent should have no private flag"
    );
}
6207
/// Checks that `TorrentConfig::default()` has LSD enabled.
///
/// NOTE(review): despite the test name, nothing here exercises a private
/// torrent; only the default configuration is inspected.
#[test]
fn private_torrent_config_disables_lsd() {
    let lsd_enabled_by_default = TorrentConfig::default().enable_lsd;
    assert!(
        lsd_enabled_by_default,
        "default TorrentConfig should have LSD enabled"
    );
}
6217
6218 #[tokio::test]
6219 async fn force_lsd_announce_private_torrent_returns_error() {
6220 let session = SessionHandle::start(test_settings()).await.unwrap();
6221 let data = vec![0xAB; 16384];
6222 let meta = make_private_torrent(&data, 16384);
6223 let storage = make_storage(&data, 16384);
6224 let info_hash = session
6225 .add_torrent(meta.into(), Some(storage))
6226 .await
6227 .unwrap();
6228
6229 let result = session.force_lsd_announce(info_hash).await;
6231 assert!(
6232 result.is_err(),
6233 "force_lsd_announce on private torrent should return error, got: {result:?}"
6234 );
6235 let err_str = format!("{:?}", result.unwrap_err());
6236 assert!(
6237 err_str.contains("InvalidSettings") || err_str.contains("LSD disabled"),
6238 "expected InvalidSettings error, got: {err_str}"
6239 );
6240
6241 session.shutdown().await.unwrap();
6242 }
6243
6244 fn resume_test_settings(dir: &std::path::Path) -> Settings {
6247 Settings {
6248 resume_data_dir: Some(dir.to_path_buf()),
6249 save_resume_interval_secs: 0, ..test_settings()
6251 }
6252 }
6253
6254 #[tokio::test]
6255 async fn save_resume_state_empty_session_returns_zero() {
6256 let tmp = tempfile::TempDir::new().unwrap();
6257 let session = SessionHandle::start(resume_test_settings(tmp.path()))
6258 .await
6259 .unwrap();
6260
6261 let count = session.save_resume_state().await.unwrap();
6262 assert_eq!(count, 0, "empty session should save 0 resume files");
6263
6264 session.shutdown().await.unwrap();
6265 }
6266
6267 #[tokio::test]
6268 async fn save_resume_state_saves_dirty_torrents() {
6269 let tmp = tempfile::TempDir::new().unwrap();
6270 let session = SessionHandle::start(resume_test_settings(tmp.path()))
6271 .await
6272 .unwrap();
6273
6274 let data1 = vec![0xAA; 16384];
6276 let meta1 = make_test_torrent(&data1, 16384);
6277 let hash1 = meta1.info_hash;
6278 let storage1 = make_storage(&data1, 16384);
6279 session
6280 .add_torrent(meta1.into(), Some(storage1))
6281 .await
6282 .unwrap();
6283
6284 let data2 = vec![0xBB; 16384];
6285 let meta2 = make_test_torrent(&data2, 16384);
6286 let hash2 = meta2.info_hash;
6287 let storage2 = make_storage(&data2, 16384);
6288 session
6289 .add_torrent(meta2.into(), Some(storage2))
6290 .await
6291 .unwrap();
6292
6293 tokio::time::sleep(Duration::from_millis(50)).await;
6296
6297 let count = session.save_resume_state().await.unwrap();
6298 assert!(count <= 2, "should save at most 2 resume files");
6302
6303 let torrents_dir = tmp.path().join("torrents");
6305 if count > 0 {
6306 assert!(torrents_dir.exists(), "torrents/ directory should exist");
6307 }
6308
6309 let path1 = crate::resume_file::resume_file_path(tmp.path(), &hash1);
6311 let path2 = crate::resume_file::resume_file_path(tmp.path(), &hash2);
6312 let files_exist = path1.exists() as usize + path2.exists() as usize;
6313 assert_eq!(
6314 files_exist, count,
6315 "number of files on disk should match returned count"
6316 );
6317
6318 session.shutdown().await.unwrap();
6319 }
6320
6321 #[tokio::test]
6322 async fn save_resume_state_round_trip() {
6323 let tmp = tempfile::TempDir::new().unwrap();
6324 let session = SessionHandle::start(resume_test_settings(tmp.path()))
6325 .await
6326 .unwrap();
6327
6328 let data = vec![0xCD; 32768];
6329 let meta = make_test_torrent(&data, 16384);
6330 let info_hash = meta.info_hash;
6331 let storage = make_storage(&data, 16384);
6332 session
6333 .add_torrent(meta.into(), Some(storage))
6334 .await
6335 .unwrap();
6336
6337 tokio::time::sleep(Duration::from_millis(50)).await;
6339
6340 let count = session.save_resume_state().await.unwrap();
6341
6342 if count > 0 {
6344 let path = crate::resume_file::resume_file_path(tmp.path(), &info_hash);
6345 assert!(path.exists(), "resume file should exist after save");
6346
6347 let bytes = std::fs::read(&path).unwrap();
6348 let rd = crate::resume_file::deserialize_resume(&bytes).unwrap();
6349 assert_eq!(
6350 rd.info_hash,
6351 info_hash.as_bytes().to_vec(),
6352 "deserialized info_hash should match"
6353 );
6354 assert_eq!(rd.name, "test", "deserialized name should match");
6355 }
6356
6357 session.shutdown().await.unwrap();
6358 }
6359
6360 #[tokio::test]
6361 async fn save_resume_state_clears_dirty_flag() {
6362 let tmp = tempfile::TempDir::new().unwrap();
6363 let session = SessionHandle::start(resume_test_settings(tmp.path()))
6364 .await
6365 .unwrap();
6366
6367 let data = vec![0xEE; 16384];
6368 let meta = make_test_torrent(&data, 16384);
6369 let storage = make_storage(&data, 16384);
6370 session
6371 .add_torrent(meta.into(), Some(storage))
6372 .await
6373 .unwrap();
6374
6375 tokio::time::sleep(Duration::from_millis(50)).await;
6376
6377 let first_count = session.save_resume_state().await.unwrap();
6378
6379 let second_count = session.save_resume_state().await.unwrap();
6381 assert_eq!(
6382 second_count, 0,
6383 "second save should return 0 after dirty flag cleared (first saved {first_count})"
6384 );
6385
6386 session.shutdown().await.unwrap();
6387 }
6388
6389 #[tokio::test]
6390 async fn save_resume_state_second_save_skips_clean() {
6391 let tmp = tempfile::TempDir::new().unwrap();
6392 let session = SessionHandle::start(resume_test_settings(tmp.path()))
6393 .await
6394 .unwrap();
6395
6396 let data1 = vec![0xAA; 16384];
6397 let meta1 = make_test_torrent(&data1, 16384);
6398 let storage1 = make_storage(&data1, 16384);
6399 session
6400 .add_torrent(meta1.into(), Some(storage1))
6401 .await
6402 .unwrap();
6403
6404 let data2 = vec![0xBB; 16384];
6405 let meta2 = make_test_torrent(&data2, 16384);
6406 let storage2 = make_storage(&data2, 16384);
6407 session
6408 .add_torrent(meta2.into(), Some(storage2))
6409 .await
6410 .unwrap();
6411
6412 tokio::time::sleep(Duration::from_millis(50)).await;
6413
6414 let first = session.save_resume_state().await.unwrap();
6416
6417 let second = session.save_resume_state().await.unwrap();
6419 assert_eq!(
6420 second, 0,
6421 "second save should skip all clean torrents (first saved {first})"
6422 );
6423
6424 session.shutdown().await.unwrap();
6425 }
6426
6427 #[tokio::test]
6432 async fn load_resume_empty_dir_returns_zeros() {
6433 let tmp = tempfile::TempDir::new().unwrap();
6434 let mut settings = test_settings();
6435 settings.resume_data_dir = Some(tmp.path().to_path_buf());
6436
6437 let session = SessionHandle::start(settings).await.unwrap();
6438 let result = session.load_resume_state().await.unwrap();
6439 assert_eq!(result.restored, 0);
6440 assert_eq!(result.skipped, 0);
6441 assert_eq!(result.failed, 0);
6442
6443 session.shutdown().await.unwrap();
6444 }
6445
6446 #[tokio::test]
6449 async fn load_resume_corrupt_file_counted_as_failed() {
6450 let tmp = tempfile::TempDir::new().unwrap();
6451 let torrents_dir = tmp.path().join("torrents");
6452 std::fs::create_dir_all(&torrents_dir).unwrap();
6453
6454 let mut settings = test_settings();
6455 settings.resume_data_dir = Some(tmp.path().to_path_buf());
6456
6457 let session = SessionHandle::start(settings).await.unwrap();
6459
6460 tokio::time::sleep(Duration::from_millis(50)).await;
6463
6464 std::fs::write(
6467 torrents_dir.join("deadbeefdeadbeefdeadbeefdeadbeefdeadbeef.resume"),
6468 b"this is not valid bencode",
6469 )
6470 .unwrap();
6471
6472 let result = session.load_resume_state().await.unwrap();
6473 assert_eq!(result.restored, 0);
6474 assert_eq!(result.skipped, 0);
6475 assert_eq!(result.failed, 1);
6476
6477 session.shutdown().await.unwrap();
6478 }
6479
6480 #[tokio::test]
6483 async fn load_resume_duplicate_skipped() {
6484 let tmp = tempfile::TempDir::new().unwrap();
6485 let mut settings = test_settings();
6486 settings.resume_data_dir = Some(tmp.path().to_path_buf());
6487
6488 let session = SessionHandle::start(settings).await.unwrap();
6489
6490 let data = vec![0xAB; 16384];
6492 let meta = make_test_torrent(&data, 16384);
6493 let info_hash = meta.info_hash;
6494 let storage = make_storage(&data, 16384);
6495 session
6496 .add_torrent(meta.into(), Some(storage))
6497 .await
6498 .unwrap();
6499
6500 tokio::time::sleep(Duration::from_millis(50)).await;
6502
6503 let _ = session.save_resume_state().await;
6505
6506 let result = session.load_resume_state().await.unwrap();
6508 assert!(
6509 session.list_torrents().await.unwrap().contains(&info_hash),
6510 "original torrent should still exist"
6511 );
6512 assert_eq!(result.skipped, 1, "duplicate should be skipped");
6513 assert_eq!(result.failed, 0);
6514
6515 session.shutdown().await.unwrap();
6516 }
6517
/// `reconstruct_torrent_meta` rebuilds torrent metadata from resume data that
/// carries the bencoded info dictionary, trackers, URL seeds and HTTP seeds.
///
/// The first tracker tier supplies `announce`; comment, creator and creation
/// date are expected to be absent.
#[test]
fn reconstruct_torrent_meta_returns_some_with_correct_fields() {
    use crate::resume_file::reconstruct_torrent_meta;
    use irontide_core::FastResumeData;

    let payload = vec![0xAB; 16384];
    let source = make_test_torrent(&payload, 16384);
    let info_hash = source.info_hash;
    let encoded_info = irontide_bencode::to_bytes(&source.info).unwrap();

    let mut resume = FastResumeData::new(
        info_hash.as_bytes().to_vec(),
        "test-torrent".into(),
        "/downloads".into(),
    );
    resume.info = Some(encoded_info);
    resume.trackers = vec![
        vec!["http://tracker1.example.com/announce".into()],
        vec!["http://tracker2.example.com/announce".into()],
    ];
    resume.url_seeds = vec!["http://seed.example.com/".into()];
    resume.http_seeds = vec!["http://httpseed.example.com/".into()];

    let rebuilt = reconstruct_torrent_meta(&resume).expect("should reconstruct");

    // Identity and tracker information.
    assert_eq!(rebuilt.info_hash, info_hash);
    assert_eq!(
        rebuilt.announce.as_deref(),
        Some("http://tracker1.example.com/announce")
    );
    assert!(rebuilt.announce_list.is_some());
    assert_eq!(rebuilt.announce_list.as_ref().unwrap().len(), 2);

    // Web seeds.
    assert_eq!(
        rebuilt.url_list,
        vec!["http://seed.example.com/".to_string()]
    );
    assert_eq!(
        rebuilt.httpseeds,
        vec!["http://httpseed.example.com/".to_string()]
    );

    // Raw info bytes are kept; optional descriptive fields stay empty.
    assert!(rebuilt.info_bytes.is_some());
    assert!(rebuilt.comment.is_none());
    assert!(rebuilt.created_by.is_none());
    assert!(rebuilt.creation_date.is_none());
}
6566
/// Resume data without an info dictionary cannot be turned back into torrent
/// metadata: `reconstruct_torrent_meta` returns `None`.
#[test]
fn reconstruct_torrent_meta_returns_none_without_info() {
    use crate::resume_file::reconstruct_torrent_meta;
    use irontide_core::FastResumeData;

    let resume = FastResumeData::new(vec![0xAB; 20], "magnet".into(), "/tmp".into());

    // Precondition: freshly constructed resume data has no info dictionary.
    assert!(resume.info.is_none());

    let rebuilt = reconstruct_torrent_meta(&resume);
    assert!(rebuilt.is_none());
}
6579
/// `reconstruct_magnet` builds a v1-only magnet from resume data: the name
/// becomes the display name, the three trackers across two tiers yield three
/// magnet trackers, and no peers or file selection are set.
#[test]
fn reconstruct_magnet_returns_some_with_correct_fields() {
    use crate::resume_file::reconstruct_magnet;
    use irontide_core::FastResumeData;

    let mut resume =
        FastResumeData::new(vec![0xCC; 20], "my-torrent".into(), "/downloads".into());
    let tier_one = vec!["http://tracker1.com/announce".into()];
    let tier_two = vec![
        "http://tracker2.com/announce".into(),
        "http://tracker3.com/announce".into(),
    ];
    resume.trackers = vec![tier_one, tier_two];

    let link = reconstruct_magnet(&resume).expect("should reconstruct magnet");

    assert!(link.info_hashes.v1.is_some());
    assert!(link.info_hashes.v2.is_none());
    assert_eq!(link.display_name.as_deref(), Some("my-torrent"));
    assert_eq!(link.trackers.len(), 3);
    assert!(link.peers.is_empty());
    assert!(link.selected_files.is_none());
}
6606
/// When resume data carries a 32-byte `info_hash2`, the reconstructed magnet
/// exposes both a v1 hash and a v2 hash with exactly those bytes.
#[test]
fn reconstruct_magnet_preserves_info_hash2() {
    use crate::resume_file::reconstruct_magnet;
    use irontide_core::FastResumeData;

    let mut resume = FastResumeData::new(vec![0xDD; 20], "v2-magnet".into(), "/tmp".into());
    resume.info_hash2 = Some(vec![0xEE; 32]);

    let link = reconstruct_magnet(&resume).expect("should reconstruct");
    assert!(link.info_hashes.v1.is_some());
    assert!(link.info_hashes.v2.is_some());

    let v2_hash = link.info_hashes.v2.unwrap();
    assert_eq!(v2_hash.as_bytes(), &[0xEE; 32]);
}
6624
/// An empty name in the resume data results in a magnet without a display
/// name.
#[test]
fn reconstruct_magnet_empty_name_is_none() {
    use crate::resume_file::reconstruct_magnet;
    use irontide_core::FastResumeData;

    let resume = FastResumeData::new(vec![0xFF; 20], String::new(), "/tmp".into());

    let link = reconstruct_magnet(&resume).expect("should reconstruct");
    let display_name = link.display_name;
    assert!(display_name.is_none(), "empty name should map to None");
}
6639
6640 #[tokio::test]
6645 async fn shutdown_saves_resume_files() {
6646 let tmp = tempfile::TempDir::new().unwrap();
6647 let session = SessionHandle::start(resume_test_settings(tmp.path()))
6648 .await
6649 .unwrap();
6650
6651 let data = vec![0xAB; 16384];
6652 let meta = make_test_torrent(&data, 16384);
6653 let info_hash = meta.info_hash;
6654 let storage = make_storage(&data, 16384);
6655 session
6656 .add_torrent(meta.into(), Some(storage))
6657 .await
6658 .unwrap();
6659
6660 session.pause_torrent(info_hash).await.unwrap();
6662 tokio::time::sleep(Duration::from_millis(50)).await;
6663 session.resume_torrent(info_hash).await.unwrap();
6664 tokio::time::sleep(Duration::from_millis(50)).await;
6665
6666 let path = crate::resume_file::resume_file_path(tmp.path(), &info_hash);
6667
6668 session.shutdown().await.unwrap();
6672 tokio::time::sleep(Duration::from_millis(200)).await;
6673
6674 assert!(path.exists(), "resume file should exist after shutdown");
6675 }
6676
6677 #[tokio::test]
6680 async fn auto_restore_on_startup() {
6681 let tmp = tempfile::TempDir::new().unwrap();
6682
6683 let info_hash;
6684 {
6685 let session = SessionHandle::start(resume_test_settings(tmp.path()))
6687 .await
6688 .unwrap();
6689
6690 let data = vec![0xAB; 16384];
6691 let meta = make_test_torrent(&data, 16384);
6692 info_hash = meta.info_hash;
6693 let storage = make_storage(&data, 16384);
6694 session
6695 .add_torrent(meta.into(), Some(storage))
6696 .await
6697 .unwrap();
6698
6699 tokio::time::sleep(Duration::from_millis(50)).await;
6700 let _ = session.save_resume_state().await;
6701 session.shutdown().await.unwrap();
6702 }
6703
6704 let path = crate::resume_file::resume_file_path(tmp.path(), &info_hash);
6706 assert!(path.exists(), "resume file should exist before restart");
6707
6708 {
6709 let session = SessionHandle::start(resume_test_settings(tmp.path()))
6711 .await
6712 .unwrap();
6713
6714 tokio::time::sleep(Duration::from_millis(100)).await;
6716
6717 let list = session.list_torrents().await.unwrap();
6718 assert!(
6719 list.contains(&info_hash),
6720 "torrent should be auto-restored on startup"
6721 );
6722
6723 session.shutdown().await.unwrap();
6724 }
6725 }
6726
6727 #[tokio::test]
6730 async fn shutdown_with_readonly_resume_dir_completes() {
6731 let tmp = tempfile::TempDir::new().unwrap();
6732 let readonly_dir = PathBuf::from("/proc/irontide-test-nonexistent");
6735 let mut settings = test_settings();
6736 settings.resume_data_dir = Some(readonly_dir);
6737
6738 let session = SessionHandle::start(settings).await.unwrap();
6739
6740 let data = vec![0xAB; 16384];
6741 let meta = make_test_torrent(&data, 16384);
6742 let storage = make_storage(&data, 16384);
6743 session
6744 .add_torrent(meta.into(), Some(storage))
6745 .await
6746 .unwrap();
6747
6748 tokio::time::sleep(Duration::from_millis(50)).await;
6749
6750 session.shutdown().await.unwrap();
6753
6754 drop(tmp);
6756 }
6757
6758 #[tokio::test]
6761 async fn orphan_resume_file_deleted_on_startup() {
6762 let tmp = tempfile::TempDir::new().unwrap();
6763 let torrents_dir = tmp.path().join("torrents");
6764 std::fs::create_dir_all(&torrents_dir).unwrap();
6765
6766 let orphan_path = torrents_dir.join("deadbeefdeadbeefdeadbeefdeadbeefdeadbeef.resume");
6774 std::fs::write(&orphan_path, b"not valid bencode").unwrap();
6775 assert!(orphan_path.exists(), "orphan file should exist before test");
6776
6777 let session = SessionHandle::start(resume_test_settings(tmp.path()))
6778 .await
6779 .unwrap();
6780
6781 tokio::time::sleep(Duration::from_millis(100)).await;
6783
6784 assert!(
6785 !orphan_path.exists(),
6786 "orphan resume file should be deleted on startup"
6787 );
6788
6789 session.shutdown().await.unwrap();
6790 }
6791
6792 #[tokio::test]
6801 async fn multi_torrent_save_load_round_trip() {
6802 let tmp = tempfile::TempDir::new().unwrap();
6803
6804 let datasets: [u8; 3] = [0xAA, 0xBB, 0xCC];
6806 let mut hashes = Vec::with_capacity(3);
6807
6808 {
6809 let session = SessionHandle::start(resume_test_settings(tmp.path()))
6811 .await
6812 .unwrap();
6813
6814 for &byte in &datasets {
6815 let data = vec![byte; 16384];
6816 let meta = make_test_torrent(&data, 16384);
6817 let info_hash = meta.info_hash;
6818 let storage = make_storage(&data, 16384);
6819 session
6820 .add_torrent(meta.into(), Some(storage))
6821 .await
6822 .unwrap();
6823 hashes.push(info_hash);
6824 }
6825
6826 tokio::time::sleep(Duration::from_millis(100)).await;
6828
6829 let saved = session.save_resume_state().await.unwrap();
6830 assert_eq!(saved, 3, "all 3 torrents should be saved");
6831
6832 let files = crate::resume_file::scan_resume_dir(tmp.path());
6834 assert_eq!(files.len(), 3, "3 .resume files should be on disk");
6835
6836 for hash in &hashes {
6837 let path = crate::resume_file::resume_file_path(tmp.path(), hash);
6838 assert!(
6839 path.exists(),
6840 "resume file for {} should exist",
6841 hex::encode(hash.as_bytes())
6842 );
6843 }
6844
6845 session.shutdown().await.unwrap();
6846 }
6847
6848 {
6849 let session = SessionHandle::start(resume_test_settings(tmp.path()))
6857 .await
6858 .unwrap();
6859
6860 tokio::time::sleep(Duration::from_millis(200)).await;
6862
6863 let list = session.list_torrents().await.unwrap();
6864 assert_eq!(list.len(), 3, "all 3 torrents should be auto-restored");
6865
6866 for hash in &hashes {
6867 assert!(
6868 list.contains(hash),
6869 "torrent {} should be present after restore",
6870 hex::encode(hash.as_bytes())
6871 );
6872 }
6873
6874 session.shutdown().await.unwrap();
6875 }
6876 }
6877
6878 #[tokio::test]
6884 async fn corrupt_one_of_three_resume_files() {
6885 let tmp = tempfile::TempDir::new().unwrap();
6886
6887 let datasets: [u8; 3] = [0xDD, 0xEE, 0xFF];
6888 let mut hashes = Vec::with_capacity(3);
6889
6890 {
6891 let session = SessionHandle::start(resume_test_settings(tmp.path()))
6893 .await
6894 .unwrap();
6895
6896 for &byte in &datasets {
6897 let data = vec![byte; 16384];
6898 let meta = make_test_torrent(&data, 16384);
6899 let info_hash = meta.info_hash;
6900 let storage = make_storage(&data, 16384);
6901 session
6902 .add_torrent(meta.into(), Some(storage))
6903 .await
6904 .unwrap();
6905 hashes.push(info_hash);
6906 }
6907
6908 tokio::time::sleep(Duration::from_millis(100)).await;
6909
6910 let saved = session.save_resume_state().await.unwrap();
6911 assert_eq!(saved, 3, "all 3 torrents should be saved");
6912
6913 session.shutdown().await.unwrap();
6914 }
6915
6916 let corrupt_path = crate::resume_file::resume_file_path(tmp.path(), &hashes[1]);
6918 assert!(
6919 corrupt_path.exists(),
6920 "file to corrupt must exist before overwrite"
6921 );
6922 std::fs::write(&corrupt_path, b"CORRUPTED GARBAGE DATA 0xDEAD").unwrap();
6923
6924 {
6925 let session = SessionHandle::start(resume_test_settings(tmp.path()))
6928 .await
6929 .unwrap();
6930
6931 tokio::time::sleep(Duration::from_millis(200)).await;
6933
6934 let list = session.list_torrents().await.unwrap();
6935 assert_eq!(
6936 list.len(),
6937 2,
6938 "2 torrents should be restored (1 corrupt skipped)"
6939 );
6940
6941 assert!(
6943 list.contains(&hashes[0]),
6944 "first torrent should be restored"
6945 );
6946 assert!(
6947 list.contains(&hashes[2]),
6948 "third torrent should be restored"
6949 );
6950
6951 assert!(
6953 !list.contains(&hashes[1]),
6954 "corrupted torrent should not be restored"
6955 );
6956
6957 assert!(
6959 !corrupt_path.exists(),
6960 "corrupt resume file should be deleted by orphan cleanup"
6961 );
6962
6963 session.shutdown().await.unwrap();
6964 }
6965 }
6966
6967 #[tokio::test]
6974 async fn remove_torrent_deletes_resume_file() {
6975 let tmp = tempfile::TempDir::new().unwrap();
6976
6977 let data = vec![0x42; 16384];
6978 let meta = make_test_torrent(&data, 16384);
6979 let info_hash = meta.info_hash;
6980 let storage = make_storage(&data, 16384);
6981
6982 let session = SessionHandle::start(resume_test_settings(tmp.path()))
6983 .await
6984 .unwrap();
6985
6986 session
6987 .add_torrent(meta.into(), Some(storage))
6988 .await
6989 .unwrap();
6990
6991 tokio::time::sleep(Duration::from_millis(100)).await;
6993
6994 let saved = session.save_resume_state().await.unwrap();
6995 assert!(saved > 0, "torrent should be saved to a resume file");
6996
6997 let resume_path = crate::resume_file::resume_file_path(tmp.path(), &info_hash);
6998 assert!(resume_path.exists(), "resume file should exist after save");
6999
7000 session.remove_torrent(info_hash).await.unwrap();
7002 tokio::time::sleep(Duration::from_millis(50)).await;
7003
7004 let list = session.list_torrents().await.unwrap();
7005 assert!(
7006 !list.contains(&info_hash),
7007 "torrent should be gone from session after removal"
7008 );
7009
7010 assert!(
7011 !resume_path.exists(),
7012 "resume file should be deleted when torrent is removed"
7013 );
7014
7015 let remaining = crate::resume_file::scan_resume_dir(tmp.path());
7017 assert!(
7018 remaining.is_empty(),
7019 "no resume files should remain after removing the only torrent"
7020 );
7021
7022 session.shutdown().await.unwrap();
7023 }
7024}