1use crate::{
2 balanced_tree::{DEFAULT_DEGREE, TreeBuilder},
3 chunker::{self, Chunker, ChunkerConfig, DEFAULT_CHUNK_SIZE_LIMIT},
4 protobufs,
5 types::{Block, BoxAsyncRead},
6};
7use anyhow::{Result, anyhow, ensure};
8use bytes::Bytes;
9use futures::{Stream, TryStreamExt};
10use prost::Message;
11use std::fmt::Debug;
12use tokio::io::AsyncRead;
13use wnfs_common::{BlockStore, Cid, utils::CondSend};
14
/// A file's content together with the chunker and tree builder used to encode it
/// into blocks.
///
/// Construct one with [`FileBuilder`], then consume it with [`File::encode`] or
/// [`File::store`].
pub struct File<'a> {
    /// Source of the file's bytes; handed to `chunker` when the file is encoded.
    content: BoxAsyncRead<'a>,
    /// Arranges the chunks produced by `chunker` into a tree of blocks.
    tree_builder: TreeBuilder,
    /// Splits `content` into chunks.
    chunker: Chunker,
}
21
22impl Debug for File<'_> {
23 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
24 f.debug_struct("File")
25 .field(
26 "content",
27 &"Content::Reader(Pin<Box<dyn AsyncRead + Send>>)",
28 )
29 .field("tree_builder", &self.tree_builder)
30 .field("chunker", &self.chunker)
31 .finish()
32 }
33}
34
35impl<'a> File<'a> {
36 pub fn encode(
37 self,
38 store: &'a impl BlockStore,
39 ) -> Result<impl Stream<Item = Result<(Cid, Block)>> + 'a> {
40 let chunks = self.chunker.chunks(self.content);
41 Ok(self.tree_builder.stream_tree(chunks, store))
42 }
43
44 pub async fn store(self, store: &impl BlockStore) -> Result<Cid> {
45 let blocks = self.encode(store)?;
46 tokio::pin!(blocks);
47
48 let mut root_cid = None;
49
50 while let Some((cid, _)) = blocks.try_next().await? {
51 root_cid = Some(cid);
52 }
53
54 root_cid.ok_or_else(|| anyhow!("error encoding file, no blocks produced"))
55 }
56}
57
/// Builder for a [`File`]: configures the content source, the chunker and the tree
/// degree.
pub struct FileBuilder<'a> {
    /// Content source. `build` fails while this is still `None`.
    reader: Option<BoxAsyncRead<'a>>,
    /// Chunking strategy used to split the content.
    chunker: Chunker,
    /// Degree passed to `TreeBuilder::balanced_tree_with_degree` (presumably the
    /// maximum number of children per tree node — confirm).
    degree: usize,
}
64
65impl Default for FileBuilder<'_> {
66 fn default() -> Self {
67 Self {
68 reader: None,
69 chunker: Chunker::Fixed(chunker::Fixed::default()),
70 degree: DEFAULT_DEGREE,
71 }
72 }
73}
74
75impl Debug for FileBuilder<'_> {
76 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
77 let reader = if self.reader.is_some() {
78 "Some(Box<AsyncRead>)"
79 } else {
80 "None"
81 };
82 f.debug_struct("FileBuilder")
83 .field("chunker", &self.chunker)
84 .field("degree", &self.degree)
85 .field("reader", &reader)
86 .finish()
87 }
88}
89
90impl<'a> FileBuilder<'a> {
92 pub fn new() -> Self {
93 Default::default()
94 }
95
96 pub fn chunker(mut self, chunker: impl Into<Chunker>) -> Self {
97 self.chunker = chunker.into();
98 self
99 }
100
101 pub fn fixed_chunker(mut self, chunk_size: usize) -> Self {
103 self.chunker = Chunker::Fixed(chunker::Fixed::new(chunk_size));
104 self
105 }
106
107 pub fn rabin_chunker(mut self) -> Self {
109 self.chunker = Chunker::Rabin(Box::default());
110 self
111 }
112
113 pub fn degree(mut self, degree: usize) -> Self {
114 self.degree = degree;
115 self
116 }
117
118 pub fn content_bytes(mut self, content: impl Into<Bytes>) -> Self {
119 let bytes = content.into();
120 self.reader = Some(Box::pin(std::io::Cursor::new(bytes)));
121 self
122 }
123
124 pub fn content_reader(mut self, content: impl AsyncRead + CondSend + 'a) -> Self {
125 self.reader = Some(Box::pin(content));
126 self
127 }
128
129 pub fn build(self) -> Result<File<'a>> {
130 let degree = self.degree;
131 let chunker = self.chunker;
132 let tree_builder = TreeBuilder::balanced_tree_with_degree(degree);
133
134 if let Some(reader) = self.reader {
135 return Ok(File {
136 content: reader,
137 chunker,
138 tree_builder,
139 });
140 }
141
142 anyhow::bail!("must have a reader for the content");
143 }
144}
145
146pub(crate) fn encode_unixfs_pb(
147 inner: &protobufs::Data,
148 links: Vec<protobufs::PbLink>,
149) -> Result<protobufs::PbNode> {
150 let data = inner.encode_to_vec();
151 ensure!(
152 data.len() <= DEFAULT_CHUNK_SIZE_LIMIT,
153 "node is too large: {} bytes",
154 data.len()
155 );
156
157 Ok(protobufs::PbNode {
158 links,
159 data: Some(data.into()),
160 })
161}
162
/// Encoding options.
///
/// NOTE(review): neither field is read in this part of the file. The meanings below
/// are inferred from the names and should be confirmed against the callers.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Config {
    /// Presumably whether the encoded output is wrapped in an enclosing directory
    /// — confirm.
    pub wrap: bool,
    /// Chunker to use. `None` presumably selects the default chunker — confirm.
    pub chunker: Option<ChunkerConfig>,
}
170
#[cfg(test)]
mod tests {
    use super::*;
    use crate::chunker::DEFAULT_CHUNKS_SIZE;
    use wnfs_common::MemoryBlockStore;

    /// One mebibyte, used to size the large fixtures.
    const MIB: usize = 1024 * 1024;

    #[tokio::test]
    async fn test_builder_stream_small() -> Result<()> {
        let store = &MemoryBlockStore::new();

        // Three bytes of content are expected to encode to exactly one block.
        let bar_encoded: Vec<_> = {
            let bar_reader = std::io::Cursor::new(b"bar");
            let bar = FileBuilder::new().content_reader(bar_reader).build()?;
            bar.encode(store)?.try_collect().await?
        };
        assert_eq!(bar_encoded.len(), 1);

        Ok(())
    }

    #[tokio::test]
    async fn test_builder_stream_large() -> Result<()> {
        let store = &MemoryBlockStore::new();

        // 1 MiB of identical bytes through the default chunker.
        let bar_encoded: Vec<_> = {
            let bar_reader = std::io::Cursor::new(vec![1u8; MIB]);
            let bar = FileBuilder::new().content_reader(bar_reader).build()?;
            bar.encode(store)?.try_collect().await?
        };
        // NOTE(review): presumably 4 leaf chunks + 1 root; depends on the default chunk size.
        assert_eq!(bar_encoded.len(), 5);

        // 2 MiB: the first MiB is all zeros, the second all ones.
        let mut baz_content = Vec::with_capacity(2 * MIB);
        baz_content.resize(MIB, 0u8);
        baz_content.resize(2 * MIB, 1u8);

        let baz_encoded: Vec<_> = {
            let baz_reader = std::io::Cursor::new(baz_content);
            let baz = FileBuilder::new().content_reader(baz_reader).build()?;
            baz.encode(store)?.try_collect().await?
        };
        // NOTE(review): presumably 8 leaf chunks + 1 root; depends on the default chunk size.
        assert_eq!(baz_encoded.len(), 9);

        Ok(())
    }

    #[test]
    fn test_chunk_config_from_str() {
        // "fixed" without a size falls back to the default chunk size.
        assert_eq!(
            "fixed".parse::<ChunkerConfig>().unwrap(),
            ChunkerConfig::Fixed(DEFAULT_CHUNKS_SIZE)
        );
        assert_eq!(
            "fixed-123".parse::<ChunkerConfig>().unwrap(),
            ChunkerConfig::Fixed(123)
        );

        // Missing, oversized or unknown specifications are rejected.
        assert!("fixed-".parse::<ChunkerConfig>().is_err());
        assert!(
            format!("fixed-{}", DEFAULT_CHUNK_SIZE_LIMIT + 1)
                .parse::<ChunkerConfig>()
                .is_err()
        );
        assert!("foo-123".parse::<ChunkerConfig>().is_err());
        assert!("foo".parse::<ChunkerConfig>().is_err());

        assert_eq!(
            "rabin".parse::<ChunkerConfig>().unwrap(),
            ChunkerConfig::Rabin
        );
    }
}
248
#[cfg(test)]
mod proptests {
    use super::*;
    use crate::unixfs::UnixFsFile;
    use proptest::{option, strategy::Strategy};
    use rand_chacha::ChaCha12Rng;
    use rand_core::{RngCore, SeedableRng};
    use std::io::SeekFrom;
    use test_strategy::proptest;
    use testresult::TestResult;
    use tokio::io::{AsyncReadExt, AsyncSeekExt};
    use wnfs_common::{MAX_BLOCK_SIZE, MemoryBlockStore};

    /// Strategy yielding either a fixed-size chunker (size drawn from
    /// `1_000..MAX_BLOCK_SIZE`) or the Rabin chunker.
    fn arb_chunker() -> impl Strategy<Value = ChunkerConfig> {
        option::of(1_000..MAX_BLOCK_SIZE).prop_map(|opt| match opt {
            Some(lim) => ChunkerConfig::Fixed(lim),
            None => ChunkerConfig::Rabin,
        })
    }

    /// Storing random content and loading it back must reproduce the exact bytes and
    /// report the original length as the file size.
    #[proptest(cases = 64)]
    fn test_encode_decode_roundtrip(
        seed: u64,
        #[strategy(2..DEFAULT_DEGREE)] degree: usize,
        #[strategy(0usize..5_000_000)] len: usize,
        #[strategy(arb_chunker())] chunker: ChunkerConfig,
    ) {
        let store = &MemoryBlockStore::new();
        let rng = &mut ChaCha12Rng::seed_from_u64(seed);
        let mut data = vec![0; len];
        rng.fill_bytes(&mut data);

        async_std::task::block_on(async {
            let root_cid = FileBuilder::new()
                .content_bytes(data.clone())
                .chunker(chunker)
                .degree(degree)
                .build()?
                .store(store)
                .await?;

            let file = UnixFsFile::load(&root_cid, store).await?;
            assert_eq!(file.filesize(), Some(len as u64));

            let mut buffer = Vec::new();
            let mut reader = file.into_content_reader(store, None)?;
            reader.read_to_end(&mut buffer).await?;

            assert_eq!(buffer, data);

            Ok(()) as TestResult
        })
        .unwrap();
    }

    /// Seeking to an offset and reading a window of up to `seek_len` bytes must return
    /// exactly that slice of the original content.
    #[proptest(cases = 256)]
    fn test_seek_subarray(
        seed: u64,
        #[strategy(2..DEFAULT_DEGREE)] degree: usize,
        #[strategy(0usize..100_000)] len: usize,
        #[strategy(0usize..100_000)] seek_start: usize,
        #[strategy(0usize..1_000)] seek_len: usize,
        #[strategy(arb_chunker())] chunker: ChunkerConfig,
    ) {
        let store = &MemoryBlockStore::new();
        let rng = &mut ChaCha12Rng::seed_from_u64(seed);
        let mut data = vec![0; len];
        rng.fill_bytes(&mut data);

        // Clamp the window into the content. The length is capped by the bytes remaining
        // after `seek_start`. It must NOT be widened by `seek_start`, which previously
        // made the test read `seek_start + seek_len` bytes instead of `seek_len`.
        let seek_start = std::cmp::min(seek_start, len);
        let seek_len = std::cmp::min(seek_len, len - seek_start);

        async_std::task::block_on(async {
            let root_cid = FileBuilder::new()
                .content_bytes(data.clone())
                .chunker(chunker)
                .degree(degree)
                .build()?
                .store(store)
                .await?;

            let file = UnixFsFile::load(&root_cid, store).await?;
            assert_eq!(file.filesize(), Some(len as u64));

            let mut buffer = vec![0; seek_len];
            let mut reader = file.into_content_reader(store, None)?;
            reader.seek(SeekFrom::Start(seek_start as u64)).await?;
            let read = reader.read_exact(&mut buffer).await?;

            assert_eq!(read, seek_len);

            assert_eq!(buffer, data[seek_start..seek_start + seek_len]);

            Ok(()) as TestResult
        })
        .unwrap();
    }
}