irontide-session 1.0.1

BitTorrent session management: peers, torrents, and piece selection
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
//! Isolated listener task for accepting and identifying inbound connections.
//!
//! Moves TCP and uTP listener polling out of `SessionActor`'s main `select!`
//! loop into a dedicated task. Each accepted connection is identified by reading
//! the 48-byte BEP 3 preamble, validated against a registry of active info
//! hashes, and forwarded to the session via an mpsc channel.

use std::net::SocketAddr;
use std::sync::Arc;
use std::sync::atomic::{AtomicI32, AtomicUsize, Ordering};
use std::time::Duration;

use dashmap::DashMap;
use futures::stream::{FuturesUnordered, StreamExt};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::sync::{mpsc, oneshot};
use tokio::task::JoinHandle;
use tracing::{debug, info, warn};

use crate::transport::{BoxedStream, TransportListener};
use crate::utp_routing::identify_plaintext_connection;
use irontide_core::LiveConnectionGuard;

/// Timeout for reading the 48-byte BEP 3 preamble from a new connection.
///
/// Applied per connection inside `ListenerTask::identify_and_validate`; when
/// it elapses the connection is rejected and its live-connection guard drops.
const IDENTIFY_TIMEOUT: Duration = Duration::from_secs(5);

/// Backoff sleep after a failed accept (e.g. EMFILE/ENFILE fd exhaustion)
/// to prevent tight-loop CPU spinning.
///
/// The listener loop applies this after an accept error on any of its
/// listeners (TCP, uTP v4, uTP v6).
const ACCEPT_ERROR_BACKOFF: Duration = Duration::from_secs(10);

/// M173 Lane B (B2): bounded join timeout for listener-task shutdown.
///
/// `accept()` is parked-on-syscall and only wakes on (a) a new TCP/uTP
/// connection or (b) the shutdown signal selected against it. This
/// timeout protects against a futex-stuck `accept()` on certain
/// kernels — if the task hasn't joined within 5 s of the shutdown
/// signal, the executor abandons the join and proceeds with the rebind.
/// Returning early is preferable to indefinitely hanging
/// `apply_settings`.
pub const SHUTDOWN_JOIN_TIMEOUT: Duration = Duration::from_secs(5);

/// An inbound connection that has been identified by its BEP 3 preamble.
///
/// Only built by `ListenerTask::identify_and_validate`, and only after the
/// preamble's info hash was found in the listener's registry.
///
/// The `stream` replays the 48 consumed preamble bytes before yielding
/// subsequent data, so `run_peer()` can read the full handshake.
pub(crate) struct IdentifiedConnection {
    /// The type-erased bidirectional stream (wraps a `PrefixedStream`).
    pub stream: BoxedStream,
    /// Remote peer address.
    pub addr: SocketAddr,
    /// The info hash extracted from the preamble.
    pub info_hash: irontide_core::Id20,
    /// M224 D3: RAII counter handle. Created at accept time (the TCP arm and
    /// both uTP arms of the listener loop each create one) and held through
    /// validate + forward; drops when the routing path in `SessionActor`
    /// completes (the peer task takes ownership of the stream, not the guard,
    /// so the live count tracks the "accepted-but-not-yet-handed-off" window —
    /// the window during which a DoS-style accept flood would otherwise pile
    /// up preamble-reads behind `IDENTIFY_TIMEOUT`).
    pub _live_guard: LiveConnectionGuard,
}


/// Handle for the spawned listener task. Holds the shutdown channel
/// and join handle so the session can request a clean exit before
/// rebinding the listen port (M173 Lane B, B2).
///
/// Use [`ListenerHandle::shutdown_with_timeout`] to send the shutdown
/// signal and join the task with a bounded timeout. If the task does
/// not exit within [`SHUTDOWN_JOIN_TIMEOUT`], the join is abandoned —
/// a hung listener cannot be allowed to wedge `apply_settings`.
pub(crate) struct ListenerHandle {
    /// Sender half of the shutdown oneshot. Wrapped in `Option` so
    /// `shutdown_with_timeout` can `take()` it and send the signal.
    shutdown_tx: Option<oneshot::Sender<()>>,
    /// Join handle of the task spawned for `ListenerTask::run`.
    join_handle: JoinHandle<()>,
}

impl ListenerHandle {
    /// Spawn a listener task and return a handle for shutdown.
    ///
    /// The spawned task drives [`ListenerTask::run`], which returns when
    /// either:
    /// - the shutdown channel resolves (signal sent, or the sender half
    ///   dropped), or
    /// - forwarding an identified connection fails because the
    ///   validated-conn receiver was dropped.
    ///
    /// Expiry of the join deadline in [`Self::shutdown_with_timeout`] does
    /// NOT stop the task; it only abandons the join.
    pub fn spawn(task: ListenerTask) -> Self {
        let (shutdown_tx, shutdown_rx) = oneshot::channel();
        let join_handle = tokio::spawn(task.run(shutdown_rx));
        Self {
            shutdown_tx: Some(shutdown_tx),
            join_handle,
        }
    }

    /// Signal the task to exit and join it with [`SHUTDOWN_JOIN_TIMEOUT`].
    ///
    /// Returns `Ok(())` if the task finished within the deadline. A task
    /// that panicked or was cancelled also counts as finished; a panic is
    /// logged at `warn` level.
    ///
    /// On timeout the join is abandoned: the handle is dropped without
    /// calling `JoinHandle::abort`, so the task is detached rather than
    /// cancelled. The shutdown signal has already been sent at that point,
    /// so the task returns the next time its loop polls the shutdown
    /// channel. The caller should proceed with the rebind anyway.
    ///
    /// # Errors
    ///
    /// Returns [`JoinTimeout`] if the listener did not exit within the
    /// bounded join window.
    pub async fn shutdown_with_timeout(mut self) -> Result<(), JoinTimeout> {
        if let Some(tx) = self.shutdown_tx.take() {
            // The receive side may already have exited (e.g. the
            // validated_tx channel closed first). `send` errors are
            // benign — the task is already on its way down.
            let _ = tx.send(());
        }
        match tokio::time::timeout(SHUTDOWN_JOIN_TIMEOUT, &mut self.join_handle).await {
            Ok(Err(e)) if e.is_panic() => {
                warn!(error = %e, "listener task panicked during shutdown");
                Ok(())
            }
            Ok(Ok(()) | Err(_)) => Ok(()), // completed or cancelled; treat as exited
            Err(_elapsed) => {
                warn!(
                    timeout = ?SHUTDOWN_JOIN_TIMEOUT,
                    "listener task did not exit within shutdown window — proceeding anyway"
                );
                Err(JoinTimeout)
            }
        }
    }

    /// Release the listener task without waiting for it, returning its
    /// join handle.
    ///
    /// Despite the name this does not call `JoinHandle::abort`. Consuming
    /// the handle drops the shutdown sender, which resolves the task's
    /// shutdown receiver with `RecvError`; the loop's shutdown arm accepts
    /// any output, so the task exits on its next poll. During full-session
    /// teardown the validated-conn receiver is dropped as well, which is a
    /// second, independent exit path for the loop.
    ///
    /// The caller may await the returned handle, or drop it to detach the
    /// task.
    #[allow(dead_code)] // session shutdown path; used by Drop sequence
    pub fn abort(self) -> JoinHandle<()> {
        let Self { shutdown_tx, join_handle } = self;
        // Explicit drop: a dropped sender makes the receiver yield
        // `RecvError`, which the task's `_ = &mut shutdown_rx` arm matches.
        drop(shutdown_tx);
        join_handle
    }
}

/// Returned by [`ListenerHandle::shutdown_with_timeout`] when the join
/// deadline fires before the task exits.
///
/// Carries no data; the `Display` message reports [`SHUTDOWN_JOIN_TIMEOUT`].
#[derive(Debug, thiserror::Error)]
#[error("listener shutdown timed out after {SHUTDOWN_JOIN_TIMEOUT:?}")]
pub struct JoinTimeout;

/// Dedicated task that accepts inbound TCP and uTP connections, identifies
/// them via the BEP 3 preamble, and forwards validated connections to the
/// session actor.
///
/// Consumed by [`ListenerTask::run`]; [`ListenerHandle::spawn`] runs it on
/// its own tokio task.
pub(crate) struct ListenerTask {
    /// Boxed transport listener for the TCP accept arm. `None` leaves that
    /// arm permanently pending.
    tcp_listener: Option<Box<dyn TransportListener>>,
    /// uTP listener for the arm logged as "uTP v4". `None` leaves that arm
    /// permanently pending.
    utp_listener: Option<irontide_utp::UtpListener>,
    /// uTP listener for the arm logged as "uTP v6". `None` leaves that arm
    /// permanently pending.
    utp_listener_v6: Option<irontide_utp::UtpListener>,
    /// Info hashes for which inbound connections are accepted. A preamble
    /// whose info hash is not a key here is rejected.
    info_hash_registry: Arc<DashMap<irontide_core::Id20, ()>>,
    /// Channel on which identified connections are forwarded. A failed send
    /// (receiver dropped) ends the listener loop.
    validated_tx: mpsc::Sender<IdentifiedConnection>,
    /// M224 D3: shared with `SessionActor` so `handle_apply_settings` can
    /// update the cap atomically. `-1` = unlimited; `n >= 1` enforces. Any
    /// value below 1 is treated as "no cap" by the accept loop.
    /// TCP-only at M224 MVP (D5); uTP carry-forward to M225.
    max_connections_global: Arc<AtomicI32>,
    /// M224 D3: shared counter — incremented at accept via
    /// `LiveConnectionGuard::new`, decremented when the guard drops. The
    /// uTP arms also create guards, so uTP connections count towards the
    /// value the TCP cap is compared against even though uTP accepts are
    /// not themselves rejected.
    live_connections: Arc<AtomicUsize>,
}

impl ListenerTask {
    /// Create a new listener task.
    ///
    /// Any listener passed as `None` simply leaves its accept arm pending
    /// forever. `max_connections_global` and `live_connections` are shared
    /// handles: the task reads the cap and creates guards on the counter,
    /// while the owner of the other clones may update or observe them.
    pub fn new(
        tcp_listener: Option<Box<dyn TransportListener>>,
        utp_listener: Option<irontide_utp::UtpListener>,
        utp_listener_v6: Option<irontide_utp::UtpListener>,
        info_hash_registry: Arc<DashMap<irontide_core::Id20, ()>>,
        validated_tx: mpsc::Sender<IdentifiedConnection>,
        max_connections_global: Arc<AtomicI32>,
        live_connections: Arc<AtomicUsize>,
    ) -> Self {
        Self {
            tcp_listener,
            utp_listener,
            utp_listener_v6,
            info_hash_registry,
            validated_tx,
            max_connections_global,
            live_connections,
        }
    }

    /// Run the listener loop until any of:
    ///
    /// - the explicit `shutdown_rx` channel fires (M173 Lane B B2: used
    ///   by the listen-port rebind path so `accept()` does not need to
    ///   wait for a real connection to wake up); or
    /// - the session drops the receiving end of `validated_tx` (legacy
    ///   shutdown path, used by full-session teardown).
    ///
    /// After a failed accept the loop backs off for
    /// [`ACCEPT_ERROR_BACKOFF`]. The backoff is raced against
    /// `shutdown_rx`, so a shutdown request that arrives while the loop is
    /// backing off is honoured immediately instead of after the sleep
    /// (which is longer than [`SHUTDOWN_JOIN_TIMEOUT`]).
    pub async fn run(mut self, mut shutdown_rx: oneshot::Receiver<()>) {
        let mut futs = FuturesUnordered::new();

        loop {
            tokio::select! {
                // M173 Lane B (B2): explicit shutdown signal — wins
                // over `accept()` so a port-rebind does not need to
                // wait for a connection to wake the syscall.
                _ = &mut shutdown_rx => {
                    info!("listener shutdown signalled, exiting");
                    return;
                }
                result = accept_transport(&mut self.tcp_listener) => {
                    match result {
                        Ok((stream, addr)) => {
                            // M224 D3: TCP-inbound cap. `-1` = unlimited; any
                            // `n >= 1` rejects accepts beyond the cap. We
                            // measure live BEFORE the guard so the cap is
                            // checked against the pre-accept count, not the
                            // post-increment one (otherwise a cap of 1 would
                            // allow zero connections).
                            let max = self.max_connections_global.load(Ordering::SeqCst);
                            let live = self.live_connections.load(Ordering::SeqCst);
                            if Self::cap_reached(max, live) {
                                debug!(
                                    %addr, live, max,
                                    "M224: rejecting inbound TCP connection — global cap reached"
                                );
                                drop(stream);
                                continue;
                            }
                            let guard = LiveConnectionGuard::new(Arc::clone(&self.live_connections));
                            let registry = self.info_hash_registry.clone();
                            futs.push(Self::identify_and_validate(stream, addr, registry, guard));
                        }
                        Err(e) => {
                            warn!(error = %e, "TCP accept failed, backing off");
                            if Self::backoff_or_shutdown(&mut shutdown_rx).await {
                                return;
                            }
                        }
                    }
                }
                result = accept_utp(&mut self.utp_listener) => {
                    match result {
                        Ok((stream, addr)) => {
                            // M224 D5: uTP NOT gated by max_connections_global at
                            // M224 MVP — uTP socket lifecycle is owned by the
                            // SocketActor (a single socket multiplexes many
                            // peers). Carry-forward to M225 with the matching
                            // accept-side counter on the SocketActor.
                            let registry = self.info_hash_registry.clone();
                            let guard = LiveConnectionGuard::new(Arc::clone(&self.live_connections));
                            futs.push(Self::identify_and_validate(
                                BoxedStream::new(stream), addr, registry, guard,
                            ));
                        }
                        Err(e) => {
                            warn!(error = %e, "uTP v4 accept failed, backing off");
                            if Self::backoff_or_shutdown(&mut shutdown_rx).await {
                                return;
                            }
                        }
                    }
                }
                result = accept_utp(&mut self.utp_listener_v6) => {
                    match result {
                        Ok((stream, addr)) => {
                            // Same policy as the uTP v4 arm: counted via the
                            // guard, but not rejected by the global cap.
                            let registry = self.info_hash_registry.clone();
                            let guard = LiveConnectionGuard::new(Arc::clone(&self.live_connections));
                            futs.push(Self::identify_and_validate(
                                BoxedStream::new(stream), addr, registry, guard,
                            ));
                        }
                        Err(e) => {
                            warn!(error = %e, "uTP v6 accept failed, backing off");
                            if Self::backoff_or_shutdown(&mut shutdown_rx).await {
                                return;
                            }
                        }
                    }
                }
                Some(result) = futs.next(), if !futs.is_empty() => {
                    // M224 D3: BOTH `Ok` (forward to session) and `Err`
                    // (rejected — guard already dropped inside the
                    // future) must match the pattern. If only `Ok`
                    // matched, an `Err` yield would disable the branch
                    // for the rest of this `select!` call and the loop
                    // would deadlock waiting on the other (Pending)
                    // arms — starving any remaining `Err` futures still
                    // in `futs`.
                    if let Ok(conn) = result
                        && self.validated_tx.send(conn).await.is_err()
                    {
                        // SessionActor dropped the receiver — shut down.
                        break;
                    }
                }
            }
        }
    }

    /// Decide whether an inbound TCP connection must be rejected.
    ///
    /// `max` is the configured cap and `live` the live-connection count
    /// sampled before a guard is created for the new connection. The cap is
    /// enforced only for `max >= 1`; the `-1` "unlimited" sentinel (and any
    /// other value below 1) never rejects. The checked conversion replaces a
    /// sign-losing `as` cast, so a negative cap is never reinterpreted as a
    /// huge `usize`.
    fn cap_reached(max: i32, live: usize) -> bool {
        usize::try_from(max).is_ok_and(|cap| cap >= 1 && live >= cap)
    }

    /// Sleep for [`ACCEPT_ERROR_BACKOFF`] unless shutdown is requested first.
    ///
    /// Returns `true` when the shutdown channel resolved (signal sent or
    /// sender dropped) and the caller must exit; `false` when the backoff
    /// elapsed normally. `biased` makes the shutdown check run first on
    /// every poll, so a pending shutdown always wins over an expired timer.
    async fn backoff_or_shutdown(shutdown_rx: &mut oneshot::Receiver<()>) -> bool {
        tokio::select! {
            biased;
            _ = &mut *shutdown_rx => {
                info!("listener shutdown signalled, exiting");
                true
            }
            () = tokio::time::sleep(ACCEPT_ERROR_BACKOFF) => false,
        }
    }

    /// Read the BEP 3 preamble, validate the info hash against the registry,
    /// and wrap the stream for replay.
    ///
    /// M224 D3: the caller passes a `LiveConnectionGuard` taken at accept
    /// time. On success, the guard is folded into [`IdentifiedConnection`]
    /// so the live count tracks the connection through forwarding. On
    /// failure (unknown info hash, encrypted preamble, IO error, timeout),
    /// the guard drops here and the live count decrements immediately.
    ///
    /// Every rejection is logged at `debug` level and reported as `Err(())`;
    /// the caller needs no further detail because it only forwards `Ok`.
    async fn identify_and_validate<S>(
        stream: S,
        addr: SocketAddr,
        registry: Arc<DashMap<irontide_core::Id20, ()>>,
        live_guard: LiveConnectionGuard,
    ) -> Result<IdentifiedConnection, ()>
    where
        S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
    {
        match tokio::time::timeout(IDENTIFY_TIMEOUT, identify_plaintext_connection(stream)).await {
            Ok(Ok(Some((info_hash, prefixed_stream)))) => {
                if registry.contains_key(&info_hash) {
                    Ok(IdentifiedConnection {
                        stream: BoxedStream::new(prefixed_stream),
                        addr,
                        info_hash,
                        _live_guard: live_guard,
                    })
                } else {
                    debug!(
                        %addr,
                        info_hash = %info_hash,
                        "rejecting inbound connection: unknown info hash"
                    );
                    Err(())
                }
            }
            Ok(Ok(None)) => {
                // Encrypted (MSE) connection — not a plaintext BT handshake.
                debug!(%addr, "rejecting inbound connection: encrypted preamble");
                Err(())
            }
            Ok(Err(e)) => {
                debug!(%addr, error = %e, "error reading preamble from inbound connection");
                Err(())
            }
            Err(_elapsed) => {
                debug!(%addr, "timeout reading preamble from inbound connection");
                Err(())
            }
        }
    }
}

/// Await the next inbound connection on an optional [`TransportListener`].
///
/// With no listener configured the returned future never resolves, which
/// keeps the corresponding `select!` arm parked without disabling it
/// explicitly.
async fn accept_transport(
    listener: &mut Option<Box<dyn TransportListener>>,
) -> std::io::Result<(BoxedStream, SocketAddr)> {
    let Some(active) = listener else {
        // No listener: stay pending forever.
        return std::future::pending().await;
    };
    active.accept().await
}

/// Await the next inbound connection on an optional uTP listener.
///
/// With no listener configured the returned future never resolves, which
/// keeps the corresponding `select!` arm parked. An accept failure from the
/// listener is wrapped into a `std::io::Error` via `std::io::Error::other`.
async fn accept_utp(
    listener: &mut Option<irontide_utp::UtpListener>,
) -> std::io::Result<(irontide_utp::UtpStream, SocketAddr)> {
    let Some(active) = listener else {
        // No listener: stay pending forever.
        return std::future::pending().await;
    };
    let accepted = active.accept().await;
    accepted.map_err(std::io::Error::other)
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::transport::TokioListener;
    use tokio::io::AsyncWriteExt;
    use tokio::net::{TcpListener, TcpStream};

    /// Assemble the 48-byte BEP 3 preamble for `info_hash`.
    ///
    /// Layout: byte 0 is the pstrlen (`0x13`), bytes 1..20 are the protocol
    /// string, bytes 20..28 are the reserved field (left zeroed), and bytes
    /// 28..48 carry the info hash.
    fn build_preamble(info_hash: &irontide_core::Id20) -> [u8; 48] {
        const PSTR: &[u8; 19] = b"BitTorrent protocol";

        let mut preamble = [0u8; 48];
        {
            // Split at the start of the info-hash slot so each half can be
            // filled independently; the reserved bytes stay zero.
            let (header, hash_slot) = preamble.split_at_mut(28);
            header[0] = 0x13;
            header[1..20].copy_from_slice(PSTR);
            hash_slot.copy_from_slice(info_hash.as_bytes());
        }
        preamble
    }

    /// Build a `ListenerTask` backed by a real TCP listener bound to an
    /// ephemeral localhost port.
    ///
    /// Returns `(task, local_addr, receiver, max_connections_global,
    /// live_connections)`.
    ///
    /// M224 D4: the cap (`AtomicI32`) and the live counter (`AtomicUsize`)
    /// are handed back so cap-enforcement tests can adjust the cap and
    /// observe the counter without rebuilding the task. The cap starts at
    /// `-1` (unlimited) so the legacy listener tests keep their behaviour.
    async fn setup_listener(
        channel_capacity: usize,
        registry: Arc<DashMap<irontide_core::Id20, ()>>,
    ) -> (
        ListenerTask,
        SocketAddr,
        mpsc::Receiver<IdentifiedConnection>,
        Arc<AtomicI32>,
        Arc<AtomicUsize>,
    ) {
        let socket = TcpListener::bind("127.0.0.1:0")
            .await
            .expect("bind to ephemeral port");
        let bound_addr = socket.local_addr().expect("local_addr");

        let max_cap = Arc::new(AtomicI32::new(-1));
        let live_count = Arc::new(AtomicUsize::new(0));
        let (conn_tx, conn_rx) = mpsc::channel(channel_capacity);

        // Only the TCP transport slot is populated; the two optional
        // arguments that follow it are left as `None`.
        let listener_task = ListenerTask::new(
            Some(Box::new(TokioListener(socket))),
            None,
            None,
            registry,
            conn_tx,
            Arc::clone(&max_cap),
            Arc::clone(&live_count),
        );

        (listener_task, bound_addr, conn_rx, max_cap, live_count)
    }

    /// Spawn `task.run` with a shutdown channel that the helper itself never
    /// fires.
    ///
    /// Used by the pre-B2 tests, which stop the task through the
    /// receiver-drop path. The sender is returned so the caller can keep it
    /// alive: dropping it would close the channel, and the task's shutdown
    /// arm would then fire on `Closed`, looking like a phantom shutdown.
    fn spawn_with_dummy_shutdown(
        task: ListenerTask,
    ) -> (oneshot::Sender<()>, tokio::task::JoinHandle<()>) {
        let (keepalive_tx, shutdown_rx) = oneshot::channel();
        let join_handle = tokio::spawn(task.run(shutdown_rx));
        (keepalive_tx, join_handle)
    }

    // -----------------------------------------------------------------------
    // Test 1: listener_accepts_valid_handshake
    // -----------------------------------------------------------------------

    /// A client that sends a well-formed preamble for a registered info hash
    /// must be delivered on the channel with matching hash and peer IP.
    #[tokio::test]
    async fn listener_accepts_valid_handshake() {
        let info_hash = irontide_core::Id20::from([0xAA; 20]);
        let registry = Arc::new(DashMap::new());
        registry.insert(info_hash, ());

        let (task, addr, mut rx, _max, _live) = setup_listener(4, registry).await;
        let (_sd_tx, handle) = spawn_with_dummy_shutdown(task);

        let preamble = build_preamble(&info_hash);
        let mut peer = TcpStream::connect(addr).await.expect("connect to listener");
        peer.write_all(&preamble).await.expect("write preamble");

        let delivered = tokio::time::timeout(Duration::from_secs(2), rx.recv()).await;
        let conn = delivered
            .expect("receive within timeout")
            .expect("channel not closed");

        assert_eq!(conn.info_hash, info_hash);
        // The listener-side peer address must be the client's local address.
        let expected_ip = peer.local_addr().expect("client addr").ip();
        assert_eq!(conn.addr.ip(), expected_ip);

        // Best-effort teardown: drop the receiver, poke the listener with one
        // more connection, and give the task up to a second to finish. The
        // outcome is deliberately ignored.
        drop(rx);
        let _ = TcpStream::connect(addr).await;
        let _ = tokio::time::timeout(Duration::from_secs(1), handle).await;
    }

    // -----------------------------------------------------------------------
    // Test 2: listener_rejects_invalid_protocol
    // -----------------------------------------------------------------------

    /// A 48-byte payload whose first byte is `0xFF` must not be delivered on
    /// the channel.
    #[tokio::test]
    async fn listener_rejects_invalid_protocol() {
        let info_hash = irontide_core::Id20::from([0xBB; 20]);
        let registry = Arc::new(DashMap::new());
        registry.insert(info_hash, ());

        let (task, addr, mut rx, _max, _live) = setup_listener(4, registry).await;
        let (_sd_tx, _handle) = spawn_with_dummy_shutdown(task);

        // All zeroes except a leading 0xFF where the pstrlen byte would be.
        let bad_preamble = {
            let mut bytes = [0u8; 48];
            bytes[0] = 0xFF;
            bytes
        };
        let mut peer = TcpStream::connect(addr).await.expect("connect to listener");
        peer.write_all(&bad_preamble)
            .await
            .expect("write bad preamble");

        // The receive must time out: nothing may reach the channel.
        let outcome = tokio::time::timeout(Duration::from_millis(200), rx.recv()).await;
        assert!(outcome.is_err(), "expected timeout, got a connection");
    }

    // -----------------------------------------------------------------------
    // Test 3: listener_rejects_unknown_info_hash
    // -----------------------------------------------------------------------

    /// A well-formed preamble carrying an info hash that is absent from the
    /// registry must not be delivered on the channel.
    #[tokio::test]
    async fn listener_rejects_unknown_info_hash() {
        let registered = irontide_core::Id20::from([0xCC; 20]);
        let unregistered = irontide_core::Id20::from([0xDD; 20]);
        let registry = Arc::new(DashMap::new());
        registry.insert(registered, ());

        let (task, addr, mut rx, _max, _live) = setup_listener(4, registry).await;
        let (_sd_tx, _handle) = spawn_with_dummy_shutdown(task);

        // Structurally valid preamble, but for a hash nobody registered.
        let preamble = build_preamble(&unregistered);
        let mut peer = TcpStream::connect(addr).await.expect("connect to listener");
        peer.write_all(&preamble)
            .await
            .expect("write unknown hash preamble");

        // The receive must time out: nothing may reach the channel.
        let outcome = tokio::time::timeout(Duration::from_millis(200), rx.recv()).await;
        assert!(outcome.is_err(), "expected timeout, got a connection");
    }

    // -----------------------------------------------------------------------
    // Test 4: listener_timeout_on_slow_handshake
    // -----------------------------------------------------------------------

    /// A peer that never sends any bytes must cause
    /// `identify_and_validate` to return `Err(())`.
    #[tokio::test]
    async fn listener_timeout_on_slow_handshake() {
        tokio::time::pause();

        let info_hash = irontide_core::Id20::from([0xEE; 20]);
        let registry = Arc::new(DashMap::new());
        registry.insert(info_hash, ());

        // Drive identify_and_validate directly over an in-memory duplex. The
        // far end is kept alive but silent, so the read can only end through
        // IDENTIFY_TIMEOUT (assumed to be shorter than the 6 s advanced below).
        let (silent_stream, _far_end) = tokio::io::duplex(64);
        let addr: SocketAddr = "127.0.0.1:9999".parse().expect("parse addr");
        let guard = LiveConnectionGuard::new(Arc::new(AtomicUsize::new(0)));

        // Run it as its own task so this test body can move the clock.
        let handle = tokio::spawn(ListenerTask::identify_and_validate(
            silent_stream,
            addr,
            registry,
            guard,
        ));

        tokio::time::advance(Duration::from_secs(6)).await;

        let outcome = handle.await.expect("task did not panic");
        assert!(outcome.is_err(), "expected Err(()) from timeout");
    }

    // -----------------------------------------------------------------------
    // Test 5: listener_concurrent_handshakes
    // -----------------------------------------------------------------------

    /// Ten clients connect back to back, five with a valid preamble and five
    /// with a bad first byte; exactly the five valid ones must be delivered.
    #[tokio::test]
    async fn listener_concurrent_handshakes() {
        let info_hash = irontide_core::Id20::from([0x11; 20]);
        let registry = Arc::new(DashMap::new());
        registry.insert(info_hash, ());

        let (task, addr, mut rx, _max, _live) = setup_listener(16, registry).await;
        let (_sd_tx, _handle) = spawn_with_dummy_shutdown(task);

        let good_preamble = build_preamble(&info_hash);
        let bad_preamble = {
            let mut bytes = [0u8; 48];
            bytes[0] = 0xFF;
            bytes
        };

        // Clients 0..5 are valid, 5..10 invalid. All streams are kept open in
        // `clients` until the end of the test.
        let mut clients = Vec::with_capacity(10);
        for idx in 0u8..10 {
            let (payload, write_failure) = if idx < 5 {
                (&good_preamble, "write valid preamble")
            } else {
                (&bad_preamble, "write invalid preamble")
            };
            let mut stream = TcpStream::connect(addr).await.expect("connect to listener");
            stream.write_all(payload).await.expect(write_failure);
            clients.push(stream);
        }

        // Drain the five expected deliveries.
        let mut received = 0;
        while received < 5 {
            let conn = tokio::time::timeout(Duration::from_secs(2), rx.recv())
                .await
                .expect("receive within timeout")
                .expect("channel not closed");
            assert_eq!(conn.info_hash, info_hash);
            received += 1;
        }
        assert_eq!(received, 5);

        // A sixth delivery would mean an invalid client slipped through.
        let surplus = tokio::time::timeout(Duration::from_millis(200), rx.recv()).await;
        assert!(surplus.is_err(), "expected no more connections");
    }

    // -----------------------------------------------------------------------
    // Test 6: listener_futures_unordered_ordering
    // -----------------------------------------------------------------------

    /// Two identify futures sit in a `FuturesUnordered`; the one whose peer
    /// writes its preamble first must complete first, regardless of push
    /// order.
    #[tokio::test]
    async fn listener_futures_unordered_ordering() {
        use futures::stream::{FuturesUnordered, StreamExt};
        use tokio::io::AsyncWriteExt;

        tokio::time::pause();

        let info_hash = irontide_core::Id20::from([0x22; 20]);
        let registry = Arc::new(DashMap::new());
        registry.insert(info_hash, ());

        // Stream 1 is pushed first but fed last; stream 2 is fed immediately.
        let (client1, mut server1) = tokio::io::duplex(256);
        let (client2, mut server2) = tokio::io::duplex(256);

        let addr1: SocketAddr = "127.0.0.1:1001".parse().expect("parse addr");
        let addr2: SocketAddr = "127.0.0.1:1002".parse().expect("parse addr");

        // M224: one shared counter, but a separate guard per call, so the
        // ordering check does not accidentally depend on guard drop order.
        let counter = Arc::new(AtomicUsize::new(0));
        let mut pending = FuturesUnordered::new();
        for (stream, peer_addr) in [(client1, addr1), (client2, addr2)] {
            pending.push(ListenerTask::identify_and_validate(
                stream,
                peer_addr,
                Arc::clone(&registry),
                LiveConnectionGuard::new(Arc::clone(&counter)),
            ));
        }

        let preamble = build_preamble(&info_hash);

        // Feed stream 2 only.
        server2
            .write_all(&preamble)
            .await
            .expect("write preamble 2");

        // Give the identify future for stream 2 a chance to run.
        tokio::task::yield_now().await;
        tokio::time::advance(Duration::from_millis(1)).await;

        // Stream 2 (addr2) must be the first to come out.
        let earliest = pending.next().await.expect("at least one result");
        let first_conn = earliest.expect("stream 2 should succeed");
        assert_eq!(first_conn.addr, addr2, "second client should arrive first");

        // Now feed stream 1 and expect it as the remaining result.
        server1
            .write_all(&preamble)
            .await
            .expect("write preamble 1");

        tokio::task::yield_now().await;
        tokio::time::advance(Duration::from_millis(1)).await;

        let latest = pending.next().await.expect("second result");
        let second_conn = latest.expect("stream 1 should succeed");
        assert_eq!(second_conn.addr, addr1, "first client should arrive second");
    }

    // -----------------------------------------------------------------------
    // Test 7: listener_channel_backpressure
    // -----------------------------------------------------------------------

    /// With a channel of capacity 1, two valid connections made back to back
    /// must both be delivered once the receiver drains.
    #[tokio::test]
    async fn listener_channel_backpressure() {
        let info_hash = irontide_core::Id20::from([0x33; 20]);
        let registry = Arc::new(DashMap::new());
        registry.insert(info_hash, ());

        // Capacity 1: at most one identified connection can sit in the buffer.
        let (task, addr, mut rx, _max, _live) = setup_listener(1, registry).await;
        let (_sd_tx, _handle) = spawn_with_dummy_shutdown(task);

        let preamble = build_preamble(&info_hash);

        // Both clients connect and write before anything is received.
        let mut client1 = TcpStream::connect(addr).await.expect("connect client 1");
        client1
            .write_all(&preamble)
            .await
            .expect("write preamble 1");

        let mut client2 = TcpStream::connect(addr).await.expect("connect client 2");
        client2
            .write_all(&preamble)
            .await
            .expect("write preamble 2");

        let first_recv = tokio::time::timeout(Duration::from_secs(2), rx.recv()).await;
        let conn1 = first_recv
            .expect("first recv within timeout")
            .expect("channel not closed");
        assert_eq!(conn1.info_hash, info_hash);

        // Draining the first delivery frees the slot for the second.
        let second_recv = tokio::time::timeout(Duration::from_secs(2), rx.recv()).await;
        let conn2 = second_recv
            .expect("second recv within timeout")
            .expect("channel not closed");
        assert_eq!(conn2.info_hash, info_hash);

        // Distinct peer addresses show that two separate connections arrived.
        assert_ne!(
            conn1.addr, conn2.addr,
            "connections should have distinct peer addresses"
        );
    }

    // -----------------------------------------------------------------------
    // Test 8: listener_shutdown_via_channel_drop
    // -----------------------------------------------------------------------

    /// Once the receiver is gone, the task must finish (without panicking)
    /// after it next tries to hand over a connection.
    #[tokio::test]
    async fn listener_shutdown_via_channel_drop() {
        let info_hash = irontide_core::Id20::from([0x44; 20]);
        let registry = Arc::new(DashMap::new());
        registry.insert(info_hash, ());

        let (task, addr, rx, _max, _live) = setup_listener(4, registry).await;
        let (_sd_tx, handle) = spawn_with_dummy_shutdown(task);

        // Close the receiving side before any connection is made.
        drop(rx);

        // A valid handshake forces the task to attempt a send, which is
        // where it finds out the receiver no longer exists.
        let preamble = build_preamble(&info_hash);
        let mut peer = TcpStream::connect(addr).await.expect("connect to listener");
        peer.write_all(&preamble).await.expect("write preamble");

        let joined = tokio::time::timeout(Duration::from_secs(2), handle).await;
        assert!(
            joined.is_ok(),
            "listener task should exit after receiver is dropped"
        );
        // Unwrap both layers: the outer timeout and the task's join result.
        joined
            .expect("timeout should not fire")
            .expect("task should not panic");
    }

    // -----------------------------------------------------------------------
    // Test 9: listener_torrent_removed_race
    // -----------------------------------------------------------------------

    /// Time-of-check test for `identify_and_validate`.
    ///
    /// The info hash is validated against the registry at identification
    /// time. A connection identified while the hash is registered succeeds,
    /// and removing the hash afterwards does not invalidate it — the
    /// session layer is expected to handle that post-delivery race. A new
    /// connection identified *after* the removal, against the same shared
    /// registry, must be rejected.
    #[tokio::test]
    async fn listener_torrent_removed_race() {
        let info_hash = irontide_core::Id20::from([0x55; 20]);
        let registry = Arc::new(DashMap::new());
        registry.insert(info_hash, ());

        // Call identify_and_validate directly over an in-memory duplex.
        let (client, mut server) = tokio::io::duplex(256);
        let addr: SocketAddr = "127.0.0.1:5555".parse().expect("parse addr");

        // Write the preamble up front so identification can complete.
        server
            .write_all(&build_preamble(&info_hash))
            .await
            .expect("write preamble");

        // Identify while the hash is registered — should succeed.
        let counter = Arc::new(AtomicUsize::new(0));
        let result = ListenerTask::identify_and_validate(
            client,
            addr,
            Arc::clone(&registry),
            LiveConnectionGuard::new(Arc::clone(&counter)),
        )
        .await;
        let conn = result.expect("connection should be validated");
        assert_eq!(conn.info_hash, info_hash);
        assert_eq!(conn.addr, addr);

        // Remove the hash. The connection above was already validated, which
        // shows the check happens at identification time.
        registry.remove(&info_hash);
        assert!(
            !registry.contains_key(&info_hash),
            "hash should be removed from registry"
        );

        // A new connection with the same hash must now be rejected. Use the
        // SAME registry the hash was removed from (not a fresh empty map),
        // so the removal itself is what causes the rejection.
        let (client2, mut server2) = tokio::io::duplex(256);
        server2
            .write_all(&build_preamble(&info_hash))
            .await
            .expect("write preamble 2");
        let counter2 = Arc::new(AtomicUsize::new(0));
        let result2 = ListenerTask::identify_and_validate(
            client2,
            addr,
            Arc::clone(&registry),
            LiveConnectionGuard::new(Arc::clone(&counter2)),
        )
        .await;
        assert!(
            result2.is_err(),
            "connection with removed hash should be rejected"
        );
    }

    // -----------------------------------------------------------------------
    // M173 Lane B (B2): shutdown channel + bounded join timeout.
    // -----------------------------------------------------------------------

    /// Fire the explicit shutdown signal and require the task to finish
    /// within the 500 ms bound enforced below — intended to sit well under
    /// [`SHUTDOWN_JOIN_TIMEOUT`]. This is the happy path for the
    /// listen-port-rebind code in B4.
    #[tokio::test]
    async fn listener_shutdown_via_explicit_signal_exits_fast() {
        let info_hash = irontide_core::Id20::from([0x77; 20]);
        let registry = Arc::new(DashMap::new());
        registry.insert(info_hash, ());

        let (task, _addr, _rx, _max, _live) = setup_listener(4, registry).await;
        let (shutdown_tx, join_handle) = spawn_with_dummy_shutdown(task);

        // No client ever connects; the signal alone must stop the task.
        shutdown_tx.send(()).expect("send shutdown signal");

        let joined = tokio::time::timeout(Duration::from_millis(500), join_handle).await;
        assert!(
            joined.is_ok(),
            "task must exit promptly on shutdown signal (no real connection needed)"
        );
        joined
            .expect("timeout did not fire")
            .expect("task did not panic");
    }

    /// `ListenerHandle::shutdown_with_timeout` must resolve to `Ok` for a
    /// task that shuts down cleanly — the happy path for B4's port-rebind
    /// code. An outer 2 s timeout guards against the call hanging.
    #[tokio::test]
    async fn listener_handle_shutdown_with_timeout_clean_exit() {
        let info_hash = irontide_core::Id20::from([0x88; 20]);
        let registry = Arc::new(DashMap::new());
        registry.insert(info_hash, ());

        let (task, _addr, _rx, _max, _live) = setup_listener(4, registry).await;
        let listener = ListenerHandle::spawn(task);

        let shutdown = listener.shutdown_with_timeout();
        let inner = tokio::time::timeout(Duration::from_secs(2), shutdown)
            .await
            .expect("outer timeout did not fire");
        assert!(
            inner.is_ok(),
            "clean shutdown should return Ok within the deadline: {inner:?}"
        );
    }

    /// `ListenerHandle::abort` must hand back a join handle that resolves
    /// within 2 s without the task panicking.
    ///
    /// Per the contract this test pins (the one that makes
    /// `SessionActor::start_full` safe), the task's shutdown receiver is
    /// meant to observe `RecvError` only once the shutdown sender owned by
    /// the `ListenerHandle` is dropped, which is the session-teardown signal.
    /// NOTE(review): `abort`'s body is not visible from this test; confirm
    /// exactly when it releases the sender.
    #[tokio::test]
    async fn listener_handle_abort_clean_join() {
        let info_hash = irontide_core::Id20::from([0x99; 20]);
        let registry = Arc::new(DashMap::new());
        registry.insert(info_hash, ());

        let (task, _addr, _rx, _max, _live) = setup_listener(4, registry).await;
        let listener = ListenerHandle::spawn(task);

        // After abort, the task is expected to be waiting on either an
        // event or the dropped shutdown sender's RecvError.
        let join_handle = listener.abort();
        let joined = tokio::time::timeout(Duration::from_secs(2), join_handle).await;
        assert!(
            joined.is_ok(),
            "abort should join cleanly via the dropped shutdown sender's RecvError"
        );
        joined.expect("outer timeout").expect("task did not panic");
    }

    // -----------------------------------------------------------------------
    // M224 G1: TCP-inbound cap enforcement + LiveConnectionGuard RAII
    // -----------------------------------------------------------------------

    /// Open one TCP connection to `addr` and send nothing on it.
    ///
    /// Callers invoke this N times to push the live count up to N: each
    /// such connection is meant to sit in `identify_and_validate`, holding
    /// its `LiveConnectionGuard`, until `IDENTIFY_TIMEOUT` fires — long
    /// enough for the tests to observe a steady-state count.
    ///
    /// Panics if the connect itself fails.
    async fn open_stalled_connection(addr: SocketAddr) -> TcpStream {
        // No preamble is written, so the listener side blocks on its read.
        TcpStream::connect(addr).await.expect("connect to listener")
    }

    /// Fallible variant of `open_stalled_connection`: connect once and hand
    /// the raw `std::io::Result` back to the caller instead of panicking.
    ///
    /// Returns `Ok(stream)` if the OS-level handshake stuck, or the typed
    /// `Err` when the connect failed — e.g. when the listener's over-cap
    /// drop raced the client's `connect` and surfaced as `ECONNRESET`.
    /// Used for the over-cap connects in `m224_g1_tcp_cap_enforces_*`:
    /// `live` must stay capped whether those attempts appear to the client
    /// as connected-then-RST or fail at connect.
    async fn try_open_stalled_connection(addr: SocketAddr) -> std::io::Result<TcpStream> {
        // No preamble is sent; the caller decides what to do with the result.
        TcpStream::connect(addr).await
    }

    /// Poll `live` every 20 ms until it is at least `target` or `deadline`
    /// has elapsed since the call started.
    ///
    /// Returns the last value observed, so the caller can tell a timeout
    /// from success by comparing it against `target`.
    async fn wait_for_live(live: &Arc<AtomicUsize>, target: usize, deadline: Duration) -> usize {
        let started = std::time::Instant::now();
        let mut seen = live.load(Ordering::SeqCst);
        while seen < target && started.elapsed() < deadline {
            tokio::time::sleep(Duration::from_millis(20)).await;
            seen = live.load(Ordering::SeqCst);
        }
        seen
    }

    /// With the cap set to 2, two stalled connections must bring `live` to
    /// 2, and three further connection attempts must never push it above 2.
    #[tokio::test]
    async fn m224_g1_tcp_cap_enforces_at_accept_gate() {
        // D4: the listener binds `127.0.0.1:0`, so the OS picks the port and
        // parallel tests cannot clash. Only the "cap holds" half of the
        // contract is checked here; the drain half lives in
        // `m224_g1_live_guard_drops_decrement_counter`. Splitting them avoids
        // a kernel-queue race where a queued over-cap accept fills a
        // freshly-freed slot during drain.
        let info_hash = irontide_core::Id20::from([0xEE; 20]);
        let registry = Arc::new(DashMap::new());
        registry.insert(info_hash, ());

        let (task, addr, _rx, max, live) = setup_listener(4, registry).await;
        max.store(2, Ordering::SeqCst);
        let (_sd_tx, handle) = spawn_with_dummy_shutdown(task);

        // Two under-cap connections: live must converge to exactly 2.
        let _s1 = open_stalled_connection(addr).await;
        let _s2 = open_stalled_connection(addr).await;
        let observed = wait_for_live(&live, 2, Duration::from_secs(2)).await;
        assert_eq!(
            observed, 2,
            "expected live=2 after 2 accepts under cap, got {observed}"
        );

        // Three over-cap attempts. `connect` may return `Ok` or an error
        // such as `ECONNRESET` depending on how the kernel races the RST
        // back; either way the results are kept alive and `live` must stay
        // at the cap.
        let _s3 = try_open_stalled_connection(addr).await;
        let _s4 = try_open_stalled_connection(addr).await;
        let _s5 = try_open_stalled_connection(addr).await;

        // Sample `live` every 20 ms for 500 ms so the cap is checked across
        // the whole window, not only at its end. The under-cap connections
        // stay stalled, so any reading above 2 is a real gate breach.
        let window_end = std::time::Instant::now() + Duration::from_millis(500);
        loop {
            if std::time::Instant::now() >= window_end {
                break;
            }
            let v = live.load(Ordering::SeqCst);
            assert!(
                v <= 2,
                "cap=2 must hold across all accepts, observed live={v}"
            );
            tokio::time::sleep(Duration::from_millis(20)).await;
        }

        drop(handle);
    }

    /// Closing the client side of two stalled connections must bring `live`
    /// back down from 2 to 0.
    #[tokio::test]
    async fn m224_g1_live_guard_drops_decrement_counter() {
        // D3 contract: when a `LiveConnectionGuard` is dropped (here via
        // client-side stream drop → server-side EOF → `Err` return → guard
        // drop), `live_connections` must return to its pre-accept value.
        // The cap stays at `-1` (unlimited) so the accept gate never
        // rejects, which isolates the drop path.
        let info_hash = irontide_core::Id20::from([0xDD; 20]);
        let registry = Arc::new(DashMap::new());
        registry.insert(info_hash, ());

        let (task, addr, _rx, _max, live) = setup_listener(4, registry).await;
        let (_sd_tx, handle) = spawn_with_dummy_shutdown(task);

        let stalled = [
            open_stalled_connection(addr).await,
            open_stalled_connection(addr).await,
        ];
        let observed = wait_for_live(&live, 2, Duration::from_secs(2)).await;
        assert_eq!(observed, 2, "expected live=2 before drop, got {observed}");

        // Close both client streams. The server-side read hits EOF,
        // `identify_and_validate` returns `Err`, the guards are dropped,
        // and `live` should drain to 0.
        drop(stalled);
        let observed = wait_for_live_to_drain(&live, 0, Duration::from_secs(3)).await;
        assert_eq!(
            observed, 0,
            "LiveConnectionGuard must decrement counter on drop; got live={observed}"
        );

        drop(handle);
    }

    /// Poll `live` every 20 ms until it is at most `target` or `deadline`
    /// has elapsed since the call started (used after closing connections).
    ///
    /// Returns the last value observed.
    async fn wait_for_live_to_drain(
        live: &Arc<AtomicUsize>,
        target: usize,
        deadline: Duration,
    ) -> usize {
        let started = std::time::Instant::now();
        let mut seen = live.load(Ordering::SeqCst);
        while seen > target && started.elapsed() < deadline {
            tokio::time::sleep(Duration::from_millis(20)).await;
            seen = live.load(Ordering::SeqCst);
        }
        seen
    }

    /// With the cap left at `-1`, five stalled connections must all be
    /// counted as live.
    #[tokio::test]
    async fn m224_g1_tcp_cap_minus_one_means_unlimited() {
        // D2: `-1` is the wire sentinel for unlimited. The listener must
        // never gate accepts while max == -1, whatever the live count is.
        let info_hash = irontide_core::Id20::from([0xEF; 20]);
        let registry = Arc::new(DashMap::new());
        registry.insert(info_hash, ());

        let (task, addr, _rx, max, live) = setup_listener(8, registry).await;
        // Sanity check: setup_listener seeds the cap with -1.
        assert_eq!(max.load(Ordering::SeqCst), -1);
        let (_sd_tx, handle) = spawn_with_dummy_shutdown(task);

        // Open all five connections concurrently and keep them alive.
        let connects = (0..5).map(|_| open_stalled_connection(addr));
        let _conns = futures::future::join_all(connects).await;

        let observed = wait_for_live(&live, 5, Duration::from_secs(2)).await;
        assert_eq!(
            observed, 5,
            "unlimited cap (-1) must accept all 5, got live={observed}"
        );

        drop(handle);
    }
}