use std::{
collections::VecDeque,
fmt::Debug,
pin::Pin,
task::{Context, Poll},
};
use anyhow::{anyhow, bail, ensure, Result};
use bytes::{Buf, Bytes};
use cid::{multihash::MultihashDigest, Cid};
use futures::{future::BoxFuture, stream::BoxStream, FutureExt, Stream, StreamExt};
use iroh_metrics::resolver::OutMetrics;
use prost::Message;
use tokio::io::{AsyncRead, AsyncSeek};
use crate::{
chunker::DEFAULT_CHUNK_SIZE_LIMIT,
codecs::Codec,
content_loader::{ContentLoader, LoaderContext},
hamt::Hamt,
types::{Block, Link, LinkRef, Links, PbLinks},
};
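// Protobuf bindings for the UnixFS `Data` message and the dag-pb `PbNode`
// envelope, generated at build time into `OUT_DIR`.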
pub(crate) mod unixfs_pb {
#![allow(clippy::all)]
include!(concat!(env!("OUT_DIR"), "/unixfs_pb.rs"));
}
pub(crate) mod dag_pb {
#![allow(clippy::all)]
include!(concat!(env!("OUT_DIR"), "/merkledag_pb.rs"));
}
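/// The type of a UnixFS entry, as carried in the `Type` field of the UnixFS
/// `Data` protobuf message.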
#[derive(
Debug, Clone, Copy, PartialEq, Eq, num_enum::IntoPrimitive, num_enum::TryFromPrimitive,
)]
#[repr(i32)]
pub enum DataType {
Raw = 0,
Directory = 1,
File = 2,
Metadata = 3,
Symlink = 4,
HamtShard = 5,
}
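/// A decoded UnixFS `Data` message, without the surrounding dag-pb node.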
#[derive(Debug, Clone)]
pub struct Unixfs {
inner: unixfs_pb::Data,
}
impl Unixfs {
pub fn from_bytes<B: Buf>(bytes: B) -> Result<Self> {
let proto = unixfs_pb::Data::decode(bytes)?;
Ok(Unixfs { inner: proto })
}
pub fn typ(&self) -> DataType {
self.inner.r#type.try_into().expect("invalid data type")
}
pub fn data(&self) -> Option<&Bytes> {
self.inner.data.as_ref()
}
}
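/// A decoded UnixFS block: either raw bytes, or a dag-pb node classified by
/// the type of its inner UnixFS `Data` message.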
#[derive(Debug, PartialEq, Clone)]
pub enum UnixfsNode {
Raw(Bytes),
RawNode(Node),
Directory(Node),
File(Node),
Symlink(Node),
HamtShard(Node, Hamt),
}
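/// Hash function used by a HAMT-sharded directory. `0x22` is the multihash
/// code for murmur3-x64-64, the only variant currently supported.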
#[derive(
Debug, Copy, Clone, PartialEq, Eq, num_enum::IntoPrimitive, num_enum::TryFromPrimitive, Hash,
)]
#[repr(u64)]
pub enum HamtHashFunction {
Murmur3 = 0x22,
}
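/// A dag-pb node (`outer`) paired with its decoded UnixFS payload (`inner`).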
#[derive(Debug, PartialEq, Clone)]
pub struct Node {
pub(super) outer: dag_pb::PbNode,
pub(super) inner: unixfs_pb::Data,
}
impl Node {
fn encode(&self) -> Result<Bytes> {
let bytes = self.outer.encode_to_vec();
Ok(bytes.into())
}
pub fn typ(&self) -> DataType {
self.inner.r#type.try_into().expect("invalid data type")
}
pub fn data(&self) -> Option<Bytes> {
self.inner.data.clone()
}
pub fn filesize(&self) -> Option<u64> {
self.inner.filesize
}
pub fn blocksizes(&self) -> &[u64] {
&self.inner.blocksizes
}
pub fn size(&self) -> Option<usize> {
if self.outer.links.is_empty() {
return Some(
self.inner
.data
.as_ref()
.map(|d| d.len())
.unwrap_or_default(),
);
}
None
}
    pub fn links(&self) -> Links<'_> {
match self.typ() {
DataType::Raw => Links::RawNode(PbLinks::new(&self.outer)),
DataType::Directory => Links::Directory(PbLinks::new(&self.outer)),
DataType::File => Links::File(PbLinks::new(&self.outer)),
DataType::Symlink => Links::Symlink(PbLinks::new(&self.outer)),
DataType::HamtShard => Links::HamtShard(PbLinks::new(&self.outer)),
DataType::Metadata => unimplemented!(),
}
}
pub fn hash_type(&self) -> Option<HamtHashFunction> {
self.inner.hash_type.and_then(|t| t.try_into().ok())
}
pub fn fanout(&self) -> Option<u32> {
self.inner.fanout.and_then(|f| u32::try_from(f).ok())
}
}
impl UnixfsNode {
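    /// Decodes a block into a `UnixfsNode`. Blocks with the raw codec are
    /// returned verbatim; everything else is parsed as a dag-pb node whose
    /// embedded UnixFS `Data` message selects the variant.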
pub fn decode(cid: &Cid, buf: Bytes) -> Result<Self> {
match cid.codec() {
c if c == Codec::Raw as u64 => Ok(UnixfsNode::Raw(buf)),
_ => {
let outer = dag_pb::PbNode::decode(buf)?;
                let inner_data = outer
                    .data
                    .clone()
                    .ok_or_else(|| anyhow!("missing data"))?;
let inner = unixfs_pb::Data::decode(inner_data)?;
let typ: DataType = inner.r#type.try_into()?;
let node = Node { outer, inner };
match typ {
                    // A dag-pb envelope with unixfs type `Raw` maps to the
                    // `RawNode` variant (mirroring `UnixfsNode::typ`).
                    DataType::Raw => Ok(UnixfsNode::RawNode(node)),
DataType::Directory => Ok(UnixfsNode::Directory(node)),
DataType::File => Ok(UnixfsNode::File(node)),
DataType::Symlink => Ok(UnixfsNode::Symlink(node)),
DataType::HamtShard => {
let hamt = Hamt::from_node(&node)?;
Ok(UnixfsNode::HamtShard(node, hamt))
}
DataType::Metadata => bail!("unixfs metadata is not supported"),
}
}
}
}
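    /// Re-encodes this node into a `Block`, hashing the bytes with SHA-256
    /// and deriving a CIDv1 with the matching codec. Fails if the encoded
    /// node exceeds `DEFAULT_CHUNK_SIZE_LIMIT`.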
pub fn encode(&self) -> Result<Block> {
let res = match self {
UnixfsNode::Raw(data) => {
let out = data.clone();
let links = vec![];
let cid = Cid::new_v1(Codec::Raw as _, cid::multihash::Code::Sha2_256.digest(&out));
Block::new(cid, out, links)
}
UnixfsNode::RawNode(node)
| UnixfsNode::Directory(node)
| UnixfsNode::File(node)
| UnixfsNode::Symlink(node)
| UnixfsNode::HamtShard(node, _) => {
let out = node.encode()?;
let links = node
.links()
.map(|x| Ok(x?.cid))
.collect::<Result<Vec<_>>>()?;
let cid = Cid::new_v1(
Codec::DagPb as _,
cid::multihash::Code::Sha2_256.digest(&out),
);
Block::new(cid, out, links)
}
};
ensure!(
res.data().len() <= DEFAULT_CHUNK_SIZE_LIMIT,
"node is too large: {} bytes",
res.data().len()
);
Ok(res)
}
pub const fn typ(&self) -> Option<DataType> {
match self {
UnixfsNode::Raw(_) => None,
UnixfsNode::RawNode(_) => Some(DataType::Raw),
UnixfsNode::Directory(_) => Some(DataType::Directory),
UnixfsNode::File(_) => Some(DataType::File),
UnixfsNode::Symlink(_) => Some(DataType::Symlink),
UnixfsNode::HamtShard(_, _) => Some(DataType::HamtShard),
}
}
pub fn size(&self) -> Option<usize> {
match self {
UnixfsNode::Raw(data) => Some(data.len()),
UnixfsNode::Directory(node)
| UnixfsNode::RawNode(node)
| UnixfsNode::File(node)
| UnixfsNode::Symlink(node)
| UnixfsNode::HamtShard(node, _) => node.size(),
}
}
pub fn filesize(&self) -> Option<u64> {
match self {
UnixfsNode::Raw(data) => Some(data.len() as u64),
UnixfsNode::Directory(node)
| UnixfsNode::RawNode(node)
| UnixfsNode::File(node)
| UnixfsNode::Symlink(node)
| UnixfsNode::HamtShard(node, _) => node.filesize(),
}
}
pub fn blocksizes(&self) -> &[u64] {
match self {
UnixfsNode::Raw(_) => &[],
UnixfsNode::Directory(node)
| UnixfsNode::RawNode(node)
| UnixfsNode::Symlink(node)
| UnixfsNode::HamtShard(node, _)
| UnixfsNode::File(node) => node.blocksizes(),
}
}
pub fn links(&self) -> Links<'_> {
match self {
UnixfsNode::Raw(_) => Links::Raw,
UnixfsNode::RawNode(node) => Links::RawNode(PbLinks::new(&node.outer)),
UnixfsNode::Directory(node) => Links::Directory(PbLinks::new(&node.outer)),
UnixfsNode::File(node) => Links::File(PbLinks::new(&node.outer)),
UnixfsNode::Symlink(node) => Links::Symlink(PbLinks::new(&node.outer)),
UnixfsNode::HamtShard(node, _) => Links::HamtShard(PbLinks::new(&node.outer)),
}
}
pub fn links_owned(&self) -> Result<VecDeque<Link>> {
self.links().map(|l| l.map(|l| l.to_owned())).collect()
}
pub const fn is_dir(&self) -> bool {
matches!(self, Self::Directory(_) | Self::HamtShard(_, _))
}
pub async fn get_link_by_name<S: AsRef<str>>(
&self,
link_name: S,
) -> Result<Option<LinkRef<'_>>> {
let link_name = link_name.as_ref();
self.links()
.find(|l| match l {
Ok(l) => l.name == Some(link_name),
_ => false,
})
.transpose()
}
pub fn symlink(&self) -> Result<Option<&str>> {
if let Self::Symlink(ref node) = self {
let link = std::str::from_utf8(node.inner.data.as_deref().unwrap_or_default())?;
Ok(Some(link))
} else {
Ok(None)
}
}
pub fn as_child_reader<C: ContentLoader>(
&self,
ctx: LoaderContext,
loader: C,
om: OutMetrics,
) -> Result<Option<UnixfsChildStream>> {
match self {
UnixfsNode::Raw(_)
| UnixfsNode::RawNode(_)
| UnixfsNode::File(_)
| UnixfsNode::Symlink(_) => Ok(None),
UnixfsNode::Directory(_) => {
let source = self.links().map(|l| l.map(|l| l.to_owned()));
let stream = futures::stream::iter(source).boxed();
Ok(Some(UnixfsChildStream::Directory {
stream,
out_metrics: om,
}))
}
UnixfsNode::HamtShard(_, hamt) => Ok(Some(UnixfsChildStream::Hamt {
stream: hamt.children(ctx, loader).boxed(),
pos: 0,
out_metrics: om,
})),
}
}
pub fn into_content_reader<C: ContentLoader>(
self,
ctx: LoaderContext,
loader: C,
om: OutMetrics,
pos_max: Option<usize>,
) -> Result<Option<UnixfsContentReader<C>>> {
match self {
UnixfsNode::Raw(_)
| UnixfsNode::RawNode(_)
| UnixfsNode::File(_)
| UnixfsNode::Symlink(_) => {
let current_links = vec![self.links_owned()?];
Ok(Some(UnixfsContentReader::File {
root_node: self,
pos: 0,
pos_max,
current_node: CurrentNodeState::Outer,
current_links,
loader,
out_metrics: om,
ctx: std::sync::Arc::new(tokio::sync::Mutex::new(ctx)),
}))
}
UnixfsNode::HamtShard(_, _) | UnixfsNode::Directory(_) => Ok(None),
}
}
}
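/// A stream over the child links of a directory, plain or HAMT-sharded.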
pub enum UnixfsChildStream<'a> {
Hamt {
stream: BoxStream<'a, Result<Link>>,
pos: usize,
out_metrics: OutMetrics,
},
Directory {
stream: BoxStream<'a, Result<Link>>,
out_metrics: OutMetrics,
},
}
impl<'a> Debug for UnixfsChildStream<'a> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
            UnixfsChildStream::Hamt {
                pos, out_metrics, ..
            } => write!(
                f,
                "UnixfsChildStream::Hamt {{ stream: BoxStream<Result<Link>>, pos: {pos}, out_metrics: {out_metrics:?} }}"
            ),
            UnixfsChildStream::Directory { out_metrics, .. } => write!(
                f,
                "UnixfsChildStream::Directory {{ stream: BoxStream<Result<Link>>, out_metrics: {out_metrics:?} }}"
            ),
}
}
}
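/// An `AsyncRead + AsyncSeek` view over the content of a UnixFS file,
/// loading child blocks on demand through a `ContentLoader`.
///
/// ```ignore
/// // Sketch (not compiled): read a whole file through the reader. Assumes a
/// // resolved `node: UnixfsNode`, a `loader: impl ContentLoader`, a loader
/// // context `ctx` and a metrics handle `om` are in scope.
/// use tokio::io::AsyncReadExt;
///
/// let mut reader = node
///     .into_content_reader(ctx, loader, om, None)?
///     .expect("not readable as a file");
/// let mut out = Vec::new();
/// reader.read_to_end(&mut out).await?;
/// ```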
#[derive(Debug)]
pub enum UnixfsContentReader<C: ContentLoader> {
File {
root_node: UnixfsNode,
pos: usize,
pos_max: Option<usize>,
current_node: CurrentNodeState,
current_links: Vec<VecDeque<Link>>,
loader: C,
out_metrics: OutMetrics,
ctx: std::sync::Arc<tokio::sync::Mutex<LoaderContext>>,
},
}
impl<C: ContentLoader> UnixfsContentReader<C> {
pub fn size(&self) -> Option<u64> {
        match self {
            UnixfsContentReader::File { root_node, .. } => root_node.filesize(),
        }
}
}
impl Stream for UnixfsChildStream<'_> {
type Item = Result<Link>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match &mut *self {
UnixfsChildStream::Hamt { stream, .. } => Pin::new(stream).poll_next(cx),
UnixfsChildStream::Directory { stream, .. } => Pin::new(stream).poll_next(cx),
}
}
fn size_hint(&self) -> (usize, Option<usize>) {
match self {
UnixfsChildStream::Directory { stream, .. } => stream.size_hint(),
UnixfsChildStream::Hamt { .. } => (0, None),
}
}
}
impl<C: ContentLoader + Unpin + 'static> AsyncRead for UnixfsContentReader<C> {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
match &mut *self {
UnixfsContentReader::File {
root_node,
pos,
pos_max,
current_node,
current_links,
loader,
out_metrics,
ctx,
} => {
let typ = root_node.typ();
let pos_old = *pos;
let poll_res = match root_node {
UnixfsNode::Raw(data) => {
read_data_to_buf(pos, *pos_max, &data[*pos..], buf);
Poll::Ready(Ok(()))
}
UnixfsNode::File(node) => poll_read_file_at(
cx,
node,
loader.clone(),
pos,
*pos_max,
buf,
current_links,
current_node,
ctx.clone(),
),
UnixfsNode::Symlink(node) => {
let data = node.inner.data.as_deref().unwrap_or_default();
read_data_to_buf(pos, *pos_max, &data[*pos..], buf);
Poll::Ready(Ok(()))
}
                    _ => Poll::Ready(Err(std::io::Error::new(
                        std::io::ErrorKind::InvalidData,
                        format!("unsupported unixfs type for content reading: {typ:?}"),
                    ))),
};
let bytes_read = *pos - pos_old;
out_metrics.observe_bytes_read(pos_old, bytes_read);
poll_res
}
}
}
}
impl<C: ContentLoader + Unpin + 'static> AsyncSeek for UnixfsContentReader<C> {
fn start_seek(mut self: Pin<&mut Self>, position: std::io::SeekFrom) -> std::io::Result<()> {
match &mut *self {
UnixfsContentReader::File {
root_node,
pos,
current_node,
current_links,
..
} => {
let data_len = root_node.size();
*current_node = CurrentNodeState::Outer;
*current_links = vec![root_node.links_owned().unwrap()];
match position {
std::io::SeekFrom::Start(offset) => {
let mut i = offset as usize;
if let Some(data_len) = data_len {
if data_len == 0 {
*pos = 0;
return Ok(());
}
i = std::cmp::min(i, data_len - 1);
}
*pos = i;
}
std::io::SeekFrom::End(offset) => {
if let Some(data_len) = data_len {
if data_len == 0 {
*pos = 0;
return Ok(());
}
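                            // Note: the offset is wrapped into `[0, data_len)`
                            // (euclidean modulo) rather than clamped, so e.g.
                            // `SeekFrom::End(0)` lands on the first byte.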
let mut i = (data_len as i64 + offset) % data_len as i64;
if i < 0 {
i += data_len as i64;
}
*pos = i as usize;
} else {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
"cannot seek from end of unknown length",
));
}
}
std::io::SeekFrom::Current(offset) => {
let mut i = *pos as i64 + offset;
i = std::cmp::max(0, i);
if let Some(data_len) = data_len {
if data_len == 0 {
*pos = 0;
return Ok(());
}
i = std::cmp::min(i, data_len as i64 - 1);
}
*pos = i as usize;
}
}
}
}
Ok(())
}
fn poll_complete(
mut self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<std::io::Result<u64>> {
match &mut *self {
UnixfsContentReader::File { pos, .. } => Poll::Ready(Ok(*pos as u64)),
}
}
}
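/// Copies bytes from `data` into `buf`, bounded by the buffer's remaining
/// capacity and the optional absolute read limit `pos_max`. Advances `*pos`
/// and returns the number of bytes copied.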
pub fn read_data_to_buf(
pos: &mut usize,
pos_max: Option<usize>,
data: &[u8],
buf: &mut tokio::io::ReadBuf<'_>,
) -> usize {
    let data_to_read = pos_max
        .map(|pos_max| pos_max.saturating_sub(*pos))
        .unwrap_or(data.len());
let amt = std::cmp::min(std::cmp::min(data_to_read, buf.remaining()), data.len());
buf.put_slice(&data[..amt]);
*pos += amt;
amt
}
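/// Locates the child block covering the absolute byte position `pos`, given
/// a node whose children start at `node_offset`. Returns the starting offset
/// of that block and its index, or the node's end offset and `None` when
/// `pos` lies past the last child.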
pub fn find_block(node: &UnixfsNode, pos: u64, node_offset: u64) -> (u64, Option<usize>) {
let pivots = node
.blocksizes()
.iter()
.scan(node_offset, |state, &x| {
*state += x;
Some(*state)
})
.collect::<Vec<_>>();
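    // `pivots[i]` is the absolute end offset of child `i`. `Ok(i)` means
    // `pos` sits exactly on a boundary (first byte of child `i + 1`);
    // `Err(i)` means child `i` covers `pos`.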
let block_index = match pivots.binary_search(&pos) {
Ok(b) => b + 1,
Err(b) => b,
};
if block_index < pivots.len() {
let next_node_offset = if block_index > 0 {
pivots[block_index - 1]
} else {
node_offset
};
(next_node_offset, Some(block_index))
} else {
        // Guard against empty `blocksizes`: fall back to `node_offset`.
        (pivots.last().copied().unwrap_or(node_offset), None)
}
}
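/// State of the depth-first walk over a chunked file's DAG while reading.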
#[allow(clippy::large_enum_variant)]
pub enum CurrentNodeState {
Outer,
NextNodeRequested {
next_node_offset: usize,
},
Loaded {
node_offset: usize,
node_pos: usize,
node: UnixfsNode,
},
Loading {
node_offset: usize,
fut: BoxFuture<'static, Result<UnixfsNode>>,
},
}
impl Debug for CurrentNodeState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
CurrentNodeState::Outer => write!(f, "CurrentNodeState::Outer"),
CurrentNodeState::NextNodeRequested { next_node_offset } => {
write!(f, "CurrentNodeState::None ({next_node_offset})")
}
CurrentNodeState::Loaded {
node_offset,
node_pos,
node,
} => {
write!(
f,
"CurrentNodeState::Loaded({node_offset:?}, {node_pos:?}, {node:?})"
)
}
CurrentNodeState::Loading { .. } => write!(f, "CurrentNodeState::Loading(Fut)"),
}
}
}
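/// Pops the next pending link off the link stack and starts loading it,
/// moving `current_node` into the `Loading` state. Returns `false` when no
/// links are left.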
fn load_next_node<C: ContentLoader + 'static>(
next_node_offset: usize,
current_node: &mut CurrentNodeState,
current_links: &mut Vec<VecDeque<Link>>,
loader: C,
ctx: std::sync::Arc<tokio::sync::Mutex<LoaderContext>>,
) -> bool {
let links = loop {
if let Some(last_mut) = current_links.last_mut() {
if last_mut.is_empty() {
current_links.pop();
} else {
break last_mut;
}
} else {
return false;
}
};
let link = links.pop_front().unwrap();
let fut = async move {
let ctx = ctx.lock().await;
let loaded_cid = loader.load_cid(&link.cid, &ctx).await?;
let node = UnixfsNode::decode(&link.cid, loaded_cid.data)?;
Ok(node)
}
.boxed();
*current_node = CurrentNodeState::Loading {
node_offset: next_node_offset,
fut,
};
true
}
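/// One step of reading a chunked file: serves the root node's inline data
/// first, then walks the child DAG, using `find_block` to skip directly to
/// the block covering the current position.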
#[allow(clippy::too_many_arguments)]
fn poll_read_file_at<C: ContentLoader + 'static>(
cx: &mut Context<'_>,
root_node: &Node,
loader: C,
pos: &mut usize,
pos_max: Option<usize>,
buf: &mut tokio::io::ReadBuf<'_>,
current_links: &mut Vec<VecDeque<Link>>,
current_node: &mut CurrentNodeState,
ctx: std::sync::Arc<tokio::sync::Mutex<LoaderContext>>,
) -> Poll<std::io::Result<()>> {
loop {
if let Some(pos_max) = pos_max {
if pos_max <= *pos {
return Poll::Ready(Ok(()));
}
}
match current_node {
CurrentNodeState::Outer => {
if root_node.outer.links.is_empty() {
                    let data = root_node.inner.data.as_deref().unwrap_or_default();
read_data_to_buf(pos, pos_max, &data[*pos..], buf);
return Poll::Ready(Ok(()));
}
if let Some(ref data) = root_node.inner.data {
if *pos < data.len() {
read_data_to_buf(pos, pos_max, &data[*pos..], buf);
return Poll::Ready(Ok(()));
}
}
*current_node = CurrentNodeState::NextNodeRequested {
next_node_offset: 0,
};
}
CurrentNodeState::NextNodeRequested { next_node_offset } => {
let loaded_next_node = load_next_node(
*next_node_offset,
current_node,
current_links,
loader.clone(),
ctx.clone(),
);
if !loaded_next_node {
return Poll::Ready(Ok(()));
}
}
CurrentNodeState::Loading { node_offset, fut } => {
match fut.poll_unpin(cx) {
Poll::Pending => {
return Poll::Pending;
}
Poll::Ready(Ok(node)) => {
match node.links_owned() {
Ok(links) => {
if !links.is_empty() {
let (next_node_offset, block_index) =
find_block(&node, *pos as u64, *node_offset as u64);
if let Some(block_index) = block_index {
let new_links =
links.into_iter().skip(block_index).collect();
current_links.push(new_links);
}
*current_node = CurrentNodeState::NextNodeRequested {
next_node_offset: next_node_offset as usize,
}
} else {
*current_node = CurrentNodeState::Loaded {
node_offset: *node_offset,
node_pos: *pos - *node_offset,
node,
}
}
}
Err(e) => {
return Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
e.to_string(),
)));
}
}
}
Poll::Ready(Err(e)) => {
*current_node = CurrentNodeState::NextNodeRequested {
next_node_offset: *node_offset,
};
return Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
e.to_string(),
)));
}
}
}
CurrentNodeState::Loaded {
ref node_offset,
ref mut node_pos,
node: ref mut current_node_inner,
} => match current_node_inner {
UnixfsNode::Raw(data) => {
if *node_offset + data.len() <= *pos {
*current_node = CurrentNodeState::NextNodeRequested {
next_node_offset: node_offset + data.len(),
};
continue;
}
let bytes_read = read_data_to_buf(pos, pos_max, &data[*node_pos..], buf);
*node_pos += bytes_read;
return Poll::Ready(Ok(()));
}
UnixfsNode::File(node) | UnixfsNode::RawNode(node) => {
if let Some(ref data) = node.inner.data {
if node_offset + data.len() <= *pos {
*current_node = CurrentNodeState::NextNodeRequested {
next_node_offset: node_offset + data.len(),
};
continue;
}
let bytes_read = read_data_to_buf(pos, pos_max, &data[*node_pos..], buf);
*node_pos += bytes_read;
return Poll::Ready(Ok(()));
}
*current_node = CurrentNodeState::NextNodeRequested {
next_node_offset: *node_offset,
};
}
_ => {
return Poll::Ready(Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!(
"invalid type nested in chunked file: {:?}",
current_node_inner.typ()
),
)));
}
},
}
}
}
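#[cfg(test)]
mod tests {
    use super::*;

    // Illustrative sketches for the pure helpers above; they exercise the
    // documented behavior only and construct protobuf values directly.

    #[test]
    fn read_data_to_buf_respects_pos_max() {
        let data = [1u8, 2, 3, 4, 5];
        let mut storage = [0u8; 8];
        let mut buf = tokio::io::ReadBuf::new(&mut storage);
        let mut pos = 0;
        // A `pos_max` of 3 caps the read at 3 bytes, even though both the
        // buffer and the source could provide more.
        let amt = read_data_to_buf(&mut pos, Some(3), &data, &mut buf);
        assert_eq!(amt, 3);
        assert_eq!(pos, 3);
        assert_eq!(buf.filled(), &[1, 2, 3]);
    }

    #[test]
    fn find_block_locates_child_by_offset() {
        // A file node whose three children cover the byte ranges
        // [0, 10), [10, 30) and [30, 60).
        let node = UnixfsNode::File(Node {
            outer: dag_pb::PbNode::default(),
            inner: unixfs_pb::Data {
                r#type: DataType::File as i32,
                blocksizes: vec![10, 20, 30],
                ..Default::default()
            },
        });
        // Position 15 falls into the second child, which starts at offset 10.
        assert_eq!(find_block(&node, 15, 0), (10, Some(1)));
        // A position at or past the end of the file yields no block.
        assert_eq!(find_block(&node, 60, 0), (60, None));
    }
}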