// rust_ipfs/unixfs/cat.rs

1use crate::repo::DefaultStorage;
2use crate::{dag::IpldDag, repo::Repo, Block, Ipfs};
3use async_stream::try_stream;
4use bytes::Bytes;
5use connexa::prelude::PeerId;
6use either::Either;
7use futures::future::BoxFuture;
8use futures::stream::{BoxStream, FusedStream, Stream};
9use futures::{FutureExt, StreamExt, TryStreamExt};
10use rust_unixfs::file::visit::IdleFileVisit;
11use std::ops::Range;
12use std::pin::Pin;
13use std::task::{Context, Poll};
14use std::time::Duration;
15use tracing::{Instrument, Span};
16
17use super::TraversalFailed;
18
19/// IPFS cat operation, producing a stream of file bytes. This is generic over the different kinds
20/// of ways to own an `Ipfs` value in order to support both operating with borrowed `Ipfs` value
21/// and an owned value. Passing an owned value allows the return value to be `'static`, which can
22/// be helpful in some contexts, like the http.
23///
24/// Returns a stream of bytes on the file pointed with the Cid.
25#[must_use = "does nothing unless you `.await` or poll the stream"]
26pub struct UnixfsCat {
27    core: Option<Either<Ipfs, Repo<DefaultStorage>>>,
28    span: Span,
29    length: Option<usize>,
30    starting_point: Option<StartingPoint>,
31    range: Option<Range<u64>>,
32    providers: Vec<PeerId>,
33    local_only: bool,
34    timeout: Option<Duration>,
35    stream: Option<BoxStream<'static, Result<Bytes, TraversalFailed>>>,
36}
37
38impl UnixfsCat {
39    pub fn with_ipfs(ipfs: &Ipfs, starting_point: impl Into<StartingPoint>) -> Self {
40        Self::with_either(Either::Left(ipfs.clone()), starting_point)
41    }
42
43    pub fn with_repo(
44        repo: &Repo<DefaultStorage>,
45        starting_point: impl Into<StartingPoint>,
46    ) -> Self {
47        Self::with_either(Either::Right(repo.clone()), starting_point)
48    }
49
50    fn with_either(
51        core: Either<Ipfs, Repo<DefaultStorage>>,
52        starting_point: impl Into<StartingPoint>,
53    ) -> Self {
54        let starting_point = starting_point.into();
55        Self {
56            core: Some(core),
57            starting_point: Some(starting_point),
58            span: Span::current(),
59            range: None,
60            length: None,
61            providers: Vec::new(),
62            local_only: false,
63            timeout: None,
64            stream: None,
65        }
66    }
67
68    pub fn span(mut self, span: Span) -> Self {
69        self.span = span;
70        self
71    }
72
73    pub fn provider(mut self, peer_id: PeerId) -> Self {
74        if !self.providers.contains(&peer_id) {
75            self.providers.push(peer_id);
76        }
77        self
78    }
79
80    pub fn max_length(mut self, length: usize) -> Self {
81        self.length = Some(length);
82        self
83    }
84
85    pub fn set_max_length(mut self, length: impl Into<Option<usize>>) -> Self {
86        self.length = length.into();
87        self
88    }
89
90    pub fn providers(mut self, list: &[PeerId]) -> Self {
91        self.providers = list.to_vec();
92        self
93    }
94
95    pub fn timeout(mut self, timeout: Duration) -> Self {
96        self.timeout = Some(timeout);
97        self
98    }
99
100    pub fn local(mut self) -> Self {
101        self.local_only = true;
102        self
103    }
104
105    pub fn set_local(mut self, local: bool) -> Self {
106        self.local_only = local;
107        self
108    }
109}
110
111/// The starting point for unixfs walks. Can be converted from IpfsPath and Blocks, and Cids can be
112/// converted to IpfsPath.
113pub enum StartingPoint {
114    Left(crate::IpfsPath),
115    Right(Block),
116}
117
118impl<T: Into<crate::IpfsPath>> From<T> for StartingPoint {
119    fn from(a: T) -> Self {
120        Self::Left(a.into())
121    }
122}
123
124impl From<Block> for StartingPoint {
125    fn from(b: Block) -> Self {
126        Self::Right(b)
127    }
128}
129
130impl Stream for UnixfsCat {
131    type Item = Result<Bytes, TraversalFailed>;
132    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
133        if self.core.is_none() && self.stream.is_none() {
134            return Poll::Ready(None);
135        }
136        loop {
137            match &mut self.stream {
138                Some(stream) => match futures::ready!(stream.poll_next_unpin(cx)) {
139                    None => {
140                        self.stream.take();
141                        return Poll::Ready(None);
142                    }
143                    task => return Poll::Ready(task),
144                },
145                None => {
146                    let Some(core) = self.core.take() else {
147                        return Poll::Ready(None);
148                    };
149
150                    let (repo, dag) = match core {
151                        Either::Left(ipfs) => (ipfs.repo().clone(), ipfs.dag()),
152                        Either::Right(repo) => (repo.clone(), IpldDag::from(repo.clone())),
153                    };
154
155                    let mut visit = IdleFileVisit::default();
156
157                    if let Some(range) = self.range.clone() {
158                        visit = visit.with_target_range(range);
159                    }
160
161                    let starting_point = self.starting_point.take().expect("starting point exist");
162                    let providers = std::mem::take(&mut self.providers);
163                    let local_only = self.local_only;
164                    let timeout = self.timeout;
165
166                    let length = self.length;
167
168                    // using async_stream here at least to get on faster; writing custom streams is not too easy
169                    // but this might be easy enough to write open.
170                    let stream = try_stream! {
171
172                        // Get the root block to start the traversal. The stream does not expose any of the file
173                        // metadata. To get to it the user needs to create a Visitor over the first block.
174                        let block = match starting_point {
175                            StartingPoint::Left(path) => dag
176                                ._resolve(path.clone(), true, &providers, local_only, timeout)
177                                .await
178                                .map_err(TraversalFailed::Resolving)
179                                .and_then(|(resolved, _)| {
180                                    resolved.into_unixfs_block().map_err(TraversalFailed::Path)
181                                })?,
182                            StartingPoint::Right(block) => block,
183                        };
184
185                        let mut cache = None;
186                        let mut size = 0;
187
188                        // Start the visit from the root block. We need to move the both components as Options into the
189                        // stream as we can't yet return them from this Future context.
190                        let (visit, bytes) = visit.start(block.data()).map(|(bytes, _, _, visit)| {
191                            let bytes = (!bytes.is_empty()).then(|| Bytes::copy_from_slice(bytes));
192                            (visit, bytes)
193                        }).map_err(|e| {
194                            TraversalFailed::Walking(*block.cid(), e)
195                        }).and_then(|(visit, bytes)| {
196                            if let Some(bytes) = &bytes {
197                                size += bytes.len();
198                                if let Some(length) = length {
199                                    if size > length {
200                                        return Err::<_, TraversalFailed>(TraversalFailed::MaxLengthExceeded { size, length });
201                                    }
202                                }
203                            }
204                            Ok::<_, TraversalFailed>((visit, bytes))
205                        })?;
206
207
208                        if let Some(bytes) = bytes {
209                            yield bytes;
210                        }
211
212                        let mut visit = match visit {
213                            Some(visit) => visit,
214                            None => return,
215                        };
216
217                        loop {
218                            // TODO: if it was possible, it would make sense to start downloading N of these
219                            // we could just create an FuturesUnordered which would drop the value right away. that
220                            // would probably always cost many unnecessary clones, but it would be nice to "shut"
221                            // the subscriber so that it will only resolve to a value but still keep the operation
222                            // going. Not that we have any "operation" concept of the Want yet.
223                            let (next, _) = visit.pending_links();
224
225                            let borrow = &repo;
226                            let block = borrow.get_block(next).providers(&providers).set_local(local_only).timeout(timeout).await.map_err(|e| TraversalFailed::Loading(*next, e))?;
227
228                            let (bytes, next_visit) = visit.continue_walk(block.data(), &mut cache).map_err(|e| TraversalFailed::Walking(*block.cid(), e))?;
229
230                            size += bytes.len();
231
232                            if let Some(length) = length {
233                                if size > length {
234                                    let fn_err = || Err::<_, TraversalFailed>(TraversalFailed::MaxLengthExceeded { size, length });
235                                    fn_err()?;
236                                    return;
237                                }
238                            }
239
240                            if !bytes.is_empty() {
241                                yield Bytes::copy_from_slice(bytes);
242                            }
243
244                            match next_visit {
245                                Some(v) => visit = v,
246                                None => return,
247                            }
248
249                        }
250                    }.boxed();
251
252                    self.stream.replace(stream);
253                }
254            }
255        }
256    }
257}
258
259impl std::future::IntoFuture for UnixfsCat {
260    type Output = Result<Bytes, TraversalFailed>;
261
262    type IntoFuture = BoxFuture<'static, Self::Output>;
263
264    fn into_future(mut self) -> Self::IntoFuture {
265        let span = self.span.clone();
266        async move {
267            let mut data = vec![];
268            while let Some(bytes) = self.try_next().await? {
269                data.extend(bytes);
270            }
271            Ok(data.into())
272        }
273        .instrument(span)
274        .boxed()
275    }
276}
277
278impl FusedStream for UnixfsCat {
279    fn is_terminated(&self) -> bool {
280        self.stream.is_none() && self.core.is_none()
281    }
282}