1use futures::stream::StreamExt;
17use futures_util::io::AsyncReadExt;
18use ipfs_api::request::*;
19use ipfs_api::response::Error as IpfsApiError;
20use ipfs_api::response::FilesStatResponse;
21use ipfs_api::IpfsClient;
22use std::future::Future;
23use std::io::Cursor;
24use std::path::{Path, PathBuf};
25use std::pin::Pin;
26use thiserror::Error as ThisError;
27
28#[derive(ThisError, Debug)]
29pub enum MfsError {
30 #[error("mfs: {msg}")]
31 OpError {
32 msg: String,
33 #[source]
34 source: failure::Compat<IpfsApiError>,
35 },
36 #[error("mfs: {msg}")]
37 LayeredError {
38 msg: String,
39 #[source]
40 source: Box<MfsError>,
41 },
42 #[error("mfs: could not read non-ipfs input")]
43 InputError {
44 #[source]
45 source: std::io::Error,
46 },
47}
48
49type MfsResult<T> = Result<T, MfsError>;
50
51trait OpText<T, E> {
52 fn with_context<C, F>(self, f: F) -> Result<T, MfsError>
53 where
54 C: std::fmt::Display + Send + Sync + 'static,
55 F: FnOnce() -> C;
56}
57impl<T> OpText<T, ipfs_api::response::Error> for std::result::Result<T, ipfs_api::response::Error> {
58 fn with_context<C, F>(self, f: F) -> Result<T, MfsError>
59 where
60 C: std::fmt::Display + Send + Sync + 'static,
61 F: FnOnce() -> C,
62 {
63 match self {
64 Ok(ok) => Ok(ok),
65 Err(e) => Err(MfsError::OpError {
66 msg: f().to_string(),
67 source: failure::Fail::compat(e),
68 }),
69 }
70 }
71}
72impl<T> OpText<T, MfsError> for std::result::Result<T, MfsError> {
73 fn with_context<C, F>(self, f: F) -> Result<T, MfsError>
74 where
75 C: std::fmt::Display + Send + Sync + 'static,
76 F: FnOnce() -> C,
77 {
78 match self {
79 Ok(ok) => Ok(ok),
80 Err(e) => Err(MfsError::LayeredError {
81 msg: f().to_string(),
82 source: Box::new(e),
83 }),
84 }
85 }
86}
87
/// Gives path-like values a `&str` view, which is the form the IPFS client
/// calls in this file take their paths in.
trait Unpath {
    /// Returns the path as a string slice.
    ///
    /// # Panics
    ///
    /// Panics if the path is not valid UTF-8, because the signature leaves no
    /// way to report the problem as an error.
    fn unpath(&self) -> &str;
}

impl<T> Unpath for T
where
    T: AsRef<Path>,
{
    fn unpath(&self) -> &str {
        self.as_ref()
            .to_str()
            .expect("MFS path is not valid UTF-8")
    }
}
100
101#[derive(Clone)]
105pub struct Mfs {
106 ipfs: IpfsClient,
107 pub hash_default: String,
109 pub cid_default: i32,
111 pub raw_leaves_default: bool,
113}
114impl Mfs {
115 pub fn new(api: &str) -> Result<Mfs, http::uri::InvalidUri> {
116 Ok(Mfs {
117 ipfs: ipfs_api::TryFromUri::from_str(api)?,
118 hash_default: "sha2-256".to_owned(),
119 cid_default: 1,
120 raw_leaves_default: true,
121 })
122 }
123
124 pub async fn rm_r<P: AsRef<Path>>(&self, p: P) -> MfsResult<()> {
126 Ok(self
127 .ipfs
128 .files_rm(p.unpath(), true)
129 .await
130 .with_context(|| format!("rm -r {:?}", p.as_ref()))?)
131 }
132
133 pub async fn rm<P: AsRef<Path>>(&self, p: P) -> MfsResult<()> {
135 Ok(self
136 .ipfs
137 .files_rm(p.unpath(), false)
138 .await
139 .with_context(|| format!("rm {:?}", p.as_ref()))?)
140 }
141
142 pub async fn mkdirs<P: AsRef<Path>>(&self, p: P) -> MfsResult<()> {
144 Ok(self
145 .ipfs
146 .files_mkdir_with_options(
147 FilesMkdir::builder()
148 .path(p.unpath())
149 .parents(true)
150 .cid_version(self.cid_default)
151 .hash(&self.hash_default)
152 .build(),
153 )
154 .await
155 .with_context(|| format!("mkdir -p {:?}", p.as_ref()))?)
156 }
157
158 pub async fn mkdir<P: AsRef<Path>>(&self, p: P) -> MfsResult<()> {
160 Ok(self
161 .ipfs
162 .files_mkdir_with_options(
163 FilesMkdir::builder()
164 .path(p.unpath())
165 .parents(false)
166 .cid_version(self.cid_default)
167 .hash(&self.hash_default)
168 .build(),
169 )
170 .await
171 .with_context(|| format!("mkdir {:?}", p.as_ref()))?)
172 }
173
174 pub async fn mv<PS: AsRef<Path>, PD: AsRef<Path>>(&self, s: PS, d: PD) -> MfsResult<()> {
177 Ok(self
178 .ipfs
179 .files_mv(s.unpath(), d.unpath())
180 .await
181 .with_context(|| format!("mv {:?} {:?}", s.as_ref(), d.as_ref()))?)
182 }
183
184 pub async fn cp<PS: AsRef<Path>, PD: AsRef<Path>>(&self, s: PS, d: PD) -> MfsResult<()> {
186 Ok(self
187 .ipfs
188 .files_cp(s.unpath(), d.unpath())
189 .await
190 .with_context(|| format!("cp {:?} {:?}", s.as_ref(), d.as_ref()))?)
191 }
192
193 pub async fn ls<P: AsRef<Path>>(&self, p: P) -> MfsResult<Vec<PathBuf>> {
195 Ok(self
196 .ipfs
197 .files_ls(Some(p.unpath()))
198 .await
199 .with_context(|| format!("ls {:?}", p.as_ref()))?
200 .entries
201 .into_iter()
202 .map(|e| e.name.into())
203 .collect())
204 }
205
206 pub fn get<'a, P: AsRef<Path>>(
208 &self,
209 s: P,
210 ) -> impl futures_core::stream::Stream<Item = MfsResult<bytes::Bytes>> {
211 self.ipfs
212 .files_read(s.unpath())
213 .map(move |e| Ok(e.with_context(|| format!("reading {:?}", s.as_ref()))?))
214 }
215
216 pub async fn get_fully<P: AsRef<Path>>(&self, s: P) -> MfsResult<Vec<u8>> {
218 use futures_util::stream::TryStreamExt;
219 self.get(s).map_ok(|chunk| chunk.to_vec()).try_concat().await
220 }
222
223 pub async fn flush<P: AsRef<Path>>(&self, p: P) -> MfsResult<()> {
225 Ok(self
226 .ipfs
227 .files_flush(Some(p.unpath()))
228 .await
229 .with_context(|| format!("flush {:?}", p.as_ref()))?)
230 }
231
232 pub async fn put<P: AsRef<Path>, R: 'static + futures::AsyncRead + Send + Sync + Unpin>(
237 &self,
238 d: P,
239 mut data: R,
240 ) -> MfsResult<()> {
241 let d = d.unpath();
242 let mut firstwrite = true;
243 let mut total = 0;
244 let mut finished = false;
245 let mut pending: Option<Pin<Box<dyn Future<Output = Result<(), ipfs_api::response::Error>> + Send>>> =
246 None;
247 while !finished {
248 let mut buf = Vec::new();
249 buf.resize(1 << 23, 0);
250 let mut offset = 0;
251 while offset < buf.len() && !finished {
252 let read = data
253 .read(&mut buf[offset..])
254 .await
255 .map_err(|e| MfsError::InputError { source: e })?;
256 offset += read;
257 finished = read == 0;
258 }
259 buf.truncate(offset);
260 if offset == 0 && !firstwrite {
261 break;
262 }
263 let req = self.ipfs.files_write_with_options(
264 FilesWrite::builder()
265 .path(d)
266 .create(firstwrite)
267 .truncate(firstwrite)
268 .parents(true)
269 .offset(total as i64)
270 .count(offset as i64)
271 .raw_leaves(self.raw_leaves_default)
272 .cid_version(self.cid_default)
273 .hash(&self.hash_default)
274 .flush(true)
275 .build(),
276 Cursor::new(buf),
277 );
278 if let Some(req) = pending {
279 req.await.with_context(|| format!("write {:?}: chunk", d))?;
280 }
281 pending = Some(Box::pin(req));
282 total += offset;
283 firstwrite = false;
284 }
285 if let Some(req) = pending {
286 req.await.with_context(|| format!("write {:?}: final chunk", d))?;
287 }
288 let stat = self.stat(d).await.with_context(|| format!("write {:?}: confirm", d))?;
289 if stat.map(|stat| stat.size != total as u64).unwrap_or(false) {
290 self.rm(d).await.ok();
291 todo!(
292 "write {:?}: read/write sizes do not match - lost bytes :( TODO: don't panic",
293 d
294 );
295 }
296 Ok(())
297 }
298 pub async fn stat<P: AsRef<Path>>(&self, p: P) -> MfsResult<Option<FilesStatResponse>> {
300 match self.ipfs.files_stat(p.unpath()).await {
301 Ok(r) => return Ok(Some(r)),
302 Err(ipfs_api::response::Error::Api(ipfs_api::response::ApiError { code: 0, .. })) => return Ok(None),
303 e @ Err(_) => e.with_context(|| format!("stat {:?}", p.as_ref()))?,
304 };
305 unreachable!("");
306 }
307}
308
#[cfg(test)]
mod tests {
    use super::*;

    /// End-to-end run of the wrapper against the IPFS API at
    /// `http://127.0.0.1:5001`; marked `#[ignore]`, so it only runs on request.
    #[tokio::test]
    #[ignore]
    async fn eierlegendewollmilchsau() {
        let mfs = Mfs::new("http://127.0.0.1:5001").expect("create client");
        let basedir = Path::new("/test-rust-ipfs-mfs");

        // Start from an empty working directory.
        let preexisting = mfs
            .stat(basedir)
            .await
            .expect("Test preparation: check existing files at test path")
            .is_some();
        if preexisting {
            mfs.rm_r(basedir)
                .await
                .expect("Test preparation: remove already existing test files");
        }
        mfs.mkdir(basedir)
            .await
            .expect("Test preparation: make working directory");

        // mkdir vs. mkdirs
        let nested = basedir.join("a/b");
        mfs.mkdir(&nested)
            .await
            .err()
            .expect("mkdir does not create parents");
        mfs.mkdirs(&nested).await.expect("mkdirs creates parents");

        // cp keeps the hash
        let before_cp = mfs
            .stat(basedir)
            .await
            .expect("Statting working directory")
            .expect("Working directory exists");
        let copy = basedir.join("a/c");
        mfs.cp(basedir, &copy).await.expect("cp succeeds");
        let after_cp = mfs.stat(&copy).await.unwrap().unwrap();
        assert_eq!(after_cp.hash, before_cp.hash, "After cp is before cp (the hash)");

        // mv + ls
        mfs.mv(&nested, basedir.join("a/d"))
            .await
            .expect("mv succeeds");
        let mut listing = mfs.ls(basedir.join("a")).await.expect("Listing a");
        listing.sort();
        assert_eq!(
            vec![PathBuf::from("c"), PathBuf::from("d")],
            listing,
            "Directory listing matches expected"
        );

        // put/get round trips, including sizes around the 8 MiB chunk boundary
        let target = basedir.join("f");
        for &size in &[0usize, 10, 1, 8 << 20, 9 << 20] {
            let data: Vec<u8> = (0..size).map(|_| rand::random::<u8>()).collect();
            mfs.put(&target, futures::io::Cursor::new(data.clone()))
                .await
                .expect(&format!("Write file of size {}", size));
            let redata = mfs.get_fully(&target).await.expect("Read file");
            assert_eq!(data.len(), redata.len(), "Read size matches written size");
            assert_eq!(data, redata, "Read matches written");
        }

        mfs.rm_r(basedir).await.expect("cleanup");
    }

    // Never called: it only has to type-check, which proves that the futures
    // and the stream returned by these methods are `Send`.
    #[allow(dead_code)]
    fn good_little_future() {
        fn assert_send<T: Send>(_: T) {}
        let mfs = Mfs::new("http://127.0.0.1:5001").expect("create client");
        let src = Path::new("");
        let dst = src;
        let input = futures::io::Cursor::new("asdf".as_bytes());
        assert_send(mfs.put(src, input));
        assert_send(mfs.cp(src, dst));
        assert_send(mfs.mv(src, dst));
        assert_send(mfs.get(src));
        assert_send(mfs.ls(src));
        assert_send(mfs.mkdir(src));
        assert_send(mfs.mkdirs(src));
        assert_send(mfs.get_fully(src));
    }
}