Skip to main content

forest/db/
blockstore_with_write_buffer.rs

1// Copyright 2019-2026 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3
4use ahash::{HashMap, HashMapExt};
5use cid::Cid;
6use fvm_ipld_blockstore::Blockstore;
7use itertools::Itertools;
8use parking_lot::RwLock;
9
10pub struct BlockstoreWithWriteBuffer<DB: Blockstore> {
11    inner: DB,
12    buffer: RwLock<HashMap<Cid, Vec<u8>>>,
13    buffer_capacity: usize,
14}
15
16impl<DB: Blockstore> Blockstore for BlockstoreWithWriteBuffer<DB> {
17    fn get(&self, k: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
18        if let Some(v) = self.buffer.read().get(k) {
19            return Ok(Some(v.clone()));
20        }
21        self.inner.get(k)
22    }
23
24    fn has(&self, k: &Cid) -> anyhow::Result<bool> {
25        Ok(self.buffer.read().contains_key(k) || self.inner.has(k)?)
26    }
27
28    fn put_keyed(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()> {
29        {
30            let mut buffer = self.buffer.write();
31            buffer.insert(*k, block.to_vec());
32        }
33        self.flush_buffer_if_needed()
34    }
35}
36
37impl<DB: Blockstore> BlockstoreWithWriteBuffer<DB> {
38    pub fn new_with_capacity(inner: DB, buffer_capacity: usize) -> Self {
39        Self {
40            inner,
41            buffer_capacity,
42            buffer: RwLock::new(HashMap::with_capacity(buffer_capacity)),
43        }
44    }
45
46    fn flush_buffer(&self) -> anyhow::Result<()> {
47        let records = {
48            let mut buffer = self.buffer.write();
49            buffer.drain().collect_vec()
50        };
51        self.inner.put_many_keyed(records)
52    }
53
54    fn flush_buffer_if_needed(&self) -> anyhow::Result<()> {
55        if self.buffer.read().len() >= self.buffer_capacity {
56            self.flush_buffer()
57        } else {
58            Ok(())
59        }
60    }
61}
62
63impl<DB: Blockstore> Drop for BlockstoreWithWriteBuffer<DB> {
64    fn drop(&mut self) {
65        if let Err(e) = self.flush_buffer() {
66            tracing::warn!("{e}");
67        }
68    }
69}
70
71#[cfg(test)]
72mod tests {
73    use super::*;
74    use crate::{db::MemoryDB, utils::rand::forest_rng};
75    use fvm_ipld_encoding::DAG_CBOR;
76    use multihash_codetable::Code::Blake2b256;
77    use multihash_codetable::MultihashDigest as _;
78    use rand::Rng as _;
79    use std::sync::Arc;
80
81    #[test]
82    fn test_buffer_flush() {
83        const BUFFER_SIZE: usize = 10;
84        const N_RECORDS: usize = 15;
85        let mem_db = Arc::new(MemoryDB::default());
86        let buf_db = BlockstoreWithWriteBuffer::new_with_capacity(mem_db.clone(), BUFFER_SIZE);
87        let mut records = Vec::with_capacity(N_RECORDS);
88        for _ in 0..N_RECORDS {
89            let mut record = [0; 1024];
90            forest_rng().fill(&mut record);
91            let key = Cid::new_v1(DAG_CBOR, Blake2b256.digest(record.as_slice()));
92            records.push((key, record));
93        }
94
95        buf_db.put_many_keyed(records.clone()).unwrap();
96
97        for (i, (k, v)) in records.iter().enumerate() {
98            assert!(buf_db.has(k).unwrap());
99            assert_eq!(buf_db.get(k).unwrap().unwrap().as_slice(), v);
100            if i < BUFFER_SIZE {
101                assert!(mem_db.has(k).unwrap());
102                assert_eq!(mem_db.get(k).unwrap().unwrap().as_slice(), v);
103            } else {
104                assert!(!mem_db.has(k).unwrap());
105            }
106        }
107
108        drop(buf_db);
109
110        for (k, v) in records.iter() {
111            assert!(mem_db.has(k).unwrap());
112            assert_eq!(mem_db.get(k).unwrap().unwrap().as_slice(), v);
113        }
114    }
115}