1use std::{
2 path::{Path, PathBuf},
3 task::Poll,
4 time::Duration,
5};
6
7use either::Either;
8use futures::stream::BoxStream;
9use futures::{future::BoxFuture, stream::FusedStream, FutureExt, Stream, StreamExt};
10use libp2p::PeerId;
11#[allow(unused_imports)]
12use rust_unixfs::walk::{ContinuedWalk, Walker};
13#[cfg(not(target_arch = "wasm32"))]
14use tokio::io::AsyncWriteExt;
15use tracing::{Instrument, Span};
16
17use crate::{dag::IpldDag, repo::Repo, Ipfs, IpfsPath};
18
19#[allow(unused_imports)]
20use super::{TraversalFailed, UnixfsStatus};
21
/// Builder, stream and future that fetches the content behind an [`IpfsPath`]
/// and writes it into a file at `dest`.
///
/// Nothing happens until the value is polled as a [`Stream`] (yielding
/// [`UnixfsStatus`] items) or awaited through `IntoFuture`.
#[must_use = "does nothing unless you `.await` or poll the stream"]
pub struct UnixfsGet {
    // Source of blocks: either a full `Ipfs` handle or a bare `Repo`.
    // Taken (left as `None`) when the inner stream is built on first poll.
    core: Option<Either<Ipfs, Repo>>,
    // Path of the file that is created to receive the content.
    dest: PathBuf,
    // Span used to instrument the future produced by `IntoFuture`.
    span: Span,
    // Path to resolve; taken (left as `None`) when the inner stream is built.
    path: Option<IpfsPath>,
    // Peers forwarded to path resolution and to every block fetch.
    providers: Vec<PeerId>,
    // Forwarded to path resolution and to every block fetch via `set_local`.
    local_only: bool,
    // Optional timeout forwarded to path resolution and to every block fetch.
    timeout: Option<Duration>,
    // Lazily created inner stream; cleared again once a terminal item
    // (`FailedStatus` / `CompletedStatus`) or end-of-stream has been seen.
    stream: Option<BoxStream<'static, UnixfsStatus>>,
}
33
34impl UnixfsGet {
35 pub fn with_ipfs(ipfs: &Ipfs, path: impl Into<IpfsPath>, dest: impl AsRef<Path>) -> Self {
36 Self::with_either(Either::Left(ipfs.clone()), path, dest)
37 }
38
39 pub fn with_repo(repo: &Repo, path: impl Into<IpfsPath>, dest: impl AsRef<Path>) -> Self {
40 Self::with_either(Either::Right(repo.clone()), path, dest)
41 }
42
43 fn with_either(
44 core: Either<Ipfs, Repo>,
45 path: impl Into<IpfsPath>,
46 dest: impl AsRef<Path>,
47 ) -> Self {
48 let path = path.into();
49 let dest = dest.as_ref().to_path_buf();
50 Self {
51 core: Some(core),
52 dest,
53 path: Some(path),
54 span: Span::current(),
55 providers: Vec::new(),
56 local_only: false,
57 timeout: None,
58 stream: None,
59 }
60 }
61
62 pub fn span(mut self, span: Span) -> Self {
63 self.span = span;
64 self
65 }
66
67 pub fn provider(mut self, peer_id: PeerId) -> Self {
68 if !self.providers.contains(&peer_id) {
69 self.providers.push(peer_id);
70 }
71 self
72 }
73
74 pub fn providers(mut self, list: &[PeerId]) -> Self {
75 self.providers = list.to_vec();
76 self
77 }
78
79 pub fn timeout(mut self, timeout: Duration) -> Self {
80 self.timeout = Some(timeout);
81 self
82 }
83
84 pub fn local(mut self) -> Self {
85 self.local_only = true;
86 self
87 }
88
89 pub fn set_local(mut self, local: bool) -> Self {
90 self.local_only = local;
91 self
92 }
93}
94
95impl Stream for UnixfsGet {
96 type Item = UnixfsStatus;
97 fn poll_next(
98 mut self: std::pin::Pin<&mut Self>,
99 cx: &mut std::task::Context<'_>,
100 ) -> std::task::Poll<Option<Self::Item>> {
101 if self.core.is_none() && self.stream.is_none() {
102 return Poll::Ready(None);
103 }
104 loop {
105 match &mut self.stream {
106 None => {
107 let (repo, dag) = match self.core.take().expect("ipfs or repo is used") {
108 Either::Left(ipfs) => (ipfs.repo().clone(), ipfs.dag()),
109 Either::Right(repo) => (repo.clone(), IpldDag::from(repo.clone())),
110 };
111
112 let path = self.path.take().expect("starting point exist");
113 let providers = std::mem::take(&mut self.providers);
114 let local_only = self.local_only;
115 let timeout = self.timeout;
116 let dest = self.dest.clone();
117
118 #[cfg(not(target_arch = "wasm32"))]
119 let stream = async_stream::stream! {
120
121 let mut cache = None;
122 let mut total_size = None;
123 let mut written = 0;
124
125 let mut file = match tokio::fs::File::create(dest)
126 .await
127 .map_err(TraversalFailed::Io) {
128 Ok(f) => f,
129 Err(e) => {
130 yield UnixfsStatus::FailedStatus { written, total_size, error: e.into() };
131 return;
132 }
133 };
134
135 let block = match dag
136 ._resolve(path.clone(), true, &providers, local_only, timeout)
137 .await
138 .map_err(TraversalFailed::Resolving)
139 .and_then(|(resolved, _)| resolved.into_unixfs_block().map_err(TraversalFailed::Path)) {
140 Ok(block) => block,
141 Err(e) => {
142 yield UnixfsStatus::FailedStatus { written, total_size, error: e.into() };
143 return;
144 }
145 };
146
147 let cid = block.cid();
148 let root_name = block.cid().to_string();
149
150 let mut walker = Walker::new(*cid, root_name);
151
152 while walker.should_continue() {
153 let (next, _) = walker.pending_links();
154 let block = match repo.get_block(next).providers(&providers).set_local(local_only).timeout(timeout).await {
155 Ok(block) => block,
156 Err(e) => {
157 yield UnixfsStatus::FailedStatus { written, total_size, error: e };
158 return;
159 }
160 };
161 let block_data = block.data();
162
163 match walker.next(block_data, &mut cache) {
164 Ok(ContinuedWalk::Bucket(..)) => {}
165 Ok(ContinuedWalk::File(segment, _, _, _, size)) => {
166
167 if segment.is_first() {
168 total_size = Some(size as usize);
169 yield UnixfsStatus::ProgressStatus { written, total_size };
170 }
171 let mut n = 0usize;
176 let slice = segment.as_ref();
177 let total = slice.len();
178
179 while n < total {
180 let next = &slice[n..];
181 n += next.len();
182 if let Err(e) = file.write_all(next).await {
183 yield UnixfsStatus::FailedStatus { written, total_size, error: e.into() };
184 return;
185 }
186 if let Err(e) = file.sync_all().await {
187 yield UnixfsStatus::FailedStatus { written, total_size, error: e.into() };
188 return;
189 }
190
191 written += n;
192 }
193
194 yield UnixfsStatus::ProgressStatus { written, total_size };
195
196 },
197 Ok(ContinuedWalk::Directory( .. )) | Ok(ContinuedWalk::RootDirectory( .. )) => {}, Ok(ContinuedWalk::Symlink( .. )) => {},
199 Err(e) => {
200 yield UnixfsStatus::FailedStatus { written, total_size, error: e.into() };
201 return;
202 }
203 };
204 };
205
206 yield UnixfsStatus::CompletedStatus { path, written, total_size }
207 };
208
209 #[cfg(target_arch = "wasm32")]
210 let stream = async_stream::stream! {
211 _ = repo;
212 _ = dag;
213 _ = path;
214 _ = providers;
215 _ = local_only;
216 _ = timeout;
217 _ = dest;
218 yield UnixfsStatus::FailedStatus { written: 0, total_size: None, error: anyhow::anyhow!("unimplemented") };
219 };
220
221 self.stream = Some(stream.boxed());
222 }
223 Some(stream) => match futures::ready!(stream.poll_next_unpin(cx)) {
224 Some(item) => {
225 if matches!(
226 item,
227 UnixfsStatus::FailedStatus { .. }
228 | UnixfsStatus::CompletedStatus { .. }
229 ) {
230 self.stream.take();
231 }
232 return Poll::Ready(Some(item));
233 }
234 None => {
235 self.stream.take();
236 return Poll::Ready(None);
237 }
238 },
239 }
240 }
241 }
242}
243
244impl std::future::IntoFuture for UnixfsGet {
245 type Output = Result<(), anyhow::Error>;
246
247 type IntoFuture = BoxFuture<'static, Self::Output>;
248
249 fn into_future(mut self) -> Self::IntoFuture {
250 let span = self.span.clone();
251 async move {
252 while let Some(status) = self.next().await {
253 match status {
254 UnixfsStatus::FailedStatus { error, .. } => {
255 return Err(error);
256 }
257 UnixfsStatus::CompletedStatus { .. } => return Ok(()),
258 _ => {}
259 }
260 }
261 Err::<_, anyhow::Error>(anyhow::anyhow!("Unable to get file"))
262 }
263 .instrument(span)
264 .boxed()
265 }
266}
267
268impl FusedStream for UnixfsGet {
269 fn is_terminated(&self) -> bool {
270 self.stream.is_none() && self.core.is_none()
271 }
272}