1use crate::{
2 balanced_tree::{TreeBuilder, DEFAULT_DEGREE},
3 chunker::{self, Chunker, ChunkerConfig, DEFAULT_CHUNK_SIZE_LIMIT},
4 protobufs,
5 types::{Block, BoxAsyncRead},
6};
7use anyhow::{anyhow, ensure, Result};
8use bytes::Bytes;
9use futures::{Stream, TryStreamExt};
10use libipld::Cid;
11use prost::Message;
12use std::fmt::Debug;
13use tokio::io::AsyncRead;
14use wnfs_common::{utils::CondSend, BlockStore};
15
/// A file that is ready to be encoded.
///
/// Bundles the content reader with the [`Chunker`] that splits it and the
/// [`TreeBuilder`] that the resulting chunks are streamed through (see
/// `File::encode`). Instances are produced by `FileBuilder::build`.
pub struct File<'a> {
    /// Source of the raw file content.
    content: BoxAsyncRead<'a>,
    /// Builder that turns the chunk stream into a stream of tree blocks.
    tree_builder: TreeBuilder,
    /// Splits `content` into chunks.
    chunker: Chunker,
}
22
23impl<'a> Debug for File<'a> {
24 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
25 f.debug_struct("File")
26 .field(
27 "content",
28 &"Content::Reader(Pin<Box<dyn AsyncRead + Send>>)",
29 )
30 .field("tree_builder", &self.tree_builder)
31 .field("chunker", &self.chunker)
32 .finish()
33 }
34}
35
36impl<'a> File<'a> {
37 pub fn encode(
38 self,
39 store: &'a impl BlockStore,
40 ) -> Result<impl Stream<Item = Result<(Cid, Block)>> + '_> {
41 let chunks = self.chunker.chunks(self.content);
42 Ok(self.tree_builder.stream_tree(chunks, store))
43 }
44
45 pub async fn store(self, store: &impl BlockStore) -> Result<Cid> {
46 let blocks = self.encode(store)?;
47 tokio::pin!(blocks);
48
49 let mut root_cid = None;
50
51 while let Some((cid, _)) = blocks.try_next().await? {
52 root_cid = Some(cid);
53 }
54
55 root_cid.ok_or_else(|| anyhow!("error encoding file, no blocks produced"))
56 }
57}
58
/// Builder for [`File`].
///
/// Collects the content source, the chunker and the tree degree, and turns
/// them into a `File` via `FileBuilder::build`.
pub struct FileBuilder<'a> {
    /// Content source; `None` until `content_bytes` or `content_reader` is called.
    reader: Option<BoxAsyncRead<'a>>,
    /// Chunker that the built file will use to split its content.
    chunker: Chunker,
    /// Degree passed to `TreeBuilder::balanced_tree_with_degree` in `build`.
    degree: usize,
}
65
66impl<'a> Default for FileBuilder<'a> {
67 fn default() -> Self {
68 Self {
69 reader: None,
70 chunker: Chunker::Fixed(chunker::Fixed::default()),
71 degree: DEFAULT_DEGREE,
72 }
73 }
74}
75
76impl<'a> Debug for FileBuilder<'a> {
77 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
78 let reader = if self.reader.is_some() {
79 "Some(Box<AsyncRead>)"
80 } else {
81 "None"
82 };
83 f.debug_struct("FileBuilder")
84 .field("chunker", &self.chunker)
85 .field("degree", &self.degree)
86 .field("reader", &reader)
87 .finish()
88 }
89}
90
91impl<'a> FileBuilder<'a> {
93 pub fn new() -> Self {
94 Default::default()
95 }
96
97 pub fn chunker(mut self, chunker: impl Into<Chunker>) -> Self {
98 self.chunker = chunker.into();
99 self
100 }
101
102 pub fn fixed_chunker(mut self, chunk_size: usize) -> Self {
104 self.chunker = Chunker::Fixed(chunker::Fixed::new(chunk_size));
105 self
106 }
107
108 pub fn rabin_chunker(mut self) -> Self {
110 self.chunker = Chunker::Rabin(Box::default());
111 self
112 }
113
114 pub fn degree(mut self, degree: usize) -> Self {
115 self.degree = degree;
116 self
117 }
118
119 pub fn content_bytes(mut self, content: impl Into<Bytes>) -> Self {
120 let bytes = content.into();
121 self.reader = Some(Box::pin(std::io::Cursor::new(bytes)));
122 self
123 }
124
125 pub fn content_reader(mut self, content: impl AsyncRead + CondSend + 'a) -> Self {
126 self.reader = Some(Box::pin(content));
127 self
128 }
129
130 pub fn build(self) -> Result<File<'a>> {
131 let degree = self.degree;
132 let chunker = self.chunker;
133 let tree_builder = TreeBuilder::balanced_tree_with_degree(degree);
134
135 if let Some(reader) = self.reader {
136 return Ok(File {
137 content: reader,
138 chunker,
139 tree_builder,
140 });
141 }
142
143 anyhow::bail!("must have a reader for the content");
144 }
145}
146
147pub(crate) fn encode_unixfs_pb(
148 inner: &protobufs::Data,
149 links: Vec<protobufs::PbLink>,
150) -> Result<protobufs::PbNode> {
151 let data = inner.encode_to_vec();
152 ensure!(
153 data.len() <= DEFAULT_CHUNK_SIZE_LIMIT,
154 "node is too large: {} bytes",
155 data.len()
156 );
157
158 Ok(protobufs::PbNode {
159 links,
160 data: Some(data.into()),
161 })
162}
163
/// Options bundle consisting of a `wrap` flag and an optional [`ChunkerConfig`].
///
/// NOTE(review): nothing in the visible part of this file reads `Config`, so
/// the exact meaning of each field should be confirmed against its consumers.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Config {
    /// NOTE(review): presumably controls whether the content is wrapped in an
    /// enclosing node — confirm against the code that consumes this flag.
    pub wrap: bool,
    /// Chunker configuration to use, if any.
    /// NOTE(review): `None` presumably selects a default chunker — confirm.
    pub chunker: Option<ChunkerConfig>,
}
171
#[cfg(test)]
mod tests {
    use super::*;
    use crate::chunker::DEFAULT_CHUNKS_SIZE;
    use futures::TryStreamExt;
    use wnfs_common::MemoryBlockStore;

    /// A three-byte input must encode to exactly one block.
    #[tokio::test]
    async fn test_builder_stream_small() -> Result<()> {
        let store = &MemoryBlockStore::new();

        let file = FileBuilder::new()
            .content_reader(std::io::Cursor::new(b"bar"))
            .build()?;
        let blocks: Vec<_> = file.encode(store)?.try_collect().await?;

        assert_eq!(blocks.len(), 1);
        Ok(())
    }

    /// With the default builder settings, 1 MiB of content must encode to
    /// 5 blocks and 2 MiB of content to 9 blocks.
    #[tokio::test]
    async fn test_builder_stream_large() -> Result<()> {
        const MIB: usize = 1024 * 1024;
        let store = &MemoryBlockStore::new();

        // 1 MiB filled with ones.
        let one_mib = FileBuilder::new()
            .content_reader(std::io::Cursor::new(vec![1u8; MIB]))
            .build()?;
        let one_mib_blocks: Vec<_> = one_mib.encode(store)?.try_collect().await?;
        assert_eq!(one_mib_blocks.len(), 5);

        // 2 MiB: the first MiB is all zeros, the second all ones.
        let mut two_mib_content = Vec::with_capacity(2 * MIB);
        for byte in 0u8..2 {
            two_mib_content.extend(std::iter::repeat(byte).take(MIB));
        }

        let two_mib = FileBuilder::new()
            .content_reader(std::io::Cursor::new(two_mib_content))
            .build()?;
        let two_mib_blocks: Vec<_> = two_mib.encode(store)?.try_collect().await?;
        assert_eq!(two_mib_blocks.len(), 9);

        Ok(())
    }

    /// Checks which strings parse into a `ChunkerConfig` and which are rejected.
    #[test]
    fn test_chunk_config_from_str() {
        // Accepted: fixed chunker with and without an explicit size.
        let fixed_default = "fixed".parse::<ChunkerConfig>().unwrap();
        assert_eq!(fixed_default, ChunkerConfig::Fixed(DEFAULT_CHUNKS_SIZE));

        let fixed_explicit = "fixed-123".parse::<ChunkerConfig>().unwrap();
        assert_eq!(fixed_explicit, ChunkerConfig::Fixed(123));

        // Rejected: missing size, size above the limit, unknown chunker names.
        let oversized = format!("fixed-{}", DEFAULT_CHUNK_SIZE_LIMIT + 1);
        for invalid in ["fixed-", oversized.as_str(), "foo-123", "foo"] {
            assert!(
                invalid.parse::<ChunkerConfig>().is_err(),
                "expected {:?} to be rejected",
                invalid
            );
        }

        // Accepted: the rabin chunker.
        let rabin = "rabin".parse::<ChunkerConfig>().unwrap();
        assert_eq!(rabin, ChunkerConfig::Rabin);
    }
}
248
#[cfg(test)]
mod proptests {
    use super::*;
    use crate::unixfs::UnixFsFile;
    use proptest::{option, strategy::Strategy};
    use rand_chacha::ChaCha12Rng;
    use rand_core::{RngCore, SeedableRng};
    use std::io::SeekFrom;
    use test_strategy::proptest;
    use testresult::TestResult;
    use tokio::io::{AsyncReadExt, AsyncSeekExt};
    use wnfs_common::{MemoryBlockStore, MAX_BLOCK_SIZE};

    /// Strategy yielding either a fixed chunker config with a limit drawn from
    /// `1_000..MAX_BLOCK_SIZE`, or the Rabin chunker config.
    fn arb_chunker() -> impl Strategy<Value = ChunkerConfig> {
        option::of(1_000..MAX_BLOCK_SIZE).prop_map(|opt| match opt {
            Some(lim) => ChunkerConfig::Fixed(lim),
            None => ChunkerConfig::Rabin,
        })
    }

    /// Stores seeded pseudo-random content of arbitrary length, loads it back
    /// and checks that both the reported file size and the full content match.
    #[proptest(cases = 64)]
    fn test_encode_decode_roundtrip(
        seed: u64,
        #[strategy(2..DEFAULT_DEGREE)] degree: usize,
        #[strategy(0usize..5_000_000)] len: usize,
        #[strategy(arb_chunker())] chunker: ChunkerConfig,
    ) {
        let store = &MemoryBlockStore::new();
        // Seeded RNG keeps every generated case reproducible.
        let rng = &mut ChaCha12Rng::seed_from_u64(seed);
        let mut data = vec![0; len];
        rng.fill_bytes(&mut data);

        async_std::task::block_on(async {
            let root_cid = FileBuilder::new()
                .content_bytes(data.clone())
                .chunker(chunker)
                .degree(degree)
                .build()?
                .store(store)
                .await?;

            let file = UnixFsFile::load(&root_cid, store).await?;
            assert_eq!(file.filesize(), Some(len as u64));

            let mut buffer = Vec::new();
            let mut reader = file.into_content_reader(store, None)?;
            reader.read_to_end(&mut buffer).await?;

            assert_eq!(buffer, data);

            Ok(()) as TestResult
        })
        .unwrap();
    }

    /// Stores seeded pseudo-random content, seeks to an arbitrary offset and
    /// checks that reading a short window from there returns exactly the
    /// corresponding sub-slice of the original data.
    #[proptest(cases = 256)]
    fn test_seek_subarray(
        seed: u64,
        #[strategy(2..DEFAULT_DEGREE)] degree: usize,
        #[strategy(0usize..100_000)] len: usize,
        #[strategy(0usize..100_000)] seek_start: usize,
        #[strategy(0usize..1_000)] seek_len: usize,
        #[strategy(arb_chunker())] chunker: ChunkerConfig,
    ) {
        let store = &MemoryBlockStore::new();
        // Seeded RNG keeps every generated case reproducible.
        let rng = &mut ChaCha12Rng::seed_from_u64(seed);
        let mut data = vec![0; len];
        rng.fill_bytes(&mut data);

        // Clamp the window so `seek_start..seek_start + seek_len` lies inside
        // `data`. The length is clamped against the bytes remaining after
        // `seek_start`; clamping `seek_start + seek_len` (an end offset)
        // against that remaining length would ignore the generated `seek_len`
        // bound and frequently read all the way to the end of the file.
        let seek_start = std::cmp::min(seek_start, len);
        let seek_len = std::cmp::min(seek_len, len - seek_start);

        async_std::task::block_on(async {
            let root_cid = FileBuilder::new()
                .content_bytes(data.clone())
                .chunker(chunker)
                .degree(degree)
                .build()?
                .store(store)
                .await?;

            let file = UnixFsFile::load(&root_cid, store).await?;
            assert_eq!(file.filesize(), Some(len as u64));

            let mut buffer = vec![0; seek_len];
            let mut reader = file.into_content_reader(store, None)?;
            reader.seek(SeekFrom::Start(seek_start as u64)).await?;
            let read = reader.read_exact(&mut buffer).await?;

            assert_eq!(read, seek_len);

            assert_eq!(buffer, data[seek_start..seek_start + seek_len]);

            Ok(()) as TestResult
        })
        .unwrap();
    }
}