1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
#![allow(
clippy::cast_possible_truncation,
reason = "M175: hash verification — piece arithmetic bounded by num_pieces (u32 by construction in Lengths::new)"
)]
//! `TorrentActor` hash verification, smart banning, parole, and BEP 52 hash coordination.
use std::net::SocketAddr;
use tracing::{debug, error, info, warn};
use crate::alert::{AlertKind, post_alert};
use crate::disk::DiskJobFlags;
use crate::piece_reservation::PieceState;
use crate::torrent::{HashResult, TorrentActor, now_unix, serve_hashes};
use crate::types::{PeerCommand, TorrentState};
use irontide_core::DEFAULT_CHUNK_SIZE;
impl TorrentActor {
/// M92: Process a single block completion — extracted from `handle_chunk_written()`.
/// Called once per block in a batch.
///
/// `peer_addr` is the peer that delivered the block; `index`, `begin` and `length`
/// identify the block (piece index, offset within the piece, payload length).
///
/// Returns the completion flag reported by `chunk_tracker.chunk_received()` for this
/// block, i.e. `true` when this block completed its piece. Duplicate blocks and the
/// case where there is no chunk tracker return `false`. Note that `true` is returned
/// even when verification was not started by this call because the piece was already
/// present in `pending_verify`.
///
/// IMPORTANT: This is a line-for-line extraction of `handle_chunk_written()`.
/// Every edge case (duplicate detection, end-game cancels, v2/hybrid verification,
/// predictive Have, smart banning) is preserved identically.
pub(crate) async fn process_block_completion(
&mut self,
peer_addr: SocketAddr,
index: u32,
begin: u32,
length: u32,
) -> bool {
// Skip duplicate blocks — in end-game mode or after timeout re-requests,
// the same block may arrive from multiple peers. Writing it to the store
// buffer would overwrite valid data that's pending verification.
if let Some(ref ct) = self.chunk_tracker
&& ct.has_chunk(index, begin)
{
// Duplicates still count towards total download (payload + 13, the same
// accounting as the normal path below) and are recorded as wasted bytes.
self.total_download += u64::from(length) + 13;
self.counters
.inc(crate::stats::PROTO_WASTE_BYTES, i64::from(length) + 13);
// Remove from pending_requests to free pipeline slots. Without this,
// the peer accumulates phantom entries from already-verified pieces
// and eventually has zero available pipeline slots — permanent stall.
if let Some(peer) = self.peers.get_mut(&peer_addr) {
peer.pending_requests.remove(index, begin);
}
// Remove from end-game tracker so pick_block won't return this
// block again. The normal path calls block_received which does
// this, but we skip that path for duplicates.
if self.end_game.is_active() {
self.end_game.block_received(index, begin, peer_addr);
}
// Peer task already returned its permit on direct write
return false;
}
// NOTE: No disk write here — the peer task has already written to disk.
self.downloaded += u64::from(length);
self.total_download += u64::from(length) + 13; // payload + message header
self.last_download = now_unix();
self.need_save_resume = true;
// M93: Track piece ownership (actor learns about peer's CAS reservation via chunk arrival)
// Only an in-range, currently unowned slot (`Some(&None)`) is claimed.
if let Some(slab_idx) = self.peer_slab.slot_of(&peer_addr)
&& self.piece_owner.get(index as usize) == Some(&None)
{
self.piece_owner[index as usize] = Some(slab_idx);
// M103: Add to steal queue if piece has unrequested blocks
if let (Some(sc), Some(bm)) = (&self.steal_candidates, &self.block_maps)
&& let Some(lengths) = &self.lengths
{
let total_blocks = lengths.chunks_in_piece(index);
if bm.next_unrequested(index, total_blocks).is_some() {
sc.push(index);
}
}
}
// Smart banning: track which peers contribute to each piece
// (keyed by IP address, not by socket address).
self.piece_contributors
.entry(index)
.or_default()
.insert(peer_addr.ip());
let now = std::time::Instant::now();
if let Some(peer) = self.peers.get_mut(&peer_addr) {
peer.pending_requests.remove(index, begin);
peer.download_bytes_window += u64::from(length);
peer.download_bytes_total += u64::from(length);
// M106: Capture RTT from block receipt for scoring
// The first sample seeds the average (`unwrap_or(rtt_secs)`).
if let Some(rtt) = peer.pipeline.block_received(index, begin, length, now) {
let rtt_secs = rtt.as_secs_f64();
peer.avg_rtt = Some(crate::peer_state::ewma_update(
peer.avg_rtt.unwrap_or(rtt_secs),
rtt_secs,
crate::peer_state::RTT_EWMA_ALPHA,
));
}
peer.blocks_completed = peer.blocks_completed.saturating_add(1);
peer.last_data_received = Some(now);
// Clear snub if snubbed
if peer.snubbed {
peer.snubbed = false;
}
}
// M137: Backoff is now automatically reset by mark_live() in PeerStates.
// End-game: cancel this block on all other peers. The 200ms end-game
// refill tick will re-stock freed peers — no reactive cascade needed.
if self.end_game.is_active() {
let cancels = self.end_game.block_received(index, begin, peer_addr);
for (cancel_addr, ci, cb, cl) in cancels {
if let Some(cancel_peer) = self.peers.get_mut(&cancel_addr) {
// Best effort: a failed `try_send` is deliberately ignored.
let _ = cancel_peer.cmd_tx.try_send(PeerCommand::Cancel {
index: ci,
begin: cb,
length: cl,
});
cancel_peer.pending_requests.remove(ci, cb);
}
}
}
// Track chunk completion
let piece_complete = if let Some(ref mut ct) = self.chunk_tracker {
ct.chunk_received(index, begin)
} else {
false
};
// Start verification only once per piece: skip if it is already pending.
if piece_complete && !self.pending_verify.contains(&index) {
// M44/M118: Predictive piece announce — broadcast Have before verification
if self.config.predictive_piece_announce_ms > 0
&& !self.predictive_have_sent.contains(&index)
{
self.predictive_have_sent.insert(index);
let _ = self.have_broadcast_tx.send(index);
}
// M100: Flush deferred writes before verification — ensures all
// blocks are on disk so read_piece() sees complete data.
if let Some(ref disk) = self.disk {
disk.flush_piece_writes(index).await;
}
// Verification strategy depends on the torrent version.
match self.version {
irontide_core::TorrentVersion::V1Only => {
// Async: fire-and-forget, result via verify_result_rx
// Nothing is enqueued when the disk handle or the expected hash is missing.
if let Some(ref disk) = self.disk
&& let Some(expected) = self
.meta
.as_ref()
.and_then(|m| m.info.piece_hash(index as usize))
{
self.pending_verify.insert(index);
// The generation travels with the job so that a stale result can be
// recognised; a missing entry is treated as generation 0.
let generation = self
.piece_generations
.get(index as usize)
.copied()
.unwrap_or(0);
disk.enqueue_verify(index, expected, generation, &self.verify_result_tx);
}
}
irontide_core::TorrentVersion::V2Only => {
// Blocking: needs mutable hash_picker for Merkle tree
self.verify_and_mark_piece_v2(index).await;
}
irontide_core::TorrentVersion::Hybrid => {
// Blocking: needs both v1+v2 decision matrix
self.verify_and_mark_piece_hybrid(index).await;
}
}
}
// Peer task already returned its permit on direct write.
// End-game dispatch still happens here.
if self.end_game.is_active() {
self.request_end_game_block(peer_addr).await;
}
piece_complete
}
pub(crate) async fn verify_and_mark_piece(&mut self, index: u32) {
match self.version {
irontide_core::TorrentVersion::V1Only => {
self.verify_and_mark_piece_v1(index).await;
}
irontide_core::TorrentVersion::V2Only => {
self.verify_and_mark_piece_v2(index).await;
}
irontide_core::TorrentVersion::Hybrid => {
self.verify_and_mark_piece_hybrid(index).await;
}
}
}
/// SHA-1 piece verification (v1 torrents).
///
/// Asks the disk layer to verify the piece against the hash from the torrent
/// metadata. A missing disk handle, a missing expected hash, or a disk error
/// are all treated as a failed verification.
async fn verify_and_mark_piece_v1(&mut self, index: u32) {
    let expected = self
        .meta
        .as_ref()
        .and_then(|m| m.info.piece_hash(index as usize));

    let verified = match (&self.disk, expected) {
        (Some(disk), Some(hash)) => disk
            .verify_piece(index, hash, DiskJobFlags::empty())
            .await
            .unwrap_or(false),
        _ => false,
    };

    if !verified {
        self.on_piece_hash_failed(index).await;
        return;
    }
    self.on_piece_verified(index).await;
}
/// SHA-256 per-block Merkle verification (v2 torrents, BEP 52).
///
/// Runs the block verification and applies the outcome: success and failure
/// go to the common handlers, while `NotApplicable` does nothing here.
pub(crate) async fn verify_and_mark_piece_v2(&mut self, index: u32) {
    match self.run_v2_block_verification(index).await {
        HashResult::Passed => self.on_piece_verified(index).await,
        HashResult::Failed => self.on_piece_hash_failed(index).await,
        // Blocks stored, will resolve when piece-layer hashes arrive
        HashResult::NotApplicable => {}
    }
}
/// Hash every block of piece `index` with SHA-256, feed the hashes to the hash
/// picker, and report the outcome as a `HashResult` — without invoking
/// `on_piece_verified` / `on_piece_hash_failed`.
///
/// Kept separate from `verify_and_mark_piece_v2` so hybrid dual-verification can
/// reuse it without double-firing callbacks.
///
/// Outcomes:
/// - `Failed` — the hash picker rejected a block hash.
/// - `Passed` — every block was processed and the chunk tracker reports all
///   blocks of the piece as verified.
/// - `NotApplicable` — everything else: no disk handle or lengths, flush or
///   hash I/O error, or block hashes stored while awaiting piece-layer hashes.
pub(crate) async fn run_v2_block_verification(&mut self, index: u32) -> HashResult {
    let Some(disk) = self.disk.clone() else {
        return HashResult::NotApplicable;
    };
    let Some(lengths) = self.lengths.clone() else {
        return HashResult::NotApplicable;
    };

    // Flush write buffer before reading back for hashing
    if let Err(e) = disk.flush_piece(index).await {
        warn!(index, "failed to flush piece for v2 verification: {e}");
        return HashResult::NotApplicable;
    }

    // Compute SHA-256 of each 16 KiB block and feed to Merkle tree
    let blocks_per_piece = (lengths.piece_length() as u32) / DEFAULT_CHUNK_SIZE;
    for chunk_idx in 0..lengths.chunks_in_piece(index) {
        let Some((begin, length)) = lengths.chunk_info(index, chunk_idx) else {
            continue;
        };

        let hashed = disk
            .hash_block(index, begin, length, DiskJobFlags::empty())
            .await;
        let block_hash = match hashed {
            Ok(h) => h,
            Err(e) => {
                // A disk error stops hashing; the piece is reported as not applicable.
                warn!(index, chunk_idx, "failed to hash block: {e}");
                return HashResult::NotApplicable;
            }
        };

        let Some(picker) = self.hash_picker.as_mut() else {
            continue;
        };
        // Gap 6: single-file assumption (file index 0) — multi-file mapping deferred to M35
        // Gap 10: 3-arg set_block_hash (offset removed)
        let global_block = index * blocks_per_piece + chunk_idx;
        match picker.set_block_hash(0, global_block, block_hash) {
            irontide_core::SetBlockResult::Ok => {
                if let Some(ref mut ct) = self.chunk_tracker {
                    ct.mark_block_verified(index, chunk_idx);
                }
            }
            irontide_core::SetBlockResult::Unknown => {
                // Piece-layer hash not yet available — stored for deferred verification
                debug!(
                    index,
                    chunk_idx, "block hash stored, awaiting piece-layer hashes"
                );
            }
            irontide_core::SetBlockResult::HashFailed => {
                warn!(index, chunk_idx, "block hash failed Merkle verification");
                return HashResult::Failed;
            }
        }
    }

    let fully_verified = self
        .chunk_tracker
        .as_ref()
        .is_some_and(|ct| ct.all_blocks_verified(index));
    if fully_verified {
        HashResult::Passed
    } else {
        // Blocks stored awaiting piece-layer hashes
        HashResult::NotApplicable
    }
}
/// Dual SHA-1 + SHA-256 verification for hybrid torrents.
///
/// Always runs v1 (whole-piece SHA-1) first and then v2 (per-block SHA-256
/// Merkle), and combines the two results:
/// - v1 Passed, v2 Passed → piece verified
/// - v1 Failed, v2 Failed or `NotApplicable` → piece hash failed
/// - one Passed and the other Failed → inconsistent hashes (pauses the torrent)
/// - v1 Passed, v2 `NotApplicable` → deferred, only logged
/// - v1 `NotApplicable` (no disk, no expected hash, or disk error) → deferred,
///   only logged, whatever v2 reported
pub(crate) async fn verify_and_mark_piece_hybrid(&mut self, index: u32) {
    // ── v1 verification (SHA-1 whole-piece) ──
    let expected = self
        .meta
        .as_ref()
        .and_then(|m| m.info.piece_hash(index as usize));
    let v1_result = match (&self.disk, expected) {
        (Some(disk), Some(hash)) => {
            match disk
                .verify_piece(index, hash, DiskJobFlags::empty())
                .await
            {
                Ok(true) => HashResult::Passed,
                Ok(false) => HashResult::Failed,
                Err(_) => HashResult::NotApplicable,
            }
        }
        _ => HashResult::NotApplicable,
    };

    // ── v2 verification (SHA-256 per-block Merkle) ──
    let v2_result = self.run_v2_block_verification(index).await;

    // ── Decision matrix ──
    match (v1_result, v2_result) {
        // v1 not applicable (missing meta/disk): defer
        (HashResult::NotApplicable, _) => {
            debug!(index, "hybrid: v1 not applicable — deferring");
        }
        // v1 passed but v2 is still waiting for piece-layer hashes: defer.
        (HashResult::Passed, HashResult::NotApplicable) => {
            debug!(
                index,
                "hybrid: v1 passed, v2 deferred — waiting for piece-layer hashes"
            );
        }
        // Both agree: piece is good
        (HashResult::Passed, HashResult::Passed) => {
            self.on_piece_verified(index).await;
        }
        // The two hash families disagree: the .torrent metadata is inconsistent
        (HashResult::Passed, HashResult::Failed) | (HashResult::Failed, HashResult::Passed) => {
            self.on_inconsistent_hashes(index).await;
        }
        // v1 rejected the piece and v2 did not contradict it — fail immediately
        // (no point waiting for v2 when v1 already rejected).
        (HashResult::Failed, HashResult::Failed | HashResult::NotApplicable) => {
            self.on_piece_hash_failed(index).await;
        }
    }
}
/// Common success path after a piece passes verification (v1 SHA-1 or v2 Merkle).
/// Handle a result from the `HashPool` (M96).
pub(crate) async fn handle_hash_result(&mut self, result: crate::hash_pool::HashResult) {
self.pending_verify.remove(&result.piece);
// Staleness check
let current_gen = self
.piece_generations
.get(result.piece as usize)
.copied()
.unwrap_or(0);
if result.generation != current_gen {
tracing::debug!(
piece = result.piece,
result_gen = result.generation,
current_gen,
"discarding stale hash result"
);
return;
}
// Guard: ignore results for already-verified pieces
let dominated = self
.chunk_tracker
.as_ref()
.is_some_and(|ct| ct.bitfield().get(result.piece));
if dominated {
return;
}
if result.passed {
self.on_piece_verified(result.piece).await;
} else {
self.on_piece_hash_failed(result.piece).await;
}
}
/// Common success path after a piece passes verification (v1 SHA-1 or v2 Merkle).
///
/// In order, this:
/// 1. marks the piece verified and clears its per-piece bookkeeping (contributors,
///    end-game blocks, atomic state, steal candidates, block map, owner,
///    direct-acquire tracker, in-flight timer);
/// 2. wakes one reservation waiter and posts `PieceFinished`;
/// 3. notifies stream consumers, publishes the bitfield and checks file completion;
/// 4. resolves a pending parole for the piece;
/// 5. broadcasts Have (unless super-seeding or already announced predictively)
///    and optionally suggests the piece to peers that lack it;
/// 6. maintains the share-mode LRU;
/// 7. outside share mode, transitions to seeding once every piece is set.
pub(crate) async fn on_piece_verified(&mut self, index: u32) {
    if let Some(ref mut ct) = self.chunk_tracker {
        ct.mark_verified(index);
    }
    self.piece_contributors.remove(&index);
    // Remove stale end-game blocks for this piece. Without this,
    // pick_block() returns these blocks, peers request them, has_chunk
    // rejects them, and register_request accumulates until every peer
    // is registered for every stale block — permanent stall.
    if self.end_game.is_active() {
        self.end_game.remove_piece(index);
    }
    // M93: Mark piece complete atomically
    if let Some(ref states) = self.atomic_states {
        states.mark_complete(index);
    }
    // M103: Clean up block stealing state
    if let Some(ref sc) = self.steal_candidates {
        sc.remove(index);
    }
    if let (Some(bm), Some(lengths)) = (&self.block_maps, &self.lengths) {
        bm.clear(index, lengths.chunks_in_piece(index));
    }
    // Bounds-checked like the other per-piece vectors in this file: an
    // out-of-range index must not panic the actor.
    if let Some(owner) = self.piece_owner.get_mut(index as usize) {
        *owner = None;
    }
    // M187: Update direct-acquire tracker.
    if let Some(ref mut pt) = self.piece_tracker {
        pt.mark_piece_hash_ok(index);
    }
    // M149: Record piece completion time for steal threshold calculation
    if let Some(started) = self
        .inflight_started
        .get_mut(index as usize)
        .and_then(std::option::Option::take)
    {
        let elapsed = started.elapsed();
        self.completed_piece_times.push_back(elapsed);
        // Keep only the 100 most recent samples.
        if self.completed_piece_times.len() > 100 {
            self.completed_piece_times.pop_front();
        }
    }
    if let Some(ref notify) = self.reservation_notify {
        notify.notify_one();
        self.counters
            .inc(crate::stats::DISPATCH_NOTIFY_WAKEUP_TOTAL, 1);
    }
    info!(index, "piece verified");
    post_alert(
        &self.alert_tx,
        &self.alert_mask,
        AlertKind::PieceFinished {
            info_hash: self.info_hash,
            piece: index,
        },
    );
    // Notify FileStream consumers of piece completion
    let _ = self.piece_ready_tx.send(index);
    if let Some(ref ct) = self.chunk_tracker {
        let _ = self.have_watch_tx.send(ct.bitfield().clone());
    }
    // Check if the completed piece finishes any file (FileCompleted alert)
    self.check_file_completion(index);
    // Handle parole success: the parole peer delivered a good piece,
    // so the original contributors are the likely offenders.
    if let Some(parole) = self.parole_pieces.remove(&index) {
        self.apply_parole_success(index, parole).await;
    }
    // M118: Broadcast Have to all peers via broadcast channel (skip in super-seed mode).
    // Per-peer filtering (skip if peer already has piece) is done receiver-side
    // in PeerConnection via should_transmit_have().
    if self.super_seed.is_none() {
        let already_announced = self.predictive_have_sent.remove(&index);
        if !already_announced {
            let _ = self.have_broadcast_tx.send(index);
        }
    }
    // M44: suggest newly-verified piece to peers that don't have it
    if self.config.suggest_mode {
        let max_suggest = self.config.max_suggest_pieces;
        let peer_addrs: Vec<SocketAddr> = self.peers.keys().copied().collect();
        for peer_addr in peer_addrs {
            let already = self.suggested_to_peers.entry(peer_addr).or_default();
            if already.len() >= max_suggest {
                continue;
            }
            let should_suggest = self
                .peers
                .get(&peer_addr)
                .is_some_and(|p| !p.bitfield.get(index));
            if should_suggest
                && !already.contains(&index)
                && let Some(peer) = self.peers.get(&peer_addr)
            {
                let _ = peer.cmd_tx.try_send(PeerCommand::SuggestPiece(index));
                already.insert(index);
            }
        }
    }
    // Share mode LRU: track piece, evict oldest if over capacity.
    // In share mode, we never "finish" — we keep cycling pieces.
    if self.share_max_pieces > 0 {
        self.share_lru.push_back(index);
        while self.share_lru.len() > self.share_max_pieces {
            if let Some(evicted) = self.share_lru.pop_front() {
                if let Some(ref mut ct) = self.chunk_tracker {
                    ct.clear_piece(evicted);
                }
                // Re-add to wanted so it can be re-downloaded later
                if evicted < self.wanted_pieces.len() {
                    self.wanted_pieces.set(evicted);
                }
                debug!(evicted, "share mode: evicted piece from LRU");
            }
        }
    }
    // Check if download is complete (skip in share mode — never finishes)
    if self.share_max_pieces == 0
        && let Some(ref ct) = self.chunk_tracker
        && ct.bitfield().count_ones() == self.num_pieces
    {
        info!("download complete, transitioning to seeding");
        post_alert(
            &self.alert_tx,
            &self.alert_mask,
            AlertKind::TorrentFinished {
                info_hash: self.info_hash,
            },
        );
        self.end_game.deactivate();
        self.transition_state(TorrentState::Seeding);
        self.choker.set_seed_mode(true);
        // BEP 21: broadcast upload-only status
        if self.config.upload_only_announce {
            let hs = irontide_wire::ExtHandshake::new_upload_only();
            for peer in self.peers.values() {
                let _ = peer
                    .cmd_tx
                    .try_send(PeerCommand::SendExtHandshake(hs.clone()));
            }
        }
        // Announce completion to trackers
        let result = self
            .tracker_manager
            .announce_completed(self.uploaded, self.downloaded)
            .await;
        self.fire_tracker_alerts(&result.outcomes);
    }
}
/// Common failure path after a piece fails hash verification.
///
/// Accounts the piece as failed/wasted bytes, resolves or enters parole for its
/// contributors, posts `HashFailed`, and resets all per-piece state (chunk tracker,
/// generation, atomic state, steal candidates, block map, owner, direct-acquire
/// tracker, in-flight timer) so the piece can be downloaded again. If end-game
/// mode is active it is deactivated and pieces in the `Endgame` state are moved
/// back to reserved (when they have an owner) or released.
#[allow(
    clippy::unused_async,
    reason = "handler method called from async select! loop"
)]
pub(crate) async fn on_piece_hash_failed(&mut self, index: u32) {
    let contributors: Vec<std::net::IpAddr> = self
        .piece_contributors
        .remove(&index)
        .unwrap_or_default()
        .into_iter()
        .collect();
    warn!(
        index,
        contributors = contributors.len(),
        "piece hash verification failed"
    );
    let failed_piece_size = self
        .lengths
        .as_ref()
        .map_or(0, |l| u64::from(l.piece_size(index)));
    self.total_failed_bytes += failed_piece_size;
    self.counters.inc(crate::stats::PROTO_HASHFAILS, 1);
    self.counters.inc(
        crate::stats::PROTO_WASTE_BYTES,
        failed_piece_size.cast_signed(),
    );
    // Forget any predictive Have for this piece so a later success announces it again.
    self.predictive_have_sent.remove(&index);
    // Check if this is a parole failure
    if let Some(parole) = self.parole_pieces.remove(&index) {
        self.apply_parole_failure(index, &parole);
    } else {
        // First failure: enter parole if enabled
        self.enter_parole(index, contributors.clone());
    }
    post_alert(
        &self.alert_tx,
        &self.alert_mask,
        AlertKind::HashFailed {
            info_hash: self.info_hash,
            piece: index,
            contributors,
        },
    );
    if let Some(ref mut ct) = self.chunk_tracker {
        ct.mark_failed(index);
    }
    // M96: Increment generation to invalidate any in-flight hash for this piece
    if let Some(g) = self.piece_generations.get_mut(index as usize) {
        *g += 1;
    }
    // M93: Release piece back to Available
    if let Some(ref states) = self.atomic_states {
        states.release(index);
    }
    // M103: Clean up block stealing state on hash failure
    if let Some(ref sc) = self.steal_candidates {
        sc.remove(index);
    }
    if let (Some(bm), Some(lengths)) = (&self.block_maps, &self.lengths) {
        bm.clear(index, lengths.chunks_in_piece(index));
    }
    // Bounds-checked like the other per-piece vectors in this file: an
    // out-of-range index must not panic the actor.
    if let Some(owner) = self.piece_owner.get_mut(index as usize) {
        *owner = None;
    }
    // M187: Return piece to direct-acquire queue.
    if let Some(ref mut pt) = self.piece_tracker {
        pt.mark_piece_hash_failed(index);
    }
    // M149: Clear inflight tracking without recording (hash failed)
    if let Some(slot) = self.inflight_started.get_mut(index as usize) {
        *slot = None;
    }
    if let Some(ref notify) = self.reservation_notify {
        notify.notify_one();
        self.counters
            .inc(crate::stats::DISPATCH_NOTIFY_WAKEUP_TOTAL, 1);
    }
    // Hash failure in end-game: deactivate and resume normal mode
    if self.end_game.is_active() {
        self.end_game.deactivate();
        info!(index, "end-game deactivated due to hash failure");
        // M93: Transition Endgame pieces back
        if let Some(ref atomic_states) = self.atomic_states {
            for piece in 0..self.num_pieces {
                if atomic_states.get(piece) == PieceState::Endgame {
                    // A missing owner slot is treated the same as "no owner".
                    let has_owner = self
                        .piece_owner
                        .get(piece as usize)
                        .is_some_and(Option::is_some);
                    if has_owner {
                        atomic_states.force_reserved(piece);
                    } else {
                        atomic_states.release(piece);
                    }
                }
            }
        }
        if let Some(ref notify) = self.reservation_notify {
            notify.notify_waiters();
            self.counters
                .inc(crate::stats::DISPATCH_NOTIFY_WAKEUP_TOTAL, 1);
        }
    }
}
/// React to a piece whose v1 and v2 hashes disagree — the torrent data itself
/// is corrupt.
///
/// Matching libtorrent: destroy hash picker, pause the torrent, post error alert.
/// Two alerts are posted: the specific `InconsistentHashes` and a generic
/// `TorrentError`.
#[allow(
    clippy::unused_async,
    reason = "handler method called from async select! loop"
)]
async fn on_inconsistent_hashes(&mut self, piece: u32) {
    error!(
        piece,
        info_hash = %self.info_hash,
        "v1 and v2 hashes are inconsistent — torrent data is corrupt"
    );

    // Merkle tree state is untrustworthy from here on, so drop the hash picker.
    self.hash_picker = None;

    // Specific alert first ...
    post_alert(
        &self.alert_tx,
        &self.alert_mask,
        AlertKind::InconsistentHashes {
            info_hash: self.info_hash,
            piece,
        },
    );

    // ... then the generic error alert for broader consumers.
    let message = format!("v1 and v2 hashes do not describe the same data (piece {piece})");
    post_alert(
        &self.alert_tx,
        &self.alert_mask,
        AlertKind::TorrentError {
            info_hash: self.info_hash,
            message,
        },
    );

    // Pause the torrent — transition state to Paused
    self.transition_state(TorrentState::Paused);
}
// ── BEP 52 hash event handlers (M34) ────────────────────────────────
/// Process received piece-layer or block-layer hashes from a peer.
///
/// The hashes are handed to the hash picker. If there is no picker, the picker
/// returns an error, or the result is flagged invalid, the message is logged
/// and dropped. Otherwise every piece the picker reports as passed goes through
/// `on_piece_verified`, and every piece reported as failed goes through
/// `on_piece_hash_failed`.
///
/// Failed pieces must take the common failure path rather than only raising an
/// alert: that path marks the piece failed in the chunk tracker, releases it for
/// re-download, and handles parole for its contributors. It also posts the
/// `HashFailed` alert, so no separate alert is posted here.
pub(crate) async fn handle_hashes_received(
    &mut self,
    peer_addr: SocketAddr,
    request: irontide_core::HashRequest,
    hashes: Vec<irontide_core::Id32>,
) {
    let Some(ref mut picker) = self.hash_picker else {
        debug!(peer = %peer_addr, "received hashes but no hash picker (v1 torrent)");
        return;
    };
    let result = match picker.add_hashes(&request, &hashes) {
        Ok(result) => result,
        Err(e) => {
            warn!(peer = %peer_addr, "invalid hashes: {e}");
            return;
        }
    };
    if !result.valid {
        warn!(peer = %peer_addr, "received hashes failed Merkle proof validation");
        return;
    }
    for piece in result.hash_passed {
        self.on_piece_verified(piece).await;
    }
    for piece in result.hash_failed {
        warn!(piece, "piece failed after hash layer received");
        self.on_piece_hash_failed(piece).await;
    }
}
/// Answer a peer's hash request: send the hashes when `serve_hashes` can
/// produce them, otherwise send a reject. Requests from peers that are not
/// in the peer table are ignored, and a failed `try_send` is ignored too.
#[allow(
    clippy::unused_async,
    reason = "handler method called from async select! loop"
)]
pub(crate) async fn handle_incoming_hash_request(
    &self,
    peer_addr: SocketAddr,
    request: irontide_core::HashRequest,
) {
    let Some(peer) = self.peers.get(&peer_addr) else {
        return;
    };

    let served = serve_hashes(
        self.meta_v2.as_ref(),
        self.version,
        self.lengths.as_ref(),
        &request,
    );
    let reply = match served {
        Some(hashes) => PeerCommand::SendHashes { request, hashes },
        None => PeerCommand::SendHashReject(request),
    };
    let _ = peer.cmd_tx.try_send(reply);
}
// ── Smart banning helpers (M25) ────────────────────────────────────
/// First hash failure for a piece: either put it on parole or strike its
/// contributors straight away.
///
/// When the ban manager has parole enabled and there is at least one
/// contributor, the contributors are saved in `parole_pieces` with no parole
/// peer assigned yet. Otherwise every contributor receives a strike
/// immediately, which is a no-op for an empty list.
fn enter_parole(&mut self, index: u32, contributors: Vec<std::net::IpAddr>) {
    let parole_enabled = crate::timed_lock::TimedGuard::new(
        self.ban_manager.read(),
        &self.lock_timing,
        "ban_manager",
    )
    .use_parole();

    if parole_enabled && !contributors.is_empty() {
        info!(
            index,
            contributors = contributors.len(),
            "entering parole mode"
        );
        let state = crate::ban::ParoleState {
            original_contributors: contributors.into_iter().collect(),
            parole_peer: None,
        };
        self.parole_pieces.insert(index, state);
        return;
    }

    // Parole disabled or no contributors to blame — strike everyone
    let mut mgr = self.ban_manager.write();
    for ip in contributors {
        if mgr.record_strike(ip) {
            info!(%ip, "peer banned (no parole, hash failure threshold)");
        }
    }
}
/// Parole piece verified successfully — the original contributors sent bad data.
///
/// Every original contributor receives a strike. Those whose strike results in
/// a ban are then disconnected and reported with a `PeerBanned` alert, using
/// port 0 because only the IP is known.
async fn apply_parole_success(&mut self, index: u32, parole: crate::ban::ParoleState) {
    info!(index, "parole success — striking original contributors");

    // Collect the bans inside a scope so the write guard is gone before awaiting.
    let newly_banned: Vec<std::net::IpAddr> = {
        let mut mgr = self.ban_manager.write();
        let banned = parole
            .original_contributors
            .iter()
            .filter(|ip| mgr.record_strike(**ip))
            .inspect(|ip| info!(%ip, "peer banned (parole confirmed bad data)"))
            .copied()
            .collect();
        banned
    };

    // Disconnect and fire alerts for newly banned peers
    for ip in newly_banned {
        self.disconnect_banned_ip(ip).await;
        let alert = AlertKind::PeerBanned {
            info_hash: self.info_hash,
            addr: std::net::SocketAddr::new(ip, 0),
        };
        post_alert(&self.alert_tx, &self.alert_mask, alert);
    }
}
/// Parole piece failed again — the parole peer itself sent bad data.
///
/// Strikes the parole peer if one was recorded; does nothing otherwise.
/// The piece is not put back on parole (ambiguous situation).
fn apply_parole_failure(&mut self, index: u32, parole: &crate::ban::ParoleState) {
    let Some(parole_ip) = parole.parole_peer else {
        return;
    };
    info!(index, %parole_ip, "parole failure — striking parole peer");
    let mut mgr = self.ban_manager.write();
    let banned = mgr.record_strike(parole_ip);
    if banned {
        info!(%parole_ip, "parole peer banned");
    }
}
}