Skip to main content

irontide_session/
session.rs

1//! SessionHandle / SessionActor — multi-torrent session manager.
2//!
3//! Actor model: SessionHandle is the cloneable public API (mpsc sender),
4//! SessionActor is the single-owner event loop (internal).
5
6use std::collections::HashMap;
7use std::net::{IpAddr, SocketAddr};
8use std::path::PathBuf;
9use std::sync::Arc;
10use std::sync::atomic::{AtomicU32, Ordering};
11
12use dashmap::DashMap;
13use tokio::sync::{broadcast, mpsc, oneshot};
14
15use tracing::{debug, info, warn};
16
17use irontide_core::{DEFAULT_CHUNK_SIZE, Id20, Lengths, Magnet, TorrentMetaV1};
18use irontide_dht::DhtHandle;
19use irontide_storage::TorrentStorage;
20
21use crate::alert::{Alert, AlertCategory, AlertKind, AlertStream, post_alert};
22use crate::settings::Settings;
23use crate::torrent::TorrentHandle;
24use crate::types::{
25    FileInfo, SessionStats, TorrentConfig, TorrentInfo, TorrentState, TorrentStats, TorrentSummary,
26};
27
28/// Shared global rate limiter bucket.
29type SharedBucket = Arc<parking_lot::Mutex<crate::rate_limiter::TokenBucket>>;
30
31/// Function signature for queue move operations (move_up, move_down, etc.).
32type QueueMoveFn = fn(&mut [crate::queue::QueueEntry], Id20) -> Vec<(Id20, i32, i32)>;
33
34/// Shared session-wide ban manager, accessed by TorrentActors via `Arc`.
35pub(crate) type SharedBanManager = Arc<parking_lot::RwLock<crate::ban::BanManager>>;
36
37/// Shared session-wide IP filter, accessed by TorrentActors via `Arc`.
38pub(crate) type SharedIpFilter = Arc<parking_lot::RwLock<crate::ip_filter::IpFilter>>;
39
40/// Result of loading resume state from disk (M161 Phase 4).
41#[derive(Debug, Clone)]
42pub struct ResumeLoadResult {
43    /// Number of torrents successfully restored.
44    pub restored: usize,
45    /// Number of resume files skipped (duplicate, already exists).
46    pub skipped: usize,
47    /// Number of resume files that failed to load.
48    pub failed: usize,
49}
50
51/// Entry for a torrent managed by the session.
52struct TorrentEntry {
53    handle: TorrentHandle,
54    meta: Option<TorrentMetaV1>,
55    /// Queue position (-1 = not queued / not auto-managed).
56    queue_position: i32,
57    /// Whether the queue system controls this torrent.
58    auto_managed: bool,
59    /// When the torrent was last started/resumed (for startup grace period).
60    started_at: Option<tokio::time::Instant>,
61    /// Previous downloaded bytes (for rate calculation between auto-manage ticks).
62    prev_downloaded: u64,
63    /// Previous uploaded bytes (for rate calculation between auto-manage ticks).
64    prev_uploaded: u64,
65}
66
67impl TorrentEntry {
68    /// Returns `true` if this torrent has the private flag set (BEP 27).
69    fn is_private(&self) -> bool {
70        self.meta
71            .as_ref()
72            .is_some_and(|m| m.info.private == Some(1))
73    }
74}
75
76/// Commands sent from SessionHandle to SessionActor.
77enum SessionCommand {
78    AddTorrent {
79        meta: Box<irontide_core::TorrentMeta>,
80        storage: Option<Arc<dyn TorrentStorage>>,
81        download_dir: Option<PathBuf>,
82        reply: oneshot::Sender<crate::Result<Id20>>,
83    },
84    AddMagnet {
85        magnet: Magnet,
86        download_dir: Option<PathBuf>,
87        reply: oneshot::Sender<crate::Result<Id20>>,
88    },
89    RemoveTorrent {
90        info_hash: Id20,
91        reply: oneshot::Sender<crate::Result<()>>,
92    },
93    PauseTorrent {
94        info_hash: Id20,
95        reply: oneshot::Sender<crate::Result<()>>,
96    },
97    ResumeTorrent {
98        info_hash: Id20,
99        reply: oneshot::Sender<crate::Result<()>>,
100    },
101    TorrentStats {
102        info_hash: Id20,
103        reply: oneshot::Sender<crate::Result<TorrentStats>>,
104    },
105    TorrentInfo {
106        info_hash: Id20,
107        reply: oneshot::Sender<crate::Result<TorrentInfo>>,
108    },
109    ListTorrents {
110        reply: oneshot::Sender<Vec<Id20>>,
111    },
112    SessionStats {
113        reply: oneshot::Sender<SessionStats>,
114    },
115    SaveTorrentResumeData {
116        info_hash: Id20,
117        reply: oneshot::Sender<crate::Result<irontide_core::FastResumeData>>,
118    },
119    SaveSessionState {
120        reply: oneshot::Sender<crate::Result<crate::persistence::SessionState>>,
121    },
122    /// Load and restore torrents from per-torrent resume files on disk (M161).
123    LoadResumeState {
124        reply: oneshot::Sender<crate::Result<ResumeLoadResult>>,
125    },
126    QueuePosition {
127        info_hash: Id20,
128        reply: oneshot::Sender<crate::Result<i32>>,
129    },
130    SetQueuePosition {
131        info_hash: Id20,
132        pos: i32,
133        reply: oneshot::Sender<crate::Result<()>>,
134    },
135    QueuePositionUp {
136        info_hash: Id20,
137        reply: oneshot::Sender<crate::Result<()>>,
138    },
139    QueuePositionDown {
140        info_hash: Id20,
141        reply: oneshot::Sender<crate::Result<()>>,
142    },
143    QueuePositionTop {
144        info_hash: Id20,
145        reply: oneshot::Sender<crate::Result<()>>,
146    },
147    QueuePositionBottom {
148        info_hash: Id20,
149        reply: oneshot::Sender<crate::Result<()>>,
150    },
151    BanPeer {
152        ip: IpAddr,
153        reply: oneshot::Sender<()>,
154    },
155    UnbanPeer {
156        ip: IpAddr,
157        reply: oneshot::Sender<bool>,
158    },
159    BannedPeers {
160        reply: oneshot::Sender<Vec<IpAddr>>,
161    },
162    SetIpFilter {
163        filter: crate::ip_filter::IpFilter,
164        reply: oneshot::Sender<()>,
165    },
166    GetIpFilter {
167        reply: oneshot::Sender<crate::ip_filter::IpFilter>,
168    },
169    GetSettings {
170        reply: oneshot::Sender<Settings>,
171    },
172    ApplySettings {
173        settings: Box<Settings>,
174        reply: oneshot::Sender<crate::Result<()>>,
175    },
176    MoveTorrentStorage {
177        info_hash: Id20,
178        new_path: std::path::PathBuf,
179        reply: oneshot::Sender<crate::Result<()>>,
180    },
181    AddPeers {
182        info_hash: Id20,
183        peers: Vec<SocketAddr>,
184        source: crate::peer_state::PeerSource,
185        reply: oneshot::Sender<crate::Result<()>>,
186    },
187    OpenFile {
188        info_hash: Id20,
189        file_index: usize,
190        reply: oneshot::Sender<crate::Result<crate::streaming::FileStream>>,
191    },
192    ForceReannounce {
193        info_hash: Id20,
194        reply: oneshot::Sender<crate::Result<()>>,
195    },
196    TrackerList {
197        info_hash: Id20,
198        reply: oneshot::Sender<crate::Result<Vec<crate::tracker_manager::TrackerInfo>>>,
199    },
200    Scrape {
201        info_hash: Id20,
202        reply: oneshot::Sender<crate::Result<Option<(String, irontide_tracker::ScrapeInfo)>>>,
203    },
204    SetFilePriority {
205        info_hash: Id20,
206        index: usize,
207        priority: irontide_core::FilePriority,
208        reply: oneshot::Sender<crate::Result<()>>,
209    },
210    FilePriorities {
211        info_hash: Id20,
212        reply: oneshot::Sender<crate::Result<Vec<irontide_core::FilePriority>>>,
213    },
214    SetDownloadLimit {
215        info_hash: Id20,
216        bytes_per_sec: u64,
217        reply: oneshot::Sender<crate::Result<()>>,
218    },
219    SetUploadLimit {
220        info_hash: Id20,
221        bytes_per_sec: u64,
222        reply: oneshot::Sender<crate::Result<()>>,
223    },
224    DownloadLimit {
225        info_hash: Id20,
226        reply: oneshot::Sender<crate::Result<u64>>,
227    },
228    UploadLimit {
229        info_hash: Id20,
230        reply: oneshot::Sender<crate::Result<u64>>,
231    },
232    SetSequentialDownload {
233        info_hash: Id20,
234        enabled: bool,
235        reply: oneshot::Sender<crate::Result<()>>,
236    },
237    IsSequentialDownload {
238        info_hash: Id20,
239        reply: oneshot::Sender<crate::Result<bool>>,
240    },
241    SetSuperSeeding {
242        info_hash: Id20,
243        enabled: bool,
244        reply: oneshot::Sender<crate::Result<()>>,
245    },
246    IsSuperSeeding {
247        info_hash: Id20,
248        reply: oneshot::Sender<crate::Result<bool>>,
249    },
250    /// Enable or disable user-requested seed-only mode for a torrent (M159).
251    SetSeedMode {
252        info_hash: Id20,
253        enabled: bool,
254        reply: oneshot::Sender<crate::Result<()>>,
255    },
256    AddTracker {
257        info_hash: Id20,
258        url: String,
259        reply: oneshot::Sender<crate::Result<()>>,
260    },
261    ReplaceTrackers {
262        info_hash: Id20,
263        urls: Vec<String>,
264        reply: oneshot::Sender<crate::Result<()>>,
265    },
266    /// Trigger a full piece verification (force recheck) for a torrent.
267    ForceRecheck {
268        info_hash: Id20,
269        reply: oneshot::Sender<crate::Result<()>>,
270    },
271    /// Rename a file within a torrent on disk.
272    RenameFile {
273        info_hash: Id20,
274        file_index: usize,
275        new_name: String,
276        reply: oneshot::Sender<crate::Result<()>>,
277    },
278    /// Set per-torrent maximum connections (0 = use global default).
279    SetMaxConnections {
280        info_hash: Id20,
281        limit: usize,
282        reply: oneshot::Sender<crate::Result<()>>,
283    },
284    /// Get per-torrent maximum connection limit.
285    MaxConnections {
286        info_hash: Id20,
287        reply: oneshot::Sender<crate::Result<usize>>,
288    },
289    /// Set per-torrent maximum upload slots (unchoke slots).
290    SetMaxUploads {
291        info_hash: Id20,
292        limit: usize,
293        reply: oneshot::Sender<crate::Result<()>>,
294    },
295    /// Get per-torrent maximum upload slots (unchoke slots).
296    MaxUploads {
297        info_hash: Id20,
298        reply: oneshot::Sender<crate::Result<usize>>,
299    },
300    /// Get per-peer details for all connected peers of a torrent.
301    GetPeerInfo {
302        info_hash: Id20,
303        reply: oneshot::Sender<crate::Result<Vec<crate::types::PeerInfo>>>,
304    },
305    /// Get in-flight piece download status for a torrent.
306    GetDownloadQueue {
307        info_hash: Id20,
308        reply: oneshot::Sender<crate::Result<Vec<crate::types::PartialPieceInfo>>>,
309    },
310    /// Check whether a specific piece has been downloaded.
311    HavePiece {
312        info_hash: Id20,
313        index: u32,
314        reply: oneshot::Sender<crate::Result<bool>>,
315    },
316    /// Get per-piece availability counts from connected peers.
317    PieceAvailability {
318        info_hash: Id20,
319        reply: oneshot::Sender<crate::Result<Vec<u32>>>,
320    },
321    /// Get per-file bytes-downloaded progress.
322    FileProgress {
323        info_hash: Id20,
324        reply: oneshot::Sender<crate::Result<Vec<u64>>>,
325    },
326    /// Get the torrent's identity hashes (v1 and/or v2).
327    InfoHashesQuery {
328        info_hash: Id20,
329        reply: oneshot::Sender<crate::Result<irontide_core::InfoHashes>>,
330    },
331    /// Get the full v1 metainfo for a torrent.
332    TorrentFile {
333        info_hash: Id20,
334        reply: oneshot::Sender<crate::Result<Option<irontide_core::TorrentMetaV1>>>,
335    },
336    /// Get the full v2 metainfo for a torrent.
337    TorrentFileV2 {
338        info_hash: Id20,
339        reply: oneshot::Sender<crate::Result<Option<irontide_core::TorrentMetaV2>>>,
340    },
341    /// Force an immediate DHT announce for a torrent.
342    ForceDhtAnnounce {
343        info_hash: Id20,
344        reply: oneshot::Sender<crate::Result<()>>,
345    },
346    /// Force an immediate LSD announce for a torrent (session-level only).
347    ForceLsdAnnounce {
348        info_hash: Id20,
349        reply: oneshot::Sender<crate::Result<()>>,
350    },
351    /// Read all data for a specific piece from disk.
352    ReadPiece {
353        info_hash: Id20,
354        index: u32,
355        reply: oneshot::Sender<crate::Result<bytes::Bytes>>,
356    },
357    /// Flush the disk write cache for a torrent.
358    FlushCache {
359        info_hash: Id20,
360        reply: oneshot::Sender<crate::Result<()>>,
361    },
362    /// Check if a torrent handle is still valid (torrent exists and channel open).
363    IsValid {
364        info_hash: Id20,
365        reply: oneshot::Sender<bool>,
366    },
367    /// Clear error state on a torrent.
368    ClearError {
369        info_hash: Id20,
370        reply: oneshot::Sender<crate::Result<()>>,
371    },
372    /// Get per-file open/mode status for a torrent.
373    FileStatus {
374        info_hash: Id20,
375        reply: oneshot::Sender<crate::Result<Vec<crate::types::FileStatus>>>,
376    },
377    /// Read the current torrent flags.
378    Flags {
379        info_hash: Id20,
380        reply: oneshot::Sender<crate::Result<crate::types::TorrentFlags>>,
381    },
382    /// Set (enable) the specified torrent flags.
383    SetFlags {
384        info_hash: Id20,
385        flags: crate::types::TorrentFlags,
386        reply: oneshot::Sender<crate::Result<()>>,
387    },
388    /// Unset (disable) the specified torrent flags.
389    UnsetFlags {
390        info_hash: Id20,
391        flags: crate::types::TorrentFlags,
392        reply: oneshot::Sender<crate::Result<()>>,
393    },
394    /// Immediately initiate a peer connection for a torrent.
395    ConnectPeer {
396        info_hash: Id20,
397        addr: SocketAddr,
398        reply: oneshot::Sender<crate::Result<()>>,
399    },
400    DhtPutImmutable {
401        value: Vec<u8>,
402        reply: oneshot::Sender<crate::Result<Id20>>,
403    },
404    DhtGetImmutable {
405        target: Id20,
406        reply: oneshot::Sender<crate::Result<Option<Vec<u8>>>>,
407    },
408    DhtPutMutable {
409        keypair_bytes: [u8; 32],
410        value: Vec<u8>,
411        seq: i64,
412        salt: Vec<u8>,
413        reply: oneshot::Sender<crate::Result<Id20>>,
414    },
415    #[allow(clippy::type_complexity)]
416    DhtGetMutable {
417        public_key: [u8; 32],
418        salt: Vec<u8>,
419        reply: oneshot::Sender<crate::Result<Option<(Vec<u8>, i64)>>>,
420    },
421    /// Save per-torrent resume files for all dirty torrents (M161).
422    SaveResumeState {
423        reply: oneshot::Sender<crate::Result<usize>>,
424    },
425    /// Trigger an immediate session stats snapshot and alert (M50).
426    PostSessionStats,
427    Shutdown,
428}
429
430/// Cloneable handle for interacting with a running session.
431#[derive(Clone)]
432pub struct SessionHandle {
433    cmd_tx: mpsc::Sender<SessionCommand>,
434    alert_tx: broadcast::Sender<Alert>,
435    alert_mask: Arc<AtomicU32>,
436    counters: Arc<crate::stats::SessionCounters>,
437    /// Network transport factory (M51). Used by future simulation tasks.
438    #[allow(dead_code)]
439    factory: Arc<crate::transport::NetworkFactory>,
440}
441
442impl SessionHandle {
443    /// Start a new session with the given settings and no plugins.
444    pub async fn start(settings: Settings) -> crate::Result<Self> {
445        Self::start_with_plugins(settings, Arc::new(Vec::new())).await
446    }
447
448    /// Start a new session with a custom disk I/O backend and no plugins.
449    pub async fn start_with_backend(
450        settings: Settings,
451        backend: Arc<dyn crate::disk_backend::DiskIoBackend>,
452    ) -> crate::Result<Self> {
453        Self::start_with_plugins_and_backend(settings, Arc::new(Vec::new()), backend).await
454    }
455
456    /// Start a new session with the given settings and extension plugins.
457    pub async fn start_with_plugins(
458        settings: Settings,
459        plugins: Arc<Vec<Box<dyn crate::extension::ExtensionPlugin>>>,
460    ) -> crate::Result<Self> {
461        let disk_config = crate::disk::DiskConfig::from(&settings);
462        let backend = crate::disk_backend::create_backend_from_config(&disk_config);
463        Self::start_with_plugins_and_backend(settings, plugins, backend).await
464    }
465
466    /// Start a new session with the given settings, extension plugins, and
467    /// a custom disk I/O backend.
468    pub async fn start_with_plugins_and_backend(
469        settings: Settings,
470        plugins: Arc<Vec<Box<dyn crate::extension::ExtensionPlugin>>>,
471        backend: Arc<dyn crate::disk_backend::DiskIoBackend>,
472    ) -> crate::Result<Self> {
473        Self::start_full(
474            settings,
475            plugins,
476            backend,
477            Arc::new(crate::transport::NetworkFactory::tokio()),
478        )
479        .await
480    }
481
482    /// Start a new session with the given settings and a custom transport factory.
483    ///
484    /// Uses default plugins (none) and default disk backend.
485    pub async fn start_with_transport(
486        settings: Settings,
487        factory: Arc<crate::transport::NetworkFactory>,
488    ) -> crate::Result<Self> {
489        let disk_config = crate::disk::DiskConfig::from(&settings);
490        let backend = crate::disk_backend::create_backend_from_config(&disk_config);
491        Self::start_full(settings, Arc::new(Vec::new()), backend, factory).await
492    }
493
494    /// Start a new session with all customizable parameters.
495    ///
496    /// This is the most general constructor — all other `start_*` variants
497    /// delegate to this method. The `factory` parameter controls how TCP
498    /// listeners and connections are created: use [`crate::transport::NetworkFactory::tokio()`]
499    /// for real networking or a custom factory for simulation.
500    pub async fn start_full(
501        settings: Settings,
502        plugins: Arc<Vec<Box<dyn crate::extension::ExtensionPlugin>>>,
503        backend: Arc<dyn crate::disk_backend::DiskIoBackend>,
504        factory: Arc<crate::transport::NetworkFactory>,
505    ) -> crate::Result<Self> {
506        let mut settings = settings;
507
508        // Force proxy mode: all connections must go through proxy.
509        if settings.force_proxy {
510            if settings.proxy.proxy_type == crate::proxy::ProxyType::None {
511                return Err(crate::Error::Config(
512                    "force_proxy requires a proxy to be configured".into(),
513                ));
514            }
515            settings.enable_upnp = false;
516            settings.enable_natpmp = false;
517            settings.enable_dht = false;
518            settings.enable_lsd = false;
519        }
520
521        // Anonymous mode: suppress identity and disable discovery.
522        if settings.anonymous_mode {
523            settings.enable_dht = false;
524            settings.enable_lsd = false;
525            settings.enable_upnp = false;
526            settings.enable_natpmp = false;
527        }
528
529        let (cmd_tx, cmd_rx) = mpsc::channel(256);
530
531        // Alert broadcast channel
532        let (alert_tx, _) = broadcast::channel(settings.alert_channel_size);
533        let alert_mask = Arc::new(AtomicU32::new(settings.alert_mask.bits()));
534
535        let (lsd, lsd_peers_rx) = if settings.enable_lsd {
536            match crate::lsd::LsdHandle::start(settings.listen_port).await {
537                Ok((handle, rx)) => (Some(handle), Some(rx)),
538                Err(e) => {
539                    warn!("LSD unavailable (port 6771): {e}");
540                    (None, None)
541                }
542            }
543        } else {
544            (None, None)
545        };
546
547        let global_upload_bucket = Arc::new(parking_lot::Mutex::new(
548            crate::rate_limiter::TokenBucket::new(settings.upload_rate_limit),
549        ));
550        let global_download_bucket = Arc::new(parking_lot::Mutex::new(
551            crate::rate_limiter::TokenBucket::new(settings.download_rate_limit),
552        ));
553
554        // uTP socket (shared across all torrents)
555        let (utp_socket, utp_listener) = if settings.enable_utp {
556            match irontide_utp::UtpSocket::bind(settings.to_utp_config(settings.listen_port)).await
557            {
558                Ok((socket, listener)) => (Some(socket), Some(listener)),
559                Err(e) => {
560                    warn!("uTP bind failed: {e}");
561                    (None, None)
562                }
563            }
564        } else {
565            (None, None)
566        };
567
568        // IPv6 uTP socket (dual-stack)
569        let (utp_socket_v6, utp_listener_v6) = if settings.enable_utp && settings.enable_ipv6 {
570            match irontide_utp::UtpSocket::bind(settings.to_utp_config_v6(settings.listen_port))
571                .await
572            {
573                Ok((socket, listener)) => (Some(socket), Some(listener)),
574                Err(e) => {
575                    debug!("uTP IPv6 bind failed (non-fatal): {e}");
576                    (None, None)
577                }
578            }
579        } else {
580            (None, None)
581        };
582
583        // NAT port mapping (PCP / NAT-PMP / UPnP)
584        let (nat, nat_events_rx) = if settings.enable_upnp || settings.enable_natpmp {
585            let nat_config = settings.to_nat_config();
586            let (handle, events_rx) = irontide_nat::NatHandle::start(nat_config);
587            let udp_port = if settings.enable_utp {
588                Some(settings.listen_port)
589            } else {
590                None
591            };
592            handle.map_ports(settings.listen_port, udp_port).await;
593            (Some(handle), Some(events_rx))
594        } else {
595            (None, None)
596        };
597
598        // I2P SAM session
599        let sam_session = if settings.enable_i2p {
600            let tunnel_config = settings.to_sam_tunnel_config();
601            match crate::i2p::SamSession::create(
602                &settings.i2p_hostname,
603                settings.i2p_port,
604                "torrent",
605                tunnel_config,
606            )
607            .await
608            {
609                Ok(session) => {
610                    let b32 = session.destination().to_b32_address();
611                    info!("I2P SAM session created: {}", b32);
612                    post_alert(
613                        &alert_tx,
614                        &alert_mask,
615                        AlertKind::I2pSessionCreated { b32_address: b32 },
616                    );
617                    Some(Arc::new(session))
618                }
619                Err(e) => {
620                    warn!("I2P SAM session failed: {e}");
621                    post_alert(
622                        &alert_tx,
623                        &alert_mask,
624                        AlertKind::I2pError {
625                            message: format!("SAM session creation failed: {e}"),
626                        },
627                    );
628                    None
629                }
630            }
631        } else {
632            None
633        };
634
635        // SSL manager (M42): create if ssl_listen_port != 0 or cert paths are provided
636        let ssl_manager = if settings.ssl_listen_port != 0 || settings.ssl_cert_path.is_some() {
637            match crate::ssl_manager::SslManager::new(&settings) {
638                Ok(mgr) => {
639                    info!("SSL manager initialized");
640                    Some(Arc::new(mgr))
641                }
642                Err(e) => {
643                    warn!(error = %e, "SSL manager initialization failed");
644                    None
645                }
646            }
647        } else {
648            None
649        };
650
651        // TCP listener: bind on the main listen port for incoming peer connections.
652        let tcp_listener: Option<Box<dyn crate::transport::TransportListener>> = match factory
653            .bind_tcp(SocketAddr::from(([0, 0, 0, 0], settings.listen_port)))
654            .await
655        {
656            Ok(l) => {
657                info!(port = settings.listen_port, "TCP listener started");
658                Some(l)
659            }
660            Err(e) => {
661                warn!(port = settings.listen_port, error = %e, "TCP listener bind failed");
662                None
663            }
664        };
665
666        // SSL listener (M42): bind if ssl_listen_port != 0
667        let ssl_listener: Option<Box<dyn crate::transport::TransportListener>> = if settings
668            .ssl_listen_port
669            != 0
670        {
671            match factory
672                .bind_tcp(SocketAddr::from(([0, 0, 0, 0], settings.ssl_listen_port)))
673                .await
674            {
675                Ok(l) => {
676                    info!(port = settings.ssl_listen_port, "SSL listener started");
677                    Some(l)
678                }
679                Err(e) => {
680                    warn!(port = settings.ssl_listen_port, error = %e, "SSL listener bind failed");
681                    None
682                }
683            }
684        } else {
685            None
686        };
687
688        // Start DHT instances
689        let (dht_v4, dht_v4_ip_rx) = if settings.enable_dht {
690            match DhtHandle::start(settings.to_dht_config()).await {
691                Ok((handle, ip_rx)) => {
692                    info!("DHT v4 started");
693                    (Some(handle), Some(ip_rx))
694                }
695                Err(e) => {
696                    warn!("DHT v4 start failed: {e}");
697                    (None, None)
698                }
699            }
700        } else {
701            (None, None)
702        };
703
704        let (dht_v6, dht_v6_ip_rx) = if settings.enable_dht && settings.enable_ipv6 {
705            match DhtHandle::start(settings.to_dht_config_v6()).await {
706                Ok((handle, ip_rx)) => {
707                    info!("DHT v6 started");
708                    (Some(handle), Some(ip_rx))
709                }
710                Err(e) => {
711                    debug!("DHT v6 start failed (non-fatal): {e}");
712                    (None, None)
713                }
714            }
715        } else {
716            (None, None)
717        };
718
719        let ban_config = crate::ban::BanConfig::from(&settings);
720        let ban_manager: SharedBanManager = Arc::new(parking_lot::RwLock::new(
721            crate::ban::BanManager::new(ban_config),
722        ));
723
724        let ip_filter: SharedIpFilter =
725            Arc::new(parking_lot::RwLock::new(crate::ip_filter::IpFilter::new()));
726
727        let disk_config = crate::disk::DiskConfig::from(&settings);
728        let spawner = crate::blocking_spawner::BlockingSpawner::new(settings.max_blocking_threads);
729        let (disk_manager, disk_actor_handle) =
730            crate::disk::DiskManagerHandle::new_with_backend(disk_config, backend, spawner);
731
732        let counters = Arc::new(crate::stats::SessionCounters::new());
733
734        // M96: Create shared hash pool for parallel piece verification
735        let hash_pool = std::sync::Arc::new(crate::hash_pool::HashPool::new(
736            settings.hashing_threads,
737            64,
738        ));
739
740        // M114: Spawn isolated listener task for TCP/uTP accepts.
741        let info_hash_registry = Arc::new(DashMap::new());
742        let (validated_tx, validated_conn_rx) = mpsc::channel(64);
743        let listener_task = crate::listener::ListenerTask::new(
744            tcp_listener,
745            utp_listener,
746            utp_listener_v6,
747            Arc::clone(&info_hash_registry),
748            validated_tx,
749        );
750        let _listener_task = tokio::spawn(listener_task.run());
751
752        let external_ip = settings.external_ip;
753        let actor = SessionActor {
754            settings,
755            torrents: HashMap::new(),
756            dht_v4,
757            dht_v6,
758            lsd,
759            lsd_peers_rx,
760            cmd_rx,
761            alert_tx: alert_tx.clone(),
762            alert_mask: Arc::clone(&alert_mask),
763            global_upload_bucket,
764            global_download_bucket,
765            utp_socket,
766            utp_socket_v6,
767            nat,
768            nat_events_rx,
769            ban_manager,
770            ip_filter,
771            disk_manager,
772            disk_actor_handle,
773            external_ip,
774            dht_v4_ip_rx,
775            dht_v6_ip_rx,
776            plugins,
777            sam_session,
778            ssl_manager,
779            ssl_listener,
780            validated_conn_rx,
781            info_hash_registry,
782            _listener_task,
783            counters: Arc::clone(&counters),
784            factory: Arc::clone(&factory),
785            hash_pool,
786        };
787
788        let join_handle = tokio::spawn(actor.run());
789        tokio::spawn(async move {
790            match join_handle.await {
791                Ok(()) => {
792                    tracing::warn!("session actor exited cleanly");
793                }
794                Err(e) if e.is_panic() => {
795                    let panic_payload = e.into_panic();
796                    let msg = if let Some(s) = panic_payload.downcast_ref::<&str>() {
797                        (*s).to_string()
798                    } else if let Some(s) = panic_payload.downcast_ref::<String>() {
799                        s.clone()
800                    } else {
801                        "unknown panic payload".to_string()
802                    };
803                    tracing::error!("session actor PANICKED: {msg}");
804                }
805                Err(e) => {
806                    tracing::error!("session actor task error: {e}");
807                }
808            }
809        });
810        Ok(SessionHandle {
811            cmd_tx,
812            alert_tx,
813            alert_mask,
814            counters,
815            factory,
816        })
817    }
818
819    /// Add a torrent from parsed .torrent metadata (v1, v2, or hybrid).
820    pub async fn add_torrent(
821        &self,
822        meta: irontide_core::TorrentMeta,
823        storage: Option<Arc<dyn TorrentStorage>>,
824    ) -> crate::Result<Id20> {
825        self.add_torrent_with_dir(meta, storage, None).await
826    }
827
828    /// Add a torrent with an optional per-torrent download directory override.
829    pub async fn add_torrent_with_dir(
830        &self,
831        meta: irontide_core::TorrentMeta,
832        storage: Option<Arc<dyn TorrentStorage>>,
833        download_dir: Option<PathBuf>,
834    ) -> crate::Result<Id20> {
835        let (tx, rx) = oneshot::channel();
836        self.cmd_tx
837            .send(SessionCommand::AddTorrent {
838                meta: Box::new(meta),
839                storage,
840                download_dir,
841                reply: tx,
842            })
843            .await
844            .map_err(|_| crate::Error::Shutdown)?;
845        rx.await.map_err(|_| crate::Error::Shutdown)?
846    }
847
848    /// Add a torrent from a magnet link (metadata fetched via BEP 9).
849    pub async fn add_magnet(&self, magnet: Magnet) -> crate::Result<Id20> {
850        self.add_magnet_with_dir(magnet, None).await
851    }
852
853    /// Add a magnet link with an optional per-torrent download directory override.
854    pub async fn add_magnet_with_dir(
855        &self,
856        magnet: Magnet,
857        download_dir: Option<PathBuf>,
858    ) -> crate::Result<Id20> {
859        let (tx, rx) = oneshot::channel();
860        self.cmd_tx
861            .send(SessionCommand::AddMagnet {
862                magnet,
863                download_dir,
864                reply: tx,
865            })
866            .await
867            .map_err(|_| crate::Error::Shutdown)?;
868        rx.await.map_err(|_| crate::Error::Shutdown)?
869    }
870
871    /// Remove a torrent from the session.
872    pub async fn remove_torrent(&self, info_hash: Id20) -> crate::Result<()> {
873        let (tx, rx) = oneshot::channel();
874        self.cmd_tx
875            .send(SessionCommand::RemoveTorrent {
876                info_hash,
877                reply: tx,
878            })
879            .await
880            .map_err(|_| crate::Error::Shutdown)?;
881        rx.await.map_err(|_| crate::Error::Shutdown)?
882    }
883
884    /// Pause a torrent.
885    pub async fn pause_torrent(&self, info_hash: Id20) -> crate::Result<()> {
886        let (tx, rx) = oneshot::channel();
887        self.cmd_tx
888            .send(SessionCommand::PauseTorrent {
889                info_hash,
890                reply: tx,
891            })
892            .await
893            .map_err(|_| crate::Error::Shutdown)?;
894        rx.await.map_err(|_| crate::Error::Shutdown)?
895    }
896
897    /// Resume a paused torrent.
898    pub async fn resume_torrent(&self, info_hash: Id20) -> crate::Result<()> {
899        let (tx, rx) = oneshot::channel();
900        self.cmd_tx
901            .send(SessionCommand::ResumeTorrent {
902                info_hash,
903                reply: tx,
904            })
905            .await
906            .map_err(|_| crate::Error::Shutdown)?;
907        rx.await.map_err(|_| crate::Error::Shutdown)?
908    }
909
910    /// Get statistics for a specific torrent.
911    pub async fn torrent_stats(&self, info_hash: Id20) -> crate::Result<TorrentStats> {
912        let (tx, rx) = oneshot::channel();
913        self.cmd_tx
914            .send(SessionCommand::TorrentStats {
915                info_hash,
916                reply: tx,
917            })
918            .await
919            .map_err(|_| crate::Error::Shutdown)?;
920        rx.await.map_err(|_| crate::Error::Shutdown)?
921    }
922
923    /// Get metadata info for a specific torrent.
924    pub async fn torrent_info(&self, info_hash: Id20) -> crate::Result<TorrentInfo> {
925        let (tx, rx) = oneshot::channel();
926        self.cmd_tx
927            .send(SessionCommand::TorrentInfo {
928                info_hash,
929                reply: tx,
930            })
931            .await
932            .map_err(|_| crate::Error::Shutdown)?;
933        rx.await.map_err(|_| crate::Error::Shutdown)?
934    }
935
936    /// List all active torrent info hashes.
937    pub async fn list_torrents(&self) -> crate::Result<Vec<Id20>> {
938        let (tx, rx) = oneshot::channel();
939        self.cmd_tx
940            .send(SessionCommand::ListTorrents { reply: tx })
941            .await
942            .map_err(|_| crate::Error::Shutdown)?;
943        rx.await.map_err(|_| crate::Error::Shutdown)
944    }
945
946    /// Get aggregate session statistics.
947    pub async fn session_stats(&self) -> crate::Result<SessionStats> {
948        let (tx, rx) = oneshot::channel();
949        self.cmd_tx
950            .send(SessionCommand::SessionStats { reply: tx })
951            .await
952            .map_err(|_| crate::Error::Shutdown)?;
953        rx.await.map_err(|_| crate::Error::Shutdown)
954    }
955
956    /// Subscribe to all alerts passing the session-level mask.
957    pub fn subscribe(&self) -> broadcast::Receiver<Alert> {
958        self.alert_tx.subscribe()
959    }
960
961    /// Subscribe with per-subscriber category filtering.
962    pub fn subscribe_filtered(&self, filter: AlertCategory) -> AlertStream {
963        AlertStream::new(self.alert_tx.subscribe(), filter)
964    }
965
966    /// Trigger an immediate session stats snapshot and alert.
967    pub async fn post_session_stats(&self) -> crate::Result<()> {
968        self.cmd_tx
969            .send(SessionCommand::PostSessionStats)
970            .await
971            .map_err(|_| crate::Error::Shutdown)
972    }
973
974    /// Access the shared atomic counters (read-only handle).
975    pub fn counters(&self) -> &Arc<crate::stats::SessionCounters> {
976        &self.counters
977    }
978
979    /// Atomically update the session-level alert mask.
980    pub fn set_alert_mask(&self, mask: AlertCategory) {
981        self.alert_mask.store(mask.bits(), Ordering::Relaxed);
982    }
983
984    /// Read the current session-level alert mask.
985    pub fn alert_mask(&self) -> AlertCategory {
986        AlertCategory::from_bits_truncate(self.alert_mask.load(Ordering::Relaxed))
987    }
988
989    /// Add peers to a specific torrent by info hash.
990    pub async fn add_peers(
991        &self,
992        info_hash: Id20,
993        peers: Vec<SocketAddr>,
994        source: crate::peer_state::PeerSource,
995    ) -> crate::Result<()> {
996        let (tx, rx) = oneshot::channel();
997        self.cmd_tx
998            .send(SessionCommand::AddPeers {
999                info_hash,
1000                peers,
1001                source,
1002                reply: tx,
1003            })
1004            .await
1005            .map_err(|_| crate::Error::Shutdown)?;
1006        rx.await.map_err(|_| crate::Error::Shutdown)?
1007    }
1008
1009    /// Gracefully shut down the session and all torrents.
1010    pub async fn shutdown(&self) -> crate::Result<()> {
1011        // Timeout prevents hang if SessionActor is processing a heavy batch
1012        let _ = tokio::time::timeout(
1013            std::time::Duration::from_secs(10),
1014            self.cmd_tx.send(SessionCommand::Shutdown),
1015        )
1016        .await;
1017        Ok(())
1018    }
1019
1020    /// Save resume data for a specific torrent.
1021    pub async fn save_torrent_resume_data(
1022        &self,
1023        info_hash: Id20,
1024    ) -> crate::Result<irontide_core::FastResumeData> {
1025        let (tx, rx) = oneshot::channel();
1026        self.cmd_tx
1027            .send(SessionCommand::SaveTorrentResumeData {
1028                info_hash,
1029                reply: tx,
1030            })
1031            .await
1032            .map_err(|_| crate::Error::Shutdown)?;
1033        rx.await.map_err(|_| crate::Error::Shutdown)?
1034    }
1035
1036    /// Save full session state (all torrent resume data + DHT node cache).
1037    pub async fn save_session_state(&self) -> crate::Result<crate::persistence::SessionState> {
1038        let (tx, rx) = oneshot::channel();
1039        self.cmd_tx
1040            .send(SessionCommand::SaveSessionState { reply: tx })
1041            .await
1042            .map_err(|_| crate::Error::Shutdown)?;
1043        rx.await.map_err(|_| crate::Error::Shutdown)?
1044    }
1045
1046    /// Load and restore torrents from per-torrent resume files on disk.
1047    ///
1048    /// Scans the resume directory for `.resume` files, deserializes each one,
1049    /// reconstructs the torrent metadata, and re-adds it to the session. For
1050    /// resolved torrents (with stored info dict), the piece bitmap is restored.
1051    /// Unresolved magnets are re-added as magnet links.
1052    ///
1053    /// # Errors
1054    ///
1055    /// Returns [`crate::Error::Shutdown`] if the session has been shut down.
1056    pub async fn load_resume_state(&self) -> crate::Result<ResumeLoadResult> {
1057        let (tx, rx) = oneshot::channel();
1058        self.cmd_tx
1059            .send(SessionCommand::LoadResumeState { reply: tx })
1060            .await
1061            .map_err(|_| crate::Error::Shutdown)?;
1062        rx.await.map_err(|_| crate::Error::Shutdown)?
1063    }
1064
1065    /// Save per-torrent resume files for all dirty torrents.
1066    ///
1067    /// Iterates every torrent in the session, checks the `need_save_resume`
1068    /// dirty flag, serializes resume data to disk, and clears the flag.
1069    /// Returns the number of files written.
1070    ///
1071    /// # Errors
1072    ///
1073    /// Returns [`Error::Shutdown`] if the session actor has stopped.
1074    pub async fn save_resume_state(&self) -> crate::Result<usize> {
1075        let (tx, rx) = oneshot::channel();
1076        self.cmd_tx
1077            .send(SessionCommand::SaveResumeState { reply: tx })
1078            .await
1079            .map_err(|_| crate::Error::Shutdown)?;
1080        rx.await.map_err(|_| crate::Error::Shutdown)?
1081    }
1082
1083    /// Get the queue position of a torrent. Returns -1 if not auto-managed.
1084    pub async fn queue_position(&self, info_hash: Id20) -> crate::Result<i32> {
1085        let (tx, rx) = oneshot::channel();
1086        self.cmd_tx
1087            .send(SessionCommand::QueuePosition {
1088                info_hash,
1089                reply: tx,
1090            })
1091            .await
1092            .map_err(|_| crate::Error::Shutdown)?;
1093        rx.await.map_err(|_| crate::Error::Shutdown)?
1094    }
1095
1096    /// Set the absolute queue position of a torrent. Shifts other torrents.
1097    pub async fn set_queue_position(&self, info_hash: Id20, pos: i32) -> crate::Result<()> {
1098        let (tx, rx) = oneshot::channel();
1099        self.cmd_tx
1100            .send(SessionCommand::SetQueuePosition {
1101                info_hash,
1102                pos,
1103                reply: tx,
1104            })
1105            .await
1106            .map_err(|_| crate::Error::Shutdown)?;
1107        rx.await.map_err(|_| crate::Error::Shutdown)?
1108    }
1109
1110    /// Move a torrent one position up (lower number = higher priority).
1111    pub async fn queue_position_up(&self, info_hash: Id20) -> crate::Result<()> {
1112        let (tx, rx) = oneshot::channel();
1113        self.cmd_tx
1114            .send(SessionCommand::QueuePositionUp {
1115                info_hash,
1116                reply: tx,
1117            })
1118            .await
1119            .map_err(|_| crate::Error::Shutdown)?;
1120        rx.await.map_err(|_| crate::Error::Shutdown)?
1121    }
1122
1123    /// Move a torrent one position down.
1124    pub async fn queue_position_down(&self, info_hash: Id20) -> crate::Result<()> {
1125        let (tx, rx) = oneshot::channel();
1126        self.cmd_tx
1127            .send(SessionCommand::QueuePositionDown {
1128                info_hash,
1129                reply: tx,
1130            })
1131            .await
1132            .map_err(|_| crate::Error::Shutdown)?;
1133        rx.await.map_err(|_| crate::Error::Shutdown)?
1134    }
1135
1136    /// Move a torrent to position 0 (highest priority).
1137    pub async fn queue_position_top(&self, info_hash: Id20) -> crate::Result<()> {
1138        let (tx, rx) = oneshot::channel();
1139        self.cmd_tx
1140            .send(SessionCommand::QueuePositionTop {
1141                info_hash,
1142                reply: tx,
1143            })
1144            .await
1145            .map_err(|_| crate::Error::Shutdown)?;
1146        rx.await.map_err(|_| crate::Error::Shutdown)?
1147    }
1148
1149    /// Move a torrent to the last position (lowest priority).
1150    pub async fn queue_position_bottom(&self, info_hash: Id20) -> crate::Result<()> {
1151        let (tx, rx) = oneshot::channel();
1152        self.cmd_tx
1153            .send(SessionCommand::QueuePositionBottom {
1154                info_hash,
1155                reply: tx,
1156            })
1157            .await
1158            .map_err(|_| crate::Error::Shutdown)?;
1159        rx.await.map_err(|_| crate::Error::Shutdown)?
1160    }
1161
1162    /// Ban a peer IP session-wide. All torrents will disconnect and refuse this IP.
1163    pub async fn ban_peer(&self, ip: IpAddr) -> crate::Result<()> {
1164        let (tx, rx) = oneshot::channel();
1165        self.cmd_tx
1166            .send(SessionCommand::BanPeer { ip, reply: tx })
1167            .await
1168            .map_err(|_| crate::Error::Shutdown)?;
1169        rx.await.map_err(|_| crate::Error::Shutdown)
1170    }
1171
1172    /// Remove a ban and clear strikes for an IP. Returns `true` if the IP was banned.
1173    pub async fn unban_peer(&self, ip: IpAddr) -> crate::Result<bool> {
1174        let (tx, rx) = oneshot::channel();
1175        self.cmd_tx
1176            .send(SessionCommand::UnbanPeer { ip, reply: tx })
1177            .await
1178            .map_err(|_| crate::Error::Shutdown)?;
1179        rx.await.map_err(|_| crate::Error::Shutdown)
1180    }
1181
1182    /// Replace the session-wide IP filter. Connected peers that are now blocked will
1183    /// be refused on subsequent connection attempts.
1184    pub async fn set_ip_filter(&self, filter: crate::ip_filter::IpFilter) -> crate::Result<()> {
1185        let (tx, rx) = oneshot::channel();
1186        self.cmd_tx
1187            .send(SessionCommand::SetIpFilter { filter, reply: tx })
1188            .await
1189            .map_err(|_| crate::Error::Shutdown)?;
1190        rx.await.map_err(|_| crate::Error::Shutdown)
1191    }
1192
1193    /// Get a clone of the current IP filter.
1194    pub async fn ip_filter(&self) -> crate::Result<crate::ip_filter::IpFilter> {
1195        let (tx, rx) = oneshot::channel();
1196        self.cmd_tx
1197            .send(SessionCommand::GetIpFilter { reply: tx })
1198            .await
1199            .map_err(|_| crate::Error::Shutdown)?;
1200        rx.await.map_err(|_| crate::Error::Shutdown)
1201    }
1202
1203    /// Get a clone of the current session settings.
1204    pub async fn settings(&self) -> crate::Result<Settings> {
1205        let (tx, rx) = oneshot::channel();
1206        self.cmd_tx
1207            .send(SessionCommand::GetSettings { reply: tx })
1208            .await
1209            .map_err(|_| crate::Error::Shutdown)?;
1210        rx.await.map_err(|_| crate::Error::Shutdown)
1211    }
1212
1213    /// Apply new settings at runtime.
1214    ///
1215    /// Validates the settings, updates rate limiters immediately, and stores
1216    /// the new settings. Sub-actor reconfiguration (disk, DHT, NAT) takes
1217    /// effect on next session restart.
1218    pub async fn apply_settings(&self, settings: Settings) -> crate::Result<()> {
1219        let (tx, rx) = oneshot::channel();
1220        self.cmd_tx
1221            .send(SessionCommand::ApplySettings {
1222                settings: Box::new(settings),
1223                reply: tx,
1224            })
1225            .await
1226            .map_err(|_| crate::Error::Shutdown)?;
1227        rx.await.map_err(|_| crate::Error::Shutdown)?
1228    }
1229
1230    /// Get the list of currently banned peer IPs.
1231    pub async fn banned_peers(&self) -> crate::Result<Vec<IpAddr>> {
1232        let (tx, rx) = oneshot::channel();
1233        self.cmd_tx
1234            .send(SessionCommand::BannedPeers { reply: tx })
1235            .await
1236            .map_err(|_| crate::Error::Shutdown)?;
1237        rx.await.map_err(|_| crate::Error::Shutdown)
1238    }
1239
1240    /// Move a torrent's data files to a new download directory.
1241    pub async fn move_torrent_storage(
1242        &self,
1243        info_hash: Id20,
1244        new_path: std::path::PathBuf,
1245    ) -> crate::Result<()> {
1246        let (tx, rx) = oneshot::channel();
1247        self.cmd_tx
1248            .send(SessionCommand::MoveTorrentStorage {
1249                info_hash,
1250                new_path,
1251                reply: tx,
1252            })
1253            .await
1254            .map_err(|_| crate::Error::Shutdown)?;
1255        rx.await.map_err(|_| crate::Error::Shutdown)?
1256    }
1257
1258    /// Opens a file stream for sequential reading (`AsyncRead` + `AsyncSeek`).
1259    ///
1260    /// The returned [`FileStream`](crate::streaming::FileStream) reads data from
1261    /// a specific file within a torrent, blocking on pieces that haven't been
1262    /// downloaded yet.
1263    pub async fn open_file(
1264        &self,
1265        info_hash: Id20,
1266        file_index: usize,
1267    ) -> crate::Result<crate::streaming::FileStream> {
1268        let (tx, rx) = oneshot::channel();
1269        self.cmd_tx
1270            .send(SessionCommand::OpenFile {
1271                info_hash,
1272                file_index,
1273                reply: tx,
1274            })
1275            .await
1276            .map_err(|_| crate::Error::Shutdown)?;
1277        rx.await.map_err(|_| crate::Error::Shutdown)?
1278    }
1279
1280    /// Force all trackers for a torrent to re-announce immediately.
1281    pub async fn force_reannounce(&self, info_hash: Id20) -> crate::Result<()> {
1282        let (tx, rx) = oneshot::channel();
1283        self.cmd_tx
1284            .send(SessionCommand::ForceReannounce {
1285                info_hash,
1286                reply: tx,
1287            })
1288            .await
1289            .map_err(|_| crate::Error::Shutdown)?;
1290        rx.await.map_err(|_| crate::Error::Shutdown)?
1291    }
1292
1293    /// Get the list of all configured trackers with their status for a torrent.
1294    pub async fn tracker_list(
1295        &self,
1296        info_hash: Id20,
1297    ) -> crate::Result<Vec<crate::tracker_manager::TrackerInfo>> {
1298        let (tx, rx) = oneshot::channel();
1299        self.cmd_tx
1300            .send(SessionCommand::TrackerList {
1301                info_hash,
1302                reply: tx,
1303            })
1304            .await
1305            .map_err(|_| crate::Error::Shutdown)?;
1306        rx.await.map_err(|_| crate::Error::Shutdown)?
1307    }
1308
1309    /// Scrape trackers for seeder/leecher counts for a torrent.
1310    pub async fn scrape(
1311        &self,
1312        info_hash: Id20,
1313    ) -> crate::Result<Option<(String, irontide_tracker::ScrapeInfo)>> {
1314        let (tx, rx) = oneshot::channel();
1315        self.cmd_tx
1316            .send(SessionCommand::Scrape {
1317                info_hash,
1318                reply: tx,
1319            })
1320            .await
1321            .map_err(|_| crate::Error::Shutdown)?;
1322        rx.await.map_err(|_| crate::Error::Shutdown)?
1323    }
1324
1325    /// Set the download priority of a specific file within a torrent.
1326    pub async fn set_file_priority(
1327        &self,
1328        info_hash: Id20,
1329        index: usize,
1330        priority: irontide_core::FilePriority,
1331    ) -> crate::Result<()> {
1332        let (tx, rx) = oneshot::channel();
1333        self.cmd_tx
1334            .send(SessionCommand::SetFilePriority {
1335                info_hash,
1336                index,
1337                priority,
1338                reply: tx,
1339            })
1340            .await
1341            .map_err(|_| crate::Error::Shutdown)?;
1342        rx.await.map_err(|_| crate::Error::Shutdown)?
1343    }
1344
1345    /// Get the current per-file priorities for a torrent.
1346    pub async fn file_priorities(
1347        &self,
1348        info_hash: Id20,
1349    ) -> crate::Result<Vec<irontide_core::FilePriority>> {
1350        let (tx, rx) = oneshot::channel();
1351        self.cmd_tx
1352            .send(SessionCommand::FilePriorities {
1353                info_hash,
1354                reply: tx,
1355            })
1356            .await
1357            .map_err(|_| crate::Error::Shutdown)?;
1358        rx.await.map_err(|_| crate::Error::Shutdown)?
1359    }
1360
1361    /// Set the per-torrent download rate limit in bytes/sec (0 = unlimited).
1362    pub async fn set_download_limit(
1363        &self,
1364        info_hash: Id20,
1365        bytes_per_sec: u64,
1366    ) -> crate::Result<()> {
1367        let (tx, rx) = oneshot::channel();
1368        self.cmd_tx
1369            .send(SessionCommand::SetDownloadLimit {
1370                info_hash,
1371                bytes_per_sec,
1372                reply: tx,
1373            })
1374            .await
1375            .map_err(|_| crate::Error::Shutdown)?;
1376        rx.await.map_err(|_| crate::Error::Shutdown)?
1377    }
1378
1379    /// Set the per-torrent upload rate limit in bytes/sec (0 = unlimited).
1380    pub async fn set_upload_limit(&self, info_hash: Id20, bytes_per_sec: u64) -> crate::Result<()> {
1381        let (tx, rx) = oneshot::channel();
1382        self.cmd_tx
1383            .send(SessionCommand::SetUploadLimit {
1384                info_hash,
1385                bytes_per_sec,
1386                reply: tx,
1387            })
1388            .await
1389            .map_err(|_| crate::Error::Shutdown)?;
1390        rx.await.map_err(|_| crate::Error::Shutdown)?
1391    }
1392
1393    /// Get the current per-torrent download rate limit in bytes/sec (0 = unlimited).
1394    pub async fn download_limit(&self, info_hash: Id20) -> crate::Result<u64> {
1395        let (tx, rx) = oneshot::channel();
1396        self.cmd_tx
1397            .send(SessionCommand::DownloadLimit {
1398                info_hash,
1399                reply: tx,
1400            })
1401            .await
1402            .map_err(|_| crate::Error::Shutdown)?;
1403        rx.await.map_err(|_| crate::Error::Shutdown)?
1404    }
1405
1406    /// Get the current per-torrent upload rate limit in bytes/sec (0 = unlimited).
1407    pub async fn upload_limit(&self, info_hash: Id20) -> crate::Result<u64> {
1408        let (tx, rx) = oneshot::channel();
1409        self.cmd_tx
1410            .send(SessionCommand::UploadLimit {
1411                info_hash,
1412                reply: tx,
1413            })
1414            .await
1415            .map_err(|_| crate::Error::Shutdown)?;
1416        rx.await.map_err(|_| crate::Error::Shutdown)?
1417    }
1418
1419    /// Enable or disable sequential (in-order) piece downloading for a torrent.
1420    pub async fn set_sequential_download(
1421        &self,
1422        info_hash: Id20,
1423        enabled: bool,
1424    ) -> crate::Result<()> {
1425        let (tx, rx) = oneshot::channel();
1426        self.cmd_tx
1427            .send(SessionCommand::SetSequentialDownload {
1428                info_hash,
1429                enabled,
1430                reply: tx,
1431            })
1432            .await
1433            .map_err(|_| crate::Error::Shutdown)?;
1434        rx.await.map_err(|_| crate::Error::Shutdown)?
1435    }
1436
1437    /// Query whether sequential downloading is enabled for a torrent.
1438    pub async fn is_sequential_download(&self, info_hash: Id20) -> crate::Result<bool> {
1439        let (tx, rx) = oneshot::channel();
1440        self.cmd_tx
1441            .send(SessionCommand::IsSequentialDownload {
1442                info_hash,
1443                reply: tx,
1444            })
1445            .await
1446            .map_err(|_| crate::Error::Shutdown)?;
1447        rx.await.map_err(|_| crate::Error::Shutdown)?
1448    }
1449
1450    /// Enable or disable BEP 16 super seeding mode for a torrent.
1451    pub async fn set_super_seeding(&self, info_hash: Id20, enabled: bool) -> crate::Result<()> {
1452        let (tx, rx) = oneshot::channel();
1453        self.cmd_tx
1454            .send(SessionCommand::SetSuperSeeding {
1455                info_hash,
1456                enabled,
1457                reply: tx,
1458            })
1459            .await
1460            .map_err(|_| crate::Error::Shutdown)?;
1461        rx.await.map_err(|_| crate::Error::Shutdown)?
1462    }
1463
1464    /// Query whether BEP 16 super seeding mode is enabled for a torrent.
1465    pub async fn is_super_seeding(&self, info_hash: Id20) -> crate::Result<bool> {
1466        let (tx, rx) = oneshot::channel();
1467        self.cmd_tx
1468            .send(SessionCommand::IsSuperSeeding {
1469                info_hash,
1470                reply: tx,
1471            })
1472            .await
1473            .map_err(|_| crate::Error::Shutdown)?;
1474        rx.await.map_err(|_| crate::Error::Shutdown)?
1475    }
1476
1477    /// Enable or disable user-requested seed-only mode for a torrent (M159).
1478    ///
1479    /// When `enabled` is `true`, the engine stops scheduling new block requests
1480    /// and cancels all in-flight requests for the torrent, but continues to
1481    /// serve uploads to interested peers. This is distinct from "naturally
1482    /// seeding" (all pieces downloaded): it represents an explicit user toggle
1483    /// layered on top of the download state.
1484    ///
1485    /// Toggling back to `false` resumes normal piece scheduling.
1486    ///
1487    /// # Errors
1488    ///
1489    /// Returns [`crate::Error::TorrentNotFound`] if `info_hash` is not registered
1490    /// in the session, or [`crate::Error::Shutdown`] if the session actor has
1491    /// terminated.
1492    pub async fn set_seed_mode(&self, info_hash: Id20, enabled: bool) -> crate::Result<()> {
1493        let (tx, rx) = oneshot::channel();
1494        self.cmd_tx
1495            .send(SessionCommand::SetSeedMode {
1496                info_hash,
1497                enabled,
1498                reply: tx,
1499            })
1500            .await
1501            .map_err(|_| crate::Error::Shutdown)?;
1502        rx.await.map_err(|_| crate::Error::Shutdown)?
1503    }
1504
1505    /// Add a new tracker URL to a torrent.
1506    ///
1507    /// The URL is validated and deduplicated by the tracker manager.
1508    pub async fn add_tracker(&self, info_hash: Id20, url: String) -> crate::Result<()> {
1509        let (tx, rx) = oneshot::channel();
1510        self.cmd_tx
1511            .send(SessionCommand::AddTracker {
1512                info_hash,
1513                url,
1514                reply: tx,
1515            })
1516            .await
1517            .map_err(|_| crate::Error::Shutdown)?;
1518        rx.await.map_err(|_| crate::Error::Shutdown)?
1519    }
1520
1521    /// Replace all tracker URLs for a torrent.
1522    pub async fn replace_trackers(&self, info_hash: Id20, urls: Vec<String>) -> crate::Result<()> {
1523        let (tx, rx) = oneshot::channel();
1524        self.cmd_tx
1525            .send(SessionCommand::ReplaceTrackers {
1526                info_hash,
1527                urls,
1528                reply: tx,
1529            })
1530            .await
1531            .map_err(|_| crate::Error::Shutdown)?;
1532        rx.await.map_err(|_| crate::Error::Shutdown)?
1533    }
1534
1535    /// Trigger a full piece verification (force recheck) for a torrent.
1536    ///
1537    /// Clears all piece completion data, re-verifies every piece, and
1538    /// transitions to `Seeding` or `Downloading` depending on the result.
1539    /// Returns after the recheck is complete.
1540    pub async fn force_recheck(&self, info_hash: Id20) -> crate::Result<()> {
1541        let (tx, rx) = oneshot::channel();
1542        self.cmd_tx
1543            .send(SessionCommand::ForceRecheck {
1544                info_hash,
1545                reply: tx,
1546            })
1547            .await
1548            .map_err(|_| crate::Error::Shutdown)?;
1549        rx.await.map_err(|_| crate::Error::Shutdown)?
1550    }
1551
1552    /// Rename a file within a torrent on disk.
1553    ///
1554    /// Changes the filename of the specified file (by index) to `new_name`.
1555    /// The file stays in the same directory; only the filename component changes.
1556    /// Fires a `FileRenamed` alert on success.
1557    pub async fn rename_file(
1558        &self,
1559        info_hash: Id20,
1560        file_index: usize,
1561        new_name: String,
1562    ) -> crate::Result<()> {
1563        let (tx, rx) = oneshot::channel();
1564        self.cmd_tx
1565            .send(SessionCommand::RenameFile {
1566                info_hash,
1567                file_index,
1568                new_name,
1569                reply: tx,
1570            })
1571            .await
1572            .map_err(|_| crate::Error::Shutdown)?;
1573        rx.await.map_err(|_| crate::Error::Shutdown)?
1574    }
1575
1576    /// Set the per-torrent maximum number of connections (0 = use global default).
1577    pub async fn set_max_connections(&self, info_hash: Id20, limit: usize) -> crate::Result<()> {
1578        let (tx, rx) = oneshot::channel();
1579        self.cmd_tx
1580            .send(SessionCommand::SetMaxConnections {
1581                info_hash,
1582                limit,
1583                reply: tx,
1584            })
1585            .await
1586            .map_err(|_| crate::Error::Shutdown)?;
1587        rx.await.map_err(|_| crate::Error::Shutdown)?
1588    }
1589
1590    /// Get the current per-torrent maximum connection limit (0 = use global default).
1591    pub async fn max_connections(&self, info_hash: Id20) -> crate::Result<usize> {
1592        let (tx, rx) = oneshot::channel();
1593        self.cmd_tx
1594            .send(SessionCommand::MaxConnections {
1595                info_hash,
1596                reply: tx,
1597            })
1598            .await
1599            .map_err(|_| crate::Error::Shutdown)?;
1600        rx.await.map_err(|_| crate::Error::Shutdown)?
1601    }
1602
1603    /// Set the per-torrent maximum number of upload slots (unchoke slots).
1604    pub async fn set_max_uploads(&self, info_hash: Id20, limit: usize) -> crate::Result<()> {
1605        let (tx, rx) = oneshot::channel();
1606        self.cmd_tx
1607            .send(SessionCommand::SetMaxUploads {
1608                info_hash,
1609                limit,
1610                reply: tx,
1611            })
1612            .await
1613            .map_err(|_| crate::Error::Shutdown)?;
1614        rx.await.map_err(|_| crate::Error::Shutdown)?
1615    }
1616
1617    /// Get the current per-torrent maximum upload slots (unchoke slots).
1618    pub async fn max_uploads(&self, info_hash: Id20) -> crate::Result<usize> {
1619        let (tx, rx) = oneshot::channel();
1620        self.cmd_tx
1621            .send(SessionCommand::MaxUploads {
1622                info_hash,
1623                reply: tx,
1624            })
1625            .await
1626            .map_err(|_| crate::Error::Shutdown)?;
1627        rx.await.map_err(|_| crate::Error::Shutdown)?
1628    }
1629
1630    /// Get per-peer details for all connected peers of a torrent.
1631    pub async fn get_peer_info(
1632        &self,
1633        info_hash: Id20,
1634    ) -> crate::Result<Vec<crate::types::PeerInfo>> {
1635        let (tx, rx) = oneshot::channel();
1636        self.cmd_tx
1637            .send(SessionCommand::GetPeerInfo {
1638                info_hash,
1639                reply: tx,
1640            })
1641            .await
1642            .map_err(|_| crate::Error::Shutdown)?;
1643        rx.await.map_err(|_| crate::Error::Shutdown)?
1644    }
1645
1646    /// Get in-flight piece download status for a torrent (the download queue).
1647    pub async fn get_download_queue(
1648        &self,
1649        info_hash: Id20,
1650    ) -> crate::Result<Vec<crate::types::PartialPieceInfo>> {
1651        let (tx, rx) = oneshot::channel();
1652        self.cmd_tx
1653            .send(SessionCommand::GetDownloadQueue {
1654                info_hash,
1655                reply: tx,
1656            })
1657            .await
1658            .map_err(|_| crate::Error::Shutdown)?;
1659        rx.await.map_err(|_| crate::Error::Shutdown)?
1660    }
1661
1662    /// Check whether a specific piece has been downloaded for a torrent.
1663    pub async fn have_piece(&self, info_hash: Id20, index: u32) -> crate::Result<bool> {
1664        let (tx, rx) = oneshot::channel();
1665        self.cmd_tx
1666            .send(SessionCommand::HavePiece {
1667                info_hash,
1668                index,
1669                reply: tx,
1670            })
1671            .await
1672            .map_err(|_| crate::Error::Shutdown)?;
1673        rx.await.map_err(|_| crate::Error::Shutdown)?
1674    }
1675
1676    /// Get per-piece availability counts from connected peers for a torrent.
1677    pub async fn piece_availability(&self, info_hash: Id20) -> crate::Result<Vec<u32>> {
1678        let (tx, rx) = oneshot::channel();
1679        self.cmd_tx
1680            .send(SessionCommand::PieceAvailability {
1681                info_hash,
1682                reply: tx,
1683            })
1684            .await
1685            .map_err(|_| crate::Error::Shutdown)?;
1686        rx.await.map_err(|_| crate::Error::Shutdown)?
1687    }
1688
1689    /// Get per-file bytes-downloaded progress for a torrent.
1690    pub async fn file_progress(&self, info_hash: Id20) -> crate::Result<Vec<u64>> {
1691        let (tx, rx) = oneshot::channel();
1692        self.cmd_tx
1693            .send(SessionCommand::FileProgress {
1694                info_hash,
1695                reply: tx,
1696            })
1697            .await
1698            .map_err(|_| crate::Error::Shutdown)?;
1699        rx.await.map_err(|_| crate::Error::Shutdown)?
1700    }
1701
1702    /// Get the torrent's identity hashes (v1 and/or v2).
1703    pub async fn info_hashes(&self, info_hash: Id20) -> crate::Result<irontide_core::InfoHashes> {
1704        let (tx, rx) = oneshot::channel();
1705        self.cmd_tx
1706            .send(SessionCommand::InfoHashesQuery {
1707                info_hash,
1708                reply: tx,
1709            })
1710            .await
1711            .map_err(|_| crate::Error::Shutdown)?;
1712        rx.await.map_err(|_| crate::Error::Shutdown)?
1713    }
1714
1715    /// Get the full v1 metainfo for a torrent.
1716    ///
1717    /// Returns `None` for magnet links before metadata has been received.
1718    pub async fn torrent_file(
1719        &self,
1720        info_hash: Id20,
1721    ) -> crate::Result<Option<irontide_core::TorrentMetaV1>> {
1722        let (tx, rx) = oneshot::channel();
1723        self.cmd_tx
1724            .send(SessionCommand::TorrentFile {
1725                info_hash,
1726                reply: tx,
1727            })
1728            .await
1729            .map_err(|_| crate::Error::Shutdown)?;
1730        rx.await.map_err(|_| crate::Error::Shutdown)?
1731    }
1732
1733    /// Get the full v2 metainfo for a torrent.
1734    ///
1735    /// Returns `None` if the torrent is not a v2/hybrid torrent, or for magnet
1736    /// links before metadata has been received.
1737    pub async fn torrent_file_v2(
1738        &self,
1739        info_hash: Id20,
1740    ) -> crate::Result<Option<irontide_core::TorrentMetaV2>> {
1741        let (tx, rx) = oneshot::channel();
1742        self.cmd_tx
1743            .send(SessionCommand::TorrentFileV2 {
1744                info_hash,
1745                reply: tx,
1746            })
1747            .await
1748            .map_err(|_| crate::Error::Shutdown)?;
1749        rx.await.map_err(|_| crate::Error::Shutdown)?
1750    }
1751
1752    /// Force an immediate DHT announce for a torrent.
1753    pub async fn force_dht_announce(&self, info_hash: Id20) -> crate::Result<()> {
1754        let (tx, rx) = oneshot::channel();
1755        self.cmd_tx
1756            .send(SessionCommand::ForceDhtAnnounce {
1757                info_hash,
1758                reply: tx,
1759            })
1760            .await
1761            .map_err(|_| crate::Error::Shutdown)?;
1762        rx.await.map_err(|_| crate::Error::Shutdown)?
1763    }
1764
1765    /// Force an immediate LSD (Local Service Discovery) announce for a torrent.
1766    ///
1767    /// LSD is a session-level component — this does not go through the torrent actor.
1768    pub async fn force_lsd_announce(&self, info_hash: Id20) -> crate::Result<()> {
1769        let (tx, rx) = oneshot::channel();
1770        self.cmd_tx
1771            .send(SessionCommand::ForceLsdAnnounce {
1772                info_hash,
1773                reply: tx,
1774            })
1775            .await
1776            .map_err(|_| crate::Error::Shutdown)?;
1777        rx.await.map_err(|_| crate::Error::Shutdown)?
1778    }
1779
1780    /// Read all data for a specific piece from disk.
1781    pub async fn read_piece(&self, info_hash: Id20, index: u32) -> crate::Result<bytes::Bytes> {
1782        let (tx, rx) = oneshot::channel();
1783        self.cmd_tx
1784            .send(SessionCommand::ReadPiece {
1785                info_hash,
1786                index,
1787                reply: tx,
1788            })
1789            .await
1790            .map_err(|_| crate::Error::Shutdown)?;
1791        rx.await.map_err(|_| crate::Error::Shutdown)?
1792    }
1793
1794    /// Flush the disk write cache for a torrent.
1795    pub async fn flush_cache(&self, info_hash: Id20) -> crate::Result<()> {
1796        let (tx, rx) = oneshot::channel();
1797        self.cmd_tx
1798            .send(SessionCommand::FlushCache {
1799                info_hash,
1800                reply: tx,
1801            })
1802            .await
1803            .map_err(|_| crate::Error::Shutdown)?;
1804        rx.await.map_err(|_| crate::Error::Shutdown)?
1805    }
1806
1807    /// Check if a torrent exists in the session and its handle is still valid.
1808    pub async fn is_valid(&self, info_hash: Id20) -> bool {
1809        let (tx, rx) = oneshot::channel();
1810        if self
1811            .cmd_tx
1812            .send(SessionCommand::IsValid {
1813                info_hash,
1814                reply: tx,
1815            })
1816            .await
1817            .is_err()
1818        {
1819            return false;
1820        }
1821        rx.await.unwrap_or(false)
1822    }
1823
1824    /// Clear the error state on a torrent, resuming it if it was paused due to error.
1825    pub async fn clear_error(&self, info_hash: Id20) -> crate::Result<()> {
1826        let (tx, rx) = oneshot::channel();
1827        self.cmd_tx
1828            .send(SessionCommand::ClearError {
1829                info_hash,
1830                reply: tx,
1831            })
1832            .await
1833            .map_err(|_| crate::Error::Shutdown)?;
1834        rx.await.map_err(|_| crate::Error::Shutdown)?
1835    }
1836
1837    /// Get per-file open/mode status for a torrent.
1838    pub async fn file_status(
1839        &self,
1840        info_hash: Id20,
1841    ) -> crate::Result<Vec<crate::types::FileStatus>> {
1842        let (tx, rx) = oneshot::channel();
1843        self.cmd_tx
1844            .send(SessionCommand::FileStatus {
1845                info_hash,
1846                reply: tx,
1847            })
1848            .await
1849            .map_err(|_| crate::Error::Shutdown)?;
1850        rx.await.map_err(|_| crate::Error::Shutdown)?
1851    }
1852
1853    /// Read the current torrent flags as a [`crate::types::TorrentFlags`] bitflag set.
1854    pub async fn flags(&self, info_hash: Id20) -> crate::Result<crate::types::TorrentFlags> {
1855        let (tx, rx) = oneshot::channel();
1856        self.cmd_tx
1857            .send(SessionCommand::Flags {
1858                info_hash,
1859                reply: tx,
1860            })
1861            .await
1862            .map_err(|_| crate::Error::Shutdown)?;
1863        rx.await.map_err(|_| crate::Error::Shutdown)?
1864    }
1865
1866    /// Set (enable) the specified torrent flags.
1867    pub async fn set_flags(
1868        &self,
1869        info_hash: Id20,
1870        flags: crate::types::TorrentFlags,
1871    ) -> crate::Result<()> {
1872        let (tx, rx) = oneshot::channel();
1873        self.cmd_tx
1874            .send(SessionCommand::SetFlags {
1875                info_hash,
1876                flags,
1877                reply: tx,
1878            })
1879            .await
1880            .map_err(|_| crate::Error::Shutdown)?;
1881        rx.await.map_err(|_| crate::Error::Shutdown)?
1882    }
1883
1884    /// Unset (disable) the specified torrent flags.
1885    pub async fn unset_flags(
1886        &self,
1887        info_hash: Id20,
1888        flags: crate::types::TorrentFlags,
1889    ) -> crate::Result<()> {
1890        let (tx, rx) = oneshot::channel();
1891        self.cmd_tx
1892            .send(SessionCommand::UnsetFlags {
1893                info_hash,
1894                flags,
1895                reply: tx,
1896            })
1897            .await
1898            .map_err(|_| crate::Error::Shutdown)?;
1899        rx.await.map_err(|_| crate::Error::Shutdown)?
1900    }
1901
1902    /// Immediately initiate a peer connection for a torrent.
1903    pub async fn connect_peer(&self, info_hash: Id20, addr: SocketAddr) -> crate::Result<()> {
1904        let (tx, rx) = oneshot::channel();
1905        self.cmd_tx
1906            .send(SessionCommand::ConnectPeer {
1907                info_hash,
1908                addr,
1909                reply: tx,
1910            })
1911            .await
1912            .map_err(|_| crate::Error::Shutdown)?;
1913        rx.await.map_err(|_| crate::Error::Shutdown)?
1914    }
1915
1916    /// Store an immutable item in the DHT (BEP 44).
1917    ///
1918    /// Returns the SHA-1 target hash of the stored value.
1919    pub async fn dht_put_immutable(&self, value: Vec<u8>) -> crate::Result<Id20> {
1920        let (tx, rx) = oneshot::channel();
1921        self.cmd_tx
1922            .send(SessionCommand::DhtPutImmutable { value, reply: tx })
1923            .await
1924            .map_err(|_| crate::Error::Shutdown)?;
1925        rx.await.map_err(|_| crate::Error::Shutdown)?
1926    }
1927
1928    /// Retrieve an immutable item from the DHT (BEP 44).
1929    ///
1930    /// Returns `Some(value)` if found, `None` otherwise.
1931    pub async fn dht_get_immutable(&self, target: Id20) -> crate::Result<Option<Vec<u8>>> {
1932        let (tx, rx) = oneshot::channel();
1933        self.cmd_tx
1934            .send(SessionCommand::DhtGetImmutable { target, reply: tx })
1935            .await
1936            .map_err(|_| crate::Error::Shutdown)?;
1937        rx.await.map_err(|_| crate::Error::Shutdown)?
1938    }
1939
1940    /// Store a mutable item in the DHT (BEP 44).
1941    ///
1942    /// `keypair_bytes` is a 32-byte Ed25519 seed. Returns the target hash.
1943    pub async fn dht_put_mutable(
1944        &self,
1945        keypair_bytes: [u8; 32],
1946        value: Vec<u8>,
1947        seq: i64,
1948        salt: Vec<u8>,
1949    ) -> crate::Result<Id20> {
1950        let (tx, rx) = oneshot::channel();
1951        self.cmd_tx
1952            .send(SessionCommand::DhtPutMutable {
1953                keypair_bytes,
1954                value,
1955                seq,
1956                salt,
1957                reply: tx,
1958            })
1959            .await
1960            .map_err(|_| crate::Error::Shutdown)?;
1961        rx.await.map_err(|_| crate::Error::Shutdown)?
1962    }
1963
1964    /// Retrieve a mutable item from the DHT (BEP 44).
1965    ///
1966    /// Returns `Some((value, seq))` if found, `None` otherwise.
1967    pub async fn dht_get_mutable(
1968        &self,
1969        public_key: [u8; 32],
1970        salt: Vec<u8>,
1971    ) -> crate::Result<Option<(Vec<u8>, i64)>> {
1972        let (tx, rx) = oneshot::channel();
1973        self.cmd_tx
1974            .send(SessionCommand::DhtGetMutable {
1975                public_key,
1976                salt,
1977                reply: tx,
1978            })
1979            .await
1980            .map_err(|_| crate::Error::Shutdown)?;
1981        rx.await.map_err(|_| crate::Error::Shutdown)?
1982    }
1983
1984    // ── M121: Convenience API for future HTTP endpoints ──
1985
1986    /// List all torrents as lightweight summaries.
1987    ///
1988    /// Fetches stats for each active torrent and converts to [`TorrentSummary`].
1989    /// Torrents that fail the stats query (e.g. shutting down) are silently skipped.
1990    pub async fn list_torrent_summaries(&self) -> crate::Result<Vec<TorrentSummary>> {
1991        let ids = self.list_torrents().await?;
1992        let mut summaries = Vec::with_capacity(ids.len());
1993        for id in ids {
1994            if let Ok(stats) = self.torrent_stats(id).await {
1995                summaries.push(TorrentSummary::from(&stats));
1996            }
1997        }
1998        Ok(summaries)
1999    }
2000
2001    /// Add a torrent from a magnet URI string.
2002    ///
2003    /// Parses the URI, extracts info hashes, and adds the magnet to the session.
2004    /// Returns the info hashes (v1 and/or v2) for the added torrent.
2005    pub async fn add_magnet_uri(&self, uri: &str) -> crate::Result<irontide_core::InfoHashes> {
2006        let magnet = irontide_core::Magnet::parse(uri)?;
2007        let info_hashes = magnet.info_hashes.clone();
2008        self.add_magnet(magnet).await?;
2009        Ok(info_hashes)
2010    }
2011
2012    /// Add a torrent from raw .torrent file bytes.
2013    ///
2014    /// Auto-detects v1, v2, or hybrid format. Returns the info hashes for the
2015    /// added torrent.
2016    pub async fn add_torrent_bytes(
2017        &self,
2018        bytes: &[u8],
2019    ) -> crate::Result<irontide_core::InfoHashes> {
2020        let meta = irontide_core::torrent_from_bytes_any(bytes)?;
2021        let info_hashes = meta.info_hashes();
2022        self.add_torrent(meta, None).await?;
2023        Ok(info_hashes)
2024    }
2025}
2026
2027// ---------------------------------------------------------------------------
2028// SessionActor — internal single-owner event loop
2029// ---------------------------------------------------------------------------
2030
2031struct SessionActor {
2032    settings: Settings,
2033    torrents: HashMap<Id20, TorrentEntry>,
2034    dht_v4: Option<DhtHandle>,
2035    dht_v6: Option<DhtHandle>,
2036    lsd: Option<crate::lsd::LsdHandle>,
2037    lsd_peers_rx: Option<mpsc::Receiver<(Id20, SocketAddr)>>,
2038    cmd_rx: mpsc::Receiver<SessionCommand>,
2039    alert_tx: broadcast::Sender<Alert>,
2040    alert_mask: Arc<AtomicU32>,
2041    global_upload_bucket: SharedBucket,
2042    global_download_bucket: SharedBucket,
2043    utp_socket: Option<irontide_utp::UtpSocket>,
2044    utp_socket_v6: Option<irontide_utp::UtpSocket>,
2045    nat: Option<irontide_nat::NatHandle>,
2046    nat_events_rx: Option<mpsc::Receiver<irontide_nat::NatEvent>>,
2047    ban_manager: SharedBanManager,
2048    ip_filter: SharedIpFilter,
2049    disk_manager: crate::disk::DiskManagerHandle,
2050    #[allow(dead_code)]
2051    disk_actor_handle: tokio::task::JoinHandle<()>,
2052    /// External IP discovered via NAT traversal or configured manually (BEP 40).
2053    external_ip: Option<std::net::IpAddr>,
2054    /// BEP 42: External IP consensus from DHT v4 KRPC responses.
2055    dht_v4_ip_rx: Option<mpsc::Receiver<std::net::IpAddr>>,
2056    /// BEP 42: External IP consensus from DHT v6 KRPC responses.
2057    dht_v6_ip_rx: Option<mpsc::Receiver<std::net::IpAddr>>,
2058    /// Registered extension plugins, shared with all TorrentActors.
2059    plugins: Arc<Vec<Box<dyn crate::extension::ExtensionPlugin>>>,
2060    /// I2P SAM session (if enabled).
2061    sam_session: Option<Arc<crate::i2p::SamSession>>,
2062    /// SSL manager for SSL torrent certificate handling (M42).
2063    ssl_manager: Option<Arc<crate::ssl_manager::SslManager>>,
2064    /// SSL/TLS TCP listener (separate port from the main listener) (M42).
2065    ssl_listener: Option<Box<dyn crate::transport::TransportListener>>,
2066    /// Channel receiving pre-validated inbound connections from the ListenerTask (M114).
2067    validated_conn_rx: mpsc::Receiver<crate::listener::IdentifiedConnection>,
2068    /// Registry of active info hashes shared with the ListenerTask (M114).
2069    /// INVARIANT: Must be updated whenever torrents are added/removed.
2070    /// If a new torrent-add path is added without updating this registry,
2071    /// inbound connections for that torrent will be silently rejected.
2072    info_hash_registry: Arc<DashMap<Id20, ()>>,
2073    /// Handle to keep the listener task alive; dropped on session shutdown (M114).
2074    #[allow(dead_code)]
2075    _listener_task: tokio::task::JoinHandle<()>,
2076    /// Shared atomic session counters (M50).
2077    counters: Arc<crate::stats::SessionCounters>,
2078    /// Network transport factory for TCP operations (M51).
2079    factory: Arc<crate::transport::NetworkFactory>,
2080    /// Shared hash pool for parallel piece verification (M96).
2081    hash_pool: std::sync::Arc<crate::hash_pool::HashPool>,
2082}
2083
2084impl SessionActor {
2085    async fn run(mut self) {
2086        let mut refill_interval = tokio::time::interval(std::time::Duration::from_millis(100));
2087        refill_interval.tick().await; // skip first immediate tick
2088
2089        let auto_manage_secs = self.settings.auto_manage_interval.max(1);
2090        let mut auto_manage_interval =
2091            tokio::time::interval(std::time::Duration::from_secs(auto_manage_secs));
2092        auto_manage_interval.tick().await; // skip first immediate tick
2093
2094        // Periodic session stats timer (M50)
2095        let stats_interval_ms = self.settings.stats_report_interval;
2096        let mut stats_timer = if stats_interval_ms > 0 {
2097            Some(tokio::time::interval(std::time::Duration::from_millis(
2098                stats_interval_ms,
2099            )))
2100        } else {
2101            None
2102        };
2103        if let Some(ref mut t) = stats_timer {
2104            t.tick().await; // skip first immediate tick
2105        }
2106
2107        // Periodic sample_infohashes timer (BEP 51, M111)
2108        let sample_interval_secs = self.settings.dht_sample_infohashes_interval;
2109        let mut sample_timer = if sample_interval_secs > 0 {
2110            Some(tokio::time::interval(std::time::Duration::from_secs(
2111                sample_interval_secs,
2112            )))
2113        } else {
2114            None
2115        };
2116        if let Some(ref mut t) = sample_timer {
2117            t.tick().await; // skip first immediate tick
2118        }
2119
2120        // Periodic resume file save timer (M161)
2121        let mut resume_save_interval = if self.settings.save_resume_interval_secs > 0 {
2122            Some(tokio::time::interval(std::time::Duration::from_secs(
2123                self.settings.save_resume_interval_secs,
2124            )))
2125        } else {
2126            None
2127        };
2128        if let Some(ref mut t) = resume_save_interval {
2129            t.tick().await; // skip first immediate tick
2130        }
2131
2132        // Auto-restore torrents from resume files on startup (M161 Phase 5).
2133        {
2134            let resume_dir = self.effective_resume_dir();
2135            let resume_files = crate::resume_file::scan_resume_dir(&resume_dir);
2136            if !resume_files.is_empty() {
2137                // Reuse the existing sequential restore logic from Phase 4.
2138                match self.handle_load_resume_state().await {
2139                    Ok(result) => {
2140                        info!(
2141                            restored = result.restored,
2142                            skipped = result.skipped,
2143                            failed = result.failed,
2144                            "auto-restored torrents on startup"
2145                        );
2146                    }
2147                    Err(e) => {
2148                        warn!("auto-restore on startup failed: {e}");
2149                    }
2150                }
2151
2152                // Orphan cleanup: delete .resume files whose hex stem does not
2153                // match any info hash currently in the session.
2154                let active_hashes: std::collections::HashSet<String> = self
2155                    .torrents
2156                    .keys()
2157                    .map(|h| hex::encode(h.as_bytes()))
2158                    .collect();
2159
2160                // Re-scan after load — some files may have been consumed.
2161                let current_files = crate::resume_file::scan_resume_dir(&resume_dir);
2162                for path in &current_files {
2163                    if let Some(stem) = path.file_stem().and_then(|s| s.to_str())
2164                        && !active_hashes.contains(stem)
2165                    {
2166                        if let Err(e) = std::fs::remove_file(path) {
2167                            warn!(path = %path.display(), "failed to remove orphan resume file: {e}");
2168                        } else {
2169                            debug!(path = %path.display(), "removed orphan resume file");
2170                        }
2171                    }
2172                }
2173            }
2174        }
2175
2176        loop {
2177            tokio::select! {
2178                cmd = self.cmd_rx.recv() => {
2179                    match cmd {
2180                        Some(SessionCommand::AddTorrent {
2181                            meta,
2182                            storage,
2183                            download_dir,
2184                            reply,
2185                        }) => {
2186                            let result = self.handle_add_torrent(*meta, storage, download_dir).await;
2187                            let _ = reply.send(result);
2188                        }
2189                        Some(SessionCommand::AddMagnet { magnet, download_dir, reply }) => {
2190                            let result = self.handle_add_magnet(magnet, download_dir).await;
2191                            let _ = reply.send(result);
2192                        }
2193                        Some(SessionCommand::RemoveTorrent { info_hash, reply }) => {
2194                            let result = self.handle_remove_torrent(info_hash).await;
2195                            let _ = reply.send(result);
2196                        }
2197                        Some(SessionCommand::PauseTorrent { info_hash, reply }) => {
2198                            let result = self.handle_pause_torrent(info_hash).await;
2199                            let _ = reply.send(result);
2200                        }
2201                        Some(SessionCommand::ResumeTorrent { info_hash, reply }) => {
2202                            let result = self.handle_resume_torrent(info_hash).await;
2203                            let _ = reply.send(result);
2204                        }
2205                        Some(SessionCommand::TorrentStats { info_hash, reply }) => {
2206                            let result = self.handle_torrent_stats(info_hash).await;
2207                            let _ = reply.send(result);
2208                        }
2209                        Some(SessionCommand::TorrentInfo { info_hash, reply }) => {
2210                            let result = self.handle_torrent_info(info_hash);
2211                            let _ = reply.send(result);
2212                        }
2213                        Some(SessionCommand::ListTorrents { reply }) => {
2214                            let list: Vec<Id20> = self.torrents.keys().copied().collect();
2215                            let _ = reply.send(list);
2216                        }
2217                        Some(SessionCommand::SessionStats { reply }) => {
2218                            let stats = self.make_session_stats().await;
2219                            let _ = reply.send(stats);
2220                        }
2221                        Some(SessionCommand::SaveTorrentResumeData { info_hash, reply }) => {
2222                            let result = self.handle_save_torrent_resume(info_hash).await;
2223                            let _ = reply.send(result);
2224                        }
2225                        Some(SessionCommand::SaveSessionState { reply }) => {
2226                            let result = self.handle_save_session_state().await;
2227                            let _ = reply.send(result);
2228                        }
2229                        Some(SessionCommand::LoadResumeState { reply }) => {
2230                            let result = self.handle_load_resume_state().await;
2231                            let _ = reply.send(result);
2232                        }
2233                        Some(SessionCommand::QueuePosition { info_hash, reply }) => {
2234                            let result = match self.torrents.get(&info_hash) {
2235                                Some(entry) => Ok(entry.queue_position),
2236                                None => Err(crate::Error::TorrentNotFound(info_hash)),
2237                            };
2238                            let _ = reply.send(result);
2239                        }
2240                        Some(SessionCommand::SetQueuePosition { info_hash, pos, reply }) => {
2241                            let result = self.handle_set_queue_position(info_hash, pos);
2242                            let _ = reply.send(result);
2243                        }
2244                        Some(SessionCommand::QueuePositionUp { info_hash, reply }) => {
2245                            let result = self.handle_queue_move(info_hash, crate::queue::move_up);
2246                            let _ = reply.send(result);
2247                        }
2248                        Some(SessionCommand::QueuePositionDown { info_hash, reply }) => {
2249                            let result = self.handle_queue_move(info_hash, crate::queue::move_down);
2250                            let _ = reply.send(result);
2251                        }
2252                        Some(SessionCommand::QueuePositionTop { info_hash, reply }) => {
2253                            let result = self.handle_queue_move(info_hash, crate::queue::move_top);
2254                            let _ = reply.send(result);
2255                        }
2256                        Some(SessionCommand::QueuePositionBottom { info_hash, reply }) => {
2257                            let result = self.handle_queue_move(info_hash, crate::queue::move_bottom);
2258                            let _ = reply.send(result);
2259                        }
2260                        Some(SessionCommand::BanPeer { ip, reply }) => {
2261                            self.ban_manager.write().ban(ip);
2262                            let _ = reply.send(());
2263                        }
2264                        Some(SessionCommand::UnbanPeer { ip, reply }) => {
2265                            let was_banned = self.ban_manager.write().unban(&ip);
2266                            let _ = reply.send(was_banned);
2267                        }
2268                        Some(SessionCommand::BannedPeers { reply }) => {
2269                            let list: Vec<IpAddr> = self.ban_manager.read()
2270                                .banned_list().iter().copied().collect();
2271                            let _ = reply.send(list);
2272                        }
2273                        Some(SessionCommand::SetIpFilter { filter, reply }) => {
2274                            *self.ip_filter.write() = filter;
2275                            let _ = reply.send(());
2276                        }
2277                        Some(SessionCommand::GetIpFilter { reply }) => {
2278                            let filter = self.ip_filter.read().clone();
2279                            let _ = reply.send(filter);
2280                        }
2281                        Some(SessionCommand::GetSettings { reply }) => {
2282                            let _ = reply.send(self.settings.clone());
2283                        }
2284                        Some(SessionCommand::ApplySettings { settings, reply }) => {
2285                            let result = self.handle_apply_settings(*settings);
2286                            let _ = reply.send(result);
2287                        }
2288                        Some(SessionCommand::MoveTorrentStorage { info_hash, new_path, reply }) => {
2289                            let result = self.handle_move_torrent_storage(info_hash, new_path).await;
2290                            let _ = reply.send(result);
2291                        }
2292                        Some(SessionCommand::AddPeers { info_hash, peers, source, reply }) => {
2293                            let result = if let Some(entry) = self.torrents.get(&info_hash) {
2294                                entry.handle.add_peers(peers, source).await
2295                            } else {
2296                                Err(crate::Error::TorrentNotFound(info_hash))
2297                            };
2298                            let _ = reply.send(result);
2299                        }
2300                        Some(SessionCommand::OpenFile { info_hash, file_index, reply }) => {
2301                            let result = if let Some(entry) = self.torrents.get(&info_hash) {
2302                                entry.handle.open_file(file_index).await
2303                            } else {
2304                                Err(crate::Error::TorrentNotFound(info_hash))
2305                            };
2306                            let _ = reply.send(result);
2307                        }
2308                        Some(SessionCommand::ForceReannounce { info_hash, reply }) => {
2309                            let result = match self.torrents.get(&info_hash) {
2310                                Some(entry) => {
2311                                    entry.handle.force_reannounce().await
2312                                }
2313                                None => Err(crate::Error::TorrentNotFound(info_hash)),
2314                            };
2315                            let _ = reply.send(result);
2316                        }
2317                        Some(SessionCommand::TrackerList { info_hash, reply }) => {
2318                            let result = if let Some(entry) = self.torrents.get(&info_hash) {
2319                                entry.handle.tracker_list().await
2320                            } else {
2321                                Err(crate::Error::TorrentNotFound(info_hash))
2322                            };
2323                            let _ = reply.send(result);
2324                        }
2325                        Some(SessionCommand::Scrape { info_hash, reply }) => {
2326                            let result = if let Some(entry) = self.torrents.get(&info_hash) {
2327                                entry.handle.scrape().await
2328                            } else {
2329                                Err(crate::Error::TorrentNotFound(info_hash))
2330                            };
2331                            let _ = reply.send(result);
2332                        }
2333                        Some(SessionCommand::SetFilePriority { info_hash, index, priority, reply }) => {
2334                            let result = if let Some(entry) = self.torrents.get(&info_hash) {
2335                                entry.handle.set_file_priority(index, priority).await
2336                            } else {
2337                                Err(crate::Error::TorrentNotFound(info_hash))
2338                            };
2339                            let _ = reply.send(result);
2340                        }
2341                        Some(SessionCommand::FilePriorities { info_hash, reply }) => {
2342                            let result = if let Some(entry) = self.torrents.get(&info_hash) {
2343                                entry.handle.file_priorities().await
2344                            } else {
2345                                Err(crate::Error::TorrentNotFound(info_hash))
2346                            };
2347                            let _ = reply.send(result);
2348                        }
2349                        Some(SessionCommand::SetDownloadLimit { info_hash, bytes_per_sec, reply }) => {
2350                            let result = if let Some(entry) = self.torrents.get(&info_hash) {
2351                                entry.handle.set_download_limit(bytes_per_sec).await
2352                            } else {
2353                                Err(crate::Error::TorrentNotFound(info_hash))
2354                            };
2355                            let _ = reply.send(result);
2356                        }
2357                        Some(SessionCommand::SetUploadLimit { info_hash, bytes_per_sec, reply }) => {
2358                            let result = if let Some(entry) = self.torrents.get(&info_hash) {
2359                                entry.handle.set_upload_limit(bytes_per_sec).await
2360                            } else {
2361                                Err(crate::Error::TorrentNotFound(info_hash))
2362                            };
2363                            let _ = reply.send(result);
2364                        }
2365                        Some(SessionCommand::DownloadLimit { info_hash, reply }) => {
2366                            let result = if let Some(entry) = self.torrents.get(&info_hash) {
2367                                entry.handle.download_limit().await
2368                            } else {
2369                                Err(crate::Error::TorrentNotFound(info_hash))
2370                            };
2371                            let _ = reply.send(result);
2372                        }
2373                        Some(SessionCommand::UploadLimit { info_hash, reply }) => {
2374                            let result = if let Some(entry) = self.torrents.get(&info_hash) {
2375                                entry.handle.upload_limit().await
2376                            } else {
2377                                Err(crate::Error::TorrentNotFound(info_hash))
2378                            };
2379                            let _ = reply.send(result);
2380                        }
2381                        Some(SessionCommand::SetSequentialDownload { info_hash, enabled, reply }) => {
2382                            let result = if let Some(entry) = self.torrents.get(&info_hash) {
2383                                entry.handle.set_sequential_download(enabled).await
2384                            } else {
2385                                Err(crate::Error::TorrentNotFound(info_hash))
2386                            };
2387                            let _ = reply.send(result);
2388                        }
2389                        Some(SessionCommand::IsSequentialDownload { info_hash, reply }) => {
2390                            let result = if let Some(entry) = self.torrents.get(&info_hash) {
2391                                entry.handle.is_sequential_download().await
2392                            } else {
2393                                Err(crate::Error::TorrentNotFound(info_hash))
2394                            };
2395                            let _ = reply.send(result);
2396                        }
2397                        Some(SessionCommand::SetSuperSeeding { info_hash, enabled, reply }) => {
2398                            let result = if let Some(entry) = self.torrents.get(&info_hash) {
2399                                entry.handle.set_super_seeding(enabled).await
2400                            } else {
2401                                Err(crate::Error::TorrentNotFound(info_hash))
2402                            };
2403                            let _ = reply.send(result);
2404                        }
2405                        Some(SessionCommand::IsSuperSeeding { info_hash, reply }) => {
2406                            let result = if let Some(entry) = self.torrents.get(&info_hash) {
2407                                entry.handle.is_super_seeding().await
2408                            } else {
2409                                Err(crate::Error::TorrentNotFound(info_hash))
2410                            };
2411                            let _ = reply.send(result);
2412                        }
2413                        Some(SessionCommand::SetSeedMode { info_hash, enabled, reply }) => {
2414                            let result = if let Some(entry) = self.torrents.get(&info_hash) {
2415                                entry.handle.set_seed_mode(enabled).await
2416                            } else {
2417                                Err(crate::Error::TorrentNotFound(info_hash))
2418                            };
2419                            let _ = reply.send(result);
2420                        }
2421                        Some(SessionCommand::AddTracker { info_hash, url, reply }) => {
2422                            let result = if let Some(entry) = self.torrents.get(&info_hash) {
2423                                entry.handle.add_tracker(url).await
2424                            } else {
2425                                Err(crate::Error::TorrentNotFound(info_hash))
2426                            };
2427                            let _ = reply.send(result);
2428                        }
2429                        Some(SessionCommand::ReplaceTrackers { info_hash, urls, reply }) => {
2430                            let result = if let Some(entry) = self.torrents.get(&info_hash) {
2431                                entry.handle.replace_trackers(urls).await
2432                            } else {
2433                                Err(crate::Error::TorrentNotFound(info_hash))
2434                            };
2435                            let _ = reply.send(result);
2436                        }
2437                        Some(SessionCommand::ForceRecheck { info_hash, reply }) => {
2438                            let result = if let Some(entry) = self.torrents.get(&info_hash) {
2439                                entry.handle.force_recheck().await
2440                            } else {
2441                                Err(crate::Error::TorrentNotFound(info_hash))
2442                            };
2443                            let _ = reply.send(result);
2444                        }
2445                        Some(SessionCommand::RenameFile { info_hash, file_index, new_name, reply }) => {
2446                            let result = if let Some(entry) = self.torrents.get(&info_hash) {
2447                                entry.handle.rename_file(file_index, new_name).await
2448                            } else {
2449                                Err(crate::Error::TorrentNotFound(info_hash))
2450                            };
2451                            let _ = reply.send(result);
2452                        }
2453                        Some(SessionCommand::SetMaxConnections { info_hash, limit, reply }) => {
2454                            let result = if let Some(entry) = self.torrents.get(&info_hash) {
2455                                entry.handle.set_max_connections(limit).await
2456                            } else {
2457                                Err(crate::Error::TorrentNotFound(info_hash))
2458                            };
2459                            let _ = reply.send(result);
2460                        }
2461                        Some(SessionCommand::MaxConnections { info_hash, reply }) => {
2462                            let result = if let Some(entry) = self.torrents.get(&info_hash) {
2463                                entry.handle.max_connections().await
2464                            } else {
2465                                Err(crate::Error::TorrentNotFound(info_hash))
2466                            };
2467                            let _ = reply.send(result);
2468                        }
2469                        Some(SessionCommand::SetMaxUploads { info_hash, limit, reply }) => {
2470                            let result = if let Some(entry) = self.torrents.get(&info_hash) {
2471                                entry.handle.set_max_uploads(limit).await
2472                            } else {
2473                                Err(crate::Error::TorrentNotFound(info_hash))
2474                            };
2475                            let _ = reply.send(result);
2476                        }
2477                        Some(SessionCommand::MaxUploads { info_hash, reply }) => {
2478                            let result = if let Some(entry) = self.torrents.get(&info_hash) {
2479                                entry.handle.max_uploads().await
2480                            } else {
2481                                Err(crate::Error::TorrentNotFound(info_hash))
2482                            };
2483                            let _ = reply.send(result);
2484                        }
2485                        Some(SessionCommand::GetPeerInfo { info_hash, reply }) => {
2486                            let result = if let Some(entry) = self.torrents.get(&info_hash) {
2487                                entry.handle.get_peer_info().await
2488                            } else {
2489                                Err(crate::Error::TorrentNotFound(info_hash))
2490                            };
2491                            let _ = reply.send(result);
2492                        }
2493                        Some(SessionCommand::GetDownloadQueue { info_hash, reply }) => {
2494                            let result = if let Some(entry) = self.torrents.get(&info_hash) {
2495                                entry.handle.get_download_queue().await
2496                            } else {
2497                                Err(crate::Error::TorrentNotFound(info_hash))
2498                            };
2499                            let _ = reply.send(result);
2500                        }
2501                        Some(SessionCommand::HavePiece { info_hash, index, reply }) => {
2502                            let result = if let Some(entry) = self.torrents.get(&info_hash) {
2503                                entry.handle.have_piece(index).await
2504                            } else {
2505                                Err(crate::Error::TorrentNotFound(info_hash))
2506                            };
2507                            let _ = reply.send(result);
2508                        }
2509                        Some(SessionCommand::PieceAvailability { info_hash, reply }) => {
2510                            let result = if let Some(entry) = self.torrents.get(&info_hash) {
2511                                entry.handle.piece_availability().await
2512                            } else {
2513                                Err(crate::Error::TorrentNotFound(info_hash))
2514                            };
2515                            let _ = reply.send(result);
2516                        }
2517                        Some(SessionCommand::FileProgress { info_hash, reply }) => {
2518                            let result = if let Some(entry) = self.torrents.get(&info_hash) {
2519                                entry.handle.file_progress().await
2520                            } else {
2521                                Err(crate::Error::TorrentNotFound(info_hash))
2522                            };
2523                            let _ = reply.send(result);
2524                        }
2525                        Some(SessionCommand::InfoHashesQuery { info_hash, reply }) => {
2526                            let result = if let Some(entry) = self.torrents.get(&info_hash) {
2527                                entry.handle.info_hashes().await
2528                            } else {
2529                                Err(crate::Error::TorrentNotFound(info_hash))
2530                            };
2531                            let _ = reply.send(result);
2532                        }
2533                        Some(SessionCommand::TorrentFile { info_hash, reply }) => {
2534                            let result = if let Some(entry) = self.torrents.get(&info_hash) {
2535                                entry.handle.torrent_file().await
2536                            } else {
2537                                Err(crate::Error::TorrentNotFound(info_hash))
2538                            };
2539                            let _ = reply.send(result);
2540                        }
2541                        Some(SessionCommand::TorrentFileV2 { info_hash, reply }) => {
2542                            let result = if let Some(entry) = self.torrents.get(&info_hash) {
2543                                entry.handle.torrent_file_v2().await
2544                            } else {
2545                                Err(crate::Error::TorrentNotFound(info_hash))
2546                            };
2547                            let _ = reply.send(result);
2548                        }
2549                        Some(SessionCommand::ForceDhtAnnounce { info_hash, reply }) => {
2550                            let result = match self.torrents.get(&info_hash) {
2551                                Some(entry) => {
2552                                    entry.handle.force_dht_announce().await
2553                                }
2554                                None => Err(crate::Error::TorrentNotFound(info_hash)),
2555                            };
2556                            let _ = reply.send(result);
2557                        }
2558                        Some(SessionCommand::ForceLsdAnnounce { info_hash, reply }) => {
2559                            // LSD is session-level: verify the torrent exists, then announce directly.
2560                            let result = match self.torrents.get(&info_hash) {
2561                                Some(entry) if entry.is_private() => {
2562                                    // BEP 27: private torrents must not use LSD
2563                                    Err(crate::Error::InvalidSettings(
2564                                        "LSD disabled for private torrent".into(),
2565                                    ))
2566                                }
2567                                Some(_) => {
2568                                    if let Some(ref lsd) = self.lsd {
2569                                        lsd.announce(vec![info_hash]).await;
2570                                    }
2571                                    Ok(())
2572                                }
2573                                None => Err(crate::Error::TorrentNotFound(info_hash)),
2574                            };
2575                            let _ = reply.send(result);
2576                        }
2577                        Some(SessionCommand::ReadPiece { info_hash, index, reply }) => {
2578                            let result = if let Some(entry) = self.torrents.get(&info_hash) {
2579                                entry.handle.read_piece(index).await
2580                            } else {
2581                                Err(crate::Error::TorrentNotFound(info_hash))
2582                            };
2583                            let _ = reply.send(result);
2584                        }
2585                        Some(SessionCommand::FlushCache { info_hash, reply }) => {
2586                            let result = if let Some(entry) = self.torrents.get(&info_hash) {
2587                                entry.handle.flush_cache().await
2588                            } else {
2589                                Err(crate::Error::TorrentNotFound(info_hash))
2590                            };
2591                            let _ = reply.send(result);
2592                        }
2593                        Some(SessionCommand::IsValid { info_hash, reply }) => {
2594                            let valid = self.torrents.get(&info_hash)
2595                                .map(|e| e.handle.is_valid())
2596                                .unwrap_or(false);
2597                            let _ = reply.send(valid);
2598                        }
2599                        Some(SessionCommand::ClearError { info_hash, reply }) => {
2600                            let result = if let Some(entry) = self.torrents.get(&info_hash) {
2601                                entry.handle.clear_error().await
2602                            } else {
2603                                Err(crate::Error::TorrentNotFound(info_hash))
2604                            };
2605                            let _ = reply.send(result);
2606                        }
2607                        Some(SessionCommand::FileStatus { info_hash, reply }) => {
2608                            let result = if let Some(entry) = self.torrents.get(&info_hash) {
2609                                entry.handle.file_status().await
2610                            } else {
2611                                Err(crate::Error::TorrentNotFound(info_hash))
2612                            };
2613                            let _ = reply.send(result);
2614                        }
2615                        Some(SessionCommand::Flags { info_hash, reply }) => {
2616                            let result = if let Some(entry) = self.torrents.get(&info_hash) {
2617                                entry.handle.flags().await
2618                            } else {
2619                                Err(crate::Error::TorrentNotFound(info_hash))
2620                            };
2621                            let _ = reply.send(result);
2622                        }
2623                        Some(SessionCommand::SetFlags { info_hash, flags, reply }) => {
2624                            let result = if let Some(entry) = self.torrents.get(&info_hash) {
2625                                entry.handle.set_flags(flags).await
2626                            } else {
2627                                Err(crate::Error::TorrentNotFound(info_hash))
2628                            };
2629                            let _ = reply.send(result);
2630                        }
2631                        Some(SessionCommand::UnsetFlags { info_hash, flags, reply }) => {
2632                            let result = if let Some(entry) = self.torrents.get(&info_hash) {
2633                                entry.handle.unset_flags(flags).await
2634                            } else {
2635                                Err(crate::Error::TorrentNotFound(info_hash))
2636                            };
2637                            let _ = reply.send(result);
2638                        }
2639                        Some(SessionCommand::ConnectPeer { info_hash, addr, reply }) => {
2640                            let result = if let Some(entry) = self.torrents.get(&info_hash) {
2641                                entry.handle.connect_peer(addr).await
2642                            } else {
2643                                Err(crate::Error::TorrentNotFound(info_hash))
2644                            };
2645                            let _ = reply.send(result);
2646                        }
2647                        Some(SessionCommand::DhtPutImmutable { value, reply }) => {
2648                            let result = self.handle_dht_put_immutable(value).await;
2649                            let _ = reply.send(result);
2650                        }
2651                        Some(SessionCommand::DhtGetImmutable { target, reply }) => {
2652                            let result = self.handle_dht_get_immutable(target).await;
2653                            let _ = reply.send(result);
2654                        }
2655                        Some(SessionCommand::DhtPutMutable { keypair_bytes, value, seq, salt, reply }) => {
2656                            let result = self.handle_dht_put_mutable(keypair_bytes, value, seq, salt).await;
2657                            let _ = reply.send(result);
2658                        }
2659                        Some(SessionCommand::DhtGetMutable { public_key, salt, reply }) => {
2660                            let result = self.handle_dht_get_mutable(public_key, salt).await;
2661                            let _ = reply.send(result);
2662                        }
2663                        Some(SessionCommand::PostSessionStats) => {
2664                            self.fire_stats_alert();
2665                        }
2666                        Some(SessionCommand::SaveResumeState { reply }) => {
2667                            let count = self.save_dirty_resume_files().await;
2668                            let _ = reply.send(Ok(count));
2669                        }
2670                        Some(SessionCommand::Shutdown) | None => {
2671                            self.shutdown_all().await;
2672                            return;
2673                        }
2674                    }
2675                }
2676                result = async {
2677                    match &mut self.lsd_peers_rx {
2678                        Some(rx) => rx.recv().await,
2679                        None => std::future::pending().await,
2680                    }
2681                } => {
2682                    if let Some((info_hash, peer_addr)) = result
2683                        && let Some(entry) = self.torrents.get(&info_hash)
2684                        && !entry.is_private()  // BEP 27: reject LSD peers for private torrents
2685                    {
2686                        let _ = entry.handle.add_peers(vec![peer_addr], crate::peer_state::PeerSource::Lsd).await;
2687                    }
2688                }
2689                // Pre-validated inbound connections from ListenerTask (M114)
2690                Some(conn) = self.validated_conn_rx.recv() => {
2691                    self.handle_identified_inbound(conn);
2692                }
2693                // SSL inbound connections (M42)
2694                result = async {
2695                    if let Some(ref mut listener) = self.ssl_listener {
2696                        listener.accept().await
2697                    } else {
2698                        std::future::pending().await
2699                    }
2700                } => {
2701                    if let Ok((stream, addr)) = result {
2702                        self.handle_ssl_incoming(stream, addr).await;
2703                    }
2704                }
2705                // Global rate limiter refill (100ms)
2706                _ = refill_interval.tick() => {
2707                    let elapsed = std::time::Duration::from_millis(100);
2708                    self.global_upload_bucket.lock().refill(elapsed);
2709                    self.global_download_bucket.lock().refill(elapsed);
2710                }
2711                // Auto-manage queue evaluation
2712                _ = auto_manage_interval.tick() => {
2713                    self.evaluate_queue().await;
2714                }
2715                // NAT port mapping events
2716                event = recv_nat_event(&mut self.nat_events_rx) => {
2717                    match event {
2718                        irontide_nat::NatEvent::MappingSucceeded { port, protocol } => {
2719                            info!(port, %protocol, "port mapping succeeded");
2720                            post_alert(
2721                                &self.alert_tx,
2722                                &self.alert_mask,
2723                                AlertKind::PortMappingSucceeded { port, protocol },
2724                            );
2725                        }
2726                        irontide_nat::NatEvent::MappingFailed { port, message } => {
2727                            warn!(port, %message, "port mapping failed");
2728                            post_alert(
2729                                &self.alert_tx,
2730                                &self.alert_mask,
2731                                AlertKind::PortMappingFailed { port, message },
2732                            );
2733                        }
2734                        irontide_nat::NatEvent::ExternalIpDiscovered { ip } => {
2735                            info!(%ip, "external IP discovered via NAT traversal");
2736                            self.external_ip = Some(ip);
2737                            // Propagate to all active torrents for BEP 40 peer priority.
2738                            for entry in self.torrents.values() {
2739                                let _ = entry.handle.update_external_ip(ip).await;
2740                            }
2741                            // BEP 42: notify DHT instances of external IP
2742                            if let Some(dht) = &self.dht_v4 {
2743                                let _ = dht.update_external_ip(ip, irontide_dht::IpVoteSource::Nat).await;
2744                            }
2745                            if let Some(dht) = &self.dht_v6 {
2746                                let _ = dht.update_external_ip(ip, irontide_dht::IpVoteSource::Nat).await;
2747                            }
2748                        }
2749                    }
2750                }
2751                // BEP 42: DHT v4 external IP consensus
2752                Some(ip) = recv_dht_ip(&mut self.dht_v4_ip_rx) => {
2753                    info!(%ip, "external IP discovered via DHT v4 (BEP 42)");
2754                    self.external_ip = Some(ip);
2755                    for entry in self.torrents.values() {
2756                        let _ = entry.handle.update_external_ip(ip).await;
2757                    }
2758                }
2759                // BEP 42: DHT v6 external IP consensus
2760                Some(ip) = recv_dht_ip(&mut self.dht_v6_ip_rx) => {
2761                    info!(%ip, "external IP discovered via DHT v6 (BEP 42)");
2762                    self.external_ip = Some(ip);
2763                    for entry in self.torrents.values() {
2764                        let _ = entry.handle.update_external_ip(ip).await;
2765                    }
2766                }
2767                // Periodic session stats (M50)
2768                _ = async {
2769                    match &mut stats_timer {
2770                        Some(t) => t.tick().await,
2771                        None => std::future::pending().await,
2772                    }
2773                } => {
2774                    self.fire_stats_alert();
2775                }
2776                // Periodic sample_infohashes (BEP 51, M111)
2777                _ = async {
2778                    match &mut sample_timer {
2779                        Some(t) => t.tick().await,
2780                        None => std::future::pending().await,
2781                    }
2782                } => {
2783                    self.fire_sample_infohashes().await;
2784                }
2785                // Periodic resume file save (M161)
2786                _ = async {
2787                    match &mut resume_save_interval {
2788                        Some(t) => t.tick().await,
2789                        None => std::future::pending().await,
2790                    }
2791                } => {
2792                    let count = self.save_dirty_resume_files().await;
2793                    if count > 0 {
2794                        info!(count, "periodic resume save completed");
2795                    }
2796                }
2797            }
2798        }
2799    }
2800
2801    /// Return clones of global buckets if they have a non-zero rate, else None.
2802    fn global_buckets_if_limited(&self) -> (Option<SharedBucket>, Option<SharedBucket>) {
2803        let up = if self.settings.upload_rate_limit > 0 {
2804            Some(Arc::clone(&self.global_upload_bucket))
2805        } else {
2806            None
2807        };
2808        let down = if self.settings.download_rate_limit > 0 {
2809            Some(Arc::clone(&self.global_download_bucket))
2810        } else {
2811            None
2812        };
2813        (up, down)
2814    }
2815
2816    fn make_slot_tuner(&self) -> crate::slot_tuner::SlotTuner {
2817        if self.settings.auto_upload_slots {
2818            crate::slot_tuner::SlotTuner::new(
2819                4, // initial slots
2820                self.settings.auto_upload_slots_min,
2821                self.settings.auto_upload_slots_max,
2822            )
2823        } else {
2824            crate::slot_tuner::SlotTuner::disabled(4)
2825        }
2826    }
2827
2828    fn make_torrent_config(&self) -> TorrentConfig {
2829        TorrentConfig::from(&self.settings)
2830    }
2831
2832    /// Returns the next available queue position (one past the max).
2833    fn next_queue_position(&self) -> i32 {
2834        self.torrents
2835            .values()
2836            .filter(|e| e.auto_managed)
2837            .map(|e| e.queue_position)
2838            .max()
2839            .map(|m| m + 1)
2840            .unwrap_or(0)
2841    }
2842
2843    async fn handle_add_torrent(
2844        &mut self,
2845        torrent_meta: irontide_core::TorrentMeta,
2846        storage: Option<Arc<dyn TorrentStorage>>,
2847        download_dir: Option<PathBuf>,
2848    ) -> crate::Result<Id20> {
2849        let version = torrent_meta.version();
2850        let meta_v2 = torrent_meta.as_v2().cloned();
2851
2852        // For v2-only torrents, synthesize a minimal v1 metadata wrapper.
2853        // The session uses info_hash (Id20) as the primary key, so we use
2854        // the SHA-256 truncated to 20 bytes (as per BEP 52 tracker/DHT compat).
2855        let meta = match torrent_meta.as_v1() {
2856            Some(v1) => v1.clone(),
2857            None => {
2858                let v2 = torrent_meta.as_v2().unwrap();
2859                synthesize_v1_from_v2(v2)?
2860            }
2861        };
2862
2863        let info_hash = meta.info_hash;
2864
2865        if self.torrents.contains_key(&info_hash) {
2866            return Err(crate::Error::DuplicateTorrent(info_hash));
2867        }
2868
2869        if self.torrents.len() >= self.settings.max_torrents {
2870            return Err(crate::Error::SessionAtCapacity(self.settings.max_torrents));
2871        }
2872
2873        let mut torrent_config = self.make_torrent_config();
2874        if let Some(dir) = download_dir {
2875            torrent_config.download_dir = dir;
2876        }
2877
2878        // Create or use provided storage, then register with disk manager
2879        let storage: Arc<dyn TorrentStorage> = match storage {
2880            Some(s) => s,
2881            None => {
2882                let lengths = Lengths::new(
2883                    meta.info.total_length(),
2884                    meta.info.piece_length,
2885                    DEFAULT_CHUNK_SIZE,
2886                );
2887                let files = meta.info.files();
2888                let file_paths: Vec<PathBuf> = files
2889                    .iter()
2890                    .map(|f| f.path.iter().collect::<PathBuf>())
2891                    .collect();
2892                let file_lengths: Vec<u64> = files.iter().map(|f| f.length).collect();
2893                let prealloc_mode = torrent_config.preallocate_mode.unwrap_or_else(|| {
2894                    irontide_storage::PreallocateMode::from(
2895                        torrent_config.storage_mode == irontide_core::StorageMode::Full,
2896                    )
2897                });
2898                match irontide_storage::FilesystemStorage::new(
2899                    &torrent_config.download_dir,
2900                    file_paths,
2901                    file_lengths,
2902                    lengths.clone(),
2903                    None,
2904                    prealloc_mode,
2905                    torrent_config.filesystem_direct_io,
2906                ) {
2907                    Ok(s) => Arc::new(s),
2908                    Err(e) => {
2909                        warn!(
2910                            "failed to create filesystem storage: {e}, falling back to memory"
2911                        );
2912                        Arc::new(irontide_storage::MemoryStorage::new(lengths))
2913                    }
2914                }
2915            }
2916        };
2917        let disk_handle = self.disk_manager.register_torrent(info_hash, storage).await;
2918
2919        let (global_up, global_down) = self.global_buckets_if_limited();
2920        let slot_tuner = self.make_slot_tuner();
2921
2922        let handle = TorrentHandle::from_torrent(
2923            meta.clone(),
2924            version,
2925            meta_v2,
2926            disk_handle,
2927            self.disk_manager.clone(),
2928            torrent_config,
2929            self.dht_v4.clone(),
2930            self.dht_v6.clone(),
2931            global_up,
2932            global_down,
2933            slot_tuner,
2934            self.alert_tx.clone(),
2935            Arc::clone(&self.alert_mask),
2936            self.utp_socket.clone(),
2937            self.utp_socket_v6.clone(),
2938            Arc::clone(&self.ban_manager),
2939            Arc::clone(&self.ip_filter),
2940            Arc::clone(&self.plugins),
2941            self.sam_session.clone(),
2942            self.ssl_manager.clone(),
2943            Arc::clone(&self.factory),
2944            Some(Arc::clone(&self.hash_pool)),
2945        )
2946        .await?;
2947
2948        let name = meta.info.name.clone();
2949        self.torrents.insert(
2950            info_hash,
2951            TorrentEntry {
2952                handle,
2953                meta: Some(meta),
2954                queue_position: -1,
2955                auto_managed: true,
2956                started_at: Some(tokio::time::Instant::now()),
2957                prev_downloaded: 0,
2958                prev_uploaded: 0,
2959            },
2960        );
2961        self.info_hash_registry.insert(info_hash, ());
2962
2963        // Assign queue position for auto-managed torrents
2964        let pos = self.next_queue_position();
2965        if let Some(entry) = self.torrents.get_mut(&info_hash)
2966            && entry.auto_managed
2967        {
2968            entry.queue_position = pos;
2969        }
2970
2971        info!(%info_hash, "torrent added to session");
2972        post_alert(
2973            &self.alert_tx,
2974            &self.alert_mask,
2975            AlertKind::TorrentAdded { info_hash, name },
2976        );
2977        // BEP 27: private torrents must not use LSD
2978        let is_private = self
2979            .torrents
2980            .get(&info_hash)
2981            .is_some_and(|e| e.is_private());
2982        if let Some(ref lsd) = self.lsd
2983            && !is_private
2984        {
2985            lsd.announce(vec![info_hash]).await;
2986        }
2987        Ok(info_hash)
2988    }
2989
2990    async fn handle_add_magnet(
2991        &mut self,
2992        magnet: Magnet,
2993        download_dir: Option<PathBuf>,
2994    ) -> crate::Result<Id20> {
2995        let info_hash = magnet.info_hash();
2996        let display_name = magnet.display_name.clone().unwrap_or_default();
2997        if self.torrents.contains_key(&info_hash) {
2998            return Err(crate::Error::DuplicateTorrent(info_hash));
2999        }
3000        if self.torrents.len() >= self.settings.max_torrents {
3001            return Err(crate::Error::SessionAtCapacity(self.settings.max_torrents));
3002        }
3003        let mut config = self.make_torrent_config();
3004        if let Some(dir) = download_dir {
3005            config.download_dir = dir;
3006        }
3007        let (global_up, global_down) = self.global_buckets_if_limited();
3008        let slot_tuner = self.make_slot_tuner();
3009        let handle = TorrentHandle::from_magnet(
3010            magnet,
3011            self.disk_manager.clone(),
3012            config,
3013            self.dht_v4.clone(),
3014            self.dht_v6.clone(),
3015            global_up,
3016            global_down,
3017            slot_tuner,
3018            self.alert_tx.clone(),
3019            Arc::clone(&self.alert_mask),
3020            self.utp_socket.clone(),
3021            self.utp_socket_v6.clone(),
3022            Arc::clone(&self.ban_manager),
3023            Arc::clone(&self.ip_filter),
3024            Arc::clone(&self.plugins),
3025            self.sam_session.clone(),
3026            self.ssl_manager.clone(),
3027            Arc::clone(&self.factory),
3028            Some(Arc::clone(&self.hash_pool)),
3029        )
3030        .await?;
3031        // M147: Spawn background metadata resolver before registering.
3032        // This races against the TorrentActor's own FetchingMetadata phase —
3033        // first to resolve wins.
3034        self.spawn_metadata_resolver(info_hash, &handle);
3035
3036        self.torrents.insert(
3037            info_hash,
3038            TorrentEntry {
3039                handle,
3040                meta: None,
3041                queue_position: -1,
3042                auto_managed: true,
3043                started_at: Some(tokio::time::Instant::now()),
3044                prev_downloaded: 0,
3045                prev_uploaded: 0,
3046            },
3047        );
3048        self.info_hash_registry.insert(info_hash, ());
3049
3050        // Assign queue position for auto-managed torrents
3051        let pos = self.next_queue_position();
3052        if let Some(entry) = self.torrents.get_mut(&info_hash)
3053            && entry.auto_managed
3054        {
3055            entry.queue_position = pos;
3056        }
3057
3058        info!(%info_hash, "magnet torrent added to session");
3059        post_alert(
3060            &self.alert_tx,
3061            &self.alert_mask,
3062            AlertKind::TorrentAdded {
3063                info_hash,
3064                name: display_name,
3065            },
3066        );
3067        // BEP 27: magnet metadata not available yet — we allow this one-time LAN
3068        // announce. Once metadata resolves, all subsequent LSD ops are gated by
3069        // is_private() checks in ForceLsdAnnounce and lsd_peers_rx handlers.
3070        if let Some(ref lsd) = self.lsd {
3071            lsd.announce(vec![info_hash]).await;
3072        }
3073        Ok(info_hash)
3074    }
3075
3076    /// M147: Spawn a background task that pre-resolves magnet metadata via DHT.
3077    ///
3078    /// The resolver connects to peers discovered via DHT `get_peers`, performs
3079    /// BT + BEP 10 extension + BEP 9 ut_metadata exchanges, and sends the
3080    /// assembled metadata back to the TorrentActor via `PreResolvedMetadata`.
3081    /// This races against the TorrentActor's own FetchingMetadata phase.
3082    fn spawn_metadata_resolver(&self, info_hash: Id20, torrent_handle: &TorrentHandle) {
3083        let dht = match self.dht_v4 {
3084            Some(ref dht) => dht.clone(),
3085            None => return, // No DHT = skip background resolution
3086        };
3087        let factory = Arc::clone(&self.factory);
3088        let connect_timeout = std::time::Duration::from_secs(self.settings.peer_connect_timeout);
3089        let handle = torrent_handle.clone();
3090
3091        tokio::spawn(async move {
3092            let peer_rx = match dht.get_peers(info_hash).await {
3093                Ok(rx) => rx,
3094                Err(e) => {
3095                    debug!(
3096                        %info_hash,
3097                        "metadata resolver: failed to start DHT get_peers: {e}"
3098                    );
3099                    return;
3100                }
3101            };
3102
3103            let peer_id = irontide_core::PeerId::generate().0;
3104            match crate::metadata_resolver::resolve_metadata(
3105                info_hash,
3106                peer_id,
3107                peer_rx,
3108                factory,
3109                connect_timeout,
3110                crate::metadata_resolver::DEFAULT_MAX_CONCURRENT,
3111            )
3112            .await
3113            {
3114                Ok((meta, peers)) => {
3115                    let info_bytes = if let Some(b) = meta.info_bytes {
3116                        b.to_vec()
3117                    } else {
3118                        match irontide_bencode::to_bytes(&meta.info) {
3119                            Ok(bytes) => bytes,
3120                            Err(e) => {
3121                                debug!(
3122                                    %info_hash,
3123                                    "metadata resolver: failed to re-encode info dict: {e}"
3124                                );
3125                                return;
3126                            }
3127                        }
3128                    };
3129                    debug!(
3130                        %info_hash,
3131                        num_peers = peers.len(),
3132                        "metadata resolver: pre-resolved metadata, sending to torrent actor"
3133                    );
3134                    handle.send_pre_resolved_metadata(info_bytes, peers);
3135                }
3136                Err(e) => {
3137                    debug!(
3138                        %info_hash,
3139                        "metadata resolver: failed to resolve metadata: {e}"
3140                    );
3141                }
3142            }
3143        });
3144    }
3145
3146    async fn handle_remove_torrent(&mut self, info_hash: Id20) -> crate::Result<()> {
3147        let entry = self
3148            .torrents
3149            .remove(&info_hash)
3150            .ok_or(crate::Error::TorrentNotFound(info_hash))?;
3151        self.info_hash_registry.remove(&info_hash);
3152        let was_auto_managed = entry.auto_managed;
3153        let removed_position = entry.queue_position;
3154        entry.handle.shutdown().await?;
3155        self.disk_manager.unregister_torrent(info_hash).await;
3156
3157        // Shift queue positions for remaining auto-managed torrents
3158        if was_auto_managed && removed_position >= 0 {
3159            let mut entries = self.queue_entries();
3160            let changed = crate::queue::remove_position(&mut entries, removed_position);
3161            self.apply_queue_changes(&changed);
3162        }
3163
3164        // Delete the resume file for this torrent so it is not restored
3165        // on the next startup. Errors are logged but not propagated — the
3166        // torrent is already removed from the in-memory state.
3167        let resume_dir = self.effective_resume_dir();
3168        if let Err(e) = crate::resume_file::delete_resume_file(&resume_dir, &info_hash) {
3169            // NotFound is expected when no resume file was ever written.
3170            if e.kind() != std::io::ErrorKind::NotFound {
3171                warn!(%info_hash, "failed to delete resume file on removal: {e}");
3172            }
3173        }
3174
3175        info!(%info_hash, "torrent removed from session");
3176        post_alert(
3177            &self.alert_tx,
3178            &self.alert_mask,
3179            AlertKind::TorrentRemoved { info_hash },
3180        );
3181        Ok(())
3182    }
3183
3184    async fn handle_pause_torrent(&mut self, info_hash: Id20) -> crate::Result<()> {
3185        let entry = self
3186            .torrents
3187            .get(&info_hash)
3188            .ok_or(crate::Error::TorrentNotFound(info_hash))?;
3189        entry.handle.pause().await
3190    }
3191
3192    async fn handle_resume_torrent(&mut self, info_hash: Id20) -> crate::Result<()> {
3193        let entry = self
3194            .torrents
3195            .get(&info_hash)
3196            .ok_or(crate::Error::TorrentNotFound(info_hash))?;
3197        entry.handle.resume().await
3198    }
3199
3200    async fn handle_move_torrent_storage(
3201        &self,
3202        info_hash: Id20,
3203        new_path: std::path::PathBuf,
3204    ) -> crate::Result<()> {
3205        let entry = self
3206            .torrents
3207            .get(&info_hash)
3208            .ok_or(crate::Error::TorrentNotFound(info_hash))?;
3209        entry.handle.move_storage(new_path).await
3210    }
3211
3212    async fn handle_torrent_stats(&self, info_hash: Id20) -> crate::Result<TorrentStats> {
3213        let entry = self
3214            .torrents
3215            .get(&info_hash)
3216            .ok_or(crate::Error::TorrentNotFound(info_hash))?;
3217        let mut stats = entry.handle.stats().await?;
3218        // Enrich with session-level data that the torrent actor doesn't own.
3219        stats.queue_position = entry.queue_position;
3220        stats.auto_managed = entry.auto_managed;
3221        Ok(stats)
3222    }
3223
3224    fn handle_torrent_info(&self, info_hash: Id20) -> crate::Result<TorrentInfo> {
3225        let entry = self
3226            .torrents
3227            .get(&info_hash)
3228            .ok_or(crate::Error::TorrentNotFound(info_hash))?;
3229
3230        let meta = entry
3231            .meta
3232            .as_ref()
3233            .ok_or(crate::Error::MetadataNotReady(info_hash))?;
3234        let files: Vec<FileInfo> = if let Some(ref file_list) = meta.info.files {
3235            file_list
3236                .iter()
3237                .map(|f| FileInfo {
3238                    path: f.path.iter().collect::<PathBuf>(),
3239                    length: f.length,
3240                })
3241                .collect()
3242        } else {
3243            vec![FileInfo {
3244                path: PathBuf::from(&meta.info.name),
3245                length: meta.info.total_length(),
3246            }]
3247        };
3248
3249        Ok(TorrentInfo {
3250            info_hash,
3251            name: meta.info.name.clone(),
3252            total_length: meta.info.total_length(),
3253            piece_length: meta.info.piece_length,
3254            num_pieces: meta.info.num_pieces() as u32,
3255            files,
3256            private: meta.info.private == Some(1),
3257        })
3258    }
3259
3260    /// Update gauge metrics that come from session-level state.
3261    fn update_session_gauges(&self) {
3262        use crate::stats::*;
3263        let c = &self.counters;
3264        c.set(SES_NUM_TORRENTS, self.torrents.len() as i64);
3265        c.set(SES_ACTIVE_TORRENTS, self.torrents.len() as i64);
3266
3267        // DHT presence (instance count, not routing table size)
3268        let dht_nodes = self.dht_v4.is_some() as i64 + self.dht_v6.is_some() as i64;
3269        c.set(DHT_NODES, dht_nodes);
3270        c.set(DHT_NODES_V4, self.dht_v4.is_some() as i64);
3271        c.set(DHT_NODES_V6, self.dht_v6.is_some() as i64);
3272
3273        // Ban count
3274        let ban_count = self.ban_manager.read().banned_list().len() as i64;
3275        c.set(PEER_NUM_BANNED, ban_count);
3276    }
3277
3278    /// Snapshot counters and fire a SessionStatsAlert.
3279    fn fire_stats_alert(&self) {
3280        self.update_session_gauges();
3281        let values = self.counters.snapshot();
3282        crate::alert::post_alert(
3283            &self.alert_tx,
3284            &self.alert_mask,
3285            crate::alert::AlertKind::SessionStatsAlert { values },
3286        );
3287    }
3288
3289    /// Fire a periodic BEP 51 sample_infohashes query to the DHT (M111).
3290    async fn fire_sample_infohashes(&self) {
3291        let dht = match (&self.dht_v4, &self.dht_v6) {
3292            (Some(d), _) | (_, Some(d)) => d,
3293            _ => return,
3294        };
3295        let mut buf = [0u8; 20];
3296        irontide_core::random_bytes(&mut buf);
3297        let target = Id20::from(buf);
3298        match dht.sample_infohashes(target).await {
3299            Ok(result) => {
3300                post_alert(
3301                    &self.alert_tx,
3302                    &self.alert_mask,
3303                    AlertKind::DhtSampleInfohashes {
3304                        num_samples: result.samples.len(),
3305                        total_estimate: result.num,
3306                    },
3307                );
3308            }
3309            Err(e) => {
3310                debug!("sample_infohashes failed: {e}");
3311            }
3312        }
3313    }
3314
3315    async fn make_session_stats(&self) -> SessionStats {
3316        self.update_session_gauges();
3317
3318        let mut total_downloaded = 0u64;
3319        let mut total_uploaded = 0u64;
3320
3321        for entry in self.torrents.values() {
3322            if let Ok(stats) = entry.handle.stats().await {
3323                total_downloaded += stats.downloaded;
3324                total_uploaded += stats.uploaded;
3325            }
3326        }
3327
3328        SessionStats {
3329            active_torrents: self.torrents.len(),
3330            total_downloaded,
3331            total_uploaded,
3332            dht_nodes: self.dht_v4.is_some() as usize + self.dht_v6.is_some() as usize,
3333        }
3334    }
3335
3336    async fn handle_save_torrent_resume(
3337        &self,
3338        info_hash: Id20,
3339    ) -> crate::Result<irontide_core::FastResumeData> {
3340        let entry = self
3341            .torrents
3342            .get(&info_hash)
3343            .ok_or(crate::Error::TorrentNotFound(info_hash))?;
3344        let mut resume = entry.handle.save_resume_data().await?;
3345        // Patch in queue state from SessionActor's TorrentEntry (the
3346        // TorrentHandle doesn't know about queue position / auto-managed).
3347        resume.queue_position = entry.queue_position as i64;
3348        resume.auto_managed = if entry.auto_managed { 1 } else { 0 };
3349        Ok(resume)
3350    }
3351
3352    async fn handle_save_session_state(&self) -> crate::Result<crate::persistence::SessionState> {
3353        use crate::persistence::SessionState;
3354
3355        let mut torrents = Vec::new();
3356        for (info_hash, entry) in &self.torrents {
3357            match entry.handle.save_resume_data().await {
3358                Ok(rd) => torrents.push(rd),
3359                Err(e) => {
3360                    warn!(%info_hash, "failed to save resume data: {e}");
3361                }
3362            }
3363        }
3364
3365        // Serialize smart ban state (scoped to drop RwLockReadGuard before awaits)
3366        let (banned_peers, peer_strikes) = {
3367            let ban_mgr = self.ban_manager.read();
3368            let banned_peers: Vec<String> = ban_mgr
3369                .banned_list()
3370                .iter()
3371                .map(|ip| ip.to_string())
3372                .collect();
3373            let peer_strikes: Vec<crate::persistence::PeerStrikeEntry> = ban_mgr
3374                .strikes_map()
3375                .iter()
3376                .map(|(ip, &count)| crate::persistence::PeerStrikeEntry {
3377                    ip: ip.to_string(),
3378                    count: count as i64,
3379                })
3380                .collect();
3381            (banned_peers, peer_strikes)
3382        };
3383
3384        let mut dht_entries = Vec::new();
3385        let mut dht_node_id = None;
3386        if let Some(ref dht) = self.dht_v4 {
3387            // Save the (possibly BEP 42-regenerated) node ID for next session
3388            if let Ok(stats) = dht.stats().await {
3389                dht_node_id = Some(stats.node_id.to_hex());
3390            }
3391            for (_id, addr) in dht.get_routing_nodes().await {
3392                dht_entries.push(crate::persistence::DhtNodeEntry {
3393                    host: addr.ip().to_string(),
3394                    port: addr.port() as i64,
3395                });
3396            }
3397        }
3398        if let Some(ref dht) = self.dht_v6 {
3399            for (_id, addr) in dht.get_routing_nodes().await {
3400                dht_entries.push(crate::persistence::DhtNodeEntry {
3401                    host: addr.ip().to_string(),
3402                    port: addr.port() as i64,
3403                });
3404            }
3405        }
3406
3407        Ok(SessionState {
3408            dht_nodes: dht_entries,
3409            dht_node_id,
3410            torrents,
3411            banned_peers,
3412            peer_strikes,
3413        })
3414    }
3415
3416    /// Compute the effective resume data directory from settings.
3417    fn effective_resume_dir(&self) -> PathBuf {
3418        self.settings
3419            .resume_data_dir
3420            .clone()
3421            .unwrap_or_else(crate::resume_file::default_resume_dir)
3422    }
3423
3424    /// Load and restore torrents from per-torrent resume files on disk.
3425    ///
3426    /// Scans the resume directory, deserializes each `.resume` file, reconstructs
3427    /// the torrent metadata or magnet, adds it to the session, and restores the
3428    /// piece bitmap where available.
3429    async fn handle_load_resume_state(&mut self) -> crate::Result<ResumeLoadResult> {
3430        let resume_dir = self.effective_resume_dir();
3431        let paths = crate::resume_file::scan_resume_dir(&resume_dir);
3432
3433        let mut restored = 0usize;
3434        let mut skipped = 0usize;
3435        let mut failed = 0usize;
3436
3437        for path in &paths {
3438            let file_name = path
3439                .file_name()
3440                .and_then(|n| n.to_str())
3441                .unwrap_or("<unknown>");
3442
3443            // Read and deserialize
3444            let bytes = match std::fs::read(path) {
3445                Ok(b) => b,
3446                Err(e) => {
3447                    warn!(file = %file_name, "failed to read resume file: {e}");
3448                    failed = failed.saturating_add(1);
3449                    continue;
3450                }
3451            };
3452
3453            let rd = match crate::resume_file::deserialize_resume(&bytes) {
3454                Ok(rd) => rd,
3455                Err(e) => {
3456                    warn!(file = %file_name, "failed to deserialize resume file: {e}");
3457                    failed = failed.saturating_add(1);
3458                    continue;
3459                }
3460            };
3461
3462            // Try to reconstruct as a resolved torrent (info dict present).
3463            if let Some(meta) = crate::resume_file::reconstruct_torrent_meta(&rd) {
3464                let info_hash = meta.info_hash;
3465                let pieces = rd.pieces.clone();
3466                let torrent_meta = irontide_core::TorrentMeta::V1(meta);
3467
3468                // Restore to the original save_path (per-torrent download dir).
3469                let restore_dir = if rd.save_path.is_empty() {
3470                    None
3471                } else {
3472                    Some(PathBuf::from(&rd.save_path))
3473                };
3474                match self.handle_add_torrent(torrent_meta, None, restore_dir).await {
3475                    Ok(added_hash) => {
3476                        // Restore the piece bitmap if non-empty.
3477                        if !pieces.is_empty()
3478                            && let Some(entry) = self.torrents.get(&added_hash)
3479                            && let Err(e) = entry.handle.restore_resume_bitmap(pieces).await
3480                        {
3481                            warn!(
3482                                %info_hash,
3483                                "failed to restore piece bitmap, torrent will recheck: {e}"
3484                            );
3485                        }
3486                        info!(%info_hash, "restored torrent from resume file");
3487                        restored = restored.saturating_add(1);
3488                    }
3489                    Err(crate::Error::DuplicateTorrent(_)) => {
3490                        debug!(%info_hash, "skipped duplicate torrent from resume");
3491                        skipped = skipped.saturating_add(1);
3492                    }
3493                    Err(e) => {
3494                        warn!(%info_hash, "failed to add restored torrent: {e}");
3495                        failed = failed.saturating_add(1);
3496                    }
3497                }
3498            } else if let Some(magnet) = crate::resume_file::reconstruct_magnet(&rd) {
3499                // Unresolved magnet: re-add as magnet link.
3500                let info_hash = magnet.info_hash();
3501                let restore_dir = if rd.save_path.is_empty() {
3502                    None
3503                } else {
3504                    Some(PathBuf::from(&rd.save_path))
3505                };
3506                match self.handle_add_magnet(magnet, restore_dir).await {
3507                    Ok(_) => {
3508                        info!(%info_hash, "restored magnet from resume file");
3509                        restored = restored.saturating_add(1);
3510                    }
3511                    Err(crate::Error::DuplicateTorrent(_)) => {
3512                        debug!(%info_hash, "skipped duplicate magnet from resume");
3513                        skipped = skipped.saturating_add(1);
3514                    }
3515                    Err(e) => {
3516                        warn!(%info_hash, "failed to add restored magnet: {e}");
3517                        failed = failed.saturating_add(1);
3518                    }
3519                }
3520            } else {
3521                warn!(file = %file_name, "resume file has no valid info dict and no valid info hash");
3522                failed = failed.saturating_add(1);
3523            }
3524        }
3525
3526        info!(restored, skipped, failed, "resume state loaded");
3527        Ok(ResumeLoadResult {
3528            restored,
3529            skipped,
3530            failed,
3531        })
3532    }
3533
3534    /// Save resume files for all torrents with a dirty `need_save_resume` flag.
3535    ///
3536    /// Returns the number of resume files successfully written.
3537    async fn save_dirty_resume_files(&mut self) -> usize {
3538        let resume_dir = self.effective_resume_dir();
3539
3540        if let Err(e) = std::fs::create_dir_all(resume_dir.join("torrents")) {
3541            warn!("failed to create resume dir: {e}");
3542            return 0;
3543        }
3544
3545        let mut saved = 0usize;
3546        // Collect info_hashes first to avoid borrow conflict with `self`.
3547        let info_hashes: Vec<Id20> = self.torrents.keys().copied().collect();
3548
3549        for info_hash in &info_hashes {
3550            let entry = match self.torrents.get(info_hash) {
3551                Some(e) => e,
3552                None => continue,
3553            };
3554
3555            // Check dirty flag via stats
3556            let needs_save = match entry.handle.stats().await {
3557                Ok(stats) => stats.need_save_resume,
3558                Err(_) => continue,
3559            };
3560            if !needs_save {
3561                continue;
3562            }
3563
3564            // Build resume data
3565            let rd = match entry.handle.save_resume_data().await {
3566                Ok(rd) => rd,
3567                Err(e) => {
3568                    warn!(%info_hash, "failed to build resume data: {e}");
3569                    continue;
3570                }
3571            };
3572
3573            // Serialize
3574            let bytes = match crate::resume_file::serialize_resume(&rd) {
3575                Ok(b) => b,
3576                Err(e) => {
3577                    warn!(%info_hash, "failed to serialize resume data: {e}");
3578                    continue;
3579                }
3580            };
3581
3582            // Atomic write
3583            let path = crate::resume_file::resume_file_path(&resume_dir, info_hash);
3584            if let Err(e) = crate::resume_file::atomic_write(&path, &bytes) {
3585                warn!(%info_hash, "failed to write resume file: {e}");
3586                continue;
3587            }
3588
3589            // Reset dirty flag
3590            if let Err(e) = entry.handle.clear_save_resume_flag().await {
3591                warn!(%info_hash, "failed to clear save_resume flag: {e}");
3592            }
3593
3594            saved = saved.saturating_add(1);
3595        }
3596        saved
3597    }
3598
3599    /// Apply new settings at runtime.
3600    ///
3601    /// Validates, updates rate limiters and alert mask immediately.
3602    /// Sub-actor reconfiguration (disk, DHT, NAT) takes effect on next restart.
3603    fn handle_apply_settings(&mut self, new: Settings) -> crate::Result<()> {
3604        new.validate()?;
3605
3606        // Update rate limiters if changed
3607        if new.upload_rate_limit != self.settings.upload_rate_limit {
3608            self.global_upload_bucket
3609                .lock()
3610                .set_rate(new.upload_rate_limit);
3611        }
3612        if new.download_rate_limit != self.settings.download_rate_limit {
3613            self.global_download_bucket
3614                .lock()
3615                .set_rate(new.download_rate_limit);
3616        }
3617
3618        // Update alert mask if changed
3619        if new.alert_mask != self.settings.alert_mask {
3620            self.alert_mask
3621                .store(new.alert_mask.bits(), Ordering::Relaxed);
3622        }
3623
3624        // Store new settings
3625        self.settings = new;
3626
3627        // Fire alert
3628        post_alert(&self.alert_tx, &self.alert_mask, AlertKind::SettingsChanged);
3629
3630        Ok(())
3631    }
3632
3633    /// Build a QueueEntry snapshot from current auto-managed torrents.
3634    fn queue_entries(&self) -> Vec<crate::queue::QueueEntry> {
3635        self.torrents
3636            .iter()
3637            .filter(|(_, e)| e.auto_managed)
3638            .map(|(&hash, e)| crate::queue::QueueEntry {
3639                info_hash: hash,
3640                position: e.queue_position,
3641            })
3642            .collect()
3643    }
3644
3645    fn handle_set_queue_position(&mut self, info_hash: Id20, pos: i32) -> crate::Result<()> {
3646        if !self.torrents.contains_key(&info_hash) {
3647            return Err(crate::Error::TorrentNotFound(info_hash));
3648        }
3649        let mut entries = self.queue_entries();
3650        let changed = crate::queue::set_position(&mut entries, info_hash, pos);
3651        self.apply_queue_changes(&changed);
3652        Ok(())
3653    }
3654
3655    fn handle_queue_move(&mut self, info_hash: Id20, op: QueueMoveFn) -> crate::Result<()> {
3656        if !self.torrents.contains_key(&info_hash) {
3657            return Err(crate::Error::TorrentNotFound(info_hash));
3658        }
3659        let mut entries = self.queue_entries();
3660        let changed = op(&mut entries, info_hash);
3661        self.apply_queue_changes(&changed);
3662        Ok(())
3663    }
3664
3665    /// Apply position changes back to TorrentEntry fields and fire alerts.
3666    fn apply_queue_changes(&mut self, changed: &[(Id20, i32, i32)]) {
3667        for &(hash, old_pos, new_pos) in changed {
3668            if let Some(entry) = self.torrents.get_mut(&hash) {
3669                entry.queue_position = new_pos;
3670            }
3671            crate::alert::post_alert(
3672                &self.alert_tx,
3673                &self.alert_mask,
3674                crate::alert::AlertKind::TorrentQueuePositionChanged {
3675                    info_hash: hash,
3676                    old_pos,
3677                    new_pos,
3678                },
3679            );
3680        }
3681    }
3682
3683    async fn evaluate_queue(&mut self) {
3684        let now = tokio::time::Instant::now();
3685        let startup_duration = std::time::Duration::from_secs(self.settings.auto_manage_startup);
3686        let auto_manage_secs = self.settings.auto_manage_interval.max(1);
3687        let mut candidates = Vec::new();
3688
3689        // Collect info hashes first to avoid borrow issues with async calls
3690        let hashes: Vec<Id20> = self.torrents.keys().copied().collect();
3691
3692        for &info_hash in &hashes {
3693            let (auto_managed, queue_position, started_at, prev_downloaded, prev_uploaded) = {
3694                let entry = match self.torrents.get(&info_hash) {
3695                    Some(e) => e,
3696                    None => continue,
3697                };
3698                if !entry.auto_managed {
3699                    continue;
3700                }
3701                (
3702                    entry.auto_managed,
3703                    entry.queue_position,
3704                    entry.started_at,
3705                    entry.prev_downloaded,
3706                    entry.prev_uploaded,
3707                )
3708            };
3709
3710            let _ = auto_managed; // used above in the guard
3711
3712            // Get current stats (async call — self.torrents is not borrowed here)
3713            let stats = match self.torrents.get(&info_hash) {
3714                Some(entry) => match entry.handle.stats().await {
3715                    Ok(s) => s,
3716                    Err(_) => continue,
3717                },
3718                None => continue,
3719            };
3720
3721            let category = match stats.state {
3722                TorrentState::Downloading
3723                | TorrentState::FetchingMetadata
3724                | TorrentState::Checking => crate::queue::QueueCategory::Downloading,
3725                TorrentState::Seeding | TorrentState::Complete => {
3726                    crate::queue::QueueCategory::Seeding
3727                }
3728                TorrentState::Paused => {
3729                    // Determine category based on completion
3730                    if stats.pieces_have >= stats.pieces_total && stats.pieces_total > 0 {
3731                        crate::queue::QueueCategory::Seeding
3732                    } else {
3733                        crate::queue::QueueCategory::Downloading
3734                    }
3735                }
3736                TorrentState::Stopped | TorrentState::Sharing => continue,
3737            };
3738
3739            let is_active = stats.state != TorrentState::Paused;
3740
3741            // Compute rate from delta since last tick
3742            let download_rate = stats.downloaded.saturating_sub(prev_downloaded) / auto_manage_secs;
3743            let upload_rate = stats.uploaded.saturating_sub(prev_uploaded) / auto_manage_secs;
3744
3745            let past_startup = started_at
3746                .map(|t| now.duration_since(t) > startup_duration)
3747                .unwrap_or(true);
3748
3749            let is_inactive = past_startup
3750                && match category {
3751                    crate::queue::QueueCategory::Downloading => {
3752                        download_rate < self.settings.inactive_down_rate
3753                    }
3754                    crate::queue::QueueCategory::Seeding => {
3755                        upload_rate < self.settings.inactive_up_rate
3756                    }
3757                };
3758
3759            candidates.push(crate::queue::QueueCandidate {
3760                info_hash,
3761                position: queue_position,
3762                category,
3763                is_active,
3764                is_inactive,
3765            });
3766        }
3767
3768        // Update cached stats for next tick's rate calculation
3769        for &hash in &hashes {
3770            if let Some(entry) = self.torrents.get(&hash)
3771                && let Ok(stats) = entry.handle.stats().await
3772                && let Some(entry) = self.torrents.get_mut(&hash)
3773            {
3774                entry.prev_downloaded = stats.downloaded;
3775                entry.prev_uploaded = stats.uploaded;
3776            }
3777        }
3778
3779        let decision = crate::queue::evaluate(
3780            &candidates,
3781            self.settings.active_downloads,
3782            self.settings.active_seeds,
3783            self.settings.active_limit,
3784            self.settings.dont_count_slow_torrents,
3785            self.settings.auto_manage_prefer_seeds,
3786        );
3787
3788        // Apply decisions
3789        for hash in &decision.to_pause {
3790            if let Some(entry) = self.torrents.get(hash) {
3791                let _ = entry.handle.pause().await;
3792            }
3793            post_alert(
3794                &self.alert_tx,
3795                &self.alert_mask,
3796                AlertKind::TorrentAutoManaged {
3797                    info_hash: *hash,
3798                    paused: true,
3799                },
3800            );
3801        }
3802
3803        for hash in &decision.to_resume {
3804            if let Some(entry) = self.torrents.get_mut(hash) {
3805                let _ = entry.handle.resume().await;
3806                entry.started_at = Some(tokio::time::Instant::now());
3807            }
3808            post_alert(
3809                &self.alert_tx,
3810                &self.alert_mask,
3811                AlertKind::TorrentAutoManaged {
3812                    info_hash: *hash,
3813                    paused: false,
3814                },
3815            );
3816        }
3817    }
3818
3819    /// Handle a pre-validated inbound connection from the ListenerTask (M114).
3820    fn handle_identified_inbound(&self, conn: crate::listener::IdentifiedConnection) {
3821        if let Some(entry) = self.torrents.get(&conn.info_hash) {
3822            debug!(%conn.addr, %conn.info_hash, "routing validated inbound peer");
3823            let handle = entry.handle.clone();
3824            tokio::spawn(async move {
3825                let _ = handle.send_incoming_peer(conn.stream, conn.addr).await;
3826            });
3827        } else {
3828            // Race: torrent removed between validation and receipt.
3829            debug!(%conn.addr, %conn.info_hash, "validated peer for removed torrent, dropping");
3830        }
3831    }
3832
3833    /// Handle an incoming SSL/TLS connection (M42).
3834    ///
3835    /// Uses `LazyConfigAcceptor` to peek at the TLS ClientHello and extract
3836    /// the SNI (hex-encoded info hash) to route the connection to the right
3837    /// torrent. The full TLS handshake uses the torrent's CA cert to build
3838    /// the server config.
3839    async fn handle_ssl_incoming(
3840        &mut self,
3841        stream: crate::transport::BoxedStream,
3842        addr: std::net::SocketAddr,
3843    ) {
3844        use tokio_rustls::LazyConfigAcceptor;
3845
3846        let acceptor = LazyConfigAcceptor::new(rustls::server::Acceptor::default(), stream);
3847
3848        let start_handshake = match acceptor.await {
3849            Ok(sh) => sh,
3850            Err(e) => {
3851                debug!(%addr, error = %e, "SSL ClientHello read failed");
3852                return;
3853            }
3854        };
3855
3856        // Extract SNI from ClientHello
3857        let client_hello = start_handshake.client_hello();
3858        let sni = match client_hello.server_name() {
3859            Some(name) => name.to_string(),
3860            None => {
3861                debug!(%addr, "SSL connection missing SNI");
3862                return;
3863            }
3864        };
3865
3866        // SNI is hex-encoded info hash (40 chars for SHA-1)
3867        let info_hash = match Id20::from_hex(&sni) {
3868            Ok(h) => h,
3869            Err(_) => {
3870                debug!(%addr, sni = %sni, "SSL SNI is not a valid info hash");
3871                return;
3872            }
3873        };
3874
3875        // Look up the torrent
3876        let torrent = match self.torrents.get(&info_hash) {
3877            Some(t) => t,
3878            None => {
3879                debug!(%addr, %info_hash, "SSL connection for unknown torrent");
3880                return;
3881            }
3882        };
3883
3884        // Get the SSL CA cert from the torrent's metadata
3885        let ssl_cert = match torrent.meta.as_ref().and_then(|m| m.ssl_cert.as_ref()) {
3886            Some(cert) => cert.clone(),
3887            None => {
3888                debug!(%addr, %info_hash, "SSL connection for non-SSL torrent");
3889                return;
3890            }
3891        };
3892
3893        // Build server config using the torrent's CA cert
3894        let server_config = match self.ssl_manager.as_ref() {
3895            Some(mgr) => match mgr.server_config(&ssl_cert) {
3896                Ok(cfg) => cfg,
3897                Err(e) => {
3898                    warn!(%addr, %info_hash, error = %e, "failed to build SSL server config");
3899                    return;
3900                }
3901            },
3902            None => {
3903                debug!(%addr, "SSL manager not initialized");
3904                return;
3905            }
3906        };
3907
3908        // Complete the TLS handshake
3909        let tls_stream = match start_handshake.into_stream(server_config).await {
3910            Ok(s) => s,
3911            Err(e) => {
3912                warn!(%addr, %info_hash, error = %e, "SSL handshake failed");
3913                post_alert(
3914                    &self.alert_tx,
3915                    &self.alert_mask,
3916                    AlertKind::SslTorrentError {
3917                        info_hash,
3918                        message: format!("inbound TLS handshake from {addr}: {e}"),
3919                    },
3920                );
3921                return;
3922            }
3923        };
3924
3925        // Route to the torrent actor via SpawnSslPeer command
3926        let _ = torrent.handle.spawn_ssl_peer(addr, tls_stream).await;
3927    }
3928
3929    async fn handle_dht_put_immutable(&self, value: Vec<u8>) -> crate::Result<Id20> {
3930        let dht = self.dht_v4.as_ref().ok_or(crate::Error::DhtDisabled)?;
3931        match dht.put_immutable(value.clone()).await {
3932            Ok(target) => {
3933                post_alert(
3934                    &self.alert_tx,
3935                    &self.alert_mask,
3936                    AlertKind::DhtPutComplete { target },
3937                );
3938                Ok(target)
3939            }
3940            Err(e) => {
3941                let target = irontide_core::sha1(&value);
3942                post_alert(
3943                    &self.alert_tx,
3944                    &self.alert_mask,
3945                    AlertKind::DhtItemError {
3946                        target,
3947                        message: e.to_string(),
3948                    },
3949                );
3950                Err(crate::Error::Dht(e))
3951            }
3952        }
3953    }
3954
3955    async fn handle_dht_get_immutable(&self, target: Id20) -> crate::Result<Option<Vec<u8>>> {
3956        let dht = self.dht_v4.as_ref().ok_or(crate::Error::DhtDisabled)?;
3957        match dht.get_immutable(target).await {
3958            Ok(value) => {
3959                post_alert(
3960                    &self.alert_tx,
3961                    &self.alert_mask,
3962                    AlertKind::DhtGetResult {
3963                        target,
3964                        value: value.clone(),
3965                    },
3966                );
3967                Ok(value)
3968            }
3969            Err(e) => {
3970                post_alert(
3971                    &self.alert_tx,
3972                    &self.alert_mask,
3973                    AlertKind::DhtItemError {
3974                        target,
3975                        message: e.to_string(),
3976                    },
3977                );
3978                Err(crate::Error::Dht(e))
3979            }
3980        }
3981    }
3982
3983    async fn handle_dht_put_mutable(
3984        &self,
3985        keypair_bytes: [u8; 32],
3986        value: Vec<u8>,
3987        seq: i64,
3988        salt: Vec<u8>,
3989    ) -> crate::Result<Id20> {
3990        let dht = self.dht_v4.as_ref().ok_or(crate::Error::DhtDisabled)?;
3991        match dht.put_mutable(keypair_bytes, value, seq, salt).await {
3992            Ok(target) => {
3993                post_alert(
3994                    &self.alert_tx,
3995                    &self.alert_mask,
3996                    AlertKind::DhtMutablePutComplete { target, seq },
3997                );
3998                Ok(target)
3999            }
4000            Err(e) => {
4001                post_alert(
4002                    &self.alert_tx,
4003                    &self.alert_mask,
4004                    AlertKind::DhtItemError {
4005                        target: Id20::from([0u8; 20]),
4006                        message: e.to_string(),
4007                    },
4008                );
4009                Err(crate::Error::Dht(e))
4010            }
4011        }
4012    }
4013
4014    async fn handle_dht_get_mutable(
4015        &self,
4016        public_key: [u8; 32],
4017        salt: Vec<u8>,
4018    ) -> crate::Result<Option<(Vec<u8>, i64)>> {
4019        let dht = self.dht_v4.as_ref().ok_or(crate::Error::DhtDisabled)?;
4020        let target = irontide_dht::compute_mutable_target(&public_key, &salt);
4021        match dht.get_mutable(public_key, salt).await {
4022            Ok(result) => {
4023                let (value, seq) = match &result {
4024                    Some((v, s)) => (Some(v.clone()), Some(*s)),
4025                    None => (None, None),
4026                };
4027                post_alert(
4028                    &self.alert_tx,
4029                    &self.alert_mask,
4030                    AlertKind::DhtMutableGetResult {
4031                        target,
4032                        value,
4033                        seq,
4034                        public_key,
4035                    },
4036                );
4037                Ok(result)
4038            }
4039            Err(e) => {
4040                post_alert(
4041                    &self.alert_tx,
4042                    &self.alert_mask,
4043                    AlertKind::DhtItemError {
4044                        target,
4045                        message: e.to_string(),
4046                    },
4047                );
4048                Err(crate::Error::Dht(e))
4049            }
4050        }
4051    }
4052
4053    async fn shutdown_all(&mut self) {
4054        // Save resume files before draining torrents (M161 Phase 5).
4055        let save_count = self.save_dirty_resume_files().await;
4056        if save_count > 0 {
4057            info!(save_count, "saved resume files on shutdown");
4058        }
4059
4060        for (info_hash, entry) in self.torrents.drain() {
4061            debug!(%info_hash, "shutting down torrent");
4062            let _ = entry.handle.shutdown().await;
4063        }
4064        if let Some(ref dht) = self.dht_v4 {
4065            let _ = dht.shutdown().await;
4066        }
4067        if let Some(ref dht) = self.dht_v6 {
4068            let _ = dht.shutdown().await;
4069        }
4070        if let Some(ref nat) = self.nat {
4071            nat.shutdown().await;
4072        }
4073        if let Some(ref lsd) = self.lsd {
4074            lsd.shutdown().await;
4075        }
4076        if let Some(ref socket) = self.utp_socket
4077            && let Err(e) = socket.shutdown().await
4078        {
4079            debug!(error = %e, "uTP socket shutdown error");
4080        }
4081        if let Some(ref socket) = self.utp_socket_v6
4082            && let Err(e) = socket.shutdown().await
4083        {
4084            debug!(error = %e, "uTP v6 socket shutdown error");
4085        }
4086        self.disk_manager.shutdown().await;
4087    }
4088}
4089
4090/// Helper to receive NAT events from an optional receiver.
4091/// Returns `pending` if no receiver is available, so the `select!` branch is skipped.
4092async fn recv_nat_event(
4093    rx: &mut Option<mpsc::Receiver<irontide_nat::NatEvent>>,
4094) -> irontide_nat::NatEvent {
4095    match rx {
4096        Some(r) => match r.recv().await {
4097            Some(event) => event,
4098            None => std::future::pending().await,
4099        },
4100        None => std::future::pending().await,
4101    }
4102}
4103
4104/// Receive from an optional DHT IP consensus channel, pending forever if absent.
4105async fn recv_dht_ip(
4106    rx: &mut Option<mpsc::Receiver<std::net::IpAddr>>,
4107) -> Option<std::net::IpAddr> {
4108    match rx {
4109        Some(r) => r.recv().await,
4110        None => std::future::pending().await,
4111    }
4112}
4113
4114/// Synthesize a minimal `TorrentMetaV1` from a `TorrentMetaV2` for session compatibility.
4115///
4116/// The session engine uses v1 structures internally (info hash as Id20, InfoDict for
4117/// piece hashing, etc.). For v2-only torrents, we create a "virtual" v1 representation
4118/// with the truncated SHA-256 hash as the info_hash.
4119fn synthesize_v1_from_v2(
4120    v2: &irontide_core::TorrentMetaV2,
4121) -> crate::Result<irontide_core::TorrentMetaV1> {
4122    use irontide_core::{FileEntry, InfoDict};
4123
4124    let info_hash = v2.info_hashes.best_v1();
4125
4126    // Build file entries from v2 file tree
4127    let v2_files = v2.info.files();
4128    let file_entries: Vec<FileEntry> = v2_files
4129        .iter()
4130        .map(|f| FileEntry {
4131            length: f.attr.length,
4132            path: f.path.clone(),
4133            attr: None,
4134            mtime: None,
4135            symlink_path: None,
4136        })
4137        .collect();
4138
4139    // v2-only torrents have no v1 piece hashes — use placeholder pieces field.
4140    // Verification is done via v2 Merkle trees, not v1 SHA-1 hashes.
4141    let num_pieces = v2.info.num_pieces() as usize;
4142    let pieces = vec![0u8; num_pieces * 20];
4143
4144    let info = InfoDict {
4145        name: v2.info.name.clone(),
4146        piece_length: v2.info.piece_length,
4147        pieces,
4148        length: if file_entries.len() == 1 {
4149            Some(file_entries[0].length)
4150        } else {
4151            None
4152        },
4153        files: if file_entries.len() > 1 {
4154            Some(file_entries)
4155        } else {
4156            None
4157        },
4158        private: None,
4159        source: None,
4160        ssl_cert: v2.ssl_cert.clone(),
4161        similar: Vec::new(),
4162        collections: Vec::new(),
4163    };
4164
4165    Ok(irontide_core::TorrentMetaV1 {
4166        info_hash,
4167        announce: v2.announce.clone(),
4168        announce_list: v2.announce_list.clone(),
4169        comment: v2.comment.clone(),
4170        created_by: v2.created_by.clone(),
4171        creation_date: v2.creation_date,
4172        info,
4173        info_bytes: None,
4174        url_list: Vec::new(),
4175        httpseeds: Vec::new(),
4176        ssl_cert: v2.ssl_cert.clone(),
4177    })
4178}
4179
4180#[cfg(test)]
4181mod tests {
4182    use super::*;
4183    use crate::types::TorrentState;
4184    use irontide_core::{DEFAULT_CHUNK_SIZE, Lengths, torrent_from_bytes};
4185    use irontide_storage::MemoryStorage;
4186    use std::time::Duration;
4187
4188    fn make_test_torrent(data: &[u8], piece_length: u64) -> TorrentMetaV1 {
4189        use serde::Serialize;
4190
4191        let mut pieces = Vec::new();
4192        let mut offset = 0;
4193        while offset < data.len() {
4194            let end = (offset + piece_length as usize).min(data.len());
4195            let hash = irontide_core::sha1(&data[offset..end]);
4196            pieces.extend_from_slice(hash.as_bytes());
4197            offset = end;
4198        }
4199
4200        #[derive(Serialize)]
4201        struct Info<'a> {
4202            length: u64,
4203            name: &'a str,
4204            #[serde(rename = "piece length")]
4205            piece_length: u64,
4206            #[serde(with = "serde_bytes")]
4207            pieces: &'a [u8],
4208        }
4209
4210        #[derive(Serialize)]
4211        struct Torrent<'a> {
4212            info: Info<'a>,
4213        }
4214
4215        let t = Torrent {
4216            info: Info {
4217                length: data.len() as u64,
4218                name: "test",
4219                piece_length,
4220                pieces: &pieces,
4221            },
4222        };
4223
4224        let bytes = irontide_bencode::to_bytes(&t).unwrap();
4225        torrent_from_bytes(&bytes).unwrap()
4226    }
4227
4228    fn make_storage(data: &[u8], piece_length: u64) -> Arc<MemoryStorage> {
4229        let lengths = Lengths::new(data.len() as u64, piece_length, DEFAULT_CHUNK_SIZE);
4230        Arc::new(MemoryStorage::new(lengths))
4231    }
4232
4233    fn test_settings() -> Settings {
4234        Settings {
4235            listen_port: 0,
4236            download_dir: PathBuf::from("/tmp"),
4237            max_torrents: 10,
4238            enable_dht: false,
4239            enable_pex: false,
4240            enable_lsd: false,
4241            enable_fast_extension: false,
4242            enable_utp: false,
4243            enable_upnp: false,
4244            enable_natpmp: false,
4245            enable_ipv6: false,
4246            alert_channel_size: 64,
4247            disk_io_threads: 2,
4248            storage_mode: irontide_core::StorageMode::Sparse,
4249            disk_cache_size: 1024 * 1024,
4250            ..Settings::default()
4251        }
4252    }
4253
4254    // ---- Test 1: Start and shutdown ----
4255
4256    #[tokio::test]
4257    async fn session_start_and_shutdown() {
4258        let session = SessionHandle::start(test_settings()).await.unwrap();
4259        let stats = session.session_stats().await.unwrap();
4260        assert_eq!(stats.active_torrents, 0);
4261        session.shutdown().await.unwrap();
4262    }
4263
4264    // ---- Test 2: Add and list torrent ----
4265
4266    #[tokio::test]
4267    async fn add_and_list_torrent() {
4268        let session = SessionHandle::start(test_settings()).await.unwrap();
4269        let data = vec![0xAB; 16384];
4270        let meta = make_test_torrent(&data, 16384);
4271        let expected_hash = meta.info_hash;
4272
4273        let storage = make_storage(&data, 16384);
4274        let info_hash = session
4275            .add_torrent(meta.into(), Some(storage))
4276            .await
4277            .unwrap();
4278        assert_eq!(info_hash, expected_hash);
4279
4280        let list = session.list_torrents().await.unwrap();
4281        assert_eq!(list.len(), 1);
4282        assert!(list.contains(&info_hash));
4283
4284        session.shutdown().await.unwrap();
4285    }
4286
4287    // ---- Test 3: Remove torrent ----
4288
4289    #[tokio::test]
4290    async fn remove_torrent() {
4291        let session = SessionHandle::start(test_settings()).await.unwrap();
4292        let data = vec![0xAB; 16384];
4293        let meta = make_test_torrent(&data, 16384);
4294        let storage = make_storage(&data, 16384);
4295
4296        let info_hash = session
4297            .add_torrent(meta.into(), Some(storage))
4298            .await
4299            .unwrap();
4300        session.remove_torrent(info_hash).await.unwrap();
4301
4302        tokio::time::sleep(Duration::from_millis(50)).await;
4303
4304        let list = session.list_torrents().await.unwrap();
4305        assert!(list.is_empty());
4306
4307        session.shutdown().await.unwrap();
4308    }
4309
4310    // ---- Test 4: Duplicate rejection ----
4311
4312    #[tokio::test]
4313    async fn duplicate_torrent_rejected() {
4314        let session = SessionHandle::start(test_settings()).await.unwrap();
4315        let data = vec![0xAB; 16384];
4316        let meta = make_test_torrent(&data, 16384);
4317        let storage1 = make_storage(&data, 16384);
4318        let storage2 = make_storage(&data, 16384);
4319
4320        session
4321            .add_torrent(meta.clone().into(), Some(storage1))
4322            .await
4323            .unwrap();
4324        let result = session.add_torrent(meta.into(), Some(storage2)).await;
4325        assert!(result.is_err());
4326        assert!(result.unwrap_err().to_string().contains("duplicate"));
4327
4328        session.shutdown().await.unwrap();
4329    }
4330
4331    // ---- Test 5: Max capacity ----
4332
4333    #[tokio::test]
4334    async fn session_at_capacity() {
4335        let mut config = test_settings();
4336        config.max_torrents = 1;
4337        let session = SessionHandle::start(config).await.unwrap();
4338
4339        let data1 = vec![0xAA; 16384];
4340        let meta1 = make_test_torrent(&data1, 16384);
4341        let storage1 = make_storage(&data1, 16384);
4342        session
4343            .add_torrent(meta1.into(), Some(storage1))
4344            .await
4345            .unwrap();
4346
4347        let data2 = vec![0xBB; 16384];
4348        let meta2 = make_test_torrent(&data2, 16384);
4349        let storage2 = make_storage(&data2, 16384);
4350        let result = session.add_torrent(meta2.into(), Some(storage2)).await;
4351        assert!(result.is_err());
4352        assert!(result.unwrap_err().to_string().contains("capacity"));
4353
4354        session.shutdown().await.unwrap();
4355    }
4356
4357    // ---- Test 6: Torrent stats ----
4358
4359    #[tokio::test]
4360    async fn torrent_stats_via_session() {
4361        let session = SessionHandle::start(test_settings()).await.unwrap();
4362        let data = vec![0xAB; 32768];
4363        let meta = make_test_torrent(&data, 16384);
4364        let storage = make_storage(&data, 16384);
4365
4366        let info_hash = session
4367            .add_torrent(meta.into(), Some(storage))
4368            .await
4369            .unwrap();
4370        let stats = session.torrent_stats(info_hash).await.unwrap();
4371        assert_eq!(stats.state, TorrentState::Downloading);
4372        assert_eq!(stats.pieces_total, 2);
4373
4374        session.shutdown().await.unwrap();
4375    }
4376
4377    // ---- Test 7: Torrent info ----
4378
4379    #[tokio::test]
4380    async fn torrent_info_via_session() {
4381        let session = SessionHandle::start(test_settings()).await.unwrap();
4382        let data = vec![0xAB; 32768];
4383        let meta = make_test_torrent(&data, 16384);
4384        let storage = make_storage(&data, 16384);
4385
4386        let info_hash = session
4387            .add_torrent(meta.into(), Some(storage))
4388            .await
4389            .unwrap();
4390        let info = session.torrent_info(info_hash).await.unwrap();
4391        assert_eq!(info.info_hash, info_hash);
4392        assert_eq!(info.name, "test");
4393        assert_eq!(info.total_length, 32768);
4394        assert_eq!(info.num_pieces, 2);
4395        assert!(!info.private);
4396        assert_eq!(info.files.len(), 1);
4397        assert_eq!(info.files[0].length, 32768);
4398
4399        session.shutdown().await.unwrap();
4400    }
4401
4402    // ---- Test 8: Pause/resume via session ----
4403
4404    #[tokio::test]
4405    async fn pause_resume_via_session() {
4406        let session = SessionHandle::start(test_settings()).await.unwrap();
4407        let data = vec![0xAB; 16384];
4408        let meta = make_test_torrent(&data, 16384);
4409        let storage = make_storage(&data, 16384);
4410
4411        let info_hash = session
4412            .add_torrent(meta.into(), Some(storage))
4413            .await
4414            .unwrap();
4415
4416        session.pause_torrent(info_hash).await.unwrap();
4417        tokio::time::sleep(Duration::from_millis(50)).await;
4418        let stats = session.torrent_stats(info_hash).await.unwrap();
4419        assert_eq!(stats.state, TorrentState::Paused);
4420
4421        session.resume_torrent(info_hash).await.unwrap();
4422        tokio::time::sleep(Duration::from_millis(50)).await;
4423        let stats = session.torrent_stats(info_hash).await.unwrap();
4424        assert_eq!(stats.state, TorrentState::Downloading);
4425
4426        session.shutdown().await.unwrap();
4427    }
4428
4429    // ---- Test 9: Not-found errors ----
4430
4431    #[tokio::test]
4432    async fn not_found_errors() {
4433        let session = SessionHandle::start(test_settings()).await.unwrap();
4434        let fake_hash = Id20::from_hex("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
4435
4436        assert!(session.torrent_stats(fake_hash).await.is_err());
4437        assert!(session.torrent_info(fake_hash).await.is_err());
4438        assert!(session.pause_torrent(fake_hash).await.is_err());
4439        assert!(session.resume_torrent(fake_hash).await.is_err());
4440        assert!(session.remove_torrent(fake_hash).await.is_err());
4441
4442        session.shutdown().await.unwrap();
4443    }
4444
4445    // ---- Test 10: Session stats ----
4446
4447    #[tokio::test]
4448    async fn session_stats_aggregate() {
4449        let session = SessionHandle::start(test_settings()).await.unwrap();
4450
4451        let data1 = vec![0xAA; 16384];
4452        let meta1 = make_test_torrent(&data1, 16384);
4453        let storage1 = make_storage(&data1, 16384);
4454        session
4455            .add_torrent(meta1.into(), Some(storage1))
4456            .await
4457            .unwrap();
4458
4459        let data2 = vec![0xBB; 16384];
4460        let meta2 = make_test_torrent(&data2, 16384);
4461        let storage2 = make_storage(&data2, 16384);
4462        session
4463            .add_torrent(meta2.into(), Some(storage2))
4464            .await
4465            .unwrap();
4466
4467        let stats = session.session_stats().await.unwrap();
4468        assert_eq!(stats.active_torrents, 2);
4469
4470        session.shutdown().await.unwrap();
4471    }
4472
4473    // ---- Test 11: Add magnet and list ----
4474
4475    #[tokio::test]
4476    async fn add_magnet_and_list() {
4477        use irontide_core::Magnet;
4478
4479        let session = SessionHandle::start(test_settings()).await.unwrap();
4480        let magnet = Magnet {
4481            info_hashes: irontide_core::InfoHashes::v1_only(
4482                Id20::from_hex("aaf4c61ddcc5e8a2dabede0f3b482cd9aea9434d").unwrap(),
4483            ),
4484            display_name: Some("test-magnet".into()),
4485            trackers: vec![],
4486            peers: vec![],
4487            selected_files: None,
4488        };
4489        let expected_hash = magnet.info_hash();
4490
4491        let info_hash = session.add_magnet(magnet).await.unwrap();
4492        assert_eq!(info_hash, expected_hash);
4493
4494        let list = session.list_torrents().await.unwrap();
4495        assert_eq!(list.len(), 1);
4496        assert!(list.contains(&info_hash));
4497
4498        // torrent_info should fail with MetadataNotReady
4499        let err = session.torrent_info(info_hash).await.unwrap_err();
4500        assert!(err.to_string().contains("metadata not yet available"));
4501
4502        session.shutdown().await.unwrap();
4503    }
4504
4505    // ---- Test 12: Duplicate magnet rejected ----
4506
4507    #[tokio::test]
4508    async fn add_magnet_duplicate_rejected() {
4509        use irontide_core::Magnet;
4510
4511        let session = SessionHandle::start(test_settings()).await.unwrap();
4512        let magnet = Magnet {
4513            info_hashes: irontide_core::InfoHashes::v1_only(
4514                Id20::from_hex("aaf4c61ddcc5e8a2dabede0f3b482cd9aea9434d").unwrap(),
4515            ),
4516            display_name: Some("test-magnet".into()),
4517            trackers: vec![],
4518            peers: vec![],
4519            selected_files: None,
4520        };
4521
4522        session.add_magnet(magnet.clone()).await.unwrap();
4523        let result = session.add_magnet(magnet).await;
4524        assert!(result.is_err());
4525        assert!(result.unwrap_err().to_string().contains("duplicate"));
4526
4527        session.shutdown().await.unwrap();
4528    }
4529
4530    // ---- Test 13: Session with LSD enabled ----
4531
4532    #[tokio::test]
4533    async fn session_with_lsd_enabled() {
4534        use irontide_core::Magnet;
4535
4536        // LSD may fail to bind port 6771 — session should still start
4537        let mut config = test_settings();
4538        config.enable_lsd = true;
4539
4540        let session = SessionHandle::start(config).await.unwrap();
4541
4542        // Add a torrent (triggers LSD announce if available)
4543        let data = vec![0xAB; 16384];
4544        let meta = make_test_torrent(&data, 16384);
4545        let storage = make_storage(&data, 16384);
4546        session
4547            .add_torrent(meta.into(), Some(storage))
4548            .await
4549            .unwrap();
4550
4551        // Add a magnet (also triggers LSD announce)
4552        let magnet = Magnet {
4553            info_hashes: irontide_core::InfoHashes::v1_only(
4554                Id20::from_hex("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb").unwrap(),
4555            ),
4556            display_name: Some("lsd-test".into()),
4557            trackers: vec![],
4558            peers: vec![],
4559            selected_files: None,
4560        };
4561        session.add_magnet(magnet).await.unwrap();
4562
4563        let list = session.list_torrents().await.unwrap();
4564        assert_eq!(list.len(), 2);
4565
4566        session.shutdown().await.unwrap();
4567    }
4568
4569    // ---- Test: v2-only torrent addition ----
4570
4571    #[tokio::test]
4572    async fn add_v2_only_torrent() {
4573        use irontide_bencode::BencodeValue;
4574        use std::collections::BTreeMap;
4575
4576        let session = SessionHandle::start(test_settings()).await.unwrap();
4577
4578        // Build a minimal v2-only torrent
4579        let mut attr_map: BTreeMap<Vec<u8>, BencodeValue> = BTreeMap::new();
4580        attr_map.insert(b"length".to_vec(), BencodeValue::Integer(16384));
4581        let mut file_node: BTreeMap<Vec<u8>, BencodeValue> = BTreeMap::new();
4582        file_node.insert(b"".to_vec(), BencodeValue::Dict(attr_map));
4583        let mut ft_map: BTreeMap<Vec<u8>, BencodeValue> = BTreeMap::new();
4584        ft_map.insert(b"test.dat".to_vec(), BencodeValue::Dict(file_node));
4585
4586        let mut info_map: BTreeMap<Vec<u8>, BencodeValue> = BTreeMap::new();
4587        info_map.insert(b"file tree".to_vec(), BencodeValue::Dict(ft_map));
4588        info_map.insert(b"meta version".to_vec(), BencodeValue::Integer(2));
4589        info_map.insert(b"name".to_vec(), BencodeValue::Bytes(b"v2test".to_vec()));
4590        info_map.insert(b"piece length".to_vec(), BencodeValue::Integer(16384));
4591
4592        let mut root_map: BTreeMap<Vec<u8>, BencodeValue> = BTreeMap::new();
4593        root_map.insert(b"info".to_vec(), BencodeValue::Dict(info_map));
4594
4595        let bytes = irontide_bencode::to_bytes(&BencodeValue::Dict(root_map)).unwrap();
4596        let meta = irontide_core::torrent_from_bytes_any(&bytes).unwrap();
4597        assert!(meta.is_v2());
4598
4599        // This should NOT return an error now (v2-only is supported)
4600        let info_hash = session.add_torrent(meta, None).await.unwrap();
4601        let list = session.list_torrents().await.unwrap();
4602        assert!(list.contains(&info_hash));
4603
4604        session.shutdown().await.unwrap();
4605    }
4606
4607    // ---- Test 14: Save torrent resume data via session ----
4608
4609    #[tokio::test]
4610    async fn save_torrent_resume_data_via_session() {
4611        let session = SessionHandle::start(test_settings()).await.unwrap();
4612        let data = vec![0xAB; 32768];
4613        let meta = make_test_torrent(&data, 16384);
4614        let info_hash = meta.info_hash;
4615        let storage = make_storage(&data, 16384);
4616        session
4617            .add_torrent(meta.into(), Some(storage))
4618            .await
4619            .unwrap();
4620
4621        let rd = session.save_torrent_resume_data(info_hash).await.unwrap();
4622        assert_eq!(rd.info_hash, info_hash.as_bytes().as_slice());
4623        assert_eq!(rd.name, "test");
4624        assert_eq!(rd.file_format, "libtorrent resume file");
4625        assert_eq!(rd.file_version, 1);
4626        assert!(!rd.pieces.is_empty());
4627        assert_eq!(rd.paused, 0);
4628
4629        session.shutdown().await.unwrap();
4630    }
4631
4632    // ---- Test 15: Save session state captures all torrents ----
4633
4634    #[tokio::test]
4635    async fn save_session_state_captures_all_torrents() {
4636        let session = SessionHandle::start(test_settings()).await.unwrap();
4637
4638        let data1 = vec![0xAA; 16384];
4639        let meta1 = make_test_torrent(&data1, 16384);
4640        let storage1 = make_storage(&data1, 16384);
4641        session
4642            .add_torrent(meta1.into(), Some(storage1))
4643            .await
4644            .unwrap();
4645
4646        let data2 = vec![0xBB; 16384];
4647        let meta2 = make_test_torrent(&data2, 16384);
4648        let storage2 = make_storage(&data2, 16384);
4649        session
4650            .add_torrent(meta2.into(), Some(storage2))
4651            .await
4652            .unwrap();
4653
4654        let state = session.save_session_state().await.unwrap();
4655        assert_eq!(state.torrents.len(), 2);
4656
4657        for rd in &state.torrents {
4658            assert_eq!(rd.file_format, "libtorrent resume file");
4659            assert_eq!(rd.info_hash.len(), 20);
4660        }
4661
4662        session.shutdown().await.unwrap();
4663    }
4664
4665    // ---- Test 16: Save resume data not found ----
4666
4667    #[tokio::test]
4668    async fn save_resume_data_not_found() {
4669        let session = SessionHandle::start(test_settings()).await.unwrap();
4670        let fake_hash = Id20::from_hex("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
4671        let result = session.save_torrent_resume_data(fake_hash).await;
4672        assert!(result.is_err());
4673        assert!(result.unwrap_err().to_string().contains("not found"));
4674        session.shutdown().await.unwrap();
4675    }
4676
4677    // ---- Test 17: Subscribe receives TorrentAdded alert ----
4678
4679    #[tokio::test]
4680    async fn subscribe_receives_torrent_added_alert() {
4681        use crate::alert::AlertKind;
4682
4683        let session = SessionHandle::start(test_settings()).await.unwrap();
4684        let mut alerts = session.subscribe();
4685
4686        let data = vec![0xAB; 16384];
4687        let meta = make_test_torrent(&data, 16384);
4688        let storage = make_storage(&data, 16384);
4689        let _info_hash = session
4690            .add_torrent(meta.into(), Some(storage))
4691            .await
4692            .unwrap();
4693
4694        let alert = tokio::time::timeout(Duration::from_secs(2), alerts.recv())
4695            .await
4696            .unwrap()
4697            .unwrap();
4698        assert!(matches!(alert.kind, AlertKind::TorrentAdded { .. }));
4699        session.shutdown().await.unwrap();
4700    }
4701
4702    // ---- Test 18: Subscribe receives TorrentRemoved alert ----
4703
4704    #[tokio::test]
4705    async fn subscribe_receives_torrent_removed_alert() {
4706        use crate::alert::AlertKind;
4707        use crate::types::TorrentState;
4708
4709        let session = SessionHandle::start(test_settings()).await.unwrap();
4710        let mut alerts = session.subscribe();
4711
4712        let data = vec![0xAB; 16384];
4713        let meta = make_test_torrent(&data, 16384);
4714        let storage = make_storage(&data, 16384);
4715        let info_hash = session
4716            .add_torrent(meta.into(), Some(storage))
4717            .await
4718            .unwrap();
4719
4720        // Drain TorrentAdded and any checking alerts
4721        while let Ok(Ok(a)) = tokio::time::timeout(Duration::from_secs(1), alerts.recv()).await {
4722            if matches!(
4723                a.kind,
4724                AlertKind::StateChanged {
4725                    new_state: TorrentState::Downloading,
4726                    ..
4727                }
4728            ) {
4729                break;
4730            }
4731        }
4732
4733        session.remove_torrent(info_hash).await.unwrap();
4734
4735        // Find TorrentRemoved (skip any interleaved alerts)
4736        loop {
4737            let alert = tokio::time::timeout(Duration::from_secs(2), alerts.recv())
4738                .await
4739                .unwrap()
4740                .unwrap();
4741            if matches!(alert.kind, AlertKind::TorrentRemoved { .. }) {
4742                break;
4743            }
4744        }
4745        session.shutdown().await.unwrap();
4746    }
4747
4748    // ---- Test 19: Multiple subscribers each receive alerts ----
4749
4750    #[tokio::test]
4751    async fn multiple_subscribers_each_receive_alerts() {
4752        use crate::alert::AlertKind;
4753
4754        let session = SessionHandle::start(test_settings()).await.unwrap();
4755        let mut sub1 = session.subscribe();
4756        let mut sub2 = session.subscribe();
4757
4758        let data = vec![0xAB; 16384];
4759        let meta = make_test_torrent(&data, 16384);
4760        let storage = make_storage(&data, 16384);
4761        session
4762            .add_torrent(meta.into(), Some(storage))
4763            .await
4764            .unwrap();
4765
4766        let a1 = tokio::time::timeout(Duration::from_secs(2), sub1.recv())
4767            .await
4768            .unwrap()
4769            .unwrap();
4770        let a2 = tokio::time::timeout(Duration::from_secs(2), sub2.recv())
4771            .await
4772            .unwrap()
4773            .unwrap();
4774
4775        assert!(matches!(a1.kind, AlertKind::TorrentAdded { .. }));
4776        assert!(matches!(a2.kind, AlertKind::TorrentAdded { .. }));
4777        session.shutdown().await.unwrap();
4778    }
4779
4780    // ---- Test 20: set_alert_mask filters at runtime ----
4781
4782    #[tokio::test]
4783    async fn set_alert_mask_filters_at_runtime() {
4784        use crate::alert::{AlertCategory, AlertKind};
4785
4786        let session = SessionHandle::start(test_settings()).await.unwrap();
4787        let mut alerts = session.subscribe();
4788
4789        // Start with ALL — TorrentAdded (STATUS) should arrive
4790        let data = vec![0xAB; 16384];
4791        let meta = make_test_torrent(&data, 16384);
4792        let storage = make_storage(&data, 16384);
4793        session
4794            .add_torrent(meta.into(), Some(storage))
4795            .await
4796            .unwrap();
4797
4798        let alert = tokio::time::timeout(Duration::from_secs(2), alerts.recv())
4799            .await
4800            .unwrap()
4801            .unwrap();
4802        assert!(matches!(alert.kind, AlertKind::TorrentAdded { .. }));
4803
4804        // Drain any remaining alerts from the first torrent (StateChanged, CheckingProgress, etc.)
4805        while tokio::time::timeout(Duration::from_millis(200), alerts.recv())
4806            .await
4807            .is_ok()
4808        {}
4809
4810        // Change mask to empty — no alerts should pass
4811        session.set_alert_mask(AlertCategory::empty());
4812
4813        let data2 = vec![0xBB; 16384];
4814        let meta2 = make_test_torrent(&data2, 16384);
4815        let storage2 = make_storage(&data2, 16384);
4816        session
4817            .add_torrent(meta2.into(), Some(storage2))
4818            .await
4819            .unwrap();
4820
4821        // Give a small window — nothing should arrive
4822        let result = tokio::time::timeout(Duration::from_millis(200), alerts.recv()).await;
4823        assert!(result.is_err(), "should have timed out with empty mask");
4824
4825        // Restore STATUS — adding another torrent should arrive
4826        session.set_alert_mask(AlertCategory::STATUS);
4827
4828        let data3 = vec![0xCC; 16384];
4829        let meta3 = make_test_torrent(&data3, 16384);
4830        let storage3 = make_storage(&data3, 16384);
4831        session
4832            .add_torrent(meta3.into(), Some(storage3))
4833            .await
4834            .unwrap();
4835
4836        let alert = tokio::time::timeout(Duration::from_secs(2), alerts.recv())
4837            .await
4838            .unwrap()
4839            .unwrap();
4840        assert!(matches!(alert.kind, AlertKind::TorrentAdded { .. }));
4841
4842        session.shutdown().await.unwrap();
4843    }
4844
4845    // ---- Test 21: AlertStream filters per subscriber ----
4846
4847    #[tokio::test]
4848    async fn alert_stream_filters_per_subscriber() {
4849        use crate::alert::{AlertCategory, AlertKind};
4850
4851        let session = SessionHandle::start(test_settings()).await.unwrap();
4852
4853        // subscriber A: STATUS only
4854        let mut status_sub = session.subscribe_filtered(AlertCategory::STATUS);
4855        // subscriber B: PEER only
4856        let mut peer_sub = session.subscribe_filtered(AlertCategory::PEER);
4857
4858        let data = vec![0xAB; 16384];
4859        let meta = make_test_torrent(&data, 16384);
4860        let storage = make_storage(&data, 16384);
4861        session
4862            .add_torrent(meta.into(), Some(storage))
4863            .await
4864            .unwrap();
4865
4866        // STATUS sub gets TorrentAdded
4867        let alert = tokio::time::timeout(Duration::from_secs(2), status_sub.recv())
4868            .await
4869            .unwrap()
4870            .unwrap();
4871        assert!(matches!(alert.kind, AlertKind::TorrentAdded { .. }));
4872
4873        // PEER sub should NOT receive TorrentAdded (it's STATUS category)
4874        let result = tokio::time::timeout(Duration::from_millis(200), peer_sub.recv()).await;
4875        assert!(
4876            result.is_err(),
4877            "PEER subscriber should not get STATUS alerts"
4878        );
4879
4880        session.shutdown().await.unwrap();
4881    }
4882
4883    // ---- Test 22: State changed tracks transitions ----
4884
4885    #[tokio::test]
4886    async fn state_changed_tracks_transitions() {
4887        use crate::alert::AlertKind;
4888
4889        let session = SessionHandle::start(test_settings()).await.unwrap();
4890        let mut alerts = session.subscribe();
4891
4892        let data = vec![0xAB; 16384];
4893        let meta = make_test_torrent(&data, 16384);
4894        let storage = make_storage(&data, 16384);
4895        let info_hash = session
4896            .add_torrent(meta.into(), Some(storage))
4897            .await
4898            .unwrap();
4899
4900        // Drain TorrentAdded
4901        let _ = tokio::time::timeout(Duration::from_secs(1), alerts.recv())
4902            .await
4903            .unwrap();
4904
4905        // Pause — should get StateChanged(Downloading → Paused) + TorrentPaused
4906        session.pause_torrent(info_hash).await.unwrap();
4907        tokio::time::sleep(Duration::from_millis(100)).await;
4908
4909        // Collect alerts over a short window
4910        let mut state_changes = Vec::new();
4911        let mut paused_alerts = Vec::new();
4912        loop {
4913            match tokio::time::timeout(Duration::from_millis(200), alerts.recv()).await {
4914                Ok(Ok(a)) => match &a.kind {
4915                    AlertKind::StateChanged {
4916                        prev_state,
4917                        new_state,
4918                        ..
4919                    } => {
4920                        state_changes.push((*prev_state, *new_state));
4921                    }
4922                    AlertKind::TorrentPaused { .. } => {
4923                        paused_alerts.push(a);
4924                    }
4925                    _ => {} // other alerts (PeerConnected etc)
4926                },
4927                _ => break,
4928            }
4929        }
4930
4931        assert!(
4932            state_changes.contains(&(TorrentState::Downloading, TorrentState::Paused)),
4933            "expected Downloading→Paused, got: {state_changes:?}"
4934        );
4935        assert!(!paused_alerts.is_empty(), "expected TorrentPaused alert");
4936
4937        // Resume — should get StateChanged(Paused → Downloading) + TorrentResumed
4938        session.resume_torrent(info_hash).await.unwrap();
4939        tokio::time::sleep(Duration::from_millis(100)).await;
4940
4941        let mut resume_state_changes = Vec::new();
4942        let mut resumed_alerts = Vec::new();
4943        loop {
4944            match tokio::time::timeout(Duration::from_millis(200), alerts.recv()).await {
4945                Ok(Ok(a)) => match &a.kind {
4946                    AlertKind::StateChanged {
4947                        prev_state,
4948                        new_state,
4949                        ..
4950                    } => {
4951                        resume_state_changes.push((*prev_state, *new_state));
4952                    }
4953                    AlertKind::TorrentResumed { .. } => {
4954                        resumed_alerts.push(a);
4955                    }
4956                    _ => {}
4957                },
4958                _ => break,
4959            }
4960        }
4961
4962        assert!(
4963            resume_state_changes.contains(&(TorrentState::Paused, TorrentState::Downloading)),
4964            "expected Paused→Downloading, got: {resume_state_changes:?}"
4965        );
4966        assert!(!resumed_alerts.is_empty(), "expected TorrentResumed alert");
4967
4968        session.shutdown().await.unwrap();
4969    }
4970
4971    #[tokio::test]
4972    async fn session_config_creates_utp_socket() {
4973        // Start session with uTP enabled — should succeed without errors
4974        let mut config = test_settings();
4975        config.enable_utp = true;
4976        let session = SessionHandle::start(config).await.unwrap();
4977        let stats = session.session_stats().await.unwrap();
4978        assert_eq!(stats.active_torrents, 0);
4979        session.shutdown().await.unwrap();
4980    }
4981
4982    #[test]
4983    fn settings_nat_defaults() {
4984        let s = Settings::default();
4985        assert!(s.enable_upnp, "enable_upnp should default to true");
4986        assert!(s.enable_natpmp, "enable_natpmp should default to true");
4987    }
4988
4989    #[tokio::test]
4990    async fn session_with_nat_disabled() {
4991        let config = test_settings();
4992        // test_session_config already sets enable_upnp: false, enable_natpmp: false
4993        assert!(!config.enable_upnp);
4994        assert!(!config.enable_natpmp);
4995        let session = SessionHandle::start(config).await.unwrap();
4996        let stats = session.session_stats().await.unwrap();
4997        assert_eq!(stats.active_torrents, 0);
4998        session.shutdown().await.unwrap();
4999    }
5000
5001    // ---- M29: Anonymous mode, force proxy, proxy config tests ----
5002
5003    #[test]
5004    fn anonymous_mode_disables_discovery() {
5005        let mut config = test_settings();
5006        config.anonymous_mode = true;
5007        config.enable_dht = true;
5008        config.enable_lsd = true;
5009        config.enable_upnp = true;
5010        config.enable_natpmp = true;
5011
5012        // SessionHandle::start() will override these when anonymous_mode is true.
5013        // We test the enforcement logic directly here.
5014        if config.anonymous_mode {
5015            config.enable_dht = false;
5016            config.enable_lsd = false;
5017            config.enable_upnp = false;
5018            config.enable_natpmp = false;
5019        }
5020
5021        assert!(!config.enable_dht);
5022        assert!(!config.enable_lsd);
5023        assert!(!config.enable_upnp);
5024        assert!(!config.enable_natpmp);
5025    }
5026
5027    #[tokio::test]
5028    async fn anonymous_mode_session_starts_with_discovery_disabled() {
5029        let mut config = test_settings();
5030        config.anonymous_mode = true;
5031        // Even if we enable these, anonymous_mode should override
5032        config.enable_dht = true;
5033        config.enable_lsd = true;
5034
5035        let session = SessionHandle::start(config).await.unwrap();
5036        let stats = session.session_stats().await.unwrap();
5037        assert_eq!(stats.active_torrents, 0);
5038        session.shutdown().await.unwrap();
5039    }
5040
5041    #[test]
5042    fn force_proxy_requires_proxy_configured() {
5043        let mut config = test_settings();
5044        config.force_proxy = true;
5045        config.proxy = crate::proxy::ProxyConfig::default(); // no proxy
5046
5047        // Validate the config error
5048        assert_eq!(config.proxy.proxy_type, crate::proxy::ProxyType::None);
5049        assert!(config.force_proxy);
5050        // This would error in SessionHandle::start()
5051    }
5052
5053    #[tokio::test]
5054    async fn force_proxy_errors_without_proxy() {
5055        let mut config = test_settings();
5056        config.force_proxy = true;
5057        // proxy_type is None by default
5058
5059        let result = SessionHandle::start(config).await;
5060        assert!(result.is_err());
5061        match result {
5062            Err(e) => assert!(
5063                e.to_string().contains("force_proxy"),
5064                "error should mention force_proxy: {e}"
5065            ),
5066            Ok(_) => panic!("expected error"),
5067        }
5068    }
5069
5070    #[test]
5071    fn force_proxy_disables_features() {
5072        let mut config = test_settings();
5073        config.force_proxy = true;
5074        config.proxy = crate::proxy::ProxyConfig {
5075            proxy_type: crate::proxy::ProxyType::Socks5,
5076            hostname: "proxy.example.com".into(),
5077            port: 1080,
5078            ..Default::default()
5079        };
5080        config.enable_dht = true;
5081        config.enable_lsd = true;
5082        config.enable_upnp = true;
5083        config.enable_natpmp = true;
5084
5085        // Simulate the enforcement from start()
5086        if config.force_proxy {
5087            config.enable_upnp = false;
5088            config.enable_natpmp = false;
5089            config.enable_dht = false;
5090            config.enable_lsd = false;
5091        }
5092
5093        assert!(!config.enable_dht);
5094        assert!(!config.enable_lsd);
5095        assert!(!config.enable_upnp);
5096        assert!(!config.enable_natpmp);
5097    }
5098
5099    #[test]
5100    fn proxy_config_round_trip() {
5101        let s = Settings {
5102            proxy: crate::proxy::ProxyConfig {
5103                proxy_type: crate::proxy::ProxyType::Socks5Password,
5104                hostname: "localhost".into(),
5105                port: 9050,
5106                username: Some("user".into()),
5107                password: Some("pass".into()),
5108                ..Default::default()
5109            },
5110            force_proxy: true,
5111            anonymous_mode: true,
5112            ..test_settings()
5113        };
5114
5115        assert_eq!(s.proxy.proxy_type, crate::proxy::ProxyType::Socks5Password);
5116        assert_eq!(s.proxy.hostname, "localhost");
5117        assert_eq!(s.proxy.port, 9050);
5118        assert!(s.force_proxy);
5119        assert!(s.anonymous_mode);
5120        assert_eq!(s.proxy.to_url(), "socks5://user:pass@localhost:9050");
5121    }
5122
5123    #[tokio::test]
5124    async fn apply_settings_runtime() {
5125        let session = SessionHandle::start(test_settings()).await.unwrap();
5126        let original = session.settings().await.unwrap();
5127        assert_eq!(original.max_torrents, 10);
5128
5129        let mut new = original.clone();
5130        new.max_torrents = 200;
5131        new.upload_rate_limit = 1_000_000;
5132        session.apply_settings(new).await.unwrap();
5133
5134        let updated = session.settings().await.unwrap();
5135        assert_eq!(updated.max_torrents, 200);
5136        assert_eq!(updated.upload_rate_limit, 1_000_000);
5137
5138        session.shutdown().await.unwrap();
5139    }
5140
5141    #[tokio::test]
5142    async fn apply_settings_validation_error() {
5143        let session = SessionHandle::start(test_settings()).await.unwrap();
5144
5145        // force_proxy=true without a proxy configured should fail validation
5146        let mut bad = Settings::default();
5147        bad.force_proxy = true;
5148        let result = session.apply_settings(bad).await;
5149        assert!(result.is_err());
5150
5151        // Original settings should be unchanged
5152        let current = session.settings().await.unwrap();
5153        assert!(!current.force_proxy);
5154
5155        session.shutdown().await.unwrap();
5156    }
5157
5158    // ---- M50: Session stats counters tests ----
5159
5160    #[tokio::test]
5161    async fn session_stats_counters_accessible() {
5162        let session = SessionHandle::start(test_settings()).await.unwrap();
5163        let counters = session.counters();
5164        // Uptime should be >= 0 from the moment of creation
5165        assert!(counters.uptime_secs() >= 0);
5166        assert_eq!(counters.len(), crate::stats::NUM_METRICS);
5167        session.shutdown().await.unwrap();
5168    }
5169
5170    #[tokio::test]
5171    async fn post_session_stats_fires_alert() {
5172        use crate::alert::{AlertCategory, AlertKind};
5173
5174        let session = SessionHandle::start(test_settings()).await.unwrap();
5175        let mut stats_sub = session.subscribe_filtered(AlertCategory::STATS);
5176
5177        session.post_session_stats().await.unwrap();
5178
5179        let alert = tokio::time::timeout(Duration::from_secs(2), stats_sub.recv())
5180            .await
5181            .expect("timed out waiting for SessionStatsAlert")
5182            .expect("recv error");
5183        assert!(
5184            matches!(alert.kind, AlertKind::SessionStatsAlert { ref values } if values.len() == crate::stats::NUM_METRICS),
5185            "expected SessionStatsAlert with {} values, got {:?}",
5186            crate::stats::NUM_METRICS,
5187            alert.kind,
5188        );
5189        session.shutdown().await.unwrap();
5190    }
5191
5192    #[tokio::test]
5193    async fn session_stats_include_torrent_count() {
5194        use crate::alert::{AlertCategory, AlertKind};
5195
5196        let session = SessionHandle::start(test_settings()).await.unwrap();
5197        let mut stats_sub = session.subscribe_filtered(AlertCategory::STATS);
5198
5199        // Add a torrent
5200        let data = vec![0xAB; 16384];
5201        let meta = make_test_torrent(&data, 16384);
5202        let storage = make_storage(&data, 16384);
5203        session
5204            .add_torrent(meta.into(), Some(storage))
5205            .await
5206            .unwrap();
5207
5208        session.post_session_stats().await.unwrap();
5209
5210        let alert = tokio::time::timeout(Duration::from_secs(2), stats_sub.recv())
5211            .await
5212            .expect("timed out waiting for SessionStatsAlert")
5213            .expect("recv error");
5214        match alert.kind {
5215            AlertKind::SessionStatsAlert { values } => {
5216                assert!(
5217                    values[crate::stats::SES_NUM_TORRENTS] > 0,
5218                    "SES_NUM_TORRENTS should be > 0 after adding a torrent, got {}",
5219                    values[crate::stats::SES_NUM_TORRENTS],
5220                );
5221            }
5222            other => panic!("expected SessionStatsAlert, got {other:?}"),
5223        }
5224        session.shutdown().await.unwrap();
5225    }
5226
5227    #[tokio::test]
5228    async fn stats_timer_disabled_when_zero() {
5229        use crate::alert::{AlertCategory, AlertKind};
5230
5231        let mut config = test_settings();
5232        config.stats_report_interval = 0;
5233        let session = SessionHandle::start(config).await.unwrap();
5234        let mut stats_sub = session.subscribe_filtered(AlertCategory::STATS);
5235
5236        // Wait 200ms — no periodic stats alert should arrive
5237        let result = tokio::time::timeout(Duration::from_millis(200), stats_sub.recv()).await;
5238        assert!(
5239            result.is_err(),
5240            "no SessionStatsAlert should fire when stats_report_interval is 0"
5241        );
5242        session.shutdown().await.unwrap();
5243    }
5244
5245    #[tokio::test]
5246    async fn sample_infohashes_timer_disabled_when_zero() {
5247        use crate::alert::{AlertCategory, AlertKind};
5248
5249        let mut config = test_settings();
5250        config.dht_sample_infohashes_interval = 0;
5251        let session = SessionHandle::start(config).await.unwrap();
5252        let mut dht_sub = session.subscribe_filtered(AlertCategory::DHT);
5253
5254        // Wait 200ms — no DhtSampleInfohashes alert should arrive
5255        let result = tokio::time::timeout(Duration::from_millis(200), dht_sub.recv()).await;
5256        assert!(
5257            result.is_err(),
5258            "no DhtSampleInfohashes alert should fire when interval is 0"
5259        );
5260        session.shutdown().await.unwrap();
5261    }
5262
5263    // ---- Test: open_file returns TorrentNotFound for unknown hash ----
5264
5265    #[tokio::test]
5266    async fn open_file_not_found() {
5267        let session = SessionHandle::start(test_settings()).await.unwrap();
5268        let fake_hash = Id20::from_hex("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
5269        let result = session.open_file(fake_hash, 0).await;
5270        assert!(result.is_err());
5271        let err = result.err().unwrap();
5272        assert!(err.to_string().contains("not found"));
5273        session.shutdown().await.unwrap();
5274    }
5275
5276    // ---- Test: open_file on a real torrent routes to TorrentHandle ----
5277
5278    #[tokio::test]
5279    async fn open_file_routes_to_torrent() {
5280        let session = SessionHandle::start(test_settings()).await.unwrap();
5281        let data = vec![0xAB; 32768];
5282        let meta = make_test_torrent(&data, 16384);
5283        let storage = make_storage(&data, 16384);
5284
5285        let info_hash = session
5286            .add_torrent(meta.into(), Some(storage))
5287            .await
5288            .unwrap();
5289
5290        // open_file should succeed for file_index 0 (single-file torrent)
5291        let stream = session.open_file(info_hash, 0).await;
5292        assert!(stream.is_ok(), "open_file should succeed for file_index 0");
5293
5294        // open_file should fail for out-of-range file_index
5295        let result = session.open_file(info_hash, 999).await;
5296        assert!(
5297            result.is_err(),
5298            "open_file should fail for invalid file_index"
5299        );
5300
5301        session.shutdown().await.unwrap();
5302    }
5303
5304    // ---- Test: force_reannounce via session ----
5305
5306    #[tokio::test]
5307    async fn session_force_reannounce() {
5308        let session = SessionHandle::start(test_settings()).await.unwrap();
5309        let data = vec![0xAB; 16384];
5310        let meta = make_test_torrent(&data, 16384);
5311        let storage = make_storage(&data, 16384);
5312        let info_hash = session
5313            .add_torrent(meta.into(), Some(storage))
5314            .await
5315            .unwrap();
5316
5317        // Should succeed for a known torrent.
5318        let result = session.force_reannounce(info_hash).await;
5319        assert!(
5320            result.is_ok(),
5321            "force_reannounce should succeed: {result:?}"
5322        );
5323
5324        // Should fail for unknown torrent.
5325        let fake = Id20::from_hex("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
5326        assert!(session.force_reannounce(fake).await.is_err());
5327
5328        session.shutdown().await.unwrap();
5329    }
5330
5331    // ---- Test: tracker_list via session ----
5332
5333    #[tokio::test]
5334    async fn session_tracker_list() {
5335        let session = SessionHandle::start(test_settings()).await.unwrap();
5336        let data = vec![0xAB; 16384];
5337        let meta = make_test_torrent(&data, 16384);
5338        let storage = make_storage(&data, 16384);
5339        let info_hash = session
5340            .add_torrent(meta.into(), Some(storage))
5341            .await
5342            .unwrap();
5343
5344        // Should succeed (empty list since test torrent has no announce URL).
5345        let trackers = session.tracker_list(info_hash).await.unwrap();
5346        assert!(trackers.is_empty(), "test torrent has no trackers");
5347
5348        // Should fail for unknown torrent.
5349        let fake = Id20::from_hex("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
5350        assert!(session.tracker_list(fake).await.is_err());
5351
5352        session.shutdown().await.unwrap();
5353    }
5354
5355    // ---- Test: scrape via session ----
5356
5357    #[tokio::test]
5358    async fn session_scrape() {
5359        let session = SessionHandle::start(test_settings()).await.unwrap();
5360        let data = vec![0xAB; 16384];
5361        let meta = make_test_torrent(&data, 16384);
5362        let storage = make_storage(&data, 16384);
5363        let info_hash = session
5364            .add_torrent(meta.into(), Some(storage))
5365            .await
5366            .unwrap();
5367
5368        // Should succeed (None since test torrent has no trackers to scrape).
5369        let scrape = session.scrape(info_hash).await.unwrap();
5370        assert!(scrape.is_none(), "test torrent has no trackers to scrape");
5371
5372        // Should fail for unknown torrent.
5373        let fake = Id20::from_hex("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
5374        assert!(session.scrape(fake).await.is_err());
5375
5376        session.shutdown().await.unwrap();
5377    }
5378
5379    // ---- Test: set_file_priority via session ----
5380
5381    #[tokio::test]
5382    async fn session_set_file_priority() {
5383        let session = SessionHandle::start(test_settings()).await.unwrap();
5384        let data = vec![0xAB; 16384];
5385        let meta = make_test_torrent(&data, 16384);
5386        let storage = make_storage(&data, 16384);
5387        let info_hash = session
5388            .add_torrent(meta.into(), Some(storage))
5389            .await
5390            .unwrap();
5391
5392        // Should succeed for file index 0 (single-file torrent).
5393        let result = session
5394            .set_file_priority(info_hash, 0, irontide_core::FilePriority::Normal)
5395            .await;
5396        assert!(
5397            result.is_ok(),
5398            "set_file_priority should succeed: {result:?}"
5399        );
5400
5401        // Should fail for unknown torrent.
5402        let fake = Id20::from_hex("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
5403        assert!(
5404            session
5405                .set_file_priority(fake, 0, irontide_core::FilePriority::Normal)
5406                .await
5407                .is_err()
5408        );
5409
5410        session.shutdown().await.unwrap();
5411    }
5412
5413    // ---- Test: file_priorities via session ----
5414
5415    #[tokio::test]
5416    async fn session_file_priorities() {
5417        let session = SessionHandle::start(test_settings()).await.unwrap();
5418        let data = vec![0xAB; 16384];
5419        let meta = make_test_torrent(&data, 16384);
5420        let storage = make_storage(&data, 16384);
5421        let info_hash = session
5422            .add_torrent(meta.into(), Some(storage))
5423            .await
5424            .unwrap();
5425
5426        // Should return priorities for the single file.
5427        let priorities = session.file_priorities(info_hash).await.unwrap();
5428        assert_eq!(
5429            priorities.len(),
5430            1,
5431            "single-file torrent should have 1 file priority"
5432        );
5433        assert_eq!(priorities[0], irontide_core::FilePriority::Normal);
5434
5435        // Should fail for unknown torrent.
5436        let fake = Id20::from_hex("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
5437        assert!(session.file_priorities(fake).await.is_err());
5438
5439        session.shutdown().await.unwrap();
5440    }
5441
5442    // ---- Test: set_download_limit zero means unlimited ----
5443
5444    #[tokio::test]
5445    async fn set_download_limit_zero_means_unlimited() {
5446        let session = SessionHandle::start(test_settings()).await.unwrap();
5447        let data = vec![0xAB; 16384];
5448        let meta = make_test_torrent(&data, 16384);
5449        let storage = make_storage(&data, 16384);
5450        let info_hash = session
5451            .add_torrent(meta.into(), Some(storage))
5452            .await
5453            .unwrap();
5454
5455        // Set limit to non-zero, then back to zero (unlimited).
5456        session.set_download_limit(info_hash, 50_000).await.unwrap();
5457        session.set_download_limit(info_hash, 0).await.unwrap();
5458        let limit = session.download_limit(info_hash).await.unwrap();
5459        assert_eq!(limit, 0, "0 means unlimited");
5460
5461        session.shutdown().await.unwrap();
5462    }
5463
5464    // ---- Test: set_upload_limit persists ----
5465
5466    #[tokio::test]
5467    async fn set_upload_limit_persists() {
5468        let session = SessionHandle::start(test_settings()).await.unwrap();
5469        let data = vec![0xAB; 16384];
5470        let meta = make_test_torrent(&data, 16384);
5471        let storage = make_storage(&data, 16384);
5472        let info_hash = session
5473            .add_torrent(meta.into(), Some(storage))
5474            .await
5475            .unwrap();
5476
5477        session.set_upload_limit(info_hash, 100_000).await.unwrap();
5478        let limit = session.upload_limit(info_hash).await.unwrap();
5479        assert_eq!(limit, 100_000);
5480
5481        session.shutdown().await.unwrap();
5482    }
5483
5484    // ---- Test: download_limit default is zero ----
5485
5486    #[tokio::test]
5487    async fn download_limit_default_is_zero() {
5488        let session = SessionHandle::start(test_settings()).await.unwrap();
5489        let data = vec![0xAB; 16384];
5490        let meta = make_test_torrent(&data, 16384);
5491        let storage = make_storage(&data, 16384);
5492        let info_hash = session
5493            .add_torrent(meta.into(), Some(storage))
5494            .await
5495            .unwrap();
5496
5497        // Default config has download_rate_limit = 0.
5498        let limit = session.download_limit(info_hash).await.unwrap();
5499        assert_eq!(limit, 0, "default download limit should be 0 (unlimited)");
5500
5501        session.shutdown().await.unwrap();
5502    }
5503
5504    // ---- Test: rate_limit_round_trip ----
5505
5506    #[tokio::test]
5507    async fn rate_limit_round_trip() {
5508        let session = SessionHandle::start(test_settings()).await.unwrap();
5509        let data = vec![0xAB; 16384];
5510        let meta = make_test_torrent(&data, 16384);
5511        let storage = make_storage(&data, 16384);
5512        let info_hash = session
5513            .add_torrent(meta.into(), Some(storage))
5514            .await
5515            .unwrap();
5516
5517        // Set both limits.
5518        session
5519            .set_download_limit(info_hash, 1_000_000)
5520            .await
5521            .unwrap();
5522        session.set_upload_limit(info_hash, 500_000).await.unwrap();
5523
5524        // Read them back.
5525        let dl = session.download_limit(info_hash).await.unwrap();
5526        let ul = session.upload_limit(info_hash).await.unwrap();
5527        assert_eq!(dl, 1_000_000);
5528        assert_eq!(ul, 500_000);
5529
5530        // Update and verify again.
5531        session
5532            .set_download_limit(info_hash, 2_000_000)
5533            .await
5534            .unwrap();
5535        let dl = session.download_limit(info_hash).await.unwrap();
5536        assert_eq!(dl, 2_000_000);
5537
5538        // Unknown torrent should fail.
5539        let fake = Id20::from_hex("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
5540        assert!(session.download_limit(fake).await.is_err());
5541        assert!(session.upload_limit(fake).await.is_err());
5542        assert!(session.set_download_limit(fake, 100).await.is_err());
5543        assert!(session.set_upload_limit(fake, 100).await.is_err());
5544
5545        session.shutdown().await.unwrap();
5546    }
5547
5548    // ---- Test: sequential_download_toggle ----
5549
5550    #[tokio::test]
5551    async fn sequential_download_toggle() {
5552        let session = SessionHandle::start(test_settings()).await.unwrap();
5553        let data = vec![0xAB; 16384];
5554        let meta = make_test_torrent(&data, 16384);
5555        let storage = make_storage(&data, 16384);
5556        let info_hash = session
5557            .add_torrent(meta.into(), Some(storage))
5558            .await
5559            .unwrap();
5560
5561        // Enable sequential download.
5562        session
5563            .set_sequential_download(info_hash, true)
5564            .await
5565            .unwrap();
5566        assert!(session.is_sequential_download(info_hash).await.unwrap());
5567
5568        // Disable it again.
5569        session
5570            .set_sequential_download(info_hash, false)
5571            .await
5572            .unwrap();
5573        assert!(!session.is_sequential_download(info_hash).await.unwrap());
5574
5575        session.shutdown().await.unwrap();
5576    }
5577
5578    // ---- Test: super_seeding_toggle ----
5579
5580    #[tokio::test]
5581    async fn super_seeding_toggle() {
5582        let session = SessionHandle::start(test_settings()).await.unwrap();
5583        let data = vec![0xAB; 16384];
5584        let meta = make_test_torrent(&data, 16384);
5585        let storage = make_storage(&data, 16384);
5586        let info_hash = session
5587            .add_torrent(meta.into(), Some(storage))
5588            .await
5589            .unwrap();
5590
5591        // Enable super seeding.
5592        session.set_super_seeding(info_hash, true).await.unwrap();
5593        assert!(session.is_super_seeding(info_hash).await.unwrap());
5594
5595        // Disable it again.
5596        session.set_super_seeding(info_hash, false).await.unwrap();
5597        assert!(!session.is_super_seeding(info_hash).await.unwrap());
5598
5599        session.shutdown().await.unwrap();
5600    }
5601
5602    // ---- Test: sequential_download_default_false ----
5603
5604    #[tokio::test]
5605    async fn sequential_download_default_false() {
5606        let session = SessionHandle::start(test_settings()).await.unwrap();
5607        let data = vec![0xAB; 16384];
5608        let meta = make_test_torrent(&data, 16384);
5609        let storage = make_storage(&data, 16384);
5610        let info_hash = session
5611            .add_torrent(meta.into(), Some(storage))
5612            .await
5613            .unwrap();
5614
5615        // Default config has sequential_download = false.
5616        assert!(!session.is_sequential_download(info_hash).await.unwrap());
5617
5618        session.shutdown().await.unwrap();
5619    }
5620
5621    // ---- Test: super_seeding_default_false ----
5622
5623    #[tokio::test]
5624    async fn super_seeding_default_false() {
5625        let session = SessionHandle::start(test_settings()).await.unwrap();
5626        let data = vec![0xAB; 16384];
5627        let meta = make_test_torrent(&data, 16384);
5628        let storage = make_storage(&data, 16384);
5629        let info_hash = session
5630            .add_torrent(meta.into(), Some(storage))
5631            .await
5632            .unwrap();
5633
5634        // Default config has super_seeding = false.
5635        assert!(!session.is_super_seeding(info_hash).await.unwrap());
5636
5637        session.shutdown().await.unwrap();
5638    }
5639
5640    // ---- M159 Test: seed_mode_flips_user_flag ----
5641
5642    #[tokio::test]
5643    async fn seed_mode_flips_user_flag() {
5644        let session = SessionHandle::start(test_settings()).await.unwrap();
5645        let data = vec![0xAB; 16384];
5646        let meta = make_test_torrent(&data, 16384);
5647        let storage = make_storage(&data, 16384);
5648        let info_hash = session
5649            .add_torrent(meta.into(), Some(storage))
5650            .await
5651            .unwrap();
5652
5653        // Initial state: user_seed_mode defaults to false.
5654        let stats_before = session.torrent_stats(info_hash).await.unwrap();
5655        assert!(
5656            !stats_before.user_seed_mode,
5657            "new torrent should not start in user seed mode"
5658        );
5659
5660        // Enable user seed mode.
5661        session.set_seed_mode(info_hash, true).await.unwrap();
5662        let stats_on = session.torrent_stats(info_hash).await.unwrap();
5663        assert!(
5664            stats_on.user_seed_mode,
5665            "stats should reflect user_seed_mode=true after enabling"
5666        );
5667
5668        // Disable it again.
5669        session.set_seed_mode(info_hash, false).await.unwrap();
5670        let stats_off = session.torrent_stats(info_hash).await.unwrap();
5671        assert!(
5672            !stats_off.user_seed_mode,
5673            "stats should reflect user_seed_mode=false after disabling"
5674        );
5675
5676        session.shutdown().await.unwrap();
5677    }
5678
5679    // ---- M159 Test: seed_mode_round_trip ----
5680
5681    #[tokio::test]
5682    async fn seed_mode_round_trip() {
5683        // Five flips in a row, exercising the actor's toggle idempotency and
5684        // piece-reservation cleanup logic. Must not panic and must leave the
5685        // flag consistent with the most recent call.
5686        let session = SessionHandle::start(test_settings()).await.unwrap();
5687        let data = vec![0xAB; 16384];
5688        let meta = make_test_torrent(&data, 16384);
5689        let storage = make_storage(&data, 16384);
5690        let info_hash = session
5691            .add_torrent(meta.into(), Some(storage))
5692            .await
5693            .unwrap();
5694
5695        for (i, enabled) in [true, false, true, true, false].iter().enumerate() {
5696            session.set_seed_mode(info_hash, *enabled).await.unwrap();
5697            let stats = session.torrent_stats(info_hash).await.unwrap();
5698            assert_eq!(
5699                stats.user_seed_mode, *enabled,
5700                "iteration {i}: stats.user_seed_mode should track the toggle"
5701            );
5702        }
5703
5704        session.shutdown().await.unwrap();
5705    }
5706
5707    // ---- M159 Test: seed_mode_missing_info_hash_errors ----
5708
5709    #[tokio::test]
5710    async fn seed_mode_missing_info_hash_errors() {
5711        let session = SessionHandle::start(test_settings()).await.unwrap();
5712        let fake =
5713            irontide_core::Id20::from_hex("ffffffffffffffffffffffffffffffffffffffff").unwrap();
5714        let err = session
5715            .set_seed_mode(fake, true)
5716            .await
5717            .expect_err("set_seed_mode on unknown info hash must return an error");
5718        match err {
5719            crate::Error::TorrentNotFound(h) => assert_eq!(h, fake),
5720            other => panic!("expected TorrentNotFound, got {other:?}"),
5721        }
5722        session.shutdown().await.unwrap();
5723    }
5724
5725    // ---- M159 Test: seed_mode_idempotent ----
5726
5727    #[tokio::test]
5728    async fn seed_mode_idempotent() {
5729        // Setting the same value twice must not panic or corrupt state.
5730        let session = SessionHandle::start(test_settings()).await.unwrap();
5731        let data = vec![0xAB; 16384];
5732        let meta = make_test_torrent(&data, 16384);
5733        let storage = make_storage(&data, 16384);
5734        let info_hash = session
5735            .add_torrent(meta.into(), Some(storage))
5736            .await
5737            .unwrap();
5738
5739        // Set true twice — second call is a no-op.
5740        session.set_seed_mode(info_hash, true).await.unwrap();
5741        session.set_seed_mode(info_hash, true).await.unwrap();
5742        assert!(
5743            session
5744                .torrent_stats(info_hash)
5745                .await
5746                .unwrap()
5747                .user_seed_mode
5748        );
5749
5750        // Set false twice — also a no-op the second time.
5751        session.set_seed_mode(info_hash, false).await.unwrap();
5752        session.set_seed_mode(info_hash, false).await.unwrap();
5753        assert!(
5754            !session
5755                .torrent_stats(info_hash)
5756                .await
5757                .unwrap()
5758                .user_seed_mode
5759        );
5760
5761        session.shutdown().await.unwrap();
5762    }
5763
5764    // ---- Test: add_tracker_increases_count ----
5765
5766    #[tokio::test]
5767    async fn add_tracker_increases_count() {
5768        let session = SessionHandle::start(test_settings()).await.unwrap();
5769        let data = vec![0xAB; 16384];
5770        let meta = make_test_torrent(&data, 16384);
5771        let storage = make_storage(&data, 16384);
5772        let info_hash = session
5773            .add_torrent(meta.into(), Some(storage))
5774            .await
5775            .unwrap();
5776
5777        // Test torrent has no trackers initially.
5778        let before = session.tracker_list(info_hash).await.unwrap();
5779        assert!(before.is_empty());
5780
5781        // Add a tracker.
5782        session
5783            .add_tracker(info_hash, "udp://tracker.example.com:6969/announce".into())
5784            .await
5785            .unwrap();
5786
5787        let after = session.tracker_list(info_hash).await.unwrap();
5788        assert_eq!(after.len(), 1);
5789        assert_eq!(after[0].url, "udp://tracker.example.com:6969/announce");
5790
5791        session.shutdown().await.unwrap();
5792    }
5793
5794    // ---- Test: replace_trackers_replaces_all ----
5795
5796    #[tokio::test]
5797    async fn replace_trackers_replaces_all() {
5798        let session = SessionHandle::start(test_settings()).await.unwrap();
5799        let data = vec![0xAB; 16384];
5800        let meta = make_test_torrent(&data, 16384);
5801        let storage = make_storage(&data, 16384);
5802        let info_hash = session
5803            .add_torrent(meta.into(), Some(storage))
5804            .await
5805            .unwrap();
5806
5807        // Add 2 trackers.
5808        session
5809            .add_tracker(info_hash, "udp://tracker1.example.com:6969/announce".into())
5810            .await
5811            .unwrap();
5812        session
5813            .add_tracker(info_hash, "http://tracker2.example.com/announce".into())
5814            .await
5815            .unwrap();
5816        assert_eq!(session.tracker_list(info_hash).await.unwrap().len(), 2);
5817
5818        // Replace with 1 different tracker.
5819        session
5820            .replace_trackers(
5821                info_hash,
5822                vec!["http://replacement.example.com/announce".into()],
5823            )
5824            .await
5825            .unwrap();
5826
5827        let after = session.tracker_list(info_hash).await.unwrap();
5828        assert_eq!(after.len(), 1);
5829        assert_eq!(after[0].url, "http://replacement.example.com/announce");
5830
5831        session.shutdown().await.unwrap();
5832    }
5833
5834    // ---- Test: add_tracker_deduplicates ----
5835
5836    #[tokio::test]
5837    async fn add_tracker_deduplicates() {
5838        let session = SessionHandle::start(test_settings()).await.unwrap();
5839        let data = vec![0xAB; 16384];
5840        let meta = make_test_torrent(&data, 16384);
5841        let storage = make_storage(&data, 16384);
5842        let info_hash = session
5843            .add_torrent(meta.into(), Some(storage))
5844            .await
5845            .unwrap();
5846
5847        // Add the same tracker URL twice.
5848        session
5849            .add_tracker(info_hash, "udp://tracker.example.com:6969/announce".into())
5850            .await
5851            .unwrap();
5852        session
5853            .add_tracker(info_hash, "udp://tracker.example.com:6969/announce".into())
5854            .await
5855            .unwrap();
5856
5857        // Should only have 1 tracker (deduplicated).
5858        let trackers = session.tracker_list(info_hash).await.unwrap();
5859        assert_eq!(trackers.len(), 1);
5860
5861        session.shutdown().await.unwrap();
5862    }
5863
5864    // ---- Test: info_hashes_matches_added_torrent ----
5865
5866    #[tokio::test]
5867    async fn info_hashes_matches_added_torrent() {
5868        let session = SessionHandle::start(test_settings()).await.unwrap();
5869        let data = vec![0xAB; 16384];
5870        let meta = make_test_torrent(&data, 16384);
5871        let expected_v1 = meta.info_hash;
5872        let storage = make_storage(&data, 16384);
5873
5874        let info_hash = session
5875            .add_torrent(meta.into(), Some(storage))
5876            .await
5877            .unwrap();
5878        let hashes = session.info_hashes(info_hash).await.unwrap();
5879        assert_eq!(hashes.v1, Some(expected_v1));
5880        // v1-only torrent should not have v2 hash
5881        assert!(hashes.v2.is_none());
5882
5883        session.shutdown().await.unwrap();
5884    }
5885
5886    // ---- Test: torrent_file_returns_meta ----
5887
5888    #[tokio::test]
5889    async fn torrent_file_returns_meta() {
5890        let session = SessionHandle::start(test_settings()).await.unwrap();
5891        let data = vec![0xAB; 32768];
5892        let meta = make_test_torrent(&data, 16384);
5893        let storage = make_storage(&data, 16384);
5894
5895        let info_hash = session
5896            .add_torrent(meta.into(), Some(storage))
5897            .await
5898            .unwrap();
5899        let torrent = session.torrent_file(info_hash).await.unwrap();
5900        assert!(torrent.is_some());
5901        let torrent = torrent.unwrap();
5902        assert_eq!(torrent.info_hash, info_hash);
5903        assert_eq!(torrent.info.name, "test");
5904        assert_eq!(torrent.info.total_length(), 32768);
5905
5906        session.shutdown().await.unwrap();
5907    }
5908
5909    // ---- Test: torrent_file_none_before_metadata ----
5910
5911    #[tokio::test]
5912    async fn torrent_file_none_before_metadata() {
5913        let session = SessionHandle::start(test_settings()).await.unwrap();
5914        let magnet = irontide_core::Magnet::parse(
5915            "magnet:?xt=urn:btih:aaf4c61ddcc5e8a2dabede0f3b482cd9aea9434d&dn=test",
5916        )
5917        .unwrap();
5918
5919        let info_hash = session.add_magnet(magnet).await.unwrap();
5920        let torrent = session.torrent_file(info_hash).await.unwrap();
5921        // Before metadata is received, torrent_file should return None.
5922        assert!(torrent.is_none());
5923
5924        session.shutdown().await.unwrap();
5925    }
5926
5927    // ---- Test: force_dht_announce_no_error ----
5928
5929    #[tokio::test]
5930    async fn force_dht_announce_no_error() {
5931        let session = SessionHandle::start(test_settings()).await.unwrap();
5932        let data = vec![0xAB; 16384];
5933        let meta = make_test_torrent(&data, 16384);
5934        let storage = make_storage(&data, 16384);
5935        let info_hash = session
5936            .add_torrent(meta.into(), Some(storage))
5937            .await
5938            .unwrap();
5939
5940        // Should succeed even without DHT enabled (no-op, no error).
5941        let result = session.force_dht_announce(info_hash).await;
5942        assert!(
5943            result.is_ok(),
5944            "force_dht_announce should succeed: {result:?}"
5945        );
5946
5947        // Should fail for unknown torrent.
5948        let fake = Id20::from_hex("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
5949        assert!(session.force_dht_announce(fake).await.is_err());
5950
5951        session.shutdown().await.unwrap();
5952    }
5953
5954    // ---- Test: force_lsd_announce_no_error ----
5955
5956    #[tokio::test]
5957    async fn force_lsd_announce_no_error() {
5958        let session = SessionHandle::start(test_settings()).await.unwrap();
5959        let data = vec![0xAB; 16384];
5960        let meta = make_test_torrent(&data, 16384);
5961        let storage = make_storage(&data, 16384);
5962        let info_hash = session
5963            .add_torrent(meta.into(), Some(storage))
5964            .await
5965            .unwrap();
5966
5967        // Should succeed even without LSD enabled (no-op announce, no error).
5968        let result = session.force_lsd_announce(info_hash).await;
5969        assert!(
5970            result.is_ok(),
5971            "force_lsd_announce should succeed: {result:?}"
5972        );
5973
5974        // Should fail for unknown torrent.
5975        let fake = Id20::from_hex("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
5976        assert!(session.force_lsd_announce(fake).await.is_err());
5977
5978        session.shutdown().await.unwrap();
5979    }
5980
5981    // ---- Test: read_piece_after_download ----
5982
5983    #[tokio::test]
5984    async fn read_piece_after_download() {
5985        let data = vec![0xCD; 32768]; // 2 pieces of 16384
5986        let meta = make_test_torrent(&data, 16384);
5987        let lengths = Lengths::new(data.len() as u64, 16384, DEFAULT_CHUNK_SIZE);
5988        let storage = Arc::new(MemoryStorage::new(lengths));
5989        // Pre-fill storage with the data
5990        storage.write_chunk(0, 0, &data[..16384]).unwrap();
5991        storage.write_chunk(1, 0, &data[16384..]).unwrap();
5992
5993        let session = SessionHandle::start(test_settings()).await.unwrap();
5994        let info_hash = session
5995            .add_torrent(meta.into(), Some(storage))
5996            .await
5997            .unwrap();
5998
5999        // Read piece 0
6000        let piece_data = session.read_piece(info_hash, 0).await.unwrap();
6001        assert_eq!(piece_data.len(), 16384);
6002        assert!(piece_data.iter().all(|&b| b == 0xCD));
6003
6004        // Read piece 1
6005        let piece_data = session.read_piece(info_hash, 1).await.unwrap();
6006        assert_eq!(piece_data.len(), 16384);
6007        assert!(piece_data.iter().all(|&b| b == 0xCD));
6008
6009        // Out-of-range piece should fail
6010        let result = session.read_piece(info_hash, 999).await;
6011        assert!(result.is_err(), "read_piece out of range should fail");
6012
6013        // Unknown torrent should fail
6014        let fake = Id20::from_hex("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
6015        assert!(session.read_piece(fake, 0).await.is_err());
6016
6017        session.shutdown().await.unwrap();
6018    }
6019
6020    // ---- Test: flush_cache_completes ----
6021
6022    #[tokio::test]
6023    async fn flush_cache_completes() {
6024        let session = SessionHandle::start(test_settings()).await.unwrap();
6025        let data = vec![0xAB; 16384];
6026        let meta = make_test_torrent(&data, 16384);
6027        let storage = make_storage(&data, 16384);
6028        let info_hash = session
6029            .add_torrent(meta.into(), Some(storage))
6030            .await
6031            .unwrap();
6032
6033        // flush_cache should succeed.
6034        let result = session.flush_cache(info_hash).await;
6035        assert!(result.is_ok(), "flush_cache should succeed: {result:?}");
6036
6037        // Should fail for unknown torrent.
6038        let fake = Id20::from_hex("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap();
6039        assert!(session.flush_cache(fake).await.is_err());
6040
6041        session.shutdown().await.unwrap();
6042    }
6043
6044    // ---- BEP 44 session API tests ----
6045
6046    fn test_settings_with_dht() -> Settings {
6047        let mut s = test_settings();
6048        s.enable_dht = true;
6049        s
6050    }
6051
6052    #[tokio::test]
6053    async fn test_dht_disabled_returns_error() {
6054        let session = SessionHandle::start(test_settings()).await.unwrap();
6055
6056        // All 4 methods should fail with DhtDisabled when DHT is off
6057        let err = session
6058            .dht_put_immutable(b"test".to_vec())
6059            .await
6060            .unwrap_err();
6061        assert!(
6062            format!("{err:?}").contains("DhtDisabled"),
6063            "expected DhtDisabled, got {err:?}"
6064        );
6065
6066        let target = Id20::from([0u8; 20]);
6067        let err = session.dht_get_immutable(target).await.unwrap_err();
6068        assert!(
6069            format!("{err:?}").contains("DhtDisabled"),
6070            "expected DhtDisabled, got {err:?}"
6071        );
6072
6073        let err = session
6074            .dht_put_mutable([42u8; 32], b"val".to_vec(), 1, Vec::new())
6075            .await
6076            .unwrap_err();
6077        assert!(
6078            format!("{err:?}").contains("DhtDisabled"),
6079            "expected DhtDisabled, got {err:?}"
6080        );
6081
6082        let err = session
6083            .dht_get_mutable([42u8; 32], Vec::new())
6084            .await
6085            .unwrap_err();
6086        assert!(
6087            format!("{err:?}").contains("DhtDisabled"),
6088            "expected DhtDisabled, got {err:?}"
6089        );
6090
6091        session.shutdown().await.unwrap();
6092    }
6093
6094    #[tokio::test]
6095    async fn test_dht_put_get_immutable_round_trip() {
6096        let session = SessionHandle::start(test_settings_with_dht())
6097            .await
6098            .unwrap();
6099
6100        // Give DHT a moment to bootstrap (it won't find real nodes, but the handle should work)
6101        let value = b"hello BEP 44".to_vec();
6102        let target = session.dht_put_immutable(value.clone()).await.unwrap();
6103
6104        // The target should be the SHA-1 of the bencoded value
6105        // Try to get it back — since we're the only node, the local store should have it
6106        let got = session.dht_get_immutable(target).await.unwrap();
6107        assert_eq!(got, Some(value));
6108
6109        session.shutdown().await.unwrap();
6110    }
6111
6112    #[tokio::test]
6113    async fn test_dht_put_immutable_fires_alert() {
6114        use crate::alert::{AlertCategory, AlertKind};
6115
6116        let session = SessionHandle::start(test_settings_with_dht())
6117            .await
6118            .unwrap();
6119        let mut alerts = session.subscribe_filtered(AlertCategory::DHT);
6120
6121        let value = b"alert test".to_vec();
6122        let target = session.dht_put_immutable(value).await.unwrap();
6123
6124        // Should receive DhtPutComplete alert
6125        let alert = tokio::time::timeout(Duration::from_secs(5), alerts.recv())
6126            .await
6127            .expect("timeout waiting for alert")
6128            .expect("alert channel closed");
6129
6130        match alert.kind {
6131            AlertKind::DhtPutComplete { target: t } => {
6132                assert_eq!(t, target);
6133            }
6134            other => panic!("expected DhtPutComplete, got {other:?}"),
6135        }
6136
6137        session.shutdown().await.unwrap();
6138    }
6139
6140    // ---- BEP 27: Private torrent LSD tests ----
6141
6142    /// Creates a private torrent (.torrent bytes with private=1 in the info dict).
6143    fn make_private_torrent(data: &[u8], piece_length: u64) -> TorrentMetaV1 {
6144        use serde::Serialize;
6145
6146        let mut pieces = Vec::new();
6147        let mut offset = 0;
6148        while offset < data.len() {
6149            let end = (offset + piece_length as usize).min(data.len());
6150            let hash = irontide_core::sha1(&data[offset..end]);
6151            pieces.extend_from_slice(hash.as_bytes());
6152            offset = end;
6153        }
6154
6155        #[derive(Serialize)]
6156        struct Info<'a> {
6157            length: u64,
6158            name: &'a str,
6159            #[serde(rename = "piece length")]
6160            piece_length: u64,
6161            #[serde(with = "serde_bytes")]
6162            pieces: &'a [u8],
6163            private: i64,
6164        }
6165
6166        #[derive(Serialize)]
6167        struct Torrent<'a> {
6168            info: Info<'a>,
6169        }
6170
6171        let t = Torrent {
6172            info: Info {
6173                length: data.len() as u64,
6174                name: "private-test",
6175                piece_length,
6176                pieces: &pieces,
6177                private: 1,
6178            },
6179        };
6180
6181        let bytes = irontide_bencode::to_bytes(&t).unwrap();
6182        torrent_from_bytes(&bytes).unwrap()
6183    }
6184
6185    #[test]
6186    fn is_private_true_via_parsed_meta() {
6187        // Verify that a torrent parsed from private .torrent bytes has private == Some(1)
6188        let data = vec![0xAB; 16384];
6189        let meta = make_private_torrent(&data, 16384);
6190        assert_eq!(
6191            meta.info.private,
6192            Some(1),
6193            "private field should be Some(1)"
6194        );
6195    }
6196
6197    #[test]
6198    fn is_private_false_for_public_torrent() {
6199        // Verify that a regular torrent has private == None
6200        let data = vec![0xAB; 16384];
6201        let meta = make_test_torrent(&data, 16384);
6202        assert_eq!(
6203            meta.info.private, None,
6204            "public torrent should have no private flag"
6205        );
6206    }
6207
6208    #[test]
6209    fn private_torrent_config_disables_lsd() {
6210        // Verify that TorrentConfig::default() has LSD enabled (so disable is meaningful)
6211        let config = TorrentConfig::default();
6212        assert!(
6213            config.enable_lsd,
6214            "default TorrentConfig should have LSD enabled"
6215        );
6216    }
6217
6218    #[tokio::test]
6219    async fn force_lsd_announce_private_torrent_returns_error() {
6220        let session = SessionHandle::start(test_settings()).await.unwrap();
6221        let data = vec![0xAB; 16384];
6222        let meta = make_private_torrent(&data, 16384);
6223        let storage = make_storage(&data, 16384);
6224        let info_hash = session
6225            .add_torrent(meta.into(), Some(storage))
6226            .await
6227            .unwrap();
6228
6229        // BEP 27: force_lsd_announce on a private torrent must return an error
6230        let result = session.force_lsd_announce(info_hash).await;
6231        assert!(
6232            result.is_err(),
6233            "force_lsd_announce on private torrent should return error, got: {result:?}"
6234        );
6235        let err_str = format!("{:?}", result.unwrap_err());
6236        assert!(
6237            err_str.contains("InvalidSettings") || err_str.contains("LSD disabled"),
6238            "expected InvalidSettings error, got: {err_str}"
6239        );
6240
6241        session.shutdown().await.unwrap();
6242    }
6243
6244    // ---- M161: save_resume_state tests ----
6245
6246    fn resume_test_settings(dir: &std::path::Path) -> Settings {
6247        Settings {
6248            resume_data_dir: Some(dir.to_path_buf()),
6249            save_resume_interval_secs: 0, // disable periodic timer for tests
6250            ..test_settings()
6251        }
6252    }
6253
6254    #[tokio::test]
6255    async fn save_resume_state_empty_session_returns_zero() {
6256        let tmp = tempfile::TempDir::new().unwrap();
6257        let session = SessionHandle::start(resume_test_settings(tmp.path()))
6258            .await
6259            .unwrap();
6260
6261        let count = session.save_resume_state().await.unwrap();
6262        assert_eq!(count, 0, "empty session should save 0 resume files");
6263
6264        session.shutdown().await.unwrap();
6265    }
6266
6267    #[tokio::test]
6268    async fn save_resume_state_saves_dirty_torrents() {
6269        let tmp = tempfile::TempDir::new().unwrap();
6270        let session = SessionHandle::start(resume_test_settings(tmp.path()))
6271            .await
6272            .unwrap();
6273
6274        // Add two torrents with different data so they get different hashes
6275        let data1 = vec![0xAA; 16384];
6276        let meta1 = make_test_torrent(&data1, 16384);
6277        let hash1 = meta1.info_hash;
6278        let storage1 = make_storage(&data1, 16384);
6279        session
6280            .add_torrent(meta1.into(), Some(storage1))
6281            .await
6282            .unwrap();
6283
6284        let data2 = vec![0xBB; 16384];
6285        let meta2 = make_test_torrent(&data2, 16384);
6286        let hash2 = meta2.info_hash;
6287        let storage2 = make_storage(&data2, 16384);
6288        session
6289            .add_torrent(meta2.into(), Some(storage2))
6290            .await
6291            .unwrap();
6292
6293        // Both torrents should be dirty (newly added → need_save_resume = true
6294        // after any state change). Give the actors a moment to settle.
6295        tokio::time::sleep(Duration::from_millis(50)).await;
6296
6297        let count = session.save_resume_state().await.unwrap();
6298        // Newly created torrents may or may not have the dirty flag set
6299        // depending on whether state changes have occurred. Verify that
6300        // at least the files are created for any that were dirty.
6301        assert!(count <= 2, "should save at most 2 resume files");
6302
6303        // Verify files exist on disk for saved torrents
6304        let torrents_dir = tmp.path().join("torrents");
6305        if count > 0 {
6306            assert!(torrents_dir.exists(), "torrents/ directory should exist");
6307        }
6308
6309        // Check specific file paths
6310        let path1 = crate::resume_file::resume_file_path(tmp.path(), &hash1);
6311        let path2 = crate::resume_file::resume_file_path(tmp.path(), &hash2);
6312        let files_exist = path1.exists() as usize + path2.exists() as usize;
6313        assert_eq!(
6314            files_exist, count,
6315            "number of files on disk should match returned count"
6316        );
6317
6318        session.shutdown().await.unwrap();
6319    }
6320
6321    #[tokio::test]
6322    async fn save_resume_state_round_trip() {
6323        let tmp = tempfile::TempDir::new().unwrap();
6324        let session = SessionHandle::start(resume_test_settings(tmp.path()))
6325            .await
6326            .unwrap();
6327
6328        let data = vec![0xCD; 32768];
6329        let meta = make_test_torrent(&data, 16384);
6330        let info_hash = meta.info_hash;
6331        let storage = make_storage(&data, 16384);
6332        session
6333            .add_torrent(meta.into(), Some(storage))
6334            .await
6335            .unwrap();
6336
6337        // Let the actor settle so dirty flag is set
6338        tokio::time::sleep(Duration::from_millis(50)).await;
6339
6340        let count = session.save_resume_state().await.unwrap();
6341
6342        // If the torrent was dirty and saved, verify round-trip
6343        if count > 0 {
6344            let path = crate::resume_file::resume_file_path(tmp.path(), &info_hash);
6345            assert!(path.exists(), "resume file should exist after save");
6346
6347            let bytes = std::fs::read(&path).unwrap();
6348            let rd = crate::resume_file::deserialize_resume(&bytes).unwrap();
6349            assert_eq!(
6350                rd.info_hash,
6351                info_hash.as_bytes().to_vec(),
6352                "deserialized info_hash should match"
6353            );
6354            assert_eq!(rd.name, "test", "deserialized name should match");
6355        }
6356
6357        session.shutdown().await.unwrap();
6358    }
6359
6360    #[tokio::test]
6361    async fn save_resume_state_clears_dirty_flag() {
6362        let tmp = tempfile::TempDir::new().unwrap();
6363        let session = SessionHandle::start(resume_test_settings(tmp.path()))
6364            .await
6365            .unwrap();
6366
6367        let data = vec![0xEE; 16384];
6368        let meta = make_test_torrent(&data, 16384);
6369        let storage = make_storage(&data, 16384);
6370        session
6371            .add_torrent(meta.into(), Some(storage))
6372            .await
6373            .unwrap();
6374
6375        tokio::time::sleep(Duration::from_millis(50)).await;
6376
6377        let first_count = session.save_resume_state().await.unwrap();
6378
6379        // Second save should return 0 because dirty flag was cleared
6380        let second_count = session.save_resume_state().await.unwrap();
6381        assert_eq!(
6382            second_count, 0,
6383            "second save should return 0 after dirty flag cleared (first saved {first_count})"
6384        );
6385
6386        session.shutdown().await.unwrap();
6387    }
6388
6389    #[tokio::test]
6390    async fn save_resume_state_second_save_skips_clean() {
6391        let tmp = tempfile::TempDir::new().unwrap();
6392        let session = SessionHandle::start(resume_test_settings(tmp.path()))
6393            .await
6394            .unwrap();
6395
6396        let data1 = vec![0xAA; 16384];
6397        let meta1 = make_test_torrent(&data1, 16384);
6398        let storage1 = make_storage(&data1, 16384);
6399        session
6400            .add_torrent(meta1.into(), Some(storage1))
6401            .await
6402            .unwrap();
6403
6404        let data2 = vec![0xBB; 16384];
6405        let meta2 = make_test_torrent(&data2, 16384);
6406        let storage2 = make_storage(&data2, 16384);
6407        session
6408            .add_torrent(meta2.into(), Some(storage2))
6409            .await
6410            .unwrap();
6411
6412        tokio::time::sleep(Duration::from_millis(50)).await;
6413
6414        // First save
6415        let first = session.save_resume_state().await.unwrap();
6416
6417        // Second save — all flags should be cleared, nothing to write
6418        let second = session.save_resume_state().await.unwrap();
6419        assert_eq!(
6420            second, 0,
6421            "second save should skip all clean torrents (first saved {first})"
6422        );
6423
6424        session.shutdown().await.unwrap();
6425    }
6426
6427    // ==== M161 Phase 4: load_resume_state tests ====
6428
6429    // ---- Test: empty dir → zeros ----
6430
6431    #[tokio::test]
6432    async fn load_resume_empty_dir_returns_zeros() {
6433        let tmp = tempfile::TempDir::new().unwrap();
6434        let mut settings = test_settings();
6435        settings.resume_data_dir = Some(tmp.path().to_path_buf());
6436
6437        let session = SessionHandle::start(settings).await.unwrap();
6438        let result = session.load_resume_state().await.unwrap();
6439        assert_eq!(result.restored, 0);
6440        assert_eq!(result.skipped, 0);
6441        assert_eq!(result.failed, 0);
6442
6443        session.shutdown().await.unwrap();
6444    }
6445
6446    // ---- Test: corrupt file skipped ----
6447
6448    #[tokio::test]
6449    async fn load_resume_corrupt_file_counted_as_failed() {
6450        let tmp = tempfile::TempDir::new().unwrap();
6451        let torrents_dir = tmp.path().join("torrents");
6452        std::fs::create_dir_all(&torrents_dir).unwrap();
6453
6454        let mut settings = test_settings();
6455        settings.resume_data_dir = Some(tmp.path().to_path_buf());
6456
6457        // Start session first (auto-restore runs but dir is empty).
6458        let session = SessionHandle::start(settings).await.unwrap();
6459
6460        // Wait for auto-restore to complete before writing the file,
6461        // otherwise the actor may race with file creation.
6462        tokio::time::sleep(Duration::from_millis(50)).await;
6463
6464        // Write garbage to a .resume file *after* startup so auto-restore
6465        // does not consume it.
6466        std::fs::write(
6467            torrents_dir.join("deadbeefdeadbeefdeadbeefdeadbeefdeadbeef.resume"),
6468            b"this is not valid bencode",
6469        )
6470        .unwrap();
6471
6472        let result = session.load_resume_state().await.unwrap();
6473        assert_eq!(result.restored, 0);
6474        assert_eq!(result.skipped, 0);
6475        assert_eq!(result.failed, 1);
6476
6477        session.shutdown().await.unwrap();
6478    }
6479
6480    // ---- Test: duplicate torrent skipped ----
6481
6482    #[tokio::test]
6483    async fn load_resume_duplicate_skipped() {
6484        let tmp = tempfile::TempDir::new().unwrap();
6485        let mut settings = test_settings();
6486        settings.resume_data_dir = Some(tmp.path().to_path_buf());
6487
6488        let session = SessionHandle::start(settings).await.unwrap();
6489
6490        // Add a torrent first.
6491        let data = vec![0xAB; 16384];
6492        let meta = make_test_torrent(&data, 16384);
6493        let info_hash = meta.info_hash;
6494        let storage = make_storage(&data, 16384);
6495        session
6496            .add_torrent(meta.into(), Some(storage))
6497            .await
6498            .unwrap();
6499
6500        // Wait for the torrent to settle.
6501        tokio::time::sleep(Duration::from_millis(50)).await;
6502
6503        // Save resume state so we have a file on disk.
6504        let _ = session.save_resume_state().await;
6505
6506        // Load again — the torrent already exists so it should be skipped.
6507        let result = session.load_resume_state().await.unwrap();
6508        assert!(
6509            session.list_torrents().await.unwrap().contains(&info_hash),
6510            "original torrent should still exist"
6511        );
6512        assert_eq!(result.skipped, 1, "duplicate should be skipped");
6513        assert_eq!(result.failed, 0);
6514
6515        session.shutdown().await.unwrap();
6516    }
6517
6518    // ---- Test: reconstruct_torrent_meta with info present ----
6519
6520    #[test]
6521    fn reconstruct_torrent_meta_returns_some_with_correct_fields() {
6522        use crate::resume_file::reconstruct_torrent_meta;
6523        use irontide_core::FastResumeData;
6524
6525        let data = vec![0xAB; 16384];
6526        let meta = make_test_torrent(&data, 16384);
6527        let info_hash = meta.info_hash;
6528
6529        // Create resume data with a stored info dict.
6530        let info_bytes = irontide_bencode::to_bytes(&meta.info).unwrap();
6531        let mut rd = FastResumeData::new(
6532            info_hash.as_bytes().to_vec(),
6533            "test-torrent".into(),
6534            "/downloads".into(),
6535        );
6536        rd.info = Some(info_bytes);
6537        rd.trackers = vec![
6538            vec!["http://tracker1.example.com/announce".into()],
6539            vec!["http://tracker2.example.com/announce".into()],
6540        ];
6541        rd.url_seeds = vec!["http://seed.example.com/".into()];
6542        rd.http_seeds = vec!["http://httpseed.example.com/".into()];
6543
6544        let reconstructed = reconstruct_torrent_meta(&rd).expect("should reconstruct");
6545
6546        assert_eq!(reconstructed.info_hash, info_hash);
6547        assert_eq!(
6548            reconstructed.announce.as_deref(),
6549            Some("http://tracker1.example.com/announce")
6550        );
6551        assert!(reconstructed.announce_list.is_some());
6552        assert_eq!(reconstructed.announce_list.as_ref().unwrap().len(), 2);
6553        assert_eq!(
6554            reconstructed.url_list,
6555            vec!["http://seed.example.com/".to_string()]
6556        );
6557        assert_eq!(
6558            reconstructed.httpseeds,
6559            vec!["http://httpseed.example.com/".to_string()]
6560        );
6561        assert!(reconstructed.info_bytes.is_some());
6562        assert!(reconstructed.comment.is_none());
6563        assert!(reconstructed.created_by.is_none());
6564        assert!(reconstructed.creation_date.is_none());
6565    }
6566
6567    // ---- Test: reconstruct_torrent_meta returns None for unresolved magnet ----
6568
6569    #[test]
6570    fn reconstruct_torrent_meta_returns_none_without_info() {
6571        use crate::resume_file::reconstruct_torrent_meta;
6572        use irontide_core::FastResumeData;
6573
6574        let rd = FastResumeData::new(vec![0xAB; 20], "magnet".into(), "/tmp".into());
6575        // info is None by default — simulates unresolved magnet.
6576        assert!(rd.info.is_none());
6577        assert!(reconstruct_torrent_meta(&rd).is_none());
6578    }
6579
6580    // ---- Test: reconstruct_magnet returns Some ----
6581
6582    #[test]
6583    fn reconstruct_magnet_returns_some_with_correct_fields() {
6584        use crate::resume_file::reconstruct_magnet;
6585        use irontide_core::FastResumeData;
6586
6587        let mut rd = FastResumeData::new(vec![0xCC; 20], "my-torrent".into(), "/downloads".into());
6588        rd.trackers = vec![
6589            vec!["http://tracker1.com/announce".into()],
6590            vec![
6591                "http://tracker2.com/announce".into(),
6592                "http://tracker3.com/announce".into(),
6593            ],
6594        ];
6595
6596        let magnet = reconstruct_magnet(&rd).expect("should reconstruct magnet");
6597
6598        assert!(magnet.info_hashes.v1.is_some());
6599        assert!(magnet.info_hashes.v2.is_none());
6600        assert_eq!(magnet.display_name.as_deref(), Some("my-torrent"));
6601        // Trackers flattened: 3 total from 2 tiers.
6602        assert_eq!(magnet.trackers.len(), 3);
6603        assert!(magnet.peers.is_empty());
6604        assert!(magnet.selected_files.is_none());
6605    }
6606
6607    // ---- Test: reconstruct_magnet with info_hash2 preserved ----
6608
6609    #[test]
6610    fn reconstruct_magnet_preserves_info_hash2() {
6611        use crate::resume_file::reconstruct_magnet;
6612        use irontide_core::FastResumeData;
6613
6614        let mut rd = FastResumeData::new(vec![0xDD; 20], "v2-magnet".into(), "/tmp".into());
6615        rd.info_hash2 = Some(vec![0xEE; 32]);
6616
6617        let magnet = reconstruct_magnet(&rd).expect("should reconstruct");
6618        assert!(magnet.info_hashes.v1.is_some());
6619        assert!(magnet.info_hashes.v2.is_some());
6620
6621        let v2 = magnet.info_hashes.v2.unwrap();
6622        assert_eq!(v2.as_bytes(), &[0xEE; 32]);
6623    }
6624
6625    // ---- Test: reconstruct_magnet with empty name ----
6626
6627    #[test]
6628    fn reconstruct_magnet_empty_name_is_none() {
6629        use crate::resume_file::reconstruct_magnet;
6630        use irontide_core::FastResumeData;
6631
6632        let rd = FastResumeData::new(vec![0xFF; 20], String::new(), "/tmp".into());
6633        let magnet = reconstruct_magnet(&rd).expect("should reconstruct");
6634        assert!(
6635            magnet.display_name.is_none(),
6636            "empty name should map to None"
6637        );
6638    }
6639
6640    // ==== M161 Phase 5: auto-save / auto-restore / orphan cleanup ====
6641
6642    // ---- Test: shutdown writes resume files ----
6643
6644    #[tokio::test]
6645    async fn shutdown_saves_resume_files() {
6646        let tmp = tempfile::TempDir::new().unwrap();
6647        let session = SessionHandle::start(resume_test_settings(tmp.path()))
6648            .await
6649            .unwrap();
6650
6651        let data = vec![0xAB; 16384];
6652        let meta = make_test_torrent(&data, 16384);
6653        let info_hash = meta.info_hash;
6654        let storage = make_storage(&data, 16384);
6655        session
6656            .add_torrent(meta.into(), Some(storage))
6657            .await
6658            .unwrap();
6659
6660        // Force a state change to set the dirty flag: pause then resume.
6661        session.pause_torrent(info_hash).await.unwrap();
6662        tokio::time::sleep(Duration::from_millis(50)).await;
6663        session.resume_torrent(info_hash).await.unwrap();
6664        tokio::time::sleep(Duration::from_millis(50)).await;
6665
6666        let path = crate::resume_file::resume_file_path(tmp.path(), &info_hash);
6667
6668        // Shutdown triggers save_dirty_resume_files internally.
6669        // SessionHandle::shutdown() is fire-and-forget, so we need to
6670        // wait briefly for the actor to finish writing to disk.
6671        session.shutdown().await.unwrap();
6672        tokio::time::sleep(Duration::from_millis(200)).await;
6673
6674        assert!(path.exists(), "resume file should exist after shutdown");
6675    }
6676
6677    // ---- Test: auto-restore on startup ----
6678
6679    #[tokio::test]
6680    async fn auto_restore_on_startup() {
6681        let tmp = tempfile::TempDir::new().unwrap();
6682
6683        let info_hash;
6684        {
6685            // First session: add a torrent, save, and shut down.
6686            let session = SessionHandle::start(resume_test_settings(tmp.path()))
6687                .await
6688                .unwrap();
6689
6690            let data = vec![0xAB; 16384];
6691            let meta = make_test_torrent(&data, 16384);
6692            info_hash = meta.info_hash;
6693            let storage = make_storage(&data, 16384);
6694            session
6695                .add_torrent(meta.into(), Some(storage))
6696                .await
6697                .unwrap();
6698
6699            tokio::time::sleep(Duration::from_millis(50)).await;
6700            let _ = session.save_resume_state().await;
6701            session.shutdown().await.unwrap();
6702        }
6703
6704        // Verify the resume file exists before starting a new session.
6705        let path = crate::resume_file::resume_file_path(tmp.path(), &info_hash);
6706        assert!(path.exists(), "resume file should exist before restart");
6707
6708        {
6709            // Second session: should auto-restore the torrent on startup.
6710            let session = SessionHandle::start(resume_test_settings(tmp.path()))
6711                .await
6712                .unwrap();
6713
6714            // Give the actor a moment to process the auto-restore.
6715            tokio::time::sleep(Duration::from_millis(100)).await;
6716
6717            let list = session.list_torrents().await.unwrap();
6718            assert!(
6719                list.contains(&info_hash),
6720                "torrent should be auto-restored on startup"
6721            );
6722
6723            session.shutdown().await.unwrap();
6724        }
6725    }
6726
6727    // ---- Test: shutdown with read-only resume dir completes without error ----
6728
6729    #[tokio::test]
6730    async fn shutdown_with_readonly_resume_dir_completes() {
6731        let tmp = tempfile::TempDir::new().unwrap();
6732        // Point resume_data_dir to a non-existent path under a read-only root.
6733        // On Linux, /proc is always read-only for directory creation.
6734        let readonly_dir = PathBuf::from("/proc/irontide-test-nonexistent");
6735        let mut settings = test_settings();
6736        settings.resume_data_dir = Some(readonly_dir);
6737
6738        let session = SessionHandle::start(settings).await.unwrap();
6739
6740        let data = vec![0xAB; 16384];
6741        let meta = make_test_torrent(&data, 16384);
6742        let storage = make_storage(&data, 16384);
6743        session
6744            .add_torrent(meta.into(), Some(storage))
6745            .await
6746            .unwrap();
6747
6748        tokio::time::sleep(Duration::from_millis(50)).await;
6749
6750        // Shutdown should complete without panic or error even though
6751        // the resume dir is not writable.
6752        session.shutdown().await.unwrap();
6753
6754        // If we got here, the test passed — errors were logged, not propagated.
6755        drop(tmp);
6756    }
6757
6758    // ---- Test: orphan resume file deleted on startup ----
6759
6760    #[tokio::test]
6761    async fn orphan_resume_file_deleted_on_startup() {
6762        let tmp = tempfile::TempDir::new().unwrap();
6763        let torrents_dir = tmp.path().join("torrents");
6764        std::fs::create_dir_all(&torrents_dir).unwrap();
6765
6766        // Write a fake .resume file that does not match any torrent.
6767        // Use valid bencode so it parses but with a hash that won't match
6768        // anything added to the session. The file must parse correctly for
6769        // the load to attempt adding it (which will fail or produce a torrent
6770        // with a mismatched hash that gets cleaned up as orphan).
6771        // Simplest: write garbage bencode — it will fail to deserialize,
6772        // not be added, and then orphan cleanup should remove it.
6773        let orphan_path = torrents_dir.join("deadbeefdeadbeefdeadbeefdeadbeefdeadbeef.resume");
6774        std::fs::write(&orphan_path, b"not valid bencode").unwrap();
6775        assert!(orphan_path.exists(), "orphan file should exist before test");
6776
6777        let session = SessionHandle::start(resume_test_settings(tmp.path()))
6778            .await
6779            .unwrap();
6780
6781        // Give the actor time to auto-restore + orphan cleanup.
6782        tokio::time::sleep(Duration::from_millis(100)).await;
6783
6784        assert!(
6785            !orphan_path.exists(),
6786            "orphan resume file should be deleted on startup"
6787        );
6788
6789        session.shutdown().await.unwrap();
6790    }
6791
6792    // ==== M161 Phase 7: integration tests for resume file lifecycle ====
6793
6794    // ---- Test: multi-torrent save-load round-trip ----
6795    //
6796    // Creates 3 torrents in session 1, saves resume state, verifies 3 `.resume`
6797    // files on disk. Starts session 2 with the same resume dir and verifies all
6798    // 3 torrents are restored via `load_resume_state()`.
6799
6800    #[tokio::test]
6801    async fn multi_torrent_save_load_round_trip() {
6802        let tmp = tempfile::TempDir::new().unwrap();
6803
6804        // Distinct data per torrent to produce unique info hashes.
6805        let datasets: [u8; 3] = [0xAA, 0xBB, 0xCC];
6806        let mut hashes = Vec::with_capacity(3);
6807
6808        {
6809            // Session 1: add 3 torrents, save resume state.
6810            let session = SessionHandle::start(resume_test_settings(tmp.path()))
6811                .await
6812                .unwrap();
6813
6814            for &byte in &datasets {
6815                let data = vec![byte; 16384];
6816                let meta = make_test_torrent(&data, 16384);
6817                let info_hash = meta.info_hash;
6818                let storage = make_storage(&data, 16384);
6819                session
6820                    .add_torrent(meta.into(), Some(storage))
6821                    .await
6822                    .unwrap();
6823                hashes.push(info_hash);
6824            }
6825
6826            // Let actors settle so dirty flags are set.
6827            tokio::time::sleep(Duration::from_millis(100)).await;
6828
6829            let saved = session.save_resume_state().await.unwrap();
6830            assert_eq!(saved, 3, "all 3 torrents should be saved");
6831
6832            // Verify .resume files exist on disk.
6833            let files = crate::resume_file::scan_resume_dir(tmp.path());
6834            assert_eq!(files.len(), 3, "3 .resume files should be on disk");
6835
6836            for hash in &hashes {
6837                let path = crate::resume_file::resume_file_path(tmp.path(), hash);
6838                assert!(
6839                    path.exists(),
6840                    "resume file for {} should exist",
6841                    hex::encode(hash.as_bytes())
6842                );
6843            }
6844
6845            session.shutdown().await.unwrap();
6846        }
6847
6848        {
6849            // Session 2: fresh session with the same resume dir.
6850            // Disable auto-restore by starting first, then calling
6851            // load_resume_state manually.
6852            //
6853            // NOTE: the auto-restore runs during `start()` before we get the
6854            // handle back, so the torrents will already be loaded. Use
6855            // list_torrents to verify instead.
6856            let session = SessionHandle::start(resume_test_settings(tmp.path()))
6857                .await
6858                .unwrap();
6859
6860            // Give the actor time to process auto-restore.
6861            tokio::time::sleep(Duration::from_millis(200)).await;
6862
6863            let list = session.list_torrents().await.unwrap();
6864            assert_eq!(list.len(), 3, "all 3 torrents should be auto-restored");
6865
6866            for hash in &hashes {
6867                assert!(
6868                    list.contains(hash),
6869                    "torrent {} should be present after restore",
6870                    hex::encode(hash.as_bytes())
6871                );
6872            }
6873
6874            session.shutdown().await.unwrap();
6875        }
6876    }
6877
6878    // ---- Test: corrupt 1 of 3 resume files → 2 restored + 1 failed ----
6879    //
6880    // Saves 3 torrents to resume files, corrupts one with garbage bytes,
6881    // then starts a fresh session and verifies that 2 are restored and 1 failed.
6882
6883    #[tokio::test]
6884    async fn corrupt_one_of_three_resume_files() {
6885        let tmp = tempfile::TempDir::new().unwrap();
6886
6887        let datasets: [u8; 3] = [0xDD, 0xEE, 0xFF];
6888        let mut hashes = Vec::with_capacity(3);
6889
6890        {
6891            // Session 1: add 3 torrents, save resume state.
6892            let session = SessionHandle::start(resume_test_settings(tmp.path()))
6893                .await
6894                .unwrap();
6895
6896            for &byte in &datasets {
6897                let data = vec![byte; 16384];
6898                let meta = make_test_torrent(&data, 16384);
6899                let info_hash = meta.info_hash;
6900                let storage = make_storage(&data, 16384);
6901                session
6902                    .add_torrent(meta.into(), Some(storage))
6903                    .await
6904                    .unwrap();
6905                hashes.push(info_hash);
6906            }
6907
6908            tokio::time::sleep(Duration::from_millis(100)).await;
6909
6910            let saved = session.save_resume_state().await.unwrap();
6911            assert_eq!(saved, 3, "all 3 torrents should be saved");
6912
6913            session.shutdown().await.unwrap();
6914        }
6915
6916        // Corrupt the second resume file with garbage bytes.
6917        let corrupt_path = crate::resume_file::resume_file_path(tmp.path(), &hashes[1]);
6918        assert!(
6919            corrupt_path.exists(),
6920            "file to corrupt must exist before overwrite"
6921        );
6922        std::fs::write(&corrupt_path, b"CORRUPTED GARBAGE DATA 0xDEAD").unwrap();
6923
6924        {
6925            // Session 2: auto-restore should recover 2, fail 1, and the
6926            // orphan cleanup should delete the corrupt file.
6927            let session = SessionHandle::start(resume_test_settings(tmp.path()))
6928                .await
6929                .unwrap();
6930
6931            // Give actor time for auto-restore + orphan cleanup.
6932            tokio::time::sleep(Duration::from_millis(200)).await;
6933
6934            let list = session.list_torrents().await.unwrap();
6935            assert_eq!(
6936                list.len(),
6937                2,
6938                "2 torrents should be restored (1 corrupt skipped)"
6939            );
6940
6941            // The good hashes should be present.
6942            assert!(
6943                list.contains(&hashes[0]),
6944                "first torrent should be restored"
6945            );
6946            assert!(
6947                list.contains(&hashes[2]),
6948                "third torrent should be restored"
6949            );
6950
6951            // The corrupt hash should NOT be present.
6952            assert!(
6953                !list.contains(&hashes[1]),
6954                "corrupted torrent should not be restored"
6955            );
6956
6957            // Also verify the corrupt file was cleaned up as orphan.
6958            assert!(
6959                !corrupt_path.exists(),
6960                "corrupt resume file should be deleted by orphan cleanup"
6961            );
6962
6963            session.shutdown().await.unwrap();
6964        }
6965    }
6966
6967    // ---- Test: remove torrent → `.resume` file deleted from disk ----
6968    //
6969    // Adds a torrent, saves resume state (creates the `.resume` file), then
6970    // removes the torrent via `session.remove_torrent()`. The removal handler
6971    // eagerly deletes the `.resume` file so it is not orphaned.
6972
6973    #[tokio::test]
6974    async fn remove_torrent_deletes_resume_file() {
6975        let tmp = tempfile::TempDir::new().unwrap();
6976
6977        let data = vec![0x42; 16384];
6978        let meta = make_test_torrent(&data, 16384);
6979        let info_hash = meta.info_hash;
6980        let storage = make_storage(&data, 16384);
6981
6982        let session = SessionHandle::start(resume_test_settings(tmp.path()))
6983            .await
6984            .unwrap();
6985
6986        session
6987            .add_torrent(meta.into(), Some(storage))
6988            .await
6989            .unwrap();
6990
6991        // Let the actor settle so the dirty flag is set.
6992        tokio::time::sleep(Duration::from_millis(100)).await;
6993
6994        let saved = session.save_resume_state().await.unwrap();
6995        assert!(saved > 0, "torrent should be saved to a resume file");
6996
6997        let resume_path = crate::resume_file::resume_file_path(tmp.path(), &info_hash);
6998        assert!(resume_path.exists(), "resume file should exist after save");
6999
7000        // Remove the torrent — this should also delete the .resume file.
7001        session.remove_torrent(info_hash).await.unwrap();
7002        tokio::time::sleep(Duration::from_millis(50)).await;
7003
7004        let list = session.list_torrents().await.unwrap();
7005        assert!(
7006            !list.contains(&info_hash),
7007            "torrent should be gone from session after removal"
7008        );
7009
7010        assert!(
7011            !resume_path.exists(),
7012            "resume file should be deleted when torrent is removed"
7013        );
7014
7015        // Verify no .resume files remain in the torrents directory.
7016        let remaining = crate::resume_file::scan_resume_dir(tmp.path());
7017        assert!(
7018            remaining.is_empty(),
7019            "no resume files should remain after removing the only torrent"
7020        );
7021
7022        session.shutdown().await.unwrap();
7023    }
7024}