rust_ipfs/repo/blockstore/
flatfs.rs

1use crate::error::Error;
2use crate::repo::paths::{block_path, filestem_to_block_cid};
3use crate::repo::{BlockPut, BlockStore};
4use crate::Block;
5use async_trait::async_trait;
6use futures::stream::{self, BoxStream};
7use futures::{StreamExt, TryFutureExt, TryStreamExt};
8use ipld_core::cid::Cid;
9use std::io::{self, ErrorKind, Read};
10use std::path::PathBuf;
11use std::sync::Arc;
12use tokio::fs;
13use tokio::sync::RwLock;
14use tokio_stream::wrappers::ReadDirStream;
15
16/// File system backed block store.
17///
18/// For information on path mangling, please see `block_path` and `filestem_to_block_cid`.
19#[derive(Debug)]
20pub struct FsBlockStore {
21    inner: Arc<RwLock<FsBlockStoreInner>>,
22}
23
24#[derive(Debug)]
25struct FsBlockStoreInner {
26    path: PathBuf,
27}
28
29impl FsBlockStore {
30    pub fn new(path: PathBuf) -> Self {
31        let inner = Arc::new(RwLock::new(FsBlockStoreInner { path }));
32
33        FsBlockStore { inner }
34    }
35}
36
37#[async_trait]
38impl BlockStore for FsBlockStore {
39    async fn init(&self) -> Result<(), Error> {
40        let inner = &*self.inner.read().await;
41        fs::create_dir_all(inner.path.clone()).await?;
42        Ok(())
43    }
44
45    async fn contains(&self, cid: &Cid) -> Result<bool, Error> {
46        let inner = &*self.inner.read().await;
47        inner.contains(cid).await
48    }
49
50    async fn get(&self, cid: &Cid) -> Result<Option<Block>, Error> {
51        let inner = &*self.inner.read().await;
52        inner.get(cid).await
53    }
54
55    async fn size(&self, cid: &[Cid]) -> Result<Option<usize>, Error> {
56        let inner = &*self.inner.read().await;
57        Ok(inner.size(cid).await)
58    }
59
60    async fn total_size(&self) -> Result<usize, Error> {
61        let inner = &*self.inner.read().await;
62        Ok(inner.total_size().await)
63    }
64
65    //TODO: Allow multiple puts without holding a lock. We could probably hold a read lock instead
66    //      and revert back to using a broadcast
67    async fn put(&self, block: &Block) -> Result<(Cid, BlockPut), Error> {
68        let inner = &mut *self.inner.write().await;
69        inner.put(block).await
70    }
71
72    async fn remove(&self, cid: &Cid) -> Result<(), Error> {
73        let inner = &mut *self.inner.write().await;
74        inner.remove(cid).await
75    }
76
77    async fn remove_many(&self, blocks: BoxStream<'static, Cid>) -> BoxStream<'static, Cid> {
78        let inner = self.inner.clone();
79        let stream = async_stream::stream! {
80            let inner = &mut *inner.write().await;
81            let path = inner.path.clone();
82            for await cid in blocks
83                .map(move |cid| (cid, block_path(path.clone(), &cid)))
84                .filter_map(|(cid, path)| async move { fs::remove_file(path).await.ok().map(|_| cid) }) {
85                    yield cid;
86                }
87        };
88
89        stream.boxed()
90    }
91
92    async fn list(&self) -> BoxStream<'static, Cid> {
93        let inner = &*self.inner.read().await;
94        inner.list().await
95    }
96}
97
98impl FsBlockStoreInner {
99    async fn contains(&self, cid: &Cid) -> Result<bool, Error> {
100        let path = block_path(self.path.clone(), cid);
101
102        let metadata = match fs::metadata(path).await {
103            Ok(m) => m,
104            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(false),
105            Err(e) => return Err(e.into()),
106        };
107
108        Ok(metadata.is_file())
109    }
110
111    async fn get(&self, cid: &Cid) -> Result<Option<Block>, Error> {
112        let path = block_path(self.path.clone(), cid);
113
114        let cid = *cid;
115
116        // probably best to do everything in the blocking thread if we are to issue multiple
117        // syscalls
118        tokio::task::spawn_blocking(move || {
119            let mut file = match std::fs::File::open(path) {
120                Ok(file) => file,
121                Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
122                Err(e) => {
123                    return Err(e.into());
124                }
125            };
126
127            let len = file.metadata()?.len();
128
129            let mut data = Vec::with_capacity(len as usize);
130            file.read_to_end(&mut data)?;
131            let block = Block::new(cid, data)?;
132            Ok(Some(block))
133        })
134        .await?
135    }
136
137    async fn put(&mut self, block: &Block) -> Result<(Cid, BlockPut), Error> {
138        let block = block.clone();
139        let target_path = block_path(self.path.clone(), block.cid());
140        let cid = *block.cid();
141
142        let je = tokio::task::spawn_blocking(move || {
143            let sharded = target_path
144                .parent()
145                .expect("we already have at least the shard parent");
146
147            std::fs::create_dir_all(sharded)?;
148
149            let target = std::fs::OpenOptions::new()
150                .write(true)
151                .create_new(true)
152                .open(&target_path)?;
153
154            let temp_path = target_path.with_extension("tmp");
155
156            match write_through_tempfile(target, &target_path, temp_path, block.data()) {
157                Ok(()) => {
158                    trace!("successfully wrote the block");
159                    Ok::<_, std::io::Error>(Ok(block.data().len()))
160                }
161                Err(e) => {
162                    match std::fs::remove_file(&target_path) {
163                        Ok(_) => debug!("removed partially written {:?}", target_path),
164                        Err(removal) => warn!(
165                            "failed to remove partially written {:?}: {}",
166                            target_path, removal
167                        ),
168                    }
169                    Ok(Err(e))
170                }
171            }
172        })
173        .await
174        .map_err(|e| {
175            error!("blocking put task error: {}", e);
176            e
177        })?;
178
179        match je {
180            Ok(Ok(written)) => {
181                trace!(bytes = written, "block writing succeeded");
182                Ok((cid, BlockPut::NewBlock))
183            }
184            Ok(Err(e)) => {
185                trace!("write failed but hopefully the target was removed");
186
187                Err(Error::new(e))
188            }
189            Err(e) if e.kind() == ErrorKind::AlreadyExists => {
190                trace!("block exist: {}", e);
191                Ok((cid, BlockPut::Existed))
192            }
193            Err(e) => Err(Error::new(e)),
194        }
195    }
196
197    async fn size(&self, cids: &[Cid]) -> Option<usize> {
198        let mut block_sizes = 0;
199
200        for cid in cids {
201            let path = block_path(self.path.clone(), cid);
202            if let Ok(size) = fs::metadata(path).await.map(|m| m.len() as usize) {
203                block_sizes += size;
204            }
205        }
206
207        Some(block_sizes)
208    }
209
210    async fn total_size(&self) -> usize {
211        self.list_stream()
212            .and_then(|blocks| async move {
213                let list = blocks
214                    .try_filter_map(|(_, path)| async move {
215                        let meta = fs::metadata(path).await?;
216                        Ok(Some(meta.len()))
217                    })
218                    .try_collect::<Vec<_>>()
219                    .await
220                    .unwrap_or_default();
221                Ok(list.into_iter().sum::<u64>() as usize)
222            })
223            .await
224            .unwrap_or_default()
225    }
226
227    async fn remove(&mut self, cid: &Cid) -> Result<(), Error> {
228        let path = block_path(self.path.clone(), cid);
229        trace!(cid = %cid, "removing block after synchronizing");
230        fs::remove_file(path).await.map_err(anyhow::Error::from)
231    }
232
233    async fn list_stream(
234        &self,
235    ) -> Result<BoxStream<'static, Result<(Cid, PathBuf), io::Error>>, Error> {
236        let stream = ReadDirStream::new(fs::read_dir(&self.path).await?);
237        let path = self.path.clone();
238        Ok(stream
239            .try_filter_map(|d| async move {
240                // map over the shard directories
241                Ok(if d.file_type().await?.is_dir() {
242                    Some(ReadDirStream::new(fs::read_dir(d.path()).await?))
243                } else {
244                    None
245                })
246            })
247            // flatten each; there could be unordered execution pre-flattening
248            .try_flatten()
249            // convert the paths ending in ".data" into cid
250            .try_filter_map(|d| {
251                let name = d.file_name();
252                let path: &std::path::Path = name.as_ref();
253
254                futures::future::ready(if path.extension() != Some("data".as_ref()) {
255                    Ok(None)
256                } else {
257                    let maybe_cid = filestem_to_block_cid(path.file_stem());
258                    Ok(maybe_cid)
259                })
260            })
261            .try_filter_map(move |cid| {
262                let path = path.clone();
263                async move {
264                    let path = block_path(path, &cid);
265                    Ok(Some((cid, path)))
266                }
267            })
268            .boxed())
269    }
270
271    async fn list(&self) -> BoxStream<'static, Cid> {
272        let stream = self.list_stream().await.unwrap_or(stream::empty().boxed());
273        stream
274            .try_filter_map(|(cid, _)| futures::future::ready(Ok(Some(cid))))
275            .filter_map(|cid| futures::future::ready(cid.ok()))
276            .boxed()
277    }
278}
279
/// Writes `data` to `temp_path`, syncs it to disk and then renames it over `target_path`.
///
/// * `target` - an already opened handle to the target file; it is closed before the
///   rename takes place.
/// * `target_path` - final location of the data.
/// * `temp_path` - scratch file; created if missing and truncated if present.
/// * `data` - bytes to store.
///
/// # Errors
///
/// Returns the first I/O error hit while creating, writing, syncing or renaming the
/// temporary file. If the failure happens after the temporary file was created, that file
/// is removed again (best effort) so a failed write does not leave an orphan behind. The
/// file at `target_path` is not touched on failure; cleaning it up is the caller's job.
fn write_through_tempfile(
    target: std::fs::File,
    target_path: impl AsRef<std::path::Path>,
    temp_path: impl AsRef<std::path::Path>,
    data: &[u8],
) -> Result<(), std::io::Error> {
    use std::io::Write;

    let temp_path = temp_path.as_ref();

    let mut temp = std::fs::OpenOptions::new()
        .write(true)
        .create(true)
        .truncate(true)
        .open(temp_path)?;

    let written = temp
        .write_all(data)
        .and_then(|()| temp.flush())
        // safe default
        .and_then(|()| temp.sync_all());

    // Both handles are closed before renaming, whether or not the write succeeded.
    drop(temp);
    drop(target);

    let result = written.and_then(|()| std::fs::rename(temp_path, target_path));

    if result.is_err() {
        // The original error is what matters to the caller; a failed cleanup is ignored.
        let _ = std::fs::remove_file(temp_path);
    }

    // FIXME: there should be a directory fsync here as well

    result
}
309
310#[cfg(test)]
311mod tests {
312    use super::*;
313    use crate::block::BlockCodec;
314    use crate::Block;
315    use hex_literal::hex;
316    use ipld_core::cid::Cid;
317    use multihash_codetable::{Code, MultihashDigest};
318    use std::convert::TryFrom;
319    use std::env::temp_dir;
320    use std::sync::Arc;
321
322    #[tokio::test]
323    async fn test_fs_blockstore() {
324        let mut tmp = temp_dir();
325        tmp.push("blockstore1");
326        std::fs::remove_dir_all(tmp.clone()).ok();
327        let store = FsBlockStore::new(tmp.clone());
328
329        let data = b"1".to_vec();
330        let cid = Cid::new_v1(BlockCodec::Raw.into(), Code::Sha2_256.digest(&data));
331        let block = Block::new(cid, data).unwrap();
332
333        store.init().await.unwrap();
334
335        let contains = store.contains(&cid).await.unwrap();
336        assert!(!contains);
337        let get = store.get(&cid).await.unwrap();
338        assert_eq!(get, None);
339        if store.remove(&cid).await.is_ok() {
340            panic!("block should not be found")
341        }
342
343        let put = store.put(&block).await.unwrap();
344        assert_eq!(put.0, cid.to_owned());
345        let contains = store.contains(&cid);
346        assert!(contains.await.unwrap());
347        let get = store.get(&cid);
348        assert_eq!(get.await.unwrap(), Some(block.clone()));
349
350        store.remove(&cid).await.unwrap();
351        let contains = store.contains(&cid);
352        assert!(!contains.await.unwrap());
353        let get = store.get(&cid);
354        assert_eq!(get.await.unwrap(), None);
355
356        std::fs::remove_dir_all(tmp).ok();
357    }
358
359    #[tokio::test]
360    async fn test_fs_blockstore_open() {
361        let mut tmp = temp_dir();
362        tmp.push("blockstore2");
363        std::fs::remove_dir_all(&tmp).ok();
364
365        let data = b"1".to_vec();
366        let cid = Cid::new_v1(BlockCodec::Raw.into(), Code::Sha2_256.digest(&data));
367        let block = Block::new(cid, data).unwrap();
368
369        let block_store = FsBlockStore::new(tmp.clone());
370        block_store.init().await.unwrap();
371
372        assert!(!block_store.contains(block.cid()).await.unwrap());
373        block_store.put(&block).await.unwrap();
374
375        let block_store = FsBlockStore::new(tmp.clone());
376        assert!(block_store.contains(block.cid()).await.unwrap());
377        assert_eq!(block_store.get(block.cid()).await.unwrap().unwrap(), block);
378
379        std::fs::remove_dir_all(&tmp).ok();
380    }
381
382    #[tokio::test]
383    async fn test_fs_blockstore_list() {
384        let mut tmp = temp_dir();
385        tmp.push("blockstore_list");
386        std::fs::remove_dir_all(&tmp).ok();
387
388        let block_store = FsBlockStore::new(tmp.clone());
389        block_store.init().await.unwrap();
390
391        for data in &[b"1", b"2", b"3"] {
392            let data_slice = data.to_vec();
393            let cid = Cid::new_v1(BlockCodec::Raw.into(), Code::Sha2_256.digest(&data_slice));
394            let block = Block::new(cid, data_slice).unwrap();
395            block_store.put(&block).await.unwrap();
396        }
397
398        let cids = block_store.list().await.collect::<Vec<_>>().await;
399        assert_eq!(cids.len(), 3);
400        for cid in cids.iter() {
401            assert!(block_store.contains(cid).await.unwrap());
402        }
403    }
404
405    #[tokio::test]
406    async fn race_to_insert_new() {
407        // FIXME: why not tempdir?
408        let mut tmp = temp_dir();
409        tmp.push("race_to_insert_new");
410        std::fs::remove_dir_all(&tmp).ok();
411
412        let single = FsBlockStore::new(tmp.clone());
413        single.init().await.unwrap();
414
415        let single = Arc::new(single);
416
417        let cid = Cid::try_from("QmRgutAxd8t7oGkSm4wmeuByG6M51wcTso6cubDdQtuEfL").unwrap();
418        let data = hex!("0a0d08021207666f6f6261720a1807");
419
420        let block = Block::new(cid, data.to_vec()).unwrap();
421
422        let count = 10;
423
424        let (writes, existing) = race_to_insert_scenario(count, block, &single).await;
425
426        assert_eq!(writes, 1);
427        assert_eq!(existing, count - 1);
428    }
429
430    async fn race_to_insert_scenario(
431        count: usize,
432        block: Block,
433        blockstore: &Arc<FsBlockStore>,
434    ) -> (usize, usize) {
435        let barrier = Arc::new(tokio::sync::Barrier::new(count));
436
437        let join_handles = (0..count)
438            .map(|_| {
439                tokio::spawn({
440                    let bs = Arc::clone(blockstore);
441                    let barrier = Arc::clone(&barrier);
442                    let block = block.clone();
443                    async move {
444                        barrier.wait().await;
445                        bs.put(&block).await
446                    }
447                })
448            })
449            .collect::<Vec<_>>();
450
451        let mut writes = 0usize;
452        let mut existing = 0usize;
453
454        for jh in join_handles {
455            let res = jh.await;
456
457            match res {
458                Ok(Ok((_, BlockPut::NewBlock))) => writes += 1,
459                Ok(Ok((_, BlockPut::Existed))) => existing += 1,
460                Ok(Err(e)) => tracing::error!("joinhandle err: {e}"),
461                _ => unreachable!("join error"),
462            }
463        }
464
465        (writes, existing)
466    }
467
468    #[tokio::test]
469    async fn remove() {
470        // FIXME: why not tempdir?
471        let mut tmp = temp_dir();
472        tmp.push("remove");
473        std::fs::remove_dir_all(&tmp).ok();
474
475        let single = FsBlockStore::new(tmp.clone());
476
477        single.init().await.unwrap();
478
479        let cid = Cid::try_from("QmRgutAxd8t7oGkSm4wmeuByG6M51wcTso6cubDdQtuEfL").unwrap();
480        let data = hex!("0a0d08021207666f6f6261720a1807");
481
482        let block = Block::new(cid, data.to_vec()).unwrap();
483
484        assert_eq!(single.list().await.collect::<Vec<_>>().await.len(), 0);
485
486        single.put(&block).await.unwrap();
487
488        // compare the multihash since we store the block named as cidv1
489        assert_eq!(
490            single.list().await.collect::<Vec<_>>().await[0].hash(),
491            cid.hash()
492        );
493
494        single.remove(&cid).await.unwrap();
495        assert_eq!(single.list().await.collect::<Vec<_>>().await.len(), 0);
496    }
497}