// rust_ipfs/repo/blockstore/flatfs.rs
use crate::error::Error;
2use crate::repo::paths::{block_path, filestem_to_block_cid};
3use crate::repo::{BlockPut, BlockStore};
4use crate::Block;
5use async_trait::async_trait;
6use futures::stream::{self, BoxStream};
7use futures::{StreamExt, TryFutureExt, TryStreamExt};
8use ipld_core::cid::Cid;
9use std::io::{self, ErrorKind, Read};
10use std::path::PathBuf;
11use std::sync::Arc;
12use tokio::fs;
13use tokio::sync::RwLock;
14use tokio_stream::wrappers::ReadDirStream;
15
16#[derive(Debug)]
20pub struct FsBlockStore {
21 inner: Arc<RwLock<FsBlockStoreInner>>,
22}
23
/// State guarded by the lock inside [`FsBlockStore`].
#[derive(Debug)]
struct FsBlockStoreInner {
    /// Root directory handed to `block_path` to locate each block's file.
    path: PathBuf,
}
28
29impl FsBlockStore {
30 pub fn new(path: PathBuf) -> Self {
31 let inner = Arc::new(RwLock::new(FsBlockStoreInner { path }));
32
33 FsBlockStore { inner }
34 }
35}
36
37#[async_trait]
38impl BlockStore for FsBlockStore {
39 async fn init(&self) -> Result<(), Error> {
40 let inner = &*self.inner.read().await;
41 fs::create_dir_all(inner.path.clone()).await?;
42 Ok(())
43 }
44
45 async fn contains(&self, cid: &Cid) -> Result<bool, Error> {
46 let inner = &*self.inner.read().await;
47 inner.contains(cid).await
48 }
49
50 async fn get(&self, cid: &Cid) -> Result<Option<Block>, Error> {
51 let inner = &*self.inner.read().await;
52 inner.get(cid).await
53 }
54
55 async fn size(&self, cid: &[Cid]) -> Result<Option<usize>, Error> {
56 let inner = &*self.inner.read().await;
57 Ok(inner.size(cid).await)
58 }
59
60 async fn total_size(&self) -> Result<usize, Error> {
61 let inner = &*self.inner.read().await;
62 Ok(inner.total_size().await)
63 }
64
65 async fn put(&self, block: &Block) -> Result<(Cid, BlockPut), Error> {
68 let inner = &mut *self.inner.write().await;
69 inner.put(block).await
70 }
71
72 async fn remove(&self, cid: &Cid) -> Result<(), Error> {
73 let inner = &mut *self.inner.write().await;
74 inner.remove(cid).await
75 }
76
77 async fn remove_many(&self, blocks: BoxStream<'static, Cid>) -> BoxStream<'static, Cid> {
78 let inner = self.inner.clone();
79 let stream = async_stream::stream! {
80 let inner = &mut *inner.write().await;
81 let path = inner.path.clone();
82 for await cid in blocks
83 .map(move |cid| (cid, block_path(path.clone(), &cid)))
84 .filter_map(|(cid, path)| async move { fs::remove_file(path).await.ok().map(|_| cid) }) {
85 yield cid;
86 }
87 };
88
89 stream.boxed()
90 }
91
92 async fn list(&self) -> BoxStream<'static, Cid> {
93 let inner = &*self.inner.read().await;
94 inner.list().await
95 }
96}
97
98impl FsBlockStoreInner {
99 async fn contains(&self, cid: &Cid) -> Result<bool, Error> {
100 let path = block_path(self.path.clone(), cid);
101
102 let metadata = match fs::metadata(path).await {
103 Ok(m) => m,
104 Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(false),
105 Err(e) => return Err(e.into()),
106 };
107
108 Ok(metadata.is_file())
109 }
110
111 async fn get(&self, cid: &Cid) -> Result<Option<Block>, Error> {
112 let path = block_path(self.path.clone(), cid);
113
114 let cid = *cid;
115
116 tokio::task::spawn_blocking(move || {
119 let mut file = match std::fs::File::open(path) {
120 Ok(file) => file,
121 Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
122 Err(e) => {
123 return Err(e.into());
124 }
125 };
126
127 let len = file.metadata()?.len();
128
129 let mut data = Vec::with_capacity(len as usize);
130 file.read_to_end(&mut data)?;
131 let block = Block::new(cid, data)?;
132 Ok(Some(block))
133 })
134 .await?
135 }
136
137 async fn put(&mut self, block: &Block) -> Result<(Cid, BlockPut), Error> {
138 let block = block.clone();
139 let target_path = block_path(self.path.clone(), block.cid());
140 let cid = *block.cid();
141
142 let je = tokio::task::spawn_blocking(move || {
143 let sharded = target_path
144 .parent()
145 .expect("we already have at least the shard parent");
146
147 std::fs::create_dir_all(sharded)?;
148
149 let target = std::fs::OpenOptions::new()
150 .write(true)
151 .create_new(true)
152 .open(&target_path)?;
153
154 let temp_path = target_path.with_extension("tmp");
155
156 match write_through_tempfile(target, &target_path, temp_path, block.data()) {
157 Ok(()) => {
158 trace!("successfully wrote the block");
159 Ok::<_, std::io::Error>(Ok(block.data().len()))
160 }
161 Err(e) => {
162 match std::fs::remove_file(&target_path) {
163 Ok(_) => debug!("removed partially written {:?}", target_path),
164 Err(removal) => warn!(
165 "failed to remove partially written {:?}: {}",
166 target_path, removal
167 ),
168 }
169 Ok(Err(e))
170 }
171 }
172 })
173 .await
174 .map_err(|e| {
175 error!("blocking put task error: {}", e);
176 e
177 })?;
178
179 match je {
180 Ok(Ok(written)) => {
181 trace!(bytes = written, "block writing succeeded");
182 Ok((cid, BlockPut::NewBlock))
183 }
184 Ok(Err(e)) => {
185 trace!("write failed but hopefully the target was removed");
186
187 Err(Error::new(e))
188 }
189 Err(e) if e.kind() == ErrorKind::AlreadyExists => {
190 trace!("block exist: {}", e);
191 Ok((cid, BlockPut::Existed))
192 }
193 Err(e) => Err(Error::new(e)),
194 }
195 }
196
197 async fn size(&self, cids: &[Cid]) -> Option<usize> {
198 let mut block_sizes = 0;
199
200 for cid in cids {
201 let path = block_path(self.path.clone(), cid);
202 if let Ok(size) = fs::metadata(path).await.map(|m| m.len() as usize) {
203 block_sizes += size;
204 }
205 }
206
207 Some(block_sizes)
208 }
209
210 async fn total_size(&self) -> usize {
211 self.list_stream()
212 .and_then(|blocks| async move {
213 let list = blocks
214 .try_filter_map(|(_, path)| async move {
215 let meta = fs::metadata(path).await?;
216 Ok(Some(meta.len()))
217 })
218 .try_collect::<Vec<_>>()
219 .await
220 .unwrap_or_default();
221 Ok(list.into_iter().sum::<u64>() as usize)
222 })
223 .await
224 .unwrap_or_default()
225 }
226
227 async fn remove(&mut self, cid: &Cid) -> Result<(), Error> {
228 let path = block_path(self.path.clone(), cid);
229 trace!(cid = %cid, "removing block after synchronizing");
230 fs::remove_file(path).await.map_err(anyhow::Error::from)
231 }
232
233 async fn list_stream(
234 &self,
235 ) -> Result<BoxStream<'static, Result<(Cid, PathBuf), io::Error>>, Error> {
236 let stream = ReadDirStream::new(fs::read_dir(&self.path).await?);
237 let path = self.path.clone();
238 Ok(stream
239 .try_filter_map(|d| async move {
240 Ok(if d.file_type().await?.is_dir() {
242 Some(ReadDirStream::new(fs::read_dir(d.path()).await?))
243 } else {
244 None
245 })
246 })
247 .try_flatten()
249 .try_filter_map(|d| {
251 let name = d.file_name();
252 let path: &std::path::Path = name.as_ref();
253
254 futures::future::ready(if path.extension() != Some("data".as_ref()) {
255 Ok(None)
256 } else {
257 let maybe_cid = filestem_to_block_cid(path.file_stem());
258 Ok(maybe_cid)
259 })
260 })
261 .try_filter_map(move |cid| {
262 let path = path.clone();
263 async move {
264 let path = block_path(path, &cid);
265 Ok(Some((cid, path)))
266 }
267 })
268 .boxed())
269 }
270
271 async fn list(&self) -> BoxStream<'static, Cid> {
272 let stream = self.list_stream().await.unwrap_or(stream::empty().boxed());
273 stream
274 .try_filter_map(|(cid, _)| futures::future::ready(Ok(Some(cid))))
275 .filter_map(|cid| futures::future::ready(cid.ok()))
276 .boxed()
277 }
278}
279
/// Writes `data` to `temp_path`, syncs it to disk, and renames it over
/// `target_path`.
///
/// `target` is the caller's open handle to the target file. It is closed
/// before the rename — presumably because renaming over a file that is still
/// open fails on some platforms (TODO confirm).
///
/// # Errors
///
/// Returns the first I/O error from creating, writing, syncing or renaming
/// the temp file. If the temp file was created but a later step fails, it is
/// removed again (best effort) so failed writes do not leave stray temp files
/// behind. Cleaning up `target_path` stays the caller's responsibility.
fn write_through_tempfile(
    target: std::fs::File,
    target_path: impl AsRef<std::path::Path>,
    temp_path: impl AsRef<std::path::Path>,
    data: &[u8],
) -> Result<(), std::io::Error> {
    use std::io::Write;

    let target_path = target_path.as_ref();
    let temp_path = temp_path.as_ref();

    let mut temp = std::fs::OpenOptions::new()
        .write(true)
        .create(true)
        .truncate(true)
        .open(temp_path)?;

    // `sync_all` makes sure the bytes reached the disk before the rename
    // publishes them under the final name.
    let written = temp
        .write_all(data)
        .and_then(|()| temp.flush())
        .and_then(|()| temp.sync_all());

    // Close both handles before touching the paths again.
    drop(temp);
    drop(target);

    let outcome = written.and_then(|()| std::fs::rename(temp_path, target_path));

    if outcome.is_err() {
        // Best effort: the original error is more useful to the caller than
        // a failure to clean up, so a removal error is deliberately ignored.
        let _ = std::fs::remove_file(temp_path);
    }

    outcome
}
309
#[cfg(test)]
mod tests {
    use super::*;
    use crate::block::BlockCodec;
    use crate::Block;
    use hex_literal::hex;
    use ipld_core::cid::Cid;
    use multihash_codetable::{Code, MultihashDigest};
    use std::convert::TryFrom;
    use std::env::temp_dir;
    use std::sync::Arc;

    /// Returns `<system temp dir>/<name>` after deleting whatever an earlier
    /// run may have left there.
    fn scratch_dir(name: &str) -> PathBuf {
        let dir = temp_dir().join(name);
        std::fs::remove_dir_all(&dir).ok();
        dir
    }

    /// Builds a raw-codec block whose CID is the SHA2-256 of `payload`.
    fn raw_block(payload: &[u8]) -> Block {
        let data = payload.to_vec();
        let cid = Cid::new_v1(BlockCodec::Raw.into(), Code::Sha2_256.digest(&data));
        Block::new(cid, data).unwrap()
    }

    /// Fixed CID/block pair shared by the race and removal tests.
    fn fixture_block() -> (Cid, Block) {
        let cid = Cid::try_from("QmRgutAxd8t7oGkSm4wmeuByG6M51wcTso6cubDdQtuEfL").unwrap();
        let data = hex!("0a0d08021207666f6f6261720a1807");
        (cid, Block::new(cid, data.to_vec()).unwrap())
    }

    /// Collects everything the store currently lists.
    async fn listed(store: &FsBlockStore) -> Vec<Cid> {
        store.list().await.collect().await
    }

    #[tokio::test]
    async fn test_fs_blockstore() {
        let tmp = scratch_dir("blockstore1");
        let store = FsBlockStore::new(tmp.clone());

        let block = raw_block(b"1");
        let cid = *block.cid();

        store.init().await.unwrap();

        // Nothing is stored yet: lookups miss and removal fails.
        assert!(!store.contains(&cid).await.unwrap());
        assert_eq!(store.get(&cid).await.unwrap(), None);
        if store.remove(&cid).await.is_ok() {
            panic!("block should not be found")
        }

        // After the put the block is visible and reads back unchanged.
        let (stored, _) = store.put(&block).await.unwrap();
        assert_eq!(stored, cid);
        assert!(store.contains(&cid).await.unwrap());
        assert_eq!(store.get(&cid).await.unwrap(), Some(block.clone()));

        // After removal it is gone again.
        store.remove(&cid).await.unwrap();
        assert!(!store.contains(&cid).await.unwrap());
        assert_eq!(store.get(&cid).await.unwrap(), None);

        std::fs::remove_dir_all(tmp).ok();
    }

    #[tokio::test]
    async fn test_fs_blockstore_open() {
        let tmp = scratch_dir("blockstore2");
        let block = raw_block(b"1");

        let first = FsBlockStore::new(tmp.clone());
        first.init().await.unwrap();

        assert!(!first.contains(block.cid()).await.unwrap());
        first.put(&block).await.unwrap();

        // A second store over the same directory sees the block without
        // being initialized itself.
        let reopened = FsBlockStore::new(tmp.clone());
        assert!(reopened.contains(block.cid()).await.unwrap());
        assert_eq!(reopened.get(block.cid()).await.unwrap().unwrap(), block);

        std::fs::remove_dir_all(&tmp).ok();
    }

    #[tokio::test]
    async fn test_fs_blockstore_list() {
        let tmp = scratch_dir("blockstore_list");

        let block_store = FsBlockStore::new(tmp.clone());
        block_store.init().await.unwrap();

        for data in &[b"1", b"2", b"3"] {
            let block = raw_block(&data[..]);
            block_store.put(&block).await.unwrap();
        }

        let cids = listed(&block_store).await;
        assert_eq!(cids.len(), 3);
        for cid in &cids {
            assert!(block_store.contains(cid).await.unwrap());
        }
    }

    #[tokio::test]
    async fn race_to_insert_new() {
        let tmp = scratch_dir("race_to_insert_new");

        let single = FsBlockStore::new(tmp.clone());
        single.init().await.unwrap();
        let single = Arc::new(single);

        let (_, block) = fixture_block();
        let count = 10;

        let (writes, existing) = race_to_insert_scenario(count, block, &single).await;

        // Exactly one task writes the block; the others find it present.
        assert_eq!(writes, 1);
        assert_eq!(existing, count - 1);
    }

    /// Spawns `count` tasks that all `put` the same block once a shared
    /// barrier releases them, and returns how many reported a new block and
    /// how many reported an existing one.
    async fn race_to_insert_scenario(
        count: usize,
        block: Block,
        blockstore: &Arc<FsBlockStore>,
    ) -> (usize, usize) {
        let barrier = Arc::new(tokio::sync::Barrier::new(count));

        let handles: Vec<_> = (0..count)
            .map(|_| {
                let bs = Arc::clone(blockstore);
                let barrier = Arc::clone(&barrier);
                let block = block.clone();

                tokio::spawn(async move {
                    barrier.wait().await;
                    bs.put(&block).await
                })
            })
            .collect();

        let mut writes = 0usize;
        let mut existing = 0usize;

        for handle in handles {
            match handle.await {
                Ok(Ok((_, BlockPut::NewBlock))) => writes += 1,
                Ok(Ok((_, BlockPut::Existed))) => existing += 1,
                Ok(Err(e)) => tracing::error!("joinhandle err: {e}"),
                _ => unreachable!("join error"),
            }
        }

        (writes, existing)
    }

    #[tokio::test]
    async fn remove() {
        let tmp = scratch_dir("remove");

        let single = FsBlockStore::new(tmp.clone());
        single.init().await.unwrap();

        let (cid, block) = fixture_block();

        assert_eq!(listed(&single).await.len(), 0);

        single.put(&block).await.unwrap();

        // Compared by multihash rather than by full CID.
        assert_eq!(listed(&single).await[0].hash(), cid.hash());

        single.remove(&cid).await.unwrap();
        assert_eq!(listed(&single).await.len(), 0);
    }
}