1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
use std::{borrow::Borrow, path::Path};
use futures::{stream::BoxStream, StreamExt};
use ipfs_bitswap::Block;
use rust_unixfs::file::adder::{Chunker, FileAdderBuilder};
use tokio_util::io::ReaderStream;
use crate::{Ipfs, IpfsPath, IpfsTypes};
use super::UnixfsStatus;
/// Options accepted by [`add`] and [`add_file`].
pub struct AddOption {
    /// Chunker handed to the `FileAdderBuilder`.
    ///
    /// When this is `None`, [`add`] falls back to `Chunker::default()`.
    pub chunk: Option<Chunker>,
}
impl Default for AddOption {
    /// Returns options whose chunker is `Chunker::Size(1024 * 1024)`.
    fn default() -> Self {
        // Presumably a byte count, i.e. 1 MiB per chunk; confirm against `Chunker::Size`.
        const DEFAULT_CHUNK_SIZE: usize = 1024 * 1024;

        let chunk = Some(Chunker::Size(DEFAULT_CHUNK_SIZE));
        Self { chunk }
    }
}
pub async fn add_file<'a, Types, MaybeOwned, P: AsRef<Path>>(
ipfs: MaybeOwned,
path: P,
opt: Option<AddOption>,
) -> anyhow::Result<BoxStream<'a, UnixfsStatus>>
where
Types: IpfsTypes,
MaybeOwned: Borrow<Ipfs<Types>> + Send + 'a,
{
let path = path.as_ref();
let file = tokio::fs::File::open(path).await?;
let size = file.metadata().await?.len() as usize;
let stream = ReaderStream::new(file)
.filter_map(|x| async { x.ok() })
.map(|x| x.into());
add(ipfs, Some(size), stream.boxed(), opt).await
}
/// Chunks the bytes produced by `stream`, stores every resulting block via
/// `ipfs.put_block`, and reports progress as a stream of [`UnixfsStatus`].
///
/// The returned stream yields, in order:
///
/// 1. an initial `ProgressStatus` with `written == 0`;
/// 2. one `ProgressStatus` after each input buffer has been fully consumed;
/// 3. a final `CompletedStatus` whose path is built from the last CID returned
///    by the adder's `finish()`, or a `FailedStatus` with `error: None` if
///    `finish()` produced no blocks.
///
/// If building or storing a block fails, a `FailedStatus` carrying the error
/// is yielded and the stream ends immediately.
///
/// # Parameters
///
/// * `ipfs` - anything that borrows as an [`Ipfs`] node (owned or reference).
/// * `total_size` - expected input size, if known; it is only echoed back in
///   the emitted statuses.
/// * `stream` - the input data as a stream of byte buffers.
/// * `opt` - optional chunker configuration. When `opt` is `None`, or its
///   `chunk` is `None`, `Chunker::default()` is used.
///   NOTE(review): that is not necessarily the chunker set by
///   `AddOption::default()`; confirm this is intended.
///
/// # Errors
///
/// This function itself currently always returns `Ok`; all failures are
/// reported in-stream as `UnixfsStatus::FailedStatus`.
pub async fn add<'a, Types, MaybeOwned>(
    ipfs: MaybeOwned,
    total_size: Option<usize>,
    mut stream: BoxStream<'a, Vec<u8>>,
    opt: Option<AddOption>,
) -> anyhow::Result<BoxStream<'a, UnixfsStatus>>
where
    Types: IpfsTypes,
    MaybeOwned: Borrow<Ipfs<Types>> + Send + 'a,
{
    let stream = async_stream::stream! {
        let ipfs = ipfs.borrow();

        let mut adder = FileAdderBuilder::default()
            .with_chunker(opt.map(|o| o.chunk.unwrap_or_default()).unwrap_or_default())
            .build();

        let mut written = 0;

        yield UnixfsStatus::ProgressStatus { written, total_size };

        while let Some(buffer) = stream.next().await {
            // `total` counts the bytes of the current buffer already consumed.
            let mut total = 0;

            while total < buffer.len() {
                let (blocks, consumed) = adder.push(&buffer[total..]);

                for (cid, block) in blocks {
                    let block = match Block::new(cid, block) {
                        Ok(block) => block,
                        Err(e) => {
                            yield UnixfsStatus::FailedStatus { written, total_size, error: Some(anyhow::anyhow!("{e}")) };
                            return;
                        }
                    };
                    let _cid = match ipfs.put_block(block).await {
                        Ok(cid) => cid,
                        Err(e) => {
                            yield UnixfsStatus::FailedStatus { written, total_size, error: Some(anyhow::anyhow!("{e}")) };
                            return;
                        }
                    };
                }

                // This loop only terminates if every push consumes at least one
                // byte. If the chunker accepts nothing (for instance a zero chunk
                // size), bail out instead of spinning forever without awaiting.
                if consumed == 0 {
                    yield UnixfsStatus::FailedStatus {
                        written,
                        total_size,
                        error: Some(anyhow::anyhow!("chunker made no progress on a non-empty buffer")),
                    };
                    return;
                }

                total += consumed;
                written += consumed;
            }

            yield UnixfsStatus::ProgressStatus { written, total_size };
        }

        // Flush whatever the adder still holds; the CID of the last block it
        // returns is used for the resulting path.
        let blocks = adder.finish();
        let mut last_cid = None;

        for (cid, block) in blocks {
            let block = match Block::new(cid, block) {
                Ok(block) => block,
                Err(e) => {
                    yield UnixfsStatus::FailedStatus { written, total_size, error: Some(anyhow::anyhow!("{e}")) };
                    return;
                }
            };
            let _cid = match ipfs.put_block(block).await {
                Ok(cid) => cid,
                Err(e) => {
                    yield UnixfsStatus::FailedStatus { written, total_size, error: Some(anyhow::anyhow!("{e}")) };
                    return;
                }
            };
            last_cid = Some(cid);
        }

        match last_cid {
            Some(cid) => yield UnixfsStatus::CompletedStatus { path: IpfsPath::from(cid), written, total_size },
            None => yield UnixfsStatus::FailedStatus { written, total_size, error: None }
        };
    };

    Ok(stream.boxed())
}