1use crate::{
2 chunker::DEFAULT_CHUNK_SIZE_LIMIT,
3 codecs::Codec,
4 protobufs,
5 types::{Block, Link, LinkRef, Links, PbLinks},
6};
7use anyhow::{Result, anyhow, bail, ensure};
8use bytes::Bytes;
9use futures::FutureExt;
10use prost::Message;
11use std::{
12 collections::VecDeque,
13 fmt::Debug,
14 pin::Pin,
15 task::{Context, Poll},
16};
17use tokio::io::{AsyncRead, AsyncSeek};
18use wnfs_common::{
19 BlockStore, Cid, LoadIpld, Storable, StoreIpld,
20 utils::{BoxFuture, boxed_fut},
21};
22
/// Numeric type tag carried in the `type` field of a decoded `protobufs::Data` message.
///
/// The enum is `#[repr(i32)]` and converts to and from its primitive value
/// through the `num_enum` derives.
// NOTE(review): the discriminants presumably mirror the UnixFS protobuf
// `DataType` enum — confirm against the `.proto` definition.
#[derive(
    Debug, Clone, Copy, PartialEq, Eq, num_enum::IntoPrimitive, num_enum::TryFromPrimitive,
)]
#[repr(i32)]
pub enum DataType {
    /// Tag value `0`.
    Raw = 0,
    /// Tag value `1`.
    Directory = 1,
    /// Tag value `2`; the only tag `UnixFsFile::decode` accepts for dag-pb nodes.
    File = 2,
    /// Tag value `3`.
    Metadata = 3,
    /// Tag value `4`.
    Symlink = 4,
    /// Tag value `5`.
    HamtShard = 5,
}
35
/// A single block of a UnixFS file, in one of its two encodings.
#[derive(Debug, PartialEq, Clone)]
pub enum UnixFsFile {
    /// A block stored under the raw codec: the bytes are the content itself.
    Raw(Bytes),
    /// A dag-pb node whose embedded data message has type [`DataType::File`].
    Node(Node),
}
41
/// A decoded dag-pb node together with its decoded inner data message.
#[derive(Debug, PartialEq, Clone)]
pub struct Node {
    /// The outer protobuf node; it holds the links and is what `encode` serializes.
    pub(super) outer: protobufs::PbNode,
    /// The data message; `UnixFsFile::decode` produces it by decoding `outer.data`.
    pub(super) inner: protobufs::Data,
}
47
48impl Node {
49 fn encode(&self) -> Result<Bytes> {
50 let bytes = self.outer.encode_to_vec();
51 Ok(bytes.into())
52 }
53
54 pub fn typ(&self) -> DataType {
55 self.inner.r#type.try_into().expect("invalid data type")
56 }
57
58 pub fn data(&self) -> Option<Bytes> {
59 self.inner.data.clone()
60 }
61
62 pub fn filesize(&self) -> Option<u64> {
63 self.inner.filesize
64 }
65
66 pub fn blocksizes(&self) -> &[u64] {
67 &self.inner.blocksizes
68 }
69
70 pub fn size(&self) -> Option<usize> {
71 if self.outer.links.is_empty() {
72 return Some(
73 self.inner
74 .data
75 .as_ref()
76 .map(|d| d.len())
77 .unwrap_or_default(),
78 );
79 }
80
81 None
82 }
83
84 pub fn links(&self) -> Links<'_> {
85 Links::Node(PbLinks::new(&self.outer))
86 }
87}
88
89impl UnixFsFile {
90 pub fn empty() -> Self {
91 UnixFsFile::Raw(Bytes::new())
92 }
93
94 pub async fn load(cid: &Cid, store: &impl BlockStore) -> Result<Self> {
95 let block = store.get_block(cid).await?;
96 Self::decode(cid, block)
97 }
98
99 pub fn decode(cid: &Cid, buf: Bytes) -> Result<Self> {
100 match cid.codec() {
101 c if c == Codec::Raw as u64 => Ok(UnixFsFile::Raw(buf)),
102 _ => {
103 let outer = protobufs::PbNode::decode(buf)?;
104 let inner_data = outer
105 .data
106 .as_ref()
107 .cloned()
108 .ok_or_else(|| anyhow!("missing data"))?;
109 let inner = protobufs::Data::decode(inner_data)?;
110 let typ: DataType = inner.r#type.try_into()?;
111 let node = Node { outer, inner };
112
113 match typ {
115 DataType::File => Ok(UnixFsFile::Node(node)),
116 _ => bail!("unixfs data type unsupported: {typ:?}"),
117 }
118 }
119 }
120 }
121
122 pub fn encode(&self) -> Result<Block> {
123 let res = match self {
124 UnixFsFile::Raw(data) => {
125 let out = data.clone();
126 let links = vec![];
127 Block::new(Codec::Raw, out, links)
128 }
129 UnixFsFile::Node(node) => {
130 let out = node.encode()?;
131 let links = node
132 .links()
133 .map(|x| Ok(x?.cid))
134 .collect::<Result<Vec<_>>>()?;
135 Block::new(Codec::DagPb, out, links)
136 }
137 };
138
139 ensure!(
140 res.data().len() <= DEFAULT_CHUNK_SIZE_LIMIT,
141 "node is too large: {} bytes",
142 res.data().len()
143 );
144
145 Ok(res)
146 }
147
148 pub const fn typ(&self) -> Option<DataType> {
149 match self {
150 UnixFsFile::Raw(_) => None,
151 UnixFsFile::Node(_) => Some(DataType::File),
152 }
153 }
154
155 pub fn size(&self) -> Option<usize> {
158 match self {
159 UnixFsFile::Raw(data) => Some(data.len()),
160 UnixFsFile::Node(node) => node.size(),
161 }
162 }
163
164 pub fn filesize(&self) -> Option<u64> {
167 match self {
168 UnixFsFile::Raw(data) => Some(data.len() as u64),
169 UnixFsFile::Node(node) => node.filesize(),
170 }
171 }
172
173 pub fn blocksizes(&self) -> &[u64] {
176 match self {
177 UnixFsFile::Raw(_) => &[],
178 UnixFsFile::Node(node) => node.blocksizes(),
179 }
180 }
181
182 pub fn links(&self) -> Links<'_> {
183 match self {
184 UnixFsFile::Raw(_) => Links::Leaf,
185 UnixFsFile::Node(node) => Links::Node(PbLinks::new(&node.outer)),
186 }
187 }
188
189 pub fn links_owned(&self) -> Result<VecDeque<Link>> {
190 self.links().map(|l| l.map(|l| l.to_owned())).collect()
191 }
192
193 pub async fn get_link_by_name<S: AsRef<str>>(
194 &self,
195 link_name: S,
196 ) -> Result<Option<LinkRef<'_>>> {
197 let link_name = link_name.as_ref();
198 self.links()
199 .find(|l| match l {
200 Ok(l) => l.name == Some(link_name),
201 _ => false,
202 })
203 .transpose()
204 }
205
206 pub fn into_content_reader<B: BlockStore>(
207 self,
208 store: &B,
209 pos_max: Option<usize>,
210 ) -> Result<UnixFsFileReader<'_, B>> {
211 let current_links = vec![self.links_owned()?];
212
213 Ok(UnixFsFileReader {
214 root_node: self,
215 pos: 0,
216 pos_max,
217 current_node: CurrentNodeState::Outer,
218 current_links,
219 store,
220 })
221 }
222}
223
224impl Storable for UnixFsFile {
225 type Serializable = UnixFsFile;
226
227 async fn to_serializable(&self, _store: &impl BlockStore) -> Result<Self::Serializable> {
228 Ok(self.clone())
229 }
230
231 async fn from_serializable(
232 _cid: Option<&Cid>,
233 serializable: Self::Serializable,
234 ) -> Result<Self> {
235 Ok(serializable)
236 }
237}
238
239impl StoreIpld for UnixFsFile {
240 fn encode_ipld(&self) -> Result<(Bytes, u64)> {
241 let (codec, bytes, _) = self.encode()?.into_parts();
242 Ok((bytes, codec.into()))
243 }
244}
245
246impl LoadIpld for UnixFsFile {
247 fn decode_ipld(cid: &Cid, bytes: Bytes) -> Result<Self> {
248 UnixFsFile::decode(cid, bytes)
249 }
250}
251
/// Reader over the content of a [`UnixFsFile`], loading linked blocks from a
/// block store as needed. Implements `AsyncRead` and `AsyncSeek`.
#[derive(Debug)]
pub struct UnixFsFileReader<'a, B: BlockStore> {
    /// The file this reader was created from.
    root_node: UnixFsFile,
    /// Current absolute read position; this is what `poll_complete` reports.
    pos: usize,
    /// Optional upper bound on `pos`: reads return no data once `pos` reaches it.
    pos_max: Option<usize>,
    /// Where the reader currently is in the traversal of linked nodes.
    current_node: CurrentNodeState<'a>,
    /// Stack of link queues still to visit; the last entry is consumed first.
    current_links: Vec<VecDeque<Link>>,
    /// Block store used to fetch linked nodes.
    store: &'a B,
}
265
266impl<B: BlockStore> UnixFsFileReader<'_, B> {
267 pub fn size(&self) -> Option<u64> {
269 self.root_node.filesize()
270 }
271}
272
273impl<'a, B: BlockStore + 'a> AsyncRead for UnixFsFileReader<'a, B> {
274 fn poll_read(
275 mut self: Pin<&mut Self>,
276 cx: &mut Context<'_>,
277 buf: &mut tokio::io::ReadBuf<'_>,
278 ) -> Poll<std::io::Result<()>> {
279 let UnixFsFileReader {
280 root_node,
281 pos,
282 pos_max,
283 current_node,
284 current_links,
285 store,
286 } = &mut *self;
287
288 match root_node {
290 UnixFsFile::Raw(data) => {
291 read_data_to_buf(pos, *pos_max, &data[*pos..], buf);
292 Poll::Ready(Ok(()))
293 }
294 UnixFsFile::Node(node) => poll_read_file_at(
295 cx,
296 node,
297 *store,
298 pos,
299 *pos_max,
300 buf,
301 current_links,
302 current_node,
303 ),
304 }
305 }
308}
309
310impl<'a, B: BlockStore + 'a> AsyncSeek for UnixFsFileReader<'a, B> {
311 fn start_seek(mut self: Pin<&mut Self>, position: std::io::SeekFrom) -> std::io::Result<()> {
312 let UnixFsFileReader {
313 root_node,
314 pos,
315 current_node,
316 current_links,
317 ..
318 } = &mut *self;
319 let data_len = root_node.size();
320 *current_node = CurrentNodeState::Outer;
321 *current_links = vec![root_node.links_owned().unwrap()];
322 match position {
323 std::io::SeekFrom::Start(offset) => {
324 let mut i = offset as usize;
325 if let Some(data_len) = data_len {
326 if data_len == 0 {
327 *pos = 0;
328 return Ok(());
329 }
330 i = std::cmp::min(i, data_len - 1);
331 }
332 *pos = i;
333 }
334 std::io::SeekFrom::End(offset) => {
335 if let Some(data_len) = data_len {
336 if data_len == 0 {
337 *pos = 0;
338 return Ok(());
339 }
340 let mut i = (data_len as i64 + offset) % data_len as i64;
341 if i < 0 {
342 i += data_len as i64;
343 }
344 *pos = i as usize;
345 } else {
346 return Err(std::io::Error::new(
347 std::io::ErrorKind::InvalidInput,
348 "cannot seek from end of unknown length",
349 ));
350 }
351 }
352 std::io::SeekFrom::Current(offset) => {
353 let mut i = *pos as i64 + offset;
354 i = std::cmp::max(0, i);
355
356 if let Some(data_len) = data_len {
357 if data_len == 0 {
358 *pos = 0;
359 return Ok(());
360 }
361 i = std::cmp::min(i, data_len as i64 - 1);
362 }
363 *pos = i as usize;
364 }
365 }
366 Ok(())
367 }
368
369 fn poll_complete(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<u64>> {
370 Poll::Ready(Ok(self.pos as u64))
371 }
372}
373
374pub fn read_data_to_buf(
375 pos: &mut usize,
376 pos_max: Option<usize>,
377 data: &[u8],
378 buf: &mut tokio::io::ReadBuf<'_>,
379) -> usize {
380 let data_to_read = pos_max.map(|pos_max| pos_max - *pos).unwrap_or(data.len());
381 let amt = std::cmp::min(std::cmp::min(data_to_read, buf.remaining()), data.len());
382 buf.put_slice(&data[..amt]);
383 *pos += amt;
384 amt
385}
386
387pub fn find_block(node: &UnixFsFile, pos: u64, node_offset: u64) -> (u64, Option<usize>) {
388 let pivots = node
389 .blocksizes()
390 .iter()
391 .scan(node_offset, |state, &x| {
392 *state += x;
393 Some(*state)
394 })
395 .collect::<Vec<_>>();
396 let block_index = match pivots.binary_search(&pos) {
397 Ok(b) => b + 1,
398 Err(b) => b,
399 };
400 if block_index < pivots.len() {
401 let next_node_offset = if block_index > 0 {
402 pivots[block_index - 1]
403 } else {
404 node_offset
405 };
406 (next_node_offset, Some(block_index))
407 } else {
408 (pivots[pivots.len() - 1], None)
409 }
410}
411
/// Traversal state of a `UnixFsFileReader` while it walks linked nodes.
#[allow(clippy::large_enum_variant)]
pub enum CurrentNodeState<'a> {
    /// Reading from the root node itself; the state after creation and after a seek.
    Outer,
    /// The next linked node has to be fetched.
    NextNodeRequested {
        /// Absolute offset assigned to the node that will be loaded next.
        next_node_offset: usize,
    },
    /// A linked node without further links has been loaded and is being read.
    Loaded {
        /// Absolute offset at which this node's data starts.
        node_offset: usize,
        /// Read position relative to the start of this node's data.
        node_pos: usize,
        /// The loaded node.
        node: UnixFsFile,
    },
    /// A linked node is being fetched from the store.
    Loading {
        /// Absolute offset assigned to the node being fetched.
        node_offset: usize,
        /// Future resolving to the fetched and decoded node.
        fut: BoxFuture<'a, Result<UnixFsFile>>,
    },
}
432
433impl Debug for CurrentNodeState<'_> {
434 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
435 match self {
436 CurrentNodeState::Outer => write!(f, "CurrentNodeState::Outer"),
437 CurrentNodeState::NextNodeRequested { next_node_offset } => {
438 write!(f, "CurrentNodeState::None ({next_node_offset})")
439 }
440 CurrentNodeState::Loaded {
441 node_offset,
442 node_pos,
443 node,
444 } => {
445 write!(
446 f,
447 "CurrentNodeState::Loaded({node_offset:?}, {node_pos:?}, {node:?})"
448 )
449 }
450 CurrentNodeState::Loading { .. } => write!(f, "CurrentNodeState::Loading(Fut)"),
451 }
452 }
453}
454
455fn load_next_node<'a>(
456 next_node_offset: usize,
457 current_node: &mut CurrentNodeState<'a>,
458 current_links: &mut Vec<VecDeque<Link>>,
459 store: &'a impl BlockStore,
460) -> bool {
461 let links = loop {
462 if let Some(last_mut) = current_links.last_mut() {
463 if last_mut.is_empty() {
464 current_links.pop();
466 } else {
467 break last_mut;
469 }
470 } else {
471 return false;
473 }
474 };
475
476 let link = links.pop_front().unwrap();
477
478 let fut = boxed_fut(async move {
479 let block = store.get_block(&link.cid).await?;
480 let node = UnixFsFile::decode(&link.cid, block)?;
481
482 Ok(node)
483 });
484 *current_node = CurrentNodeState::Loading {
485 node_offset: next_node_offset,
486 fut,
487 };
488 true
489}
490
491#[allow(clippy::too_many_arguments)]
492fn poll_read_file_at<'a>(
493 cx: &mut Context<'_>,
494 root_node: &Node,
495 store: &'a impl BlockStore,
496 pos: &mut usize,
497 pos_max: Option<usize>,
498 buf: &mut tokio::io::ReadBuf<'_>,
499 current_links: &mut Vec<VecDeque<Link>>,
500 current_node: &mut CurrentNodeState<'a>,
501) -> Poll<std::io::Result<()>> {
502 loop {
503 if let Some(pos_max) = pos_max {
504 if pos_max <= *pos {
505 return Poll::Ready(Ok(()));
506 }
507 }
508 match current_node {
509 CurrentNodeState::Outer => {
510 if root_node.outer.links.is_empty() {
512 let data = root_node.inner.data.as_deref().unwrap_or(&[][..]);
514 read_data_to_buf(pos, pos_max, &data[*pos..], buf);
515 return Poll::Ready(Ok(()));
516 }
517
518 if let Some(ref data) = root_node.inner.data {
520 if *pos < data.len() {
521 read_data_to_buf(pos, pos_max, &data[*pos..], buf);
522 return Poll::Ready(Ok(()));
523 }
524 }
525 *current_node = CurrentNodeState::NextNodeRequested {
526 next_node_offset: 0,
527 };
528 }
529 CurrentNodeState::NextNodeRequested { next_node_offset } => {
530 let loaded_next_node =
531 load_next_node(*next_node_offset, current_node, current_links, store);
532 if !loaded_next_node {
533 return Poll::Ready(Ok(()));
534 }
535 }
536 CurrentNodeState::Loading { node_offset, fut } => {
537 match fut.poll_unpin(cx) {
538 Poll::Pending => {
539 return Poll::Pending;
540 }
541 Poll::Ready(Ok(node)) => {
542 match node.links_owned() {
543 Ok(links) => {
544 if !links.is_empty() {
545 let (next_node_offset, block_index) =
546 find_block(&node, *pos as u64, *node_offset as u64);
547 if let Some(block_index) = block_index {
548 let new_links =
549 links.into_iter().skip(block_index).collect();
550 current_links.push(new_links);
551 }
552 *current_node = CurrentNodeState::NextNodeRequested {
553 next_node_offset: next_node_offset as usize,
554 }
555 } else {
556 *current_node = CurrentNodeState::Loaded {
557 node_offset: *node_offset,
558 node_pos: *pos - *node_offset,
559 node,
560 }
561 }
562 }
563 Err(e) => {
564 return Poll::Ready(Err(std::io::Error::new(
565 std::io::ErrorKind::InvalidData,
566 e.to_string(),
567 )));
568 }
569 }
570 }
572 Poll::Ready(Err(e)) => {
573 *current_node = CurrentNodeState::NextNodeRequested {
574 next_node_offset: *node_offset,
575 };
576 return Poll::Ready(Err(std::io::Error::new(
577 std::io::ErrorKind::InvalidData,
578 e.to_string(),
579 )));
580 }
581 }
582 }
583 &mut CurrentNodeState::Loaded {
584 ref node_offset,
585 ref mut node_pos,
586 node: ref mut current_node_inner,
587 } => match current_node_inner {
588 UnixFsFile::Raw(data) => {
589 if *node_offset + data.len() <= *pos {
590 *current_node = CurrentNodeState::NextNodeRequested {
591 next_node_offset: node_offset + data.len(),
592 };
593 continue;
594 }
595 let bytes_read = read_data_to_buf(pos, pos_max, &data[*node_pos..], buf);
596 *node_pos += bytes_read;
597 return Poll::Ready(Ok(()));
598 }
599 UnixFsFile::Node(node) => {
600 if let Some(ref data) = node.inner.data {
601 if node_offset + data.len() <= *pos {
602 *current_node = CurrentNodeState::NextNodeRequested {
603 next_node_offset: node_offset + data.len(),
604 };
605 continue;
606 }
607 let bytes_read = read_data_to_buf(pos, pos_max, &data[*node_pos..], buf);
608 *node_pos += bytes_read;
609 return Poll::Ready(Ok(()));
610 }
611 *current_node = CurrentNodeState::NextNodeRequested {
612 next_node_offset: *node_offset,
613 };
614 }
615 },
616 }
617 }
618}