co_primitives/library/
unixfs.rs1use crate::{AnyBlockStorage, Block, BlockStorage, KnownMultiCodec, MultiCodec, StorageError};
5use cid::Cid;
6use futures::{AsyncRead, AsyncReadExt};
7use rust_unixfs::file::{adder::FileAdder, visit::IdleFileVisit};
8
9pub async fn unixfs_cat_buffer(storage: &impl AnyBlockStorage, cid: &Cid) -> Result<Vec<u8>, StorageError> {
13 let mut result = Vec::new();
14
15 let mut buf = Vec::new();
23 buf.append(
24 &mut storage
25 .get(MultiCodec::with_codec(KnownMultiCodec::DagPb, cid)?)
26 .await?
27 .into_inner()
28 .1,
29 );
30
31 let (content, _, _metadata, mut step) = IdleFileVisit::default()
33 .start(&buf)
34 .map_err(|e| StorageError::Internal(e.into()))?;
35 result.extend_from_slice(content);
36
37 while let Some(visit) = step {
39 let (first, _) = visit.pending_links();
43
44 buf.clear();
45 buf.append(&mut storage.get(first).await?.into_inner().1);
46
47 let (content, next_step) = visit
51 .continue_walk(&buf, &mut None)
52 .map_err(|e| StorageError::Internal(e.into()))?;
53 result.extend_from_slice(content);
54
55 step = next_step;
57 }
58
59 Ok(result)
61}
62
63pub async fn unixfs_add<I>(storage: &impl AnyBlockStorage, stream: &mut I) -> Result<Vec<Cid>, StorageError>
68where
69 I: AsyncRead + Unpin,
70{
71 let mut result = Vec::new();
72 let mut adder = FileAdder::default();
73 let mut buf = vec![0u8; 16384];
74 loop {
75 let bytes = stream.read(&mut buf).await.map_err(|e| StorageError::Internal(e.into()))?;
77 if bytes == 0 {
78 let blocks = adder.finish();
79 add_blocks(storage, blocks, &mut result).await?;
80 break;
81 }
82
83 let mut total = 0;
85 while total < bytes {
86 let (blocks, consumed) = adder.push(&buf[total..bytes]);
87 add_blocks(storage, blocks, &mut result).await?;
88 total += consumed;
89 }
90 }
91 Ok(result)
92}
93
94pub fn unixfs_encode_buffer(buf: &[u8]) -> Vec<Block> {
97 let mut result = Vec::new();
98 let mut adder = FileAdder::default();
99
100 let mut total = 0;
102 while total < buf.len() {
103 let (blocks, consumed) = adder.push(&buf[total..]);
104 for (cid, data) in blocks {
105 result.push(Block::new_unchecked(cid, data));
106 }
107 total += consumed;
108 }
109
110 for (cid, data) in adder.finish() {
112 result.push(Block::new_unchecked(cid, data));
113 }
114
115 result
117}
118
119async fn add_blocks<S>(
121 storage: &S,
122 blocks: impl Iterator<Item = (Cid, Vec<u8>)>,
123 cids: &mut Vec<Cid>,
124) -> Result<(), StorageError>
125where
126 S: BlockStorage + Send,
127{
128 for (cid, data) in blocks {
129 let block = Block::new_unchecked(cid, data);
130 let cid = storage.set(block).await?;
131 cids.push(cid);
132 }
133 Ok(())
134}
135
#[cfg(test)]
mod tests {
    use crate::{unixfs_add, unixfs_cat_buffer, TestStorage};
    use cid::Cid;
    use futures::io::Cursor;
    use std::str::FromStr;

    /// CID expected for each of the first four blocks of the sample payload.
    const REPEATED_CID: &str = "QmPEvxGmvxzfMews81gF5NMvFNeFAdNmhtwzGPhkHhoyqy";
    /// CID expected for the fifth and last block of the sample payload.
    const LAST_CID: &str = "QmVRRmYKvn8m3jQT8VHX1BCgrQLFvzsB26aKwLCyFRvYSv";
    /// CID expected when an empty stream is added.
    const EMPTY_CID: &str = "QmbFMke1KXqnYyBBWxB74N4c5SBnJMVAiMNRcGu6x1AwQH";

    /// Builds the payload shared by the add and cat tests.
    fn sample_payload() -> String {
        "hello world test".repeat(64).repeat(1024)
    }

    /// Parses a CID literal, panicking on malformed input.
    fn parse_cid(text: &str) -> Cid {
        Cid::from_str(text).unwrap()
    }

    #[tokio::test]
    async fn test_unixfs_add() {
        let storage = TestStorage::default();
        let mut stream = Cursor::new(sample_payload().as_bytes().to_vec());
        let cids = unixfs_add(&storage, &mut stream).await.unwrap();

        let expected = [REPEATED_CID, REPEATED_CID, REPEATED_CID, REPEATED_CID, LAST_CID];
        assert_eq!(expected.len(), cids.len());
        for (actual, wanted) in cids.iter().zip(expected.iter()) {
            assert_eq!(*actual, parse_cid(wanted));
        }
    }

    #[tokio::test]
    async fn test_unixfs_add_empty() {
        let storage = TestStorage::default();
        let mut stream = Cursor::new([]);
        let cids = unixfs_add(&storage, &mut stream).await.unwrap();

        assert_eq!(1, cids.len());
        assert_eq!(cids[0], parse_cid(EMPTY_CID));
    }

    #[tokio::test]
    async fn test_unixfs_cat_buffer() {
        let storage = TestStorage::default();
        let data = sample_payload();
        let mut stream = Cursor::new(data.as_bytes().to_vec());
        let cids = unixfs_add(&storage, &mut stream).await.unwrap();

        // The last CID returned by `unixfs_add` is used as the file root.
        let buffer = unixfs_cat_buffer(&storage, cids.last().unwrap()).await.unwrap();
        assert_eq!(data.as_bytes().to_vec(), buffer);
    }
}