1use crate::repo::DefaultStorage;
2use crate::{dag::IpldDag, repo::Repo, Block, Ipfs};
3use async_stream::try_stream;
4use bytes::Bytes;
5use connexa::prelude::PeerId;
6use either::Either;
7use futures::future::BoxFuture;
8use futures::stream::{BoxStream, FusedStream, Stream};
9use futures::{FutureExt, StreamExt, TryStreamExt};
10use rust_unixfs::file::visit::IdleFileVisit;
11use std::ops::Range;
12use std::pin::Pin;
13use std::task::{Context, Poll};
14use std::time::Duration;
15use tracing::{Instrument, Span};
16
17use super::TraversalFailed;
18
19#[must_use = "does nothing unless you `.await` or poll the stream"]
26pub struct UnixfsCat {
27 core: Option<Either<Ipfs, Repo<DefaultStorage>>>,
28 span: Span,
29 length: Option<usize>,
30 starting_point: Option<StartingPoint>,
31 range: Option<Range<u64>>,
32 providers: Vec<PeerId>,
33 local_only: bool,
34 timeout: Option<Duration>,
35 stream: Option<BoxStream<'static, Result<Bytes, TraversalFailed>>>,
36}
37
38impl UnixfsCat {
39 pub fn with_ipfs(ipfs: &Ipfs, starting_point: impl Into<StartingPoint>) -> Self {
40 Self::with_either(Either::Left(ipfs.clone()), starting_point)
41 }
42
43 pub fn with_repo(
44 repo: &Repo<DefaultStorage>,
45 starting_point: impl Into<StartingPoint>,
46 ) -> Self {
47 Self::with_either(Either::Right(repo.clone()), starting_point)
48 }
49
50 fn with_either(
51 core: Either<Ipfs, Repo<DefaultStorage>>,
52 starting_point: impl Into<StartingPoint>,
53 ) -> Self {
54 let starting_point = starting_point.into();
55 Self {
56 core: Some(core),
57 starting_point: Some(starting_point),
58 span: Span::current(),
59 range: None,
60 length: None,
61 providers: Vec::new(),
62 local_only: false,
63 timeout: None,
64 stream: None,
65 }
66 }
67
68 pub fn span(mut self, span: Span) -> Self {
69 self.span = span;
70 self
71 }
72
73 pub fn provider(mut self, peer_id: PeerId) -> Self {
74 if !self.providers.contains(&peer_id) {
75 self.providers.push(peer_id);
76 }
77 self
78 }
79
80 pub fn max_length(mut self, length: usize) -> Self {
81 self.length = Some(length);
82 self
83 }
84
85 pub fn set_max_length(mut self, length: impl Into<Option<usize>>) -> Self {
86 self.length = length.into();
87 self
88 }
89
90 pub fn providers(mut self, list: &[PeerId]) -> Self {
91 self.providers = list.to_vec();
92 self
93 }
94
95 pub fn timeout(mut self, timeout: Duration) -> Self {
96 self.timeout = Some(timeout);
97 self
98 }
99
100 pub fn local(mut self) -> Self {
101 self.local_only = true;
102 self
103 }
104
105 pub fn set_local(mut self, local: bool) -> Self {
106 self.local_only = local;
107 self
108 }
109}
110
111pub enum StartingPoint {
114 Left(crate::IpfsPath),
115 Right(Block),
116}
117
118impl<T: Into<crate::IpfsPath>> From<T> for StartingPoint {
119 fn from(a: T) -> Self {
120 Self::Left(a.into())
121 }
122}
123
124impl From<Block> for StartingPoint {
125 fn from(b: Block) -> Self {
126 Self::Right(b)
127 }
128}
129
130impl Stream for UnixfsCat {
131 type Item = Result<Bytes, TraversalFailed>;
132 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
133 if self.core.is_none() && self.stream.is_none() {
134 return Poll::Ready(None);
135 }
136 loop {
137 match &mut self.stream {
138 Some(stream) => match futures::ready!(stream.poll_next_unpin(cx)) {
139 None => {
140 self.stream.take();
141 return Poll::Ready(None);
142 }
143 task => return Poll::Ready(task),
144 },
145 None => {
146 let Some(core) = self.core.take() else {
147 return Poll::Ready(None);
148 };
149
150 let (repo, dag) = match core {
151 Either::Left(ipfs) => (ipfs.repo().clone(), ipfs.dag()),
152 Either::Right(repo) => (repo.clone(), IpldDag::from(repo.clone())),
153 };
154
155 let mut visit = IdleFileVisit::default();
156
157 if let Some(range) = self.range.clone() {
158 visit = visit.with_target_range(range);
159 }
160
161 let starting_point = self.starting_point.take().expect("starting point exist");
162 let providers = std::mem::take(&mut self.providers);
163 let local_only = self.local_only;
164 let timeout = self.timeout;
165
166 let length = self.length;
167
168 let stream = try_stream! {
171
172 let block = match starting_point {
175 StartingPoint::Left(path) => dag
176 ._resolve(path.clone(), true, &providers, local_only, timeout)
177 .await
178 .map_err(TraversalFailed::Resolving)
179 .and_then(|(resolved, _)| {
180 resolved.into_unixfs_block().map_err(TraversalFailed::Path)
181 })?,
182 StartingPoint::Right(block) => block,
183 };
184
185 let mut cache = None;
186 let mut size = 0;
187
188 let (visit, bytes) = visit.start(block.data()).map(|(bytes, _, _, visit)| {
191 let bytes = (!bytes.is_empty()).then(|| Bytes::copy_from_slice(bytes));
192 (visit, bytes)
193 }).map_err(|e| {
194 TraversalFailed::Walking(*block.cid(), e)
195 }).and_then(|(visit, bytes)| {
196 if let Some(bytes) = &bytes {
197 size += bytes.len();
198 if let Some(length) = length {
199 if size > length {
200 return Err::<_, TraversalFailed>(TraversalFailed::MaxLengthExceeded { size, length });
201 }
202 }
203 }
204 Ok::<_, TraversalFailed>((visit, bytes))
205 })?;
206
207
208 if let Some(bytes) = bytes {
209 yield bytes;
210 }
211
212 let mut visit = match visit {
213 Some(visit) => visit,
214 None => return,
215 };
216
217 loop {
218 let (next, _) = visit.pending_links();
224
225 let borrow = &repo;
226 let block = borrow.get_block(next).providers(&providers).set_local(local_only).timeout(timeout).await.map_err(|e| TraversalFailed::Loading(*next, e))?;
227
228 let (bytes, next_visit) = visit.continue_walk(block.data(), &mut cache).map_err(|e| TraversalFailed::Walking(*block.cid(), e))?;
229
230 size += bytes.len();
231
232 if let Some(length) = length {
233 if size > length {
234 let fn_err = || Err::<_, TraversalFailed>(TraversalFailed::MaxLengthExceeded { size, length });
235 fn_err()?;
236 return;
237 }
238 }
239
240 if !bytes.is_empty() {
241 yield Bytes::copy_from_slice(bytes);
242 }
243
244 match next_visit {
245 Some(v) => visit = v,
246 None => return,
247 }
248
249 }
250 }.boxed();
251
252 self.stream.replace(stream);
253 }
254 }
255 }
256 }
257}
258
259impl std::future::IntoFuture for UnixfsCat {
260 type Output = Result<Bytes, TraversalFailed>;
261
262 type IntoFuture = BoxFuture<'static, Self::Output>;
263
264 fn into_future(mut self) -> Self::IntoFuture {
265 let span = self.span.clone();
266 async move {
267 let mut data = vec![];
268 while let Some(bytes) = self.try_next().await? {
269 data.extend(bytes);
270 }
271 Ok(data.into())
272 }
273 .instrument(span)
274 .boxed()
275 }
276}
277
278impl FusedStream for UnixfsCat {
279 fn is_terminated(&self) -> bool {
280 self.stream.is_none() && self.core.is_none()
281 }
282}