use std::time::Duration;
use either::Either;
use futures::{future::BoxFuture, stream::BoxStream, FutureExt, Stream, StreamExt};
use libipld::Cid;
use libp2p::PeerId;
use rust_unixfs::walk::{ContinuedWalk, Walker};
use crate::{dag::IpldDag, repo::Repo, Ipfs, IpfsPath};
/// A single entry produced while listing a UnixFS tree with `ls`.
///
/// Entries are yielded in the order the underlying walker visits them.
#[derive(Debug)]
pub enum NodeItem {
/// Resolving the path, fetching a block, or walking the tree failed.
/// `ls` ends the stream right after yielding this variant.
Error { error: anyhow::Error },
/// The root directory of the listing: its CID and the path reported by the
/// walker (lossily converted to a `String`).
RootDirectory { cid: Cid, path: String },
/// A directory below the root; `path` has the root directory name removed.
Directory { cid: Cid, path: String },
/// A file entry. `file` is its path with the root directory name removed and
/// `size` is the size reported by the walker, cast to `usize`
/// (presumably the total file size in bytes -- confirm against `rust_unixfs`).
File { cid: Cid, file: String, size: usize },
}
/// Lists the UnixFS entries reachable from `path` as a lazy stream of [`NodeItem`]s.
///
/// The path is first resolved through the DAG, the resolved node is converted
/// into a UnixFS block, and the tree below it is then walked block by block.
/// Nothing is fetched until the returned [`UnixfsLs`] is polled or awaited.
///
/// # Parameters
///
/// * `which` - either a full [`Ipfs`] handle or a bare [`Repo`]. With `Ipfs` a
///   fresh bitswap session id is always allocated; with `Repo` one is only
///   allocated when the repo reports itself as online.
/// * `path` - the path to list.
/// * `providers`, `local_only`, `timeout` - forwarded unchanged to the path
///   resolution and to every block fetch performed during the walk.
///
/// # Errors
///
/// Failures are not returned directly: they are yielded as
/// [`NodeItem::Error`], after which the stream ends.
///
/// Symlinks and intermediate buckets encountered by the walker are skipped and
/// produce no item.
pub fn ls<'a>(
    which: Either<&Ipfs, &Repo>,
    path: IpfsPath,
    providers: &'a [PeerId],
    local_only: bool,
    timeout: Option<Duration>,
) -> UnixfsLs<'a> {
    // Returns `path` without its leading `"<root>/"` component. Only the prefix
    // is removed, so a nested component that happens to contain the root name
    // is left intact; paths that are not below `root` are returned unchanged.
    fn relative_to_root(path: &str, root: &str) -> String {
        path.strip_prefix(root)
            .and_then(|rest| rest.strip_prefix('/'))
            .unwrap_or(path)
            .to_string()
    }

    let (repo, dag, session) = match which {
        Either::Left(ipfs) => (
            ipfs.repo().clone(),
            ipfs.dag(),
            Some(crate::BITSWAP_ID.fetch_add(1, std::sync::atomic::Ordering::SeqCst)),
        ),
        Either::Right(repo) => {
            // `then` (not `then_some`) so the global counter is only bumped
            // when a session id is actually going to be used.
            let session = repo
                .is_online()
                .then(|| crate::BITSWAP_ID.fetch_add(1, std::sync::atomic::Ordering::SeqCst));
            (repo.clone(), IpldDag::from(repo.clone()), session)
        }
    };

    let stream = async_stream::stream! {
        // `path` is owned and only needed for this call, so it is moved in.
        let resolved = match dag
            .resolve_with_session(session, path, true, providers, local_only, timeout)
            .await
        {
            Ok((resolved, _)) => resolved,
            Err(e) => {
                yield NodeItem::Error { error: e.into() };
                return;
            }
        };

        let block = match resolved.into_unixfs_block() {
            Ok(block) => block,
            Err(e) => {
                yield NodeItem::Error { error: e.into() };
                return;
            }
        };

        // The walker is seeded with the resolved CID, using its string form as
        // the name of the root entry.
        let cid = block.cid();
        let root_name = cid.to_string();

        let mut walker = Walker::new(*cid, root_name);
        let mut cache = None;
        // Set once the walker reports the root directory; stays empty otherwise.
        let mut root_directory = String::new();

        while walker.should_continue() {
            let (next, _) = walker.pending_links();
            let block = match repo
                .get_block_with_session(session, next, providers, local_only, timeout)
                .await
            {
                Ok(block) => block,
                Err(error) => {
                    yield NodeItem::Error { error };
                    return;
                }
            };
            let block_data = block.data();

            match walker.next(block_data, &mut cache) {
                Ok(ContinuedWalk::Bucket(..)) => {}
                Ok(ContinuedWalk::File(_, cid, path, _, size)) => {
                    let file = relative_to_root(&path.to_string_lossy(), &root_directory);
                    yield NodeItem::File { cid: *cid, file, size: size as _ };
                }
                Ok(ContinuedWalk::RootDirectory(cid, path, _)) => {
                    let path = path.to_string_lossy().to_string();
                    root_directory = path.clone();
                    yield NodeItem::RootDirectory { cid: *cid, path };
                }
                Ok(ContinuedWalk::Directory(cid, path, _)) => {
                    let path = relative_to_root(&path.to_string_lossy(), &root_directory);
                    yield NodeItem::Directory { cid: *cid, path };
                }
                Ok(ContinuedWalk::Symlink(..)) => {}
                Err(error) => {
                    yield NodeItem::Error { error: anyhow::anyhow!("{error}") };
                    return;
                }
            };
        }
    };

    UnixfsLs {
        stream: stream.boxed(),
    }
}
/// Lazy listing of a UnixFS tree, as returned by `ls`.
///
/// It can be consumed incrementally as a `Stream` of [`NodeItem`]s, or awaited
/// directly (via `IntoFuture`) to collect every item into a `Vec`.
#[must_use = "streams and futures do nothing unless polled or awaited"]
pub struct UnixfsLs<'a> {
    /// Boxed stream that performs the actual resolve-and-walk work.
    stream: BoxStream<'a, NodeItem>,
}
impl<'a> Stream for UnixfsLs<'a> {
    type Item = NodeItem;

    /// Forwards the poll to the boxed inner stream.
    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        // The inner stream is already pinned on the heap, so re-borrow its pin.
        let inner = self.stream.as_mut();
        inner.poll_next(cx)
    }
}
impl<'a> std::future::IntoFuture for UnixfsLs<'a> {
type Output = Result<Vec<NodeItem>, anyhow::Error>;
type IntoFuture = BoxFuture<'a, Self::Output>;
fn into_future(mut self) -> Self::IntoFuture {
async move {
let mut items = vec![];
while let Some(status) = self.stream.next().await {
match status {
NodeItem::Error { error } => return Err(error),
item => items.push(item),
}
}
Ok(items)
}
.boxed()
}
}