1use crate::{dag::IpldDag, repo::Repo, Block, Ipfs};
2use async_stream::try_stream;
3use bytes::Bytes;
4use either::Either;
5use futures::future::BoxFuture;
6use futures::stream::{BoxStream, FusedStream, Stream};
7use futures::{FutureExt, StreamExt, TryStreamExt};
8use libp2p::PeerId;
9use rust_unixfs::file::visit::IdleFileVisit;
10use std::ops::Range;
11use std::task::Poll;
12use std::time::Duration;
13use tracing::{Instrument, Span};
14
15use super::TraversalFailed;
16
/// Builder and stream for reading the content bytes of a UnixFS file.
///
/// The value is configured through its builder-style methods and then either
/// polled as a [`Stream`] of [`Bytes`] chunks or awaited (via `IntoFuture`)
/// to collect the whole content into a single [`Bytes`] buffer.
#[must_use = "does nothing unless you `.await` or poll the stream"]
pub struct UnixfsCat {
    /// Handle used to resolve paths and load blocks; taken on the first poll.
    core: Option<Either<Ipfs, Repo>>,
    /// Tracing span used to instrument the future produced by `IntoFuture`.
    span: Span,
    /// Optional cap on the total number of bytes read; exceeding it makes the
    /// stream fail with `TraversalFailed::MaxLengthExceeded`.
    length: Option<usize>,
    /// Path or block the traversal starts from; taken on the first poll.
    starting_point: Option<StartingPoint>,
    /// Optional target range handed to `IdleFileVisit::with_target_range`.
    range: Option<Range<u64>>,
    /// Peers forwarded to path resolution and to every block request.
    providers: Vec<PeerId>,
    /// Local-only flag forwarded to path resolution and to every block request.
    local_only: bool,
    /// Optional timeout forwarded to path resolution and to every block request.
    timeout: Option<Duration>,
    /// Inner stream, built lazily on the first poll and dropped once exhausted.
    stream: Option<BoxStream<'static, Result<Bytes, TraversalFailed>>>,
}
35
36impl UnixfsCat {
37 pub fn with_ipfs(ipfs: &Ipfs, starting_point: impl Into<StartingPoint>) -> Self {
38 Self::with_either(Either::Left(ipfs.clone()), starting_point)
39 }
40
41 pub fn with_repo(repo: &Repo, starting_point: impl Into<StartingPoint>) -> Self {
42 Self::with_either(Either::Right(repo.clone()), starting_point)
43 }
44
45 fn with_either(core: Either<Ipfs, Repo>, starting_point: impl Into<StartingPoint>) -> Self {
46 let starting_point = starting_point.into();
47 Self {
48 core: Some(core),
49 starting_point: Some(starting_point),
50 span: Span::current(),
51 range: None,
52 length: None,
53 providers: Vec::new(),
54 local_only: false,
55 timeout: None,
56 stream: None,
57 }
58 }
59
60 pub fn span(mut self, span: Span) -> Self {
61 self.span = span;
62 self
63 }
64
65 pub fn provider(mut self, peer_id: PeerId) -> Self {
66 if !self.providers.contains(&peer_id) {
67 self.providers.push(peer_id);
68 }
69 self
70 }
71
72 pub fn max_length(mut self, length: usize) -> Self {
73 self.length = Some(length);
74 self
75 }
76
77 pub fn set_max_length(mut self, length: impl Into<Option<usize>>) -> Self {
78 self.length = length.into();
79 self
80 }
81
82 pub fn providers(mut self, list: &[PeerId]) -> Self {
83 self.providers = list.to_vec();
84 self
85 }
86
87 pub fn timeout(mut self, timeout: Duration) -> Self {
88 self.timeout = Some(timeout);
89 self
90 }
91
92 pub fn local(mut self) -> Self {
93 self.local_only = true;
94 self
95 }
96
97 pub fn set_local(mut self, local: bool) -> Self {
98 self.local_only = local;
99 self
100 }
101}
102
/// Where a cat traversal begins.
pub enum StartingPoint {
    /// A path that is resolved to a UnixFS block before the file is walked.
    Left(crate::IpfsPath),
    /// An already available block that is used directly as the root.
    Right(Block),
}
109
110impl<T: Into<crate::IpfsPath>> From<T> for StartingPoint {
111 fn from(a: T) -> Self {
112 Self::Left(a.into())
113 }
114}
115
116impl From<Block> for StartingPoint {
117 fn from(b: Block) -> Self {
118 Self::Right(b)
119 }
120}
121
122impl Stream for UnixfsCat {
123 type Item = Result<Bytes, TraversalFailed>;
124 fn poll_next(
125 mut self: std::pin::Pin<&mut Self>,
126 cx: &mut std::task::Context<'_>,
127 ) -> Poll<Option<Self::Item>> {
128 if self.core.is_none() && self.stream.is_none() {
129 return Poll::Ready(None);
130 }
131 loop {
132 match &mut self.stream {
133 Some(stream) => match futures::ready!(stream.poll_next_unpin(cx)) {
134 None => {
135 self.stream.take();
136 return Poll::Ready(None);
137 }
138 task => return Poll::Ready(task),
139 },
140 None => {
141 let Some(core) = self.core.take() else {
142 return Poll::Ready(None);
143 };
144
145 let (repo, dag) = match core {
146 Either::Left(ipfs) => (ipfs.repo().clone(), ipfs.dag()),
147 Either::Right(repo) => (repo.clone(), IpldDag::from(repo.clone())),
148 };
149
150 let mut visit = IdleFileVisit::default();
151
152 if let Some(range) = self.range.clone() {
153 visit = visit.with_target_range(range);
154 }
155
156 let starting_point = self.starting_point.take().expect("starting point exist");
157 let providers = std::mem::take(&mut self.providers);
158 let local_only = self.local_only;
159 let timeout = self.timeout;
160
161 let length = self.length;
162
163 let stream = try_stream! {
166
167 let block = match starting_point {
170 StartingPoint::Left(path) => dag
171 ._resolve(path.clone(), true, &providers, local_only, timeout)
172 .await
173 .map_err(TraversalFailed::Resolving)
174 .and_then(|(resolved, _)| {
175 resolved.into_unixfs_block().map_err(TraversalFailed::Path)
176 })?,
177 StartingPoint::Right(block) => block,
178 };
179
180 let mut cache = None;
181 let mut size = 0;
182
183 let (visit, bytes) = visit.start(block.data()).map(|(bytes, _, _, visit)| {
186 let bytes = (!bytes.is_empty()).then(|| Bytes::copy_from_slice(bytes));
187 (visit, bytes)
188 }).map_err(|e| {
189 TraversalFailed::Walking(*block.cid(), e)
190 }).and_then(|(visit, bytes)| {
191 if let Some(bytes) = &bytes {
192 size += bytes.len();
193 if let Some(length) = length {
194 if size > length {
195 return Err::<_, TraversalFailed>(TraversalFailed::MaxLengthExceeded { size, length });
196 }
197 }
198 }
199 Ok::<_, TraversalFailed>((visit, bytes))
200 })?;
201
202
203 if let Some(bytes) = bytes {
204 yield bytes;
205 }
206
207 let mut visit = match visit {
208 Some(visit) => visit,
209 None => return,
210 };
211
212 loop {
213 let (next, _) = visit.pending_links();
219
220 let borrow = &repo;
221 let block = borrow.get_block(next).providers(&providers).set_local(local_only).timeout(timeout).await.map_err(|e| TraversalFailed::Loading(*next, e))?;
222
223 let (bytes, next_visit) = visit.continue_walk(block.data(), &mut cache).map_err(|e| TraversalFailed::Walking(*block.cid(), e))?;
224
225 size += bytes.len();
226
227 if let Some(length) = length {
228 if size > length {
229 let fn_err = || Err::<_, TraversalFailed>(TraversalFailed::MaxLengthExceeded { size, length });
230 fn_err()?;
231 return;
232 }
233 }
234
235 if !bytes.is_empty() {
236 yield Bytes::copy_from_slice(bytes);
237 }
238
239 match next_visit {
240 Some(v) => visit = v,
241 None => return,
242 }
243
244 }
245 }.boxed();
246
247 self.stream.replace(stream);
248 }
249 }
250 }
251 }
252}
253
254impl std::future::IntoFuture for UnixfsCat {
255 type Output = Result<Bytes, TraversalFailed>;
256
257 type IntoFuture = BoxFuture<'static, Self::Output>;
258
259 fn into_future(mut self) -> Self::IntoFuture {
260 let span = self.span.clone();
261 async move {
262 let mut data = vec![];
263 while let Some(bytes) = self.try_next().await? {
264 data.extend(bytes);
265 }
266 Ok(data.into())
267 }
268 .instrument(span)
269 .boxed()
270 }
271}
272
273impl FusedStream for UnixfsCat {
274 fn is_terminated(&self) -> bool {
275 self.stream.is_none() && self.core.is_none()
276 }
277}