forest/db/blockstore_with_write_buffer.rs

use ahash::{HashMap, HashMapExt};
use cid::Cid;
use fvm_ipld_blockstore::Blockstore;
use itertools::Itertools;
use parking_lot::RwLock;

/// Wraps a [`Blockstore`] with an in-memory write buffer. Writes are staged
/// in the buffer and flushed to the inner store in a single batch once the
/// buffer reaches `buffer_capacity`; any remaining records are flushed when
/// the wrapper is dropped.
pub struct BlockstoreWithWriteBuffer<DB: Blockstore> {
    inner: DB,
    buffer: RwLock<HashMap<Cid, Vec<u8>>>,
    buffer_capacity: usize,
}

impl<DB: Blockstore> Blockstore for BlockstoreWithWriteBuffer<DB> {
    fn get(&self, k: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
        // Serve reads from the buffer first so that un-flushed writes stay visible.
        if let Some(v) = self.buffer.read().get(k) {
            return Ok(Some(v.clone()));
        }
        self.inner.get(k)
    }

    fn has(&self, k: &Cid) -> anyhow::Result<bool> {
        Ok(self.buffer.read().contains_key(k) || self.inner.has(k)?)
    }

    fn put_keyed(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()> {
        {
            // Scope the write lock so it is released before a potential flush.
            let mut buffer = self.buffer.write();
            buffer.insert(*k, block.to_vec());
        }
        self.flush_buffer_if_needed()
    }
}

impl<DB: Blockstore> BlockstoreWithWriteBuffer<DB> {
    pub fn new_with_capacity(inner: DB, buffer_capacity: usize) -> Self {
        Self {
            inner,
            buffer_capacity,
            buffer: RwLock::new(HashMap::with_capacity(buffer_capacity)),
        }
    }

    /// Drains the buffer and writes the drained records to the inner store
    /// as a single batch.
    fn flush_buffer(&self) -> anyhow::Result<()> {
        let records = {
            let mut buffer = self.buffer.write();
            buffer.drain().collect_vec()
        };
        self.inner.put_many_keyed(records)
    }

    fn flush_buffer_if_needed(&self) -> anyhow::Result<()> {
        if self.buffer.read().len() >= self.buffer_capacity {
            self.flush_buffer()
        } else {
            Ok(())
        }
    }
}

impl<DB: Blockstore> Drop for BlockstoreWithWriteBuffer<DB> {
    fn drop(&mut self) {
        // Flush any records still in the buffer; in `drop`, failures can only be logged.
        if let Err(e) = self.flush_buffer() {
            tracing::warn!("{e}");
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{db::MemoryDB, utils::rand::forest_rng};
    use fvm_ipld_encoding::DAG_CBOR;
    use multihash_codetable::Code::Blake2b256;
    use multihash_codetable::MultihashDigest as _;
    use rand::Rng as _;
    use std::sync::Arc;

    #[test]
    fn test_buffer_flush() {
        const BUFFER_SIZE: usize = 10;
        const N_RECORDS: usize = 15;
        let mem_db = Arc::new(MemoryDB::default());
        let buf_db = BlockstoreWithWriteBuffer::new_with_capacity(mem_db.clone(), BUFFER_SIZE);
        let mut records = Vec::with_capacity(N_RECORDS);
        for _ in 0..N_RECORDS {
            let mut record = [0; 1024];
            forest_rng().fill(&mut record);
            let key = Cid::new_v1(DAG_CBOR, Blake2b256.digest(record.as_slice()));
            records.push((key, record));
        }

        buf_db.put_many_keyed(records.clone()).unwrap();

        for (i, (k, v)) in records.iter().enumerate() {
            assert!(buf_db.has(k).unwrap());
            assert_eq!(buf_db.get(k).unwrap().unwrap().as_slice(), v);
            if i < BUFFER_SIZE {
                assert!(mem_db.has(k).unwrap());
                assert_eq!(mem_db.get(k).unwrap().unwrap().as_slice(), v);
            } else {
                assert!(!mem_db.has(k).unwrap());
            }
        }

        drop(buf_db);

        for (k, v) in records.iter() {
            assert!(mem_db.has(k).unwrap());
            assert_eq!(mem_db.get(k).unwrap().unwrap().as_slice(), v);
        }
    }
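
    // A minimal sketch of the pre-flush read path, added here as hypothetical
    // extra coverage (not from the original file) under the same assumptions
    // as `test_buffer_flush` above: a single record below the capacity
    // threshold should be visible through the wrapper while remaining absent
    // from the inner store until a flush happens.
    #[test]
    fn test_reads_served_from_buffer_before_flush() {
        const BUFFER_SIZE: usize = 10;
        let mem_db = Arc::new(MemoryDB::default());
        let buf_db = BlockstoreWithWriteBuffer::new_with_capacity(mem_db.clone(), BUFFER_SIZE);
        let mut record = [0; 1024];
        forest_rng().fill(&mut record);
        let key = Cid::new_v1(DAG_CBOR, Blake2b256.digest(record.as_slice()));

        buf_db.put_keyed(&key, record.as_slice()).unwrap();

        // One record is far below `BUFFER_SIZE`, so nothing has been flushed yet:
        // the wrapper serves the read from its buffer, not from the inner store.
        assert!(buf_db.has(&key).unwrap());
        assert_eq!(buf_db.get(&key).unwrap().unwrap().as_slice(), record.as_slice());
        assert!(!mem_db.has(&key).unwrap());
    }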
}