rust_ipfs/unixfs/
get.rs

1use std::{
2    path::{Path, PathBuf},
3    task::Poll,
4    time::Duration,
5};
6
7use either::Either;
8use futures::stream::BoxStream;
9use futures::{future::BoxFuture, stream::FusedStream, FutureExt, Stream, StreamExt};
10use libp2p::PeerId;
11#[allow(unused_imports)]
12use rust_unixfs::walk::{ContinuedWalk, Walker};
13#[cfg(not(target_arch = "wasm32"))]
14use tokio::io::AsyncWriteExt;
15use tracing::{Instrument, Span};
16
17use crate::{dag::IpldDag, repo::Repo, Ipfs, IpfsPath};
18
19#[allow(unused_imports)]
20use super::{TraversalFailed, UnixfsStatus};
21
22#[must_use = "does nothing unless you `.await` or poll the stream"]
23pub struct UnixfsGet {
24    core: Option<Either<Ipfs, Repo>>,
25    dest: PathBuf,
26    span: Span,
27    path: Option<IpfsPath>,
28    providers: Vec<PeerId>,
29    local_only: bool,
30    timeout: Option<Duration>,
31    stream: Option<BoxStream<'static, UnixfsStatus>>,
32}
33
34impl UnixfsGet {
35    pub fn with_ipfs(ipfs: &Ipfs, path: impl Into<IpfsPath>, dest: impl AsRef<Path>) -> Self {
36        Self::with_either(Either::Left(ipfs.clone()), path, dest)
37    }
38
39    pub fn with_repo(repo: &Repo, path: impl Into<IpfsPath>, dest: impl AsRef<Path>) -> Self {
40        Self::with_either(Either::Right(repo.clone()), path, dest)
41    }
42
43    fn with_either(
44        core: Either<Ipfs, Repo>,
45        path: impl Into<IpfsPath>,
46        dest: impl AsRef<Path>,
47    ) -> Self {
48        let path = path.into();
49        let dest = dest.as_ref().to_path_buf();
50        Self {
51            core: Some(core),
52            dest,
53            path: Some(path),
54            span: Span::current(),
55            providers: Vec::new(),
56            local_only: false,
57            timeout: None,
58            stream: None,
59        }
60    }
61
62    pub fn span(mut self, span: Span) -> Self {
63        self.span = span;
64        self
65    }
66
67    pub fn provider(mut self, peer_id: PeerId) -> Self {
68        if !self.providers.contains(&peer_id) {
69            self.providers.push(peer_id);
70        }
71        self
72    }
73
74    pub fn providers(mut self, list: &[PeerId]) -> Self {
75        self.providers = list.to_vec();
76        self
77    }
78
79    pub fn timeout(mut self, timeout: Duration) -> Self {
80        self.timeout = Some(timeout);
81        self
82    }
83
84    pub fn local(mut self) -> Self {
85        self.local_only = true;
86        self
87    }
88
89    pub fn set_local(mut self, local: bool) -> Self {
90        self.local_only = local;
91        self
92    }
93}
94
95impl Stream for UnixfsGet {
96    type Item = UnixfsStatus;
97    fn poll_next(
98        mut self: std::pin::Pin<&mut Self>,
99        cx: &mut std::task::Context<'_>,
100    ) -> std::task::Poll<Option<Self::Item>> {
101        if self.core.is_none() && self.stream.is_none() {
102            return Poll::Ready(None);
103        }
104        loop {
105            match &mut self.stream {
106                None => {
107                    let (repo, dag) = match self.core.take().expect("ipfs or repo is used") {
108                        Either::Left(ipfs) => (ipfs.repo().clone(), ipfs.dag()),
109                        Either::Right(repo) => (repo.clone(), IpldDag::from(repo.clone())),
110                    };
111
112                    let path = self.path.take().expect("starting point exist");
113                    let providers = std::mem::take(&mut self.providers);
114                    let local_only = self.local_only;
115                    let timeout = self.timeout;
116                    let dest = self.dest.clone();
117
118                    #[cfg(not(target_arch = "wasm32"))]
119                    let stream = async_stream::stream! {
120
121                        let mut cache = None;
122                        let mut total_size = None;
123                        let mut written = 0;
124
125                        let mut file = match tokio::fs::File::create(dest)
126                            .await
127                            .map_err(TraversalFailed::Io) {
128                                Ok(f) => f,
129                                Err(e) => {
130                                    yield UnixfsStatus::FailedStatus { written, total_size, error: e.into() };
131                                    return;
132                                }
133                            };
134
135                        let block  = match dag
136                            ._resolve(path.clone(), true, &providers, local_only, timeout)
137                            .await
138                            .map_err(TraversalFailed::Resolving)
139                            .and_then(|(resolved, _)| resolved.into_unixfs_block().map_err(TraversalFailed::Path)) {
140                                Ok(block) => block,
141                                Err(e) => {
142                                    yield UnixfsStatus::FailedStatus { written, total_size, error: e.into() };
143                                    return;
144                                }
145                        };
146
147                        let cid = block.cid();
148                        let root_name = block.cid().to_string();
149
150                        let mut walker = Walker::new(*cid, root_name);
151
152                        while walker.should_continue() {
153                            let (next, _) = walker.pending_links();
154                            let block = match repo.get_block(next).providers(&providers).set_local(local_only).timeout(timeout).await {
155                                Ok(block) => block,
156                                Err(e) => {
157                                    yield UnixfsStatus::FailedStatus { written, total_size, error: e };
158                                    return;
159                                }
160                            };
161                            let block_data = block.data();
162
163                            match walker.next(block_data, &mut cache) {
164                                Ok(ContinuedWalk::Bucket(..)) => {}
165                                Ok(ContinuedWalk::File(segment, _, _, _, size)) => {
166
167                                    if segment.is_first() {
168                                        total_size = Some(size as usize);
169                                        yield UnixfsStatus::ProgressStatus { written, total_size };
170                                    }
171                                    // even if the largest of files can have 256 kB blocks and about the same
172                                    // amount of content, try to consume it in small parts not to grow the buffers
173                                    // too much.
174
175                                    let mut n = 0usize;
176                                    let slice = segment.as_ref();
177                                    let total = slice.len();
178
179                                    while n < total {
180                                        let next = &slice[n..];
181                                        n += next.len();
182                                        if let Err(e) = file.write_all(next).await {
183                                            yield UnixfsStatus::FailedStatus { written, total_size, error: e.into() };
184                                            return;
185                                        }
186                                        if let Err(e) = file.sync_all().await {
187                                            yield UnixfsStatus::FailedStatus { written, total_size, error: e.into() };
188                                            return;
189                                        }
190
191                                        written += n;
192                                    }
193
194                                    yield UnixfsStatus::ProgressStatus { written, total_size };
195
196                                },
197                                Ok(ContinuedWalk::Directory( .. )) | Ok(ContinuedWalk::RootDirectory( .. )) => {}, //TODO
198                                Ok(ContinuedWalk::Symlink( .. )) => {},
199                                Err(e) => {
200                                    yield UnixfsStatus::FailedStatus { written, total_size, error: e.into() };
201                                    return;
202                                }
203                            };
204                        };
205
206                        yield UnixfsStatus::CompletedStatus { path, written, total_size }
207                    };
208
209                    #[cfg(target_arch = "wasm32")]
210                    let stream = async_stream::stream! {
211                        _ = repo;
212                        _ = dag;
213                        _ = path;
214                        _ = providers;
215                        _ = local_only;
216                        _ = timeout;
217                        _ = dest;
218                        yield UnixfsStatus::FailedStatus { written: 0, total_size: None, error: anyhow::anyhow!("unimplemented") };
219                    };
220
221                    self.stream = Some(stream.boxed());
222                }
223                Some(stream) => match futures::ready!(stream.poll_next_unpin(cx)) {
224                    Some(item) => {
225                        if matches!(
226                            item,
227                            UnixfsStatus::FailedStatus { .. }
228                                | UnixfsStatus::CompletedStatus { .. }
229                        ) {
230                            self.stream.take();
231                        }
232                        return Poll::Ready(Some(item));
233                    }
234                    None => {
235                        self.stream.take();
236                        return Poll::Ready(None);
237                    }
238                },
239            }
240        }
241    }
242}
243
244impl std::future::IntoFuture for UnixfsGet {
245    type Output = Result<(), anyhow::Error>;
246
247    type IntoFuture = BoxFuture<'static, Self::Output>;
248
249    fn into_future(mut self) -> Self::IntoFuture {
250        let span = self.span.clone();
251        async move {
252            while let Some(status) = self.next().await {
253                match status {
254                    UnixfsStatus::FailedStatus { error, .. } => {
255                        return Err(error);
256                    }
257                    UnixfsStatus::CompletedStatus { .. } => return Ok(()),
258                    _ => {}
259                }
260            }
261            Err::<_, anyhow::Error>(anyhow::anyhow!("Unable to get file"))
262        }
263        .instrument(span)
264        .boxed()
265    }
266}
267
268impl FusedStream for UnixfsGet {
269    fn is_terminated(&self) -> bool {
270        self.stream.is_none() && self.core.is_none()
271    }
272}