mfs/
lib.rs

1//! Small wrapper crate for `ipfs_api`.
2//! Mostly exists because I do not want any part of the `failure` crate to appear in my code.
3//!
4//! Quick example:
5//! ```no_run
6//! # use mfs::Mfs;
7//! # use std::path::Path;
8//! # async {
9//! let mfs = Mfs::new("http://127.0.0.1:5001").unwrap();
10//! mfs.put(Path::new("/demo"), futures::io::Cursor::new("Hello ipfs files")).await.ok();
11//! # };
12//! ```
13//!
14//! The relevant functions are exposted through the main `Mfs` struct.
15
16use futures::stream::StreamExt;
17use futures_util::io::AsyncReadExt;
18use ipfs_api::request::*;
19use ipfs_api::response::Error as IpfsApiError;
20use ipfs_api::response::FilesStatResponse;
21use ipfs_api::IpfsClient;
22use std::future::Future;
23use std::io::Cursor;
24use std::path::{Path, PathBuf};
25use std::pin::Pin;
26use thiserror::Error as ThisError;
27
28#[derive(ThisError, Debug)]
29pub enum MfsError {
30	#[error("mfs: {msg}")]
31	OpError {
32		msg: String,
33		#[source]
34		source: failure::Compat<IpfsApiError>,
35	},
36	#[error("mfs: {msg}")]
37	LayeredError {
38		msg: String,
39		#[source]
40		source: Box<MfsError>,
41	},
42	#[error("mfs: could not read non-ipfs input")]
43	InputError {
44		#[source]
45		source: std::io::Error,
46	},
47}
48
49type MfsResult<T> = Result<T, MfsError>;
50
51trait OpText<T, E> {
52	fn with_context<C, F>(self, f: F) -> Result<T, MfsError>
53	where
54		C: std::fmt::Display + Send + Sync + 'static,
55		F: FnOnce() -> C;
56}
57impl<T> OpText<T, ipfs_api::response::Error> for std::result::Result<T, ipfs_api::response::Error> {
58	fn with_context<C, F>(self, f: F) -> Result<T, MfsError>
59	where
60		C: std::fmt::Display + Send + Sync + 'static,
61		F: FnOnce() -> C,
62	{
63		match self {
64			Ok(ok) => Ok(ok),
65			Err(e) => Err(MfsError::OpError {
66				msg: f().to_string(),
67				source: failure::Fail::compat(e),
68			}),
69		}
70	}
71}
72impl<T> OpText<T, MfsError> for std::result::Result<T, MfsError> {
73	fn with_context<C, F>(self, f: F) -> Result<T, MfsError>
74	where
75		C: std::fmt::Display + Send + Sync + 'static,
76		F: FnOnce() -> C,
77	{
78		match self {
79			Ok(ok) => Ok(ok),
80			Err(e) => Err(MfsError::LayeredError {
81				msg: f().to_string(),
82				source: Box::new(e),
83			}),
84		}
85	}
86}
87
88trait Unpath {
89	fn unpath(&self) -> &str;
90}
91
92impl<T> Unpath for T
93where
94	T: AsRef<Path>,
95{
96	fn unpath(&self) -> &str {
97		self.as_ref().to_str().unwrap()
98	}
99}
100
101/// The main struct
102///
103/// All of its functions will panic if the passed paths are not valid unicode.
104#[derive(Clone)]
105pub struct Mfs {
106	ipfs: IpfsClient,
107	/// Default hash function to use for write / create operations.
108	pub hash_default: String,
109	/// Default cid version to request on write / create operations. Defaults to `1`.
110	pub cid_default: i32,
111	/// Whether to write files with `raw-leaves`
112	pub raw_leaves_default: bool,
113}
114impl Mfs {
115	pub fn new(api: &str) -> Result<Mfs, http::uri::InvalidUri> {
116		Ok(Mfs {
117			ipfs: ipfs_api::TryFromUri::from_str(api)?,
118			hash_default: "sha2-256".to_owned(),
119			cid_default: 1,
120			raw_leaves_default: true,
121		})
122	}
123
124	/// Remove file or folder (possibly non-empty)
125	pub async fn rm_r<P: AsRef<Path>>(&self, p: P) -> MfsResult<()> {
126		Ok(self
127			.ipfs
128			.files_rm(p.unpath(), true)
129			.await
130			.with_context(|| format!("rm -r {:?}", p.as_ref()))?)
131	}
132
133	/// Remove file
134	pub async fn rm<P: AsRef<Path>>(&self, p: P) -> MfsResult<()> {
135		Ok(self
136			.ipfs
137			.files_rm(p.unpath(), false)
138			.await
139			.with_context(|| format!("rm {:?}", p.as_ref()))?)
140	}
141
142	/// Create directory `p` and parents as needed.
143	pub async fn mkdirs<P: AsRef<Path>>(&self, p: P) -> MfsResult<()> {
144		Ok(self
145			.ipfs
146			.files_mkdir_with_options(
147				FilesMkdir::builder()
148					.path(p.unpath())
149					.parents(true)
150					.cid_version(self.cid_default)
151					.hash(&self.hash_default)
152					.build(),
153			)
154			.await
155			.with_context(|| format!("mkdir -p {:?}", p.as_ref()))?)
156	}
157
158	/// Create directory `p`. Requires that its parent already exist.
159	pub async fn mkdir<P: AsRef<Path>>(&self, p: P) -> MfsResult<()> {
160		Ok(self
161			.ipfs
162			.files_mkdir_with_options(
163				FilesMkdir::builder()
164					.path(p.unpath())
165					.parents(false)
166					.cid_version(self.cid_default)
167					.hash(&self.hash_default)
168					.build(),
169			)
170			.await
171			.with_context(|| format!("mkdir {:?}", p.as_ref()))?)
172	}
173
174	/// Rename / move `s` to `d`.
175	///
176	pub async fn mv<PS: AsRef<Path>, PD: AsRef<Path>>(&self, s: PS, d: PD) -> MfsResult<()> {
177		Ok(self
178			.ipfs
179			.files_mv(s.unpath(), d.unpath())
180			.await
181			.with_context(|| format!("mv {:?} {:?}", s.as_ref(), d.as_ref()))?)
182	}
183
184	/// Copy path `s` to path `d`. Beware of `s` starting with `/ipfs` or `/ipns`.
185	pub async fn cp<PS: AsRef<Path>, PD: AsRef<Path>>(&self, s: PS, d: PD) -> MfsResult<()> {
186		Ok(self
187			.ipfs
188			.files_cp(s.unpath(), d.unpath())
189			.await
190			.with_context(|| format!("cp {:?} {:?}", s.as_ref(), d.as_ref()))?)
191	}
192
193	/// List files in folder
194	pub async fn ls<P: AsRef<Path>>(&self, p: P) -> MfsResult<Vec<PathBuf>> {
195		Ok(self
196			.ipfs
197			.files_ls(Some(p.unpath()))
198			.await
199			.with_context(|| format!("ls {:?}", p.as_ref()))?
200			.entries
201			.into_iter()
202			.map(|e| e.name.into())
203			.collect())
204	}
205
206	/// Read file at `s`.
207	pub fn get<'a, P: AsRef<Path>>(
208		&self,
209		s: P,
210	) -> impl futures_core::stream::Stream<Item = MfsResult<bytes::Bytes>> {
211		self.ipfs
212			.files_read(s.unpath())
213			.map(move |e| Ok(e.with_context(|| format!("reading {:?}", s.as_ref()))?))
214	}
215
216	/// Read file at `s` into in-memory buffer.
217	pub async fn get_fully<P: AsRef<Path>>(&self, s: P) -> MfsResult<Vec<u8>> {
218		use futures_util::stream::TryStreamExt;
219		self.get(s).map_ok(|chunk| chunk.to_vec()).try_concat().await
220		// Optimally, I'd have a version of this that returns a Read or similar…
221	}
222
223	/// Flush folder
224	pub async fn flush<P: AsRef<Path>>(&self, p: P) -> MfsResult<()> {
225		Ok(self
226			.ipfs
227			.files_flush(Some(p.unpath()))
228			.await
229			.with_context(|| format!("flush {:?}", p.as_ref()))?)
230	}
231
232	/// Write file to `d`.
233	///
234	/// Internally buffers chunks of 8 MiB and writes them with separate calls to IPFS
235	/// due to limitations of `multipart`.
236	pub async fn put<P: AsRef<Path>, R: 'static + futures::AsyncRead + Send + Sync + Unpin>(
237		&self,
238		d: P,
239		mut data: R,
240	) -> MfsResult<()> {
241		let d = d.unpath();
242		let mut firstwrite = true;
243		let mut total = 0;
244		let mut finished = false;
245		let mut pending: Option<Pin<Box<dyn Future<Output = Result<(), ipfs_api::response::Error>> + Send>>> =
246			None;
247		while !finished {
248			let mut buf = Vec::new();
249			buf.resize(1 << 23, 0);
250			let mut offset = 0;
251			while offset < buf.len() && !finished {
252				let read = data
253					.read(&mut buf[offset..])
254					.await
255					.map_err(|e| MfsError::InputError { source: e })?;
256				offset += read;
257				finished = read == 0;
258			}
259			buf.truncate(offset);
260			if offset == 0 && !firstwrite {
261				break;
262			}
263			let req = self.ipfs.files_write_with_options(
264				FilesWrite::builder()
265					.path(d)
266					.create(firstwrite)
267					.truncate(firstwrite)
268					.parents(true)
269					.offset(total as i64)
270					.count(offset as i64)
271					.raw_leaves(self.raw_leaves_default)
272					.cid_version(self.cid_default)
273					.hash(&self.hash_default)
274					.flush(true)
275					.build(),
276				Cursor::new(buf),
277			);
278			if let Some(req) = pending {
279				req.await.with_context(|| format!("write {:?}: chunk", d))?;
280			}
281			pending = Some(Box::pin(req));
282			total += offset;
283			firstwrite = false;
284		}
285		if let Some(req) = pending {
286			req.await.with_context(|| format!("write {:?}: final chunk", d))?;
287		}
288		let stat = self.stat(d).await.with_context(|| format!("write {:?}: confirm", d))?;
289		if stat.map(|stat| stat.size != total as u64).unwrap_or(false) {
290			self.rm(d).await.ok();
291			todo!(
292				"write {:?}: read/write sizes do not match - lost bytes :( TODO: don't panic",
293				d
294			);
295		}
296		Ok(())
297	}
298	/// Request hash, type, sizes, and block count of `p`. Returns `None` if the file does not exist
299	pub async fn stat<P: AsRef<Path>>(&self, p: P) -> MfsResult<Option<FilesStatResponse>> {
300		match self.ipfs.files_stat(p.unpath()).await {
301			Ok(r) => return Ok(Some(r)),
302			Err(ipfs_api::response::Error::Api(ipfs_api::response::ApiError { code: 0, .. })) => return Ok(None),
303			e @ Err(_) => e.with_context(|| format!("stat {:?}", p.as_ref()))?,
304		};
305		unreachable!("");
306	}
307}
308
309#[cfg(test)]
310mod tests {
311	use super::*;
312
313	#[tokio::test]
314	#[ignore]
315	async fn eierlegendewollmilchsau() {
316		let mfs = Mfs::new("http://127.0.0.1:5001").expect("create client");
317		let basedir = Path::new("/test-rust-ipfs-mfs");
318		if mfs
319			.stat(basedir)
320			.await
321			.expect("Test preparation: check existing files at test path")
322			.is_some()
323		{
324			mfs.rm_r(basedir)
325				.await
326				.expect("Test preparation: remove already existing test files");
327		}
328		mfs.mkdir(basedir)
329			.await
330			.expect("Test preparation: make working directory");
331
332		mfs.mkdir(basedir.join("a/b"))
333			.await
334			.err()
335			.expect("mkdir does not create parents");
336		mfs.mkdirs(basedir.join("a/b")).await.expect("mkdirs creates parents");
337		let stat1 = mfs
338			.stat(basedir)
339			.await
340			.expect("Statting working directory")
341			.expect("Working directory exists");
342		mfs.cp(basedir, basedir.join("a/c")).await.expect("cp succeeds");
343		assert_eq!(
344			mfs.stat(basedir.join("a/c")).await.unwrap().unwrap().hash,
345			stat1.hash,
346			"After cp is before cp (the hash)"
347		);
348		mfs.mv(basedir.join("a/b"), basedir.join("a/d"))
349			.await
350			.expect("mv succeeds");
351		let mut ls1 = mfs.ls(basedir.join("a")).await.expect("Listing a");
352		ls1.sort();
353		assert_eq!(
354			vec![PathBuf::from("c"), PathBuf::from("d")],
355			ls1,
356			"Directory listing matches expected"
357		);
358
359		for size in vec![0 as usize, 10, 1, 8 << 20, 9 << 20] {
360			let f = &basedir.join("f");
361			let data = (0..size).map(|_| rand::random::<u8>()).collect::<Vec<_>>();
362			mfs.put(f, futures::io::Cursor::new(data.clone()))
363				.await
364				.expect(&format!("Write file of size {}", size));
365			let redata = mfs.get_fully(f).await.expect("Read file");
366			assert_eq!(data.len(), redata.len(), "Read size matches written size");
367			assert_eq!(data, redata, "Read matches written");
368		}
369
370		mfs.rm_r(basedir).await.expect("cleanup");
371	}
372
373	// Compile only test
374	#[allow(dead_code)]
375	fn good_little_future() {
376		fn good<T: Send>(_: T) {}
377		let mfs = Mfs::new("http://127.0.0.1:5001").expect("create client");
378		let a = Path::new("");
379		let b = a;
380		let dat = futures::io::Cursor::new("asdf".as_bytes());
381		good(mfs.put(a, dat));
382		good(mfs.cp(a, b));
383		good(mfs.mv(a, b));
384		good(mfs.get(a));
385		good(mfs.ls(a));
386		good(mfs.mkdir(a));
387		good(mfs.mkdirs(a));
388		good(mfs.get_fully(a));
389	}
390}