forest/db/
memory.rs

1// Copyright 2019-2025 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3
4use super::{EthMappingsStore, SettingsStore, SettingsStoreExt};
5use crate::blocks::TipsetKey;
6use crate::db::{IndicesStore, PersistentStore};
7use crate::libp2p_bitswap::{BitswapStoreRead, BitswapStoreReadWrite};
8use crate::rpc::eth::types::EthHash;
9use crate::utils::db::car_stream::CarBlock;
10use crate::utils::multihash::prelude::*;
11use ahash::HashMap;
12use anyhow::Context as _;
13use cid::Cid;
14use fvm_ipld_blockstore::Blockstore;
15use itertools::Itertools;
16use parking_lot::RwLock;
17
18#[derive(Debug, Default)]
19pub struct MemoryDB {
20    blockchain_db: RwLock<HashMap<Cid, Vec<u8>>>,
21    blockchain_persistent_db: RwLock<HashMap<Cid, Vec<u8>>>,
22    settings_db: RwLock<HashMap<String, Vec<u8>>>,
23    pub eth_mappings_db: RwLock<HashMap<EthHash, Vec<u8>>>,
24    pub indices_db: RwLock<HashMap<Cid, Vec<u8>>>,
25}
26
27impl MemoryDB {
28    pub async fn export_forest_car<W: tokio::io::AsyncWrite + Unpin>(
29        &self,
30        writer: &mut W,
31    ) -> anyhow::Result<()> {
32        let roots =
33            SettingsStoreExt::read_obj::<TipsetKey>(self, crate::db::setting_keys::HEAD_KEY)?
34                .context("chain head is not tracked and cannot be exported")?
35                .into_cids();
36        let blocks = {
37            let blockchain_db = self.blockchain_db.read();
38            let blockchain_persistent_db = self.blockchain_persistent_db.read();
39            blockchain_db
40                .iter()
41                .chain(blockchain_persistent_db.iter())
42                .map(|(&cid, data)| {
43                    anyhow::Ok(CarBlock {
44                        cid,
45                        data: data.clone(),
46                    })
47                })
48                .collect_vec()
49        };
50        let frames =
51            crate::db::car::forest::Encoder::compress_stream_default(futures::stream::iter(blocks));
52        crate::db::car::forest::Encoder::write(writer, roots, frames).await
53    }
54}
55
56impl SettingsStore for MemoryDB {
57    fn read_bin(&self, key: &str) -> anyhow::Result<Option<Vec<u8>>> {
58        Ok(self.settings_db.read().get(key).cloned())
59    }
60
61    fn write_bin(&self, key: &str, value: &[u8]) -> anyhow::Result<()> {
62        self.settings_db
63            .write()
64            .insert(key.to_owned(), value.to_vec());
65        Ok(())
66    }
67
68    fn exists(&self, key: &str) -> anyhow::Result<bool> {
69        Ok(self.settings_db.read().contains_key(key))
70    }
71
72    fn setting_keys(&self) -> anyhow::Result<Vec<String>> {
73        Ok(self.settings_db.read().keys().cloned().collect_vec())
74    }
75}
76
77impl EthMappingsStore for MemoryDB {
78    fn read_bin(&self, key: &EthHash) -> anyhow::Result<Option<Vec<u8>>> {
79        Ok(self.eth_mappings_db.read().get(key).cloned())
80    }
81
82    fn write_bin(&self, key: &EthHash, value: &[u8]) -> anyhow::Result<()> {
83        self.eth_mappings_db
84            .write()
85            .insert(key.to_owned(), value.to_vec());
86        Ok(())
87    }
88
89    fn exists(&self, key: &EthHash) -> anyhow::Result<bool> {
90        Ok(self.eth_mappings_db.read().contains_key(key))
91    }
92
93    fn get_message_cids(&self) -> anyhow::Result<Vec<(Cid, u64)>> {
94        let cids = self
95            .eth_mappings_db
96            .read()
97            .iter()
98            .filter_map(|(_, value)| fvm_ipld_encoding::from_slice::<(Cid, u64)>(value).ok())
99            .collect();
100
101        Ok(cids)
102    }
103
104    fn delete(&self, keys: Vec<EthHash>) -> anyhow::Result<()> {
105        let mut lock = self.eth_mappings_db.write();
106        for hash in keys.iter() {
107            lock.remove(hash);
108        }
109        Ok(())
110    }
111}
112
113impl IndicesStore for MemoryDB {
114    fn read_bin(&self, key: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
115        Ok(self.indices_db.read().get(key).cloned())
116    }
117
118    fn write_bin(&self, key: &Cid, value: &[u8]) -> anyhow::Result<()> {
119        self.indices_db
120            .write()
121            .insert(key.to_owned(), value.to_vec());
122        Ok(())
123    }
124
125    fn exists(&self, key: &Cid) -> anyhow::Result<bool> {
126        Ok(self.indices_db.read().contains_key(key))
127    }
128}
129
130impl Blockstore for MemoryDB {
131    fn get(&self, k: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
132        Ok(self.blockchain_db.read().get(k).cloned().or(self
133            .blockchain_persistent_db
134            .read()
135            .get(k)
136            .cloned()))
137    }
138
139    fn put_keyed(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()> {
140        self.blockchain_db.write().insert(*k, block.to_vec());
141        Ok(())
142    }
143}
144
145impl PersistentStore for MemoryDB {
146    fn put_keyed_persistent(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()> {
147        self.blockchain_persistent_db
148            .write()
149            .insert(*k, block.to_vec());
150        Ok(())
151    }
152}
153
154impl BitswapStoreRead for MemoryDB {
155    fn contains(&self, cid: &Cid) -> anyhow::Result<bool> {
156        Ok(self.blockchain_db.read().contains_key(cid))
157    }
158
159    fn get(&self, cid: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
160        Blockstore::get(self, cid)
161    }
162}
163
164impl BitswapStoreReadWrite for MemoryDB {
165    type Hashes = MultihashCode;
166
167    fn insert(&self, block: &crate::libp2p_bitswap::Block64<Self::Hashes>) -> anyhow::Result<()> {
168        self.put_keyed(block.cid(), block.data())
169    }
170}
171
172impl super::HeaviestTipsetKeyProvider for MemoryDB {
173    fn heaviest_tipset_key(&self) -> anyhow::Result<TipsetKey> {
174        SettingsStoreExt::read_obj::<TipsetKey>(self, crate::db::setting_keys::HEAD_KEY)?
175            .context("head key not found")
176    }
177
178    fn set_heaviest_tipset_key(&self, tsk: &TipsetKey) -> anyhow::Result<()> {
179        SettingsStoreExt::write_obj(self, crate::db::setting_keys::HEAD_KEY, tsk)
180    }
181}
182
#[cfg(test)]
mod tests {
    use super::*;
    use crate::db::{car::ForestCar, setting_keys::HEAD_KEY};
    use fvm_ipld_encoding::DAG_CBOR;
    use multihash_codetable::Code::Blake2b256;
    use nunny::vec as nonempty;

    /// Export must fail while no head is stored; once a head is written, the
    /// exported archive must contain blocks from both block maps.
    #[tokio::test]
    async fn test_export_forest_car() {
        let db = MemoryDB::default();

        // One block in the regular map ...
        let regular_payload = b"non-persistent";
        let regular_cid = Cid::new_v1(DAG_CBOR, Blake2b256.digest(regular_payload.as_slice()));
        db.put_keyed(&regular_cid, regular_payload.as_slice())
            .unwrap();

        // ... and one in the persistent map.
        let persistent_payload = b"persistent";
        let persistent_cid =
            Cid::new_v1(DAG_CBOR, Blake2b256.digest(persistent_payload.as_slice()));
        db.put_keyed_persistent(&persistent_cid, persistent_payload.as_slice())
            .unwrap();

        // No head has been written yet, so the export is rejected.
        let mut exported = vec![];
        let err = db.export_forest_car(&mut exported).await.unwrap_err();
        assert!(
            err.to_string()
                .contains("chain head is not tracked and cannot be exported")
        );

        db.write_obj(HEAD_KEY, &TipsetKey::from(nonempty![regular_cid]))
            .unwrap();

        exported.clear();
        db.export_forest_car(&mut exported).await.unwrap();

        let car = ForestCar::new(exported).unwrap();
        assert_eq!(car.head_tipset_key(), &nonempty![regular_cid]);
        assert!(car.has(&regular_cid).unwrap());
        assert!(car.has(&persistent_cid).unwrap());
    }
}