rust_ipfs/unixfs/
add.rs

1use std::task::Poll;
2
3#[cfg(not(target_arch = "wasm32"))]
4use std::path::{Path, PathBuf};
5
6use crate::{repo::Repo, Block};
7use bytes::Bytes;
8use either::Either;
9#[allow(unused_imports)]
10use futures::{
11    future::BoxFuture,
12    stream::{BoxStream, FusedStream},
13    FutureExt, Stream, StreamExt, TryFutureExt,
14};
15use rust_unixfs::file::adder::{Chunker, FileAdderBuilder};
16#[cfg(not(target_arch = "wasm32"))]
17use tokio_util::io::ReaderStream;
18use tracing::{Instrument, Span};
19
20use crate::{Ipfs, IpfsPath};
21
22use super::{TraversalFailed, UnixfsStatus};
23
24pub enum AddOpt {
25    #[cfg(not(target_arch = "wasm32"))]
26    File(PathBuf),
27    Stream {
28        name: Option<String>,
29        total: Option<usize>,
30        stream: BoxStream<'static, Result<Bytes, std::io::Error>>,
31    },
32}
33
34#[cfg(not(target_arch = "wasm32"))]
35impl From<PathBuf> for AddOpt {
36    fn from(path: PathBuf) -> Self {
37        AddOpt::File(path)
38    }
39}
40
41#[cfg(not(target_arch = "wasm32"))]
42impl From<&Path> for AddOpt {
43    fn from(path: &Path) -> Self {
44        AddOpt::File(path.to_path_buf())
45    }
46}
47
48#[must_use = "does nothing unless you `.await` or poll the stream"]
49pub struct UnixfsAdd {
50    core: Option<Either<Ipfs, Repo>>,
51    opt: Option<AddOpt>,
52    span: Span,
53    chunk: Chunker,
54    pin: bool,
55    provide: bool,
56    wrap: bool,
57    stream: Option<BoxStream<'static, UnixfsStatus>>,
58}
59
60impl UnixfsAdd {
61    pub fn with_ipfs(ipfs: &Ipfs, opt: impl Into<AddOpt>) -> Self {
62        Self::with_either(Either::Left(ipfs.clone()), opt)
63    }
64
65    pub fn with_repo(repo: &Repo, opt: impl Into<AddOpt>) -> Self {
66        Self::with_either(Either::Right(repo.clone()), opt)
67    }
68
69    fn with_either(core: Either<Ipfs, Repo>, opt: impl Into<AddOpt>) -> Self {
70        let opt = opt.into();
71        Self {
72            core: Some(core),
73            opt: Some(opt),
74            span: Span::current(),
75            chunk: Chunker::Size(256 * 1024),
76            pin: true,
77            provide: false,
78            wrap: false,
79            stream: None,
80        }
81    }
82
83    pub fn span(mut self, span: Span) -> Self {
84        self.span = span;
85        self
86    }
87
88    pub fn chunk(mut self, chunk: Chunker) -> Self {
89        self.chunk = chunk;
90        self
91    }
92
93    pub fn pin(mut self, pin: bool) -> Self {
94        self.pin = pin;
95        self
96    }
97
98    pub fn provide(mut self) -> Self {
99        self.provide = true;
100        self
101    }
102
103    pub fn wrap(mut self) -> Self {
104        self.wrap = true;
105        self
106    }
107}
108
109impl Stream for UnixfsAdd {
110    type Item = UnixfsStatus;
111    fn poll_next(
112        mut self: std::pin::Pin<&mut Self>,
113        cx: &mut std::task::Context<'_>,
114    ) -> Poll<Option<Self::Item>> {
115        if self.core.is_none() && self.stream.is_none() {
116            return Poll::Ready(None);
117        }
118        loop {
119            match &mut self.stream {
120                None => {
121                    let (ipfs, repo) = match self.core.take().expect("ipfs or repo is used") {
122                        Either::Left(ipfs) => {
123                            let repo = ipfs.repo().clone();
124                            (Some(ipfs), repo)
125                        }
126                        Either::Right(repo) => (None, repo),
127                    };
128                    let option = self.opt.take().expect("option already constructed");
129                    let chunk = self.chunk;
130                    let pin = self.pin;
131                    let provide = self.provide;
132                    let wrap = self.wrap;
133
134                    let stream = async_stream::stream! {
135                        let _g = repo.gc_guard().await;
136
137                        let mut written = 0;
138
139                        let (name, total_size, mut stream) = match option {
140                            #[cfg(not(target_arch = "wasm32"))]
141                            AddOpt::File(path) => match tokio::fs::File::open(path.clone())
142                                .and_then(|file| async move {
143                                    let size = file.metadata().await?.len() as usize;
144
145                                    let stream = ReaderStream::new(file);
146
147                                    let name: Option<String> = path.file_name().map(|f| f.to_string_lossy().to_string());
148
149                                    Ok((name, Some(size), stream.boxed()))
150                                }).await {
151                                    Ok(s) => s,
152                                    Err(e) => {
153                                        yield UnixfsStatus::FailedStatus { written, total_size: None, error: e.into() };
154                                        return;
155                                    }
156                                },
157                            AddOpt::Stream { name, total, stream } => (name, total, stream),
158                        };
159
160                        let mut adder = FileAdderBuilder::default()
161                            .with_chunker(chunk)
162                            .build();
163
164                        yield UnixfsStatus::ProgressStatus { written, total_size };
165
166                        while let Some(buffer) = stream.next().await {
167                            let buffer = match buffer {
168                                Ok(buf) => buf,
169                                Err(e) => {
170                                    yield UnixfsStatus::FailedStatus { written, total_size, error: e.into() };
171                                    return;
172                                }
173                            };
174
175                            let mut total = 0;
176                            while total < buffer.len() {
177                                let (blocks, consumed) = adder.push(&buffer[total..]);
178                                for (cid, block) in blocks {
179                                    let block = match Block::new(cid, block) {
180                                        Ok(block) => block,
181                                        Err(e) => {
182                                            yield UnixfsStatus::FailedStatus { written, total_size, error: e.into() };
183                                            return;
184                                        }
185                                    };
186                                    let _cid = match repo.put_block(&block).await {
187                                        Ok(cid) => cid,
188                                        Err(e) => {
189                                            yield UnixfsStatus::FailedStatus { written, total_size, error: e };
190                                            return;
191                                        }
192                                    };
193                                }
194                                total += consumed;
195                                written += consumed;
196                            }
197
198                            yield UnixfsStatus::ProgressStatus { written, total_size };
199                        }
200
201                        let blocks = adder.finish();
202                        let mut last_cid = None;
203
204                        for (cid, block) in blocks {
205                            let block = match Block::new(cid, block) {
206                                Ok(block) => block,
207                                Err(e) => {
208                                    yield UnixfsStatus::FailedStatus { written, total_size, error: e.into() };
209                                    return;
210                                }
211                            };
212                            let _cid = match repo.put_block(&block).await {
213                                Ok(cid) => cid,
214                                Err(e) => {
215                                    yield UnixfsStatus::FailedStatus { written, total_size, error: e };
216                                    return;
217                                }
218                            };
219                            last_cid = Some(cid);
220                        }
221
222                        let cid = match last_cid {
223                            Some(cid) => cid,
224                            None => {
225                                yield UnixfsStatus::FailedStatus { written, total_size, error: TraversalFailed::Io(std::io::ErrorKind::InvalidData.into()).into() };
226                                return;
227                            }
228                        };
229
230                        let mut path = IpfsPath::from(cid);
231
232                        if wrap {
233                            if let Some(name) = name {
234                                let result = {
235                                    let repo = repo.clone();
236                                    async move {
237                                        let mut opts = rust_unixfs::dir::builder::TreeOptions::default();
238                                        opts.wrap_with_directory();
239
240                                        let mut tree = rust_unixfs::dir::builder::BufferingTreeBuilder::new(opts);
241                                        tree.put_link(&name, cid, written as _)?;
242
243                                        let mut iter = tree.build();
244                                        let mut cids = Vec::new();
245
246                                        while let Some(node) = iter.next_borrowed() {
247                                            //TODO: Determine best course to prevent additional allocation
248                                            let node = node?;
249                                            let cid = node.cid.to_owned();
250                                            let block = Block::new(cid, node.block.to_vec())?;
251
252                                            repo.put_block(&block).await?;
253
254                                            cids.push(cid);
255                                        }
256                                        let cid = cids.last().ok_or(anyhow::anyhow!("no cid available"))?;
257                                        let path = IpfsPath::from(*cid).sub_path(&name)?;
258
259                                        Ok::<_, anyhow::Error>(path)
260                                    }
261                                };
262
263                                path = match result.await {
264                                    Ok(path) => path,
265                                    Err(e) => {
266                                        yield UnixfsStatus::FailedStatus { written, total_size, error: e };
267                                        return;
268                                    }
269                                };
270                            }
271                        }
272
273                        let cid = path.root().cid().copied().expect("Cid is apart of the path");
274
275                        if pin && !repo.is_pinned(&cid).await.unwrap_or_default() {
276                            if let Err(e) = repo.pin(cid).recursive().await {
277                                error!("Unable to pin {cid}: {e}");
278                            }
279                        }
280
281                        if provide {
282                            if let Some(ipfs) = ipfs {
283                                if let Err(e) = ipfs.provide(cid).await {
284                                    error!("Unable to provide {cid}: {e}");
285                                }
286                            }
287                        }
288
289
290                        yield UnixfsStatus::CompletedStatus { path, written, total_size }
291                    };
292
293                    self.stream = Some(stream.boxed());
294                }
295                Some(stream) => match futures::ready!(stream.poll_next_unpin(cx)) {
296                    Some(item) => {
297                        if matches!(
298                            item,
299                            UnixfsStatus::FailedStatus { .. }
300                                | UnixfsStatus::CompletedStatus { .. }
301                        ) {
302                            self.stream.take();
303                        }
304                        return Poll::Ready(Some(item));
305                    }
306                    None => {
307                        self.stream.take();
308                        return Poll::Ready(None);
309                    }
310                },
311            }
312        }
313    }
314}
315
316impl std::future::IntoFuture for UnixfsAdd {
317    type Output = Result<IpfsPath, anyhow::Error>;
318
319    type IntoFuture = BoxFuture<'static, Self::Output>;
320
321    fn into_future(mut self) -> Self::IntoFuture {
322        let span = self.span.clone();
323        async move {
324            while let Some(status) = self.next().await {
325                match status {
326                    UnixfsStatus::CompletedStatus { path, .. } => return Ok(path),
327                    UnixfsStatus::FailedStatus { error, .. } => {
328                        return Err(error);
329                    }
330                    _ => {}
331                }
332            }
333            Err::<_, anyhow::Error>(anyhow::anyhow!("Unable to add file"))
334        }
335        .instrument(span)
336        .boxed()
337    }
338}
339
340impl FusedStream for UnixfsAdd {
341    fn is_terminated(&self) -> bool {
342        self.stream.is_none() && self.core.is_none()
343    }
344}