1use crate::{
2 chunker::DEFAULT_CHUNK_SIZE_LIMIT,
3 codecs::Codec,
4 protobufs,
5 types::{Block, Link, LinkRef, Links, PbLinks},
6};
7use anyhow::{anyhow, bail, ensure, Result};
8use bytes::Bytes;
9use futures::FutureExt;
10use libipld::Cid;
11use prost::Message;
12use std::{
13 collections::VecDeque,
14 fmt::Debug,
15 pin::Pin,
16 task::{Context, Poll},
17};
18use tokio::io::{AsyncRead, AsyncSeek};
19use wnfs_common::{
20 utils::{boxed_fut, BoxFuture},
21 BlockStore, LoadIpld, Storable, StoreIpld,
22};
23
24#[derive(
25 Debug, Clone, Copy, PartialEq, Eq, num_enum::IntoPrimitive, num_enum::TryFromPrimitive,
26)]
27#[repr(i32)]
28pub enum DataType {
29 Raw = 0,
30 Directory = 1,
31 File = 2,
32 Metadata = 3,
33 Symlink = 4,
34 HamtShard = 5,
35}
36
37#[derive(Debug, PartialEq, Clone)]
38pub enum UnixFsFile {
39 Raw(Bytes),
40 Node(Node),
41}
42
43#[derive(Debug, PartialEq, Clone)]
44pub struct Node {
45 pub(super) outer: protobufs::PbNode,
46 pub(super) inner: protobufs::Data,
47}
48
49impl Node {
50 fn encode(&self) -> Result<Bytes> {
51 let bytes = self.outer.encode_to_vec();
52 Ok(bytes.into())
53 }
54
55 pub fn typ(&self) -> DataType {
56 self.inner.r#type.try_into().expect("invalid data type")
57 }
58
59 pub fn data(&self) -> Option<Bytes> {
60 self.inner.data.clone()
61 }
62
63 pub fn filesize(&self) -> Option<u64> {
64 self.inner.filesize
65 }
66
67 pub fn blocksizes(&self) -> &[u64] {
68 &self.inner.blocksizes
69 }
70
71 pub fn size(&self) -> Option<usize> {
72 if self.outer.links.is_empty() {
73 return Some(
74 self.inner
75 .data
76 .as_ref()
77 .map(|d| d.len())
78 .unwrap_or_default(),
79 );
80 }
81
82 None
83 }
84
85 pub fn links(&self) -> Links {
86 Links::Node(PbLinks::new(&self.outer))
87 }
88}
89
90impl UnixFsFile {
91 pub fn empty() -> Self {
92 UnixFsFile::Raw(Bytes::new())
93 }
94
95 pub async fn load(cid: &Cid, store: &impl BlockStore) -> Result<Self> {
96 let block = store.get_block(cid).await?;
97 Self::decode(cid, block)
98 }
99
100 pub fn decode(cid: &Cid, buf: Bytes) -> Result<Self> {
101 match cid.codec() {
102 c if c == Codec::Raw as u64 => Ok(UnixFsFile::Raw(buf)),
103 _ => {
104 let outer = protobufs::PbNode::decode(buf)?;
105 let inner_data = outer
106 .data
107 .as_ref()
108 .cloned()
109 .ok_or_else(|| anyhow!("missing data"))?;
110 let inner = protobufs::Data::decode(inner_data)?;
111 let typ: DataType = inner.r#type.try_into()?;
112 let node = Node { outer, inner };
113
114 match typ {
116 DataType::File => Ok(UnixFsFile::Node(node)),
117 _ => bail!("unixfs data type unsupported: {typ:?}"),
118 }
119 }
120 }
121 }
122
123 pub fn encode(&self) -> Result<Block> {
124 let res = match self {
125 UnixFsFile::Raw(data) => {
126 let out = data.clone();
127 let links = vec![];
128 Block::new(Codec::Raw, out, links)
129 }
130 UnixFsFile::Node(node) => {
131 let out = node.encode()?;
132 let links = node
133 .links()
134 .map(|x| Ok(x?.cid))
135 .collect::<Result<Vec<_>>>()?;
136 Block::new(Codec::DagPb, out, links)
137 }
138 };
139
140 ensure!(
141 res.data().len() <= DEFAULT_CHUNK_SIZE_LIMIT,
142 "node is too large: {} bytes",
143 res.data().len()
144 );
145
146 Ok(res)
147 }
148
149 pub const fn typ(&self) -> Option<DataType> {
150 match self {
151 UnixFsFile::Raw(_) => None,
152 UnixFsFile::Node(_) => Some(DataType::File),
153 }
154 }
155
156 pub fn size(&self) -> Option<usize> {
159 match self {
160 UnixFsFile::Raw(data) => Some(data.len()),
161 UnixFsFile::Node(node) => node.size(),
162 }
163 }
164
165 pub fn filesize(&self) -> Option<u64> {
168 match self {
169 UnixFsFile::Raw(data) => Some(data.len() as u64),
170 UnixFsFile::Node(node) => node.filesize(),
171 }
172 }
173
174 pub fn blocksizes(&self) -> &[u64] {
177 match self {
178 UnixFsFile::Raw(_) => &[],
179 UnixFsFile::Node(node) => node.blocksizes(),
180 }
181 }
182
183 pub fn links(&self) -> Links<'_> {
184 match self {
185 UnixFsFile::Raw(_) => Links::Leaf,
186 UnixFsFile::Node(node) => Links::Node(PbLinks::new(&node.outer)),
187 }
188 }
189
190 pub fn links_owned(&self) -> Result<VecDeque<Link>> {
191 self.links().map(|l| l.map(|l| l.to_owned())).collect()
192 }
193
194 pub async fn get_link_by_name<S: AsRef<str>>(
195 &self,
196 link_name: S,
197 ) -> Result<Option<LinkRef<'_>>> {
198 let link_name = link_name.as_ref();
199 self.links()
200 .find(|l| match l {
201 Ok(l) => l.name == Some(link_name),
202 _ => false,
203 })
204 .transpose()
205 }
206
207 pub fn into_content_reader<B: BlockStore>(
208 self,
209 store: &B,
210 pos_max: Option<usize>,
211 ) -> Result<UnixFsFileReader<'_, B>> {
212 let current_links = vec![self.links_owned()?];
213
214 Ok(UnixFsFileReader {
215 root_node: self,
216 pos: 0,
217 pos_max,
218 current_node: CurrentNodeState::Outer,
219 current_links,
220 store,
221 })
222 }
223}
224
225impl Storable for UnixFsFile {
226 type Serializable = UnixFsFile;
227
228 async fn to_serializable(&self, _store: &impl BlockStore) -> Result<Self::Serializable> {
229 Ok(self.clone())
230 }
231
232 async fn from_serializable(
233 _cid: Option<&Cid>,
234 serializable: Self::Serializable,
235 ) -> Result<Self> {
236 Ok(serializable)
237 }
238}
239
240impl StoreIpld for UnixFsFile {
241 fn encode_ipld(&self) -> Result<(Bytes, u64)> {
242 let (codec, bytes, _) = self.encode()?.into_parts();
243 Ok((bytes, codec.into()))
244 }
245}
246
247impl LoadIpld for UnixFsFile {
248 fn decode_ipld(cid: &Cid, bytes: Bytes) -> Result<Self> {
249 UnixFsFile::decode(cid, bytes)
250 }
251}
252
253#[derive(Debug)]
254pub struct UnixFsFileReader<'a, B: BlockStore> {
255 root_node: UnixFsFile,
256 pos: usize,
258 pos_max: Option<usize>,
260 current_node: CurrentNodeState<'a>,
262 current_links: Vec<VecDeque<Link>>,
264 store: &'a B,
265}
266
267impl<'a, B: BlockStore> UnixFsFileReader<'a, B> {
268 pub fn size(&self) -> Option<u64> {
270 self.root_node.filesize()
271 }
272}
273
274impl<'a, B: BlockStore + 'a> AsyncRead for UnixFsFileReader<'a, B> {
275 fn poll_read(
276 mut self: Pin<&mut Self>,
277 cx: &mut Context<'_>,
278 buf: &mut tokio::io::ReadBuf<'_>,
279 ) -> Poll<std::io::Result<()>> {
280 let UnixFsFileReader {
281 root_node,
282 pos,
283 pos_max,
284 current_node,
285 current_links,
286 store,
287 } = &mut *self;
288
289 match root_node {
291 UnixFsFile::Raw(data) => {
292 read_data_to_buf(pos, *pos_max, &data[*pos..], buf);
293 Poll::Ready(Ok(()))
294 }
295 UnixFsFile::Node(node) => poll_read_file_at(
296 cx,
297 node,
298 *store,
299 pos,
300 *pos_max,
301 buf,
302 current_links,
303 current_node,
304 ),
305 }
306 }
309}
310
311impl<'a, B: BlockStore + 'a> AsyncSeek for UnixFsFileReader<'a, B> {
312 fn start_seek(mut self: Pin<&mut Self>, position: std::io::SeekFrom) -> std::io::Result<()> {
313 let UnixFsFileReader {
314 root_node,
315 pos,
316 current_node,
317 current_links,
318 ..
319 } = &mut *self;
320 let data_len = root_node.size();
321 *current_node = CurrentNodeState::Outer;
322 *current_links = vec![root_node.links_owned().unwrap()];
323 match position {
324 std::io::SeekFrom::Start(offset) => {
325 let mut i = offset as usize;
326 if let Some(data_len) = data_len {
327 if data_len == 0 {
328 *pos = 0;
329 return Ok(());
330 }
331 i = std::cmp::min(i, data_len - 1);
332 }
333 *pos = i;
334 }
335 std::io::SeekFrom::End(offset) => {
336 if let Some(data_len) = data_len {
337 if data_len == 0 {
338 *pos = 0;
339 return Ok(());
340 }
341 let mut i = (data_len as i64 + offset) % data_len as i64;
342 if i < 0 {
343 i += data_len as i64;
344 }
345 *pos = i as usize;
346 } else {
347 return Err(std::io::Error::new(
348 std::io::ErrorKind::InvalidInput,
349 "cannot seek from end of unknown length",
350 ));
351 }
352 }
353 std::io::SeekFrom::Current(offset) => {
354 let mut i = *pos as i64 + offset;
355 i = std::cmp::max(0, i);
356
357 if let Some(data_len) = data_len {
358 if data_len == 0 {
359 *pos = 0;
360 return Ok(());
361 }
362 i = std::cmp::min(i, data_len as i64 - 1);
363 }
364 *pos = i as usize;
365 }
366 }
367 Ok(())
368 }
369
370 fn poll_complete(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<u64>> {
371 Poll::Ready(Ok(self.pos as u64))
372 }
373}
374
375pub fn read_data_to_buf(
376 pos: &mut usize,
377 pos_max: Option<usize>,
378 data: &[u8],
379 buf: &mut tokio::io::ReadBuf<'_>,
380) -> usize {
381 let data_to_read = pos_max.map(|pos_max| pos_max - *pos).unwrap_or(data.len());
382 let amt = std::cmp::min(std::cmp::min(data_to_read, buf.remaining()), data.len());
383 buf.put_slice(&data[..amt]);
384 *pos += amt;
385 amt
386}
387
388pub fn find_block(node: &UnixFsFile, pos: u64, node_offset: u64) -> (u64, Option<usize>) {
389 let pivots = node
390 .blocksizes()
391 .iter()
392 .scan(node_offset, |state, &x| {
393 *state += x;
394 Some(*state)
395 })
396 .collect::<Vec<_>>();
397 let block_index = match pivots.binary_search(&pos) {
398 Ok(b) => b + 1,
399 Err(b) => b,
400 };
401 if block_index < pivots.len() {
402 let next_node_offset = if block_index > 0 {
403 pivots[block_index - 1]
404 } else {
405 node_offset
406 };
407 (next_node_offset, Some(block_index))
408 } else {
409 (pivots[pivots.len() - 1], None)
410 }
411}
412
413#[allow(clippy::large_enum_variant)]
414pub enum CurrentNodeState<'a> {
415 Outer,
417 NextNodeRequested {
419 next_node_offset: usize,
420 },
421 Loaded {
423 node_offset: usize,
424 node_pos: usize,
425 node: UnixFsFile,
426 },
427 Loading {
429 node_offset: usize,
430 fut: BoxFuture<'a, Result<UnixFsFile>>,
431 },
432}
433
434impl<'a> Debug for CurrentNodeState<'a> {
435 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
436 match self {
437 CurrentNodeState::Outer => write!(f, "CurrentNodeState::Outer"),
438 CurrentNodeState::NextNodeRequested { next_node_offset } => {
439 write!(f, "CurrentNodeState::None ({next_node_offset})")
440 }
441 CurrentNodeState::Loaded {
442 node_offset,
443 node_pos,
444 node,
445 } => {
446 write!(
447 f,
448 "CurrentNodeState::Loaded({node_offset:?}, {node_pos:?}, {node:?})"
449 )
450 }
451 CurrentNodeState::Loading { .. } => write!(f, "CurrentNodeState::Loading(Fut)"),
452 }
453 }
454}
455
456fn load_next_node<'a>(
457 next_node_offset: usize,
458 current_node: &mut CurrentNodeState<'a>,
459 current_links: &mut Vec<VecDeque<Link>>,
460 store: &'a impl BlockStore,
461) -> bool {
462 let links = loop {
463 if let Some(last_mut) = current_links.last_mut() {
464 if last_mut.is_empty() {
465 current_links.pop();
467 } else {
468 break last_mut;
470 }
471 } else {
472 return false;
474 }
475 };
476
477 let link = links.pop_front().unwrap();
478
479 let fut = boxed_fut(async move {
480 let block = store.get_block(&link.cid).await?;
481 let node = UnixFsFile::decode(&link.cid, block)?;
482
483 Ok(node)
484 });
485 *current_node = CurrentNodeState::Loading {
486 node_offset: next_node_offset,
487 fut,
488 };
489 true
490}
491
492#[allow(clippy::too_many_arguments)]
493fn poll_read_file_at<'a>(
494 cx: &mut Context<'_>,
495 root_node: &Node,
496 store: &'a impl BlockStore,
497 pos: &mut usize,
498 pos_max: Option<usize>,
499 buf: &mut tokio::io::ReadBuf<'_>,
500 current_links: &mut Vec<VecDeque<Link>>,
501 current_node: &mut CurrentNodeState<'a>,
502) -> Poll<std::io::Result<()>> {
503 loop {
504 if let Some(pos_max) = pos_max {
505 if pos_max <= *pos {
506 return Poll::Ready(Ok(()));
507 }
508 }
509 match current_node {
510 CurrentNodeState::Outer => {
511 if root_node.outer.links.is_empty() {
513 let data = root_node.inner.data.as_deref().unwrap_or(&[][..]);
515 read_data_to_buf(pos, pos_max, &data[*pos..], buf);
516 return Poll::Ready(Ok(()));
517 }
518
519 if let Some(ref data) = root_node.inner.data {
521 if *pos < data.len() {
522 read_data_to_buf(pos, pos_max, &data[*pos..], buf);
523 return Poll::Ready(Ok(()));
524 }
525 }
526 *current_node = CurrentNodeState::NextNodeRequested {
527 next_node_offset: 0,
528 };
529 }
530 CurrentNodeState::NextNodeRequested { next_node_offset } => {
531 let loaded_next_node =
532 load_next_node(*next_node_offset, current_node, current_links, store);
533 if !loaded_next_node {
534 return Poll::Ready(Ok(()));
535 }
536 }
537 CurrentNodeState::Loading { node_offset, fut } => {
538 match fut.poll_unpin(cx) {
539 Poll::Pending => {
540 return Poll::Pending;
541 }
542 Poll::Ready(Ok(node)) => {
543 match node.links_owned() {
544 Ok(links) => {
545 if !links.is_empty() {
546 let (next_node_offset, block_index) =
547 find_block(&node, *pos as u64, *node_offset as u64);
548 if let Some(block_index) = block_index {
549 let new_links =
550 links.into_iter().skip(block_index).collect();
551 current_links.push(new_links);
552 }
553 *current_node = CurrentNodeState::NextNodeRequested {
554 next_node_offset: next_node_offset as usize,
555 }
556 } else {
557 *current_node = CurrentNodeState::Loaded {
558 node_offset: *node_offset,
559 node_pos: *pos - *node_offset,
560 node,
561 }
562 }
563 }
564 Err(e) => {
565 return Poll::Ready(Err(std::io::Error::new(
566 std::io::ErrorKind::InvalidData,
567 e.to_string(),
568 )));
569 }
570 }
571 }
573 Poll::Ready(Err(e)) => {
574 *current_node = CurrentNodeState::NextNodeRequested {
575 next_node_offset: *node_offset,
576 };
577 return Poll::Ready(Err(std::io::Error::new(
578 std::io::ErrorKind::InvalidData,
579 e.to_string(),
580 )));
581 }
582 }
583 }
584 CurrentNodeState::Loaded {
585 ref node_offset,
586 ref mut node_pos,
587 node: ref mut current_node_inner,
588 } => match current_node_inner {
589 UnixFsFile::Raw(data) => {
590 if *node_offset + data.len() <= *pos {
591 *current_node = CurrentNodeState::NextNodeRequested {
592 next_node_offset: node_offset + data.len(),
593 };
594 continue;
595 }
596 let bytes_read = read_data_to_buf(pos, pos_max, &data[*node_pos..], buf);
597 *node_pos += bytes_read;
598 return Poll::Ready(Ok(()));
599 }
600 UnixFsFile::Node(node) => {
601 if let Some(ref data) = node.inner.data {
602 if node_offset + data.len() <= *pos {
603 *current_node = CurrentNodeState::NextNodeRequested {
604 next_node_offset: node_offset + data.len(),
605 };
606 continue;
607 }
608 let bytes_read = read_data_to_buf(pos, pos_max, &data[*node_pos..], buf);
609 *node_pos += bytes_read;
610 return Poll::Ready(Ok(()));
611 }
612 *current_node = CurrentNodeState::NextNodeRequested {
613 next_node_offset: *node_offset,
614 };
615 }
616 },
617 }
618 }
619}