rust_ipfs/unixfs/
cat.rs

1use crate::{dag::IpldDag, repo::Repo, Block, Ipfs};
2use async_stream::try_stream;
3use bytes::Bytes;
4use either::Either;
5use futures::future::BoxFuture;
6use futures::stream::{BoxStream, FusedStream, Stream};
7use futures::{FutureExt, StreamExt, TryStreamExt};
8use libp2p::PeerId;
9use rust_unixfs::file::visit::IdleFileVisit;
10use std::ops::Range;
11use std::task::Poll;
12use std::time::Duration;
13use tracing::{Instrument, Span};
14
15use super::TraversalFailed;
16
17/// IPFS cat operation, producing a stream of file bytes. This is generic over the different kinds
18/// of ways to own an `Ipfs` value in order to support both operating with borrowed `Ipfs` value
19/// and an owned value. Passing an owned value allows the return value to be `'static`, which can
20/// be helpful in some contexts, like the http.
21///
22/// Returns a stream of bytes on the file pointed with the Cid.
23#[must_use = "does nothing unless you `.await` or poll the stream"]
24pub struct UnixfsCat {
25    core: Option<Either<Ipfs, Repo>>,
26    span: Span,
27    length: Option<usize>,
28    starting_point: Option<StartingPoint>,
29    range: Option<Range<u64>>,
30    providers: Vec<PeerId>,
31    local_only: bool,
32    timeout: Option<Duration>,
33    stream: Option<BoxStream<'static, Result<Bytes, TraversalFailed>>>,
34}
35
36impl UnixfsCat {
37    pub fn with_ipfs(ipfs: &Ipfs, starting_point: impl Into<StartingPoint>) -> Self {
38        Self::with_either(Either::Left(ipfs.clone()), starting_point)
39    }
40
41    pub fn with_repo(repo: &Repo, starting_point: impl Into<StartingPoint>) -> Self {
42        Self::with_either(Either::Right(repo.clone()), starting_point)
43    }
44
45    fn with_either(core: Either<Ipfs, Repo>, starting_point: impl Into<StartingPoint>) -> Self {
46        let starting_point = starting_point.into();
47        Self {
48            core: Some(core),
49            starting_point: Some(starting_point),
50            span: Span::current(),
51            range: None,
52            length: None,
53            providers: Vec::new(),
54            local_only: false,
55            timeout: None,
56            stream: None,
57        }
58    }
59
60    pub fn span(mut self, span: Span) -> Self {
61        self.span = span;
62        self
63    }
64
65    pub fn provider(mut self, peer_id: PeerId) -> Self {
66        if !self.providers.contains(&peer_id) {
67            self.providers.push(peer_id);
68        }
69        self
70    }
71
72    pub fn max_length(mut self, length: usize) -> Self {
73        self.length = Some(length);
74        self
75    }
76
77    pub fn set_max_length(mut self, length: impl Into<Option<usize>>) -> Self {
78        self.length = length.into();
79        self
80    }
81
82    pub fn providers(mut self, list: &[PeerId]) -> Self {
83        self.providers = list.to_vec();
84        self
85    }
86
87    pub fn timeout(mut self, timeout: Duration) -> Self {
88        self.timeout = Some(timeout);
89        self
90    }
91
92    pub fn local(mut self) -> Self {
93        self.local_only = true;
94        self
95    }
96
97    pub fn set_local(mut self, local: bool) -> Self {
98        self.local_only = local;
99        self
100    }
101}
102
103/// The starting point for unixfs walks. Can be converted from IpfsPath and Blocks, and Cids can be
104/// converted to IpfsPath.
105pub enum StartingPoint {
106    Left(crate::IpfsPath),
107    Right(Block),
108}
109
110impl<T: Into<crate::IpfsPath>> From<T> for StartingPoint {
111    fn from(a: T) -> Self {
112        Self::Left(a.into())
113    }
114}
115
116impl From<Block> for StartingPoint {
117    fn from(b: Block) -> Self {
118        Self::Right(b)
119    }
120}
121
122impl Stream for UnixfsCat {
123    type Item = Result<Bytes, TraversalFailed>;
124    fn poll_next(
125        mut self: std::pin::Pin<&mut Self>,
126        cx: &mut std::task::Context<'_>,
127    ) -> Poll<Option<Self::Item>> {
128        if self.core.is_none() && self.stream.is_none() {
129            return Poll::Ready(None);
130        }
131        loop {
132            match &mut self.stream {
133                Some(stream) => match futures::ready!(stream.poll_next_unpin(cx)) {
134                    None => {
135                        self.stream.take();
136                        return Poll::Ready(None);
137                    }
138                    task => return Poll::Ready(task),
139                },
140                None => {
141                    let Some(core) = self.core.take() else {
142                        return Poll::Ready(None);
143                    };
144
145                    let (repo, dag) = match core {
146                        Either::Left(ipfs) => (ipfs.repo().clone(), ipfs.dag()),
147                        Either::Right(repo) => (repo.clone(), IpldDag::from(repo.clone())),
148                    };
149
150                    let mut visit = IdleFileVisit::default();
151
152                    if let Some(range) = self.range.clone() {
153                        visit = visit.with_target_range(range);
154                    }
155
156                    let starting_point = self.starting_point.take().expect("starting point exist");
157                    let providers = std::mem::take(&mut self.providers);
158                    let local_only = self.local_only;
159                    let timeout = self.timeout;
160
161                    let length = self.length;
162
163                    // using async_stream here at least to get on faster; writing custom streams is not too easy
164                    // but this might be easy enough to write open.
165                    let stream = try_stream! {
166
167                        // Get the root block to start the traversal. The stream does not expose any of the file
168                        // metadata. To get to it the user needs to create a Visitor over the first block.
169                        let block = match starting_point {
170                            StartingPoint::Left(path) => dag
171                                ._resolve(path.clone(), true, &providers, local_only, timeout)
172                                .await
173                                .map_err(TraversalFailed::Resolving)
174                                .and_then(|(resolved, _)| {
175                                    resolved.into_unixfs_block().map_err(TraversalFailed::Path)
176                                })?,
177                            StartingPoint::Right(block) => block,
178                        };
179
180                        let mut cache = None;
181                        let mut size = 0;
182
183                        // Start the visit from the root block. We need to move the both components as Options into the
184                        // stream as we can't yet return them from this Future context.
185                        let (visit, bytes) = visit.start(block.data()).map(|(bytes, _, _, visit)| {
186                            let bytes = (!bytes.is_empty()).then(|| Bytes::copy_from_slice(bytes));
187                            (visit, bytes)
188                        }).map_err(|e| {
189                            TraversalFailed::Walking(*block.cid(), e)
190                        }).and_then(|(visit, bytes)| {
191                            if let Some(bytes) = &bytes {
192                                size += bytes.len();
193                                if let Some(length) = length {
194                                    if size > length {
195                                        return Err::<_, TraversalFailed>(TraversalFailed::MaxLengthExceeded { size, length });
196                                    }
197                                }
198                            }
199                            Ok::<_, TraversalFailed>((visit, bytes))
200                        })?;
201
202
203                        if let Some(bytes) = bytes {
204                            yield bytes;
205                        }
206
207                        let mut visit = match visit {
208                            Some(visit) => visit,
209                            None => return,
210                        };
211
212                        loop {
213                            // TODO: if it was possible, it would make sense to start downloading N of these
214                            // we could just create an FuturesUnordered which would drop the value right away. that
215                            // would probably always cost many unnecessary clones, but it would be nice to "shut"
216                            // the subscriber so that it will only resolve to a value but still keep the operation
217                            // going. Not that we have any "operation" concept of the Want yet.
218                            let (next, _) = visit.pending_links();
219
220                            let borrow = &repo;
221                            let block = borrow.get_block(next).providers(&providers).set_local(local_only).timeout(timeout).await.map_err(|e| TraversalFailed::Loading(*next, e))?;
222
223                            let (bytes, next_visit) = visit.continue_walk(block.data(), &mut cache).map_err(|e| TraversalFailed::Walking(*block.cid(), e))?;
224
225                            size += bytes.len();
226
227                            if let Some(length) = length {
228                                if size > length {
229                                    let fn_err = || Err::<_, TraversalFailed>(TraversalFailed::MaxLengthExceeded { size, length });
230                                    fn_err()?;
231                                    return;
232                                }
233                            }
234
235                            if !bytes.is_empty() {
236                                yield Bytes::copy_from_slice(bytes);
237                            }
238
239                            match next_visit {
240                                Some(v) => visit = v,
241                                None => return,
242                            }
243
244                        }
245                    }.boxed();
246
247                    self.stream.replace(stream);
248                }
249            }
250        }
251    }
252}
253
254impl std::future::IntoFuture for UnixfsCat {
255    type Output = Result<Bytes, TraversalFailed>;
256
257    type IntoFuture = BoxFuture<'static, Self::Output>;
258
259    fn into_future(mut self) -> Self::IntoFuture {
260        let span = self.span.clone();
261        async move {
262            let mut data = vec![];
263            while let Some(bytes) = self.try_next().await? {
264                data.extend(bytes);
265            }
266            Ok(data.into())
267        }
268        .instrument(span)
269        .boxed()
270    }
271}
272
273impl FusedStream for UnixfsCat {
274    fn is_terminated(&self) -> bool {
275        self.stream.is_none() && self.core.is_none()
276    }
277}