irontide-session 1.0.1

BitTorrent session management: peers, torrents, and piece selection
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
#![allow(
    clippy::cast_possible_truncation,
    clippy::cast_possible_wrap,
    clippy::cast_sign_loss,
    reason = "M175: BEP 9 metadata resolver — piece counts bounded by metadata size"
)]

//! M147: Session-level metadata resolution for magnet links.
//!
//! Resolves torrent metadata from a magnet link's `info_hash` BEFORE creating
//! a `TorrentActor`. Matches rqbit's architecture where metadata resolution
//! happens in `session.rs:resolve_magnet()` using `FuturesUnordered` +
//! `Semaphore(128)` before creating `TorrentStateLive`.
//!
//! The resolver connects to peers discovered via DHT/trackers, performs the
//! BT handshake + BEP 10 extension handshake + BEP 9 `ut_metadata` exchange,
//! then returns the assembled metadata along with successfully-connected
//! peer addresses for pre-seeding the `TorrentActor`.

use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;

use bytes::Bytes;
use futures::StreamExt;
use futures::stream::FuturesUnordered;
use parking_lot::Mutex;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::sync::{Semaphore, mpsc};
use tracing::{debug, trace};

use irontide_core::{Id20, TorrentMetaV1, torrent_from_bytes};
use irontide_wire::{ExtHandshake, Handshake, Message, MetadataMessage, MetadataMessageType};

use crate::metadata::MetadataDownloader;
use crate::transport::NetworkFactory;

/// Maximum number of concurrent peer connections during metadata resolution.
/// Intended to be passed as the `max_concurrent` argument of
/// `resolve_metadata`, where it sizes the connection semaphore.
/// Matches rqbit's Semaphore(128) in `read_metainfo_from_peer_receiver()`.
pub(crate) const DEFAULT_MAX_CONCURRENT: usize = 128;

/// Overall timeout for the entire metadata resolution process.
/// `resolve_metadata` wraps its whole receive/dispatch loop in this deadline.
const RESOLVE_TIMEOUT: Duration = Duration::from_mins(1);

/// Per-peer connect + handshake + metadata exchange timeout.
/// Applied around each `resolve_from_peer` call inside the spawned peer task.
const PER_PEER_TIMEOUT: Duration = Duration::from_secs(15);

/// Maximum BT wire message size we accept during metadata resolution (256 KiB).
/// Metadata pieces are 16 KiB each; extension handshakes are small. 256 KiB
/// provides ample headroom without exposing us to memory exhaustion.
/// Enforced by `read_message` before the payload buffer is allocated.
const MAX_MESSAGE_SIZE: usize = 256 * 1024;

/// BT handshake size: 1 (pstrlen) + 19 (pstr) + 8 (reserved) + 20 (`info_hash`) + 20 (`peer_id`).
/// Used to size the fixed buffer the remote handshake is read into.
const HANDSHAKE_SIZE: usize = 68;

/// Result of a successful per-peer metadata exchange.
///
/// Produced by `resolve_from_peer` when the exchange ends without an error;
/// failures and timeouts are reported separately and never yield a value of
/// this type.
struct PeerResult {
    /// Whether the shared downloader completed after this peer's contribution.
    ///
    /// `true` only when a piece delivered by this peer made the downloader
    /// report completion; `false` when the peer had nothing left to be asked
    /// for because no pieces were outstanding.
    complete: bool,
}

/// Resolve torrent metadata from peers discovered via DHT/trackers.
///
/// Peer addresses arrive in batches on `peer_rx`. Every address that has not
/// been tried yet gets its own spawned task, bounded by `max_concurrent`
/// semaphore permits and by `PER_PEER_TIMEOUT`. The task performs the BT +
/// extension handshakes and requests metadata pieces via BEP 9
/// (`ut_metadata`), feeding them into one shared `MetadataDownloader`.
///
/// As soon as a task reports that the downloader is complete, the metadata is
/// verified and parsed into a `TorrentMetaV1`, which is returned together
/// with the addresses of all peers whose exchange finished without error
/// (for pre-seeding the `TorrentActor`).
///
/// Addresses that arrive while every permit is taken are skipped but *not*
/// remembered, so they are tried if they show up again in a later batch.
/// Peer tasks that are still running when this function finishes (success,
/// failure, or timeout) are aborted so they do not keep sockets open.
///
/// # Errors
///
/// Returns an error if
/// - resolution does not complete within `RESOLVE_TIMEOUT`,
/// - the peer channel is closed and every started peer task has finished
///   without completing the metadata, or
/// - the assembled metadata fails verification or cannot be parsed.
pub(crate) async fn resolve_metadata(
    info_hash: Id20,
    peer_id: Id20,
    mut peer_rx: mpsc::UnboundedReceiver<Vec<SocketAddr>>,
    factory: Arc<NetworkFactory>,
    connect_timeout: Duration,
    max_concurrent: usize,
) -> crate::Result<(TorrentMetaV1, Vec<SocketAddr>)> {
    let semaphore = Arc::new(Semaphore::new(max_concurrent));
    let downloader = Arc::new(Mutex::new(MetadataDownloader::new(info_hash)));
    let connected_peers = Arc::new(Mutex::new(Vec::<SocketAddr>::new()));
    let mut peer_tasks = FuturesUnordered::new();

    // Addresses a task has actually been started for. Peers skipped for lack
    // of a permit are deliberately left out so a re-announce can retry them.
    let mut seen = std::collections::HashSet::<SocketAddr>::new();

    // Shared "metadata is complete" path: verify the assembled bytes, then
    // parse them and snapshot the connected-peer list.
    let finish = || -> crate::Result<(TorrentMetaV1, Vec<SocketAddr>)> {
        let assembled = downloader.lock().assemble_and_verify()?;
        assemble_torrent(info_hash, assembled, &connected_peers)
    };

    let result = tokio::time::timeout(RESOLVE_TIMEOUT, async {
        loop {
            tokio::select! {
                // Receive new peer batches from DHT/trackers.
                Some(batch) = peer_rx.recv() => {
                    for addr in batch {
                        if seen.contains(&addr) {
                            continue;
                        }
                        let Ok(permit) = semaphore.clone().try_acquire_owned() else {
                            // At capacity — skip this peer. We'll try others.
                            trace!(%addr, "metadata resolver: semaphore full, skipping peer");
                            continue;
                        };
                        // Only now is the peer really being tried.
                        seen.insert(addr);

                        let factory = Arc::clone(&factory);
                        let dl = Arc::clone(&downloader);
                        let peers = Arc::clone(&connected_peers);

                        peer_tasks.push(tokio::spawn(async move {
                            let result = tokio::time::timeout(
                                PER_PEER_TIMEOUT,
                                resolve_from_peer(
                                    addr,
                                    info_hash,
                                    peer_id,
                                    &factory,
                                    connect_timeout,
                                    &dl,
                                ),
                            )
                            .await;

                            // Free the slot before doing any bookkeeping.
                            drop(permit);

                            match result {
                                Ok(Ok(peer_result)) => {
                                    peers.lock().push(addr);
                                    Some(peer_result)
                                }
                                Ok(Err(e)) => {
                                    trace!(%addr, "metadata peer failed: {e}");
                                    None
                                }
                                Err(_) => {
                                    trace!(%addr, "metadata peer timed out");
                                    None
                                }
                            }
                        }));
                    }
                }
                // Process completed peer connections.
                Some(result) = peer_tasks.next() => {
                    match result {
                        Ok(Some(peer_result)) if peer_result.complete => {
                            // Metadata complete — assemble and return.
                            return finish();
                        }
                        _ => {
                            // Peer failed or didn't complete metadata — continue.
                        }
                    }
                }
                // All futures drained and no more peers coming.
                else => {
                    break;
                }
            }
        }

        // Defensive drain: pick up any task result that is still pending after
        // the peer channel has closed.
        while let Some(result) = peer_tasks.next().await {
            if let Ok(Some(peer_result)) = result
                && peer_result.complete
            {
                return finish();
            }
        }

        Err(crate::Error::Connection(
            "metadata resolution exhausted all peers".into(),
        ))
    })
    .await;

    // Whatever the outcome, stop peer tasks that are still in flight; dropping
    // a `JoinHandle` would only detach them and leave their connections open
    // until the per-peer timeout fires.
    for handle in peer_tasks.iter() {
        handle.abort();
    }

    result.unwrap_or_else(|_| {
        Err(crate::Error::Connection(
            "metadata resolution timed out".into(),
        ))
    })
}

/// Assemble the resolved metadata bytes into a `TorrentMetaV1`.
fn assemble_torrent(
    info_hash: Id20,
    info_bytes: Vec<u8>,
    connected_peers: &Arc<Mutex<Vec<SocketAddr>>>,
) -> crate::Result<(TorrentMetaV1, Vec<SocketAddr>)> {
    // Wrap raw info dict in a minimal torrent dict for parsing.
    let mut torrent_bytes = b"d4:info".to_vec();
    torrent_bytes.extend_from_slice(&info_bytes);
    torrent_bytes.push(b'e');

    let mut meta = torrent_from_bytes(&torrent_bytes)
        .map_err(|e| crate::Error::Connection(format!("failed to parse resolved metadata: {e}")))?;
    meta.info_bytes = Some(Bytes::from(info_bytes));

    let peers = connected_peers.lock().clone();
    debug!(
        %info_hash,
        num_peers = peers.len(),
        "metadata resolved via pre-phase"
    );
    Ok((meta, peers))
}

/// Connect to a single peer and exchange metadata pieces.
///
/// Performs: TCP connect -> BT handshake -> BEP 10 extension handshake ->
/// BEP 9 `ut_metadata` piece requests -> feed pieces to shared downloader.
async fn resolve_from_peer(
    addr: SocketAddr,
    info_hash: Id20,
    peer_id: Id20,
    factory: &NetworkFactory,
    connect_timeout: Duration,
    downloader: &Mutex<MetadataDownloader>,
) -> crate::Result<PeerResult> {
    // --- TCP Connect ---
    let mut stream = tokio::time::timeout(connect_timeout, factory.connect_tcp(addr))
        .await
        .map_err(|_| crate::Error::Connection("connect timed out".into()))??;

    // --- BT Handshake ---
    let our_hs = Handshake::new(info_hash, peer_id);
    let hs_bytes = our_hs.to_bytes();
    stream.write_all(&hs_bytes).await?;
    stream.flush().await?;

    let mut hs_buf = [0u8; HANDSHAKE_SIZE];
    stream.read_exact(&mut hs_buf).await?;
    let their_hs = Handshake::from_bytes(&hs_buf)?;

    if their_hs.info_hash != info_hash {
        return Err(crate::Error::Connection("info_hash mismatch".into()));
    }

    if !their_hs.supports_extensions() {
        return Err(crate::Error::Connection(
            "peer does not support BEP 10 extensions".into(),
        ));
    }

    // --- BEP 10 Extension Handshake ---
    // Send our extension handshake advertising ut_metadata support.
    let our_ext = ExtHandshake::new();
    let ext_payload = our_ext.to_bytes().map_err(crate::Error::Wire)?;
    let ext_msg = Message::Extended {
        ext_id: 0,
        payload: ext_payload,
    };
    write_message(&mut stream, &ext_msg).await?;

    // Read messages until we get the peer's extension handshake.
    let their_ext = loop {
        let msg = read_message(&mut stream).await?;
        match msg {
            Message::Extended { ext_id: 0, payload } => {
                break ExtHandshake::from_bytes(&payload).map_err(crate::Error::Wire)?;
            }
            // Skip bitfield, have, keepalive, etc. that arrive before ext handshake.
            Message::KeepAlive
            | Message::Bitfield(_)
            | Message::Have { .. }
            | Message::Unchoke
            | Message::Choke
            | Message::HaveAll
            | Message::HaveNone => {}
            other => {
                trace!(%addr, ?other, "unexpected message before ext handshake");
            }
        }
    };

    // Check if peer supports ut_metadata.
    let their_ut_metadata_id = their_ext
        .ext_id("ut_metadata")
        .ok_or_else(|| crate::Error::Connection("peer does not support ut_metadata".into()))?;

    let metadata_size = their_ext
        .metadata_size
        .ok_or_else(|| crate::Error::Connection("peer did not advertise metadata_size".into()))?;

    if metadata_size == 0 || metadata_size > 16 * 1024 * 1024 {
        return Err(crate::Error::Connection(format!(
            "invalid metadata_size: {metadata_size}"
        )));
    }

    // --- BEP 9: Request metadata pieces ---
    let our_ut_metadata_id = our_ext
        .ext_id("ut_metadata")
        .expect("we always advertise ut_metadata");

    // Set metadata size and get missing pieces under the lock.
    let pieces_to_request = {
        let mut dl = downloader.lock();
        dl.set_total_size(metadata_size);
        dl.request_all_from_peer(addr)
    };

    if pieces_to_request.is_empty() {
        // Another peer already completed the download.
        return Ok(PeerResult { complete: false });
    }

    // Send requests for all missing pieces.
    for &piece_idx in &pieces_to_request {
        let req = MetadataMessage::request(piece_idx);
        let req_bytes = req.to_bytes().map_err(crate::Error::Wire)?;
        let msg = Message::Extended {
            ext_id: their_ut_metadata_id,
            payload: req_bytes,
        };
        write_message(&mut stream, &msg).await?;
    }

    // Receive metadata piece responses.
    loop {
        let msg = read_message(&mut stream).await?;
        match msg {
            Message::Extended { ext_id, payload } if ext_id == our_ut_metadata_id => {
                let meta_msg = MetadataMessage::from_bytes(&payload).map_err(crate::Error::Wire)?;
                match meta_msg.msg_type {
                    MetadataMessageType::Data => {
                        let piece_data = meta_msg.data.ok_or_else(|| {
                            crate::Error::Connection("metadata data message missing payload".into())
                        })?;
                        let complete = {
                            let mut dl = downloader.lock();
                            dl.piece_received(meta_msg.piece, piece_data)
                        };
                        if complete {
                            return Ok(PeerResult { complete: true });
                        }
                    }
                    MetadataMessageType::Reject => {
                        downloader.lock().mark_rejected(addr);
                        return Err(crate::Error::Connection(
                            "peer rejected metadata request".into(),
                        ));
                    }
                    MetadataMessageType::Request => {
                        // Peer is requesting metadata from us — we don't have it.
                        // Ignore.
                    }
                }
            }
            // Skip all non-metadata messages (wire, extended, etc.).
            _ => {}
        }
    }
}

/// Write a single framed BT wire message to a stream.
///
/// Format: 4-byte big-endian length prefix + payload.
async fn write_message(
    stream: &mut (impl AsyncWriteExt + Unpin),
    msg: &Message,
) -> crate::Result<()> {
    let wire_bytes = msg.to_bytes();
    stream.write_all(&wire_bytes).await?;
    stream.flush().await?;
    Ok(())
}

/// Read a single framed BT wire message from a stream.
///
/// Format: 4-byte big-endian length prefix + payload.
async fn read_message(stream: &mut (impl AsyncReadExt + Unpin)) -> crate::Result<Message<Bytes>> {
    // Read the 4-byte length prefix.
    let mut len_buf = [0u8; 4];
    stream.read_exact(&mut len_buf).await?;
    let length = u32::from_be_bytes(len_buf) as usize;

    if length == 0 {
        return Ok(Message::KeepAlive);
    }

    if length > MAX_MESSAGE_SIZE {
        return Err(crate::Error::Connection(format!(
            "message too large during metadata resolution: {length} bytes"
        )));
    }

    // Read the payload.
    let mut payload = vec![0u8; length];
    stream.read_exact(&mut payload).await?;
    let payload = Bytes::from(payload);

    Message::from_payload(payload).map_err(crate::Error::Wire)
}

// Integration-style tests for the resolver: each test binds a loopback TCP
// listener through `NetworkFactory::tokio()`, runs a hand-written mock peer
// on it, and drives `resolve_metadata` against that address.
#[cfg(test)]
mod tests {
    use super::*;
    use irontide_core::PeerId;
    use tokio::io::{AsyncReadExt, AsyncWriteExt};

    /// Build test metadata: raw info dict bytes + their SHA1 hash.
    fn test_metadata() -> (Vec<u8>, Id20) {
        // Minimal bencode info dict with sorted keys (bencode requires lexicographic order).
        let pieces = [0u8; 20];
        // Build manually for correct bencode.
        let mut info_bytes = Vec::new();
        info_bytes.extend_from_slice(b"d");
        info_bytes.extend_from_slice(b"6:lengthi1024e");
        info_bytes.extend_from_slice(b"4:name4:test");
        info_bytes.extend_from_slice(b"12:piece lengthi16384e");
        info_bytes.extend_from_slice(b"6:pieces20:");
        info_bytes.extend_from_slice(&pieces);
        info_bytes.extend_from_slice(b"e");

        // The hash of the raw info dict doubles as the torrent's info_hash.
        let hash = irontide_core::sha1(&info_bytes);
        (info_bytes, hash)
    }

    /// Simulated remote peer that completes the BT + extension handshake
    /// and responds to `ut_metadata` requests with the provided metadata.
    ///
    /// Panics (via `unwrap`) on any I/O or decoding error, and stops serving
    /// once no message has arrived for five seconds.
    async fn run_mock_peer(
        mut stream: impl AsyncReadExt + AsyncWriteExt + Unpin,
        info_hash: Id20,
        metadata: Vec<u8>,
    ) {
        let remote_id = PeerId::generate().0;

        // Read BT handshake.
        let mut hs_buf = [0u8; HANDSHAKE_SIZE];
        stream.read_exact(&mut hs_buf).await.unwrap();
        let _their_hs = Handshake::from_bytes(&hs_buf).unwrap();

        // Send our BT handshake.
        let our_hs = Handshake::new(info_hash, remote_id);
        stream.write_all(&our_hs.to_bytes()).await.unwrap();
        stream.flush().await.unwrap();

        // Read their ext handshake.
        // The id found here is the one the resolver expects on our replies;
        // fall back to 1 if it is missing or another message came first.
        let msg = read_message_raw(&mut stream).await;
        let their_ut_metadata_id = match msg {
            Message::Extended { ext_id: 0, payload } => {
                let hs = ExtHandshake::from_bytes(&payload).unwrap();
                hs.ext_id("ut_metadata").unwrap_or(1)
            }
            _ => 1,
        };

        // Send our ext handshake with metadata_size.
        let mut ext_hs = ExtHandshake::new();
        ext_hs.metadata_size = Some(metadata.len() as u64);
        let payload = ext_hs.to_bytes().unwrap();
        let msg = Message::Extended { ext_id: 0, payload };
        write_message_raw(&mut stream, &msg).await;

        // Handle ut_metadata requests.
        loop {
            // Give up after 5 s of silence so the task cannot hang forever.
            let Ok(msg) =
                tokio::time::timeout(Duration::from_secs(5), read_message_raw(&mut stream)).await
            else {
                break;
            };

            // NOTE(review): the id is hard-coded to 1 — presumably the value
            // `ExtHandshake::new()` advertises for ut_metadata; confirm.
            if let Message::Extended { ext_id, payload } = msg
                && ext_id == 1
            {
                // This should be the ext_id our peer assigned for ut_metadata.
                if let Ok(meta_msg) = MetadataMessage::from_bytes(&payload)
                    && meta_msg.msg_type == MetadataMessageType::Request
                {
                    // Slice the requested 16 KiB piece out of the metadata,
                    // clamping the end for a short final piece.
                    let piece_idx = meta_msg.piece;
                    let piece_size: u64 = 16384;
                    let start = u64::from(piece_idx) * piece_size;
                    let end = ((start + piece_size) as usize).min(metadata.len());
                    let data = Bytes::copy_from_slice(&metadata[start as usize..end]);

                    let resp = MetadataMessage::data(piece_idx, metadata.len() as u64, data);
                    let resp_bytes = resp.to_bytes().unwrap();
                    let resp_msg = Message::Extended {
                        ext_id: their_ut_metadata_id,
                        payload: resp_bytes,
                    };
                    write_message_raw(&mut stream, &resp_msg).await;
                }
            }
        }
    }

    /// Test-side counterpart of `write_message` that panics instead of
    /// returning an error.
    async fn write_message_raw(stream: &mut (impl AsyncWriteExt + Unpin), msg: &Message) {
        let bytes = msg.to_bytes();
        stream.write_all(&bytes).await.unwrap();
        stream.flush().await.unwrap();
    }

    /// Test-side counterpart of `read_message`: same framing, but without the
    /// size limit and panicking on any I/O or decoding error.
    async fn read_message_raw(stream: &mut (impl AsyncReadExt + Unpin)) -> Message<Bytes> {
        let mut len_buf = [0u8; 4];
        stream.read_exact(&mut len_buf).await.unwrap();
        let length = u32::from_be_bytes(len_buf) as usize;
        if length == 0 {
            return Message::KeepAlive;
        }
        let mut payload = vec![0u8; length];
        stream.read_exact(&mut payload).await.unwrap();
        Message::from_payload(Bytes::from(payload)).unwrap()
    }

    /// One cooperative peer serves the metadata; resolution must succeed and
    /// report that peer as connected.
    #[tokio::test]
    async fn resolve_metadata_happy_path() {
        let (info_bytes, info_hash) = test_metadata();
        let peer_id = PeerId::generate().0;

        // Set up a real TCP listener to simulate a peer.
        let factory = Arc::new(NetworkFactory::tokio());
        let listener_addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
        let mut listener = factory.bind_tcp(listener_addr).await.unwrap();
        let actual_addr = listener.local_addr().unwrap();

        let metadata_clone = info_bytes.clone();
        // Spawn mock peer.
        tokio::spawn(async move {
            let (stream, _addr) = listener.accept().await.unwrap();
            run_mock_peer(stream, info_hash, metadata_clone).await;
        });

        // Create peer channel and send the peer address.
        let (peer_tx, peer_rx) = mpsc::unbounded_channel();
        peer_tx.send(vec![actual_addr]).unwrap();
        // Close channel after sending to signal no more peers.
        drop(peer_tx);

        let result = resolve_metadata(
            info_hash,
            peer_id,
            peer_rx,
            factory,
            Duration::from_secs(5),
            DEFAULT_MAX_CONCURRENT,
        )
        .await;

        let (meta, peers) = result.expect("metadata resolution should succeed");
        assert_eq!(meta.info_hash, info_hash);
        assert!(!peers.is_empty());
        assert!(peers.contains(&actual_addr));
    }

    /// With no peers at all, resolution must fail.
    ///
    /// NOTE(review): the channel is closed before the call, so the error seen
    /// here appears to be the "exhausted all peers" path rather than the
    /// overall timeout the test name suggests.
    #[tokio::test]
    async fn resolve_metadata_timeout_with_no_peers() {
        let info_hash = Id20::ZERO;
        let peer_id = PeerId::generate().0;
        let factory = Arc::new(NetworkFactory::tokio());

        let (peer_tx, peer_rx) = mpsc::unbounded_channel();
        // Send no peers and close the channel.
        drop(peer_tx);

        let result = resolve_metadata(
            info_hash,
            peer_id,
            peer_rx,
            factory,
            Duration::from_secs(1),
            DEFAULT_MAX_CONCURRENT,
        )
        .await;

        assert!(result.is_err());
    }

    /// The same address announced twice must be connected to only once.
    #[tokio::test]
    async fn resolve_metadata_skips_duplicate_peers() {
        let (info_bytes, info_hash) = test_metadata();
        let peer_id = PeerId::generate().0;

        let factory = Arc::new(NetworkFactory::tokio());
        let listener_addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
        let mut listener = factory.bind_tcp(listener_addr).await.unwrap();
        let actual_addr = listener.local_addr().unwrap();

        let metadata_clone = info_bytes.clone();
        tokio::spawn(async move {
            // Only accept one connection — duplicates should be deduplicated.
            let (stream, _addr) = listener.accept().await.unwrap();
            run_mock_peer(stream, info_hash, metadata_clone).await;
        });

        let (peer_tx, peer_rx) = mpsc::unbounded_channel();
        // Send the same peer address twice.
        peer_tx.send(vec![actual_addr]).unwrap();
        peer_tx.send(vec![actual_addr]).unwrap();
        drop(peer_tx);

        let result = resolve_metadata(
            info_hash,
            peer_id,
            peer_rx,
            factory,
            Duration::from_secs(5),
            DEFAULT_MAX_CONCURRENT,
        )
        .await;

        let (meta, peers) = result.expect("metadata resolution should succeed");
        assert_eq!(meta.info_hash, info_hash);
        // Should have connected only once despite receiving the address twice.
        assert_eq!(peers.len(), 1);
    }

    /// A peer whose handshake has no extension bit set cannot serve metadata,
    /// so resolution against it alone must fail.
    #[tokio::test]
    async fn resolve_metadata_peer_without_extensions() {
        let (_info_bytes, info_hash) = test_metadata();
        let peer_id = PeerId::generate().0;

        let factory = Arc::new(NetworkFactory::tokio());
        let listener_addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
        let mut listener = factory.bind_tcp(listener_addr).await.unwrap();
        let actual_addr = listener.local_addr().unwrap();

        // Spawn a peer that doesn't support extensions.
        tokio::spawn(async move {
            let (mut stream, _addr) = listener.accept().await.unwrap();
            let remote_id = PeerId::generate().0;

            // Read their handshake.
            let mut hs_buf = [0u8; HANDSHAKE_SIZE];
            stream.read_exact(&mut hs_buf).await.unwrap();

            // Send handshake WITHOUT extension bit.
            let mut hs = Handshake::new(info_hash, remote_id);
            hs.reserved = [0u8; 8]; // Clear all flags.
            stream.write_all(&hs.to_bytes()).await.unwrap();
            stream.flush().await.unwrap();
        });

        let (peer_tx, peer_rx) = mpsc::unbounded_channel();
        peer_tx.send(vec![actual_addr]).unwrap();
        drop(peer_tx);

        let result = resolve_metadata(
            info_hash,
            peer_id,
            peer_rx,
            factory,
            Duration::from_secs(5),
            DEFAULT_MAX_CONCURRENT,
        )
        .await;

        // Should fail since the only peer doesn't support extensions.
        assert!(result.is_err());
    }

    /// Resolution must still succeed when only one connection slot exists.
    ///
    /// NOTE(review): only a single peer is supplied, so this checks that a
    /// limit of 1 works, not that excess peers are actually held back.
    #[tokio::test]
    async fn resolve_metadata_concurrent_limit() {
        // Verify that the semaphore limits concurrent connections.
        let (info_bytes, info_hash) = test_metadata();
        let peer_id = PeerId::generate().0;

        let factory = Arc::new(NetworkFactory::tokio());
        let listener_addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
        let mut listener = factory.bind_tcp(listener_addr).await.unwrap();
        let actual_addr = listener.local_addr().unwrap();

        let metadata_clone = info_bytes.clone();
        tokio::spawn(async move {
            let (stream, _addr) = listener.accept().await.unwrap();
            run_mock_peer(stream, info_hash, metadata_clone).await;
        });

        let (peer_tx, peer_rx) = mpsc::unbounded_channel();
        peer_tx.send(vec![actual_addr]).unwrap();
        drop(peer_tx);

        // Use max_concurrent = 1 to test semaphore behavior.
        let result = resolve_metadata(
            info_hash,
            peer_id,
            peer_rx,
            factory,
            Duration::from_secs(5),
            1,
        )
        .await;

        let (meta, _peers) = result.expect("metadata resolution should succeed with limit 1");
        assert_eq!(meta.info_hash, info_hash);
    }
}