1use crate::{
2 cache::Cache,
3 dag_walk::DagWalk,
4 error::Error,
5 incremental_verification::{BlockState, IncrementalDagVerification},
6 messages::{PullRequest, PushResponse},
7};
8use bytes::Bytes;
9use deterministic_bloom::runtime_size::BloomFilter;
10use futures::{StreamExt, TryStreamExt};
11use iroh_car::{CarHeader, CarReader, CarWriter};
12use libipld::{Ipld, IpldCodec};
13use libipld_core::{cid::Cid, codec::References};
14use std::io::Cursor;
15use wnfs_common::{
16 utils::{boxed_stream, BoxStream, CondSend},
17 BlockStore,
18};
19
/// Tuning parameters for the block exchange protocol rounds.
#[derive(Clone, Debug)]
pub struct Config {
    /// Upper bound on the number of CAR bytes accepted in a single round;
    /// a bigger CAR is rejected with `Error::TooManyBytes`. The sender uses
    /// the same number as its per-round send limit.
    pub receive_maximum: usize,
    /// Maximum size of a single block; a bigger block aborts the transfer
    /// with `Error::BlockSizeExceeded`.
    pub max_block_size: usize,
    /// Upper bound on how many missing subgraph roots are reported back per
    /// round (`block_receive` truncates the list to this length).
    pub max_roots_per_round: usize,
    /// Computes the target bloom false positive rate from the number of
    /// elements the bloom will hold.
    pub bloom_fpr: fn(u64) -> f64,
}
64
65impl Default for Config {
66 fn default() -> Self {
67 Self {
68 receive_maximum: 2_000_000, max_block_size: 1_000_000, max_roots_per_round: 1000, bloom_fpr: |num_of_elems| f64::min(0.001, 0.1 / num_of_elems as f64),
72 }
73 }
74}
75
/// The state a receiver reports back to the sender between rounds.
#[derive(Clone)]
pub struct ReceiverState {
    /// CIDs below the transfer root that the receiver still needs.
    pub missing_subgraph_roots: Vec<Cid>,
    /// Bloom filter over the CIDs the receiver already has; `None` when no
    /// filter has been communicated yet.
    pub have_cids_bloom: Option<BloomFilter>,
}
85
/// An in-memory CAR file exchanged in one protocol round.
#[derive(Debug, Clone)]
pub struct CarFile {
    /// The raw CAR file bytes.
    pub bytes: Bytes,
}
93
/// A boxed stream of `(CID, block bytes)` pairs, or errors.
pub type BlockStream<'a> = BoxStream<'a, Result<(Cid, Bytes), Error>>;

/// A boxed stream of CAR file chunks (header and block frames), or errors.
pub type CarStream<'a> = BoxStream<'a, Result<Bytes, Error>>;
100
101#[tracing::instrument(skip_all, fields(root, last_state))]
113pub async fn block_send(
114 root: Cid,
115 last_state: Option<ReceiverState>,
116 config: &Config,
117 store: impl BlockStore,
118 cache: impl Cache,
119) -> Result<CarFile, Error> {
120 let bytes = block_send_car_stream(
121 root,
122 last_state,
123 Vec::new(),
124 Some(config.receive_maximum),
125 store,
126 cache,
127 )
128 .await?;
129
130 Ok(CarFile {
131 bytes: bytes.into(),
132 })
133}
134
135#[tracing::instrument(skip_all, fields(root, last_state))]
139pub async fn block_send_car_stream<W: tokio::io::AsyncWrite + Unpin + Send>(
140 root: Cid,
141 last_state: Option<ReceiverState>,
142 writer: W,
143 send_limit: Option<usize>,
144 store: impl BlockStore,
145 cache: impl Cache,
146) -> Result<W, Error> {
147 let mut block_stream = block_send_block_stream(root, last_state, store, cache).await?;
148 write_blocks_into_car(writer, &mut block_stream, send_limit).await
149}
150
151pub async fn block_send_block_stream<'a>(
154 root: Cid,
155 last_state: Option<ReceiverState>,
156 store: impl BlockStore + 'a,
157 cache: impl Cache + 'a,
158) -> Result<BlockStream<'a>, Error> {
159 let ReceiverState {
160 missing_subgraph_roots,
161 have_cids_bloom,
162 } = last_state.unwrap_or(ReceiverState {
163 missing_subgraph_roots: vec![root],
164 have_cids_bloom: None,
165 });
166
167 let subgraph_roots =
169 verify_missing_subgraph_roots(root, &missing_subgraph_roots, &store, &cache).await?;
170
171 let bloom = handle_missing_bloom(have_cids_bloom);
172
173 let stream = stream_blocks_from_roots(subgraph_roots, bloom, store, cache);
174
175 Ok(Box::pin(stream))
176}
177
/// Run one receiver round: ingest the sender's CAR (if any) and compute the
/// next `ReceiverState` to report back.
///
/// Returns `Error::TooManyBytes` if the CAR exceeds `config.receive_maximum`.
/// The reported list of missing subgraph roots is truncated to
/// `config.max_roots_per_round`.
#[tracing::instrument(skip_all, fields(root, car_bytes = last_car.as_ref().map(|car| car.bytes.len())))]
pub async fn block_receive(
    root: Cid,
    last_car: Option<CarFile>,
    config: &Config,
    store: impl BlockStore,
    cache: impl Cache,
) -> Result<ReceiverState, Error> {
    let mut receiver_state = match last_car {
        Some(car) => {
            // Enforce the per-round receive budget before parsing anything.
            if car.bytes.len() > config.receive_maximum {
                return Err(Error::TooManyBytes {
                    receive_maximum: config.receive_maximum,
                    bytes_read: car.bytes.len(),
                });
            }

            block_receive_car_stream(root, Cursor::new(car.bytes), config, store, cache).await?
        }
        // First round: nothing received yet, just report what is missing
        // below `root`.
        None => IncrementalDagVerification::new([root], &store, &cache)
            .await?
            .into_receiver_state(config.bloom_fpr),
    };

    // Keep the response bounded in size.
    receiver_state
        .missing_subgraph_roots
        .truncate(config.max_roots_per_round);

    Ok(receiver_state)
}
216
/// Like [`block_receive`], but reads the CAR from an async reader instead of
/// an in-memory buffer.
#[tracing::instrument(skip_all, fields(root))]
pub async fn block_receive_car_stream<R: tokio::io::AsyncRead + Unpin + CondSend>(
    root: Cid,
    reader: R,
    config: &Config,
    store: impl BlockStore,
    cache: impl Cache,
) -> Result<ReceiverState, Error> {
    let reader = CarReader::new(reader).await?;

    // Adapt the CAR reader into the crate's common block stream type.
    let mut stream: BlockStream<'_> = Box::pin(
        reader
            .stream()
            .map_ok(|(cid, bytes)| (cid, Bytes::from(bytes)))
            .map_err(Error::CarFileError),
    );

    block_receive_block_stream(root, &mut stream, config, store, cache).await
}
237
238pub async fn block_receive_block_stream(
241 root: Cid,
242 stream: &mut BlockStream<'_>,
243 config: &Config,
244 store: impl BlockStore,
245 cache: impl Cache,
246) -> Result<ReceiverState, Error> {
247 let max_block_size = config.max_block_size;
248 let mut dag_verification = IncrementalDagVerification::new([root], &store, &cache).await?;
249
250 while let Some((cid, block)) = stream.try_next().await? {
251 let block_bytes = block.len();
252 if block_bytes > config.max_block_size {
255 return Err(Error::BlockSizeExceeded {
256 cid,
257 block_bytes,
258 max_block_size,
259 });
260 }
261
262 match read_and_verify_block(&mut dag_verification, (cid, block), &store, &cache).await? {
263 BlockState::Have => {
264 tracing::debug!(%cid, "Received block we already have, stopping transfer");
267 break;
268 }
269 BlockState::Unexpected => {
270 tracing::debug!(%cid, "Received block out of order, stopping transfer");
277 break;
278 }
279 BlockState::Want => {
280 }
282 }
283 }
284
285 Ok(dag_verification.into_receiver_state(config.bloom_fpr))
286}
287
/// Turn a stream of blocks into a stream of CAR chunks: the first emitted
/// chunk is a CAR header rooted at the first block's CID, followed by one
/// encoded frame per block.
///
/// Returns an empty stream if there are no blocks at all.
pub async fn stream_car_frames(mut blocks: BlockStream<'_>) -> Result<CarStream<'_>, Error> {
    // Peel off the first block: its CID becomes the CAR header root.
    let Some((cid, block)) = blocks.try_next().await? else {
        tracing::debug!("No blocks to write.");
        return Ok(boxed_stream(futures::stream::empty()));
    };

    // This writer only produces the header bytes; no blocks go through it.
    let mut writer = CarWriter::new(CarHeader::new_v1(vec![cid]), Vec::new());
    writer.write_header().await?;
    let first_frame = car_frame_from_block((cid, block)).await?;

    let header = writer.finish().await?;
    Ok(boxed_stream(
        futures::stream::iter(vec![Ok(header.into()), Ok(first_frame)])
            .chain(blocks.and_then(car_frame_from_block)),
    ))
}
319
320pub fn references<E: Extend<Cid>>(
326 cid: Cid,
327 block: impl AsRef<[u8]>,
328 mut refs: E,
329) -> Result<E, anyhow::Error> {
330 let codec: IpldCodec = cid
331 .codec()
332 .try_into()
333 .map_err(|_| Error::UnsupportedCodec { cid })?;
334
335 <Ipld as References<IpldCodec>>::references(codec, &mut Cursor::new(block), &mut refs)?;
336 Ok(refs)
337}
338
/// Encode a single block as a standalone CAR frame (block section only,
/// without any header bytes).
async fn car_frame_from_block(block: (Cid, Bytes)) -> Result<Bytes, Error> {
    // CarWriter insists on writing a header first, so write a throwaway one
    // and strip it off again below.
    let bogus_header = CarHeader::new_v1(vec![Cid::default()]);
    let mut writer = CarWriter::new(bogus_header, Vec::new());
    let start = writer.write_header().await?;

    writer.write(block.0, block.1).await?;
    let mut bytes = writer.finish().await?;

    // Drop the bogus header; only the block frame remains.
    bytes.drain(0..start);

    Ok(bytes.into())
}
362
/// Filter the roots the receiver asked for down to those actually reachable
/// from `root`, so we never stream blocks outside the requested DAG.
///
/// Unreachable ("unrelated") roots are not an error; they are logged and
/// silently dropped from the result.
async fn verify_missing_subgraph_roots(
    root: Cid,
    missing_subgraph_roots: &[Cid],
    store: &impl BlockStore,
    cache: &impl Cache,
) -> Result<Vec<Cid>, Error> {
    // Walk the DAG from `root` and keep only the CIDs that were asked for.
    let subgraph_roots: Vec<Cid> = DagWalk::breadth_first([root])
        .stream(store, cache)
        .try_filter_map(|item| async move {
            let cid = item.to_cid()?;
            Ok(missing_subgraph_roots.contains(&cid).then_some(cid))
        })
        .try_collect()
        .await?;

    if subgraph_roots.len() != missing_subgraph_roots.len() {
        // Some requested roots were not reachable from `root`; report which.
        let unrelated_roots = missing_subgraph_roots
            .iter()
            .filter(|cid| !subgraph_roots.contains(cid))
            .map(|cid| cid.to_string())
            .collect::<Vec<_>>()
            .join(", ");

        tracing::warn!(
            unrelated_roots = %unrelated_roots,
            "got asked for DAG-unrelated blocks"
        );
    }

    Ok(subgraph_roots)
}
396
397fn handle_missing_bloom(have_cids_bloom: Option<BloomFilter>) -> BloomFilter {
398 if let Some(bloom) = &have_cids_bloom {
399 tracing::debug!(
400 size_bits = bloom.as_bytes().len() * 8,
401 hash_count = bloom.hash_count(),
402 ones_count = bloom.count_ones(),
403 estimated_fpr = bloom.current_false_positive_rate(),
404 "received 'have cids' bloom",
405 );
406 }
407
408 have_cids_bloom.unwrap_or_else(|| BloomFilter::new_with(1, Box::new([0]))) }
410
/// Produce the stream of blocks to send: a breadth-first walk from the
/// requested subgraph roots, skipping blocks the receiver's bloom claims it
/// already has (except the roots themselves, see `should_block_be_skipped`).
fn stream_blocks_from_roots<'a>(
    subgraph_roots: Vec<Cid>,
    bloom: BloomFilter,
    store: impl BlockStore + 'a,
    cache: impl Cache + 'a,
) -> BlockStream<'a> {
    Box::pin(async_stream::try_stream! {
        let mut dag_walk = DagWalk::breadth_first(subgraph_roots.clone());

        while let Some(item) = dag_walk.next(&store, &cache).await? {
            let cid = item.to_cid()?;

            if should_block_be_skipped(&cid, &bloom, &subgraph_roots) {
                continue;
            }

            let bytes = store.get_block(&cid).await.map_err(Error::BlockStoreError)?;

            yield (cid, bytes);
        }
    })
}
433
/// Write blocks from the stream into a CAR file on `write`, stopping early
/// once `size_limit` (if given) would be exceeded.
///
/// The first block is always written, even if it alone exceeds the limit.
async fn write_blocks_into_car<W: tokio::io::AsyncWrite + Unpin + Send>(
    write: W,
    blocks: &mut BlockStream<'_>,
    size_limit: Option<usize>,
) -> Result<W, Error> {
    let mut block_bytes = 0;

    // The CAR header needs a root CID, so peel off the first block.
    let Some((cid, block)) = blocks.try_next().await? else {
        tracing::debug!("No blocks to write.");
        return Ok(write);
    };

    let mut writer = CarWriter::new(CarHeader::new_v1(vec![cid]), write);

    block_bytes += writer.write(cid, block).await?;

    while let Some((cid, block)) = blocks.try_next().await? {
        tracing::debug!(
            cid = %cid,
            num_bytes = block.len(),
            "writing block to CAR",
        );

        // Per-frame overhead allowance on top of the block payload —
        // presumably CID bytes (64) plus a length varint (4); an
        // upper-bound estimate rather than the exact frame size.
        // NOTE(review): confirm against CarWriter's actual framing.
        let added_bytes = 64 + 4 + block.len();

        if let Some(receive_limit) = size_limit {
            if block_bytes + added_bytes > receive_limit {
                tracing::debug!(%cid, receive_limit, block_bytes, added_bytes, "Skipping block because it would go over the receive limit");
                break;
            }
        }

        block_bytes += writer.write(cid, &block).await?;
    }

    Ok(writer.finish().await?)
}
480
481fn should_block_be_skipped(cid: &Cid, bloom: &BloomFilter, subgraph_roots: &[Cid]) -> bool {
482 bloom.contains(&cid.to_bytes()) && !subgraph_roots.contains(cid)
483}
484
485async fn read_and_verify_block(
488 dag_verification: &mut IncrementalDagVerification,
489 (cid, block): (Cid, Bytes),
490 store: &impl BlockStore,
491 cache: &impl Cache,
492) -> Result<BlockState, Error> {
493 match dag_verification.block_state(cid) {
494 BlockState::Have => Ok(BlockState::Have),
495 BlockState::Unexpected => {
496 tracing::trace!(
497 cid = %cid,
498 "received block out of order (possibly due to bloom false positive)"
499 );
500 Ok(BlockState::Unexpected)
501 }
502 BlockState::Want => {
503 dag_verification
504 .verify_and_store_block((cid, block), store, cache)
505 .await?;
506 Ok(BlockState::Want)
507 }
508 }
509}
510
511impl From<PushResponse> for ReceiverState {
516 fn from(push: PushResponse) -> Self {
517 let PushResponse {
518 subgraph_roots,
519 bloom_hash_count: hash_count,
520 bloom_bytes: bytes,
521 } = push;
522
523 Self {
524 missing_subgraph_roots: subgraph_roots,
525 have_cids_bloom: Self::bloom_deserialize(hash_count, bytes),
526 }
527 }
528}
529
530impl From<PullRequest> for ReceiverState {
531 fn from(pull: PullRequest) -> Self {
532 let PullRequest {
533 resources,
534 bloom_hash_count: hash_count,
535 bloom_bytes: bytes,
536 } = pull;
537
538 Self {
539 missing_subgraph_roots: resources,
540 have_cids_bloom: Self::bloom_deserialize(hash_count, bytes),
541 }
542 }
543}
544
545impl From<ReceiverState> for PushResponse {
546 fn from(receiver_state: ReceiverState) -> PushResponse {
547 let ReceiverState {
548 missing_subgraph_roots,
549 have_cids_bloom,
550 } = receiver_state;
551
552 let (hash_count, bytes) = ReceiverState::bloom_serialize(have_cids_bloom);
553
554 PushResponse {
555 subgraph_roots: missing_subgraph_roots,
556 bloom_hash_count: hash_count,
557 bloom_bytes: bytes,
558 }
559 }
560}
561
562impl From<ReceiverState> for PullRequest {
563 fn from(receiver_state: ReceiverState) -> PullRequest {
564 let ReceiverState {
565 missing_subgraph_roots,
566 have_cids_bloom,
567 } = receiver_state;
568
569 let (hash_count, bytes) = ReceiverState::bloom_serialize(have_cids_bloom);
570
571 PullRequest {
572 resources: missing_subgraph_roots,
573 bloom_hash_count: hash_count,
574 bloom_bytes: bytes,
575 }
576 }
577}
578
579impl ReceiverState {
580 fn bloom_serialize(bloom: Option<BloomFilter>) -> (u32, Vec<u8>) {
581 match bloom {
582 Some(bloom) => (bloom.hash_count() as u32, bloom.as_bytes().to_vec()),
583 None => (3, Vec::new()),
584 }
585 }
586
587 fn bloom_deserialize(hash_count: u32, bytes: Vec<u8>) -> Option<BloomFilter> {
588 if bytes.is_empty() {
589 None
590 } else {
591 Some(BloomFilter::new_with(
592 hash_count as usize,
593 bytes.into_boxed_slice(),
594 ))
595 }
596 }
597}
598
599impl std::fmt::Debug for ReceiverState {
600 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
601 let have_cids_bloom = self
602 .have_cids_bloom
603 .as_ref()
604 .map_or("None".into(), |bloom| {
605 format!(
606 "Some(BloomFilter(k_hashes = {}, {} bytes))",
607 bloom.hash_count(),
608 bloom.as_bytes().len()
609 )
610 });
611 f.debug_struct("ReceiverState")
612 .field(
613 "missing_subgraph_roots.len() == ",
614 &self.missing_subgraph_roots.len(),
615 )
616 .field("have_cids_bloom", &have_cids_bloom)
617 .finish()
618 }
619}
620
#[cfg(test)]
pub(crate) mod tests {
    use super::*;
    use crate::{cache::NoCache, test_utils::assert_cond_send_sync};
    use assert_matches::assert_matches;
    use testresult::TestResult;
    use wnfs_common::{MemoryBlockStore, CODEC_RAW};

    // Compile-time check that the protocol entry points produce futures that
    // are (conditionally) Send. Never actually executed.
    #[allow(clippy::unreachable, unused)]
    fn test_assert_send() {
        assert_cond_send_sync(|| {
            block_send(
                unimplemented!(),
                unimplemented!(),
                unimplemented!(),
                unimplemented!() as MemoryBlockStore,
                NoCache,
            )
        });
        assert_cond_send_sync(|| {
            block_receive(
                unimplemented!(),
                unimplemented!(),
                unimplemented!(),
                unimplemented!() as &MemoryBlockStore,
                &NoCache,
            )
        })
    }

    // The custom Debug impl must stay compact even for large states.
    #[test]
    fn test_receiver_state_is_not_a_huge_debug() -> TestResult {
        let state = ReceiverState {
            have_cids_bloom: Some(BloomFilter::new_from_size(4096, 1000)),
            missing_subgraph_roots: vec![Cid::default(); 1000],
        };

        let debug_print = format!("{state:#?}");

        assert!(debug_print.len() < 1000);

        Ok(())
    }

    // An empty block stream must produce an empty CAR frame stream
    // (not even a header).
    #[test_log::test(async_std::test)]
    async fn test_stream_car_frame_empty() -> TestResult {
        let car_frames = stream_car_frames(futures::stream::empty().boxed()).await?;
        let frames: Vec<Bytes> = car_frames.try_collect().await?;

        assert!(frames.is_empty());

        Ok(())
    }

    // An empty block stream must write no bytes at all to the CAR writer.
    #[test_log::test(async_std::test)]
    async fn test_write_blocks_into_car_empty() -> TestResult {
        let car_file =
            write_blocks_into_car(Vec::new(), &mut futures::stream::empty().boxed(), None).await?;

        assert!(car_file.is_empty());

        Ok(())
    }

    // Blocks under max_block_size are accepted; oversized blocks abort the
    // transfer with Error::BlockSizeExceeded.
    #[test_log::test(async_std::test)]
    async fn test_block_receive_block_stream_block_size_exceeded() -> TestResult {
        let store = &MemoryBlockStore::new();

        let block_small: Bytes = b"This one is small".to_vec().into();
        let block_big: Bytes = b"This one is very very very big".to_vec().into();
        let root_small = store.put_block(block_small.clone(), CODEC_RAW).await?;
        let root_big = store.put_block(block_big.clone(), CODEC_RAW).await?;

        let config = &Config {
            max_block_size: 20,
            ..Config::default()
        };

        // Small block fits within the 20-byte limit.
        block_receive_block_stream(
            root_small,
            &mut futures::stream::iter(vec![Ok((root_small, block_small))]).boxed(),
            config,
            MemoryBlockStore::new(),
            NoCache,
        )
        .await?;

        // Big block exceeds the limit and must be rejected.
        let result = block_receive_block_stream(
            root_small,
            &mut futures::stream::iter(vec![Ok((root_big, block_big))]).boxed(),
            config,
            MemoryBlockStore::new(),
            NoCache,
        )
        .await;

        assert_matches!(result, Err(Error::BlockSizeExceeded { .. }));

        Ok(())
    }
}