1mod blockstore_with_read_cache;
5mod blockstore_with_write_buffer;
6pub mod car;
7mod memory;
8pub mod parity_db;
9pub mod parity_db_config;
10
11pub mod gc;
12pub mod ttl;
13pub use blockstore_with_read_cache::*;
14pub use blockstore_with_write_buffer::BlockstoreWithWriteBuffer;
15pub use memory::MemoryDB;
16mod db_mode;
17mod either;
18pub mod migration;
19pub use either::Either;
20
21use crate::blocks::TipsetKey;
22use crate::rpc::eth::types::EthHash;
23use anyhow::{Context as _, bail};
24use cid::Cid;
25pub use fvm_ipld_blockstore::{Blockstore, MemoryBlockstore};
26use serde::Serialize;
27use serde::de::DeserializeOwned;
28use std::sync::Arc;
29
/// Directory name (`"car_db"`) for the CAR database.
/// NOTE(review): the parent directory this name is joined onto is not visible here — confirm
/// at the use sites.
pub const CAR_DB_DIR_NAME: &str = "car_db";
31
/// Well-known string keys for use with a [`SettingsStore`].
pub mod setting_keys {
    /// Settings key `"head"`.
    /// NOTE(review): presumably the entry holding the current chain head — confirm with callers.
    pub const HEAD_KEY: &str = "head";
    /// Settings key `"/mpool/config"`.
    /// NOTE(review): presumably the entry holding the message pool configuration — confirm
    /// with callers.
    pub const MPOOL_CONFIG_KEY: &str = "/mpool/config";
}
38
/// Interface of a store that maps string keys to raw byte values.
///
/// Typed (JSON) access on top of these primitives is provided by [`SettingsStoreExt`].
pub trait SettingsStore {
    /// Reads the raw bytes associated with `key`.
    ///
    /// The `Option` in the return type lets implementors distinguish "no value"
    /// (`Ok(None)`) from a failed read (`Err`).
    fn read_bin(&self, key: &str) -> anyhow::Result<Option<Vec<u8>>>;

    /// Writes `value` as the raw bytes associated with `key`.
    /// NOTE(review): presumably overwrites an existing entry — confirm with implementors.
    fn write_bin(&self, key: &str, value: &[u8]) -> anyhow::Result<()>;

    /// Reports whether an entry for `key` exists.
    fn exists(&self, key: &str) -> anyhow::Result<bool>;

    /// Returns setting keys as strings.
    /// NOTE(review): presumably every key currently in the store — confirm with implementors.
    // `dead_code` is allowed here, which suggests this method has no caller in some build
    // configurations.
    #[allow(dead_code)]
    fn setting_keys(&self) -> anyhow::Result<Vec<String>>;
}
57
58impl<T: SettingsStore> SettingsStore for Arc<T> {
59 fn read_bin(&self, key: &str) -> anyhow::Result<Option<Vec<u8>>> {
60 SettingsStore::read_bin(self.as_ref(), key)
61 }
62
63 fn write_bin(&self, key: &str, value: &[u8]) -> anyhow::Result<()> {
64 SettingsStore::write_bin(self.as_ref(), key, value)
65 }
66
67 fn exists(&self, key: &str) -> anyhow::Result<bool> {
68 SettingsStore::exists(self.as_ref(), key)
69 }
70
71 fn setting_keys(&self) -> anyhow::Result<Vec<String>> {
72 SettingsStore::setting_keys(self.as_ref())
73 }
74}
75
/// Typed convenience accessors layered on top of [`SettingsStore`].
///
/// The blanket implementation in this file encodes values as JSON via `serde_json`.
pub trait SettingsStoreExt {
    /// Reads the entry under `key` and deserializes it into `V`.
    /// Returns `Ok(None)` when there is no such entry.
    fn read_obj<V: DeserializeOwned>(&self, key: &str) -> anyhow::Result<Option<V>>;
    /// Serializes `value` and stores it under `key`.
    fn write_obj<V: Serialize>(&self, key: &str, value: &V) -> anyhow::Result<()>;

    /// Like [`SettingsStoreExt::read_obj`], but a missing entry is an error instead of `None`.
    // `dead_code` is allowed here, which suggests this method has no caller in some build
    // configurations.
    #[allow(dead_code)]
    fn require_obj<V: DeserializeOwned>(&self, key: &str) -> anyhow::Result<V>;
}
87
88impl<T: ?Sized + SettingsStore> SettingsStoreExt for T {
89 fn read_obj<V: DeserializeOwned>(&self, key: &str) -> anyhow::Result<Option<V>> {
90 match self.read_bin(key)? {
91 Some(bytes) => Ok(Some(serde_json::from_slice(&bytes)?)),
92 None => Ok(None),
93 }
94 }
95
96 fn write_obj<V: Serialize>(&self, key: &str, value: &V) -> anyhow::Result<()> {
97 self.write_bin(key, &serde_json::to_vec(value)?)
98 }
99
100 fn require_obj<V: DeserializeOwned>(&self, key: &str) -> anyhow::Result<V> {
101 self.read_bin(key)?
102 .with_context(|| format!("Key {key} not found"))
103 .and_then(|bytes| serde_json::from_slice(&bytes).map_err(Into::into))
104 }
105}
106
/// Interface of a store that maps [`EthHash`] keys to raw byte values.
///
/// Typed access on top of these primitives is provided by [`EthMappingsStoreExt`].
pub trait EthMappingsStore {
    /// Reads the raw bytes associated with `key`.
    ///
    /// The `Option` in the return type lets implementors distinguish "no value"
    /// (`Ok(None)`) from a failed read (`Err`).
    fn read_bin(&self, key: &EthHash) -> anyhow::Result<Option<Vec<u8>>>;

    /// Writes `value` as the raw bytes associated with `key`.
    fn write_bin(&self, key: &EthHash, value: &[u8]) -> anyhow::Result<()>;

    /// Reports whether an entry for `key` exists.
    // `dead_code` is allowed here, which suggests this method has no caller in some build
    // configurations.
    #[allow(dead_code)]
    fn exists(&self, key: &EthHash) -> anyhow::Result<bool>;

    /// Returns message CIDs, each paired with a `u64`.
    /// NOTE(review): the meaning of the `u64` is not visible here (presumably a timestamp) —
    /// confirm with implementors.
    fn get_message_cids(&self) -> anyhow::Result<Vec<(Cid, u64)>>;

    /// Deletes the entries identified by `keys`.
    fn delete(&self, keys: Vec<EthHash>) -> anyhow::Result<()>;
}
128
129impl<T: EthMappingsStore> EthMappingsStore for Arc<T> {
130 fn read_bin(&self, key: &EthHash) -> anyhow::Result<Option<Vec<u8>>> {
131 EthMappingsStore::read_bin(self.as_ref(), key)
132 }
133
134 fn write_bin(&self, key: &EthHash, value: &[u8]) -> anyhow::Result<()> {
135 EthMappingsStore::write_bin(self.as_ref(), key, value)
136 }
137
138 fn exists(&self, key: &EthHash) -> anyhow::Result<bool> {
139 EthMappingsStore::exists(self.as_ref(), key)
140 }
141
142 fn get_message_cids(&self) -> anyhow::Result<Vec<(Cid, u64)>> {
143 EthMappingsStore::get_message_cids(self.as_ref())
144 }
145
146 fn delete(&self, keys: Vec<EthHash>) -> anyhow::Result<()> {
147 EthMappingsStore::delete(self.as_ref(), keys)
148 }
149}
150
/// Field-less placeholder store. Its [`EthMappingsStore`] implementation in this file fails
/// every operation with an "indexer disabled" error.
pub struct DummyStore {}
152
/// Error message returned by every [`DummyStore`] operation; it names the configuration key
/// and environment variable that enable the indexer.
const INDEXER_ERROR: &str =
    "indexer disabled, enable with chain_indexer.enable_indexer / FOREST_CHAIN_INDEXER_ENABLED";
155
156impl EthMappingsStore for DummyStore {
157 fn read_bin(&self, _key: &EthHash) -> anyhow::Result<Option<Vec<u8>>> {
158 bail!(INDEXER_ERROR)
159 }
160
161 fn write_bin(&self, _key: &EthHash, _value: &[u8]) -> anyhow::Result<()> {
162 bail!(INDEXER_ERROR)
163 }
164
165 fn exists(&self, _key: &EthHash) -> anyhow::Result<bool> {
166 bail!(INDEXER_ERROR)
167 }
168
169 fn get_message_cids(&self) -> anyhow::Result<Vec<(Cid, u64)>> {
170 bail!(INDEXER_ERROR)
171 }
172
173 fn delete(&self, _keys: Vec<EthHash>) -> anyhow::Result<()> {
174 bail!(INDEXER_ERROR)
175 }
176}
177
/// Typed convenience accessors layered on top of [`EthMappingsStore`].
///
/// The blanket implementation in this file encodes values with `fvm_ipld_encoding`.
pub trait EthMappingsStoreExt {
    /// Reads the entry under `key` and deserializes it into `V`.
    /// Returns `Ok(None)` when there is no such entry.
    fn read_obj<V: DeserializeOwned>(&self, key: &EthHash) -> anyhow::Result<Option<V>>;
    /// Serializes `value` and stores it under `key`.
    fn write_obj<V: Serialize>(&self, key: &EthHash, value: &V) -> anyhow::Result<()>;
}
182
183impl<T: ?Sized + EthMappingsStore> EthMappingsStoreExt for T {
184 fn read_obj<V: DeserializeOwned>(&self, key: &EthHash) -> anyhow::Result<Option<V>> {
185 match self.read_bin(key)? {
186 Some(bytes) => Ok(Some(fvm_ipld_encoding::from_slice(&bytes)?)),
187 None => Ok(None),
188 }
189 }
190
191 fn write_obj<V: Serialize>(&self, key: &EthHash, value: &V) -> anyhow::Result<()> {
192 self.write_bin(key, &fvm_ipld_encoding::to_vec(value)?)
193 }
194}
195
/// Optional access to a textual statistics report for a database.
pub trait DBStatistics {
    /// Returns the statistics as a string, if the implementor provides any.
    ///
    /// The default implementation returns `None`.
    fn get_statistics(&self) -> Option<String> {
        None
    }
}
202
203impl<DB: DBStatistics> DBStatistics for std::sync::Arc<DB> {
204 fn get_statistics(&self) -> Option<String> {
205 self.as_ref().get_statistics()
206 }
207}
208
/// A [`Blockstore`] that additionally offers a "persistent" keyed write.
pub trait PersistentStore: Blockstore {
    /// Stores `block` under the CID `k`.
    /// NOTE(review): how a "persistent" put differs from `Blockstore::put_keyed` is not
    /// visible here (presumably the block is exempt from cleanup/GC) — confirm with
    /// implementors.
    fn put_keyed_persistent(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()>;
}
219
220impl PersistentStore for MemoryBlockstore {
221 fn put_keyed_persistent(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()> {
222 self.put_keyed(k, block)
223 }
224}
225
226impl<T: PersistentStore> PersistentStore for Arc<T> {
227 fn put_keyed_persistent(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()> {
228 PersistentStore::put_keyed_persistent(self.as_ref(), k, block)
229 }
230}
231
232impl<T: PersistentStore> PersistentStore for &Arc<T> {
233 fn put_keyed_persistent(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()> {
234 PersistentStore::put_keyed_persistent(self.as_ref(), k, block)
235 }
236}
237
/// Read/write access to the key of the heaviest tipset.
pub trait HeaviestTipsetKeyProvider {
    /// Returns the heaviest tipset key, or an error if it cannot be obtained.
    fn heaviest_tipset_key(&self) -> anyhow::Result<TipsetKey>;

    /// Records `tsk` as the heaviest tipset key.
    fn set_heaviest_tipset_key(&self, tsk: &TipsetKey) -> anyhow::Result<()>;
}
245
246impl<T: HeaviestTipsetKeyProvider> HeaviestTipsetKeyProvider for Arc<T> {
247 fn heaviest_tipset_key(&self) -> anyhow::Result<TipsetKey> {
248 self.as_ref().heaviest_tipset_key()
249 }
250
251 fn set_heaviest_tipset_key(&self, tsk: &TipsetKey) -> anyhow::Result<()> {
252 self.as_ref().set_heaviest_tipset_key(tsk)
253 }
254}
255
/// Subscription interface for blockstore write operations.
pub trait BlockstoreWriteOpsSubscribable {
    /// Returns a `tokio` broadcast receiver that yields `(Cid, Vec<u8>)` pairs.
    /// NOTE(review): presumably one pair (CID and block bytes) per write performed after
    /// subscribing — confirm with implementors.
    fn subscribe_write_ops(&self) -> tokio::sync::broadcast::Receiver<(Cid, Vec<u8>)>;

    /// Ends the write-op subscription.
    /// NOTE(review): whether this affects all receivers or only the latest is not visible
    /// here — confirm with implementors.
    fn unsubscribe_write_ops(&self);
}
261
262impl<T: BlockstoreWriteOpsSubscribable> BlockstoreWriteOpsSubscribable for Arc<T> {
263 fn subscribe_write_ops(&self) -> tokio::sync::broadcast::Receiver<(Cid, Vec<u8>)> {
264 self.as_ref().subscribe_write_ops()
265 }
266
267 fn unsubscribe_write_ops(&self) {
268 self.as_ref().unsubscribe_write_ops()
269 }
270}
271
272pub mod db_engine {
273 use std::path::{Path, PathBuf};
274
275 use super::db_mode::choose_db;
276
277 pub type Db = crate::db::parity_db::ParityDb;
278 pub type DbConfig = crate::db::parity_db_config::ParityDbConfig;
279
280 pub fn db_root(chain_data_root: &Path) -> anyhow::Result<PathBuf> {
282 choose_db(chain_data_root)
283 }
284
285 pub fn open_db(path: PathBuf, config: &DbConfig) -> anyhow::Result<Db> {
286 Db::open(path, config)
287 }
288}
289
// Test-only submodules; compiled only when building tests.
#[cfg(test)]
mod tests {
    // `db_utils` and `subtests` are `pub` so they can be reached from outside this module.
    pub mod db_utils;
    mod mem_test;
    mod parity_test;
    pub mod subtests;
}