car_mirror/
common.rs

1use crate::{
2    cache::Cache,
3    dag_walk::DagWalk,
4    error::Error,
5    incremental_verification::{BlockState, IncrementalDagVerification},
6    messages::{PullRequest, PushResponse},
7};
8use bytes::Bytes;
9use deterministic_bloom::runtime_size::BloomFilter;
10use futures::{StreamExt, TryStreamExt};
11use iroh_car::{CarHeader, CarReader, CarWriter};
12use libipld::{Ipld, IpldCodec};
13use libipld_core::{cid::Cid, codec::References};
14use std::io::Cursor;
15use wnfs_common::{
16    utils::{boxed_stream, BoxStream, CondSend},
17    BlockStore,
18};
19
20//--------------------------------------------------------------------------------------------------
21// Types
22//--------------------------------------------------------------------------------------------------
23
/// Configuration values (such as byte limits) for the CAR mirror protocol
#[derive(Clone, Debug)]
pub struct Config {
    /// The maximum number of bytes per request that a recipient should accept.
    ///
    /// This only has an effect in non-streaming versions of this protocol.
    /// In streaming versions, car-mirror will check the validity of each block
    /// while streaming.
    ///
    /// By default this is 2MB.
    pub receive_maximum: usize,
    /// The maximum number of bytes per block.
    ///
    /// As long as we can't verify the hash value of a block, we can't verify if we've
    /// been given the data we actually want or not, thus we need to put a maximum value
    /// on the byte size that we accept per block.
    ///
    /// By default this is 1MB.
    ///
    /// 1MB is also the default maximum block size in IPFS's bitswap protocol.
    /// 256KiB is the default maximum block size that Kubo produces by default when generating
    /// UnixFS blocks.
    ///
    /// `iroh-car` internally has a maximum 4MB limit on a CAR file frame (CID + block), so
    /// any value above 4MB doesn't work.
    pub max_block_size: usize,
    /// The maximum number of roots per request that will be requested by the recipient
    /// to be sent by the sender.
    ///
    /// By default this is 1_000.
    pub max_roots_per_round: usize,
    /// The target false positive rate for the bloom filter that the recipient sends,
    /// as a function of the number of elements the filter will hold.
    ///
    /// By default it's set to `|num| min(0.001, 0.1 / num)`.
    ///
    /// This default means bloom filters will aim to have a false positive probability
    /// one order of magnitude under the number of elements. E.g. for 100_000 elements,
    /// a false positive probability of 1 in 1 million.
    pub bloom_fpr: fn(u64) -> f64,
}
64
65impl Default for Config {
66    fn default() -> Self {
67        Self {
68            receive_maximum: 2_000_000, // 2 MB
69            max_block_size: 1_000_000,  // 1 MB
70            max_roots_per_round: 1000,  // max. ~41KB of CIDs
71            bloom_fpr: |num_of_elems| f64::min(0.001, 0.1 / num_of_elems as f64),
72        }
73    }
74}
75
/// Some information that the block receiving end provides the block sending end
/// in order to deduplicate block transfers.
#[derive(Clone)]
pub struct ReceiverState {
    /// At least *some* of the subgraph roots that are missing for sure on the receiving end.
    /// (May be truncated to `Config::max_roots_per_round` entries.)
    pub missing_subgraph_roots: Vec<Cid>,
    /// An optional bloom filter of all CIDs below the root that the receiving end has.
    /// `None` means the receiver provided no filter (e.g. on the first round).
    pub have_cids_bloom: Option<BloomFilter>,
}
85
/// Newtype around bytes that are supposed to represent a CAR file
#[derive(Debug, Clone)]
pub struct CarFile {
    /// The car file contents as bytes.
    /// (`CarFile` is cheap to clone, since `Bytes` is like an `Arc` wrapper around a byte buffer.)
    pub bytes: Bytes,
}
93
/// A stream of blocks (CID + block bytes pairs), each item fallible.
/// This requires the underlying futures to be `Send`, except when the target is `wasm32`.
pub type BlockStream<'a> = BoxStream<'a, Result<(Cid, Bytes), Error>>;

/// A stream of byte chunks of a CAR file (one chunk per CAR frame).
/// The underlying futures are `Send`, except when the target is `wasm32`.
pub type CarStream<'a> = BoxStream<'a, Result<Bytes, Error>>;
100
101//--------------------------------------------------------------------------------------------------
102// Functions
103//--------------------------------------------------------------------------------------------------
104
105/// This function is run on the block sending side of the protocol.
106///
107/// It's used on the client during the push protocol, or on the server
108/// during the pull protocol.
109///
110/// It returns a `CarFile` of (a subset) of all blocks below `root`, that
111/// are thought to be missing on the receiving end.
112#[tracing::instrument(skip_all, fields(root, last_state))]
113pub async fn block_send(
114    root: Cid,
115    last_state: Option<ReceiverState>,
116    config: &Config,
117    store: impl BlockStore,
118    cache: impl Cache,
119) -> Result<CarFile, Error> {
120    let bytes = block_send_car_stream(
121        root,
122        last_state,
123        Vec::new(),
124        Some(config.receive_maximum),
125        store,
126        cache,
127    )
128    .await?;
129
130    Ok(CarFile {
131        bytes: bytes.into(),
132    })
133}
134
135/// This is the streaming equivalent of `block_send`.
136///
137/// It uses the car file format for framing blocks & CIDs in the given `AsyncWrite`.
138#[tracing::instrument(skip_all, fields(root, last_state))]
139pub async fn block_send_car_stream<W: tokio::io::AsyncWrite + Unpin + Send>(
140    root: Cid,
141    last_state: Option<ReceiverState>,
142    writer: W,
143    send_limit: Option<usize>,
144    store: impl BlockStore,
145    cache: impl Cache,
146) -> Result<W, Error> {
147    let mut block_stream = block_send_block_stream(root, last_state, store, cache).await?;
148    write_blocks_into_car(writer, &mut block_stream, send_limit).await
149}
150
151/// This is the car mirror block sending function, but unlike `block_send_car_stream`
152/// it leaves framing blocks to the caller.
153pub async fn block_send_block_stream<'a>(
154    root: Cid,
155    last_state: Option<ReceiverState>,
156    store: impl BlockStore + 'a,
157    cache: impl Cache + 'a,
158) -> Result<BlockStream<'a>, Error> {
159    let ReceiverState {
160        missing_subgraph_roots,
161        have_cids_bloom,
162    } = last_state.unwrap_or(ReceiverState {
163        missing_subgraph_roots: vec![root],
164        have_cids_bloom: None,
165    });
166
167    // Verify that all missing subgraph roots are in the relevant DAG:
168    let subgraph_roots =
169        verify_missing_subgraph_roots(root, &missing_subgraph_roots, &store, &cache).await?;
170
171    let bloom = handle_missing_bloom(have_cids_bloom);
172
173    let stream = stream_blocks_from_roots(subgraph_roots, bloom, store, cache);
174
175    Ok(Box::pin(stream))
176}
177
178/// This function is run on the block receiving end of the protocol.
179///
180/// It's used on the client during the pull protocol and on the server
181/// during the push protocol.
182///
183/// It takes a `CarFile`, verifies that its contents are related to the
184/// `root` and returns some information to help the block sending side
185/// figure out what blocks to send next.
186#[tracing::instrument(skip_all, fields(root, car_bytes = last_car.as_ref().map(|car| car.bytes.len())))]
187pub async fn block_receive(
188    root: Cid,
189    last_car: Option<CarFile>,
190    config: &Config,
191    store: impl BlockStore,
192    cache: impl Cache,
193) -> Result<ReceiverState, Error> {
194    let mut receiver_state = match last_car {
195        Some(car) => {
196            if car.bytes.len() > config.receive_maximum {
197                return Err(Error::TooManyBytes {
198                    receive_maximum: config.receive_maximum,
199                    bytes_read: car.bytes.len(),
200                });
201            }
202
203            block_receive_car_stream(root, Cursor::new(car.bytes), config, store, cache).await?
204        }
205        None => IncrementalDagVerification::new([root], &store, &cache)
206            .await?
207            .into_receiver_state(config.bloom_fpr),
208    };
209
210    receiver_state
211        .missing_subgraph_roots
212        .truncate(config.max_roots_per_round);
213
214    Ok(receiver_state)
215}
216
217/// Like `block_receive`, but allows consuming the CAR file as a stream.
218#[tracing::instrument(skip_all, fields(root))]
219pub async fn block_receive_car_stream<R: tokio::io::AsyncRead + Unpin + CondSend>(
220    root: Cid,
221    reader: R,
222    config: &Config,
223    store: impl BlockStore,
224    cache: impl Cache,
225) -> Result<ReceiverState, Error> {
226    let reader = CarReader::new(reader).await?;
227
228    let mut stream: BlockStream<'_> = Box::pin(
229        reader
230            .stream()
231            .map_ok(|(cid, bytes)| (cid, Bytes::from(bytes)))
232            .map_err(Error::CarFileError),
233    );
234
235    block_receive_block_stream(root, &mut stream, config, store, cache).await
236}
237
238/// Consumes a stream of blocks, verifying their integrity and
239/// making sure all blocks are part of the DAG.
240pub async fn block_receive_block_stream(
241    root: Cid,
242    stream: &mut BlockStream<'_>,
243    config: &Config,
244    store: impl BlockStore,
245    cache: impl Cache,
246) -> Result<ReceiverState, Error> {
247    let max_block_size = config.max_block_size;
248    let mut dag_verification = IncrementalDagVerification::new([root], &store, &cache).await?;
249
250    while let Some((cid, block)) = stream.try_next().await? {
251        let block_bytes = block.len();
252        // TODO(matheus23): Find a way to restrict size *before* framing. Possibly inside `CarReader`?
253        // Possibly needs making `MAX_ALLOC` in `iroh-car` configurable.
254        if block_bytes > config.max_block_size {
255            return Err(Error::BlockSizeExceeded {
256                cid,
257                block_bytes,
258                max_block_size,
259            });
260        }
261
262        match read_and_verify_block(&mut dag_verification, (cid, block), &store, &cache).await? {
263            BlockState::Have => {
264                // This can happen because we've just discovered a subgraph we already have.
265                // Let's update the endpoint with our new receiver state.
266                tracing::debug!(%cid, "Received block we already have, stopping transfer");
267                break;
268            }
269            BlockState::Unexpected => {
270                // We received a block out-of-order. This is weird, but can
271                // happen due to bloom filter false positives.
272                // Essentially, the sender could've skipped a block that was
273                // important for us to verify that further blocks are connected
274                // to the root.
275                // We should update the endpoint about the skipped block.
276                tracing::debug!(%cid, "Received block out of order, stopping transfer");
277                break;
278            }
279            BlockState::Want => {
280                // Perfect, we're just getting what we want. Let's continue!
281            }
282        }
283    }
284
285    Ok(dag_verification.into_receiver_state(config.bloom_fpr))
286}
287
288/// Turns a stream of blocks (tuples of CIDs and Bytes) into a stream
289/// of frames for a CAR file.
290///
291/// Simply concatenated together, all these frames form a CARv1 file.
292///
293/// The frame boundaries are after the header section and between each block.
294///
295/// The first frame will always be a CAR file header frame.
296pub async fn stream_car_frames(mut blocks: BlockStream<'_>) -> Result<CarStream<'_>, Error> {
297    // https://github.com/wnfs-wg/car-mirror-spec/issues/6
298    // CAR files *must* have at least one CID in them, and all of them
299    // need to appear as a block in the payload.
300    // It would probably make most sense to just write all subgraph roots into this,
301    // but we don't know how many of the subgraph roots fit into this round yet,
302    // so we're simply writing the first one in here, since we know
303    // at least one block will be written (and it'll be that one).
304    let Some((cid, block)) = blocks.try_next().await? else {
305        tracing::debug!("No blocks to write.");
306        return Ok(boxed_stream(futures::stream::empty()));
307    };
308
309    let mut writer = CarWriter::new(CarHeader::new_v1(vec![cid]), Vec::new());
310    writer.write_header().await?;
311    let first_frame = car_frame_from_block((cid, block)).await?;
312
313    let header = writer.finish().await?;
314    Ok(boxed_stream(
315        futures::stream::iter(vec![Ok(header.into()), Ok(first_frame)])
316            .chain(blocks.and_then(car_frame_from_block)),
317    ))
318}
319
320/// Find all CIDs that a block references.
321///
322/// This will error out if
323/// - the codec is not supported
324/// - the block can't be parsed.
325pub fn references<E: Extend<Cid>>(
326    cid: Cid,
327    block: impl AsRef<[u8]>,
328    mut refs: E,
329) -> Result<E, anyhow::Error> {
330    let codec: IpldCodec = cid
331        .codec()
332        .try_into()
333        .map_err(|_| Error::UnsupportedCodec { cid })?;
334
335    <Ipld as References<IpldCodec>>::references(codec, &mut Cursor::new(block), &mut refs)?;
336    Ok(refs)
337}
338
339//--------------------------------------------------------------------------------------------------
340// Private
341//--------------------------------------------------------------------------------------------------
342
343async fn car_frame_from_block(block: (Cid, Bytes)) -> Result<Bytes, Error> {
344    // TODO(matheus23): I wish this were exposed in iroh-car somehow
345    // Instead of having to allocate so many things.
346
347    // The writer will always first emit a header.
348    // If we don't force it here, it'll do so in `writer.write()`.
349    // We do it here so we find out how many bytes we need to skip.
350    let bogus_header = CarHeader::new_v1(vec![Cid::default()]);
351    let mut writer = CarWriter::new(bogus_header, Vec::new());
352    let start = writer.write_header().await?;
353
354    writer.write(block.0, block.1).await?;
355    let mut bytes = writer.finish().await?;
356
357    // This removes the bogus header bytes
358    bytes.drain(0..start);
359
360    Ok(bytes.into())
361}
362
363/// Ensure that any requested subgraph roots are actually part
364/// of the DAG from the root.
365async fn verify_missing_subgraph_roots(
366    root: Cid,
367    missing_subgraph_roots: &[Cid],
368    store: &impl BlockStore,
369    cache: &impl Cache,
370) -> Result<Vec<Cid>, Error> {
371    let subgraph_roots: Vec<Cid> = DagWalk::breadth_first([root])
372        .stream(store, cache)
373        .try_filter_map(|item| async move {
374            let cid = item.to_cid()?;
375            Ok(missing_subgraph_roots.contains(&cid).then_some(cid))
376        })
377        .try_collect()
378        .await?;
379
380    if subgraph_roots.len() != missing_subgraph_roots.len() {
381        let unrelated_roots = missing_subgraph_roots
382            .iter()
383            .filter(|cid| !subgraph_roots.contains(cid))
384            .map(|cid| cid.to_string())
385            .collect::<Vec<_>>()
386            .join(", ");
387
388        tracing::warn!(
389            unrelated_roots = %unrelated_roots,
390            "got asked for DAG-unrelated blocks"
391        );
392    }
393
394    Ok(subgraph_roots)
395}
396
397fn handle_missing_bloom(have_cids_bloom: Option<BloomFilter>) -> BloomFilter {
398    if let Some(bloom) = &have_cids_bloom {
399        tracing::debug!(
400            size_bits = bloom.as_bytes().len() * 8,
401            hash_count = bloom.hash_count(),
402            ones_count = bloom.count_ones(),
403            estimated_fpr = bloom.current_false_positive_rate(),
404            "received 'have cids' bloom",
405        );
406    }
407
408    have_cids_bloom.unwrap_or_else(|| BloomFilter::new_with(1, Box::new([0]))) // An empty bloom that contains nothing
409}
410
/// Builds the outgoing block stream for a round: a breadth-first walk from
/// the given subgraph roots, skipping blocks the receiver (probably) already
/// has according to its bloom filter.
fn stream_blocks_from_roots<'a>(
    subgraph_roots: Vec<Cid>,
    bloom: BloomFilter,
    store: impl BlockStore + 'a,
    cache: impl Cache + 'a,
) -> BlockStream<'a> {
    Box::pin(async_stream::try_stream! {
        let mut dag_walk = DagWalk::breadth_first(subgraph_roots.clone());

        while let Some(item) = dag_walk.next(&store, &cache).await? {
            let cid = item.to_cid()?;

            // Skip blocks the bloom says the receiver has — except explicitly
            // requested subgraph roots, which bloom false positives must not
            // suppress.
            if should_block_be_skipped(&cid, &bloom, &subgraph_roots) {
                continue;
            }

            let bytes = store.get_block(&cid).await.map_err(Error::BlockStoreError)?;

            yield (cid, bytes);
        }
    })
}
433
434async fn write_blocks_into_car<W: tokio::io::AsyncWrite + Unpin + Send>(
435    write: W,
436    blocks: &mut BlockStream<'_>,
437    size_limit: Option<usize>,
438) -> Result<W, Error> {
439    let mut block_bytes = 0;
440
441    // https://github.com/wnfs-wg/car-mirror-spec/issues/6
442    // CAR files *must* have at least one CID in them, and all of them
443    // need to appear as a block in the payload.
444    // It would probably make most sense to just write all subgraph roots into this,
445    // but we don't know how many of the subgraph roots fit into this round yet,
446    // so we're simply writing the first one in here, since we know
447    // at least one block will be written (and it'll be that one).
448    let Some((cid, block)) = blocks.try_next().await? else {
449        tracing::debug!("No blocks to write.");
450        return Ok(write);
451    };
452
453    let mut writer = CarWriter::new(CarHeader::new_v1(vec![cid]), write);
454
455    block_bytes += writer.write(cid, block).await?;
456
457    while let Some((cid, block)) = blocks.try_next().await? {
458        tracing::debug!(
459            cid = %cid,
460            num_bytes = block.len(),
461            "writing block to CAR",
462        );
463
464        // Let's be conservative, assume a 64-byte CID (usually ~40 byte)
465        // and a 4-byte frame size varint (3 byte would be enough for an 8MiB frame).
466        let added_bytes = 64 + 4 + block.len();
467
468        if let Some(receive_limit) = size_limit {
469            if block_bytes + added_bytes > receive_limit {
470                tracing::debug!(%cid, receive_limit, block_bytes, added_bytes, "Skipping block because it would go over the receive limit");
471                break;
472            }
473        }
474
475        block_bytes += writer.write(cid, &block).await?;
476    }
477
478    Ok(writer.finish().await?)
479}
480
481fn should_block_be_skipped(cid: &Cid, bloom: &BloomFilter, subgraph_roots: &[Cid]) -> bool {
482    bloom.contains(&cid.to_bytes()) && !subgraph_roots.contains(cid)
483}
484
485/// Takes a block and stores it iff it's one of the blocks we're currently trying to retrieve.
486/// Returns the block state of the received block.
487async fn read_and_verify_block(
488    dag_verification: &mut IncrementalDagVerification,
489    (cid, block): (Cid, Bytes),
490    store: &impl BlockStore,
491    cache: &impl Cache,
492) -> Result<BlockState, Error> {
493    match dag_verification.block_state(cid) {
494        BlockState::Have => Ok(BlockState::Have),
495        BlockState::Unexpected => {
496            tracing::trace!(
497                cid = %cid,
498                "received block out of order (possibly due to bloom false positive)"
499            );
500            Ok(BlockState::Unexpected)
501        }
502        BlockState::Want => {
503            dag_verification
504                .verify_and_store_block((cid, block), store, cache)
505                .await?;
506            Ok(BlockState::Want)
507        }
508    }
509}
510
511//--------------------------------------------------------------------------------------------------
512// Implementations
513//--------------------------------------------------------------------------------------------------
514
515impl From<PushResponse> for ReceiverState {
516    fn from(push: PushResponse) -> Self {
517        let PushResponse {
518            subgraph_roots,
519            bloom_hash_count: hash_count,
520            bloom_bytes: bytes,
521        } = push;
522
523        Self {
524            missing_subgraph_roots: subgraph_roots,
525            have_cids_bloom: Self::bloom_deserialize(hash_count, bytes),
526        }
527    }
528}
529
530impl From<PullRequest> for ReceiverState {
531    fn from(pull: PullRequest) -> Self {
532        let PullRequest {
533            resources,
534            bloom_hash_count: hash_count,
535            bloom_bytes: bytes,
536        } = pull;
537
538        Self {
539            missing_subgraph_roots: resources,
540            have_cids_bloom: Self::bloom_deserialize(hash_count, bytes),
541        }
542    }
543}
544
545impl From<ReceiverState> for PushResponse {
546    fn from(receiver_state: ReceiverState) -> PushResponse {
547        let ReceiverState {
548            missing_subgraph_roots,
549            have_cids_bloom,
550        } = receiver_state;
551
552        let (hash_count, bytes) = ReceiverState::bloom_serialize(have_cids_bloom);
553
554        PushResponse {
555            subgraph_roots: missing_subgraph_roots,
556            bloom_hash_count: hash_count,
557            bloom_bytes: bytes,
558        }
559    }
560}
561
562impl From<ReceiverState> for PullRequest {
563    fn from(receiver_state: ReceiverState) -> PullRequest {
564        let ReceiverState {
565            missing_subgraph_roots,
566            have_cids_bloom,
567        } = receiver_state;
568
569        let (hash_count, bytes) = ReceiverState::bloom_serialize(have_cids_bloom);
570
571        PullRequest {
572            resources: missing_subgraph_roots,
573            bloom_hash_count: hash_count,
574            bloom_bytes: bytes,
575        }
576    }
577}
578
579impl ReceiverState {
580    fn bloom_serialize(bloom: Option<BloomFilter>) -> (u32, Vec<u8>) {
581        match bloom {
582            Some(bloom) => (bloom.hash_count() as u32, bloom.as_bytes().to_vec()),
583            None => (3, Vec::new()),
584        }
585    }
586
587    fn bloom_deserialize(hash_count: u32, bytes: Vec<u8>) -> Option<BloomFilter> {
588        if bytes.is_empty() {
589            None
590        } else {
591            Some(BloomFilter::new_with(
592                hash_count as usize,
593                bytes.into_boxed_slice(),
594            ))
595        }
596    }
597}
598
599impl std::fmt::Debug for ReceiverState {
600    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
601        let have_cids_bloom = self
602            .have_cids_bloom
603            .as_ref()
604            .map_or("None".into(), |bloom| {
605                format!(
606                    "Some(BloomFilter(k_hashes = {}, {} bytes))",
607                    bloom.hash_count(),
608                    bloom.as_bytes().len()
609                )
610            });
611        f.debug_struct("ReceiverState")
612            .field(
613                "missing_subgraph_roots.len() == ",
614                &self.missing_subgraph_roots.len(),
615            )
616            .field("have_cids_bloom", &have_cids_bloom)
617            .finish()
618    }
619}
620
#[cfg(test)]
pub(crate) mod tests {
    use super::*;
    use crate::{cache::NoCache, test_utils::assert_cond_send_sync};
    use assert_matches::assert_matches;
    use testresult::TestResult;
    use wnfs_common::{MemoryBlockStore, CODEC_RAW};

    // Compile-time check that the protocol entry points satisfy the
    // (conditional) `Send` bound. Never actually executed.
    #[allow(clippy::unreachable, unused)]
    fn test_assert_send() {
        assert_cond_send_sync(|| {
            block_send(
                unimplemented!(),
                unimplemented!(),
                unimplemented!(),
                unimplemented!() as MemoryBlockStore,
                NoCache,
            )
        });
        assert_cond_send_sync(|| {
            block_receive(
                unimplemented!(),
                unimplemented!(),
                unimplemented!(),
                unimplemented!() as &MemoryBlockStore,
                &NoCache,
            )
        })
    }

    // The custom `Debug` impl summarizes the bloom and the roots list,
    // so even a large state should render in under 1000 characters.
    #[test]
    fn test_receiver_state_is_not_a_huge_debug() -> TestResult {
        let state = ReceiverState {
            have_cids_bloom: Some(BloomFilter::new_from_size(4096, 1000)),
            missing_subgraph_roots: vec![Cid::default(); 1000],
        };

        let debug_print = format!("{state:#?}");

        assert!(debug_print.len() < 1000);

        Ok(())
    }

    // An empty block stream must produce an empty CAR frame stream
    // (not a lone header frame).
    #[test_log::test(async_std::test)]
    async fn test_stream_car_frame_empty() -> TestResult {
        let car_frames = stream_car_frames(futures::stream::empty().boxed()).await?;
        let frames: Vec<Bytes> = car_frames.try_collect().await?;

        assert!(frames.is_empty());

        Ok(())
    }

    // Likewise, writing no blocks must produce an empty byte buffer.
    #[test_log::test(async_std::test)]
    async fn test_write_blocks_into_car_empty() -> TestResult {
        let car_file =
            write_blocks_into_car(Vec::new(), &mut futures::stream::empty().boxed(), None).await?;

        assert!(car_file.is_empty());

        Ok(())
    }

    // Blocks over `Config::max_block_size` must be rejected with
    // `Error::BlockSizeExceeded`, while smaller blocks are accepted.
    #[test_log::test(async_std::test)]
    async fn test_block_receive_block_stream_block_size_exceeded() -> TestResult {
        let store = &MemoryBlockStore::new();

        let block_small: Bytes = b"This one is small".to_vec().into();
        let block_big: Bytes = b"This one is very very very big".to_vec().into();
        let root_small = store.put_block(block_small.clone(), CODEC_RAW).await?;
        let root_big = store.put_block(block_big.clone(), CODEC_RAW).await?;

        let config = &Config {
            max_block_size: 20,
            ..Config::default()
        };

        block_receive_block_stream(
            root_small,
            &mut futures::stream::iter(vec![Ok((root_small, block_small))]).boxed(),
            config,
            MemoryBlockStore::new(),
            NoCache,
        )
        .await?;

        let result = block_receive_block_stream(
            root_small,
            &mut futures::stream::iter(vec![Ok((root_big, block_big))]).boxed(),
            config,
            MemoryBlockStore::new(),
            NoCache,
        )
        .await;

        assert_matches!(result, Err(Error::BlockSizeExceeded { .. }));

        Ok(())
    }
}
721}