1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
use async_stream::try_stream;
use bytes::Bytes;
use cid::Cid;
use libipld_cbor::DagCborCodec;
use noosphere_core::data::BodyChunkIpld;
use noosphere_storage::BlockStore;
use tokio_stream::Stream;
/// Decoder for a body stored as a chain of [`BodyChunkIpld`] blocks.
///
/// Field `0` is a reference to the [`Cid`] of the first chunk to load and
/// field `1` is a reference to the [`BlockStore`] the chunks are read from.
/// Consume the decoder with `stream` to obtain the chunk contents.
pub struct BodyChunkDecoder<'a, 'b, S>(pub &'a Cid, pub &'b S)
where
    S: BlockStore;
impl<'a, 'b, S: BlockStore> BodyChunkDecoder<'a, 'b, S> {
    /// Consumes the decoder and returns a stream over the bytes of each chunk.
    ///
    /// Starting from the CID held in field `0`, every iteration loads one
    /// [`BodyChunkIpld`] from the store as DAG-CBOR, yields its `bytes` as a
    /// [`Bytes`] value, and then continues with the chunk's `next` link. The
    /// stream finishes once a chunk has no `next` link.
    ///
    /// # Errors
    ///
    /// When loading a block fails, the failure is surfaced through the stream
    /// as a [`std::io::Error`] of kind [`std::io::ErrorKind::UnexpectedEof`]
    /// whose message is the string form of the underlying error.
    pub fn stream(self) -> impl Stream<Item = Result<Bytes, std::io::Error>> + Unpin {
        // The stream works on its own copies of the store handle and the
        // starting CID rather than on the decoder's fields directly.
        let block_store = self.1.clone();
        let mut cursor = Some(*self.0);

        // Pinned on the heap so that the returned stream is `Unpin`.
        Box::pin(try_stream! {
            while let Some(cid) = cursor {
                debug!("Unpacking block {}...", cid);

                let loaded = block_store.load::<DagCborCodec, BodyChunkIpld>(&cid).await;
                let chunk = loaded.map_err(|cause| {
                    std::io::Error::new(std::io::ErrorKind::UnexpectedEof, cause.to_string())
                })?;

                yield Bytes::from(chunk.bytes);

                // Follow the link to the next chunk; `None` ends the loop.
                cursor = chunk.next;
            }
        })
    }
}