1use super::{EthMappingsStore, SettingsStore, SettingsStoreExt};
5use crate::blocks::{Tipset, TipsetKey};
6use crate::db::PersistentStore;
7use crate::libp2p_bitswap::{BitswapStoreRead, BitswapStoreReadWrite};
8use crate::rpc::eth::types::EthHash;
9use crate::shim::clock::ChainEpoch;
10use crate::utils::db::car_stream::CarBlock;
11use crate::utils::multihash::prelude::*;
12use ahash::HashMap;
13use anyhow::Context as _;
14use cid::Cid;
15use fvm_ipld_blockstore::Blockstore;
16use itertools::Itertools;
17use nunny::Vec as NonEmpty;
18use parking_lot::RwLock;
19
20#[derive(Debug, Default)]
21pub struct MemoryDB {
22 blockchain_db: RwLock<HashMap<Cid, Vec<u8>>>,
23 blockchain_persistent_db: RwLock<HashMap<Cid, Vec<u8>>>,
24 settings_db: RwLock<HashMap<String, Vec<u8>>>,
25 pub eth_mappings_db: RwLock<HashMap<EthHash, Vec<u8>>>,
26 ts_lookup_db: RwLock<HashMap<ChainEpoch, TipsetKey>>,
27}
28
29impl MemoryDB {
30 pub fn blockstore_len(&self) -> usize {
31 self.blockchain_db.read().len() + self.blockchain_persistent_db.read().len()
32 }
33
34 pub fn blockstore_size_bytes(&self) -> usize {
35 self.blockchain_db
36 .read()
37 .iter()
38 .chain(self.blockchain_persistent_db.read().iter())
39 .map(|(k, v)| k.to_bytes().len() + v.len())
40 .sum()
41 }
42
43 pub async fn export_forest_car<W: tokio::io::AsyncWrite + Unpin>(
44 &self,
45 writer: &mut W,
46 ) -> anyhow::Result<()> {
47 let roots =
48 SettingsStoreExt::read_obj::<TipsetKey>(self, crate::db::setting_keys::HEAD_KEY)?
49 .context("chain head is not tracked and cannot be exported")?
50 .into_cids();
51 self.export_forest_car_with_roots(roots, writer).await
52 }
53
54 pub async fn export_forest_car_with_roots<W: tokio::io::AsyncWrite + Unpin>(
55 &self,
56 roots: NonEmpty<Cid>,
57 writer: &mut W,
58 ) -> anyhow::Result<()> {
59 let blocks = {
60 let blockchain_db = self.blockchain_db.read();
61 let blockchain_persistent_db = self.blockchain_persistent_db.read();
62 blockchain_db
63 .iter()
64 .chain(blockchain_persistent_db.iter())
65 .sorted_by_key(|&(&cid, _)| cid)
67 .map(|(&cid, data)| {
68 anyhow::Ok(CarBlock {
69 cid,
70 data: data.clone().into(),
71 })
72 })
73 .collect_vec()
74 };
75 let frames =
76 crate::db::car::forest::Encoder::compress_stream_default(futures::stream::iter(blocks));
77 crate::db::car::forest::Encoder::write(writer, roots, frames).await
78 }
79}
80
81impl SettingsStore for MemoryDB {
82 fn read_bin(&self, key: &str) -> anyhow::Result<Option<Vec<u8>>> {
83 Ok(self.settings_db.read().get(key).cloned())
84 }
85
86 fn write_bin(&self, key: &str, value: &[u8]) -> anyhow::Result<()> {
87 self.settings_db
88 .write()
89 .insert(key.to_owned(), value.to_vec());
90 Ok(())
91 }
92
93 fn exists(&self, key: &str) -> anyhow::Result<bool> {
94 Ok(self.settings_db.read().contains_key(key))
95 }
96
97 fn setting_keys(&self) -> anyhow::Result<Vec<String>> {
98 Ok(self.settings_db.read().keys().cloned().collect_vec())
99 }
100}
101
102impl EthMappingsStore for MemoryDB {
103 fn read_bin(&self, key: &EthHash) -> anyhow::Result<Option<Vec<u8>>> {
104 Ok(self.eth_mappings_db.read().get(key).cloned())
105 }
106
107 fn write_bin(&self, key: &EthHash, value: &[u8]) -> anyhow::Result<()> {
108 self.eth_mappings_db
109 .write()
110 .insert(key.to_owned(), value.to_vec());
111 Ok(())
112 }
113
114 fn exists(&self, key: &EthHash) -> anyhow::Result<bool> {
115 Ok(self.eth_mappings_db.read().contains_key(key))
116 }
117
118 fn get_message_cids(&self) -> anyhow::Result<Vec<(Cid, u64)>> {
119 let cids = self
120 .eth_mappings_db
121 .read()
122 .values()
123 .filter_map(|value| fvm_ipld_encoding::from_slice::<(Cid, u64)>(value).ok())
124 .collect();
125
126 Ok(cids)
127 }
128
129 fn delete(&self, keys: Vec<EthHash>) -> anyhow::Result<()> {
130 let mut lock = self.eth_mappings_db.write();
131 for hash in keys.iter() {
132 lock.remove(hash);
133 }
134 Ok(())
135 }
136
137 fn tipset_key_by_epoch(&self, epoch: i64) -> anyhow::Result<Option<TipsetKey>> {
138 Ok(self.ts_lookup_db.read().get(&epoch).cloned())
139 }
140
141 fn set_tipset_key_at_epoch(&self, ts: &Tipset) -> anyhow::Result<()> {
142 self.ts_lookup_db
143 .write()
144 .insert(ts.epoch(), ts.key().clone());
145 Ok(())
146 }
147}
148
149impl Blockstore for MemoryDB {
150 fn get(&self, k: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
151 Ok(self.blockchain_db.read().get(k).cloned().or(self
152 .blockchain_persistent_db
153 .read()
154 .get(k)
155 .cloned()))
156 }
157
158 fn put_keyed(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()> {
159 self.blockchain_db.write().insert(*k, block.to_vec());
160 Ok(())
161 }
162}
163
164impl PersistentStore for MemoryDB {
165 fn put_keyed_persistent(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()> {
166 self.blockchain_persistent_db
167 .write()
168 .insert(*k, block.to_vec());
169 Ok(())
170 }
171}
172
173impl BitswapStoreRead for MemoryDB {
174 fn contains(&self, cid: &Cid) -> anyhow::Result<bool> {
175 Ok(self.blockchain_db.read().contains_key(cid))
176 }
177
178 fn get(&self, cid: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
179 Blockstore::get(self, cid)
180 }
181}
182
183impl BitswapStoreReadWrite for MemoryDB {
184 type Hashes = MultihashCode;
185
186 fn insert(&self, block: &crate::libp2p_bitswap::Block64<Self::Hashes>) -> anyhow::Result<()> {
187 self.put_keyed(block.cid(), block.data())
188 }
189}
190
191impl super::HeaviestTipsetKeyProvider for MemoryDB {
192 fn heaviest_tipset_key(&self) -> anyhow::Result<Option<TipsetKey>> {
193 SettingsStoreExt::read_obj::<TipsetKey>(self, crate::db::setting_keys::HEAD_KEY)
194 }
195
196 fn set_heaviest_tipset_key(&self, tsk: &TipsetKey) -> anyhow::Result<()> {
197 SettingsStoreExt::write_obj(self, crate::db::setting_keys::HEAD_KEY, tsk)
198 }
199}
200
#[cfg(test)]
mod tests {
    use super::*;
    use crate::db::{car::ForestCar, setting_keys::HEAD_KEY};
    use fvm_ipld_encoding::DAG_CBOR;
    use multihash_codetable::Code::Blake2b256;
    use nunny::vec as nonempty;

    /// Exporting fails while no head is stored; once a head is set, the
    /// exported CAR contains the blocks from both the regular and the
    /// persistent map.
    #[tokio::test]
    async fn test_export_forest_car() {
        let db = MemoryDB::default();

        // One block in the regular map ...
        let volatile_payload = b"non-persistent";
        let volatile_cid = Cid::new_v1(DAG_CBOR, Blake2b256.digest(volatile_payload.as_slice()));
        db.put_keyed(&volatile_cid, volatile_payload.as_slice())
            .unwrap();

        // ... and one in the persistent map.
        let persistent_payload = b"persistent";
        let persistent_cid =
            Cid::new_v1(DAG_CBOR, Blake2b256.digest(persistent_payload.as_slice()));
        db.put_keyed_persistent(&persistent_cid, persistent_payload.as_slice())
            .unwrap();

        // Without a stored head the export must be rejected.
        let mut exported = vec![];
        let failure = db.export_forest_car(&mut exported).await.unwrap_err();
        assert!(
            failure
                .to_string()
                .contains("chain head is not tracked and cannot be exported")
        );

        db.write_obj(HEAD_KEY, &TipsetKey::from(nonempty![volatile_cid]))
            .unwrap();

        // Discard anything a failed attempt may have written.
        exported.clear();
        db.export_forest_car(&mut exported).await.unwrap();

        let car = ForestCar::new(exported).unwrap();
        assert_eq!(car.head_tipset_key(), &nonempty![volatile_cid]);
        assert!(car.has(&volatile_cid).unwrap());
        assert!(car.has(&persistent_cid).unwrap());
    }
}