wnfs_unixfs_file/
builder.rs

1use crate::{
2    balanced_tree::{TreeBuilder, DEFAULT_DEGREE},
3    chunker::{self, Chunker, ChunkerConfig, DEFAULT_CHUNK_SIZE_LIMIT},
4    protobufs,
5    types::{Block, BoxAsyncRead},
6};
7use anyhow::{anyhow, ensure, Result};
8use bytes::Bytes;
9use futures::{Stream, TryStreamExt};
10use libipld::Cid;
11use prost::Message;
12use std::fmt::Debug;
13use tokio::io::AsyncRead;
14use wnfs_common::{utils::CondSend, BlockStore};
15
16/// Representation of a constructed File.
17pub struct File<'a> {
18    content: BoxAsyncRead<'a>,
19    tree_builder: TreeBuilder,
20    chunker: Chunker,
21}
22
23impl<'a> Debug for File<'a> {
24    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
25        f.debug_struct("File")
26            .field(
27                "content",
28                &"Content::Reader(Pin<Box<dyn AsyncRead + Send>>)",
29            )
30            .field("tree_builder", &self.tree_builder)
31            .field("chunker", &self.chunker)
32            .finish()
33    }
34}
35
36impl<'a> File<'a> {
37    pub fn encode(
38        self,
39        store: &'a impl BlockStore,
40    ) -> Result<impl Stream<Item = Result<(Cid, Block)>> + '_> {
41        let chunks = self.chunker.chunks(self.content);
42        Ok(self.tree_builder.stream_tree(chunks, store))
43    }
44
45    pub async fn store(self, store: &impl BlockStore) -> Result<Cid> {
46        let blocks = self.encode(store)?;
47        tokio::pin!(blocks);
48
49        let mut root_cid = None;
50
51        while let Some((cid, _)) = blocks.try_next().await? {
52            root_cid = Some(cid);
53        }
54
55        root_cid.ok_or_else(|| anyhow!("error encoding file, no blocks produced"))
56    }
57}
58
59/// Constructs a UnixFS file.
60pub struct FileBuilder<'a> {
61    reader: Option<BoxAsyncRead<'a>>,
62    chunker: Chunker,
63    degree: usize,
64}
65
66impl<'a> Default for FileBuilder<'a> {
67    fn default() -> Self {
68        Self {
69            reader: None,
70            chunker: Chunker::Fixed(chunker::Fixed::default()),
71            degree: DEFAULT_DEGREE,
72        }
73    }
74}
75
76impl<'a> Debug for FileBuilder<'a> {
77    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
78        let reader = if self.reader.is_some() {
79            "Some(Box<AsyncRead>)"
80        } else {
81            "None"
82        };
83        f.debug_struct("FileBuilder")
84            .field("chunker", &self.chunker)
85            .field("degree", &self.degree)
86            .field("reader", &reader)
87            .finish()
88    }
89}
90
91/// FileBuilder separates uses a reader or bytes to chunk the data into raw unixfs nodes
92impl<'a> FileBuilder<'a> {
93    pub fn new() -> Self {
94        Default::default()
95    }
96
97    pub fn chunker(mut self, chunker: impl Into<Chunker>) -> Self {
98        self.chunker = chunker.into();
99        self
100    }
101
102    /// Set the chunker to be fixed size.
103    pub fn fixed_chunker(mut self, chunk_size: usize) -> Self {
104        self.chunker = Chunker::Fixed(chunker::Fixed::new(chunk_size));
105        self
106    }
107
108    /// Use the rabin chunker.
109    pub fn rabin_chunker(mut self) -> Self {
110        self.chunker = Chunker::Rabin(Box::default());
111        self
112    }
113
114    pub fn degree(mut self, degree: usize) -> Self {
115        self.degree = degree;
116        self
117    }
118
119    pub fn content_bytes(mut self, content: impl Into<Bytes>) -> Self {
120        let bytes = content.into();
121        self.reader = Some(Box::pin(std::io::Cursor::new(bytes)));
122        self
123    }
124
125    pub fn content_reader(mut self, content: impl AsyncRead + CondSend + 'a) -> Self {
126        self.reader = Some(Box::pin(content));
127        self
128    }
129
130    pub fn build(self) -> Result<File<'a>> {
131        let degree = self.degree;
132        let chunker = self.chunker;
133        let tree_builder = TreeBuilder::balanced_tree_with_degree(degree);
134
135        if let Some(reader) = self.reader {
136            return Ok(File {
137                content: reader,
138                chunker,
139                tree_builder,
140            });
141        }
142
143        anyhow::bail!("must have a reader for the content");
144    }
145}
146
147pub(crate) fn encode_unixfs_pb(
148    inner: &protobufs::Data,
149    links: Vec<protobufs::PbLink>,
150) -> Result<protobufs::PbNode> {
151    let data = inner.encode_to_vec();
152    ensure!(
153        data.len() <= DEFAULT_CHUNK_SIZE_LIMIT,
154        "node is too large: {} bytes",
155        data.len()
156    );
157
158    Ok(protobufs::PbNode {
159        links,
160        data: Some(data.into()),
161    })
162}
163
164/// Configuration for adding unixfs content
165#[derive(Debug, Clone, PartialEq, Eq)]
166pub struct Config {
167    /// Should the outer object be wrapped in a directory?
168    pub wrap: bool,
169    pub chunker: Option<ChunkerConfig>,
170}
171
#[cfg(test)]
mod tests {
    use super::*;
    use crate::chunker::DEFAULT_CHUNKS_SIZE;
    use futures::TryStreamExt;
    use wnfs_common::MemoryBlockStore;

    /// One mebibyte: the length of each uniform run of bytes in the large-file tests.
    const MIB: usize = 1024 * 1024;

    /// Encodes `content` with the default builder settings and collects every
    /// `(Cid, Block)` pair the encoder yields.
    async fn encode_all(
        store: &MemoryBlockStore,
        content: impl AsyncRead + CondSend,
    ) -> Result<Vec<(Cid, Block)>> {
        FileBuilder::new()
            .content_reader(content)
            .build()?
            .encode(store)?
            .try_collect()
            .await
    }

    #[tokio::test]
    async fn test_builder_stream_small() -> Result<()> {
        let store = &MemoryBlockStore::new();

        // A three-byte file encodes to a single block.
        let blocks = encode_all(store, std::io::Cursor::new(b"bar")).await?;
        assert_eq!(blocks.len(), 1);

        // TODO: check content
        Ok(())
    }

    #[tokio::test]
    async fn test_builder_stream_large() -> Result<()> {
        let store = &MemoryBlockStore::new();

        // 1 MiB of a single repeated byte.
        let uniform = encode_all(store, std::io::Cursor::new(vec![1u8; MIB])).await?;
        assert_eq!(uniform.len(), 5);

        // 1 MiB of zeros followed by 1 MiB of ones.
        let two_runs: Vec<u8> = (0..2u8)
            .flat_map(|byte| std::iter::repeat(byte).take(MIB))
            .collect();
        let mixed = encode_all(store, std::io::Cursor::new(two_runs)).await?;
        assert_eq!(mixed.len(), 9);

        // TODO: check content
        Ok(())
    }

    #[test]
    fn test_chunk_config_from_str() {
        let parse = |s: &str| s.parse::<ChunkerConfig>();

        assert_eq!(
            parse("fixed").unwrap(),
            ChunkerConfig::Fixed(DEFAULT_CHUNKS_SIZE)
        );
        assert_eq!(parse("fixed-123").unwrap(), ChunkerConfig::Fixed(123));
        assert_eq!(parse("rabin").unwrap(), ChunkerConfig::Rabin);

        // Malformed specs, unknown chunkers and sizes above the limit must be rejected.
        let too_large = format!("fixed-{}", DEFAULT_CHUNK_SIZE_LIMIT + 1);
        for invalid in ["fixed-", too_large.as_str(), "foo-123", "foo"]
            .iter()
            .copied()
        {
            assert!(parse(invalid).is_err());
        }
    }
}
248
#[cfg(test)]
mod proptests {
    use super::*;
    use crate::unixfs::UnixFsFile;
    use proptest::{option, strategy::Strategy};
    use rand_chacha::ChaCha12Rng;
    use rand_core::{RngCore, SeedableRng};
    use std::io::SeekFrom;
    use test_strategy::proptest;
    use testresult::TestResult;
    use tokio::io::{AsyncReadExt, AsyncSeekExt};
    use wnfs_common::{MemoryBlockStore, MAX_BLOCK_SIZE};

    /// Strategy yielding either a fixed-size chunker configuration with a size in
    /// `1_000..MAX_BLOCK_SIZE`, or the rabin chunker.
    fn arb_chunker() -> impl Strategy<Value = ChunkerConfig> {
        option::of(1_000..MAX_BLOCK_SIZE).prop_map(|opt| match opt {
            Some(lim) => ChunkerConfig::Fixed(lim),
            None => ChunkerConfig::Rabin,
        })
    }

    /// Storing random data and reading the whole file back yields the original bytes
    /// and the original length as the file size.
    #[proptest(cases = 64)]
    fn test_encode_decode_roundtrip(
        seed: u64,
        #[strategy(2..DEFAULT_DEGREE)] degree: usize,
        #[strategy(0usize..5_000_000)] len: usize,
        #[strategy(arb_chunker())] chunker: ChunkerConfig,
    ) {
        let store = &MemoryBlockStore::new();
        let rng = &mut ChaCha12Rng::seed_from_u64(seed);
        let mut data = vec![0; len];
        rng.fill_bytes(&mut data);

        async_std::task::block_on(async {
            let root_cid = FileBuilder::new()
                .content_bytes(data.clone())
                .chunker(chunker)
                .degree(degree)
                .build()?
                .store(store)
                .await?;

            let file = UnixFsFile::load(&root_cid, store).await?;
            assert_eq!(file.filesize(), Some(len as u64));

            let mut buffer = Vec::new();
            let mut reader = file.into_content_reader(store, None)?;
            reader.read_to_end(&mut buffer).await?;

            assert_eq!(buffer, data);

            Ok(()) as TestResult
        })
        .unwrap();
    }

    /// Seeking to an arbitrary offset and reading a short window yields exactly the
    /// corresponding slice of the original data.
    #[proptest(cases = 256)]
    fn test_seek_subarray(
        seed: u64,
        #[strategy(2..DEFAULT_DEGREE)] degree: usize,
        #[strategy(0usize..100_000)] len: usize,
        #[strategy(0usize..100_000)] seek_start: usize,
        #[strategy(0usize..1_000)] seek_len: usize,
        #[strategy(arb_chunker())] chunker: ChunkerConfig,
    ) {
        let store = &MemoryBlockStore::new();
        let rng = &mut ChaCha12Rng::seed_from_u64(seed);
        let mut data = vec![0; len];
        rng.fill_bytes(&mut data);

        // Clamp the window into the data: start no later than `len`, then read the
        // requested `seek_len` bytes, truncated at the end of the data. (Previously the
        // window length was `min(seek_start + seek_len, len - seek_start)`, which
        // ignored the requested length and usually read far more than 1_000 bytes.)
        let seek_start = std::cmp::min(seek_start, len);
        let seek_len = std::cmp::min(seek_len, len - seek_start);

        async_std::task::block_on(async {
            let root_cid = FileBuilder::new()
                .content_bytes(data.clone())
                .chunker(chunker)
                .degree(degree)
                .build()?
                .store(store)
                .await?;

            let file = UnixFsFile::load(&root_cid, store).await?;
            assert_eq!(file.filesize(), Some(len as u64));

            let mut buffer = vec![0; seek_len];
            let mut reader = file.into_content_reader(store, None)?;
            reader.seek(SeekFrom::Start(seek_start as u64)).await?;
            let read = reader.read_exact(&mut buffer).await?;

            assert_eq!(read, seek_len);

            assert_eq!(buffer, data[seek_start..seek_start + seek_len]);

            Ok(()) as TestResult
        })
        .unwrap();
    }
}