use crate::{BlockStorage, KnownMultiCodec, MultiCodec, StorageError};
use cid::Cid;
use core::ops::Range;
use futures::Stream;
use rust_unixfs::file::visit::IdleFileVisit;
pub fn unixfs_stream<S>(
storage: S,
cid: Cid,
range: Option<Range<u64>>,
) -> impl Stream<Item = Result<Vec<u8>, StorageError>>
where
S: BlockStorage + Send,
{
async_stream::try_stream! {
let mut visit = IdleFileVisit::default();
if let Some(range) = range {
visit = visit.with_target_range(range);
}
let mut buf = Vec::new();
buf.append(
&mut storage
.get(MultiCodec::with_codec(KnownMultiCodec::DagPb, &cid)?)
.await?
.into_inner()
.1,
);
let (content, _, _metadata, mut step) = visit
.start(&buf)
.map_err(|e| StorageError::Internal(e.into()))?;
yield content.to_vec();
while let Some(visit) = step {
let (first, _) = visit.pending_links();
buf.clear();
buf.append(&mut storage.get(first).await?.into_inner().1);
let (content, next_step) = visit
.continue_walk(&buf, &mut None)
.map_err(|e| StorageError::Internal(e.into()))?;
yield content.to_vec();
step = next_step;
}
}
}
#[cfg(test)]
mod tests {
use crate::{unixfs_add, unixfs_stream, TestStorage};
use futures::{io::Cursor, TryStreamExt};
#[tokio::test]
async fn test_unixfs_stream() {
let storage = TestStorage::default();
let data = "hello world test".repeat(64).repeat(1024); let mut stream = Cursor::new(data.as_bytes().to_vec());
let cids = unixfs_add(&storage, &mut stream).await.unwrap();
let buffer = unixfs_stream(storage, *cids.last().unwrap(), None)
.try_collect::<Vec<_>>()
.await
.unwrap()
.concat();
assert_eq!(buffer, data.as_bytes().to_vec());
}
#[tokio::test]
async fn test_unixfs_stream_range() {
let storage = TestStorage::default();
let data = "hello world test".repeat(64).repeat(1024); let mut stream = Cursor::new(data.as_bytes().to_vec());
let cids = unixfs_add(&storage, &mut stream).await.unwrap();
let range = 512 * 1024..786 * 1024;
let buffer = unixfs_stream(storage, *cids.last().unwrap(), Some(range.clone()))
.try_collect::<Vec<_>>()
.await
.unwrap()
.concat();
let data_bytes = data.as_bytes();
assert_eq!(&buffer[..], &data_bytes[512 * 1024..786 * 1024]);
}
#[tokio::test]
async fn test_unixfs_stream_range_unaligned() {
let storage = TestStorage::default();
let data = "hello world test".repeat(64).repeat(1024); let mut stream = Cursor::new(data.as_bytes().to_vec());
let cids = unixfs_add(&storage, &mut stream).await.unwrap();
let range = 10000..10016;
let buffer = unixfs_stream(storage, *cids.last().unwrap(), Some(range.clone()))
.try_collect::<Vec<_>>()
.await
.unwrap()
.concat();
assert_eq!(&buffer[..], "hello world test".as_bytes());
}
}