wnfs_unixfs_file/
unixfs.rs

1use crate::{
2    chunker::DEFAULT_CHUNK_SIZE_LIMIT,
3    codecs::Codec,
4    protobufs,
5    types::{Block, Link, LinkRef, Links, PbLinks},
6};
7use anyhow::{anyhow, bail, ensure, Result};
8use bytes::Bytes;
9use futures::FutureExt;
10use libipld::Cid;
11use prost::Message;
12use std::{
13    collections::VecDeque,
14    fmt::Debug,
15    pin::Pin,
16    task::{Context, Poll},
17};
18use tokio::io::{AsyncRead, AsyncSeek};
19use wnfs_common::{
20    utils::{boxed_fut, BoxFuture},
21    BlockStore, LoadIpld, Storable, StoreIpld,
22};
23
/// UnixFS node kinds, as read from the `type` field of the UnixFS `Data` protobuf.
///
/// The discriminants are the integer values of that field; the `num_enum` derives
/// provide the conversions to and from `i32`. Note that `UnixFsFile::decode` only
/// accepts `File` nodes.
#[derive(
    Debug, Clone, Copy, PartialEq, Eq, num_enum::IntoPrimitive, num_enum::TryFromPrimitive,
)]
#[repr(i32)]
pub enum DataType {
    Raw = 0,
    Directory = 1,
    File = 2,
    Metadata = 3,
    Symlink = 4,
    HamtShard = 5,
}
36
/// A UnixFS file node.
///
/// `Raw` holds the bytes of a block whose CID uses the raw codec. `Node` holds a
/// decoded protobuf node whose UnixFS type is `File` (see `UnixFsFile::decode`).
#[derive(Debug, PartialEq, Clone)]
pub enum UnixFsFile {
    Raw(Bytes),
    Node(Node),
}
42
/// A decoded protobuf UnixFS node.
#[derive(Debug, PartialEq, Clone)]
pub struct Node {
    /// The outer `PbNode`: its links plus the serialized UnixFS `Data` in `outer.data`.
    pub(super) outer: protobufs::PbNode,
    /// The UnixFS `Data` message decoded from `outer.data`.
    pub(super) inner: protobufs::Data,
}
48
49impl Node {
50    fn encode(&self) -> Result<Bytes> {
51        let bytes = self.outer.encode_to_vec();
52        Ok(bytes.into())
53    }
54
55    pub fn typ(&self) -> DataType {
56        self.inner.r#type.try_into().expect("invalid data type")
57    }
58
59    pub fn data(&self) -> Option<Bytes> {
60        self.inner.data.clone()
61    }
62
63    pub fn filesize(&self) -> Option<u64> {
64        self.inner.filesize
65    }
66
67    pub fn blocksizes(&self) -> &[u64] {
68        &self.inner.blocksizes
69    }
70
71    pub fn size(&self) -> Option<usize> {
72        if self.outer.links.is_empty() {
73            return Some(
74                self.inner
75                    .data
76                    .as_ref()
77                    .map(|d| d.len())
78                    .unwrap_or_default(),
79            );
80        }
81
82        None
83    }
84
85    pub fn links(&self) -> Links {
86        Links::Node(PbLinks::new(&self.outer))
87    }
88}
89
90impl UnixFsFile {
91    pub fn empty() -> Self {
92        UnixFsFile::Raw(Bytes::new())
93    }
94
95    pub async fn load(cid: &Cid, store: &impl BlockStore) -> Result<Self> {
96        let block = store.get_block(cid).await?;
97        Self::decode(cid, block)
98    }
99
100    pub fn decode(cid: &Cid, buf: Bytes) -> Result<Self> {
101        match cid.codec() {
102            c if c == Codec::Raw as u64 => Ok(UnixFsFile::Raw(buf)),
103            _ => {
104                let outer = protobufs::PbNode::decode(buf)?;
105                let inner_data = outer
106                    .data
107                    .as_ref()
108                    .cloned()
109                    .ok_or_else(|| anyhow!("missing data"))?;
110                let inner = protobufs::Data::decode(inner_data)?;
111                let typ: DataType = inner.r#type.try_into()?;
112                let node = Node { outer, inner };
113
114                // ensure correct unixfs type
115                match typ {
116                    DataType::File => Ok(UnixFsFile::Node(node)),
117                    _ => bail!("unixfs data type unsupported: {typ:?}"),
118                }
119            }
120        }
121    }
122
123    pub fn encode(&self) -> Result<Block> {
124        let res = match self {
125            UnixFsFile::Raw(data) => {
126                let out = data.clone();
127                let links = vec![];
128                Block::new(Codec::Raw, out, links)
129            }
130            UnixFsFile::Node(node) => {
131                let out = node.encode()?;
132                let links = node
133                    .links()
134                    .map(|x| Ok(x?.cid))
135                    .collect::<Result<Vec<_>>>()?;
136                Block::new(Codec::DagPb, out, links)
137            }
138        };
139
140        ensure!(
141            res.data().len() <= DEFAULT_CHUNK_SIZE_LIMIT,
142            "node is too large: {} bytes",
143            res.data().len()
144        );
145
146        Ok(res)
147    }
148
149    pub const fn typ(&self) -> Option<DataType> {
150        match self {
151            UnixFsFile::Raw(_) => None,
152            UnixFsFile::Node(_) => Some(DataType::File),
153        }
154    }
155
156    /// Returns the size in bytes of the underlying data.
157    /// Available only for `Raw` and `File` which are a single block with no links.
158    pub fn size(&self) -> Option<usize> {
159        match self {
160            UnixFsFile::Raw(data) => Some(data.len()),
161            UnixFsFile::Node(node) => node.size(),
162        }
163    }
164
165    /// Returns the filesize in bytes.
166    /// Should only be set for `Raw` and `File`.
167    pub fn filesize(&self) -> Option<u64> {
168        match self {
169            UnixFsFile::Raw(data) => Some(data.len() as u64),
170            UnixFsFile::Node(node) => node.filesize(),
171        }
172    }
173
174    /// Returns the blocksizes of the links
175    /// Should only be set for File
176    pub fn blocksizes(&self) -> &[u64] {
177        match self {
178            UnixFsFile::Raw(_) => &[],
179            UnixFsFile::Node(node) => node.blocksizes(),
180        }
181    }
182
183    pub fn links(&self) -> Links<'_> {
184        match self {
185            UnixFsFile::Raw(_) => Links::Leaf,
186            UnixFsFile::Node(node) => Links::Node(PbLinks::new(&node.outer)),
187        }
188    }
189
190    pub fn links_owned(&self) -> Result<VecDeque<Link>> {
191        self.links().map(|l| l.map(|l| l.to_owned())).collect()
192    }
193
194    pub async fn get_link_by_name<S: AsRef<str>>(
195        &self,
196        link_name: S,
197    ) -> Result<Option<LinkRef<'_>>> {
198        let link_name = link_name.as_ref();
199        self.links()
200            .find(|l| match l {
201                Ok(l) => l.name == Some(link_name),
202                _ => false,
203            })
204            .transpose()
205    }
206
207    pub fn into_content_reader<B: BlockStore>(
208        self,
209        store: &B,
210        pos_max: Option<usize>,
211    ) -> Result<UnixFsFileReader<'_, B>> {
212        let current_links = vec![self.links_owned()?];
213
214        Ok(UnixFsFileReader {
215            root_node: self,
216            pos: 0,
217            pos_max,
218            current_node: CurrentNodeState::Outer,
219            current_links,
220            store,
221        })
222    }
223}
224
225impl Storable for UnixFsFile {
226    type Serializable = UnixFsFile;
227
228    async fn to_serializable(&self, _store: &impl BlockStore) -> Result<Self::Serializable> {
229        Ok(self.clone())
230    }
231
232    async fn from_serializable(
233        _cid: Option<&Cid>,
234        serializable: Self::Serializable,
235    ) -> Result<Self> {
236        Ok(serializable)
237    }
238}
239
240impl StoreIpld for UnixFsFile {
241    fn encode_ipld(&self) -> Result<(Bytes, u64)> {
242        let (codec, bytes, _) = self.encode()?.into_parts();
243        Ok((bytes, codec.into()))
244    }
245}
246
247impl LoadIpld for UnixFsFile {
248    fn decode_ipld(cid: &Cid, bytes: Bytes) -> Result<Self> {
249        UnixFsFile::decode(cid, bytes)
250    }
251}
252
/// Async reader over the contents of a [`UnixFsFile`], created by
/// `UnixFsFile::into_content_reader`.
///
/// Implements `AsyncRead` and `AsyncSeek`; blocks referenced by links are fetched
/// from `store` as reading reaches them.
#[derive(Debug)]
pub struct UnixFsFileReader<'a, B: BlockStore> {
    /// The file being read.
    root_node: UnixFsFile,
    /// Absolute position in bytes
    pos: usize,
    /// Absolute max position in bytes, only used for clipping responses.
    /// Exclusive: reads stop once `pos` reaches it.
    pos_max: Option<usize>,
    /// Current node being operated on, only used for nested nodes (not the root).
    current_node: CurrentNodeState<'a>,
    /// Stack of links left to traverse. A new queue is pushed when descending into
    /// a node with links; exhausted queues are popped.
    current_links: Vec<VecDeque<Link>>,
    /// Block store used to fetch linked blocks.
    store: &'a B,
}
266
267impl<'a, B: BlockStore> UnixFsFileReader<'a, B> {
268    /// Returns the size in bytes, if known in advance.
269    pub fn size(&self) -> Option<u64> {
270        self.root_node.filesize()
271    }
272}
273
274impl<'a, B: BlockStore + 'a> AsyncRead for UnixFsFileReader<'a, B> {
275    fn poll_read(
276        mut self: Pin<&mut Self>,
277        cx: &mut Context<'_>,
278        buf: &mut tokio::io::ReadBuf<'_>,
279    ) -> Poll<std::io::Result<()>> {
280        let UnixFsFileReader {
281            root_node,
282            pos,
283            pos_max,
284            current_node,
285            current_links,
286            store,
287        } = &mut *self;
288
289        // let pos_old = *pos; Unused, see bytes_read below
290        match root_node {
291            UnixFsFile::Raw(data) => {
292                read_data_to_buf(pos, *pos_max, &data[*pos..], buf);
293                Poll::Ready(Ok(()))
294            }
295            UnixFsFile::Node(node) => poll_read_file_at(
296                cx,
297                node,
298                *store,
299                pos,
300                *pos_max,
301                buf,
302                current_links,
303                current_node,
304            ),
305        }
306        // let bytes_read = *pos - pos_old; // Unused, used to be used for metrics
307        // poll_res
308    }
309}
310
311impl<'a, B: BlockStore + 'a> AsyncSeek for UnixFsFileReader<'a, B> {
312    fn start_seek(mut self: Pin<&mut Self>, position: std::io::SeekFrom) -> std::io::Result<()> {
313        let UnixFsFileReader {
314            root_node,
315            pos,
316            current_node,
317            current_links,
318            ..
319        } = &mut *self;
320        let data_len = root_node.size();
321        *current_node = CurrentNodeState::Outer;
322        *current_links = vec![root_node.links_owned().unwrap()];
323        match position {
324            std::io::SeekFrom::Start(offset) => {
325                let mut i = offset as usize;
326                if let Some(data_len) = data_len {
327                    if data_len == 0 {
328                        *pos = 0;
329                        return Ok(());
330                    }
331                    i = std::cmp::min(i, data_len - 1);
332                }
333                *pos = i;
334            }
335            std::io::SeekFrom::End(offset) => {
336                if let Some(data_len) = data_len {
337                    if data_len == 0 {
338                        *pos = 0;
339                        return Ok(());
340                    }
341                    let mut i = (data_len as i64 + offset) % data_len as i64;
342                    if i < 0 {
343                        i += data_len as i64;
344                    }
345                    *pos = i as usize;
346                } else {
347                    return Err(std::io::Error::new(
348                        std::io::ErrorKind::InvalidInput,
349                        "cannot seek from end of unknown length",
350                    ));
351                }
352            }
353            std::io::SeekFrom::Current(offset) => {
354                let mut i = *pos as i64 + offset;
355                i = std::cmp::max(0, i);
356
357                if let Some(data_len) = data_len {
358                    if data_len == 0 {
359                        *pos = 0;
360                        return Ok(());
361                    }
362                    i = std::cmp::min(i, data_len as i64 - 1);
363                }
364                *pos = i as usize;
365            }
366        }
367        Ok(())
368    }
369
370    fn poll_complete(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<u64>> {
371        Poll::Ready(Ok(self.pos as u64))
372    }
373}
374
375pub fn read_data_to_buf(
376    pos: &mut usize,
377    pos_max: Option<usize>,
378    data: &[u8],
379    buf: &mut tokio::io::ReadBuf<'_>,
380) -> usize {
381    let data_to_read = pos_max.map(|pos_max| pos_max - *pos).unwrap_or(data.len());
382    let amt = std::cmp::min(std::cmp::min(data_to_read, buf.remaining()), data.len());
383    buf.put_slice(&data[..amt]);
384    *pos += amt;
385    amt
386}
387
388pub fn find_block(node: &UnixFsFile, pos: u64, node_offset: u64) -> (u64, Option<usize>) {
389    let pivots = node
390        .blocksizes()
391        .iter()
392        .scan(node_offset, |state, &x| {
393            *state += x;
394            Some(*state)
395        })
396        .collect::<Vec<_>>();
397    let block_index = match pivots.binary_search(&pos) {
398        Ok(b) => b + 1,
399        Err(b) => b,
400    };
401    if block_index < pivots.len() {
402        let next_node_offset = if block_index > 0 {
403            pivots[block_index - 1]
404        } else {
405            node_offset
406        };
407        (next_node_offset, Some(block_index))
408    } else {
409        (pivots[pivots.len() - 1], None)
410    }
411}
412
/// Traversal state of a [`UnixFsFileReader`] below the root node.
///
/// `poll_read_file_at` drives the transitions:
/// - `Outer` moves to `NextNodeRequested`.
/// - `NextNodeRequested` moves to `Loading`.
/// - `Loading` moves to `Loaded`, or back to `NextNodeRequested` when the loaded
///   node has links.
/// - `Loaded` moves back to `NextNodeRequested` once its data has been passed.
// NOTE(review): the lint is silenced rather than boxing a variant, presumably because
// `Loaded` embeds a whole `UnixFsFile`; confirm the size trade-off is intended.
#[allow(clippy::large_enum_variant)]
pub enum CurrentNodeState<'a> {
    /// Initial state: reading from the root node's own inline data.
    Outer,
    /// Need to load the next node from the link stack; it starts at absolute
    /// offset `next_node_offset`.
    NextNodeRequested {
        next_node_offset: usize,
    },
    /// Node has been loaded and is ready to be read. `node_offset` is its absolute
    /// start and `node_pos` the read position within its data.
    Loaded {
        node_offset: usize,
        node_pos: usize,
        node: UnixFsFile,
    },
    /// Ongoing load of the node that starts at absolute offset `node_offset`;
    /// `fut` fetches and decodes its block.
    Loading {
        node_offset: usize,
        fut: BoxFuture<'a, Result<UnixFsFile>>,
    },
}
433
434impl<'a> Debug for CurrentNodeState<'a> {
435    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
436        match self {
437            CurrentNodeState::Outer => write!(f, "CurrentNodeState::Outer"),
438            CurrentNodeState::NextNodeRequested { next_node_offset } => {
439                write!(f, "CurrentNodeState::None ({next_node_offset})")
440            }
441            CurrentNodeState::Loaded {
442                node_offset,
443                node_pos,
444                node,
445            } => {
446                write!(
447                    f,
448                    "CurrentNodeState::Loaded({node_offset:?}, {node_pos:?}, {node:?})"
449                )
450            }
451            CurrentNodeState::Loading { .. } => write!(f, "CurrentNodeState::Loading(Fut)"),
452        }
453    }
454}
455
456fn load_next_node<'a>(
457    next_node_offset: usize,
458    current_node: &mut CurrentNodeState<'a>,
459    current_links: &mut Vec<VecDeque<Link>>,
460    store: &'a impl BlockStore,
461) -> bool {
462    let links = loop {
463        if let Some(last_mut) = current_links.last_mut() {
464            if last_mut.is_empty() {
465                // ignore empty links
466                current_links.pop();
467            } else {
468                // found non empty links
469                break last_mut;
470            }
471        } else {
472            // no links left we are done
473            return false;
474        }
475    };
476
477    let link = links.pop_front().unwrap();
478
479    let fut = boxed_fut(async move {
480        let block = store.get_block(&link.cid).await?;
481        let node = UnixFsFile::decode(&link.cid, block)?;
482
483        Ok(node)
484    });
485    *current_node = CurrentNodeState::Loading {
486        node_offset: next_node_offset,
487        fut,
488    };
489    true
490}
491
492#[allow(clippy::too_many_arguments)]
493fn poll_read_file_at<'a>(
494    cx: &mut Context<'_>,
495    root_node: &Node,
496    store: &'a impl BlockStore,
497    pos: &mut usize,
498    pos_max: Option<usize>,
499    buf: &mut tokio::io::ReadBuf<'_>,
500    current_links: &mut Vec<VecDeque<Link>>,
501    current_node: &mut CurrentNodeState<'a>,
502) -> Poll<std::io::Result<()>> {
503    loop {
504        if let Some(pos_max) = pos_max {
505            if pos_max <= *pos {
506                return Poll::Ready(Ok(()));
507            }
508        }
509        match current_node {
510            CurrentNodeState::Outer => {
511                // check for links
512                if root_node.outer.links.is_empty() {
513                    // simplest case just one file
514                    let data = root_node.inner.data.as_deref().unwrap_or(&[][..]);
515                    read_data_to_buf(pos, pos_max, &data[*pos..], buf);
516                    return Poll::Ready(Ok(()));
517                }
518
519                // read root local data
520                if let Some(ref data) = root_node.inner.data {
521                    if *pos < data.len() {
522                        read_data_to_buf(pos, pos_max, &data[*pos..], buf);
523                        return Poll::Ready(Ok(()));
524                    }
525                }
526                *current_node = CurrentNodeState::NextNodeRequested {
527                    next_node_offset: 0,
528                };
529            }
530            CurrentNodeState::NextNodeRequested { next_node_offset } => {
531                let loaded_next_node =
532                    load_next_node(*next_node_offset, current_node, current_links, store);
533                if !loaded_next_node {
534                    return Poll::Ready(Ok(()));
535                }
536            }
537            CurrentNodeState::Loading { node_offset, fut } => {
538                match fut.poll_unpin(cx) {
539                    Poll::Pending => {
540                        return Poll::Pending;
541                    }
542                    Poll::Ready(Ok(node)) => {
543                        match node.links_owned() {
544                            Ok(links) => {
545                                if !links.is_empty() {
546                                    let (next_node_offset, block_index) =
547                                        find_block(&node, *pos as u64, *node_offset as u64);
548                                    if let Some(block_index) = block_index {
549                                        let new_links =
550                                            links.into_iter().skip(block_index).collect();
551                                        current_links.push(new_links);
552                                    }
553                                    *current_node = CurrentNodeState::NextNodeRequested {
554                                        next_node_offset: next_node_offset as usize,
555                                    }
556                                } else {
557                                    *current_node = CurrentNodeState::Loaded {
558                                        node_offset: *node_offset,
559                                        node_pos: *pos - *node_offset,
560                                        node,
561                                    }
562                                }
563                            }
564                            Err(e) => {
565                                return Poll::Ready(Err(std::io::Error::new(
566                                    std::io::ErrorKind::InvalidData,
567                                    e.to_string(),
568                                )));
569                            }
570                        }
571                        // TODO: do one read
572                    }
573                    Poll::Ready(Err(e)) => {
574                        *current_node = CurrentNodeState::NextNodeRequested {
575                            next_node_offset: *node_offset,
576                        };
577                        return Poll::Ready(Err(std::io::Error::new(
578                            std::io::ErrorKind::InvalidData,
579                            e.to_string(),
580                        )));
581                    }
582                }
583            }
584            CurrentNodeState::Loaded {
585                ref node_offset,
586                ref mut node_pos,
587                node: ref mut current_node_inner,
588            } => match current_node_inner {
589                UnixFsFile::Raw(data) => {
590                    if *node_offset + data.len() <= *pos {
591                        *current_node = CurrentNodeState::NextNodeRequested {
592                            next_node_offset: node_offset + data.len(),
593                        };
594                        continue;
595                    }
596                    let bytes_read = read_data_to_buf(pos, pos_max, &data[*node_pos..], buf);
597                    *node_pos += bytes_read;
598                    return Poll::Ready(Ok(()));
599                }
600                UnixFsFile::Node(node) => {
601                    if let Some(ref data) = node.inner.data {
602                        if node_offset + data.len() <= *pos {
603                            *current_node = CurrentNodeState::NextNodeRequested {
604                                next_node_offset: node_offset + data.len(),
605                            };
606                            continue;
607                        }
608                        let bytes_read = read_data_to_buf(pos, pos_max, &data[*node_pos..], buf);
609                        *node_pos += bytes_read;
610                        return Poll::Ready(Ok(()));
611                    }
612                    *current_node = CurrentNodeState::NextNodeRequested {
613                        next_node_offset: *node_offset,
614                    };
615                }
616            },
617        }
618    }
619}