Skip to main content

rust_ipfs/unixfs/
add.rs

1use std::task::{Context, Poll};
2
3use crate::{
4    repo::{DefaultStorage, Repo},
5    Block,
6};
7use bytes::Bytes;
8use either::Either;
9#[allow(unused_imports)]
10use futures::{
11    future::BoxFuture,
12    stream::{BoxStream, FusedStream},
13    FutureExt, Stream, StreamExt, TryFutureExt,
14};
15use rust_unixfs::file::adder::{Chunker, FileAdderBuilder};
16#[cfg(not(target_arch = "wasm32"))]
17use std::path::{Path, PathBuf};
18use std::pin::Pin;
19#[cfg(not(target_arch = "wasm32"))]
20use tokio_util::io::ReaderStream;
21use tracing::{Instrument, Span};
22
23use crate::{Ipfs, IpfsPath};
24
25use super::{TraversalFailed, UnixfsStatus};
26
27pub enum AddOpt {
28    #[cfg(not(target_arch = "wasm32"))]
29    File(PathBuf),
30    Stream {
31        name: Option<String>,
32        total: Option<usize>,
33        stream: BoxStream<'static, Result<Bytes, std::io::Error>>,
34    },
35}
36
37#[cfg(not(target_arch = "wasm32"))]
38impl From<PathBuf> for AddOpt {
39    fn from(path: PathBuf) -> Self {
40        AddOpt::File(path)
41    }
42}
43
44#[cfg(not(target_arch = "wasm32"))]
45impl From<&Path> for AddOpt {
46    fn from(path: &Path) -> Self {
47        AddOpt::File(path.to_path_buf())
48    }
49}
50
51#[must_use = "does nothing unless you `.await` or poll the stream"]
52pub struct UnixfsAdd {
53    core: Option<Either<Ipfs, Repo<DefaultStorage>>>,
54    opt: Option<AddOpt>,
55    span: Span,
56    chunk: Chunker,
57    pin: bool,
58    provide: bool,
59    wrap: bool,
60    stream: Option<BoxStream<'static, UnixfsStatus>>,
61}
62
63impl UnixfsAdd {
64    pub fn with_ipfs(ipfs: &Ipfs, opt: impl Into<AddOpt>) -> Self {
65        Self::with_either(Either::Left(ipfs.clone()), opt)
66    }
67
68    pub fn with_repo(repo: &Repo<DefaultStorage>, opt: impl Into<AddOpt>) -> Self {
69        Self::with_either(Either::Right(repo.clone()), opt)
70    }
71
72    fn with_either(core: Either<Ipfs, Repo<DefaultStorage>>, opt: impl Into<AddOpt>) -> Self {
73        let opt = opt.into();
74        Self {
75            core: Some(core),
76            opt: Some(opt),
77            span: Span::current(),
78            chunk: Chunker::Size(256 * 1024),
79            pin: true,
80            provide: false,
81            wrap: false,
82            stream: None,
83        }
84    }
85
86    pub fn span(mut self, span: Span) -> Self {
87        self.span = span;
88        self
89    }
90
91    pub fn chunk(mut self, chunk: Chunker) -> Self {
92        self.chunk = chunk;
93        self
94    }
95
96    pub fn pin(mut self, pin: bool) -> Self {
97        self.pin = pin;
98        self
99    }
100
101    pub fn provide(mut self) -> Self {
102        self.provide = true;
103        self
104    }
105
106    pub fn wrap(mut self) -> Self {
107        self.wrap = true;
108        self
109    }
110}
111
112impl Stream for UnixfsAdd {
113    type Item = UnixfsStatus;
114    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
115        if self.core.is_none() && self.stream.is_none() {
116            return Poll::Ready(None);
117        }
118        loop {
119            match &mut self.stream {
120                None => {
121                    let (ipfs, repo) = match self.core.take().expect("ipfs or repo is used") {
122                        Either::Left(ipfs) => {
123                            let repo = ipfs.repo().clone();
124                            (Some(ipfs), repo)
125                        }
126                        Either::Right(repo) => (None, repo),
127                    };
128                    let option = self.opt.take().expect("option already constructed");
129                    let chunk = self.chunk;
130                    let pin = self.pin;
131                    let provide = self.provide;
132                    let wrap = self.wrap;
133
134                    let stream = async_stream::stream! {
135                        let _g = repo.gc_guard().await;
136
137                        let mut written = 0;
138
139                        let (name, total_size, mut stream) = match option {
140                            #[cfg(not(target_arch = "wasm32"))]
141                            AddOpt::File(path) => match tokio::fs::File::open(path.clone())
142                                .and_then(|file| async move {
143                                    let size = file.metadata().await?.len() as usize;
144
145                                    let stream = ReaderStream::new(file);
146
147                                    let name: Option<String> = path.file_name().map(|f| f.to_string_lossy().to_string());
148
149                                    Ok((name, Some(size), stream.boxed()))
150                                }).await {
151                                    Ok(s) => s,
152                                    Err(e) => {
153                                        yield UnixfsStatus::FailedStatus { written, total_size: None, error: e.into() };
154                                        return;
155                                    }
156                                },
157                            AddOpt::Stream { name, total, stream } => (name, total, stream),
158                        };
159
160                        let mut adder = FileAdderBuilder::default()
161                            .with_chunker(chunk)
162                            .build();
163
164                        yield UnixfsStatus::ProgressStatus { written, total_size };
165
166                        while let Some(buffer) = stream.next().await {
167                            let buffer = match buffer {
168                                Ok(buf) => buf,
169                                Err(e) => {
170                                    yield UnixfsStatus::FailedStatus { written, total_size, error: e.into() };
171                                    return;
172                                }
173                            };
174
175                            let mut total = 0;
176                            while total < buffer.len() {
177                                let (blocks, consumed) = adder.push(&buffer[total..]);
178                                for (cid, block) in blocks {
179                                    let block = match Block::new(cid, block) {
180                                        Ok(block) => block,
181                                        Err(e) => {
182                                            yield UnixfsStatus::FailedStatus { written, total_size, error: e.into() };
183                                            return;
184                                        }
185                                    };
186                                    let _cid = match repo.put_block(&block).await {
187                                        Ok(cid) => cid,
188                                        Err(e) => {
189                                            yield UnixfsStatus::FailedStatus { written, total_size, error: e };
190                                            return;
191                                        }
192                                    };
193                                }
194                                total += consumed;
195                                written += consumed;
196                            }
197
198                            yield UnixfsStatus::ProgressStatus { written, total_size };
199                        }
200
201                        let blocks = adder.finish();
202                        let mut last_cid = None;
203
204                        for (cid, block) in blocks {
205                            let block = match Block::new(cid, block) {
206                                Ok(block) => block,
207                                Err(e) => {
208                                    yield UnixfsStatus::FailedStatus { written, total_size, error: e.into() };
209                                    return;
210                                }
211                            };
212                            let _cid = match repo.put_block(&block).await {
213                                Ok(cid) => cid,
214                                Err(e) => {
215                                    yield UnixfsStatus::FailedStatus { written, total_size, error: e };
216                                    return;
217                                }
218                            };
219                            last_cid = Some(cid);
220                        }
221
222                        let cid = match last_cid {
223                            Some(cid) => cid,
224                            None => {
225                                yield UnixfsStatus::FailedStatus { written, total_size, error: TraversalFailed::Io(std::io::ErrorKind::InvalidData.into()).into() };
226                                return;
227                            }
228                        };
229
230                        let mut path = IpfsPath::from(cid);
231
232                        if wrap {
233                            if let Some(name) = name {
234                                let result = {
235                                    let repo = repo.clone();
236                                    async move {
237                                        let mut opts = rust_unixfs::dir::builder::TreeOptions::default();
238                                        opts.wrap_with_directory();
239
240                                        let mut tree = rust_unixfs::dir::builder::BufferingTreeBuilder::new(opts);
241                                        tree.put_link(&name, cid, written as _)?;
242
243                                        let mut iter = tree.build();
244                                        let mut cids = Vec::new();
245
246                                        while let Some(node) = iter.next_borrowed() {
247                                            //TODO: Determine best course to prevent additional allocation
248                                            let node = node?;
249                                            let cid = node.cid.to_owned();
250                                            let block = Block::new(cid, node.block.to_vec())?;
251
252                                            repo.put_block(&block).await?;
253
254                                            cids.push(cid);
255                                        }
256                                        let cid = cids.last().ok_or(anyhow::anyhow!("no cid available"))?;
257                                        let path = IpfsPath::from(*cid).sub_path(&name)?;
258
259                                        Ok::<_, anyhow::Error>(path)
260                                    }
261                                };
262
263                                path = match result.await {
264                                    Ok(path) => path,
265                                    Err(e) => {
266                                        yield UnixfsStatus::FailedStatus { written, total_size, error: e };
267                                        return;
268                                    }
269                                };
270                            }
271                        }
272
273                        let cid = path.root().cid().copied().expect("Cid is apart of the path");
274
275                        if pin && !repo.is_pinned(&cid).await.unwrap_or_default() {
276                            if let Err(e) = repo.pin(cid).recursive().await {
277                                error!("Unable to pin {cid}: {e}");
278                            }
279                        }
280
281                        if provide {
282                            if let Some(ipfs) = ipfs {
283                                if let Err(e) = ipfs.provide(cid).await {
284                                    error!("Unable to provide {cid}: {e}");
285                                }
286                            }
287                        }
288
289
290                        yield UnixfsStatus::CompletedStatus { path, written, total_size }
291                    };
292
293                    self.stream = Some(stream.boxed());
294                }
295                Some(stream) => match futures::ready!(stream.poll_next_unpin(cx)) {
296                    Some(item) => {
297                        if matches!(
298                            item,
299                            UnixfsStatus::FailedStatus { .. }
300                                | UnixfsStatus::CompletedStatus { .. }
301                        ) {
302                            self.stream.take();
303                        }
304                        return Poll::Ready(Some(item));
305                    }
306                    None => {
307                        self.stream.take();
308                        return Poll::Ready(None);
309                    }
310                },
311            }
312        }
313    }
314}
315
316impl std::future::IntoFuture for UnixfsAdd {
317    type Output = Result<IpfsPath, anyhow::Error>;
318
319    type IntoFuture = BoxFuture<'static, Self::Output>;
320
321    fn into_future(mut self) -> Self::IntoFuture {
322        let span = self.span.clone();
323        async move {
324            while let Some(status) = self.next().await {
325                match status {
326                    UnixfsStatus::CompletedStatus { path, .. } => return Ok(path),
327                    UnixfsStatus::FailedStatus { error, .. } => {
328                        return Err(error);
329                    }
330                    _ => {}
331                }
332            }
333            Err::<_, anyhow::Error>(anyhow::anyhow!("Unable to add file"))
334        }
335        .instrument(span)
336        .boxed()
337    }
338}
339
340impl FusedStream for UnixfsAdd {
341    fn is_terminated(&self) -> bool {
342        self.stream.is_none() && self.core.is_none()
343    }
344}