use crate::{AnyBlockStorage, Block, BlockStorage, KnownMultiCodec, MultiCodec, StorageError};
use cid::Cid;
use futures::{AsyncRead, AsyncReadExt};
use rust_unixfs::file::{adder::FileAdder, visit::IdleFileVisit};
/// Reads the UnixFS file rooted at `cid` from `storage` and returns its bytes in one buffer.
///
/// The root block is fetched through a `DagPb`-coded [`MultiCodec`] wrapper around `cid`.
/// Every further block is fetched by the link the file visit reports as pending, and the
/// content of each visited block is appended to the output in visit order.
///
/// # Errors
///
/// Propagates any [`StorageError`] produced while building the root key or fetching a block.
/// Failures reported by the file visit itself are wrapped in [`StorageError::Internal`].
pub async fn unixfs_cat_buffer(storage: &impl AnyBlockStorage, cid: &Cid) -> Result<Vec<u8>, StorageError> {
    let root = MultiCodec::with_codec(KnownMultiCodec::DagPb, cid)?;
    let mut block = storage.get(root).await?.into_inner().1;

    let (head, _, _metadata, mut walker) = IdleFileVisit::default()
        .start(&block)
        .map_err(|e| StorageError::Internal(e.into()))?;
    let mut output = head.to_vec();

    while let Some(visit) = walker {
        // Only the first pending link is needed: `continue_walk` consumes exactly one block.
        let (next_link, _) = visit.pending_links();
        block = storage.get(next_link).await?.into_inner().1;

        let (chunk, remaining) = visit
            .continue_walk(&block, &mut None)
            .map_err(|e| StorageError::Internal(e.into()))?;
        output.extend_from_slice(chunk);
        walker = remaining;
    }

    Ok(output)
}
/// Encodes everything readable from `stream` as a UnixFS file and stores the blocks in `storage`.
///
/// The stream is consumed in reads of up to 16384 bytes. Each read is fed to a [`FileAdder`],
/// and every block the adder emits is written to `storage` immediately. Once the stream
/// reports end-of-input (a zero-length read), the adder is finished and its remaining blocks
/// are stored as well.
///
/// Returns the CIDs reported by the storage, in the order the blocks were written. The tests
/// in this file read the file back through the last CID of that list.
///
/// # Errors
///
/// Read failures on `stream` are wrapped in [`StorageError::Internal`]; storage failures are
/// propagated unchanged.
pub async fn unixfs_add<I>(storage: &impl AnyBlockStorage, stream: &mut I) -> Result<Vec<Cid>, StorageError>
where
    I: AsyncRead + Unpin,
{
    let mut chunk = vec![0u8; 16384];
    let mut encoder = FileAdder::default();
    let mut stored = Vec::new();

    loop {
        let filled = stream
            .read(&mut chunk)
            .await
            .map_err(|e| StorageError::Internal(e.into()))?;
        if filled == 0 {
            // End of stream: leave the loop and flush what the adder still holds.
            break;
        }

        // `push` may accept only part of the slice, so keep offering the unread tail.
        let mut offset = 0;
        while offset < filled {
            let (ready, used) = encoder.push(&chunk[offset..filled]);
            add_blocks(storage, ready, &mut stored).await?;
            offset += used;
        }
    }

    add_blocks(storage, encoder.finish(), &mut stored).await?;
    Ok(stored)
}
pub fn unixfs_encode_buffer(buf: &[u8]) -> Vec<Block> {
let mut result = Vec::new();
let mut adder = FileAdder::default();
let mut total = 0;
while total < buf.len() {
let (blocks, consumed) = adder.push(&buf[total..]);
for (cid, data) in blocks {
result.push(Block::new_unchecked(cid, data));
}
total += consumed;
}
for (cid, data) in adder.finish() {
result.push(Block::new_unchecked(cid, data));
}
result
}
/// Writes each `(cid, data)` pair from `blocks` to `storage`, one at a time and in order.
///
/// The CID returned by each `set` call is appended to `cids`. The first storage error stops
/// the loop and is returned; CIDs pushed before that point stay in `cids`.
async fn add_blocks<S>(
    storage: &S,
    blocks: impl Iterator<Item = (Cid, Vec<u8>)>,
    cids: &mut Vec<Cid>,
) -> Result<(), StorageError>
where
    S: BlockStorage + Send,
{
    for (block_cid, payload) in blocks {
        let stored_cid = storage.set(Block::new_unchecked(block_cid, payload)).await?;
        cids.push(stored_cid);
    }
    Ok(())
}
#[cfg(test)]
mod tests {
    use crate::{unixfs_add, unixfs_cat_buffer, TestStorage};
    use cid::Cid;
    use futures::io::Cursor;
    use std::str::FromStr;

    /// CID expected for each of the first four blocks stored for the sample payload.
    const REPEATED_BLOCK_CID: &str = "QmPEvxGmvxzfMews81gF5NMvFNeFAdNmhtwzGPhkHhoyqy";
    /// CID expected for the fifth (last) block stored for the sample payload.
    const LAST_BLOCK_CID: &str = "QmVRRmYKvn8m3jQT8VHX1BCgrQLFvzsB26aKwLCyFRvYSv";
    /// CID expected for the single block stored for an empty stream.
    const EMPTY_INPUT_CID: &str = "QmbFMke1KXqnYyBBWxB74N4c5SBnJMVAiMNRcGu6x1AwQH";

    /// Builds the shared fixture: the 16-byte phrase repeated 64 * 1024 times.
    fn sample_payload() -> Vec<u8> {
        "hello world test".repeat(64).repeat(1024).into_bytes()
    }

    #[tokio::test]
    async fn test_unixfs_add() {
        let storage = TestStorage::default();
        let mut stream = Cursor::new(sample_payload());

        let cids = unixfs_add(&storage, &mut stream).await.unwrap();

        assert_eq!(5, cids.len());
        let repeated = Cid::from_str(REPEATED_BLOCK_CID).unwrap();
        for cid in &cids[..4] {
            assert_eq!(*cid, repeated);
        }
        assert_eq!(cids[4], Cid::from_str(LAST_BLOCK_CID).unwrap());
    }

    #[tokio::test]
    async fn test_unixfs_add_empty() {
        let storage = TestStorage::default();
        let mut stream = Cursor::new([]);

        let cids = unixfs_add(&storage, &mut stream).await.unwrap();

        assert_eq!(1, cids.len());
        assert_eq!(cids[0], Cid::from_str(EMPTY_INPUT_CID).unwrap());
    }

    #[tokio::test]
    async fn test_unixfs_cat_buffer() {
        let storage = TestStorage::default();
        let payload = sample_payload();
        let mut stream = Cursor::new(payload.clone());

        let cids = unixfs_add(&storage, &mut stream).await.unwrap();
        // Read the file back through the last CID returned by the add.
        let buffer = unixfs_cat_buffer(&storage, cids.last().unwrap()).await.unwrap();

        assert_eq!(payload, buffer);
    }
}