1use connexa::prelude::PeerId;
2use either::Either;
3use futures::stream::BoxStream;
4use futures::{future::BoxFuture, stream::FusedStream, FutureExt, Stream, StreamExt};
5#[allow(unused_imports)]
6use rust_unixfs::walk::{ContinuedWalk, Walker};
7use std::pin::Pin;
8use std::task::Context;
9use std::{
10 path::{Path, PathBuf},
11 task::Poll,
12 time::Duration,
13};
14#[cfg(not(target_arch = "wasm32"))]
15use tokio::io::AsyncWriteExt;
16use tracing::{Instrument, Span};
17
18use crate::repo::DefaultStorage;
19use crate::{dag::IpldDag, repo::Repo, Ipfs, IpfsPath};
20
21#[allow(unused_imports)]
22use super::{TraversalFailed, UnixfsStatus};
23
24#[must_use = "does nothing unless you `.await` or poll the stream"]
25pub struct UnixfsGet {
26 core: Option<Either<Ipfs, Repo<DefaultStorage>>>,
27 dest: PathBuf,
28 span: Span,
29 path: Option<IpfsPath>,
30 providers: Vec<PeerId>,
31 local_only: bool,
32 timeout: Option<Duration>,
33 stream: Option<BoxStream<'static, UnixfsStatus>>,
34}
35
36impl UnixfsGet {
37 pub fn with_ipfs(ipfs: &Ipfs, path: impl Into<IpfsPath>, dest: impl AsRef<Path>) -> Self {
38 Self::with_either(Either::Left(ipfs.clone()), path, dest)
39 }
40
41 pub fn with_repo(
42 repo: &Repo<DefaultStorage>,
43 path: impl Into<IpfsPath>,
44 dest: impl AsRef<Path>,
45 ) -> Self {
46 Self::with_either(Either::Right(repo.clone()), path, dest)
47 }
48
49 fn with_either(
50 core: Either<Ipfs, Repo<DefaultStorage>>,
51 path: impl Into<IpfsPath>,
52 dest: impl AsRef<Path>,
53 ) -> Self {
54 let path = path.into();
55 let dest = dest.as_ref().to_path_buf();
56 Self {
57 core: Some(core),
58 dest,
59 path: Some(path),
60 span: Span::current(),
61 providers: Vec::new(),
62 local_only: false,
63 timeout: None,
64 stream: None,
65 }
66 }
67
68 pub fn span(mut self, span: Span) -> Self {
69 self.span = span;
70 self
71 }
72
73 pub fn provider(mut self, peer_id: PeerId) -> Self {
74 if !self.providers.contains(&peer_id) {
75 self.providers.push(peer_id);
76 }
77 self
78 }
79
80 pub fn providers(mut self, list: &[PeerId]) -> Self {
81 self.providers = list.to_vec();
82 self
83 }
84
85 pub fn timeout(mut self, timeout: Duration) -> Self {
86 self.timeout = Some(timeout);
87 self
88 }
89
90 pub fn local(mut self) -> Self {
91 self.local_only = true;
92 self
93 }
94
95 pub fn set_local(mut self, local: bool) -> Self {
96 self.local_only = local;
97 self
98 }
99}
100
101impl Stream for UnixfsGet {
102 type Item = UnixfsStatus;
103 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
104 if self.core.is_none() && self.stream.is_none() {
105 return Poll::Ready(None);
106 }
107 loop {
108 match &mut self.stream {
109 None => {
110 let (repo, dag) = match self.core.take().expect("ipfs or repo is used") {
111 Either::Left(ipfs) => (ipfs.repo().clone(), ipfs.dag()),
112 Either::Right(repo) => (repo.clone(), IpldDag::from(repo.clone())),
113 };
114
115 let path = self.path.take().expect("starting point exist");
116 let providers = std::mem::take(&mut self.providers);
117 let local_only = self.local_only;
118 let timeout = self.timeout;
119 let dest = self.dest.clone();
120
121 #[cfg(not(target_arch = "wasm32"))]
122 let stream = async_stream::stream! {
123
124 let mut cache = None;
125 let mut total_size = None;
126 let mut written = 0;
127
128 let mut file = match tokio::fs::File::create(dest)
129 .await
130 .map_err(TraversalFailed::Io) {
131 Ok(f) => f,
132 Err(e) => {
133 yield UnixfsStatus::FailedStatus { written, total_size, error: e.into() };
134 return;
135 }
136 };
137
138 let block = match dag
139 ._resolve(path.clone(), true, &providers, local_only, timeout)
140 .await
141 .map_err(TraversalFailed::Resolving)
142 .and_then(|(resolved, _)| resolved.into_unixfs_block().map_err(TraversalFailed::Path)) {
143 Ok(block) => block,
144 Err(e) => {
145 yield UnixfsStatus::FailedStatus { written, total_size, error: e.into() };
146 return;
147 }
148 };
149
150 let cid = block.cid();
151 let root_name = block.cid().to_string();
152
153 let mut walker = Walker::new(*cid, root_name);
154
155 while walker.should_continue() {
156 let (next, _) = walker.pending_links();
157 let block = match repo.get_block(next).providers(&providers).set_local(local_only).timeout(timeout).await {
158 Ok(block) => block,
159 Err(e) => {
160 yield UnixfsStatus::FailedStatus { written, total_size, error: e };
161 return;
162 }
163 };
164 let block_data = block.data();
165
166 match walker.next(block_data, &mut cache) {
167 Ok(ContinuedWalk::Bucket(..)) => {}
168 Ok(ContinuedWalk::File(segment, _, _, _, size)) => {
169
170 if segment.is_first() {
171 total_size = Some(size as usize);
172 yield UnixfsStatus::ProgressStatus { written, total_size };
173 }
174 let mut n = 0usize;
179 let slice = segment.as_ref();
180 let total = slice.len();
181
182 while n < total {
183 let next = &slice[n..];
184 n += next.len();
185 if let Err(e) = file.write_all(next).await {
186 yield UnixfsStatus::FailedStatus { written, total_size, error: e.into() };
187 return;
188 }
189 if let Err(e) = file.sync_all().await {
190 yield UnixfsStatus::FailedStatus { written, total_size, error: e.into() };
191 return;
192 }
193
194 written += n;
195 }
196
197 yield UnixfsStatus::ProgressStatus { written, total_size };
198
199 },
200 Ok(ContinuedWalk::Directory( .. )) | Ok(ContinuedWalk::RootDirectory( .. )) => {}, Ok(ContinuedWalk::Symlink( .. )) => {},
202 Err(e) => {
203 yield UnixfsStatus::FailedStatus { written, total_size, error: e.into() };
204 return;
205 }
206 };
207 };
208
209 yield UnixfsStatus::CompletedStatus { path, written, total_size }
210 };
211
212 #[cfg(target_arch = "wasm32")]
213 let stream = async_stream::stream! {
214 _ = repo;
215 _ = dag;
216 _ = path;
217 _ = providers;
218 _ = local_only;
219 _ = timeout;
220 _ = dest;
221 yield UnixfsStatus::FailedStatus { written: 0, total_size: None, error: anyhow::anyhow!("unimplemented") };
222 };
223
224 self.stream = Some(stream.boxed());
225 }
226 Some(stream) => match futures::ready!(stream.poll_next_unpin(cx)) {
227 Some(item) => {
228 if matches!(
229 item,
230 UnixfsStatus::FailedStatus { .. }
231 | UnixfsStatus::CompletedStatus { .. }
232 ) {
233 self.stream.take();
234 }
235 return Poll::Ready(Some(item));
236 }
237 None => {
238 self.stream.take();
239 return Poll::Ready(None);
240 }
241 },
242 }
243 }
244 }
245}
246
247impl std::future::IntoFuture for UnixfsGet {
248 type Output = Result<(), anyhow::Error>;
249
250 type IntoFuture = BoxFuture<'static, Self::Output>;
251
252 fn into_future(mut self) -> Self::IntoFuture {
253 let span = self.span.clone();
254 async move {
255 while let Some(status) = self.next().await {
256 match status {
257 UnixfsStatus::FailedStatus { error, .. } => {
258 return Err(error);
259 }
260 UnixfsStatus::CompletedStatus { .. } => return Ok(()),
261 _ => {}
262 }
263 }
264 Err::<_, anyhow::Error>(anyhow::anyhow!("Unable to get file"))
265 }
266 .instrument(span)
267 .boxed()
268 }
269}
270
271impl FusedStream for UnixfsGet {
272 fn is_terminated(&self) -> bool {
273 self.stream.is_none() && self.core.is_none()
274 }
275}