// scp2p_core/net_fetch.rs
// Copyright (c) 2024-2026 Vanyo Vanev / Tech Art Ltd
// SPDX-License-Identifier: MPL-2.0
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
7use std::{
8    collections::{HashMap, VecDeque},
9    io::SeekFrom,
10    path::{Path, PathBuf},
11    sync::Arc,
12    time::{Duration, Instant},
13};
14
15use futures_util::stream::{FuturesUnordered, StreamExt};
16
17use async_trait::async_trait;
18use tokio::{
19    io::{AsyncRead, AsyncSeekExt, AsyncWrite, AsyncWriteExt as TokioAsyncWriteExt},
20    sync::Mutex,
21};
22
23use crate::{
24    content::{CHUNK_SIZE, compute_chunk_list_hash, verify_chunk, verify_content},
25    ids::ContentId,
26    manifest::ManifestV1,
27    peer::PeerAddr,
28    transport::{read_envelope, write_envelope},
29    wire::{
30        ChunkData, ChunkHashList, Envelope, FLAG_ERROR, GetChunk, GetChunkHashes, GetManifest,
31        ManifestData, MsgType, WirePayload,
32    },
33};
34
/// Marker trait for any byte stream usable as a peer connection:
/// async-readable, async-writable, and movable across tasks.
/// The blanket impl below makes every qualifying stream an `AsyncIo`.
pub trait AsyncIo: AsyncRead + AsyncWrite + Unpin + Send {}
impl<T> AsyncIo for T where T: AsyncRead + AsyncWrite + Unpin + Send {}

/// Owned, type-erased peer connection, as produced by [`PeerConnector::connect`].
pub type BoxedStream = Box<dyn AsyncIo>;
39
/// Abstraction over dialing a peer: establishes a fresh bidirectional
/// stream to `peer` and hands it back type-erased.
#[async_trait]
pub trait PeerConnector: Send + Sync {
    /// Open a new connection to `peer`.
    async fn connect(&self, peer: &PeerAddr) -> anyhow::Result<BoxedStream>;
}
44
/// One request/response exchange with a peer.
///
/// Implementations differ in connection management (direct dial, session
/// pooling, relay tunnelling) but share the same envelope-in/envelope-out
/// contract.
#[async_trait]
pub trait RequestTransport: Send + Sync {
    /// Send `request` to `peer` and await the reply, failing after
    /// `timeout_dur`.
    async fn request(
        &self,
        peer: &PeerAddr,
        request: Envelope,
        timeout_dur: Duration,
    ) -> anyhow::Result<Envelope>;
}
54
/// Blanket impl: every [`PeerConnector`] is trivially a [`RequestTransport`]
/// that dials a brand-new connection for each request (no reuse or pooling;
/// the stream is dropped when the call returns).
#[async_trait]
impl<C: PeerConnector> RequestTransport for C {
    async fn request(
        &self,
        peer: &PeerAddr,
        request: Envelope,
        timeout_dur: Duration,
    ) -> anyhow::Result<Envelope> {
        // Fresh connection per request.
        let mut stream = self.connect(peer).await?;
        send_request_on_stream(&mut stream, request, timeout_dur).await
    }
}
67
68pub struct DirectRequestTransport<C> {
69    connector: C,
70}
71
72impl<C> DirectRequestTransport<C> {
73    pub fn new(connector: C) -> Self {
74        Self { connector }
75    }
76}
77
78#[async_trait]
79impl<C: PeerConnector> RequestTransport for DirectRequestTransport<C> {
80    async fn request(
81        &self,
82        peer: &PeerAddr,
83        request: Envelope,
84        timeout_dur: Duration,
85    ) -> anyhow::Result<Envelope> {
86        let mut stream = self.connector.connect(peer).await?;
87        send_request_on_stream(&mut stream, request, timeout_dur).await
88    }
89}
90
/// [`RequestTransport`] that caches one live session per peer and reuses it
/// across requests, redialing transparently when a pooled stream fails.
pub struct SessionPoolTransport<C> {
    // Underlying dialer, used on cache miss or after a broken session.
    connector: C,
    // peer key -> (cached stream, time of last insertion).  Async mutex
    // because entries are taken out and put back across await points.
    sessions: Mutex<HashMap<String, (BoxedStream, Instant)>>,
}

/// Maximum number of cached sessions in the pool.  When full, the
/// least-recently-used session is evicted before inserting a new one.
const MAX_POOL_SESSIONS: usize = 64;
99
100impl<C> SessionPoolTransport<C> {
101    pub fn new(connector: C) -> Self {
102        Self {
103            connector,
104            sessions: Mutex::new(HashMap::new()),
105        }
106    }
107
108    /// Insert a session into the pool, evicting the oldest entry if at capacity.
109    async fn pool_insert(&self, key: String, stream: BoxedStream) {
110        let mut sessions = self.sessions.lock().await;
111        if sessions.len() >= MAX_POOL_SESSIONS && !sessions.contains_key(&key) {
112            // Evict least-recently-used entry.
113            if let Some(oldest_key) = sessions
114                .iter()
115                .min_by_key(|(_, (_, ts))| *ts)
116                .map(|(k, _)| k.clone())
117            {
118                sessions.remove(&oldest_key);
119            }
120        }
121        sessions.insert(key, (stream, Instant::now()));
122    }
123}
124
#[async_trait]
impl<C: PeerConnector> RequestTransport for SessionPoolTransport<C> {
    /// Send `request` over a pooled session when one exists, otherwise dial.
    /// On failure the broken stream is discarded and the request is retried
    /// exactly once on a fresh connection; if that also fails, the *first*
    /// error is the one reported.
    async fn request(
        &self,
        peer: &PeerAddr,
        request: Envelope,
        timeout_dur: Duration,
    ) -> anyhow::Result<Envelope> {
        let key = peer_key(peer);
        // Take the session out of the pool while holding the lock only
        // briefly — the actual I/O happens with the lock released.
        let mut stream = {
            let mut sessions = self.sessions.lock().await;
            sessions.remove(&key).map(|(s, _)| s)
        };

        if stream.is_none() {
            stream = Some(self.connector.connect(peer).await?);
        }

        // Invariant: one of the two branches above populated the stream.
        let mut stream = stream.expect("stream must be initialized");
        // `request` is cloned because the retry path below resends it.
        match send_request_on_stream(&mut stream, request.clone(), timeout_dur).await {
            Ok(response) => {
                // Healthy session: return it to the pool for reuse.
                self.pool_insert(key, stream).await;
                Ok(response)
            }
            Err(first_err) => {
                // drop broken stream and redial once for retry.
                let mut fresh_stream = self.connector.connect(peer).await?;
                let response = send_request_on_stream(&mut fresh_stream, request, timeout_dur)
                    .await
                    .map_err(|_| first_err)?;
                self.pool_insert(key, fresh_stream).await;
                Ok(response)
            }
        }
    }
}
161
/// Request transport that automatically tunnels through a relay when
/// the target peer has [`PeerAddr::relay_via`] set.
///
/// For direct peers, delegates to the inner connector.  For relayed
/// peers, connects to the relay node, wraps the request envelope in a
/// `RelayStream`, sends it, and unwraps the response.
pub struct RelayAwareTransport<'a, C> {
    // Borrowed connector; see [`OwnedRelayAwareTransport`] for the
    // `Arc`-owning variant used by long-lived background tasks.
    connector: &'a C,
}

impl<'a, C> RelayAwareTransport<'a, C> {
    /// Borrow `connector` for the lifetime of this transport.
    pub fn new(connector: &'a C) -> Self {
        Self { connector }
    }
}
177
#[async_trait]
impl<'a, C: PeerConnector> RequestTransport for RelayAwareTransport<'a, C> {
    /// Route `request` directly, or through the peer's relay when
    /// `peer.relay_via` is set.  The same logic is duplicated in
    /// `OwnedRelayAwareTransport::request`; keep the two in sync.
    async fn request(
        &self,
        peer: &PeerAddr,
        request: Envelope,
        timeout_dur: Duration,
    ) -> anyhow::Result<Envelope> {
        if let Some(relay_route) = &peer.relay_via {
            // Connect to the relay node.
            let mut stream = self.connector.connect(&relay_route.relay_addr).await?;

            // Wrap the inner request in a RelayStream message.
            let inner_bytes = request.encode()?;
            let relay_stream_msg = crate::wire::RelayStream {
                relay_slot_id: relay_route.slot_id,
                stream_id: 0,
                kind: crate::wire::RelayPayloadKind::Content,
                payload: inner_bytes,
            };
            // The outer envelope reuses the inner req_id so relay traffic
            // can be correlated with the originating request.
            let outer_request = Envelope::from_typed(
                request.req_id,
                0,
                &WirePayload::RelayStream(relay_stream_msg),
            )?;

            // Send the outer request through the relay.
            let outer_response =
                send_request_on_stream(&mut stream, outer_request, timeout_dur).await?;

            // Unwrap the RelayStream response to get the inner envelope.
            // NOTE(review): the inner envelope's req_id is not re-checked
            // here — presumably callers validate it (the fetch_* helpers
            // do); confirm before relying on this elsewhere.
            let typed = outer_response.decode_typed()?;
            match typed {
                WirePayload::RelayStream(resp) => Envelope::decode(&resp.payload),
                _ => anyhow::bail!(
                    "relay tunnel: unexpected response type {:?}",
                    outer_response.r#type
                ),
            }
        } else {
            // Direct connection — same as PeerConnector blanket impl.
            let mut stream = self.connector.connect(peer).await?;
            send_request_on_stream(&mut stream, request, timeout_dur).await
        }
    }
}
224
/// Owning version of [`RelayAwareTransport`] that can be wrapped in `Arc` for
/// use with long-running background loops (`start_dht_republish_loop`, etc.).
pub struct OwnedRelayAwareTransport<C> {
    // Shared ownership of the connector, so the transport carries no
    // borrowed lifetime.
    connector: Arc<C>,
}

impl<C> OwnedRelayAwareTransport<C> {
    /// Build an owning relay-aware transport around a shared connector.
    pub fn new(connector: Arc<C>) -> Self {
        Self { connector }
    }
}
236
#[async_trait]
impl<C: PeerConnector> RequestTransport for OwnedRelayAwareTransport<C> {
    /// Identical tunnelling logic to `RelayAwareTransport::request`
    /// (direct dial, or wrap/unwrap through a `RelayStream` when
    /// `peer.relay_via` is set); keep the two implementations in sync.
    async fn request(
        &self,
        peer: &PeerAddr,
        request: Envelope,
        timeout_dur: Duration,
    ) -> anyhow::Result<Envelope> {
        if let Some(relay_route) = &peer.relay_via {
            // Dial the relay, not the peer itself.
            let mut stream = self.connector.connect(&relay_route.relay_addr).await?;
            // Wrap the inner request in a RelayStream addressed to the
            // peer's relay slot.
            let inner_bytes = request.encode()?;
            let relay_stream_msg = crate::wire::RelayStream {
                relay_slot_id: relay_route.slot_id,
                stream_id: 0,
                kind: crate::wire::RelayPayloadKind::Content,
                payload: inner_bytes,
            };
            let outer_request = Envelope::from_typed(
                request.req_id,
                0,
                &WirePayload::RelayStream(relay_stream_msg),
            )?;
            let outer_response =
                send_request_on_stream(&mut stream, outer_request, timeout_dur).await?;
            // Unwrap the relayed response back into the inner envelope.
            let typed = outer_response.decode_typed()?;
            match typed {
                WirePayload::RelayStream(resp) => Envelope::decode(&resp.payload),
                _ => anyhow::bail!(
                    "relay tunnel: unexpected response type {:?}",
                    outer_response.r#type
                ),
            }
        } else {
            // Direct connection — no relay involved.
            let mut stream = self.connector.connect(peer).await?;
            send_request_on_stream(&mut stream, request, timeout_dur).await
        }
    }
}
275
/// Tuning knobs shared by the manifest / chunk-hash / chunk download paths.
#[derive(Debug, Clone)]
pub struct FetchPolicy {
    /// Number of full passes over the peer list a retry loop makes before
    /// giving up.  Also bounds per-chunk retries in the download loops:
    /// `attempts_per_peer * peers.len()`.
    pub attempts_per_peer: usize,
    /// Timeout applied to each individual request/response exchange.
    pub request_timeout: Duration,
    /// Soft cap on chunk requests routed to a single peer, used for
    /// load-balancing by `pick_best_peer_index` (not a hard limit).
    pub max_chunks_per_peer: usize,
    /// Base delay for a failing peer's back-off (grown per consecutive
    /// failure — see `register_failure`), and the sleep used by the
    /// download loop whenever every peer is backed off.
    pub failure_backoff_base: Duration,
    /// Upper bound on the per-peer back-off delay.
    pub max_backoff: Duration,
    /// Maximum number of chunk requests in flight at once across all peers.
    /// Higher values improve throughput when multiple peers are available.
    pub parallel_chunks: usize,
    /// Maximum consecutive stall loops (each separated by `failure_backoff_base`)
    /// before a download is aborted.  A stall loop fires whenever no chunks are
    /// in-flight because every peer is currently in back-off.  Default: 60
    /// (≈ 18 s at the 300 ms default back-off base).
    pub max_stall_rounds: usize,
    /// Optional seed reputation scores keyed by the peer's canonical string
    /// (`"<ip>:<port>:<transport>"`).  When present, a peer's
    /// `PeerRuntimeStats.score` is initialised from this map so that
    /// historically reliable peers are tried before less-trusted ones.
    pub initial_reputations: HashMap<String, i32>,
}

/// Callback invoked after each chunk is verified and stored.
///
/// Arguments: `(completed_chunks, total_chunks, bytes_so_far)`.
pub type ProgressCallback = Box<dyn Fn(u32, u32, u64) + Send + Sync>;
302
303impl Default for FetchPolicy {
304    fn default() -> Self {
305        Self {
306            attempts_per_peer: 2,
307            request_timeout: Duration::from_secs(3),
308            max_chunks_per_peer: 128,
309            failure_backoff_base: Duration::from_millis(300),
310            max_backoff: Duration::from_secs(8),
311            parallel_chunks: 8,
312            max_stall_rounds: 60,
313            initial_reputations: HashMap::new(),
314        }
315    }
316}
317
318pub async fn fetch_manifest_with_retry<T: RequestTransport + ?Sized>(
319    transport: &T,
320    peers: &[PeerAddr],
321    manifest_id: [u8; 32],
322    policy: &FetchPolicy,
323) -> anyhow::Result<ManifestV1> {
324    if peers.is_empty() {
325        anyhow::bail!("no peers available for manifest fetch");
326    }
327
328    let mut req_id = 1u32;
329    let mut last_err = None;
330    for attempt in 0..policy.attempts_per_peer {
331        for offset in 0..peers.len() {
332            let idx = (attempt + offset) % peers.len();
333            let target = &peers[idx];
334            let result = fetch_manifest_once(transport, target, manifest_id, req_id, policy).await;
335            req_id = req_id.wrapping_add(1);
336            match result {
337                Ok(manifest) => return Ok(manifest),
338                Err(err) => last_err = Some(err),
339            }
340        }
341    }
342
343    Err(last_err.unwrap_or_else(|| anyhow::anyhow!("manifest fetch failed")))
344}
345
/// Fetch the chunk hash list for a content ID from a set of peers, verifying
/// the result against the expected `chunk_list_hash` commitment.
///
/// Both the list length and its recomputed hash are validated, so a peer
/// cannot substitute a different chunk list; a mismatching reply counts as
/// a failure and the next peer is tried.
pub async fn fetch_chunk_hashes_with_retry<T: RequestTransport + ?Sized>(
    transport: &T,
    peers: &[PeerAddr],
    content_id: [u8; 32],
    expected_chunk_count: u32,
    expected_chunk_list_hash: [u8; 32],
    policy: &FetchPolicy,
) -> anyhow::Result<Vec<[u8; 32]>> {
    if peers.is_empty() {
        anyhow::bail!("no peers available for chunk hash fetch");
    }

    // req_id offset separates this traffic from manifest (1+) and
    // chunk (10_000+) request id ranges used elsewhere in this file.
    let mut req_id = 5_000u32;
    let mut last_err = None;
    for attempt in 0..policy.attempts_per_peer {
        for offset in 0..peers.len() {
            // Rotate the starting peer each pass to spread load.
            let idx = (attempt + offset) % peers.len();
            let target = &peers[idx];
            let result =
                fetch_chunk_hashes_once(transport, target, content_id, req_id, policy).await;
            req_id = req_id.wrapping_add(1);
            match result {
                Ok(hashes) => {
                    if hashes.len() != expected_chunk_count as usize {
                        last_err = Some(anyhow::anyhow!(
                            "chunk hash count mismatch: got {} expected {}",
                            hashes.len(),
                            expected_chunk_count
                        ));
                        continue;
                    }
                    // Recompute the commitment and compare with the expected one.
                    let actual_hash = compute_chunk_list_hash(&hashes);
                    if actual_hash != expected_chunk_list_hash {
                        last_err = Some(anyhow::anyhow!("chunk_list_hash verification failed"));
                        continue;
                    }
                    return Ok(hashes);
                }
                Err(err) => last_err = Some(err),
            }
        }
    }

    Err(last_err.unwrap_or_else(|| anyhow::anyhow!("chunk hash fetch failed")))
}
393
394async fn fetch_chunk_hashes_once<T: RequestTransport + ?Sized>(
395    transport: &T,
396    peer: &PeerAddr,
397    content_id: [u8; 32],
398    req_id: u32,
399    policy: &FetchPolicy,
400) -> anyhow::Result<Vec<[u8; 32]>> {
401    let request = Envelope::from_typed(
402        req_id,
403        0,
404        &WirePayload::GetChunkHashes(GetChunkHashes { content_id }),
405    )?;
406    let response = transport
407        .request(peer, request, policy.request_timeout)
408        .await?;
409
410    if response.req_id != req_id {
411        anyhow::bail!("chunk hash response req_id mismatch");
412    }
413    if response.r#type != MsgType::ChunkHashList as u16 {
414        anyhow::bail!("unexpected response type for chunk hash request");
415    }
416    let payload = response.decode_typed()?;
417    let WirePayload::ChunkHashList(ChunkHashList {
418        content_id: returned_id,
419        hashes,
420    }) = payload
421    else {
422        anyhow::bail!("invalid chunk hash response payload");
423    };
424    if returned_id != content_id {
425        anyhow::bail!("chunk hash content_id mismatch");
426    }
427    Ok(hashes)
428}
429
/// Downloads content by fetching chunks from a set of peers **in parallel**.
///
/// Up to `policy.parallel_chunks` chunk requests are in flight at once.
/// Peers are ranked by a live score: fast, reliable peers accumulate score
/// and serve more chunks; failing peers get exponential back-off.  Chunks
/// that fail verification or time out are automatically retried on the next
/// best peer.
pub async fn download_swarm_over_network<T: RequestTransport + ?Sized>(
    transport: &T,
    peers: &[PeerAddr],
    content_id: [u8; 32],
    chunk_hashes: &[[u8; 32]],
    policy: &FetchPolicy,
    on_progress: Option<&ProgressCallback>,
) -> anyhow::Result<Vec<u8>> {
    if peers.is_empty() {
        anyhow::bail!("no peers available for content download");
    }

    let total_chunks = chunk_hashes.len();
    // Parallelism window clamped to [1, total_chunks].
    let max_parallel = policy.parallel_chunks.min(total_chunks).max(1);
    // Hard per-chunk retry budget: one attempt per peer per pass.
    let max_retries_per_chunk = policy.attempts_per_peer * peers.len();
    let mut req_id = 10_000u32;

    // Chunk bodies kept in order; `None` = not yet downloaded.
    let mut completed: Vec<Option<Vec<u8>>> = vec![None; total_chunks];
    let mut completed_count = 0usize;
    let mut completed_bytes = 0u64;

    // Per-peer runtime stats, seeded from any configured reputations.
    let mut stats: HashMap<String, PeerRuntimeStats> = peers
        .iter()
        .map(|peer| {
            let key = peer_key(peer);
            let initial_score = policy.initial_reputations.get(&key).copied().unwrap_or(0);
            (
                key,
                PeerRuntimeStats {
                    score: initial_score,
                    ..PeerRuntimeStats::default()
                },
            )
        })
        .collect();

    // Work queues: fresh chunks + retries
    let mut next_chunk = 0usize;
    let mut retry_queue: VecDeque<(usize, usize)> = VecDeque::new();

    let mut in_flight = FuturesUnordered::new();
    let mut stall_count = 0usize;

    loop {
        // ── Schedule as many chunks as the parallelism window allows ──
        while in_flight.len() < max_parallel {
            // Retries take priority over fresh chunks.
            let (chunk_idx, retries) = if let Some(retry) = retry_queue.pop_front() {
                retry
            } else if next_chunk < total_chunks {
                let idx = next_chunk;
                next_chunk += 1;
                (idx, 0)
            } else {
                break;
            };

            if retries >= max_retries_per_chunk {
                anyhow::bail!(
                    "unable to retrieve verified chunk {} after {} attempts",
                    chunk_idx,
                    retries
                );
            }

            if let Some(peer_idx) = pick_best_peer_index(peers, &stats, policy) {
                let peer = peers[peer_idx].clone();
                let pk = peer_key(&peer);
                let rid = req_id;
                req_id = req_id.wrapping_add(1);
                // Book-keeping feeds the soft cap and tie-breaking in
                // `pick_best_peer_index`.
                if let Some(s) = stats.get_mut(&pk) {
                    s.requests += 1;
                    s.in_flight += 1;
                }
                let expected = chunk_hashes[chunk_idx];
                in_flight.push(fetch_one_chunk(
                    transport, peer, content_id, chunk_idx, rid, expected, pk, policy, retries,
                ));
            } else {
                // No eligible peer right now — put chunk back
                retry_queue.push_front((chunk_idx, retries));
                break;
            }
        }

        // ── Done? ──
        if completed_count == total_chunks {
            break;
        }

        // ── If nothing is in flight, peers may be in back-off; wait briefly ──
        if in_flight.is_empty() {
            stall_count += 1;
            if stall_count > policy.max_stall_rounds {
                anyhow::bail!(
                    "download stalled: no peers can serve remaining {}/{} chunks",
                    total_chunks - completed_count,
                    total_chunks
                );
            }
            tokio::time::sleep(policy.failure_backoff_base).await;
            continue;
        }
        stall_count = 0;

        // ── Wait for the next chunk to arrive ──
        let res = in_flight.next().await.expect("non-empty FuturesUnordered");

        // Decrement in-flight counter
        if let Some(s) = stats.get_mut(&res.peer_key) {
            s.in_flight = s.in_flight.saturating_sub(1);
        }

        let now = Instant::now();
        match res.data {
            Ok(bytes) if verify_chunk(&res.expected_hash, &bytes).is_ok() => {
                // Verified chunk: reward the peer and clear its back-off.
                if let Some(s) = stats.get_mut(&res.peer_key) {
                    s.score += 2;
                    s.consecutive_failures = 0;
                    s.backoff_until = None;
                }
                let chunk_len = bytes.len() as u64;
                completed[res.chunk_idx] = Some(bytes);
                completed_count += 1;
                completed_bytes += chunk_len;
                if let Some(cb) = &on_progress {
                    cb(completed_count as u32, total_chunks as u32, completed_bytes);
                }
            }
            Ok(_) => {
                // Hash mismatch — penalise peer, retry chunk
                if let Some(s) = stats.get_mut(&res.peer_key) {
                    register_failure(s, policy, now);
                }
                retry_queue.push_back((res.chunk_idx, res.retries + 1));
            }
            Err(_) => {
                // Transport/protocol error — same penalty as a bad hash.
                if let Some(s) = stats.get_mut(&res.peer_key) {
                    register_failure(s, policy, now);
                }
                retry_queue.push_back((res.chunk_idx, res.retries + 1));
            }
        }
    }

    // Assemble output in chunk order
    let mut output = Vec::new();
    for chunk_data in completed {
        output.extend_from_slice(&chunk_data.expect("all chunks completed"));
    }

    // Final whole-content verification against the content id.
    verify_content(&ContentId(content_id), &output)?;
    Ok(output)
}
590
591/// Like [`download_swarm_over_network`], but streams each verified chunk
592/// directly to disk instead of buffering the entire file in memory.
593///
594/// Chunks are written at their correct offset in a temporary file.  Once
595/// all chunks are received and individually hash-verified, the final
596/// `content_id` (BLAKE3 over the full content) is checked via a streaming
597/// pass over the completed file, and the result is atomically renamed to
598/// `target_path`.
599///
600/// Returns the canonical target path on success.
601pub async fn download_swarm_to_file<T: RequestTransport + ?Sized>(
602    transport: &T,
603    peers: &[PeerAddr],
604    content_id: [u8; 32],
605    chunk_hashes: &[[u8; 32]],
606    policy: &FetchPolicy,
607    target_path: &Path,
608    on_progress: Option<&ProgressCallback>,
609) -> anyhow::Result<PathBuf> {
610    if peers.is_empty() {
611        anyhow::bail!("no peers available for content download");
612    }
613
614    let total_chunks = chunk_hashes.len();
615    let max_parallel = policy.parallel_chunks.min(total_chunks).max(1);
616    let max_retries_per_chunk = policy.attempts_per_peer * peers.len();
617    let mut req_id = 10_000u32;
618
619    // Open a temporary file next to the target for writing chunks.
620    let tmp_path = target_path.with_extension("scp2p-partial");
621    let mut file = tokio::fs::File::create(&tmp_path).await?;
622
623    // Track which chunks are completed.
624    let mut chunk_done = vec![false; total_chunks];
625    let mut completed_count = 0usize;
626    let mut completed_bytes = 0u64;
627
628    let mut stats: HashMap<String, PeerRuntimeStats> = peers
629        .iter()
630        .map(|peer| {
631            let key = peer_key(peer);
632            let initial_score = policy.initial_reputations.get(&key).copied().unwrap_or(0);
633            (
634                key,
635                PeerRuntimeStats {
636                    score: initial_score,
637                    ..PeerRuntimeStats::default()
638                },
639            )
640        })
641        .collect();
642
643    let mut next_chunk = 0usize;
644    let mut retry_queue: VecDeque<(usize, usize)> = VecDeque::new();
645    let mut in_flight: FuturesUnordered<_> = FuturesUnordered::new();
646    let mut stall_count = 0usize;
647
648    loop {
649        while in_flight.len() < max_parallel {
650            let (chunk_idx, retries) = if let Some(retry) = retry_queue.pop_front() {
651                retry
652            } else if next_chunk < total_chunks {
653                let idx = next_chunk;
654                next_chunk += 1;
655                (idx, 0)
656            } else {
657                break;
658            };
659
660            if retries >= max_retries_per_chunk {
661                // Clean up partial file.
662                let _ = tokio::fs::remove_file(&tmp_path).await;
663                anyhow::bail!(
664                    "unable to retrieve verified chunk {} after {} attempts",
665                    chunk_idx,
666                    retries
667                );
668            }
669
670            if let Some(peer_idx) = pick_best_peer_index(peers, &stats, policy) {
671                let peer = peers[peer_idx].clone();
672                let pk = peer_key(&peer);
673                let rid = req_id;
674                req_id = req_id.wrapping_add(1);
675                if let Some(s) = stats.get_mut(&pk) {
676                    s.in_flight += 1;
677                }
678                in_flight.push(fetch_one_chunk(
679                    transport,
680                    peer,
681                    content_id,
682                    chunk_idx,
683                    rid,
684                    chunk_hashes[chunk_idx],
685                    pk,
686                    policy,
687                    retries,
688                ));
689            } else {
690                retry_queue.push_front((chunk_idx, retries));
691                break;
692            }
693        }
694
695        if completed_count >= total_chunks {
696            break;
697        }
698
699        if in_flight.is_empty() {
700            stall_count += 1;
701            if stall_count > policy.max_stall_rounds {
702                let _ = tokio::fs::remove_file(&tmp_path).await;
703                anyhow::bail!(
704                    "download stalled: no peers can serve remaining {}/{} chunks",
705                    total_chunks - completed_count,
706                    total_chunks
707                );
708            }
709            tokio::time::sleep(policy.failure_backoff_base).await;
710            continue;
711        }
712        stall_count = 0;
713
714        let res = in_flight.next().await.expect("non-empty FuturesUnordered");
715
716        if let Some(s) = stats.get_mut(&res.peer_key) {
717            s.in_flight = s.in_flight.saturating_sub(1);
718        }
719
720        let now = Instant::now();
721        match res.data {
722            Ok(bytes) if verify_chunk(&res.expected_hash, &bytes).is_ok() => {
723                if let Some(s) = stats.get_mut(&res.peer_key) {
724                    s.score += 2;
725                    s.consecutive_failures = 0;
726                    s.backoff_until = None;
727                }
728                // Write chunk to the correct offset.
729                let offset = res.chunk_idx as u64 * CHUNK_SIZE as u64;
730                file.seek(SeekFrom::Start(offset)).await?;
731                file.write_all(&bytes).await?;
732
733                let chunk_len = bytes.len() as u64;
734                chunk_done[res.chunk_idx] = true;
735                completed_count += 1;
736                completed_bytes += chunk_len;
737                if let Some(cb) = &on_progress {
738                    cb(completed_count as u32, total_chunks as u32, completed_bytes);
739                }
740            }
741            Ok(_) => {
742                if let Some(s) = stats.get_mut(&res.peer_key) {
743                    register_failure(s, policy, now);
744                }
745                retry_queue.push_back((res.chunk_idx, res.retries + 1));
746            }
747            Err(_) => {
748                if let Some(s) = stats.get_mut(&res.peer_key) {
749                    register_failure(s, policy, now);
750                }
751                retry_queue.push_back((res.chunk_idx, res.retries + 1));
752            }
753        }
754    }
755
756    // Ensure all data is flushed.
757    file.flush().await?;
758    drop(file);
759
760    // Verify content_id via streaming BLAKE3 over the completed file.
761    let file_bytes = tokio::fs::read(&tmp_path).await?;
762    verify_content(&ContentId(content_id), &file_bytes)?;
763
764    // Atomic rename to target.
765    tokio::fs::rename(&tmp_path, target_path).await?;
766    Ok(target_path.to_path_buf())
767}
768
/// Outcome of a single chunk fetch attempt, returned from the in-flight
/// future back to the download loop.
struct ChunkFetchOutcome {
    // Index of the chunk this attempt was for.
    chunk_idx: usize,
    // Canonical key of the serving peer, used to update its runtime stats.
    peer_key: String,
    // Expected chunk hash, checked via `verify_chunk` by the download loop.
    expected_hash: [u8; 32],
    // Raw chunk bytes on success, or the transport/protocol error.
    data: anyhow::Result<Vec<u8>>,
    // How many retries this chunk had accumulated before this attempt.
    retries: usize,
}
778
779/// Async helper that fetches one chunk from one peer and wraps the result.
780///
781/// Because every call site uses the same concrete `async fn`, all returned
782/// futures share the same type — allowing `FuturesUnordered` to hold them
783/// without boxing.
784#[allow(clippy::too_many_arguments)]
785async fn fetch_one_chunk<T: RequestTransport + ?Sized>(
786    transport: &T,
787    peer: PeerAddr,
788    content_id: [u8; 32],
789    chunk_idx: usize,
790    req_id: u32,
791    expected_hash: [u8; 32],
792    peer_key: String,
793    policy: &FetchPolicy,
794    retries: usize,
795) -> ChunkFetchOutcome {
796    let data = fetch_chunk_once(
797        transport,
798        &peer,
799        content_id,
800        chunk_idx as u32,
801        req_id,
802        policy,
803    )
804    .await;
805    ChunkFetchOutcome {
806        chunk_idx,
807        peer_key,
808        expected_hash,
809        data,
810        retries,
811    }
812}
813
814/// Pick the best eligible peer for a chunk request.  Selection criteria
815/// (in order): highest score, fewest in-flight requests, fewest total
816/// requests.  Peers that are in back-off are always skipped.
817///
818/// `max_chunks_per_peer` is a **soft** cap used for load-balancing: peers
819/// that have served fewer than the cap are preferred.  If *all* non-backed-
820/// off peers have exceeded the cap (e.g. only one seeder is available for a
821/// large file) we fall back to the least-loaded peer among them so the
822/// download is never permanently stuck.
823fn pick_best_peer_index(
824    peers: &[PeerAddr],
825    stats: &HashMap<String, PeerRuntimeStats>,
826    policy: &FetchPolicy,
827) -> Option<usize> {
828    let now = Instant::now();
829
830    let mut best_under: Option<(usize, i32, usize, usize)> = None;
831    let mut best_over: Option<(usize, i32, usize, usize)> = None;
832
833    for (i, peer) in peers.iter().enumerate() {
834        let key = peer_key(peer);
835        let s = match stats.get(&key) {
836            Some(s) => s,
837            None => continue,
838        };
839        if let Some(until) = s.backoff_until
840            && until > now
841        {
842            continue;
843        }
844        let candidate = (i, s.score, s.in_flight, s.requests);
845        let bucket = if s.requests < policy.max_chunks_per_peer {
846            &mut best_under
847        } else {
848            &mut best_over
849        };
850        match bucket {
851            None => *bucket = Some(candidate),
852            Some((_, bs, bif, br)) => {
853                if s.score > *bs
854                    || (s.score == *bs && s.in_flight < *bif)
855                    || (s.score == *bs && s.in_flight == *bif && s.requests < *br)
856                {
857                    *bucket = Some(candidate);
858                }
859            }
860        }
861    }
862
863    // Prefer peers under the cap; fall back to over-cap peers if necessary.
864    best_under.or(best_over).map(|(idx, _, _, _)| idx)
865}
866
867async fn fetch_manifest_once<T: RequestTransport + ?Sized>(
868    transport: &T,
869    peer: &PeerAddr,
870    manifest_id: [u8; 32],
871    req_id: u32,
872    policy: &FetchPolicy,
873) -> anyhow::Result<ManifestV1> {
874    let request = Envelope::from_typed(
875        req_id,
876        0,
877        &WirePayload::GetManifest(GetManifest { manifest_id }),
878    )?;
879    let response = transport
880        .request(peer, request, policy.request_timeout)
881        .await?;
882
883    if response.req_id != req_id {
884        anyhow::bail!("manifest response req_id mismatch");
885    }
886    if response.r#type != MsgType::ManifestData as u16 {
887        anyhow::bail!("unexpected response type for manifest request");
888    }
889    let payload = response.decode_typed()?;
890    let WirePayload::ManifestData(ManifestData {
891        manifest_id: returned_id,
892        bytes,
893    }) = payload
894    else {
895        anyhow::bail!("invalid manifest response payload");
896    };
897    if returned_id != manifest_id {
898        anyhow::bail!("manifest id mismatch");
899    }
900
901    let manifest: ManifestV1 = crate::cbor::from_slice(&bytes)?;
902    // Enforce protocol size limits on received manifests — prevents adversarial
903    // or buggy peers from triggering OOM by sending oversized item lists.
904    manifest.check_limits()?;
905    if manifest.manifest_id()?.0 != manifest_id {
906        anyhow::bail!("manifest bytes hash does not match manifest_id");
907    }
908    Ok(manifest)
909}
910
911async fn fetch_chunk_once<T: RequestTransport + ?Sized>(
912    transport: &T,
913    peer: &PeerAddr,
914    content_id: [u8; 32],
915    chunk_index: u32,
916    req_id: u32,
917    policy: &FetchPolicy,
918) -> anyhow::Result<Vec<u8>> {
919    let request = Envelope::from_typed(
920        req_id,
921        0,
922        &WirePayload::GetChunk(GetChunk {
923            content_id,
924            chunk_index,
925        }),
926    )?;
927    let response = transport
928        .request(peer, request, policy.request_timeout)
929        .await?;
930
931    if response.req_id != req_id {
932        anyhow::bail!("chunk response req_id mismatch");
933    }
934    if response.r#type != MsgType::ChunkData as u16 {
935        anyhow::bail!("unexpected response type for chunk request");
936    }
937    let payload = response.decode_typed()?;
938    let WirePayload::ChunkData(ChunkData {
939        content_id: returned_content,
940        chunk_index: returned_index,
941        bytes,
942    }) = payload
943    else {
944        anyhow::bail!("invalid chunk response payload");
945    };
946    if returned_content != content_id || returned_index != chunk_index {
947        anyhow::bail!("chunk response mismatch");
948    }
949    Ok(bytes)
950}
951
952pub async fn send_request_on_stream(
953    stream: &mut BoxedStream,
954    request: Envelope,
955    timeout_dur: Duration,
956) -> anyhow::Result<Envelope> {
957    tokio::time::timeout(timeout_dur, write_envelope(stream, &request))
958        .await
959        .map_err(|_| anyhow::anyhow!("request write timed out"))??;
960    let response = tokio::time::timeout(timeout_dur, read_envelope(stream))
961        .await
962        .map_err(|_| anyhow::anyhow!("response read timed out"))??;
963    if response.flags & FLAG_ERROR != 0 {
964        let msg = if response.payload.is_empty() {
965            "peer returned protocol error".to_string()
966        } else if let Ok(text) = String::from_utf8(response.payload.clone()) {
967            text
968        } else {
969            format!(
970                "peer returned protocol error ({} bytes)",
971                response.payload.len()
972            )
973        };
974        anyhow::bail!("{msg}");
975    }
976    Ok(response)
977}
978
/// Per-peer bookkeeping used by peer selection and failure back-off.
/// Keyed by [`peer_key`] in the caller's `HashMap`.
#[derive(Debug, Default)]
struct PeerRuntimeStats {
    // Reputation score; decremented on failure (see `register_failure`).
    // Higher scores are preferred by `pick_best_peer_index`.
    score: i32,
    // Total requests issued to this peer; compared against the soft
    // `max_chunks_per_peer` cap when selecting a peer.
    requests: usize,
    // Run length of back-to-back failures; drives the exponential
    // back-off factor in `register_failure`.
    consecutive_failures: u32,
    // If set and in the future, the peer is skipped by selection until
    // this deadline passes.
    backoff_until: Option<Instant>,
    /// Number of chunk requests currently in flight to this peer.
    in_flight: usize,
}
988
989fn register_failure(stats: &mut PeerRuntimeStats, policy: &FetchPolicy, now: Instant) {
990    stats.score -= 1;
991    stats.consecutive_failures = stats.consecutive_failures.saturating_add(1);
992    let exp = stats.consecutive_failures.saturating_sub(1).min(8);
993    let factor = 1u32 << exp;
994    let mut backoff = policy.failure_backoff_base.saturating_mul(factor);
995    if backoff > policy.max_backoff {
996        backoff = policy.max_backoff;
997    }
998    stats.backoff_until = Some(now + backoff);
999}
1000
/// In-process map key for a peer endpoint, formatted `ip:port:transport`.
/// Used to index per-peer state (runtime stats, mock handlers in tests).
/// Note the transport component uses the enum's `Debug` rendering, so
/// this key is for local `HashMap`s only, not a wire format.
fn peer_key(peer: &PeerAddr) -> String {
    format!("{}:{}:{:?}", peer.ip, peer.port, peer.transport)
}
1004
#[cfg(test)]
mod tests {
    use std::{
        collections::HashMap,
        future::Future,
        pin::Pin,
        sync::{
            Arc,
            atomic::{AtomicUsize, Ordering},
        },
    };

    use tokio::{io::DuplexStream, sync::RwLock};

    use super::*;
    use crate::{
        content::{CHUNK_SIZE, describe_content},
        peer::TransportProtocol,
        wire::{GetChunk, GetManifest},
    };

    // A handler fabricates a fresh in-memory duplex stream per dial attempt.
    type ConnectFuture = Pin<Box<dyn Future<Output = anyhow::Result<BoxedStream>> + Send>>;
    type Handler = Box<dyn Fn() -> ConnectFuture + Send + Sync>;
    type HandlerMap = Arc<RwLock<HashMap<String, Handler>>>;

    /// Test double for `PeerConnector`: dials are routed to per-peer
    /// handler closures (keyed by `peer_key`) instead of the network.
    struct MockConnector {
        handlers: HandlerMap,
    }

    impl MockConnector {
        fn new() -> Self {
            Self {
                handlers: Arc::new(RwLock::new(HashMap::new())),
            }
        }

        /// Register a connection factory for `peer`; every `connect`
        /// call re-invokes the factory to produce a fresh stream.
        async fn register<F, Fut>(&self, peer: &PeerAddr, factory: F)
        where
            F: Fn() -> Fut + Send + Sync + 'static,
            Fut: Future<Output = anyhow::Result<BoxedStream>> + Send + 'static,
        {
            self.handlers
                .write()
                .await
                .insert(peer_key(peer), Box::new(move || Box::pin(factory())));
        }
    }

    #[async_trait]
    impl PeerConnector for MockConnector {
        async fn connect(&self, peer: &PeerAddr) -> anyhow::Result<BoxedStream> {
            let handlers = self.handlers.read().await;
            let Some(factory) = handlers.get(&peer_key(peer)) else {
                anyhow::bail!("no handler for peer");
            };
            (factory)().await
        }
    }

    /// Build a TCP `PeerAddr` for tests; panics on an invalid ip literal.
    fn make_peer(ip: &str, port: u16) -> PeerAddr {
        PeerAddr {
            ip: ip.parse().expect("valid ip"),
            port,
            transport: TransportProtocol::Tcp,
            pubkey_hint: None,
            relay_via: None,
        }
    }

    /// One-shot server half: answers a single GetManifest request with
    /// the supplied manifest bytes and asserts the requested id matches.
    async fn manifest_server(
        mut server: DuplexStream,
        manifest_bytes: Vec<u8>,
        manifest_id: [u8; 32],
    ) {
        let req = read_envelope(&mut server).await.expect("read request");
        let typed = req.decode_typed().expect("typed");
        let WirePayload::GetManifest(GetManifest {
            manifest_id: requested,
        }) = typed
        else {
            panic!("unexpected request type");
        };
        assert_eq!(requested, manifest_id);
        let resp = Envelope::from_typed(
            req.req_id,
            0x0001,
            &WirePayload::ManifestData(ManifestData {
                manifest_id,
                bytes: manifest_bytes,
            }),
        )
        .expect("resp");
        write_envelope(&mut server, &resp)
            .await
            .expect("write resp");
    }

    /// One-shot server half: answers a single GetChunk request for
    /// exactly `chunk_index` with the supplied bytes.
    async fn chunk_server(
        mut server: DuplexStream,
        content_id: [u8; 32],
        chunk_index: u32,
        bytes: Vec<u8>,
    ) {
        let req = read_envelope(&mut server).await.expect("read request");
        let typed = req.decode_typed().expect("typed");
        let WirePayload::GetChunk(GetChunk {
            content_id: requested_id,
            chunk_index: requested_index,
        }) = typed
        else {
            panic!("unexpected request type");
        };
        assert_eq!(requested_id, content_id);
        assert_eq!(requested_index, chunk_index);
        let resp = Envelope::from_typed(
            req.req_id,
            0x0001,
            &WirePayload::ChunkData(ChunkData {
                content_id,
                chunk_index,
                bytes,
            }),
        )
        .expect("resp");
        write_envelope(&mut server, &resp)
            .await
            .expect("write resp");
    }

    #[tokio::test]
    async fn manifest_fetch_retries_and_rotates_peers() {
        let peer_a = make_peer("10.0.0.1", 7000);
        let peer_b = make_peer("10.0.0.2", 7000);
        let connector = MockConnector::new();
        let transport = DirectRequestTransport::new(connector);

        // peer_a never connects, so the fetcher must rotate to peer_b.
        transport
            .connector
            .register(&peer_a, || async { anyhow::bail!("dial failed") })
            .await;

        let manifest = ManifestV1 {
            version: 1,
            share_pubkey: [1u8; 32],
            share_id: [2u8; 32],
            seq: 1,
            created_at: 1_700_000_000,
            expires_at: None,
            title: Some("m".into()),
            description: None,
            visibility: crate::manifest::ShareVisibility::Private,
            communities: vec![],
            items: vec![],
            recommended_shares: vec![],
            signature: None,
        };
        // Re-key and sign the manifest so id/signature checks pass.
        let mut signed = manifest.clone();
        let kp =
            crate::manifest::ShareKeypair::new(ed25519_dalek::SigningKey::from_bytes(&[7u8; 32]));
        signed.share_pubkey = kp.verifying_key().to_bytes();
        signed.share_id = kp.share_id().0;
        signed.sign(&kp).expect("sign");
        let manifest_id = signed.manifest_id().expect("id").0;
        let manifest_bytes = crate::cbor::to_vec(&signed).expect("bytes");

        transport
            .connector
            .register(&peer_b, {
                let manifest_bytes = manifest_bytes.clone();
                move || {
                    let manifest_bytes = manifest_bytes.clone();
                    async move {
                        let (client, server) = tokio::io::duplex(4096);
                        tokio::spawn(manifest_server(server, manifest_bytes, manifest_id));
                        Ok(Box::new(client) as BoxedStream)
                    }
                }
            })
            .await;

        let fetched = fetch_manifest_with_retry(
            &transport,
            &[peer_a, peer_b],
            manifest_id,
            &FetchPolicy::default(),
        )
        .await
        .expect("fetch manifest");
        assert_eq!(fetched.manifest_id().expect("id").0, manifest_id);
    }

    #[tokio::test]
    async fn chunk_fetch_downloads_and_verifies_content() {
        let peer_a = make_peer("10.0.0.3", 7000);
        let peer_b = make_peer("10.0.0.4", 7000);
        let connector = MockConnector::new();
        let transport = DirectRequestTransport::new(connector);

        // Two-chunk content: peer_a serves chunk 0, peer_b serves chunk 1.
        let bytes = vec![5u8; CHUNK_SIZE + 5];
        let desc = describe_content(&bytes);
        let chunk0 = bytes[..CHUNK_SIZE].to_vec();
        let chunk1 = bytes[CHUNK_SIZE..].to_vec();
        let cid = desc.content_id.0;

        transport
            .connector
            .register(&peer_a, {
                let chunk0 = chunk0.clone();
                move || {
                    let value = chunk0.clone();
                    async move {
                        let (client, server) = tokio::io::duplex(4096);
                        tokio::spawn(chunk_server(server, cid, 0, value.clone()));
                        Ok(Box::new(client) as BoxedStream)
                    }
                }
            })
            .await;
        transport
            .connector
            .register(&peer_b, {
                let chunk1 = chunk1.clone();
                move || {
                    let value = chunk1.clone();
                    async move {
                        let (client, server) = tokio::io::duplex(4096);
                        tokio::spawn(chunk_server(server, cid, 1, value.clone()));
                        Ok(Box::new(client) as BoxedStream)
                    }
                }
            })
            .await;

        // Cap each peer at one chunk so both must participate.
        let policy = FetchPolicy {
            max_chunks_per_peer: 1,
            ..FetchPolicy::default()
        };
        let out = download_swarm_over_network(
            &transport,
            &[peer_a, peer_b],
            cid,
            &desc.chunks,
            &policy,
            None,
        )
        .await
        .expect("download");
        assert_eq!(out, bytes);
    }

    #[tokio::test]
    async fn session_pool_reuses_connection_for_multiple_chunk_requests() {
        let peer = make_peer("10.0.0.5", 7000);
        let connector = MockConnector::new();
        let dial_count = Arc::new(AtomicUsize::new(0));
        let bytes = vec![8u8; CHUNK_SIZE - 17];
        let desc = describe_content(&bytes);
        let cid = desc.content_id.0;

        // The server loop answers two sequential requests on ONE stream,
        // while `dial_count` records how many times connect() was called.
        connector
            .register(&peer, {
                let dial_count = dial_count.clone();
                let first = bytes.clone();
                move || {
                    let dial_count = dial_count.clone();
                    let first = first.clone();
                    async move {
                        dial_count.fetch_add(1, Ordering::SeqCst);
                        let (client, mut server) = tokio::io::duplex(8192);
                        tokio::spawn(async move {
                            for _ in 0..2 {
                                let req = read_envelope(&mut server).await.expect("read request");
                                let typed = req.decode_typed().expect("typed");
                                let WirePayload::GetChunk(GetChunk {
                                    content_id: requested_id,
                                    chunk_index: requested_index,
                                }) = typed
                                else {
                                    panic!("unexpected request type");
                                };
                                assert_eq!(requested_id, cid);
                                assert_eq!(requested_index, 0);
                                let resp = Envelope::from_typed(
                                    req.req_id,
                                    0x0001,
                                    &WirePayload::ChunkData(ChunkData {
                                        content_id: cid,
                                        chunk_index: 0,
                                        bytes: first.clone(),
                                    }),
                                )
                                .expect("resp");
                                write_envelope(&mut server, &resp)
                                    .await
                                    .expect("write resp");
                            }
                        });
                        Ok(Box::new(client) as BoxedStream)
                    }
                }
            })
            .await;

        // For this test, use a single-chunk content so one request is enough and reuse is observable
        // by forcing two sequential downloads against same peer.
        let pool = SessionPoolTransport::new(connector);
        let single_chunk = vec![desc.chunks[0]];
        let p = FetchPolicy::default();
        let _ = download_swarm_over_network(
            &pool,
            std::slice::from_ref(&peer),
            cid,
            &single_chunk,
            &p,
            None,
        )
        .await
        .expect("download1");
        let _ = download_swarm_over_network(
            &pool,
            std::slice::from_ref(&peer),
            cid,
            &single_chunk,
            &p,
            None,
        )
        .await
        .expect("download2");

        // Exactly one dial proves the pooled session was reused.
        assert_eq!(dial_count.load(Ordering::SeqCst), 1);
    }

    /// Helper: spawns a server that can serve ANY chunk from the given bytes,
    /// routing by the `chunk_index` in the incoming request.
    async fn any_chunk_server(
        mut server: DuplexStream,
        content_id: [u8; 32],
        chunks: Vec<Vec<u8>>,
    ) {
        let req = read_envelope(&mut server).await.expect("read request");
        let typed = req.decode_typed().expect("typed");
        let WirePayload::GetChunk(GetChunk {
            content_id: requested_id,
            chunk_index,
        }) = typed
        else {
            panic!("unexpected request type in any_chunk_server");
        };
        assert_eq!(requested_id, content_id);
        let bytes = chunks[chunk_index as usize].clone();
        let resp = Envelope::from_typed(
            req.req_id,
            0x0001,
            &WirePayload::ChunkData(ChunkData {
                content_id,
                chunk_index,
                bytes,
            }),
        )
        .expect("resp");
        write_envelope(&mut server, &resp)
            .await
            .expect("write resp");
    }

    #[tokio::test]
    async fn parallel_download_distributes_chunks_across_peers() {
        // Create content that spans 4 chunks.
        let bytes = vec![42u8; CHUNK_SIZE * 3 + 100];
        let desc = describe_content(&bytes);
        let cid = desc.content_id.0;

        // Split into individual chunk payloads.
        let raw_chunks: Vec<Vec<u8>> = desc
            .chunks
            .iter()
            .enumerate()
            .map(|(i, _)| {
                let start = i * CHUNK_SIZE;
                let end = ((i + 1) * CHUNK_SIZE).min(bytes.len());
                bytes[start..end].to_vec()
            })
            .collect();
        assert_eq!(raw_chunks.len(), 4);

        // Two peers; each can serve any chunk but limited to 2 each.
        let peer_a = make_peer("10.0.0.20", 7000);
        let peer_b = make_peer("10.0.0.21", 7000);
        let connector = MockConnector::new();
        let transport = DirectRequestTransport::new(connector);

        let peer_a_count = Arc::new(AtomicUsize::new(0));
        let peer_b_count = Arc::new(AtomicUsize::new(0));

        transport
            .connector
            .register(&peer_a, {
                let chunks = raw_chunks.clone();
                let counter = peer_a_count.clone();
                move || {
                    let chunks = chunks.clone();
                    let counter = counter.clone();
                    async move {
                        counter.fetch_add(1, Ordering::SeqCst);
                        let (client, server) = tokio::io::duplex(65536);
                        tokio::spawn(any_chunk_server(server, cid, chunks));
                        Ok(Box::new(client) as BoxedStream)
                    }
                }
            })
            .await;

        transport
            .connector
            .register(&peer_b, {
                let chunks = raw_chunks.clone();
                let counter = peer_b_count.clone();
                move || {
                    let chunks = chunks.clone();
                    let counter = counter.clone();
                    async move {
                        counter.fetch_add(1, Ordering::SeqCst);
                        let (client, server) = tokio::io::duplex(65536);
                        tokio::spawn(any_chunk_server(server, cid, chunks));
                        Ok(Box::new(client) as BoxedStream)
                    }
                }
            })
            .await;

        let policy = FetchPolicy {
            max_chunks_per_peer: 2,
            parallel_chunks: 4,
            ..FetchPolicy::default()
        };

        let out = download_swarm_over_network(
            &transport,
            &[peer_a, peer_b],
            cid,
            &desc.chunks,
            &policy,
            None,
        )
        .await
        .expect("parallel download");

        assert_eq!(out, bytes);

        // Both peers should have been used (2 chunks each).
        let a = peer_a_count.load(Ordering::SeqCst);
        let b = peer_b_count.load(Ordering::SeqCst);
        assert!(a > 0, "peer_a should have served at least 1 chunk");
        assert!(b > 0, "peer_b should have served at least 1 chunk");
        assert_eq!(a + b, 4, "total chunks served should be 4");
    }

    #[tokio::test]
    async fn parallel_download_retries_failed_chunk_on_other_peer() {
        // 2 chunks; peer_a always fails, peer_b serves both.
        let bytes = vec![99u8; CHUNK_SIZE + 10];
        let desc = describe_content(&bytes);
        let cid = desc.content_id.0;

        let raw_chunks: Vec<Vec<u8>> = desc
            .chunks
            .iter()
            .enumerate()
            .map(|(i, _)| {
                let start = i * CHUNK_SIZE;
                let end = ((i + 1) * CHUNK_SIZE).min(bytes.len());
                bytes[start..end].to_vec()
            })
            .collect();

        let peer_a = make_peer("10.0.0.30", 7000);
        let peer_b = make_peer("10.0.0.31", 7000);
        let connector = MockConnector::new();
        let transport = DirectRequestTransport::new(connector);

        // peer_a always fails
        transport
            .connector
            .register(&peer_a, || async { anyhow::bail!("peer_a unavailable") })
            .await;

        // peer_b serves any chunk
        transport
            .connector
            .register(&peer_b, {
                let chunks = raw_chunks.clone();
                move || {
                    let chunks = chunks.clone();
                    async move {
                        let (client, server) = tokio::io::duplex(65536);
                        tokio::spawn(any_chunk_server(server, cid, chunks));
                        Ok(Box::new(client) as BoxedStream)
                    }
                }
            })
            .await;

        let policy = FetchPolicy {
            parallel_chunks: 2,
            ..FetchPolicy::default()
        };

        let out = download_swarm_over_network(
            &transport,
            &[peer_a, peer_b],
            cid,
            &desc.chunks,
            &policy,
            None,
        )
        .await
        .expect("download with retries");

        assert_eq!(out, bytes);
    }
}