irontide-engine 1.2.1

IronTide engine runtime: the per-torrent actor, peer I/O loops, and the non-leaf engine infrastructure (disk I/O, tracker management, alerts, streaming, extensions, SSL) — the renamed irontide-torrent-actor
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
#![allow(
    clippy::cast_possible_truncation,
    reason = "M175: hash verification — piece arithmetic bounded by num_pieces (u32 by construction in Lengths::new)"
)]

//! `TorrentActor` hash verification, smart banning, parole, and BEP 52 hash coordination.

use std::collections::VecDeque;
use std::net::SocketAddr;
use std::time::{Duration, Instant};

use tracing::{debug, error, info, warn};

use crate::alert::{AlertKind, post_alert};
use crate::disk::DiskJobFlags;
use crate::piece_reservation::{PieceState, PieceTracker};
use crate::torrent::{HashResult, TorrentActor, now_unix, serve_hashes};
use crate::types::{PeerCommand, TorrentState};

use irontide_core::DEFAULT_CHUNK_SIZE;

/// M257g: record a completed piece's download duration into the steal-threshold
/// estimator (`completed_piece_times`, capped at 100 samples), reading the
/// reservation age from whichever tracker owns the piece.
///
/// The M187 `DirectDispatch` [`PieceTracker`] is consulted first via
/// [`PieceTracker::take_inflight`], which removes the reservation **and** returns
/// its age. Feeding that age into the estimator is what keeps
/// `completed_piece_times` populated; without it `peer_avg_time` stays pinned at
/// its fallback and the endgame steal threshold becomes far too lenient. The
/// `inflight_started` array is the fallback for the legacy CAS path.
///
/// The legacy `inflight_started[index]` slot is cleared on every call, even when
/// the tracker supplied the age. This mirrors the hash-failure path (which resets
/// both trackers) and guarantees a stale `Instant` can never be picked up by a
/// later completion of the same piece and recorded as a bogus, oversized sample.
///
/// # Parameters
/// - `piece_tracker`: the direct-dispatch tracker, if this torrent uses one.
/// - `inflight_started`: legacy per-piece start timestamps; an out-of-range
///   `index` is tolerated and treated as "not tracked".
/// - `completed_piece_times`: sliding window of recent piece durations; the
///   oldest samples are dropped once the window exceeds its cap.
/// - `index`: the piece that just completed.
///
/// # Returns
/// The recorded duration, or `None` if the piece was tracked by neither source
/// (e.g. a seed-mode piece that was never reserved). Nothing is pushed in that case.
fn record_piece_completion_time(
    piece_tracker: Option<&mut PieceTracker>,
    inflight_started: &mut [Option<Instant>],
    completed_piece_times: &mut VecDeque<Duration>,
    index: u32,
) -> Option<Duration> {
    // Size of the estimator's sliding window.
    const MAX_COMPLETED_PIECE_TIMES: usize = 100;

    // Unconditionally consume the legacy timestamp so it cannot outlive this
    // completion, regardless of which tracker ends up providing the duration.
    let legacy_started = inflight_started
        .get_mut(index as usize)
        .and_then(Option::take);

    // Prefer the tracker's reservation age; fall back to the legacy timestamp.
    let elapsed = piece_tracker
        .and_then(|pt| pt.take_inflight(index))
        .or_else(|| legacy_started.map(|started| started.elapsed()));

    if let Some(sample) = elapsed {
        completed_piece_times.push_back(sample);
        // `while` (not `if`) so the cap is restored even if the window was
        // somehow already oversized when we were called.
        while completed_piece_times.len() > MAX_COMPLETED_PIECE_TIMES {
            completed_piece_times.pop_front();
        }
    }
    elapsed
}

impl TorrentActor {
    /// M92: Process a single block completion — extracted from `handle_chunk_written()`.
    /// Called once per block in a batch.
    ///
    /// IMPORTANT: This is a line-for-line extraction of `handle_chunk_written()`.
    /// Every edge case (duplicate detection, end-game cancels, v2/hybrid verification,
    /// predictive Have, smart banning) is preserved identically.
    ///
    /// # Parameters
    /// - `peer_addr`: the peer that delivered the block.
    /// - `index` / `begin` / `length`: piece index, byte offset within the piece,
    ///   and payload length of the block.
    /// - `rtt`: request→piece round-trip time measured by the peer task, if any;
    ///   folded into the peer's `avg_rtt` EWMA.
    ///
    /// # Returns
    /// `false` for a duplicate block (already recorded in the chunk tracker).
    /// Otherwise the value reported by `ChunkTracker::chunk_received`, i.e. `true`
    /// when this block completed its piece. Note that `true` is returned even when
    /// verification was *not* started by this call because the piece was already
    /// in `pending_verify`; it is `false` when there is no chunk tracker.
    pub(crate) async fn process_block_completion(
        &mut self,
        peer_addr: SocketAddr,
        index: u32,
        begin: u32,
        length: u32,
        rtt: Option<std::time::Duration>,
    ) -> bool {
        // Skip duplicate blocks — in end-game mode or after timeout re-requests,
        // the same block may arrive from multiple peers. Writing it to the store
        // buffer would overwrite valid data that's pending verification.
        if let Some(ref ct) = self.chunk_tracker
            && ct.has_chunk(index, begin)
        {
            // Duplicate bytes still count towards total download (payload + the
            // 13-byte message header) and are additionally booked as waste.
            self.total_download += u64::from(length) + 13;
            self.counters
                .inc(crate::stats::PROTO_WASTE_BYTES, i64::from(length) + 13);
            // Remove from pending_requests to free pipeline slots. Without this,
            // the peer accumulates phantom entries from already-verified pieces
            // and eventually has zero available pipeline slots — permanent stall.
            if let Some(peer) = self.peers.get_mut(&peer_addr) {
                peer.pending_requests.remove(index, begin);
            }
            // Remove from end-game tracker so pick_block won't return this
            // block again. The normal path calls block_received which does
            // this, but we skip that path for duplicates.
            // (The cancel list returned by block_received is intentionally
            // discarded here.)
            if self.end_game.is_active() {
                self.end_game.block_received(index, begin, peer_addr);
            }
            // Peer task already returned its permit on direct write
            return false;
        }

        // NOTE: No disk write here — the peer task has already written to disk.

        // Torrent-level accounting for a fresh (non-duplicate) block.
        self.downloaded += u64::from(length);
        self.total_download += u64::from(length) + 13; // payload + message header
        self.last_download = now_unix();
        self.need_save_resume = true;

        // M93: Track piece ownership (actor learns about peer's CAS reservation via chunk arrival)
        // Only claims the piece when its owner slot exists and is currently `None`.
        if let Some(slab_idx) = self.peer_slab.slot_of(&peer_addr)
            && self.piece_owner.get(index as usize) == Some(&None)
        {
            self.piece_owner[index as usize] = Some(slab_idx);
            // M103: Add to steal queue if piece has unrequested blocks
            if let (Some(sc), Some(bm)) = (&self.steal_candidates, &self.block_maps)
                && let Some(lengths) = &self.lengths
            {
                let total_blocks = lengths.chunks_in_piece(index);
                if bm.next_unrequested(index, total_blocks).is_some() {
                    sc.push(index);
                }
            }
        }

        // Smart banning: track which peers contribute to each piece
        // (keyed by IP only, so the port is ignored).
        self.piece_contributors
            .entry(index)
            .or_default()
            .insert(peer_addr.ip());

        // Per-peer accounting; skipped entirely if the peer is no longer in `self.peers`.
        let now = std::time::Instant::now();
        if let Some(peer) = self.peers.get_mut(&peer_addr) {
            peer.pending_requests.remove(index, begin);
            peer.download_bytes_window += u64::from(length);
            peer.download_bytes_total += u64::from(length);
            peer.pipeline.block_received(length);
            // M106/M257f: RTT is measured by the peer task (the only
            // component that sees both the Request send and the Piece
            // receipt post-M104) and carried on the block batch. Feeds
            // eviction scoring and the BDP depth caps.
            if let Some(rtt) = rtt {
                let rtt_secs = rtt.as_secs_f64();
                // First sample seeds the EWMA with itself.
                peer.avg_rtt = Some(crate::peer_state::ewma_update(
                    peer.avg_rtt.unwrap_or(rtt_secs),
                    rtt_secs,
                    crate::peer_state::RTT_EWMA_ALPHA,
                ));
            }
            peer.blocks_completed = peer.blocks_completed.saturating_add(1);
            peer.last_data_received = Some(now);
            // Clear snub if snubbed
            if peer.snubbed {
                peer.snubbed = false;
            }
        }
        // M137: Backoff is now automatically reset by mark_live() in PeerStates.

        // End-game: cancel this block on all other peers. The 200ms end-game
        // refill tick will re-stock freed peers — no reactive cascade needed.
        if self.end_game.is_active() {
            let cancels = self.end_game.block_received(index, begin, peer_addr);
            for (cancel_addr, ci, cb, cl) in cancels {
                if let Some(cancel_peer) = self.peers.get_mut(&cancel_addr) {
                    // Best-effort: a failed try_send is ignored, but the request
                    // is dropped from our bookkeeping either way.
                    let _ = cancel_peer.cmd_tx.try_send(PeerCommand::Cancel {
                        index: ci,
                        begin: cb,
                        length: cl,
                    });
                    cancel_peer.pending_requests.remove(ci, cb);
                }
            }
        }

        // Track chunk completion
        let piece_complete = if let Some(ref mut ct) = self.chunk_tracker {
            ct.chunk_received(index, begin)
        } else {
            false
        };

        // Start verification once per completed piece; a piece already queued in
        // `pending_verify` is not re-submitted.
        if piece_complete && !self.pending_verify.contains(&index) {
            // M44/M118: Predictive piece announce — broadcast Have before verification
            if self.config.predictive_piece_announce_ms > 0
                && !self.predictive_have_sent.contains(&index)
            {
                self.predictive_have_sent.insert(index);
                let _ = self.have_broadcast_tx.send(index);
            }

            // M100: Flush deferred writes before verification — ensures all
            // blocks are on disk so read_piece() sees complete data.
            if let Some(ref disk) = self.disk {
                disk.flush_piece_writes(index).await;
            }

            match self.version {
                irontide_core::TorrentVersion::V1Only => {
                    // Async: fire-and-forget, result via verify_result_rx
                    // If the disk handle or the expected hash is unavailable,
                    // nothing is enqueued and the piece is not added to
                    // `pending_verify`.
                    if let Some(ref disk) = self.disk
                        && let Some(expected) = self
                            .meta
                            .as_ref()
                            .and_then(|m| m.info.piece_hash(index as usize))
                    {
                        self.pending_verify.insert(index);
                        // The generation travels with the job; a missing entry
                        // is treated as generation 0.
                        let generation = self
                            .piece_generations
                            .get(index as usize)
                            .copied()
                            .unwrap_or(0);
                        disk.enqueue_verify(index, expected, generation, &self.verify_result_tx);
                    }
                }
                irontide_core::TorrentVersion::V2Only => {
                    // Blocking: needs mutable hash_picker for Merkle tree
                    self.verify_and_mark_piece_v2(index).await;
                }
                irontide_core::TorrentVersion::Hybrid => {
                    // Blocking: needs both v1+v2 decision matrix
                    self.verify_and_mark_piece_hybrid(index).await;
                }
            }
        }

        // Peer task already returned its permit on direct write.
        // End-game dispatch still happens here.
        if self.end_game.is_active() {
            self.request_end_game_block(peer_addr).await;
        }

        piece_complete
    }

    pub(crate) async fn verify_and_mark_piece(&mut self, index: u32) {
        match self.version {
            irontide_core::TorrentVersion::V1Only => {
                self.verify_and_mark_piece_v1(index).await;
            }
            irontide_core::TorrentVersion::V2Only => {
                self.verify_and_mark_piece_v2(index).await;
            }
            irontide_core::TorrentVersion::Hybrid => {
                self.verify_and_mark_piece_hybrid(index).await;
            }
        }
    }

    /// SHA-1 whole-piece verification (v1 torrents).
    ///
    /// Looks up the expected hash in the torrent metadata, asks the disk layer
    /// to verify the piece, then runs the verified or hash-failed path. A
    /// missing disk handle, missing metadata/hash, or an error from the disk
    /// layer are all treated the same as a hash mismatch.
    async fn verify_and_mark_piece_v1(&mut self, index: u32) {
        let expected = self
            .meta
            .as_ref()
            .and_then(|meta| meta.info.piece_hash(index as usize));

        // Anything other than an explicit `Ok(true)` from the disk counts as a failure.
        let hash_ok = match (&self.disk, expected) {
            (Some(disk), Some(want)) => disk
                .verify_piece(index, want, DiskJobFlags::empty())
                .await
                .unwrap_or(false),
            _ => false,
        };

        if !hash_ok {
            self.on_piece_hash_failed(index).await;
            return;
        }
        self.on_piece_verified(index).await;
    }

    /// SHA-256 per-block Merkle verification (v2 torrents, BEP 52).
    ///
    /// Runs the block-level verification and maps its outcome onto the
    /// verified / hash-failed paths. A `NotApplicable` outcome triggers
    /// neither path.
    pub(crate) async fn verify_and_mark_piece_v2(&mut self, index: u32) {
        match self.run_v2_block_verification(index).await {
            HashResult::Failed => self.on_piece_hash_failed(index).await,
            HashResult::Passed => self.on_piece_verified(index).await,
            // Blocks stored, will resolve when piece-layer hashes arrive
            HashResult::NotApplicable => {}
        }
    }

    /// Run SHA-256 per-block Merkle verification and return a `HashResult`
    /// without triggering the completion callbacks (no
    /// `on_piece_verified`/`on_piece_hash_failed`).
    ///
    /// Extracted from `verify_and_mark_piece_v2` so it can be reused in hybrid
    /// dual-verification without double-firing callbacks.
    ///
    /// Although no callback fires, this method is not free of state changes: it
    /// flushes the piece on disk, feeds block hashes into `hash_picker`, and marks
    /// blocks verified in `chunk_tracker`.
    ///
    /// # Returns
    /// - `HashResult::Failed` as soon as the hash picker reports
    ///   `SetBlockResult::HashFailed` for any block (remaining blocks are skipped).
    /// - `HashResult::Passed` when no block hash errored and the chunk tracker
    ///   reports every block of the piece as verified.
    /// - `HashResult::NotApplicable` otherwise: no disk handle, no `lengths`,
    ///   the flush failed, hashing a block returned an error, or some blocks
    ///   are still awaiting piece-layer hashes.
    pub(crate) async fn run_v2_block_verification(&mut self, index: u32) -> HashResult {
        // Clone the handles so no borrow of `self` is held across the awaits below.
        let disk = match &self.disk {
            Some(d) => d.clone(),
            None => return HashResult::NotApplicable,
        };

        let lengths = match &self.lengths {
            Some(l) => l.clone(),
            None => return HashResult::NotApplicable,
        };

        // Flush write buffer before reading back for hashing
        if let Err(e) = disk.flush_piece(index).await {
            warn!(index, "failed to flush piece for v2 verification: {e}");
            return HashResult::NotApplicable;
        }

        // Compute SHA-256 of each 16 KiB block and feed to Merkle tree
        let num_chunks = lengths.chunks_in_piece(index);
        // Blocks in a full-size piece; used to turn (piece, chunk) into a global
        // block index. The `as u32` cast is covered by the file-level
        // `cast_possible_truncation` allowance.
        let blocks_per_piece = (lengths.piece_length() as u32) / DEFAULT_CHUNK_SIZE;
        // Cleared only when hashing a block fails at the disk layer.
        let mut all_ok = true;

        for chunk_idx in 0..num_chunks {
            // A chunk with no geometry is skipped rather than treated as an error.
            let Some((begin, length)) = lengths.chunk_info(index, chunk_idx) else {
                continue;
            };

            let block_hash = match disk
                .hash_block(index, begin, length, DiskJobFlags::empty())
                .await
            {
                Ok(h) => h,
                Err(e) => {
                    warn!(index, chunk_idx, "failed to hash block: {e}");
                    all_ok = false;
                    break;
                }
            };

            // Without a hash picker the block hash is computed but not recorded,
            // so the block is never marked verified here.
            if let Some(ref mut picker) = self.hash_picker {
                // Gap 6: single-file assumption — multi-file mapping deferred to M35
                let file_index = 0;
                let global_block = index * blocks_per_piece + chunk_idx;
                // Gap 10: 3-arg set_block_hash (offset removed)
                match picker.set_block_hash(file_index, global_block, block_hash) {
                    irontide_core::SetBlockResult::Ok => {
                        if let Some(ref mut ct) = self.chunk_tracker {
                            ct.mark_block_verified(index, chunk_idx);
                        }
                    }
                    irontide_core::SetBlockResult::Unknown => {
                        // Piece-layer hash not yet available — stored for deferred verification
                        debug!(
                            index,
                            chunk_idx, "block hash stored, awaiting piece-layer hashes"
                        );
                    }
                    irontide_core::SetBlockResult::HashFailed => {
                        warn!(index, chunk_idx, "block hash failed Merkle verification");
                        return HashResult::Failed;
                    }
                }
            }
        }

        // Passed requires both: no disk error and a chunk tracker that confirms
        // every block of the piece is verified.
        if all_ok
            && self
                .chunk_tracker
                .as_ref()
                .is_some_and(|ct| ct.all_blocks_verified(index))
        {
            HashResult::Passed
        } else {
            // Either a disk error (all_ok = false) or blocks stored awaiting piece-layer hashes
            HashResult::NotApplicable
        }
    }

    /// Dual SHA-1 + SHA-256 verification for hybrid torrents.
    ///
    /// Always runs both the v1 check (whole-piece SHA-1) and the v2 check
    /// (per-block SHA-256 Merkle), then acts on the pair of outcomes:
    ///
    /// - v1 Passed, v2 Passed → piece verified
    /// - v1 Failed, v2 Failed **or** `NotApplicable` → piece hash failed
    ///   (normal re-request / parole path; v1 rejection is not held back
    ///   waiting for v2)
    /// - one Passed, the other Failed → inconsistent hashes
    ///   (`on_inconsistent_hashes`)
    /// - v1 Passed, v2 `NotApplicable` → deferred until piece-layer hashes arrive
    /// - v1 `NotApplicable` (no disk, no expected hash, or the disk returned an
    ///   error) → deferred, whatever v2 reported
    pub(crate) async fn verify_and_mark_piece_hybrid(&mut self, index: u32) {
        // ── v1 verification (SHA-1 whole-piece) ──
        let expected = self
            .meta
            .as_ref()
            .and_then(|meta| meta.info.piece_hash(index as usize));
        let v1_outcome = match (&self.disk, expected) {
            (Some(disk), Some(want)) => {
                let checked = disk
                    .verify_piece(index, want, DiskJobFlags::empty())
                    .await;
                match checked {
                    Ok(true) => HashResult::Passed,
                    Ok(false) => HashResult::Failed,
                    Err(_) => HashResult::NotApplicable,
                }
            }
            _ => HashResult::NotApplicable,
        };

        // ── v2 verification (SHA-256 per-block Merkle) ──
        let v2_outcome = self.run_v2_block_verification(index).await;

        // ── Decision matrix ──
        match (v1_outcome, v2_outcome) {
            // v1 could not produce a verdict (missing meta/disk): defer.
            (HashResult::NotApplicable, _) => {
                debug!(index, "hybrid: v1 not applicable — deferring");
            }
            // v1 is happy but v2 is still waiting on piece-layer hashes: defer.
            // When those hashes arrive, handle_hashes_received will re-verify.
            (HashResult::Passed, HashResult::NotApplicable) => {
                debug!(
                    index,
                    "hybrid: v1 passed, v2 deferred — waiting for piece-layer hashes"
                );
            }
            // Both hash schemes accept the data.
            (HashResult::Passed, HashResult::Passed) => {
                self.on_piece_verified(index).await;
            }
            // The two schemes disagree: the .torrent metadata is inconsistent.
            (HashResult::Passed, HashResult::Failed) | (HashResult::Failed, HashResult::Passed) => {
                self.on_inconsistent_hashes(index).await;
            }
            // v1 rejected the piece and v2 either agrees or has no verdict yet —
            // fail right away, there is nothing to gain from waiting on v2.
            (HashResult::Failed, HashResult::Failed | HashResult::NotApplicable) => {
                self.on_piece_hash_failed(index).await;
            }
        }
    }

    /// Handle a result from the `HashPool` (M96).
    ///
    /// The piece is first removed from `pending_verify`. The result is then
    /// dropped without further action when its generation no longer matches
    /// the piece's current generation (stale), or when the chunk tracker's
    /// bitfield already has the piece set. Otherwise the verified or
    /// hash-failed path runs according to `result.passed`.
    pub(crate) async fn handle_hash_result(&mut self, result: crate::hash_pool::HashResult) {
        let piece = result.piece;
        self.pending_verify.remove(&piece);

        // Staleness check: a missing generation entry counts as generation 0.
        let live_generation = self
            .piece_generations
            .get(piece as usize)
            .copied()
            .unwrap_or(0);
        if result.generation != live_generation {
            tracing::debug!(
                piece = piece,
                result_gen = result.generation,
                current_gen = live_generation,
                "discarding stale hash result"
            );
            return;
        }

        // Guard: ignore results for already-verified pieces
        let already_verified = match self.chunk_tracker.as_ref() {
            Some(tracker) => tracker.bitfield().get(piece),
            None => false,
        };
        if already_verified {
            return;
        }

        if result.passed {
            self.on_piece_verified(piece).await;
        } else {
            self.on_piece_hash_failed(piece).await;
        }
    }

    /// Common success path after a piece passes verification (v1 SHA-1 or v2 Merkle).
    ///
    /// In order, this:
    /// 1. marks the piece verified in the chunk tracker and drops its
    ///    smart-ban contributor set;
    /// 2. clears end-game, atomic-state, block-stealing, ownership and
    ///    reservation bookkeeping for the piece, recording its download time
    ///    for the steal-threshold estimator and waking parked requesters;
    /// 3. posts `PieceFinished`, notifies stream/bitfield watchers and checks
    ///    for file completion;
    /// 4. resolves a pending parole entry for the piece, if any;
    /// 5. broadcasts Have (unless super-seeding or already announced
    ///    predictively) and optionally sends `SuggestPiece` to peers lacking it;
    /// 6. in share mode, maintains the LRU and evicts the oldest pieces;
    /// 7. outside share mode, transitions to seeding and announces completion
    ///    once every piece is present.
    pub(crate) async fn on_piece_verified(&mut self, index: u32) {
        if let Some(ref mut ct) = self.chunk_tracker {
            ct.mark_verified(index);
        }
        self.piece_contributors.remove(&index);
        // Remove stale end-game blocks for this piece. Without this,
        // pick_block() returns these blocks, peers request them, has_chunk
        // rejects them, and register_request accumulates until every peer
        // is registered for every stale block — permanent stall.
        if self.end_game.is_active() {
            self.end_game.remove_piece(index);
        }
        // M93: Mark piece complete atomically
        if let Some(ref states) = self.atomic_states {
            states.mark_complete(index);
        }
        // M103: Clean up block stealing state
        if let Some(ref sc) = self.steal_candidates {
            sc.remove(index);
        }
        if let (Some(bm), Some(lengths)) = (&self.block_maps, &self.lengths) {
            bm.clear(index, lengths.chunks_in_piece(index));
        }
        // NOTE(review): direct indexing — presumably `piece_owner` always has an
        // entry for every valid piece index; confirm against its initialisation.
        self.piece_owner[index as usize] = None;
        // M187 + M257g: clear the completed reservation AND feed its download time
        // into the M149 steal-threshold estimator. Pre-M257g the DirectDispatch
        // path called `mark_piece_hash_ok` (which discards the duration) while the
        // estimator was fed only from the dead pre-M187 `inflight_started` array,
        // so `completed_piece_times` stayed empty -> `peer_avg_time` pinned at the
        // 30 s fallback -> endgame steal threshold 30 s x 3 = 90 s -> the contended
        // endgame stall (evidence: docs/investigations/2026-06-14-m257g-*). The
        // helper's `take_inflight` performs the same removal as `mark_piece_hash_ok`
        // and returns the reservation age, restoring the estimator as M149 designed.
        // The returned duration is not needed here and is discarded.
        record_piece_completion_time(
            self.piece_tracker.as_mut(),
            &mut self.inflight_started,
            &mut self.completed_piece_times,
            index,
        );
        if let Some(ref notify) = self.reservation_notify {
            // M257e: wake ALL parked requesters — a single notify_one()
            // lets the herd starve when the woken peer re-acquires
            // NoneAvailable and the wake cascade dies.
            notify.notify_waiters();
            self.counters
                .inc(crate::stats::DISPATCH_NOTIFY_WAKEUP_TOTAL, 1);
        }
        info!(index, "piece verified");
        post_alert(
            &self.alert_tx,
            &self.alert_mask,
            AlertKind::PieceFinished {
                info_hash: self.info_hash,
                piece: index,
            },
        );

        // Notify FileStream consumers of piece completion
        // (send errors are deliberately ignored on both channels).
        let _ = self.piece_ready_tx.send(index);
        if let Some(ref ct) = self.chunk_tracker {
            let _ = self.have_watch_tx.send(ct.bitfield().clone());
        }

        // Check if the completed piece finishes any file (FileCompleted alert)
        self.check_file_completion(index);

        // Handle parole success: the parole peer delivered a good piece,
        // so the original contributors are the likely offenders.
        if let Some(parole) = self.parole_pieces.remove(&index) {
            self.apply_parole_success(index, parole).await;
        }

        // M118: Broadcast Have to all peers via broadcast channel (skip in super-seed mode).
        // Per-peer filtering (skip if peer already has piece) is done receiver-side
        // in PeerConnection via should_transmit_have().
        if self.super_seed.is_none() {
            // `remove` doubles as the lookup: it both clears the predictive marker
            // and tells us whether a Have was already sent ahead of verification.
            let already_announced = self.predictive_have_sent.remove(&index);
            if !already_announced {
                let _ = self.have_broadcast_tx.send(index);
            }
        }

        // M44: suggest newly-verified piece to peers that don't have it
        if self.config.suggest_mode {
            let max_suggest = self.config.max_suggest_pieces;
            // Snapshot the addresses so `self.peers` is not borrowed while
            // `suggested_to_peers` is mutated inside the loop.
            let peer_addrs: Vec<SocketAddr> = self.peers.keys().copied().collect();
            for peer_addr in peer_addrs {
                let already = self.suggested_to_peers.entry(peer_addr).or_default();
                // Per-peer cap on the number of suggested pieces.
                if already.len() >= max_suggest {
                    continue;
                }
                let should_suggest = self
                    .peers
                    .get(&peer_addr)
                    .is_some_and(|p| !p.bitfield.get(index));
                if should_suggest
                    && !already.contains(&index)
                    && let Some(peer) = self.peers.get(&peer_addr)
                {
                    // Recorded as suggested even if try_send fails.
                    let _ = peer.cmd_tx.try_send(PeerCommand::SuggestPiece(index));
                    already.insert(index);
                }
            }
        }

        // Share mode LRU: track piece, evict oldest if over capacity.
        // In share mode, we never "finish" — we keep cycling pieces.
        if self.share_max_pieces > 0 {
            self.share_lru.push_back(index);
            while self.share_lru.len() > self.share_max_pieces {
                if let Some(evicted) = self.share_lru.pop_front() {
                    if let Some(ref mut ct) = self.chunk_tracker {
                        ct.clear_piece(evicted);
                    }
                    // Re-add to wanted so it can be re-downloaded later
                    if evicted < self.wanted_pieces.len() {
                        self.wanted_pieces.set(evicted);
                    }
                    debug!(evicted, "share mode: evicted piece from LRU");
                }
            }
        }

        // Check if download is complete (skip in share mode — never finishes)
        if self.share_max_pieces == 0
            && let Some(ref ct) = self.chunk_tracker
            && ct.bitfield().count_ones() == self.num_pieces
        {
            info!("download complete, transitioning to seeding");
            post_alert(
                &self.alert_tx,
                &self.alert_mask,
                AlertKind::TorrentFinished {
                    info_hash: self.info_hash,
                },
            );
            self.end_game.deactivate();
            self.transition_state(TorrentState::Seeding);
            self.choker.set_seed_mode(true);
            // BEP 21: broadcast upload-only status
            if self.config.upload_only_announce {
                let hs = irontide_wire::ExtHandshake::new_upload_only();
                for peer in self.peers.values() {
                    let _ = peer
                        .cmd_tx
                        .try_send(PeerCommand::SendExtHandshake(hs.clone()));
                }
            }
            // Announce completion to trackers
            let result = self
                .tracker_manager
                .announce_completed(self.uploaded, self.downloaded)
                .await;
            self.fire_tracker_alerts(&result.outcomes);
        }
    }

    /// Common failure path after a piece fails hash verification.
    ///
    /// In order, this:
    /// 1. takes the recorded contributors for `index` and logs the failure;
    /// 2. accounts the piece's size as failed/wasted bytes and bumps the
    ///    hash-fail counter;
    /// 3. applies the parole policy — a piece already on parole strikes its
    ///    parole peer, otherwise the piece enters parole (or its contributors
    ///    are struck directly, see `enter_parole`);
    /// 4. posts a `HashFailed` alert carrying the contributor list;
    /// 5. resets the per-piece bookkeeping (chunk tracker, generation counter,
    ///    atomic state, steal candidates, block map, owner, piece tracker,
    ///    in-flight timestamp) and wakes parked requesters;
    /// 6. if end-game mode is active, deactivates it and moves every piece
    ///    still in the `Endgame` state back to reserved (if owned) or released.
    ///
    /// All per-piece vectors are accessed with checked lookups, so an `index`
    /// outside their current length is skipped instead of panicking the actor.
    #[allow(
        clippy::unused_async,
        reason = "handler method called from async select! loop"
    )]
    pub(crate) async fn on_piece_hash_failed(&mut self, index: u32) {
        // Taking the entry (rather than reading it) also clears the
        // contributor record for the next download attempt of this piece.
        let contributors: Vec<std::net::IpAddr> = self
            .piece_contributors
            .remove(&index)
            .unwrap_or_default()
            .into_iter()
            .collect();

        warn!(
            index,
            contributors = contributors.len(),
            "piece hash verification failed"
        );

        // Without piece lengths the failed size is accounted as zero.
        let failed_piece_size = self
            .lengths
            .as_ref()
            .map_or(0, |l| u64::from(l.piece_size(index)));
        self.total_failed_bytes += failed_piece_size;

        self.counters.inc(crate::stats::PROTO_HASHFAILS, 1);
        self.counters.inc(
            crate::stats::PROTO_WASTE_BYTES,
            failed_piece_size.cast_signed(),
        );

        self.predictive_have_sent.remove(&index);

        // Check if this is a parole failure
        if let Some(parole) = self.parole_pieces.remove(&index) {
            self.apply_parole_failure(index, &parole);
        } else {
            // First failure: enter parole if enabled. The clone is required
            // because the alert below takes ownership of the original list.
            self.enter_parole(index, contributors.clone());
        }

        post_alert(
            &self.alert_tx,
            &self.alert_mask,
            AlertKind::HashFailed {
                info_hash: self.info_hash,
                piece: index,
                contributors,
            },
        );
        if let Some(ref mut ct) = self.chunk_tracker {
            ct.mark_failed(index);
        }
        // M96: Increment generation to invalidate any in-flight hash for this piece
        if let Some(g) = self.piece_generations.get_mut(index as usize) {
            *g += 1;
        }
        // M93: Release piece back to Available
        if let Some(ref states) = self.atomic_states {
            states.release(index);
        }
        // M103: Clean up block stealing state on hash failure
        if let Some(ref sc) = self.steal_candidates {
            sc.remove(index);
        }
        if let (Some(bm), Some(lengths)) = (&self.block_maps, &self.lengths) {
            bm.clear(index, lengths.chunks_in_piece(index));
        }
        // Checked access, matching `piece_generations` / `inflight_started`:
        // an out-of-range index must not take the whole actor down.
        if let Some(owner) = self.piece_owner.get_mut(index as usize) {
            *owner = None;
        }
        // M187: Return piece to direct-acquire queue.
        if let Some(ref mut pt) = self.piece_tracker {
            pt.mark_piece_hash_failed(index);
        }
        // M149: Clear inflight tracking without recording (hash failed)
        if let Some(slot) = self.inflight_started.get_mut(index as usize) {
            *slot = None;
        }
        if let Some(ref notify) = self.reservation_notify {
            // M257e: wake ALL parked requesters — a single notify_one()
            // lets the herd starve when the woken peer re-acquires
            // NoneAvailable and the wake cascade dies.
            notify.notify_waiters();
            self.counters
                .inc(crate::stats::DISPATCH_NOTIFY_WAKEUP_TOTAL, 1);
        }
        // Hash failure in end-game: deactivate and resume normal mode
        if self.end_game.is_active() {
            self.end_game.deactivate();
            info!(index, "end-game deactivated due to hash failure");
            // M93: Transition Endgame pieces back
            if let Some(ref atomic_states) = self.atomic_states {
                for piece in 0..self.num_pieces {
                    if atomic_states.get(piece) == PieceState::Endgame {
                        // A missing owner slot is treated the same as "no owner".
                        let owned = self
                            .piece_owner
                            .get(piece as usize)
                            .is_some_and(Option::is_some);
                        if owned {
                            atomic_states.force_reserved(piece);
                        } else {
                            atomic_states.release(piece);
                        }
                    }
                }
            }
            // Second wake-up: the state transitions above may have made
            // further pieces acquirable.
            if let Some(ref notify) = self.reservation_notify {
                notify.notify_waiters();
                self.counters
                    .inc(crate::stats::DISPATCH_NOTIFY_WAKEUP_TOTAL, 1);
            }
        }
    }

    /// React to a piece whose v1 and v2 hashes disagree, which means the
    /// torrent's own metadata describes corrupt data.
    ///
    /// Mirrors libtorrent: the hash picker is discarded, two alerts are posted
    /// (the specific `InconsistentHashes` followed by a generic `TorrentError`),
    /// and the torrent is moved to `Paused`.
    #[allow(
        clippy::unused_async,
        reason = "handler method called from async select! loop"
    )]
    async fn on_inconsistent_hashes(&mut self, piece: u32) {
        error!(
            piece,
            info_hash = %self.info_hash,
            "v1 and v2 hashes are inconsistent — torrent data is corrupt"
        );

        // The Merkle tree state can no longer be trusted, so drop the picker.
        self.hash_picker = None;

        // Specific alert first, then the generic one for broader consumers.
        let message = format!("v1 and v2 hashes do not describe the same data (piece {piece})");
        let alerts = [
            AlertKind::InconsistentHashes {
                info_hash: self.info_hash,
                piece,
            },
            AlertKind::TorrentError {
                info_hash: self.info_hash,
                message,
            },
        ];
        for alert in alerts {
            post_alert(&self.alert_tx, &self.alert_mask, alert);
        }

        // Stop all activity on the torrent.
        self.transition_state(TorrentState::Paused);
    }

    // ── BEP 52 hash event handlers (M34) ────────────────────────────────

    /// Process received piece-layer or block-layer hashes from a peer.
    pub(crate) async fn handle_hashes_received(
        &mut self,
        peer_addr: SocketAddr,
        request: irontide_core::HashRequest,
        hashes: Vec<irontide_core::Id32>,
    ) {
        let Some(ref mut picker) = self.hash_picker else {
            debug!(peer = %peer_addr, "received hashes but no hash picker (v1 torrent)");
            return;
        };

        match picker.add_hashes(&request, &hashes) {
            Ok(result) => {
                if !result.valid {
                    warn!(peer = %peer_addr, "received hashes failed Merkle proof validation");
                    return;
                }
                for piece in result.hash_passed {
                    self.on_piece_verified(piece).await;
                }
                for piece in result.hash_failed {
                    warn!(piece, "piece failed after hash layer received");
                    post_alert(
                        &self.alert_tx,
                        &self.alert_mask,
                        AlertKind::HashFailed {
                            info_hash: self.info_hash,
                            piece,
                            contributors: Vec::new(),
                        },
                    );
                }
            }
            Err(e) => {
                warn!(peer = %peer_addr, "invalid hashes: {e}");
            }
        }
    }

    /// Answer a hash request from `peer_addr`: reply with the hashes when
    /// `serve_hashes` can produce them, otherwise reply with a reject.
    ///
    /// Requests from peers that are no longer in the peer map are dropped.
    /// The reply is queued with `try_send` and its result is ignored, so a
    /// full or closed command channel silently discards the reply.
    #[allow(
        clippy::unused_async,
        reason = "handler method called from async select! loop"
    )]
    pub(crate) async fn handle_incoming_hash_request(
        &self,
        peer_addr: SocketAddr,
        request: irontide_core::HashRequest,
    ) {
        let Some(peer) = self.peers.get(&peer_addr) else {
            return;
        };

        let served = serve_hashes(
            self.meta_v2.as_ref(),
            self.version,
            self.lengths.as_ref(),
            &request,
        );

        // Pick the reply first so there is a single send site.
        let reply = match served {
            Some(hashes) => PeerCommand::SendHashes { request, hashes },
            None => PeerCommand::SendHashReject(request),
        };
        let _ = peer.cmd_tx.try_send(reply);
    }

    // ── Smart banning helpers (M25) ────────────────────────────────────

    /// First hash failure for a piece: either put the piece on parole or, when
    /// that is not possible, strike its contributors immediately.
    ///
    /// Parole is entered only when the ban manager has parole enabled *and*
    /// there is at least one contributor; the contributors are then stored as
    /// the piece's `original_contributors` with no parole peer assigned yet.
    /// In every other case each contributor receives a strike right away.
    ///
    /// NOTE(review): a peer that reaches the ban threshold on the strike path
    /// is only logged here — unlike `apply_parole_success`, no disconnect or
    /// `PeerBanned` alert is issued from this function. Confirm that is handled
    /// elsewhere.
    fn enter_parole(&mut self, index: u32, contributors: Vec<std::net::IpAddr>) {
        let use_parole = crate::timed_lock::TimedGuard::new(
            self.ban_manager.read(),
            &self.lock_timing,
            "ban_manager",
        )
        .use_parole();

        if use_parole && !contributors.is_empty() {
            info!(
                index,
                contributors = contributors.len(),
                "entering parole mode"
            );
            let state = crate::ban::ParoleState {
                original_contributors: contributors.into_iter().collect(),
                parole_peer: None,
            };
            self.parole_pieces.insert(index, state);
            return;
        }

        // Parole disabled or no contributors to blame — strike everyone
        let mut mgr = self.ban_manager.write();
        for ip in contributors {
            if mgr.record_strike(ip) {
                info!(%ip, "peer banned (no parole, hash failure threshold)");
            }
        }
    }

    /// A piece on parole verified correctly, so the peers that supplied the
    /// earlier failing copy are the ones at fault.
    ///
    /// Every original contributor receives a strike. Those whose strike
    /// results in a ban are then disconnected and announced with a
    /// `PeerBanned` alert (the alert address uses port 0).
    async fn apply_parole_success(&mut self, index: u32, parole: crate::ban::ParoleState) {
        info!(index, "parole success — striking original contributors");

        // The write guard lives only inside this block, so it is released
        // before the first `.await` below.
        let newly_banned = {
            let mut mgr = self.ban_manager.write();
            let mut hits = Vec::new();
            for &ip in &parole.original_contributors {
                if !mgr.record_strike(ip) {
                    continue;
                }
                info!(%ip, "peer banned (parole confirmed bad data)");
                hits.push(ip);
            }
            hits
        };

        // Disconnect and fire alerts for newly banned peers
        for ip in newly_banned {
            self.disconnect_banned_ip(ip).await;
            let alert = AlertKind::PeerBanned {
                info_hash: self.info_hash,
                addr: std::net::SocketAddr::new(ip, 0),
            };
            post_alert(&self.alert_tx, &self.alert_mask, alert);
        }
    }

    /// A piece on parole failed verification again, so the parole peer itself
    /// delivered bad data and receives a strike.
    ///
    /// When no parole peer was recorded there is nobody to blame and nothing
    /// is done. The piece is deliberately not put back on parole here — the
    /// situation is ambiguous.
    fn apply_parole_failure(&mut self, index: u32, parole: &crate::ban::ParoleState) {
        let Some(parole_ip) = parole.parole_peer else {
            return;
        };

        info!(index, %parole_ip, "parole failure — striking parole peer");
        let mut mgr = self.ban_manager.write();
        let banned = mgr.record_strike(parole_ip);
        if banned {
            info!(%parole_ip, "parole peer banned");
        }
    }
}

#[cfg(test)]
mod m257g_estimator_tests {
    //! M257g: regression tests for the steal-threshold estimator feed.
    //!
    //! Before the fix, a piece that completed via the M187 `DirectDispatch`
    //! path threw its download duration away. `completed_piece_times` therefore
    //! never filled, `peer_avg_time` stayed on the 30 s empty-list fallback, and
    //! the endgame steal threshold (30 s x 3 = 90 s) fired far too late — the
    //! contended bimodal collapse. The tests below pin three things:
    //! `DirectDispatch` completions feed the estimator, the legacy
    //! `inflight_started` path keeps feeding it, and a fed estimator produces a
    //! steal threshold below the stall. Evidence:
    //! docs/investigations/2026-06-14-m257g-contended-collapse-evidence.md
    use super::*;
    use irontide_storage::Bitfield;
    use std::net::{IpAddr, Ipv4Addr};

    /// Loopback peer address distinguished only by `port`.
    fn addr(port: u16) -> SocketAddr {
        let localhost = IpAddr::V4(Ipv4Addr::LOCALHOST);
        SocketAddr::new(localhost, port)
    }

    /// Bitfield of `n` pieces with every bit set.
    fn all_wanted(n: u32) -> Bitfield {
        let mut wanted = Bitfield::new(n);
        (0..n).for_each(|piece| {
            wanted.set(piece);
        });
        wanted
    }

    /// Empty estimator inputs for an eight-piece torrent: no in-flight start
    /// timestamps and no recorded completion times.
    fn empty_estimator() -> (Vec<Option<Instant>>, VecDeque<Duration>) {
        (vec![None; 8], VecDeque::new())
    }

    /// Test-side mirror of the production `peer_avg_time` computation at
    /// `torrent_peers.rs:825-829` / `:2998-3003`: `None` for an empty list (the
    /// caller then applies the 30 s fallback), otherwise the arithmetic mean.
    fn mean_completion(times: &VecDeque<Duration>) -> Option<Duration> {
        let count = u32::try_from(times.len()).expect("test cap << u32::MAX");
        (count > 0).then(|| times.iter().sum::<Duration>() / count)
    }

    #[test]
    fn m257g_direct_dispatch_completion_feeds_estimator() {
        // The bug under test: completing a DirectDispatch (PieceTracker)
        // reservation has to push into completed_piece_times. Pre-fix the code
        // called mark_piece_hash_ok and dropped the duration, leaving the
        // estimator empty forever and the steal threshold at 90 s.
        let (mut inflight, mut samples) = empty_estimator();
        let mut direct = PieceTracker::new(8, &Bitfield::new(8), &all_wanted(8));
        direct.record_reservation(3, addr(1000));
        assert_eq!(direct.inflight_count(), 1);

        let outcome =
            record_piece_completion_time(Some(&mut direct), &mut inflight, &mut samples, 3);

        assert!(
            outcome.is_some(),
            "DirectDispatch completion must record a duration"
        );
        assert_eq!(samples.len(), 1, "estimator must learn the completion time");
        assert_eq!(
            direct.inflight_count(),
            0,
            "completion must clear the reservation (as mark_piece_hash_ok did)"
        );
    }

    #[test]
    fn m257g_legacy_inflight_started_path_still_feeds() {
        // With no DirectDispatch tracker the helper falls back to the pre-M187
        // inflight_started array. That was the ONLY feed before the fix, so it
        // has to keep working.
        let (mut inflight, mut samples) = empty_estimator();
        inflight[2] = Some(Instant::now());

        let outcome = record_piece_completion_time(None, &mut inflight, &mut samples, 2);

        assert!(outcome.is_some());
        assert_eq!(samples.len(), 1);
        assert!(inflight[2].is_none(), "inflight_started slot must be taken");
    }

    #[test]
    fn m257g_untracked_piece_records_nothing() {
        // A piece known to neither path (e.g. seed-mode, never reserved) must
        // not conjure up a completion time.
        let (mut inflight, mut samples) = empty_estimator();
        let mut direct = PieceTracker::new(8, &Bitfield::new(8), &all_wanted(8));

        let outcome =
            record_piece_completion_time(Some(&mut direct), &mut inflight, &mut samples, 5);

        assert!(outcome.is_none());
        assert!(samples.is_empty());
    }

    #[test]
    fn m257g_fed_estimator_drops_endgame_steal_threshold_below_stall() {
        // What the fix buys: once the estimator sees real (sub-second)
        // completions, the endgame steal threshold (avg x 3) falls below a
        // stuck piece's age and try_steal rescues it. On the empty-list 30 s
        // fallback the threshold is 90 s and the piece crawls — the collapse.
        let (mut inflight, mut samples) = empty_estimator();
        let mut direct = PieceTracker::new(8, &Bitfield::new(8), &all_wanted(8));

        for piece in 0..5u32 {
            let port = 1000 + u16::try_from(piece).expect("small");
            direct.record_reservation(piece, addr(port));
            record_piece_completion_time(Some(&mut direct), &mut inflight, &mut samples, piece);
        }
        assert_eq!(samples.len(), 5);

        let avg = mean_completion(&samples).expect("estimator populated");
        let endgame_threshold = avg.mul_f64(3.0); // M149 endgame multiplier
        let stuck_age = Duration::from_secs(1); // representative collapse endgame age

        assert!(
            endgame_threshold < stuck_age,
            "fed estimator must yield a threshold ({endgame_threshold:?}) below the \
             stuck-piece age ({stuck_age:?}) so try_steal fires"
        );

        // Counterfactual sanity check: the empty-list 30 s fallback IS the collapse.
        let fallback_threshold = Duration::from_secs(30).mul_f64(3.0); // 90 s
        assert!(
            fallback_threshold > stuck_age,
            "the 30 s fallback (90 s threshold) would not rescue the stuck piece"
        );
    }
}