noosphere_core/view/
content.rs

1use crate::data::BodyChunkIpld;
2use async_stream::try_stream;
3use bytes::Bytes;
4use cid::Cid;
5use libipld_cbor::DagCborCodec;
6use noosphere_storage::BlockStore;
7use tokio_stream::Stream;
8
9/// Helper to easily decode a linked list of `BodyChunkIpld` as a byte stream
10pub struct BodyChunkDecoder<'a, 'b, S: BlockStore>(pub &'a Cid, pub &'b S);
11
12impl<'a, 'b, S: BlockStore> BodyChunkDecoder<'a, 'b, S> {
13    /// Consume the [BodyChunkDecoder] and return an async [Stream] of bytes
14    /// representing the raw body contents
15    pub fn stream(self) -> impl Stream<Item = Result<Bytes, std::io::Error>> + Unpin {
16        let mut next = Some(*self.0);
17        let store = self.1.clone();
18        Box::pin(try_stream! {
19            while let Some(cid) = next {
20                trace!("Unpacking block {}...", cid);
21                let chunk = store.load::<DagCborCodec, BodyChunkIpld>(&cid).await.map_err(|error| {
22                    std::io::Error::new(std::io::ErrorKind::UnexpectedEof, error.to_string())
23                })?;
24                yield Bytes::from(chunk.bytes);
25                next = chunk.next;
26            }
27        })
28    }
29}