1use std::task::Poll;
2
3#[cfg(not(target_arch = "wasm32"))]
4use std::path::{Path, PathBuf};
5
6use crate::{repo::Repo, Block};
7use bytes::Bytes;
8use either::Either;
9#[allow(unused_imports)]
10use futures::{
11 future::BoxFuture,
12 stream::{BoxStream, FusedStream},
13 FutureExt, Stream, StreamExt, TryFutureExt,
14};
15use rust_unixfs::file::adder::{Chunker, FileAdderBuilder};
16#[cfg(not(target_arch = "wasm32"))]
17use tokio_util::io::ReaderStream;
18use tracing::{Instrument, Span};
19
20use crate::{Ipfs, IpfsPath};
21
22use super::{TraversalFailed, UnixfsStatus};
23
24pub enum AddOpt {
25 #[cfg(not(target_arch = "wasm32"))]
26 File(PathBuf),
27 Stream {
28 name: Option<String>,
29 total: Option<usize>,
30 stream: BoxStream<'static, Result<Bytes, std::io::Error>>,
31 },
32}
33
34#[cfg(not(target_arch = "wasm32"))]
35impl From<PathBuf> for AddOpt {
36 fn from(path: PathBuf) -> Self {
37 AddOpt::File(path)
38 }
39}
40
41#[cfg(not(target_arch = "wasm32"))]
42impl From<&Path> for AddOpt {
43 fn from(path: &Path) -> Self {
44 AddOpt::File(path.to_path_buf())
45 }
46}
47
48#[must_use = "does nothing unless you `.await` or poll the stream"]
49pub struct UnixfsAdd {
50 core: Option<Either<Ipfs, Repo>>,
51 opt: Option<AddOpt>,
52 span: Span,
53 chunk: Chunker,
54 pin: bool,
55 provide: bool,
56 wrap: bool,
57 stream: Option<BoxStream<'static, UnixfsStatus>>,
58}
59
60impl UnixfsAdd {
61 pub fn with_ipfs(ipfs: &Ipfs, opt: impl Into<AddOpt>) -> Self {
62 Self::with_either(Either::Left(ipfs.clone()), opt)
63 }
64
65 pub fn with_repo(repo: &Repo, opt: impl Into<AddOpt>) -> Self {
66 Self::with_either(Either::Right(repo.clone()), opt)
67 }
68
69 fn with_either(core: Either<Ipfs, Repo>, opt: impl Into<AddOpt>) -> Self {
70 let opt = opt.into();
71 Self {
72 core: Some(core),
73 opt: Some(opt),
74 span: Span::current(),
75 chunk: Chunker::Size(256 * 1024),
76 pin: true,
77 provide: false,
78 wrap: false,
79 stream: None,
80 }
81 }
82
83 pub fn span(mut self, span: Span) -> Self {
84 self.span = span;
85 self
86 }
87
88 pub fn chunk(mut self, chunk: Chunker) -> Self {
89 self.chunk = chunk;
90 self
91 }
92
93 pub fn pin(mut self, pin: bool) -> Self {
94 self.pin = pin;
95 self
96 }
97
98 pub fn provide(mut self) -> Self {
99 self.provide = true;
100 self
101 }
102
103 pub fn wrap(mut self) -> Self {
104 self.wrap = true;
105 self
106 }
107}
108
109impl Stream for UnixfsAdd {
110 type Item = UnixfsStatus;
111 fn poll_next(
112 mut self: std::pin::Pin<&mut Self>,
113 cx: &mut std::task::Context<'_>,
114 ) -> Poll<Option<Self::Item>> {
115 if self.core.is_none() && self.stream.is_none() {
116 return Poll::Ready(None);
117 }
118 loop {
119 match &mut self.stream {
120 None => {
121 let (ipfs, repo) = match self.core.take().expect("ipfs or repo is used") {
122 Either::Left(ipfs) => {
123 let repo = ipfs.repo().clone();
124 (Some(ipfs), repo)
125 }
126 Either::Right(repo) => (None, repo),
127 };
128 let option = self.opt.take().expect("option already constructed");
129 let chunk = self.chunk;
130 let pin = self.pin;
131 let provide = self.provide;
132 let wrap = self.wrap;
133
134 let stream = async_stream::stream! {
135 let _g = repo.gc_guard().await;
136
137 let mut written = 0;
138
139 let (name, total_size, mut stream) = match option {
140 #[cfg(not(target_arch = "wasm32"))]
141 AddOpt::File(path) => match tokio::fs::File::open(path.clone())
142 .and_then(|file| async move {
143 let size = file.metadata().await?.len() as usize;
144
145 let stream = ReaderStream::new(file);
146
147 let name: Option<String> = path.file_name().map(|f| f.to_string_lossy().to_string());
148
149 Ok((name, Some(size), stream.boxed()))
150 }).await {
151 Ok(s) => s,
152 Err(e) => {
153 yield UnixfsStatus::FailedStatus { written, total_size: None, error: e.into() };
154 return;
155 }
156 },
157 AddOpt::Stream { name, total, stream } => (name, total, stream),
158 };
159
160 let mut adder = FileAdderBuilder::default()
161 .with_chunker(chunk)
162 .build();
163
164 yield UnixfsStatus::ProgressStatus { written, total_size };
165
166 while let Some(buffer) = stream.next().await {
167 let buffer = match buffer {
168 Ok(buf) => buf,
169 Err(e) => {
170 yield UnixfsStatus::FailedStatus { written, total_size, error: e.into() };
171 return;
172 }
173 };
174
175 let mut total = 0;
176 while total < buffer.len() {
177 let (blocks, consumed) = adder.push(&buffer[total..]);
178 for (cid, block) in blocks {
179 let block = match Block::new(cid, block) {
180 Ok(block) => block,
181 Err(e) => {
182 yield UnixfsStatus::FailedStatus { written, total_size, error: e.into() };
183 return;
184 }
185 };
186 let _cid = match repo.put_block(&block).await {
187 Ok(cid) => cid,
188 Err(e) => {
189 yield UnixfsStatus::FailedStatus { written, total_size, error: e };
190 return;
191 }
192 };
193 }
194 total += consumed;
195 written += consumed;
196 }
197
198 yield UnixfsStatus::ProgressStatus { written, total_size };
199 }
200
201 let blocks = adder.finish();
202 let mut last_cid = None;
203
204 for (cid, block) in blocks {
205 let block = match Block::new(cid, block) {
206 Ok(block) => block,
207 Err(e) => {
208 yield UnixfsStatus::FailedStatus { written, total_size, error: e.into() };
209 return;
210 }
211 };
212 let _cid = match repo.put_block(&block).await {
213 Ok(cid) => cid,
214 Err(e) => {
215 yield UnixfsStatus::FailedStatus { written, total_size, error: e };
216 return;
217 }
218 };
219 last_cid = Some(cid);
220 }
221
222 let cid = match last_cid {
223 Some(cid) => cid,
224 None => {
225 yield UnixfsStatus::FailedStatus { written, total_size, error: TraversalFailed::Io(std::io::ErrorKind::InvalidData.into()).into() };
226 return;
227 }
228 };
229
230 let mut path = IpfsPath::from(cid);
231
232 if wrap {
233 if let Some(name) = name {
234 let result = {
235 let repo = repo.clone();
236 async move {
237 let mut opts = rust_unixfs::dir::builder::TreeOptions::default();
238 opts.wrap_with_directory();
239
240 let mut tree = rust_unixfs::dir::builder::BufferingTreeBuilder::new(opts);
241 tree.put_link(&name, cid, written as _)?;
242
243 let mut iter = tree.build();
244 let mut cids = Vec::new();
245
246 while let Some(node) = iter.next_borrowed() {
247 let node = node?;
249 let cid = node.cid.to_owned();
250 let block = Block::new(cid, node.block.to_vec())?;
251
252 repo.put_block(&block).await?;
253
254 cids.push(cid);
255 }
256 let cid = cids.last().ok_or(anyhow::anyhow!("no cid available"))?;
257 let path = IpfsPath::from(*cid).sub_path(&name)?;
258
259 Ok::<_, anyhow::Error>(path)
260 }
261 };
262
263 path = match result.await {
264 Ok(path) => path,
265 Err(e) => {
266 yield UnixfsStatus::FailedStatus { written, total_size, error: e };
267 return;
268 }
269 };
270 }
271 }
272
273 let cid = path.root().cid().copied().expect("Cid is apart of the path");
274
275 if pin && !repo.is_pinned(&cid).await.unwrap_or_default() {
276 if let Err(e) = repo.pin(cid).recursive().await {
277 error!("Unable to pin {cid}: {e}");
278 }
279 }
280
281 if provide {
282 if let Some(ipfs) = ipfs {
283 if let Err(e) = ipfs.provide(cid).await {
284 error!("Unable to provide {cid}: {e}");
285 }
286 }
287 }
288
289
290 yield UnixfsStatus::CompletedStatus { path, written, total_size }
291 };
292
293 self.stream = Some(stream.boxed());
294 }
295 Some(stream) => match futures::ready!(stream.poll_next_unpin(cx)) {
296 Some(item) => {
297 if matches!(
298 item,
299 UnixfsStatus::FailedStatus { .. }
300 | UnixfsStatus::CompletedStatus { .. }
301 ) {
302 self.stream.take();
303 }
304 return Poll::Ready(Some(item));
305 }
306 None => {
307 self.stream.take();
308 return Poll::Ready(None);
309 }
310 },
311 }
312 }
313 }
314}
315
316impl std::future::IntoFuture for UnixfsAdd {
317 type Output = Result<IpfsPath, anyhow::Error>;
318
319 type IntoFuture = BoxFuture<'static, Self::Output>;
320
321 fn into_future(mut self) -> Self::IntoFuture {
322 let span = self.span.clone();
323 async move {
324 while let Some(status) = self.next().await {
325 match status {
326 UnixfsStatus::CompletedStatus { path, .. } => return Ok(path),
327 UnixfsStatus::FailedStatus { error, .. } => {
328 return Err(error);
329 }
330 _ => {}
331 }
332 }
333 Err::<_, anyhow::Error>(anyhow::anyhow!("Unable to add file"))
334 }
335 .instrument(span)
336 .boxed()
337 }
338}
339
340impl FusedStream for UnixfsAdd {
341 fn is_terminated(&self) -> bool {
342 self.stream.is_none() && self.core.is_none()
343 }
344}