rust-ipfs 0.15.0

IPFS node implementation
Documentation
use std::task::{Context, Poll};

use crate::{
    repo::{DefaultStorage, Repo},
    Block,
};
use bytes::Bytes;
use either::Either;
#[allow(unused_imports)]
use futures::{
    future::BoxFuture,
    stream::{BoxStream, FusedStream},
    FutureExt, Stream, StreamExt, TryFutureExt,
};
use rust_unixfs::file::adder::{Chunker, FileAdderBuilder};
#[cfg(not(target_arch = "wasm32"))]
use std::path::{Path, PathBuf};
use std::pin::Pin;
#[cfg(not(target_arch = "wasm32"))]
use tokio_util::io::ReaderStream;
use tracing::{Instrument, Span};

use crate::{Ipfs, IpfsPath};

use super::{TraversalFailed, UnixfsStatus};

pub enum AddOpt {
    #[cfg(not(target_arch = "wasm32"))]
    File(PathBuf),
    Stream {
        name: Option<String>,
        total: Option<usize>,
        stream: BoxStream<'static, Result<Bytes, std::io::Error>>,
    },
}

#[cfg(not(target_arch = "wasm32"))]
impl From<PathBuf> for AddOpt {
    fn from(path: PathBuf) -> Self {
        AddOpt::File(path)
    }
}

#[cfg(not(target_arch = "wasm32"))]
impl From<&Path> for AddOpt {
    fn from(path: &Path) -> Self {
        AddOpt::File(path.to_path_buf())
    }
}

#[must_use = "does nothing unless you `.await` or poll the stream"]
pub struct UnixfsAdd {
    core: Option<Either<Ipfs, Repo<DefaultStorage>>>,
    opt: Option<AddOpt>,
    span: Span,
    chunk: Chunker,
    pin: bool,
    provide: bool,
    wrap: bool,
    stream: Option<BoxStream<'static, UnixfsStatus>>,
}

impl UnixfsAdd {
    pub fn with_ipfs(ipfs: &Ipfs, opt: impl Into<AddOpt>) -> Self {
        Self::with_either(Either::Left(ipfs.clone()), opt)
    }

    pub fn with_repo(repo: &Repo<DefaultStorage>, opt: impl Into<AddOpt>) -> Self {
        Self::with_either(Either::Right(repo.clone()), opt)
    }

    fn with_either(core: Either<Ipfs, Repo<DefaultStorage>>, opt: impl Into<AddOpt>) -> Self {
        let opt = opt.into();
        Self {
            core: Some(core),
            opt: Some(opt),
            span: Span::current(),
            chunk: Chunker::Size(256 * 1024),
            pin: true,
            provide: false,
            wrap: false,
            stream: None,
        }
    }

    pub fn span(mut self, span: Span) -> Self {
        self.span = span;
        self
    }

    pub fn chunk(mut self, chunk: Chunker) -> Self {
        self.chunk = chunk;
        self
    }

    pub fn pin(mut self, pin: bool) -> Self {
        self.pin = pin;
        self
    }

    pub fn provide(mut self) -> Self {
        self.provide = true;
        self
    }

    pub fn wrap(mut self) -> Self {
        self.wrap = true;
        self
    }
}

impl Stream for UnixfsAdd {
    type Item = UnixfsStatus;
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        if self.core.is_none() && self.stream.is_none() {
            return Poll::Ready(None);
        }
        loop {
            match &mut self.stream {
                None => {
                    let (ipfs, repo) = match self.core.take().expect("ipfs or repo is used") {
                        Either::Left(ipfs) => {
                            let repo = ipfs.repo().clone();
                            (Some(ipfs), repo)
                        }
                        Either::Right(repo) => (None, repo),
                    };
                    let option = self.opt.take().expect("option already constructed");
                    let chunk = self.chunk;
                    let pin = self.pin;
                    let provide = self.provide;
                    let wrap = self.wrap;

                    let stream = async_stream::stream! {
                        let _g = repo.gc_guard().await;

                        let mut written = 0;

                        let (name, total_size, mut stream) = match option {
                            #[cfg(not(target_arch = "wasm32"))]
                            AddOpt::File(path) => match tokio::fs::File::open(path.clone())
                                .and_then(|file| async move {
                                    let size = file.metadata().await?.len() as usize;

                                    let stream = ReaderStream::new(file);

                                    let name: Option<String> = path.file_name().map(|f| f.to_string_lossy().to_string());

                                    Ok((name, Some(size), stream.boxed()))
                                }).await {
                                    Ok(s) => s,
                                    Err(e) => {
                                        yield UnixfsStatus::FailedStatus { written, total_size: None, error: e.into() };
                                        return;
                                    }
                                },
                            AddOpt::Stream { name, total, stream } => (name, total, stream),
                        };

                        let mut adder = FileAdderBuilder::default()
                            .with_chunker(chunk)
                            .build();

                        yield UnixfsStatus::ProgressStatus { written, total_size };

                        while let Some(buffer) = stream.next().await {
                            let buffer = match buffer {
                                Ok(buf) => buf,
                                Err(e) => {
                                    yield UnixfsStatus::FailedStatus { written, total_size, error: e.into() };
                                    return;
                                }
                            };

                            let mut total = 0;
                            while total < buffer.len() {
                                let (blocks, consumed) = adder.push(&buffer[total..]);
                                for (cid, block) in blocks {
                                    let block = match Block::new(cid, block) {
                                        Ok(block) => block,
                                        Err(e) => {
                                            yield UnixfsStatus::FailedStatus { written, total_size, error: e.into() };
                                            return;
                                        }
                                    };
                                    let _cid = match repo.put_block(&block).await {
                                        Ok(cid) => cid,
                                        Err(e) => {
                                            yield UnixfsStatus::FailedStatus { written, total_size, error: e };
                                            return;
                                        }
                                    };
                                }
                                total += consumed;
                                written += consumed;
                            }

                            yield UnixfsStatus::ProgressStatus { written, total_size };
                        }

                        let blocks = adder.finish();
                        let mut last_cid = None;

                        for (cid, block) in blocks {
                            let block = match Block::new(cid, block) {
                                Ok(block) => block,
                                Err(e) => {
                                    yield UnixfsStatus::FailedStatus { written, total_size, error: e.into() };
                                    return;
                                }
                            };
                            let _cid = match repo.put_block(&block).await {
                                Ok(cid) => cid,
                                Err(e) => {
                                    yield UnixfsStatus::FailedStatus { written, total_size, error: e };
                                    return;
                                }
                            };
                            last_cid = Some(cid);
                        }

                        let cid = match last_cid {
                            Some(cid) => cid,
                            None => {
                                yield UnixfsStatus::FailedStatus { written, total_size, error: TraversalFailed::Io(std::io::ErrorKind::InvalidData.into()).into() };
                                return;
                            }
                        };

                        let mut path = IpfsPath::from(cid);

                        if wrap {
                            if let Some(name) = name {
                                let result = {
                                    let repo = repo.clone();
                                    async move {
                                        let mut opts = rust_unixfs::dir::builder::TreeOptions::default();
                                        opts.wrap_with_directory();

                                        let mut tree = rust_unixfs::dir::builder::BufferingTreeBuilder::new(opts);
                                        tree.put_link(&name, cid, written as _)?;

                                        let mut iter = tree.build();
                                        let mut cids = Vec::new();

                                        while let Some(node) = iter.next_borrowed() {
                                            //TODO: Determine best course to prevent additional allocation
                                            let node = node?;
                                            let cid = node.cid.to_owned();
                                            let block = Block::new(cid, node.block.to_vec())?;

                                            repo.put_block(&block).await?;

                                            cids.push(cid);
                                        }
                                        let cid = cids.last().ok_or(anyhow::anyhow!("no cid available"))?;
                                        let path = IpfsPath::from(*cid).sub_path(&name)?;

                                        Ok::<_, anyhow::Error>(path)
                                    }
                                };

                                path = match result.await {
                                    Ok(path) => path,
                                    Err(e) => {
                                        yield UnixfsStatus::FailedStatus { written, total_size, error: e };
                                        return;
                                    }
                                };
                            }
                        }

                        let cid = path.root().cid().copied().expect("Cid is apart of the path");

                        if pin && !repo.is_pinned(&cid).await.unwrap_or_default() {
                            if let Err(e) = repo.pin(cid).recursive().await {
                                error!("Unable to pin {cid}: {e}");
                            }
                        }

                        if provide {
                            if let Some(ipfs) = ipfs {
                                if let Err(e) = ipfs.provide(cid).await {
                                    error!("Unable to provide {cid}: {e}");
                                }
                            }
                        }


                        yield UnixfsStatus::CompletedStatus { path, written, total_size }
                    };

                    self.stream = Some(stream.boxed());
                }
                Some(stream) => match futures::ready!(stream.poll_next_unpin(cx)) {
                    Some(item) => {
                        if matches!(
                            item,
                            UnixfsStatus::FailedStatus { .. }
                                | UnixfsStatus::CompletedStatus { .. }
                        ) {
                            self.stream.take();
                        }
                        return Poll::Ready(Some(item));
                    }
                    None => {
                        self.stream.take();
                        return Poll::Ready(None);
                    }
                },
            }
        }
    }
}

impl std::future::IntoFuture for UnixfsAdd {
    type Output = Result<IpfsPath, anyhow::Error>;

    type IntoFuture = BoxFuture<'static, Self::Output>;

    fn into_future(mut self) -> Self::IntoFuture {
        let span = self.span.clone();
        async move {
            while let Some(status) = self.next().await {
                match status {
                    UnixfsStatus::CompletedStatus { path, .. } => return Ok(path),
                    UnixfsStatus::FailedStatus { error, .. } => {
                        return Err(error);
                    }
                    _ => {}
                }
            }
            Err::<_, anyhow::Error>(anyhow::anyhow!("Unable to add file"))
        }
        .instrument(span)
        .boxed()
    }
}

impl FusedStream for UnixfsAdd {
    fn is_terminated(&self) -> bool {
        self.stream.is_none() && self.core.is_none()
    }
}