wnfs_unixfs_file/
builder.rs

1use crate::{
2    balanced_tree::{DEFAULT_DEGREE, TreeBuilder},
3    chunker::{self, Chunker, ChunkerConfig, DEFAULT_CHUNK_SIZE_LIMIT},
4    protobufs,
5    types::{Block, BoxAsyncRead},
6};
7use anyhow::{Result, anyhow, ensure};
8use bytes::Bytes;
9use futures::{Stream, TryStreamExt};
10use prost::Message;
11use std::fmt::Debug;
12use tokio::io::AsyncRead;
13use wnfs_common::{BlockStore, Cid, utils::CondSend};
14
15/// Representation of a constructed File.
16pub struct File<'a> {
17    content: BoxAsyncRead<'a>,
18    tree_builder: TreeBuilder,
19    chunker: Chunker,
20}
21
22impl Debug for File<'_> {
23    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
24        f.debug_struct("File")
25            .field(
26                "content",
27                &"Content::Reader(Pin<Box<dyn AsyncRead + Send>>)",
28            )
29            .field("tree_builder", &self.tree_builder)
30            .field("chunker", &self.chunker)
31            .finish()
32    }
33}
34
35impl<'a> File<'a> {
36    pub fn encode(
37        self,
38        store: &'a impl BlockStore,
39    ) -> Result<impl Stream<Item = Result<(Cid, Block)>> + 'a> {
40        let chunks = self.chunker.chunks(self.content);
41        Ok(self.tree_builder.stream_tree(chunks, store))
42    }
43
44    pub async fn store(self, store: &impl BlockStore) -> Result<Cid> {
45        let blocks = self.encode(store)?;
46        tokio::pin!(blocks);
47
48        let mut root_cid = None;
49
50        while let Some((cid, _)) = blocks.try_next().await? {
51            root_cid = Some(cid);
52        }
53
54        root_cid.ok_or_else(|| anyhow!("error encoding file, no blocks produced"))
55    }
56}
57
58/// Constructs a UnixFS file.
59pub struct FileBuilder<'a> {
60    reader: Option<BoxAsyncRead<'a>>,
61    chunker: Chunker,
62    degree: usize,
63}
64
65impl Default for FileBuilder<'_> {
66    fn default() -> Self {
67        Self {
68            reader: None,
69            chunker: Chunker::Fixed(chunker::Fixed::default()),
70            degree: DEFAULT_DEGREE,
71        }
72    }
73}
74
75impl Debug for FileBuilder<'_> {
76    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
77        let reader = if self.reader.is_some() {
78            "Some(Box<AsyncRead>)"
79        } else {
80            "None"
81        };
82        f.debug_struct("FileBuilder")
83            .field("chunker", &self.chunker)
84            .field("degree", &self.degree)
85            .field("reader", &reader)
86            .finish()
87    }
88}
89
90/// FileBuilder separates uses a reader or bytes to chunk the data into raw unixfs nodes
91impl<'a> FileBuilder<'a> {
92    pub fn new() -> Self {
93        Default::default()
94    }
95
96    pub fn chunker(mut self, chunker: impl Into<Chunker>) -> Self {
97        self.chunker = chunker.into();
98        self
99    }
100
101    /// Set the chunker to be fixed size.
102    pub fn fixed_chunker(mut self, chunk_size: usize) -> Self {
103        self.chunker = Chunker::Fixed(chunker::Fixed::new(chunk_size));
104        self
105    }
106
107    /// Use the rabin chunker.
108    pub fn rabin_chunker(mut self) -> Self {
109        self.chunker = Chunker::Rabin(Box::default());
110        self
111    }
112
113    pub fn degree(mut self, degree: usize) -> Self {
114        self.degree = degree;
115        self
116    }
117
118    pub fn content_bytes(mut self, content: impl Into<Bytes>) -> Self {
119        let bytes = content.into();
120        self.reader = Some(Box::pin(std::io::Cursor::new(bytes)));
121        self
122    }
123
124    pub fn content_reader(mut self, content: impl AsyncRead + CondSend + 'a) -> Self {
125        self.reader = Some(Box::pin(content));
126        self
127    }
128
129    pub fn build(self) -> Result<File<'a>> {
130        let degree = self.degree;
131        let chunker = self.chunker;
132        let tree_builder = TreeBuilder::balanced_tree_with_degree(degree);
133
134        if let Some(reader) = self.reader {
135            return Ok(File {
136                content: reader,
137                chunker,
138                tree_builder,
139            });
140        }
141
142        anyhow::bail!("must have a reader for the content");
143    }
144}
145
146pub(crate) fn encode_unixfs_pb(
147    inner: &protobufs::Data,
148    links: Vec<protobufs::PbLink>,
149) -> Result<protobufs::PbNode> {
150    let data = inner.encode_to_vec();
151    ensure!(
152        data.len() <= DEFAULT_CHUNK_SIZE_LIMIT,
153        "node is too large: {} bytes",
154        data.len()
155    );
156
157    Ok(protobufs::PbNode {
158        links,
159        data: Some(data.into()),
160    })
161}
162
163/// Configuration for adding unixfs content
164#[derive(Debug, Clone, PartialEq, Eq)]
165pub struct Config {
166    /// Should the outer object be wrapped in a directory?
167    pub wrap: bool,
168    pub chunker: Option<ChunkerConfig>,
169}
170
#[cfg(test)]
mod tests {
    use super::*;
    use crate::chunker::DEFAULT_CHUNKS_SIZE;
    use wnfs_common::MemoryBlockStore;

    /// One mebibyte, the unit the large fixtures are sized in.
    const MIB: usize = 1024 * 1024;

    /// A three-byte file must be encoded as exactly one block.
    #[tokio::test]
    async fn test_builder_stream_small() -> Result<()> {
        let store = &MemoryBlockStore::new();

        let file = FileBuilder::new()
            .content_reader(std::io::Cursor::new(b"bar"))
            .build()?;
        let blocks: Vec<_> = file.encode(store)?.try_collect().await?;
        assert_eq!(blocks.len(), 1);

        // TODO: check content
        Ok(())
    }

    /// Files of 1 MiB and 2 MiB must be encoded as 5 and 9 blocks
    /// respectively with the default builder settings.
    #[tokio::test]
    async fn test_builder_stream_large() -> Result<()> {
        let store = &MemoryBlockStore::new();

        // 1 MiB of a single repeated byte.
        let one_mib = vec![1u8; MIB];
        let file = FileBuilder::new()
            .content_reader(std::io::Cursor::new(one_mib))
            .build()?;
        let blocks: Vec<_> = file.encode(store)?.try_collect().await?;
        assert_eq!(blocks.len(), 5);

        // 2 MiB: one MiB of zeros followed by one MiB of ones.
        let two_mib: Vec<u8> = [vec![0u8; MIB], vec![1u8; MIB]].concat();
        let file = FileBuilder::new()
            .content_reader(std::io::Cursor::new(two_mib))
            .build()?;
        let blocks: Vec<_> = file.encode(store)?.try_collect().await?;
        assert_eq!(blocks.len(), 9);

        // TODO: check content
        Ok(())
    }

    /// `ChunkerConfig` parsing: accepted spellings and rejected inputs.
    #[test]
    fn test_chunk_config_from_str() {
        // Accepted inputs.
        assert_eq!(
            "fixed".parse::<ChunkerConfig>().unwrap(),
            ChunkerConfig::Fixed(DEFAULT_CHUNKS_SIZE)
        );
        assert_eq!(
            "fixed-123".parse::<ChunkerConfig>().unwrap(),
            ChunkerConfig::Fixed(123)
        );
        assert_eq!(
            "rabin".parse::<ChunkerConfig>().unwrap(),
            ChunkerConfig::Rabin
        );

        // Rejected inputs: missing size, unknown chunker names.
        for invalid in ["fixed-", "foo-123", "foo"] {
            assert!(invalid.parse::<ChunkerConfig>().is_err());
        }

        // A fixed size just above the limit is rejected as well.
        let oversized = format!("fixed-{}", DEFAULT_CHUNK_SIZE_LIMIT + 1);
        assert!(oversized.parse::<ChunkerConfig>().is_err());
    }
}
248
#[cfg(test)]
mod proptests {
    use super::*;
    use crate::unixfs::UnixFsFile;
    use proptest::{option, strategy::Strategy};
    use rand_chacha::ChaCha12Rng;
    use rand_core::{RngCore, SeedableRng};
    use std::io::SeekFrom;
    use test_strategy::proptest;
    use testresult::TestResult;
    use tokio::io::{AsyncReadExt, AsyncSeekExt};
    use wnfs_common::{MAX_BLOCK_SIZE, MemoryBlockStore};

    /// Strategy yielding either a fixed chunker config with a size in
    /// `1_000..MAX_BLOCK_SIZE`, or the rabin chunker config.
    fn arb_chunker() -> impl Strategy<Value = ChunkerConfig> {
        option::of(1_000..MAX_BLOCK_SIZE).prop_map(|opt| match opt {
            Some(lim) => ChunkerConfig::Fixed(lim),
            None => ChunkerConfig::Rabin,
        })
    }

    /// Storing pseudo-random content and reading it back through
    /// `UnixFsFile` must reproduce the bytes and report the right file size.
    #[proptest(cases = 64)]
    fn test_encode_decode_roundtrip(
        seed: u64,
        #[strategy(2..DEFAULT_DEGREE)] degree: usize,
        #[strategy(0usize..5_000_000)] len: usize,
        #[strategy(arb_chunker())] chunker: ChunkerConfig,
    ) {
        let store = &MemoryBlockStore::new();
        // Seeded RNG so that a failing case can be reproduced from its seed.
        let rng = &mut ChaCha12Rng::seed_from_u64(seed);
        let mut data = vec![0; len];
        rng.fill_bytes(&mut data);

        async_std::task::block_on(async {
            let root_cid = FileBuilder::new()
                .content_bytes(data.clone())
                .chunker(chunker)
                .degree(degree)
                .build()?
                .store(store)
                .await?;

            let file = UnixFsFile::load(&root_cid, store).await?;
            assert_eq!(file.filesize(), Some(len as u64));

            let mut buffer = Vec::new();
            let mut reader = file.into_content_reader(store, None)?;
            reader.read_to_end(&mut buffer).await?;

            assert_eq!(buffer, data);

            Ok(()) as TestResult
        })
        .unwrap();
    }

    /// Seeking to an offset and reading a bounded window must return exactly
    /// the corresponding slice of the original content.
    #[proptest(cases = 256)]
    fn test_seek_subarray(
        seed: u64,
        #[strategy(2..DEFAULT_DEGREE)] degree: usize,
        #[strategy(0usize..100_000)] len: usize,
        #[strategy(0usize..100_000)] seek_start: usize,
        #[strategy(0usize..1_000)] seek_len: usize,
        #[strategy(arb_chunker())] chunker: ChunkerConfig,
    ) {
        let store = &MemoryBlockStore::new();
        // Seeded RNG so that a failing case can be reproduced from its seed.
        let rng = &mut ChaCha12Rng::seed_from_u64(seed);
        let mut data = vec![0; len];
        rng.fill_bytes(&mut data);

        // Clamp the requested window to the content: start no later than the
        // end, and read at most `seek_len` bytes without running past the end.
        // The length must not include the start offset, otherwise the
        // generated `seek_len` bound is ignored for large offsets.
        let seek_start = std::cmp::min(seek_start, len);
        let seek_len = std::cmp::min(seek_len, len - seek_start);

        async_std::task::block_on(async {
            let root_cid = FileBuilder::new()
                .content_bytes(data.clone())
                .chunker(chunker)
                .degree(degree)
                .build()?
                .store(store)
                .await?;

            let file = UnixFsFile::load(&root_cid, store).await?;
            assert_eq!(file.filesize(), Some(len as u64));

            let mut buffer = vec![0; seek_len];
            let mut reader = file.into_content_reader(store, None)?;
            reader.seek(SeekFrom::Start(seek_start as u64)).await?;
            let read = reader.read_exact(&mut buffer).await?;

            assert_eq!(read, seek_len);

            assert_eq!(buffer, data[seek_start..seek_start + seek_len]);

            Ok(()) as TestResult
        })
        .unwrap();
    }
}