noosphere_core/view/
content.rs1use crate::data::BodyChunkIpld;
2use async_stream::try_stream;
3use bytes::Bytes;
4use cid::Cid;
5use libipld_cbor::DagCborCodec;
6use noosphere_storage::BlockStore;
7use tokio_stream::Stream;
8
9pub struct BodyChunkDecoder<'a, 'b, S: BlockStore>(pub &'a Cid, pub &'b S);
11
12impl<'a, 'b, S: BlockStore> BodyChunkDecoder<'a, 'b, S> {
13 pub fn stream(self) -> impl Stream<Item = Result<Bytes, std::io::Error>> + Unpin {
16 let mut next = Some(*self.0);
17 let store = self.1.clone();
18 Box::pin(try_stream! {
19 while let Some(cid) = next {
20 trace!("Unpacking block {}...", cid);
21 let chunk = store.load::<DagCborCodec, BodyChunkIpld>(&cid).await.map_err(|error| {
22 std::io::Error::new(std::io::ErrorKind::UnexpectedEof, error.to_string())
23 })?;
24 yield Bytes::from(chunk.bytes);
25 next = chunk.next;
26 }
27 })
28 }
29}