use crate::{dag::IpldDag, repo::Repo, Block, Ipfs};
use async_stream::stream;
use either::Either;
use futures::future::BoxFuture;
use futures::stream::{BoxStream, Stream};
use futures::{FutureExt, StreamExt, TryStreamExt};
use libp2p::PeerId;
use rust_unixfs::file::visit::IdleFileVisit;
use std::ops::Range;
use std::{borrow::Borrow, time::Duration};
use super::TraversalFailed;
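/// IPFS cat operation, producing a stream of file bytes.
///
/// Works with either an `Ipfs` handle or a bare `Repo`. In both cases the repository is cloned
/// into the returned stream, so the result only holds data borrowed for `'a` through
/// `starting_point` and `providers`. It can therefore be `'static` when those are, which can be
/// helpful in some contexts, like the http. For a bare `Repo`, a bitswap session is only used
/// when the repo is online. `providers`, `local_only` and `timeout` are forwarded to path
/// resolution and to every block fetch.
///
/// Returns a stream of the bytes of the file pointed to by `starting_point`, limited to `range`
/// when one is given.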
pub fn cat<'a>(
    which: Either<&Ipfs, &Repo>,
    starting_point: impl Into<StartingPoint> + Send + 'a,
    range: Option<Range<u64>>,
    providers: &'a [PeerId],
    local_only: bool,
    timeout: Option<Duration>,
) -> UnixfsCat<'a> {
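    // Allocate a bitswap session id. For a bare `Repo` this only happens when it is online;
    // `bool::then` (unlike `then_some`, which evaluates its argument eagerly) avoids consuming
    // an id that would go unused.
    let (repo, dag, session) = match which {
        Either::Left(ipfs) => (
            ipfs.repo().clone(),
            ipfs.dag(),
            Some(crate::BITSWAP_ID.fetch_add(1, std::sync::atomic::Ordering::SeqCst)),
        ),
        Either::Right(repo) => {
            let session = repo
                .is_online()
                .then(|| crate::BITSWAP_ID.fetch_add(1, std::sync::atomic::Ordering::SeqCst));
            (repo.clone(), IpldDag::from(repo.clone()), session)
        }
    };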
    let mut visit = IdleFileVisit::default();
    if let Some(range) = range {
        visit = visit.with_target_range(range);
    }
    // async_stream is used here to get started faster: writing a custom `Stream` by hand is not
    // trivial, but this one might be simple enough to write out manually.
    let stream = stream! {
        // Get the root block to start the traversal. The stream does not expose any of the file
        // metadata. To get to it, the user needs to create a visitor over the first block.
        let block = match starting_point.into() {
            StartingPoint::Left(path) => match dag
                .resolve_with_session(session, path, true, providers, local_only, timeout)
                .await
                .map_err(TraversalFailed::Resolving)
                .and_then(|(resolved, _)| {
                    resolved.into_unixfs_block().map_err(TraversalFailed::Path)
                }) {
                Ok(block) => block,
                Err(e) => {
                    yield Err(e);
                    return;
                }
            },
            StartingPoint::Right(block) => block,
        };
        let mut cache = None;
        // Start the visit from the root block. Both the bytes and the continuation visitor are
        // moved out as `Option`s because they cannot yet be returned from this future context.
        // A `None` visitor means there is nothing left to walk.
        let (visit, bytes) = match visit.start(block.data()) {
            Ok((bytes, _, _, visit)) => {
                let bytes = if !bytes.is_empty() {
                    Some(bytes.to_vec())
                } else {
                    None
                };
                (visit, bytes)
            }
            Err(e) => {
                yield Err(TraversalFailed::Walking(*block.cid(), e));
                return;
            }
        };
        if let Some(bytes) = bytes {
            yield Ok(bytes);
        }
        let mut visit = match visit {
            Some(visit) => visit,
            None => return,
        };
        loop {
            // TODO: if possible, it would make sense to start downloading the next N pending
            // links at once. A `FuturesUnordered` could drop the values right away; that would
            // probably always cost many unnecessary clones, but it would be nice to "shut" the
            // subscriber so that it only resolves to a value while still keeping the operation
            // going. Not that there is any "operation" concept for the Want yet.
            let (next, _) = visit.pending_links();
            let borrow = repo.borrow();
            let block = match borrow
                .get_block_with_session(session, next, providers, local_only, timeout)
                .await
            {
                Ok(block) => block,
                Err(e) => {
                    yield Err(TraversalFailed::Loading(*next, e));
                    return;
                }
            };
            match visit.continue_walk(block.data(), &mut cache) {
                Ok((bytes, next_visit)) => {
                    if !bytes.is_empty() {
                        // TODO: manual implementation could allow returning just the slice
                        yield Ok(bytes.to_vec());
                    }
                    match next_visit {
                        Some(v) => visit = v,
                        None => return,
                    }
                }
                Err(e) => {
                    yield Err(TraversalFailed::Walking(*block.cid(), e));
                    return;
                }
            }
        }
    };
    UnixfsCat {
        stream: stream.boxed(),
    }
}
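// A minimal, std-only sketch of the walk that `cat` drives above. It starts at the root, then
// repeatedly loads the next pending link and emits the bytes that link contributes, optionally
// clipped to a byte range. `Node`, `walk` and the `HashMap` "block store" are hypothetical
// stand-ins for rust_unixfs visitors and `Repo::get_block_with_session`. This is an illustration
// of the technique, not how rust_unixfs is implemented internally; for example, it visits every
// leaf rather than trying to skip subtrees outside the range.
#[cfg(test)]
mod traversal_sketch {
    use std::collections::HashMap;
    use std::ops::Range;

    /// Hypothetical simplified file DAG node: leaves hold data, branches hold ordered links.
    pub enum Node {
        Leaf(Vec<u8>),
        Branch(Vec<u32>),
    }

    /// Walks the DAG depth-first from `root` and returns the chunks that overlap `range`, or
    /// every non-empty chunk when `range` is `None`. A missing link is returned as `Err(id)`,
    /// loosely mirroring `TraversalFailed::Loading`.
    pub fn walk(
        store: &HashMap<u32, Node>,
        root: u32,
        range: Option<Range<u64>>,
    ) -> Result<Vec<Vec<u8>>, u32> {
        let mut chunks = Vec::new();
        // Stack of links still to visit (the "pending links").
        let mut pending = vec![root];
        // Byte offset of the next leaf within the whole file.
        let mut offset = 0u64;
        while let Some(id) = pending.pop() {
            match store.get(&id).ok_or(id)? {
                // Push links in reverse so the leftmost link is popped (visited) first.
                Node::Branch(links) => pending.extend(links.iter().rev()),
                Node::Leaf(data) => {
                    let start = offset;
                    let end = offset + data.len() as u64;
                    offset = end;
                    // Clip this leaf's file span [start, end) to the requested range, if any.
                    let (lo, hi) = match &range {
                        Some(r) => (r.start.max(start), r.end.min(end)),
                        None => (start, end),
                    };
                    if lo < hi {
                        chunks.push(data[(lo - start) as usize..(hi - start) as usize].to_vec());
                    }
                }
            }
        }
        Ok(chunks)
    }

    #[test]
    fn walks_in_order_and_clips_range() {
        let mut store = HashMap::new();
        store.insert(0, Node::Branch(vec![1, 2]));
        store.insert(1, Node::Leaf(b"hello ".to_vec()));
        store.insert(2, Node::Leaf(b"world".to_vec()));
        assert_eq!(walk(&store, 0, None).unwrap().concat(), b"hello world");
        assert_eq!(walk(&store, 0, Some(4..8)).unwrap().concat(), b"o wo");
    }
}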
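/// The starting point for unixfs walks. Can be converted from anything that converts into an
/// `IpfsPath` (including `Cid`s, which can be converted to `IpfsPath`) and from `Block`s.
pub enum StartingPoint {
    /// A path that is resolved to its unixfs root block before the walk starts.
    Left(crate::IpfsPath),
    /// An already loaded root block; no path resolution is performed.
    Right(Block),
}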
impl<T: Into<crate::IpfsPath>> From<T> for StartingPoint {
    fn from(a: T) -> Self {
        Self::Left(a.into())
    }
}

impl From<Block> for StartingPoint {
    fn from(b: Block) -> Self {
        Self::Right(b)
    }
}
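// A std-only sketch of the conversion pattern `StartingPoint` relies on. A blanket
// `From<T: Into<Path>>` impl sits next to a concrete `From<Block>` impl. The two can coexist
// here, as in the original, presumably because every type involved is local, which lets the
// compiler check that `Block` does not convert into `Path`. `Cid`, `Path`, `Block`, `Start` and
// `describe` are hypothetical stand-ins for the crate's real types and for the `starting_point`
// parameter of `cat`.
#[cfg(test)]
mod starting_point_sketch {
    pub struct Cid(pub String);
    pub struct Path(pub String);
    pub struct Block(pub Vec<u8>);

    // Mirrors "Cids can be converted to IpfsPath".
    impl From<Cid> for Path {
        fn from(cid: Cid) -> Self {
            Path(format!("/ipfs/{}", cid.0))
        }
    }

    pub enum Start {
        Left(Path),
        Right(Block),
    }

    // Anything convertible into a `Path` becomes a path to resolve.
    impl<T: Into<Path>> From<T> for Start {
        fn from(a: T) -> Self {
            Start::Left(a.into())
        }
    }

    // A block is used directly as the root.
    impl From<Block> for Start {
        fn from(b: Block) -> Self {
            Start::Right(b)
        }
    }

    /// Accepts either kind of starting point, like `cat` does with `impl Into<StartingPoint>`.
    pub fn describe(start: impl Into<Start>) -> String {
        match start.into() {
            Start::Left(path) => format!("resolve {}", path.0),
            Start::Right(block) => format!("walk {} bytes", block.0.len()),
        }
    }

    #[test]
    fn both_kinds_convert() {
        assert_eq!(describe(Cid("bafy".to_string())), "resolve /ipfs/bafy");
        assert_eq!(describe(Block(vec![1, 2, 3])), "walk 3 bytes");
    }
}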
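/// Stream of file bytes returned by [`cat`]. Each item is one chunk of the file (or of the
/// requested range). Awaiting a `UnixfsCat` instead collects all chunks into a single
/// `Vec<u8>`, stopping at the first error.
pub struct UnixfsCat<'a> {
    stream: BoxStream<'a, Result<Vec<u8>, TraversalFailed>>,
}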
impl<'a> Stream for UnixfsCat<'a> {
    type Item = Result<Vec<u8>, TraversalFailed>;

    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        self.stream.poll_next_unpin(cx)
    }
}
impl<'a> std::future::IntoFuture for UnixfsCat<'a> {
    type Output = Result<Vec<u8>, TraversalFailed>;
    type IntoFuture = BoxFuture<'a, Self::Output>;

    fn into_future(mut self) -> Self::IntoFuture {
        async move {
            // Concatenate every chunk; the first error aborts the collection.
            let mut data = vec![];
            while let Some(bytes) = self.stream.try_next().await? {
                data.extend(bytes);
            }
            Ok(data)
        }
        .boxed()
    }
}
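// A synchronous, std-only sketch of what awaiting a `UnixfsCat` does. It concatenates chunks in
// order and stops at the first error, discarding the bytes gathered so far. `collect_chunks` is
// a hypothetical name, and an iterator stands in for the boxed stream; the real implementation
// above polls the stream with `TryStreamExt::try_next`.
#[cfg(test)]
mod collect_sketch {
    /// Concatenates `Ok` chunks until the first `Err`, which is returned instead.
    pub fn collect_chunks<E>(
        chunks: impl IntoIterator<Item = Result<Vec<u8>, E>>,
    ) -> Result<Vec<u8>, E> {
        let mut data = Vec::new();
        for chunk in chunks {
            // `?` plays the role of `try_next().await?`: the first error ends the collection.
            data.extend(chunk?);
        }
        Ok(data)
    }

    #[test]
    fn concatenates_until_first_error() {
        let ok: Vec<Result<Vec<u8>, &str>> = vec![Ok(b"ab".to_vec()), Ok(b"cd".to_vec())];
        assert_eq!(collect_chunks(ok), Ok(b"abcd".to_vec()));
        let failing = vec![Ok(b"ab".to_vec()), Err("missing block"), Ok(b"cd".to_vec())];
        assert_eq!(collect_chunks(failing), Err("missing block"));
    }
}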