Skip to main content

rust_ipfs/unixfs/
get.rs

1use connexa::prelude::PeerId;
2use either::Either;
3use futures::stream::BoxStream;
4use futures::{future::BoxFuture, stream::FusedStream, FutureExt, Stream, StreamExt};
5#[allow(unused_imports)]
6use rust_unixfs::walk::{ContinuedWalk, Walker};
7use std::pin::Pin;
8use std::task::Context;
9use std::{
10    path::{Path, PathBuf},
11    task::Poll,
12    time::Duration,
13};
14#[cfg(not(target_arch = "wasm32"))]
15use tokio::io::AsyncWriteExt;
16use tracing::{Instrument, Span};
17
18use crate::repo::DefaultStorage;
19use crate::{dag::IpldDag, repo::Repo, Ipfs, IpfsPath};
20
21#[allow(unused_imports)]
22use super::{TraversalFailed, UnixfsStatus};
23
/// Lazily fetches the UnixFS file at an [`IpfsPath`] and writes it to a local destination.
///
/// The value is configured through its builder-style methods and then either polled as a
/// [`Stream`] of [`UnixfsStatus`] updates or awaited directly (see the `IntoFuture` impl).
/// No work is started until the first poll.
#[must_use = "does nothing unless you `.await` or poll the stream"]
pub struct UnixfsGet {
    /// Block source: a full [`Ipfs`] handle or a bare [`Repo`]. Taken (left as `None`) when the
    /// inner download stream is built on first poll.
    core: Option<Either<Ipfs, Repo<DefaultStorage>>>,
    /// Filesystem path of the file the content is written to.
    dest: PathBuf,
    /// Tracing span the future returned by `into_future` is instrumented with.
    span: Span,
    /// Path to fetch. Taken (left as `None`) when the inner download stream is built.
    path: Option<IpfsPath>,
    /// Peers handed to path resolution and to every block request.
    providers: Vec<PeerId>,
    /// Forwarded to path resolution and to block requests via `set_local`.
    local_only: bool,
    /// Optional timeout forwarded to path resolution and to each block request.
    timeout: Option<Duration>,
    /// The in-flight download; `None` before the first poll and again after a terminal status.
    stream: Option<BoxStream<'static, UnixfsStatus>>,
}
35
36impl UnixfsGet {
37    pub fn with_ipfs(ipfs: &Ipfs, path: impl Into<IpfsPath>, dest: impl AsRef<Path>) -> Self {
38        Self::with_either(Either::Left(ipfs.clone()), path, dest)
39    }
40
41    pub fn with_repo(
42        repo: &Repo<DefaultStorage>,
43        path: impl Into<IpfsPath>,
44        dest: impl AsRef<Path>,
45    ) -> Self {
46        Self::with_either(Either::Right(repo.clone()), path, dest)
47    }
48
49    fn with_either(
50        core: Either<Ipfs, Repo<DefaultStorage>>,
51        path: impl Into<IpfsPath>,
52        dest: impl AsRef<Path>,
53    ) -> Self {
54        let path = path.into();
55        let dest = dest.as_ref().to_path_buf();
56        Self {
57            core: Some(core),
58            dest,
59            path: Some(path),
60            span: Span::current(),
61            providers: Vec::new(),
62            local_only: false,
63            timeout: None,
64            stream: None,
65        }
66    }
67
68    pub fn span(mut self, span: Span) -> Self {
69        self.span = span;
70        self
71    }
72
73    pub fn provider(mut self, peer_id: PeerId) -> Self {
74        if !self.providers.contains(&peer_id) {
75            self.providers.push(peer_id);
76        }
77        self
78    }
79
80    pub fn providers(mut self, list: &[PeerId]) -> Self {
81        self.providers = list.to_vec();
82        self
83    }
84
85    pub fn timeout(mut self, timeout: Duration) -> Self {
86        self.timeout = Some(timeout);
87        self
88    }
89
90    pub fn local(mut self) -> Self {
91        self.local_only = true;
92        self
93    }
94
95    pub fn set_local(mut self, local: bool) -> Self {
96        self.local_only = local;
97        self
98    }
99}
100
101impl Stream for UnixfsGet {
102    type Item = UnixfsStatus;
103    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
104        if self.core.is_none() && self.stream.is_none() {
105            return Poll::Ready(None);
106        }
107        loop {
108            match &mut self.stream {
109                None => {
110                    let (repo, dag) = match self.core.take().expect("ipfs or repo is used") {
111                        Either::Left(ipfs) => (ipfs.repo().clone(), ipfs.dag()),
112                        Either::Right(repo) => (repo.clone(), IpldDag::from(repo.clone())),
113                    };
114
115                    let path = self.path.take().expect("starting point exist");
116                    let providers = std::mem::take(&mut self.providers);
117                    let local_only = self.local_only;
118                    let timeout = self.timeout;
119                    let dest = self.dest.clone();
120
121                    #[cfg(not(target_arch = "wasm32"))]
122                    let stream = async_stream::stream! {
123
124                        let mut cache = None;
125                        let mut total_size = None;
126                        let mut written = 0;
127
128                        let mut file = match tokio::fs::File::create(dest)
129                            .await
130                            .map_err(TraversalFailed::Io) {
131                                Ok(f) => f,
132                                Err(e) => {
133                                    yield UnixfsStatus::FailedStatus { written, total_size, error: e.into() };
134                                    return;
135                                }
136                            };
137
138                        let block  = match dag
139                            ._resolve(path.clone(), true, &providers, local_only, timeout)
140                            .await
141                            .map_err(TraversalFailed::Resolving)
142                            .and_then(|(resolved, _)| resolved.into_unixfs_block().map_err(TraversalFailed::Path)) {
143                                Ok(block) => block,
144                                Err(e) => {
145                                    yield UnixfsStatus::FailedStatus { written, total_size, error: e.into() };
146                                    return;
147                                }
148                        };
149
150                        let cid = block.cid();
151                        let root_name = block.cid().to_string();
152
153                        let mut walker = Walker::new(*cid, root_name);
154
155                        while walker.should_continue() {
156                            let (next, _) = walker.pending_links();
157                            let block = match repo.get_block(next).providers(&providers).set_local(local_only).timeout(timeout).await {
158                                Ok(block) => block,
159                                Err(e) => {
160                                    yield UnixfsStatus::FailedStatus { written, total_size, error: e };
161                                    return;
162                                }
163                            };
164                            let block_data = block.data();
165
166                            match walker.next(block_data, &mut cache) {
167                                Ok(ContinuedWalk::Bucket(..)) => {}
168                                Ok(ContinuedWalk::File(segment, _, _, _, size)) => {
169
170                                    if segment.is_first() {
171                                        total_size = Some(size as usize);
172                                        yield UnixfsStatus::ProgressStatus { written, total_size };
173                                    }
174                                    // even if the largest of files can have 256 kB blocks and about the same
175                                    // amount of content, try to consume it in small parts not to grow the buffers
176                                    // too much.
177
178                                    let mut n = 0usize;
179                                    let slice = segment.as_ref();
180                                    let total = slice.len();
181
182                                    while n < total {
183                                        let next = &slice[n..];
184                                        n += next.len();
185                                        if let Err(e) = file.write_all(next).await {
186                                            yield UnixfsStatus::FailedStatus { written, total_size, error: e.into() };
187                                            return;
188                                        }
189                                        if let Err(e) = file.sync_all().await {
190                                            yield UnixfsStatus::FailedStatus { written, total_size, error: e.into() };
191                                            return;
192                                        }
193
194                                        written += n;
195                                    }
196
197                                    yield UnixfsStatus::ProgressStatus { written, total_size };
198
199                                },
200                                Ok(ContinuedWalk::Directory( .. )) | Ok(ContinuedWalk::RootDirectory( .. )) => {}, //TODO
201                                Ok(ContinuedWalk::Symlink( .. )) => {},
202                                Err(e) => {
203                                    yield UnixfsStatus::FailedStatus { written, total_size, error: e.into() };
204                                    return;
205                                }
206                            };
207                        };
208
209                        yield UnixfsStatus::CompletedStatus { path, written, total_size }
210                    };
211
212                    #[cfg(target_arch = "wasm32")]
213                    let stream = async_stream::stream! {
214                        _ = repo;
215                        _ = dag;
216                        _ = path;
217                        _ = providers;
218                        _ = local_only;
219                        _ = timeout;
220                        _ = dest;
221                        yield UnixfsStatus::FailedStatus { written: 0, total_size: None, error: anyhow::anyhow!("unimplemented") };
222                    };
223
224                    self.stream = Some(stream.boxed());
225                }
226                Some(stream) => match futures::ready!(stream.poll_next_unpin(cx)) {
227                    Some(item) => {
228                        if matches!(
229                            item,
230                            UnixfsStatus::FailedStatus { .. }
231                                | UnixfsStatus::CompletedStatus { .. }
232                        ) {
233                            self.stream.take();
234                        }
235                        return Poll::Ready(Some(item));
236                    }
237                    None => {
238                        self.stream.take();
239                        return Poll::Ready(None);
240                    }
241                },
242            }
243        }
244    }
245}
246
247impl std::future::IntoFuture for UnixfsGet {
248    type Output = Result<(), anyhow::Error>;
249
250    type IntoFuture = BoxFuture<'static, Self::Output>;
251
252    fn into_future(mut self) -> Self::IntoFuture {
253        let span = self.span.clone();
254        async move {
255            while let Some(status) = self.next().await {
256                match status {
257                    UnixfsStatus::FailedStatus { error, .. } => {
258                        return Err(error);
259                    }
260                    UnixfsStatus::CompletedStatus { .. } => return Ok(()),
261                    _ => {}
262                }
263            }
264            Err::<_, anyhow::Error>(anyhow::anyhow!("Unable to get file"))
265        }
266        .instrument(span)
267        .boxed()
268    }
269}
270
271impl FusedStream for UnixfsGet {
272    fn is_terminated(&self) -> bool {
273        self.stream.is_none() && self.core.is_none()
274    }
275}