wnfs_unixfs_file/
unixfs.rs

1use crate::{
2    chunker::DEFAULT_CHUNK_SIZE_LIMIT,
3    codecs::Codec,
4    protobufs,
5    types::{Block, Link, LinkRef, Links, PbLinks},
6};
7use anyhow::{Result, anyhow, bail, ensure};
8use bytes::Bytes;
9use futures::FutureExt;
10use prost::Message;
11use std::{
12    collections::VecDeque,
13    fmt::Debug,
14    pin::Pin,
15    task::{Context, Poll},
16};
17use tokio::io::{AsyncRead, AsyncSeek};
18use wnfs_common::{
19    BlockStore, Cid, LoadIpld, Storable, StoreIpld,
20    utils::{BoxFuture, boxed_fut},
21};
22
23#[derive(
24    Debug, Clone, Copy, PartialEq, Eq, num_enum::IntoPrimitive, num_enum::TryFromPrimitive,
25)]
26#[repr(i32)]
27pub enum DataType {
28    Raw = 0,
29    Directory = 1,
30    File = 2,
31    Metadata = 3,
32    Symlink = 4,
33    HamtShard = 5,
34}
35
36#[derive(Debug, PartialEq, Clone)]
37pub enum UnixFsFile {
38    Raw(Bytes),
39    Node(Node),
40}
41
42#[derive(Debug, PartialEq, Clone)]
43pub struct Node {
44    pub(super) outer: protobufs::PbNode,
45    pub(super) inner: protobufs::Data,
46}
47
48impl Node {
49    fn encode(&self) -> Result<Bytes> {
50        let bytes = self.outer.encode_to_vec();
51        Ok(bytes.into())
52    }
53
54    pub fn typ(&self) -> DataType {
55        self.inner.r#type.try_into().expect("invalid data type")
56    }
57
58    pub fn data(&self) -> Option<Bytes> {
59        self.inner.data.clone()
60    }
61
62    pub fn filesize(&self) -> Option<u64> {
63        self.inner.filesize
64    }
65
66    pub fn blocksizes(&self) -> &[u64] {
67        &self.inner.blocksizes
68    }
69
70    pub fn size(&self) -> Option<usize> {
71        if self.outer.links.is_empty() {
72            return Some(
73                self.inner
74                    .data
75                    .as_ref()
76                    .map(|d| d.len())
77                    .unwrap_or_default(),
78            );
79        }
80
81        None
82    }
83
84    pub fn links(&self) -> Links<'_> {
85        Links::Node(PbLinks::new(&self.outer))
86    }
87}
88
89impl UnixFsFile {
90    pub fn empty() -> Self {
91        UnixFsFile::Raw(Bytes::new())
92    }
93
94    pub async fn load(cid: &Cid, store: &impl BlockStore) -> Result<Self> {
95        let block = store.get_block(cid).await?;
96        Self::decode(cid, block)
97    }
98
99    pub fn decode(cid: &Cid, buf: Bytes) -> Result<Self> {
100        match cid.codec() {
101            c if c == Codec::Raw as u64 => Ok(UnixFsFile::Raw(buf)),
102            _ => {
103                let outer = protobufs::PbNode::decode(buf)?;
104                let inner_data = outer
105                    .data
106                    .as_ref()
107                    .cloned()
108                    .ok_or_else(|| anyhow!("missing data"))?;
109                let inner = protobufs::Data::decode(inner_data)?;
110                let typ: DataType = inner.r#type.try_into()?;
111                let node = Node { outer, inner };
112
113                // ensure correct unixfs type
114                match typ {
115                    DataType::File => Ok(UnixFsFile::Node(node)),
116                    _ => bail!("unixfs data type unsupported: {typ:?}"),
117                }
118            }
119        }
120    }
121
122    pub fn encode(&self) -> Result<Block> {
123        let res = match self {
124            UnixFsFile::Raw(data) => {
125                let out = data.clone();
126                let links = vec![];
127                Block::new(Codec::Raw, out, links)
128            }
129            UnixFsFile::Node(node) => {
130                let out = node.encode()?;
131                let links = node
132                    .links()
133                    .map(|x| Ok(x?.cid))
134                    .collect::<Result<Vec<_>>>()?;
135                Block::new(Codec::DagPb, out, links)
136            }
137        };
138
139        ensure!(
140            res.data().len() <= DEFAULT_CHUNK_SIZE_LIMIT,
141            "node is too large: {} bytes",
142            res.data().len()
143        );
144
145        Ok(res)
146    }
147
148    pub const fn typ(&self) -> Option<DataType> {
149        match self {
150            UnixFsFile::Raw(_) => None,
151            UnixFsFile::Node(_) => Some(DataType::File),
152        }
153    }
154
155    /// Returns the size in bytes of the underlying data.
156    /// Available only for `Raw` and `File` which are a single block with no links.
157    pub fn size(&self) -> Option<usize> {
158        match self {
159            UnixFsFile::Raw(data) => Some(data.len()),
160            UnixFsFile::Node(node) => node.size(),
161        }
162    }
163
164    /// Returns the filesize in bytes.
165    /// Should only be set for `Raw` and `File`.
166    pub fn filesize(&self) -> Option<u64> {
167        match self {
168            UnixFsFile::Raw(data) => Some(data.len() as u64),
169            UnixFsFile::Node(node) => node.filesize(),
170        }
171    }
172
173    /// Returns the blocksizes of the links
174    /// Should only be set for File
175    pub fn blocksizes(&self) -> &[u64] {
176        match self {
177            UnixFsFile::Raw(_) => &[],
178            UnixFsFile::Node(node) => node.blocksizes(),
179        }
180    }
181
182    pub fn links(&self) -> Links<'_> {
183        match self {
184            UnixFsFile::Raw(_) => Links::Leaf,
185            UnixFsFile::Node(node) => Links::Node(PbLinks::new(&node.outer)),
186        }
187    }
188
189    pub fn links_owned(&self) -> Result<VecDeque<Link>> {
190        self.links().map(|l| l.map(|l| l.to_owned())).collect()
191    }
192
193    pub async fn get_link_by_name<S: AsRef<str>>(
194        &self,
195        link_name: S,
196    ) -> Result<Option<LinkRef<'_>>> {
197        let link_name = link_name.as_ref();
198        self.links()
199            .find(|l| match l {
200                Ok(l) => l.name == Some(link_name),
201                _ => false,
202            })
203            .transpose()
204    }
205
206    pub fn into_content_reader<B: BlockStore>(
207        self,
208        store: &B,
209        pos_max: Option<usize>,
210    ) -> Result<UnixFsFileReader<'_, B>> {
211        let current_links = vec![self.links_owned()?];
212
213        Ok(UnixFsFileReader {
214            root_node: self,
215            pos: 0,
216            pos_max,
217            current_node: CurrentNodeState::Outer,
218            current_links,
219            store,
220        })
221    }
222}
223
224impl Storable for UnixFsFile {
225    type Serializable = UnixFsFile;
226
227    async fn to_serializable(&self, _store: &impl BlockStore) -> Result<Self::Serializable> {
228        Ok(self.clone())
229    }
230
231    async fn from_serializable(
232        _cid: Option<&Cid>,
233        serializable: Self::Serializable,
234    ) -> Result<Self> {
235        Ok(serializable)
236    }
237}
238
239impl StoreIpld for UnixFsFile {
240    fn encode_ipld(&self) -> Result<(Bytes, u64)> {
241        let (codec, bytes, _) = self.encode()?.into_parts();
242        Ok((bytes, codec.into()))
243    }
244}
245
246impl LoadIpld for UnixFsFile {
247    fn decode_ipld(cid: &Cid, bytes: Bytes) -> Result<Self> {
248        UnixFsFile::decode(cid, bytes)
249    }
250}
251
252#[derive(Debug)]
253pub struct UnixFsFileReader<'a, B: BlockStore> {
254    root_node: UnixFsFile,
255    /// Absolute position in bytes
256    pos: usize,
257    /// Absolute max position in bytes, only used for clipping responses
258    pos_max: Option<usize>,
259    /// Current node being operated on, only used for nested nodes (not the root).
260    current_node: CurrentNodeState<'a>,
261    /// Stack of links left to traverse.
262    current_links: Vec<VecDeque<Link>>,
263    store: &'a B,
264}
265
266impl<B: BlockStore> UnixFsFileReader<'_, B> {
267    /// Returns the size in bytes, if known in advance.
268    pub fn size(&self) -> Option<u64> {
269        self.root_node.filesize()
270    }
271}
272
273impl<'a, B: BlockStore + 'a> AsyncRead for UnixFsFileReader<'a, B> {
274    fn poll_read(
275        mut self: Pin<&mut Self>,
276        cx: &mut Context<'_>,
277        buf: &mut tokio::io::ReadBuf<'_>,
278    ) -> Poll<std::io::Result<()>> {
279        let UnixFsFileReader {
280            root_node,
281            pos,
282            pos_max,
283            current_node,
284            current_links,
285            store,
286        } = &mut *self;
287
288        // let pos_old = *pos; Unused, see bytes_read below
289        match root_node {
290            UnixFsFile::Raw(data) => {
291                read_data_to_buf(pos, *pos_max, &data[*pos..], buf);
292                Poll::Ready(Ok(()))
293            }
294            UnixFsFile::Node(node) => poll_read_file_at(
295                cx,
296                node,
297                *store,
298                pos,
299                *pos_max,
300                buf,
301                current_links,
302                current_node,
303            ),
304        }
305        // let bytes_read = *pos - pos_old; // Unused, used to be used for metrics
306        // poll_res
307    }
308}
309
310impl<'a, B: BlockStore + 'a> AsyncSeek for UnixFsFileReader<'a, B> {
311    fn start_seek(mut self: Pin<&mut Self>, position: std::io::SeekFrom) -> std::io::Result<()> {
312        let UnixFsFileReader {
313            root_node,
314            pos,
315            current_node,
316            current_links,
317            ..
318        } = &mut *self;
319        let data_len = root_node.size();
320        *current_node = CurrentNodeState::Outer;
321        *current_links = vec![root_node.links_owned().unwrap()];
322        match position {
323            std::io::SeekFrom::Start(offset) => {
324                let mut i = offset as usize;
325                if let Some(data_len) = data_len {
326                    if data_len == 0 {
327                        *pos = 0;
328                        return Ok(());
329                    }
330                    i = std::cmp::min(i, data_len - 1);
331                }
332                *pos = i;
333            }
334            std::io::SeekFrom::End(offset) => {
335                if let Some(data_len) = data_len {
336                    if data_len == 0 {
337                        *pos = 0;
338                        return Ok(());
339                    }
340                    let mut i = (data_len as i64 + offset) % data_len as i64;
341                    if i < 0 {
342                        i += data_len as i64;
343                    }
344                    *pos = i as usize;
345                } else {
346                    return Err(std::io::Error::new(
347                        std::io::ErrorKind::InvalidInput,
348                        "cannot seek from end of unknown length",
349                    ));
350                }
351            }
352            std::io::SeekFrom::Current(offset) => {
353                let mut i = *pos as i64 + offset;
354                i = std::cmp::max(0, i);
355
356                if let Some(data_len) = data_len {
357                    if data_len == 0 {
358                        *pos = 0;
359                        return Ok(());
360                    }
361                    i = std::cmp::min(i, data_len as i64 - 1);
362                }
363                *pos = i as usize;
364            }
365        }
366        Ok(())
367    }
368
369    fn poll_complete(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<u64>> {
370        Poll::Ready(Ok(self.pos as u64))
371    }
372}
373
374pub fn read_data_to_buf(
375    pos: &mut usize,
376    pos_max: Option<usize>,
377    data: &[u8],
378    buf: &mut tokio::io::ReadBuf<'_>,
379) -> usize {
380    let data_to_read = pos_max.map(|pos_max| pos_max - *pos).unwrap_or(data.len());
381    let amt = std::cmp::min(std::cmp::min(data_to_read, buf.remaining()), data.len());
382    buf.put_slice(&data[..amt]);
383    *pos += amt;
384    amt
385}
386
387pub fn find_block(node: &UnixFsFile, pos: u64, node_offset: u64) -> (u64, Option<usize>) {
388    let pivots = node
389        .blocksizes()
390        .iter()
391        .scan(node_offset, |state, &x| {
392            *state += x;
393            Some(*state)
394        })
395        .collect::<Vec<_>>();
396    let block_index = match pivots.binary_search(&pos) {
397        Ok(b) => b + 1,
398        Err(b) => b,
399    };
400    if block_index < pivots.len() {
401        let next_node_offset = if block_index > 0 {
402            pivots[block_index - 1]
403        } else {
404            node_offset
405        };
406        (next_node_offset, Some(block_index))
407    } else {
408        (pivots[pivots.len() - 1], None)
409    }
410}
411
412#[allow(clippy::large_enum_variant)]
413pub enum CurrentNodeState<'a> {
414    // Initial state
415    Outer,
416    // Need to load next node from the list
417    NextNodeRequested {
418        next_node_offset: usize,
419    },
420    // Node has been loaded and ready to be processed
421    Loaded {
422        node_offset: usize,
423        node_pos: usize,
424        node: UnixFsFile,
425    },
426    // Ongoing loading of the node
427    Loading {
428        node_offset: usize,
429        fut: BoxFuture<'a, Result<UnixFsFile>>,
430    },
431}
432
433impl Debug for CurrentNodeState<'_> {
434    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
435        match self {
436            CurrentNodeState::Outer => write!(f, "CurrentNodeState::Outer"),
437            CurrentNodeState::NextNodeRequested { next_node_offset } => {
438                write!(f, "CurrentNodeState::None ({next_node_offset})")
439            }
440            CurrentNodeState::Loaded {
441                node_offset,
442                node_pos,
443                node,
444            } => {
445                write!(
446                    f,
447                    "CurrentNodeState::Loaded({node_offset:?}, {node_pos:?}, {node:?})"
448                )
449            }
450            CurrentNodeState::Loading { .. } => write!(f, "CurrentNodeState::Loading(Fut)"),
451        }
452    }
453}
454
455fn load_next_node<'a>(
456    next_node_offset: usize,
457    current_node: &mut CurrentNodeState<'a>,
458    current_links: &mut Vec<VecDeque<Link>>,
459    store: &'a impl BlockStore,
460) -> bool {
461    let links = loop {
462        if let Some(last_mut) = current_links.last_mut() {
463            if last_mut.is_empty() {
464                // ignore empty links
465                current_links.pop();
466            } else {
467                // found non empty links
468                break last_mut;
469            }
470        } else {
471            // no links left we are done
472            return false;
473        }
474    };
475
476    let link = links.pop_front().unwrap();
477
478    let fut = boxed_fut(async move {
479        let block = store.get_block(&link.cid).await?;
480        let node = UnixFsFile::decode(&link.cid, block)?;
481
482        Ok(node)
483    });
484    *current_node = CurrentNodeState::Loading {
485        node_offset: next_node_offset,
486        fut,
487    };
488    true
489}
490
491#[allow(clippy::too_many_arguments)]
492fn poll_read_file_at<'a>(
493    cx: &mut Context<'_>,
494    root_node: &Node,
495    store: &'a impl BlockStore,
496    pos: &mut usize,
497    pos_max: Option<usize>,
498    buf: &mut tokio::io::ReadBuf<'_>,
499    current_links: &mut Vec<VecDeque<Link>>,
500    current_node: &mut CurrentNodeState<'a>,
501) -> Poll<std::io::Result<()>> {
502    loop {
503        if let Some(pos_max) = pos_max {
504            if pos_max <= *pos {
505                return Poll::Ready(Ok(()));
506            }
507        }
508        match current_node {
509            CurrentNodeState::Outer => {
510                // check for links
511                if root_node.outer.links.is_empty() {
512                    // simplest case just one file
513                    let data = root_node.inner.data.as_deref().unwrap_or(&[][..]);
514                    read_data_to_buf(pos, pos_max, &data[*pos..], buf);
515                    return Poll::Ready(Ok(()));
516                }
517
518                // read root local data
519                if let Some(ref data) = root_node.inner.data {
520                    if *pos < data.len() {
521                        read_data_to_buf(pos, pos_max, &data[*pos..], buf);
522                        return Poll::Ready(Ok(()));
523                    }
524                }
525                *current_node = CurrentNodeState::NextNodeRequested {
526                    next_node_offset: 0,
527                };
528            }
529            CurrentNodeState::NextNodeRequested { next_node_offset } => {
530                let loaded_next_node =
531                    load_next_node(*next_node_offset, current_node, current_links, store);
532                if !loaded_next_node {
533                    return Poll::Ready(Ok(()));
534                }
535            }
536            CurrentNodeState::Loading { node_offset, fut } => {
537                match fut.poll_unpin(cx) {
538                    Poll::Pending => {
539                        return Poll::Pending;
540                    }
541                    Poll::Ready(Ok(node)) => {
542                        match node.links_owned() {
543                            Ok(links) => {
544                                if !links.is_empty() {
545                                    let (next_node_offset, block_index) =
546                                        find_block(&node, *pos as u64, *node_offset as u64);
547                                    if let Some(block_index) = block_index {
548                                        let new_links =
549                                            links.into_iter().skip(block_index).collect();
550                                        current_links.push(new_links);
551                                    }
552                                    *current_node = CurrentNodeState::NextNodeRequested {
553                                        next_node_offset: next_node_offset as usize,
554                                    }
555                                } else {
556                                    *current_node = CurrentNodeState::Loaded {
557                                        node_offset: *node_offset,
558                                        node_pos: *pos - *node_offset,
559                                        node,
560                                    }
561                                }
562                            }
563                            Err(e) => {
564                                return Poll::Ready(Err(std::io::Error::new(
565                                    std::io::ErrorKind::InvalidData,
566                                    e.to_string(),
567                                )));
568                            }
569                        }
570                        // TODO: do one read
571                    }
572                    Poll::Ready(Err(e)) => {
573                        *current_node = CurrentNodeState::NextNodeRequested {
574                            next_node_offset: *node_offset,
575                        };
576                        return Poll::Ready(Err(std::io::Error::new(
577                            std::io::ErrorKind::InvalidData,
578                            e.to_string(),
579                        )));
580                    }
581                }
582            }
583            &mut CurrentNodeState::Loaded {
584                ref node_offset,
585                ref mut node_pos,
586                node: ref mut current_node_inner,
587            } => match current_node_inner {
588                UnixFsFile::Raw(data) => {
589                    if *node_offset + data.len() <= *pos {
590                        *current_node = CurrentNodeState::NextNodeRequested {
591                            next_node_offset: node_offset + data.len(),
592                        };
593                        continue;
594                    }
595                    let bytes_read = read_data_to_buf(pos, pos_max, &data[*node_pos..], buf);
596                    *node_pos += bytes_read;
597                    return Poll::Ready(Ok(()));
598                }
599                UnixFsFile::Node(node) => {
600                    if let Some(ref data) = node.inner.data {
601                        if node_offset + data.len() <= *pos {
602                            *current_node = CurrentNodeState::NextNodeRequested {
603                                next_node_offset: node_offset + data.len(),
604                            };
605                            continue;
606                        }
607                        let bytes_read = read_data_to_buf(pos, pos_max, &data[*node_pos..], buf);
608                        *node_pos += bytes_read;
609                        return Poll::Ready(Ok(()));
610                    }
611                    *current_node = CurrentNodeState::NextNodeRequested {
612                        next_node_offset: *node_offset,
613                    };
614                }
615            },
616        }
617    }
618}