1use std::{
8 collections::{HashMap, VecDeque},
9 io::SeekFrom,
10 path::{Path, PathBuf},
11 sync::Arc,
12 time::{Duration, Instant},
13};
14
15use futures_util::stream::{FuturesUnordered, StreamExt};
16
17use async_trait::async_trait;
18use tokio::{
19 io::{AsyncRead, AsyncSeekExt, AsyncWrite, AsyncWriteExt as TokioAsyncWriteExt},
20 sync::Mutex,
21};
22
23use crate::{
24 content::{CHUNK_SIZE, compute_chunk_list_hash, verify_chunk, verify_content},
25 ids::ContentId,
26 manifest::ManifestV1,
27 peer::PeerAddr,
28 transport::{read_envelope, write_envelope},
29 wire::{
30 ChunkData, ChunkHashList, Envelope, FLAG_ERROR, GetChunk, GetChunkHashes, GetManifest,
31 ManifestData, MsgType, WirePayload,
32 },
33};
34
/// Marker trait for any bidirectional async byte stream that can serve as a
/// peer connection (readable, writable, movable across tasks).
pub trait AsyncIo: AsyncRead + AsyncWrite + Unpin + Send {}
// Blanket impl: every qualifying stream type is automatically `AsyncIo`.
impl<T> AsyncIo for T where T: AsyncRead + AsyncWrite + Unpin + Send {}

/// Type-erased connection stream, as returned by [`PeerConnector::connect`].
pub type BoxedStream = Box<dyn AsyncIo>;
39
/// Dials a peer and returns a fresh bidirectional stream to it.
#[async_trait]
pub trait PeerConnector: Send + Sync {
    async fn connect(&self, peer: &PeerAddr) -> anyhow::Result<BoxedStream>;
}
44
/// Performs one request/response envelope exchange with a peer.
///
/// Implementations decide connection strategy (fresh dial per request,
/// session pooling, relay tunneling, ...); `timeout_dur` bounds each
/// individual write and read on the wire.
#[async_trait]
pub trait RequestTransport: Send + Sync {
    async fn request(
        &self,
        peer: &PeerAddr,
        request: Envelope,
        timeout_dur: Duration,
    ) -> anyhow::Result<Envelope>;
}
54
// Blanket impl: any `PeerConnector` can be used directly as a one-shot
// `RequestTransport` (a new connection is dialed for every request).
#[async_trait]
impl<C: PeerConnector> RequestTransport for C {
    async fn request(
        &self,
        peer: &PeerAddr,
        request: Envelope,
        timeout_dur: Duration,
    ) -> anyhow::Result<Envelope> {
        // One dial, one exchange; the stream is dropped afterwards.
        let mut stream = self.connect(peer).await?;
        send_request_on_stream(&mut stream, request, timeout_dur).await
    }
}
67
/// Simplest transport: wraps a connector and dials a brand-new connection
/// for every request (no pooling, no relay awareness).
pub struct DirectRequestTransport<C> {
    connector: C,
}

impl<C> DirectRequestTransport<C> {
    /// Wraps `connector` without any additional state.
    pub fn new(connector: C) -> Self {
        Self { connector }
    }
}
77
78#[async_trait]
79impl<C: PeerConnector> RequestTransport for DirectRequestTransport<C> {
80 async fn request(
81 &self,
82 peer: &PeerAddr,
83 request: Envelope,
84 timeout_dur: Duration,
85 ) -> anyhow::Result<Envelope> {
86 let mut stream = self.connector.connect(peer).await?;
87 send_request_on_stream(&mut stream, request, timeout_dur).await
88 }
89}
90
/// Transport that keeps at most one idle connection per peer and reuses it
/// across requests, falling back to a fresh dial when none is pooled.
pub struct SessionPoolTransport<C> {
    connector: C,
    // Keyed by `peer_key`; the `Instant` records when the stream was last
    // returned to the pool (used for oldest-first eviction).
    sessions: Mutex<HashMap<String, (BoxedStream, Instant)>>,
}

/// Upper bound on idle pooled sessions; the oldest entry is evicted when a
/// new key would exceed this.
const MAX_POOL_SESSIONS: usize = 64;
99
100impl<C> SessionPoolTransport<C> {
101 pub fn new(connector: C) -> Self {
102 Self {
103 connector,
104 sessions: Mutex::new(HashMap::new()),
105 }
106 }
107
108 async fn pool_insert(&self, key: String, stream: BoxedStream) {
110 let mut sessions = self.sessions.lock().await;
111 if sessions.len() >= MAX_POOL_SESSIONS && !sessions.contains_key(&key) {
112 if let Some(oldest_key) = sessions
114 .iter()
115 .min_by_key(|(_, (_, ts))| *ts)
116 .map(|(k, _)| k.clone())
117 {
118 sessions.remove(&oldest_key);
119 }
120 }
121 sessions.insert(key, (stream, Instant::now()));
122 }
123}
124
125#[async_trait]
126impl<C: PeerConnector> RequestTransport for SessionPoolTransport<C> {
127 async fn request(
128 &self,
129 peer: &PeerAddr,
130 request: Envelope,
131 timeout_dur: Duration,
132 ) -> anyhow::Result<Envelope> {
133 let key = peer_key(peer);
134 let mut stream = {
135 let mut sessions = self.sessions.lock().await;
136 sessions.remove(&key).map(|(s, _)| s)
137 };
138
139 if stream.is_none() {
140 stream = Some(self.connector.connect(peer).await?);
141 }
142
143 let mut stream = stream.expect("stream must be initialized");
144 match send_request_on_stream(&mut stream, request.clone(), timeout_dur).await {
145 Ok(response) => {
146 self.pool_insert(key, stream).await;
147 Ok(response)
148 }
149 Err(first_err) => {
150 let mut fresh_stream = self.connector.connect(peer).await?;
152 let response = send_request_on_stream(&mut fresh_stream, request, timeout_dur)
153 .await
154 .map_err(|_| first_err)?;
155 self.pool_insert(key, fresh_stream).await;
156 Ok(response)
157 }
158 }
159 }
160}
161
/// Transport that tunnels requests through a relay when the target peer is
/// only reachable via one (`PeerAddr::relay_via`); borrows its connector.
pub struct RelayAwareTransport<'a, C> {
    connector: &'a C,
}

impl<'a, C> RelayAwareTransport<'a, C> {
    /// Wraps a borrowed connector; no additional state.
    pub fn new(connector: &'a C) -> Self {
        Self { connector }
    }
}
177
#[async_trait]
impl<'a, C: PeerConnector> RequestTransport for RelayAwareTransport<'a, C> {
    /// Sends `request` to `peer`, tunneling through the peer's relay when
    /// `peer.relay_via` is set, otherwise connecting directly.
    async fn request(
        &self,
        peer: &PeerAddr,
        request: Envelope,
        timeout_dur: Duration,
    ) -> anyhow::Result<Envelope> {
        if let Some(relay_route) = &peer.relay_via {
            // Dial the relay, not the peer itself.
            let mut stream = self.connector.connect(&relay_route.relay_addr).await?;

            // Wrap the original envelope inside a RelayStream payload
            // addressed to the peer's slot on the relay.
            let inner_bytes = request.encode()?;
            let relay_stream_msg = crate::wire::RelayStream {
                relay_slot_id: relay_route.slot_id,
                stream_id: 0,
                kind: crate::wire::RelayPayloadKind::Content,
                payload: inner_bytes,
            };
            // The outer envelope reuses the inner req_id for correlation.
            let outer_request = Envelope::from_typed(
                request.req_id,
                0,
                &WirePayload::RelayStream(relay_stream_msg),
            )?;

            let outer_response =
                send_request_on_stream(&mut stream, outer_request, timeout_dur).await?;

            // The relay answers with another RelayStream whose payload is
            // the peer's actual response envelope.
            let typed = outer_response.decode_typed()?;
            match typed {
                WirePayload::RelayStream(resp) => Envelope::decode(&resp.payload),
                _ => anyhow::bail!(
                    "relay tunnel: unexpected response type {:?}",
                    outer_response.r#type
                ),
            }
        } else {
            // Direct path: dial the peer and exchange one request/response.
            let mut stream = self.connector.connect(peer).await?;
            send_request_on_stream(&mut stream, request, timeout_dur).await
        }
    }
}
224
/// Owned (`Arc`) variant of `RelayAwareTransport`, for callers that cannot
/// hold a borrow of the connector (e.g. spawned tasks).
pub struct OwnedRelayAwareTransport<C> {
    connector: Arc<C>,
}

impl<C> OwnedRelayAwareTransport<C> {
    /// Wraps a shared connector handle.
    pub fn new(connector: Arc<C>) -> Self {
        Self { connector }
    }
}
236
#[async_trait]
impl<C: PeerConnector> RequestTransport for OwnedRelayAwareTransport<C> {
    /// Same relay-tunneling logic as `RelayAwareTransport::request`, for the
    /// owned-connector variant.
    // NOTE(review): this duplicates the borrowed impl above; consider
    // extracting a shared free function to keep the two in sync.
    async fn request(
        &self,
        peer: &PeerAddr,
        request: Envelope,
        timeout_dur: Duration,
    ) -> anyhow::Result<Envelope> {
        if let Some(relay_route) = &peer.relay_via {
            // Dial the relay and wrap the envelope in a RelayStream payload
            // addressed to the peer's slot.
            let mut stream = self.connector.connect(&relay_route.relay_addr).await?;
            let inner_bytes = request.encode()?;
            let relay_stream_msg = crate::wire::RelayStream {
                relay_slot_id: relay_route.slot_id,
                stream_id: 0,
                kind: crate::wire::RelayPayloadKind::Content,
                payload: inner_bytes,
            };
            let outer_request = Envelope::from_typed(
                request.req_id,
                0,
                &WirePayload::RelayStream(relay_stream_msg),
            )?;
            let outer_response =
                send_request_on_stream(&mut stream, outer_request, timeout_dur).await?;
            // Unwrap the peer's response envelope from the relay's reply.
            let typed = outer_response.decode_typed()?;
            match typed {
                WirePayload::RelayStream(resp) => Envelope::decode(&resp.payload),
                _ => anyhow::bail!(
                    "relay tunnel: unexpected response type {:?}",
                    outer_response.r#type
                ),
            }
        } else {
            // Direct path: no relay involved.
            let mut stream = self.connector.connect(peer).await?;
            send_request_on_stream(&mut stream, request, timeout_dur).await
        }
    }
}
275
/// Tunables governing retries, timeouts, backoff, and parallelism for
/// manifest/chunk fetches and swarm downloads.
#[derive(Debug, Clone)]
pub struct FetchPolicy {
    /// Number of full rotations over the peer list before giving up.
    pub attempts_per_peer: usize,
    /// Per-request timeout (applied separately to the write and the read).
    pub request_timeout: Duration,
    /// Soft cap on requests per peer; over-cap peers are only used when no
    /// under-cap peer is available (see `pick_best_peer_index`).
    pub max_chunks_per_peer: usize,
    /// Base delay for exponential failure backoff (also used as the idle
    /// sleep between stall rounds).
    pub failure_backoff_base: Duration,
    /// Upper bound on the per-peer failure backoff.
    pub max_backoff: Duration,
    /// Maximum number of chunk requests kept in flight concurrently.
    pub parallel_chunks: usize,
    /// Consecutive idle rounds tolerated before a download is declared stalled.
    pub max_stall_rounds: usize,
    /// Starting reputation score per peer key (see `peer_key`).
    pub initial_reputations: HashMap<String, i32>,
}

/// Progress callback invoked as `(completed_chunks, total_chunks, completed_bytes)`.
pub type ProgressCallback = Box<dyn Fn(u32, u32, u64) + Send + Sync>;
302
impl Default for FetchPolicy {
    /// Conservative defaults: two rotations over the peer list, 3 s request
    /// timeout, 8 parallel chunk requests, no seeded reputations.
    fn default() -> Self {
        Self {
            attempts_per_peer: 2,
            request_timeout: Duration::from_secs(3),
            max_chunks_per_peer: 128,
            failure_backoff_base: Duration::from_millis(300),
            max_backoff: Duration::from_secs(8),
            parallel_chunks: 8,
            max_stall_rounds: 60,
            initial_reputations: HashMap::new(),
        }
    }
}
317
318pub async fn fetch_manifest_with_retry<T: RequestTransport + ?Sized>(
319 transport: &T,
320 peers: &[PeerAddr],
321 manifest_id: [u8; 32],
322 policy: &FetchPolicy,
323) -> anyhow::Result<ManifestV1> {
324 if peers.is_empty() {
325 anyhow::bail!("no peers available for manifest fetch");
326 }
327
328 let mut req_id = 1u32;
329 let mut last_err = None;
330 for attempt in 0..policy.attempts_per_peer {
331 for offset in 0..peers.len() {
332 let idx = (attempt + offset) % peers.len();
333 let target = &peers[idx];
334 let result = fetch_manifest_once(transport, target, manifest_id, req_id, policy).await;
335 req_id = req_id.wrapping_add(1);
336 match result {
337 Ok(manifest) => return Ok(manifest),
338 Err(err) => last_err = Some(err),
339 }
340 }
341 }
342
343 Err(last_err.unwrap_or_else(|| anyhow::anyhow!("manifest fetch failed")))
344}
345
/// Fetches the per-chunk hash list for `content_id`, rotating over `peers`
/// with retries, and verifies both the count and the aggregate
/// `chunk_list_hash` before accepting it.
pub async fn fetch_chunk_hashes_with_retry<T: RequestTransport + ?Sized>(
    transport: &T,
    peers: &[PeerAddr],
    content_id: [u8; 32],
    expected_chunk_count: u32,
    expected_chunk_list_hash: [u8; 32],
    policy: &FetchPolicy,
) -> anyhow::Result<Vec<[u8; 32]>> {
    if peers.is_empty() {
        anyhow::bail!("no peers available for chunk hash fetch");
    }

    // Distinct req_id range from manifest fetches, for easier tracing.
    let mut req_id = 5_000u32;
    let mut last_err = None;
    for attempt in 0..policy.attempts_per_peer {
        for offset in 0..peers.len() {
            // Rotate the starting peer each round.
            let idx = (attempt + offset) % peers.len();
            let target = &peers[idx];
            let result =
                fetch_chunk_hashes_once(transport, target, content_id, req_id, policy).await;
            req_id = req_id.wrapping_add(1);
            match result {
                Ok(hashes) => {
                    // Reject a list with the wrong length outright.
                    if hashes.len() != expected_chunk_count as usize {
                        last_err = Some(anyhow::anyhow!(
                            "chunk hash count mismatch: got {} expected {}",
                            hashes.len(),
                            expected_chunk_count
                        ));
                        continue;
                    }
                    // The aggregate hash binds the list to the manifest.
                    let actual_hash = compute_chunk_list_hash(&hashes);
                    if actual_hash != expected_chunk_list_hash {
                        last_err = Some(anyhow::anyhow!("chunk_list_hash verification failed"));
                        continue;
                    }
                    return Ok(hashes);
                }
                Err(err) => last_err = Some(err),
            }
        }
    }

    Err(last_err.unwrap_or_else(|| anyhow::anyhow!("chunk hash fetch failed")))
}
393
/// Single chunk-hash-list request against one peer, with response
/// validation (req_id echo, message type, payload shape, content_id echo).
async fn fetch_chunk_hashes_once<T: RequestTransport + ?Sized>(
    transport: &T,
    peer: &PeerAddr,
    content_id: [u8; 32],
    req_id: u32,
    policy: &FetchPolicy,
) -> anyhow::Result<Vec<[u8; 32]>> {
    let request = Envelope::from_typed(
        req_id,
        0,
        &WirePayload::GetChunkHashes(GetChunkHashes { content_id }),
    )?;
    let response = transport
        .request(peer, request, policy.request_timeout)
        .await?;

    // The peer must echo our req_id and answer with the expected type.
    if response.req_id != req_id {
        anyhow::bail!("chunk hash response req_id mismatch");
    }
    if response.r#type != MsgType::ChunkHashList as u16 {
        anyhow::bail!("unexpected response type for chunk hash request");
    }
    let payload = response.decode_typed()?;
    let WirePayload::ChunkHashList(ChunkHashList {
        content_id: returned_id,
        hashes,
    }) = payload
    else {
        anyhow::bail!("invalid chunk hash response payload");
    };
    // Guard against a response for a different content item.
    if returned_id != content_id {
        anyhow::bail!("chunk hash content_id mismatch");
    }
    Ok(hashes)
}
429
/// Downloads all chunks of a content item from a swarm of peers into memory.
///
/// Up to `policy.parallel_chunks` requests run concurrently; peers are chosen
/// per chunk by reputation (`pick_best_peer_index`), failures are penalized
/// and retried elsewhere, and every chunk is hash-verified before being
/// accepted. The assembled bytes are verified against `content_id` before
/// returning.
///
/// `on_progress`, when supplied, is invoked after each verified chunk with
/// `(completed_chunks, total_chunks, completed_bytes)`.
pub async fn download_swarm_over_network<T: RequestTransport + ?Sized>(
    transport: &T,
    peers: &[PeerAddr],
    content_id: [u8; 32],
    chunk_hashes: &[[u8; 32]],
    policy: &FetchPolicy,
    on_progress: Option<&ProgressCallback>,
) -> anyhow::Result<Vec<u8>> {
    if peers.is_empty() {
        anyhow::bail!("no peers available for content download");
    }

    let total_chunks = chunk_hashes.len();
    // Always allow at least one request in flight.
    let max_parallel = policy.parallel_chunks.min(total_chunks).max(1);
    let max_retries_per_chunk = policy.attempts_per_peer * peers.len();
    let mut req_id = 10_000u32;

    // One slot per chunk, filled as verified chunks arrive (possibly out of order).
    let mut completed: Vec<Option<Vec<u8>>> = vec![None; total_chunks];
    let mut completed_count = 0usize;
    let mut completed_bytes = 0u64;

    // Per-peer runtime reputation, seeded from policy.initial_reputations.
    let mut stats: HashMap<String, PeerRuntimeStats> = peers
        .iter()
        .map(|peer| {
            let key = peer_key(peer);
            let initial_score = policy.initial_reputations.get(&key).copied().unwrap_or(0);
            (
                key,
                PeerRuntimeStats {
                    score: initial_score,
                    ..PeerRuntimeStats::default()
                },
            )
        })
        .collect();

    let mut next_chunk = 0usize;
    // Chunks whose previous attempt failed, paired with their attempt count.
    let mut retry_queue: VecDeque<(usize, usize)> = VecDeque::new();

    let mut in_flight = FuturesUnordered::new();
    let mut stall_count = 0usize;

    loop {
        // Schedule work until the parallelism budget is full or nothing is ready.
        while in_flight.len() < max_parallel {
            // Retries take priority over new chunks.
            let (chunk_idx, retries) = if let Some(retry) = retry_queue.pop_front() {
                retry
            } else if next_chunk < total_chunks {
                let idx = next_chunk;
                next_chunk += 1;
                (idx, 0)
            } else {
                break;
            };

            if retries >= max_retries_per_chunk {
                anyhow::bail!(
                    "unable to retrieve verified chunk {} after {} attempts",
                    chunk_idx,
                    retries
                );
            }

            if let Some(peer_idx) = pick_best_peer_index(peers, &stats, policy) {
                let peer = peers[peer_idx].clone();
                let pk = peer_key(&peer);
                let rid = req_id;
                req_id = req_id.wrapping_add(1);
                if let Some(s) = stats.get_mut(&pk) {
                    s.requests += 1;
                    s.in_flight += 1;
                }
                let expected = chunk_hashes[chunk_idx];
                in_flight.push(fetch_one_chunk(
                    transport, peer, content_id, chunk_idx, rid, expected, pk, policy, retries,
                ));
            } else {
                // No peer currently eligible (all in backoff): requeue and wait.
                retry_queue.push_front((chunk_idx, retries));
                break;
            }
        }

        if completed_count == total_chunks {
            break;
        }

        if in_flight.is_empty() {
            // Nothing running and nothing schedulable: count a stall round.
            stall_count += 1;
            if stall_count > policy.max_stall_rounds {
                anyhow::bail!(
                    "download stalled: no peers can serve remaining {}/{} chunks",
                    total_chunks - completed_count,
                    total_chunks
                );
            }
            tokio::time::sleep(policy.failure_backoff_base).await;
            continue;
        }
        stall_count = 0;

        let res = in_flight.next().await.expect("non-empty FuturesUnordered");

        if let Some(s) = stats.get_mut(&res.peer_key) {
            s.in_flight = s.in_flight.saturating_sub(1);
        }

        let now = Instant::now();
        match res.data {
            // Only a chunk matching its expected hash counts as success.
            Ok(bytes) if verify_chunk(&res.expected_hash, &bytes).is_ok() => {
                if let Some(s) = stats.get_mut(&res.peer_key) {
                    s.score += 2;
                    s.consecutive_failures = 0;
                    s.backoff_until = None;
                }
                let chunk_len = bytes.len() as u64;
                completed[res.chunk_idx] = Some(bytes);
                completed_count += 1;
                completed_bytes += chunk_len;
                if let Some(cb) = &on_progress {
                    cb(completed_count as u32, total_chunks as u32, completed_bytes);
                }
            }
            // Response arrived but failed hash verification: penalize + retry.
            Ok(_) => {
                if let Some(s) = stats.get_mut(&res.peer_key) {
                    register_failure(s, policy, now);
                }
                retry_queue.push_back((res.chunk_idx, res.retries + 1));
            }
            // Transport-level failure: penalize + retry elsewhere.
            Err(_) => {
                if let Some(s) = stats.get_mut(&res.peer_key) {
                    register_failure(s, policy, now);
                }
                retry_queue.push_back((res.chunk_idx, res.retries + 1));
            }
        }
    }

    // Reassemble in order; every slot is Some once the loop exits normally.
    let mut output = Vec::new();
    for chunk_data in completed {
        output.extend_from_slice(&chunk_data.expect("all chunks completed"));
    }

    verify_content(&ContentId(content_id), &output)?;
    Ok(output)
}
590
591pub async fn download_swarm_to_file<T: RequestTransport + ?Sized>(
602 transport: &T,
603 peers: &[PeerAddr],
604 content_id: [u8; 32],
605 chunk_hashes: &[[u8; 32]],
606 policy: &FetchPolicy,
607 target_path: &Path,
608 on_progress: Option<&ProgressCallback>,
609) -> anyhow::Result<PathBuf> {
610 if peers.is_empty() {
611 anyhow::bail!("no peers available for content download");
612 }
613
614 let total_chunks = chunk_hashes.len();
615 let max_parallel = policy.parallel_chunks.min(total_chunks).max(1);
616 let max_retries_per_chunk = policy.attempts_per_peer * peers.len();
617 let mut req_id = 10_000u32;
618
619 let tmp_path = target_path.with_extension("scp2p-partial");
621 let mut file = tokio::fs::File::create(&tmp_path).await?;
622
623 let mut chunk_done = vec![false; total_chunks];
625 let mut completed_count = 0usize;
626 let mut completed_bytes = 0u64;
627
628 let mut stats: HashMap<String, PeerRuntimeStats> = peers
629 .iter()
630 .map(|peer| {
631 let key = peer_key(peer);
632 let initial_score = policy.initial_reputations.get(&key).copied().unwrap_or(0);
633 (
634 key,
635 PeerRuntimeStats {
636 score: initial_score,
637 ..PeerRuntimeStats::default()
638 },
639 )
640 })
641 .collect();
642
643 let mut next_chunk = 0usize;
644 let mut retry_queue: VecDeque<(usize, usize)> = VecDeque::new();
645 let mut in_flight: FuturesUnordered<_> = FuturesUnordered::new();
646 let mut stall_count = 0usize;
647
648 loop {
649 while in_flight.len() < max_parallel {
650 let (chunk_idx, retries) = if let Some(retry) = retry_queue.pop_front() {
651 retry
652 } else if next_chunk < total_chunks {
653 let idx = next_chunk;
654 next_chunk += 1;
655 (idx, 0)
656 } else {
657 break;
658 };
659
660 if retries >= max_retries_per_chunk {
661 let _ = tokio::fs::remove_file(&tmp_path).await;
663 anyhow::bail!(
664 "unable to retrieve verified chunk {} after {} attempts",
665 chunk_idx,
666 retries
667 );
668 }
669
670 if let Some(peer_idx) = pick_best_peer_index(peers, &stats, policy) {
671 let peer = peers[peer_idx].clone();
672 let pk = peer_key(&peer);
673 let rid = req_id;
674 req_id = req_id.wrapping_add(1);
675 if let Some(s) = stats.get_mut(&pk) {
676 s.in_flight += 1;
677 }
678 in_flight.push(fetch_one_chunk(
679 transport,
680 peer,
681 content_id,
682 chunk_idx,
683 rid,
684 chunk_hashes[chunk_idx],
685 pk,
686 policy,
687 retries,
688 ));
689 } else {
690 retry_queue.push_front((chunk_idx, retries));
691 break;
692 }
693 }
694
695 if completed_count >= total_chunks {
696 break;
697 }
698
699 if in_flight.is_empty() {
700 stall_count += 1;
701 if stall_count > policy.max_stall_rounds {
702 let _ = tokio::fs::remove_file(&tmp_path).await;
703 anyhow::bail!(
704 "download stalled: no peers can serve remaining {}/{} chunks",
705 total_chunks - completed_count,
706 total_chunks
707 );
708 }
709 tokio::time::sleep(policy.failure_backoff_base).await;
710 continue;
711 }
712 stall_count = 0;
713
714 let res = in_flight.next().await.expect("non-empty FuturesUnordered");
715
716 if let Some(s) = stats.get_mut(&res.peer_key) {
717 s.in_flight = s.in_flight.saturating_sub(1);
718 }
719
720 let now = Instant::now();
721 match res.data {
722 Ok(bytes) if verify_chunk(&res.expected_hash, &bytes).is_ok() => {
723 if let Some(s) = stats.get_mut(&res.peer_key) {
724 s.score += 2;
725 s.consecutive_failures = 0;
726 s.backoff_until = None;
727 }
728 let offset = res.chunk_idx as u64 * CHUNK_SIZE as u64;
730 file.seek(SeekFrom::Start(offset)).await?;
731 file.write_all(&bytes).await?;
732
733 let chunk_len = bytes.len() as u64;
734 chunk_done[res.chunk_idx] = true;
735 completed_count += 1;
736 completed_bytes += chunk_len;
737 if let Some(cb) = &on_progress {
738 cb(completed_count as u32, total_chunks as u32, completed_bytes);
739 }
740 }
741 Ok(_) => {
742 if let Some(s) = stats.get_mut(&res.peer_key) {
743 register_failure(s, policy, now);
744 }
745 retry_queue.push_back((res.chunk_idx, res.retries + 1));
746 }
747 Err(_) => {
748 if let Some(s) = stats.get_mut(&res.peer_key) {
749 register_failure(s, policy, now);
750 }
751 retry_queue.push_back((res.chunk_idx, res.retries + 1));
752 }
753 }
754 }
755
756 file.flush().await?;
758 drop(file);
759
760 let file_bytes = tokio::fs::read(&tmp_path).await?;
762 verify_content(&ContentId(content_id), &file_bytes)?;
763
764 tokio::fs::rename(&tmp_path, target_path).await?;
766 Ok(target_path.to_path_buf())
767}
768
/// Result of one chunk-fetch attempt, carrying enough bookkeeping for the
/// scheduler loop to credit/penalize the peer and requeue on failure.
struct ChunkFetchOutcome {
    // Index of the chunk this attempt was for.
    chunk_idx: usize,
    // Key of the peer the request went to (see `peer_key`).
    peer_key: String,
    // Hash the returned bytes must match to be accepted.
    expected_hash: [u8; 32],
    // Raw fetch result; verification happens in the caller.
    data: anyhow::Result<Vec<u8>>,
    // Attempt count so far for this chunk.
    retries: usize,
}
778
779#[allow(clippy::too_many_arguments)]
785async fn fetch_one_chunk<T: RequestTransport + ?Sized>(
786 transport: &T,
787 peer: PeerAddr,
788 content_id: [u8; 32],
789 chunk_idx: usize,
790 req_id: u32,
791 expected_hash: [u8; 32],
792 peer_key: String,
793 policy: &FetchPolicy,
794 retries: usize,
795) -> ChunkFetchOutcome {
796 let data = fetch_chunk_once(
797 transport,
798 &peer,
799 content_id,
800 chunk_idx as u32,
801 req_id,
802 policy,
803 )
804 .await;
805 ChunkFetchOutcome {
806 chunk_idx,
807 peer_key,
808 expected_hash,
809 data,
810 retries,
811 }
812}
813
/// Picks the index of the best peer for the next chunk request.
///
/// Peers currently in failure backoff are skipped. The rest are split into
/// two buckets: still under `policy.max_chunks_per_peer` requests, and over
/// it. Within each bucket the best peer has the highest score, then the
/// fewest in-flight requests, then the fewest total requests. Under-cap
/// peers always beat over-cap peers; returns `None` only when every peer is
/// backing off.
fn pick_best_peer_index(
    peers: &[PeerAddr],
    stats: &HashMap<String, PeerRuntimeStats>,
    policy: &FetchPolicy,
) -> Option<usize> {
    let now = Instant::now();

    // Candidate tuples: (peer index, score, in_flight, requests).
    let mut best_under: Option<(usize, i32, usize, usize)> = None;
    let mut best_over: Option<(usize, i32, usize, usize)> = None;

    for (i, peer) in peers.iter().enumerate() {
        let key = peer_key(peer);
        let s = match stats.get(&key) {
            Some(s) => s,
            None => continue,
        };
        // Skip peers still serving their failure backoff window.
        if let Some(until) = s.backoff_until
            && until > now
        {
            continue;
        }
        let candidate = (i, s.score, s.in_flight, s.requests);
        let bucket = if s.requests < policy.max_chunks_per_peer {
            &mut best_under
        } else {
            &mut best_over
        };
        match bucket {
            None => *bucket = Some(candidate),
            Some((_, bs, bif, br)) => {
                // Lexicographic preference: score desc, in_flight asc,
                // requests asc; ties keep the earlier peer.
                if s.score > *bs
                    || (s.score == *bs && s.in_flight < *bif)
                    || (s.score == *bs && s.in_flight == *bif && s.requests < *br)
                {
                    *bucket = Some(candidate);
                }
            }
        }
    }

    best_under.or(best_over).map(|(idx, _, _, _)| idx)
}
866
/// Single manifest request against one peer, with full response validation:
/// req_id echo, message type, payload shape, manifest_id echo, declared
/// limits, and the manifest bytes hashing to the requested id.
async fn fetch_manifest_once<T: RequestTransport + ?Sized>(
    transport: &T,
    peer: &PeerAddr,
    manifest_id: [u8; 32],
    req_id: u32,
    policy: &FetchPolicy,
) -> anyhow::Result<ManifestV1> {
    let request = Envelope::from_typed(
        req_id,
        0,
        &WirePayload::GetManifest(GetManifest { manifest_id }),
    )?;
    let response = transport
        .request(peer, request, policy.request_timeout)
        .await?;

    // The peer must echo our req_id and answer with the expected type.
    if response.req_id != req_id {
        anyhow::bail!("manifest response req_id mismatch");
    }
    if response.r#type != MsgType::ManifestData as u16 {
        anyhow::bail!("unexpected response type for manifest request");
    }
    let payload = response.decode_typed()?;
    let WirePayload::ManifestData(ManifestData {
        manifest_id: returned_id,
        bytes,
    }) = payload
    else {
        anyhow::bail!("invalid manifest response payload");
    };
    if returned_id != manifest_id {
        anyhow::bail!("manifest id mismatch");
    }

    // Decode, enforce size/shape limits, and bind the bytes to the id so a
    // peer cannot substitute a different manifest.
    let manifest: ManifestV1 = crate::cbor::from_slice(&bytes)?;
    manifest.check_limits()?;
    if manifest.manifest_id()?.0 != manifest_id {
        anyhow::bail!("manifest bytes hash does not match manifest_id");
    }
    Ok(manifest)
}
910
/// Single chunk request against one peer, with response validation (req_id
/// echo, message type, payload shape, content_id/chunk_index echo). Hash
/// verification of the returned bytes is the caller's responsibility.
async fn fetch_chunk_once<T: RequestTransport + ?Sized>(
    transport: &T,
    peer: &PeerAddr,
    content_id: [u8; 32],
    chunk_index: u32,
    req_id: u32,
    policy: &FetchPolicy,
) -> anyhow::Result<Vec<u8>> {
    let request = Envelope::from_typed(
        req_id,
        0,
        &WirePayload::GetChunk(GetChunk {
            content_id,
            chunk_index,
        }),
    )?;
    let response = transport
        .request(peer, request, policy.request_timeout)
        .await?;

    // The peer must echo our req_id and answer with the expected type.
    if response.req_id != req_id {
        anyhow::bail!("chunk response req_id mismatch");
    }
    if response.r#type != MsgType::ChunkData as u16 {
        anyhow::bail!("unexpected response type for chunk request");
    }
    let payload = response.decode_typed()?;
    let WirePayload::ChunkData(ChunkData {
        content_id: returned_content,
        chunk_index: returned_index,
        bytes,
    }) = payload
    else {
        anyhow::bail!("invalid chunk response payload");
    };
    // Guard against a response for a different chunk or content item.
    if returned_content != content_id || returned_index != chunk_index {
        anyhow::bail!("chunk response mismatch");
    }
    Ok(bytes)
}
951
952pub async fn send_request_on_stream(
953 stream: &mut BoxedStream,
954 request: Envelope,
955 timeout_dur: Duration,
956) -> anyhow::Result<Envelope> {
957 tokio::time::timeout(timeout_dur, write_envelope(stream, &request))
958 .await
959 .map_err(|_| anyhow::anyhow!("request write timed out"))??;
960 let response = tokio::time::timeout(timeout_dur, read_envelope(stream))
961 .await
962 .map_err(|_| anyhow::anyhow!("response read timed out"))??;
963 if response.flags & FLAG_ERROR != 0 {
964 let msg = if response.payload.is_empty() {
965 "peer returned protocol error".to_string()
966 } else if let Ok(text) = String::from_utf8(response.payload.clone()) {
967 text
968 } else {
969 format!(
970 "peer returned protocol error ({} bytes)",
971 response.payload.len()
972 )
973 };
974 anyhow::bail!("{msg}");
975 }
976 Ok(response)
977}
978
/// Per-peer state accumulated during a single swarm download.
#[derive(Debug, Default)]
struct PeerRuntimeStats {
    // Reputation: +2 per verified chunk, -1 per failure; seeded from
    // `FetchPolicy::initial_reputations`.
    score: i32,
    // Total requests issued to this peer (drives `max_chunks_per_peer`).
    requests: usize,
    // Consecutive failures since the last success (drives backoff growth).
    consecutive_failures: u32,
    // Peer is skipped by the scheduler until this instant, if set.
    backoff_until: Option<Instant>,
    // Requests currently outstanding against this peer.
    in_flight: usize,
}
988
989fn register_failure(stats: &mut PeerRuntimeStats, policy: &FetchPolicy, now: Instant) {
990 stats.score -= 1;
991 stats.consecutive_failures = stats.consecutive_failures.saturating_add(1);
992 let exp = stats.consecutive_failures.saturating_sub(1).min(8);
993 let factor = 1u32 << exp;
994 let mut backoff = policy.failure_backoff_base.saturating_mul(factor);
995 if backoff > policy.max_backoff {
996 backoff = policy.max_backoff;
997 }
998 stats.backoff_until = Some(now + backoff);
999}
1000
/// Stable map key for a peer: `ip:port:transport`. Used for stats, session
/// pooling, and seeded reputations.
fn peer_key(peer: &PeerAddr) -> String {
    format!("{}:{}:{:?}", peer.ip, peer.port, peer.transport)
}
1004
1005#[cfg(test)]
1006mod tests {
1007 use std::{
1008 collections::HashMap,
1009 future::Future,
1010 pin::Pin,
1011 sync::{
1012 Arc,
1013 atomic::{AtomicUsize, Ordering},
1014 },
1015 };
1016
1017 use tokio::{io::DuplexStream, sync::RwLock};
1018
1019 use super::*;
1020 use crate::{
1021 content::{CHUNK_SIZE, describe_content},
1022 peer::TransportProtocol,
1023 wire::{GetChunk, GetManifest},
1024 };
1025
    // Boxed future returned by a registered mock connection factory.
    type ConnectFuture = Pin<Box<dyn Future<Output = anyhow::Result<BoxedStream>> + Send>>;
    // Factory producing a fresh connection future per dial.
    type Handler = Box<dyn Fn() -> ConnectFuture + Send + Sync>;
    type HandlerMap = Arc<RwLock<HashMap<String, Handler>>>;

    /// Test connector that serves streams from per-peer registered factories.
    struct MockConnector {
        handlers: HandlerMap,
    }
1033
    impl MockConnector {
        fn new() -> Self {
            Self {
                handlers: Arc::new(RwLock::new(HashMap::new())),
            }
        }

        /// Registers a factory that produces a fresh stream each time `peer`
        /// is dialed; keyed by `peer_key`.
        async fn register<F, Fut>(&self, peer: &PeerAddr, factory: F)
        where
            F: Fn() -> Fut + Send + Sync + 'static,
            Fut: Future<Output = anyhow::Result<BoxedStream>> + Send + 'static,
        {
            self.handlers
                .write()
                .await
                .insert(peer_key(peer), Box::new(move || Box::pin(factory())));
        }
    }
1052
    #[async_trait]
    impl PeerConnector for MockConnector {
        async fn connect(&self, peer: &PeerAddr) -> anyhow::Result<BoxedStream> {
            // Dialing an unregistered peer fails, simulating an unreachable host.
            let handlers = self.handlers.read().await;
            let Some(factory) = handlers.get(&peer_key(peer)) else {
                anyhow::bail!("no handler for peer");
            };
            (factory)().await
        }
    }
1063
    /// Builds a plain TCP `PeerAddr` with no pubkey hint and no relay route.
    fn make_peer(ip: &str, port: u16) -> PeerAddr {
        PeerAddr {
            ip: ip.parse().expect("valid ip"),
            port,
            transport: TransportProtocol::Tcp,
            pubkey_hint: None,
            relay_via: None,
        }
    }
1073
    /// One-shot in-memory server: answers a single GetManifest request with a
    /// ManifestData response carrying `manifest_bytes`.
    async fn manifest_server(
        mut server: DuplexStream,
        manifest_bytes: Vec<u8>,
        manifest_id: [u8; 32],
    ) {
        let req = read_envelope(&mut server).await.expect("read request");
        let typed = req.decode_typed().expect("typed");
        let WirePayload::GetManifest(GetManifest {
            manifest_id: requested,
        }) = typed
        else {
            panic!("unexpected request type");
        };
        assert_eq!(requested, manifest_id);
        // Echo the req_id; 0x0001 flags mark the response.
        let resp = Envelope::from_typed(
            req.req_id,
            0x0001,
            &WirePayload::ManifestData(ManifestData {
                manifest_id,
                bytes: manifest_bytes,
            }),
        )
        .expect("resp");
        write_envelope(&mut server, &resp)
            .await
            .expect("write resp");
    }
1101
    /// One-shot in-memory server: answers a single GetChunk request for
    /// exactly (`content_id`, `chunk_index`) with the supplied bytes.
    async fn chunk_server(
        mut server: DuplexStream,
        content_id: [u8; 32],
        chunk_index: u32,
        bytes: Vec<u8>,
    ) {
        let req = read_envelope(&mut server).await.expect("read request");
        let typed = req.decode_typed().expect("typed");
        let WirePayload::GetChunk(GetChunk {
            content_id: requested_id,
            chunk_index: requested_index,
        }) = typed
        else {
            panic!("unexpected request type");
        };
        assert_eq!(requested_id, content_id);
        assert_eq!(requested_index, chunk_index);
        let resp = Envelope::from_typed(
            req.req_id,
            0x0001,
            &WirePayload::ChunkData(ChunkData {
                content_id,
                chunk_index,
                bytes,
            }),
        )
        .expect("resp");
        write_envelope(&mut server, &resp)
            .await
            .expect("write resp");
    }
1133
    #[tokio::test]
    async fn manifest_fetch_retries_and_rotates_peers() {
        // peer_a always fails to dial; the fetcher must rotate to peer_b.
        let peer_a = make_peer("10.0.0.1", 7000);
        let peer_b = make_peer("10.0.0.2", 7000);
        let connector = MockConnector::new();
        let transport = DirectRequestTransport::new(connector);

        transport
            .connector
            .register(&peer_a, || async { anyhow::bail!("dial failed") })
            .await;

        // Build a properly signed manifest so id/signature validation passes.
        let manifest = ManifestV1 {
            version: 1,
            share_pubkey: [1u8; 32],
            share_id: [2u8; 32],
            seq: 1,
            created_at: 1_700_000_000,
            expires_at: None,
            title: Some("m".into()),
            description: None,
            visibility: crate::manifest::ShareVisibility::Private,
            communities: vec![],
            items: vec![],
            recommended_shares: vec![],
            signature: None,
        };
        let mut signed = manifest.clone();
        let kp =
            crate::manifest::ShareKeypair::new(ed25519_dalek::SigningKey::from_bytes(&[7u8; 32]));
        signed.share_pubkey = kp.verifying_key().to_bytes();
        signed.share_id = kp.share_id().0;
        signed.sign(&kp).expect("sign");
        let manifest_id = signed.manifest_id().expect("id").0;
        let manifest_bytes = crate::cbor::to_vec(&signed).expect("bytes");

        // peer_b serves the manifest over an in-memory duplex stream.
        transport
            .connector
            .register(&peer_b, {
                let manifest_bytes = manifest_bytes.clone();
                move || {
                    let manifest_bytes = manifest_bytes.clone();
                    async move {
                        let (client, server) = tokio::io::duplex(4096);
                        tokio::spawn(manifest_server(server, manifest_bytes, manifest_id));
                        Ok(Box::new(client) as BoxedStream)
                    }
                }
            })
            .await;

        let fetched = fetch_manifest_with_retry(
            &transport,
            &[peer_a, peer_b],
            manifest_id,
            &FetchPolicy::default(),
        )
        .await
        .expect("fetch manifest");
        assert_eq!(fetched.manifest_id().expect("id").0, manifest_id);
    }
1195
    #[tokio::test]
    async fn chunk_fetch_downloads_and_verifies_content() {
        // Two peers each serve exactly one chunk; with max_chunks_per_peer=1
        // the scheduler must spread the two chunks across both peers.
        let peer_a = make_peer("10.0.0.3", 7000);
        let peer_b = make_peer("10.0.0.4", 7000);
        let connector = MockConnector::new();
        let transport = DirectRequestTransport::new(connector);

        // Content slightly larger than one chunk, so it splits into two.
        let bytes = vec![5u8; CHUNK_SIZE + 5];
        let desc = describe_content(&bytes);
        let chunk0 = bytes[..CHUNK_SIZE].to_vec();
        let chunk1 = bytes[CHUNK_SIZE..].to_vec();
        let cid = desc.content_id.0;

        transport
            .connector
            .register(&peer_a, {
                let chunk0 = chunk0.clone();
                move || {
                    let value = chunk0.clone();
                    async move {
                        let (client, server) = tokio::io::duplex(4096);
                        tokio::spawn(chunk_server(server, cid, 0, value.clone()));
                        Ok(Box::new(client) as BoxedStream)
                    }
                }
            })
            .await;
        transport
            .connector
            .register(&peer_b, {
                let chunk1 = chunk1.clone();
                move || {
                    let value = chunk1.clone();
                    async move {
                        let (client, server) = tokio::io::duplex(4096);
                        tokio::spawn(chunk_server(server, cid, 1, value.clone()));
                        Ok(Box::new(client) as BoxedStream)
                    }
                }
            })
            .await;

        let policy = FetchPolicy {
            max_chunks_per_peer: 1,
            ..FetchPolicy::default()
        };
        let out = download_swarm_over_network(
            &transport,
            &[peer_a, peer_b],
            cid,
            &desc.chunks,
            &policy,
            None,
        )
        .await
        .expect("download");
        assert_eq!(out, bytes);
    }
1254
    #[tokio::test]
    async fn session_pool_reuses_connection_for_multiple_chunk_requests() {
        // The handler answers two requests on one stream; the dial counter
        // proves the pool reused the connection for the second download.
        let peer = make_peer("10.0.0.5", 7000);
        let connector = MockConnector::new();
        let dial_count = Arc::new(AtomicUsize::new(0));
        let bytes = vec![8u8; CHUNK_SIZE - 17];
        let desc = describe_content(&bytes);
        let cid = desc.content_id.0;

        connector
            .register(&peer, {
                let dial_count = dial_count.clone();
                let first = bytes.clone();
                move || {
                    let dial_count = dial_count.clone();
                    let first = first.clone();
                    async move {
                        dial_count.fetch_add(1, Ordering::SeqCst);
                        let (client, mut server) = tokio::io::duplex(8192);
                        tokio::spawn(async move {
                            // Serve exactly two GetChunk requests on this stream.
                            for _ in 0..2 {
                                let req = read_envelope(&mut server).await.expect("read request");
                                let typed = req.decode_typed().expect("typed");
                                let WirePayload::GetChunk(GetChunk {
                                    content_id: requested_id,
                                    chunk_index: requested_index,
                                }) = typed
                                else {
                                    panic!("unexpected request type");
                                };
                                assert_eq!(requested_id, cid);
                                assert_eq!(requested_index, 0);
                                let resp = Envelope::from_typed(
                                    req.req_id,
                                    0x0001,
                                    &WirePayload::ChunkData(ChunkData {
                                        content_id: cid,
                                        chunk_index: 0,
                                        bytes: first.clone(),
                                    }),
                                )
                                .expect("resp");
                                write_envelope(&mut server, &resp)
                                    .await
                                    .expect("write resp");
                            }
                        });
                        Ok(Box::new(client) as BoxedStream)
                    }
                }
            })
            .await;

        let pool = SessionPoolTransport::new(connector);
        let single_chunk = vec![desc.chunks[0]];
        let p = FetchPolicy::default();
        let _ = download_swarm_over_network(
            &pool,
            std::slice::from_ref(&peer),
            cid,
            &single_chunk,
            &p,
            None,
        )
        .await
        .expect("download1");
        let _ = download_swarm_over_network(
            &pool,
            std::slice::from_ref(&peer),
            cid,
            &single_chunk,
            &p,
            None,
        )
        .await
        .expect("download2");

        // Both downloads must have shared the single dialed connection.
        assert_eq!(dial_count.load(Ordering::SeqCst), 1);
    }
1336
1337 async fn any_chunk_server(
1340 mut server: DuplexStream,
1341 content_id: [u8; 32],
1342 chunks: Vec<Vec<u8>>,
1343 ) {
1344 let req = read_envelope(&mut server).await.expect("read request");
1345 let typed = req.decode_typed().expect("typed");
1346 let WirePayload::GetChunk(GetChunk {
1347 content_id: requested_id,
1348 chunk_index,
1349 }) = typed
1350 else {
1351 panic!("unexpected request type in any_chunk_server");
1352 };
1353 assert_eq!(requested_id, content_id);
1354 let bytes = chunks[chunk_index as usize].clone();
1355 let resp = Envelope::from_typed(
1356 req.req_id,
1357 0x0001,
1358 &WirePayload::ChunkData(ChunkData {
1359 content_id,
1360 chunk_index,
1361 bytes,
1362 }),
1363 )
1364 .expect("resp");
1365 write_envelope(&mut server, &resp)
1366 .await
1367 .expect("write resp");
1368 }
1369
1370 #[tokio::test]
1371 async fn parallel_download_distributes_chunks_across_peers() {
1372 let bytes = vec![42u8; CHUNK_SIZE * 3 + 100];
1374 let desc = describe_content(&bytes);
1375 let cid = desc.content_id.0;
1376
1377 let raw_chunks: Vec<Vec<u8>> = desc
1379 .chunks
1380 .iter()
1381 .enumerate()
1382 .map(|(i, _)| {
1383 let start = i * CHUNK_SIZE;
1384 let end = ((i + 1) * CHUNK_SIZE).min(bytes.len());
1385 bytes[start..end].to_vec()
1386 })
1387 .collect();
1388 assert_eq!(raw_chunks.len(), 4);
1389
1390 let peer_a = make_peer("10.0.0.20", 7000);
1392 let peer_b = make_peer("10.0.0.21", 7000);
1393 let connector = MockConnector::new();
1394 let transport = DirectRequestTransport::new(connector);
1395
1396 let peer_a_count = Arc::new(AtomicUsize::new(0));
1397 let peer_b_count = Arc::new(AtomicUsize::new(0));
1398
1399 transport
1400 .connector
1401 .register(&peer_a, {
1402 let chunks = raw_chunks.clone();
1403 let counter = peer_a_count.clone();
1404 move || {
1405 let chunks = chunks.clone();
1406 let counter = counter.clone();
1407 async move {
1408 counter.fetch_add(1, Ordering::SeqCst);
1409 let (client, server) = tokio::io::duplex(65536);
1410 tokio::spawn(any_chunk_server(server, cid, chunks));
1411 Ok(Box::new(client) as BoxedStream)
1412 }
1413 }
1414 })
1415 .await;
1416
1417 transport
1418 .connector
1419 .register(&peer_b, {
1420 let chunks = raw_chunks.clone();
1421 let counter = peer_b_count.clone();
1422 move || {
1423 let chunks = chunks.clone();
1424 let counter = counter.clone();
1425 async move {
1426 counter.fetch_add(1, Ordering::SeqCst);
1427 let (client, server) = tokio::io::duplex(65536);
1428 tokio::spawn(any_chunk_server(server, cid, chunks));
1429 Ok(Box::new(client) as BoxedStream)
1430 }
1431 }
1432 })
1433 .await;
1434
1435 let policy = FetchPolicy {
1436 max_chunks_per_peer: 2,
1437 parallel_chunks: 4,
1438 ..FetchPolicy::default()
1439 };
1440
1441 let out = download_swarm_over_network(
1442 &transport,
1443 &[peer_a, peer_b],
1444 cid,
1445 &desc.chunks,
1446 &policy,
1447 None,
1448 )
1449 .await
1450 .expect("parallel download");
1451
1452 assert_eq!(out, bytes);
1453
1454 let a = peer_a_count.load(Ordering::SeqCst);
1456 let b = peer_b_count.load(Ordering::SeqCst);
1457 assert!(a > 0, "peer_a should have served at least 1 chunk");
1458 assert!(b > 0, "peer_b should have served at least 1 chunk");
1459 assert_eq!(a + b, 4, "total chunks served should be 4");
1460 }
1461
1462 #[tokio::test]
1463 async fn parallel_download_retries_failed_chunk_on_other_peer() {
1464 let bytes = vec![99u8; CHUNK_SIZE + 10];
1466 let desc = describe_content(&bytes);
1467 let cid = desc.content_id.0;
1468
1469 let raw_chunks: Vec<Vec<u8>> = desc
1470 .chunks
1471 .iter()
1472 .enumerate()
1473 .map(|(i, _)| {
1474 let start = i * CHUNK_SIZE;
1475 let end = ((i + 1) * CHUNK_SIZE).min(bytes.len());
1476 bytes[start..end].to_vec()
1477 })
1478 .collect();
1479
1480 let peer_a = make_peer("10.0.0.30", 7000);
1481 let peer_b = make_peer("10.0.0.31", 7000);
1482 let connector = MockConnector::new();
1483 let transport = DirectRequestTransport::new(connector);
1484
1485 transport
1487 .connector
1488 .register(&peer_a, || async { anyhow::bail!("peer_a unavailable") })
1489 .await;
1490
1491 transport
1493 .connector
1494 .register(&peer_b, {
1495 let chunks = raw_chunks.clone();
1496 move || {
1497 let chunks = chunks.clone();
1498 async move {
1499 let (client, server) = tokio::io::duplex(65536);
1500 tokio::spawn(any_chunk_server(server, cid, chunks));
1501 Ok(Box::new(client) as BoxedStream)
1502 }
1503 }
1504 })
1505 .await;
1506
1507 let policy = FetchPolicy {
1508 parallel_chunks: 2,
1509 ..FetchPolicy::default()
1510 };
1511
1512 let out = download_swarm_over_network(
1513 &transport,
1514 &[peer_a, peer_b],
1515 cid,
1516 &desc.chunks,
1517 &policy,
1518 None,
1519 )
1520 .await
1521 .expect("download with retries");
1522
1523 assert_eq!(out, bytes);
1524 }
1525}