// forest/db/parity_db.rs

1// Copyright 2019-2026 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3
4use super::{EthMappingsStore, PersistentStore, SettingsStore};
5use crate::blocks::TipsetKey;
6use crate::db::{DBStatistics, parity_db_config::ParityDbConfig};
7use crate::libp2p_bitswap::{BitswapStoreRead, BitswapStoreReadWrite};
8use crate::rpc::eth::types::EthHash;
9use crate::utils::multihash::prelude::*;
10use anyhow::{Context as _, anyhow};
11use cid::Cid;
12use futures::FutureExt;
13use fvm_ipld_blockstore::Blockstore;
14use fvm_ipld_encoding::DAG_CBOR;
15use parity_db::{CompressionType, Db, Operation, Options};
16use parking_lot::RwLock;
17use std::path::PathBuf;
18use strum::{Display, EnumIter, FromRepr, IntoEnumIterator};
19use tracing::warn;
20
21/// This is specific to Forest's `ParityDb` usage.
22/// It is used to determine which column to use for a given entry type.
/// This is specific to Forest's `ParityDb` usage.
/// It is used to determine which column to use for a given entry type.
///
/// The `#[repr(u8)]` discriminant doubles as the `ParityDb` column index:
/// `create_column_options` emits one options entry per variant in declaration
/// order, and call sites cast with `column as u8`.
#[derive(Copy, Clone, Debug, Display, PartialEq, FromRepr, EnumIter)]
#[repr(u8)]
pub enum DbColumn {
    /// Column for storing IPLD data with `Blake2b256` hash and `DAG_CBOR` codec.
    /// Most entries in the `blockstore` will be stored in this column.
    GraphDagCborBlake2b256,
    /// Column for storing other IPLD data (different codec or hash function).
    /// It allows for key retrieval at the cost of degraded performance. Given that
    /// there will be a small number of entries in this column, the performance
    /// degradation is negligible.
    GraphFull,
    /// Column for storing Forest-specific settings.
    Settings,
    /// Column for storing Ethereum mappings.
    EthMappings,
    /// Column for storing IPLD data that has to be ignored by the garbage collector.
    /// Anything stored in this column can be considered permanent, unless manually
    /// deleted.
    PersistentGraph,
}
43
impl DbColumn {
    /// Builds the per-column [`parity_db::ColumnOptions`], one entry per
    /// [`DbColumn`] variant, in variant declaration order so that each entry's
    /// index matches the variant's `repr(u8)` discriminant.
    fn create_column_options(compression: CompressionType) -> Vec<parity_db::ColumnOptions> {
        DbColumn::iter()
            .map(|col| {
                match col {
                    // Content-addressed graph columns: keys are derived from the
                    // value (preimage), no btree index needed.
                    DbColumn::GraphDagCborBlake2b256 | DbColumn::PersistentGraph => {
                        parity_db::ColumnOptions {
                            preimage: true,
                            compression,
                            ..Default::default()
                        }
                    }
                    DbColumn::GraphFull => parity_db::ColumnOptions {
                        preimage: true,
                        // This is needed for key retrieval.
                        btree_index: true,
                        compression,
                        ..Default::default()
                    },
                    DbColumn::Settings => parity_db::ColumnOptions {
                        // explicitly disable preimage for settings column
                        // otherwise we are not able to overwrite entries
                        preimage: false,
                        // This is needed for key retrieval.
                        btree_index: true,
                        compression,
                        ..Default::default()
                    },
                    DbColumn::EthMappings => parity_db::ColumnOptions {
                        preimage: false,
                        btree_index: false,
                        compression,
                        ..Default::default()
                    },
                }
            })
            .collect()
    }
}
83
/// Broadcast sender used to mirror blockstore writes (`(Cid, block bytes)`) to
/// subscribers; see the `BlockstoreWriteOpsSubscribable` impl below.
type WriteOpsBroadcastTxSender = tokio::sync::broadcast::Sender<(Cid, Vec<u8>)>;
85
pub struct ParityDb {
    pub db: parity_db::Db,
    // Mirrors `Options::stats` at open time; gates `DBStatistics::get_statistics`.
    statistics_enabled: bool,
    // This is needed to maintain backwards-compatibility for pre-persistent-column migrations.
    disable_persistent_fallback: bool,
    // Lazily created in `subscribe_write_ops`; `None` means no channel exists yet
    // (or it was torn down by `unsubscribe_write_ops`).
    write_ops_broadcast_tx: RwLock<Option<WriteOpsBroadcastTxSender>>,
}
93
94impl ParityDb {
95    pub fn to_options(path: PathBuf, config: &ParityDbConfig) -> Options {
96        Options {
97            path,
98            sync_wal: true,
99            sync_data: true,
100            stats: config.enable_statistics,
101            salt: None,
102            columns: DbColumn::create_column_options(CompressionType::Lz4),
103            compression_threshold: [(0, 128)].into_iter().collect(),
104        }
105    }
106
107    pub fn open(path: impl Into<PathBuf>, config: &ParityDbConfig) -> anyhow::Result<Self> {
108        let opts = Self::to_options(path.into(), config);
109        Ok(Self {
110            db: Db::open_or_create(&opts)?,
111            statistics_enabled: opts.stats,
112            disable_persistent_fallback: false,
113            write_ops_broadcast_tx: RwLock::new(None),
114        })
115    }
116
117    /// Returns an appropriate column variant based on the information
118    /// in the Cid.
119    fn choose_column(cid: &Cid) -> DbColumn {
120        match cid.codec() {
121            DAG_CBOR if cid.hash().code() == u64::from(MultihashCode::Blake2b256) => {
122                DbColumn::GraphDagCborBlake2b256
123            }
124            _ => DbColumn::GraphFull,
125        }
126    }
127
128    fn read_from_column<K>(&self, key: K, column: DbColumn) -> anyhow::Result<Option<Vec<u8>>>
129    where
130        K: AsRef<[u8]>,
131    {
132        self.db
133            .get(column as u8, key.as_ref())
134            .map_err(|e| anyhow!("error from column {column}: {e}"))
135    }
136
137    fn write_to_column<K, V>(&self, key: K, value: V, column: DbColumn) -> anyhow::Result<()>
138    where
139        K: AsRef<[u8]>,
140        V: AsRef<[u8]>,
141    {
142        let tx = [(column as u8, key.as_ref(), Some(value.as_ref().to_vec()))];
143        self.db
144            .commit(tx)
145            .map_err(|e| anyhow!("error writing to column {column}: {e}"))
146    }
147}
148
149impl SettingsStore for ParityDb {
150    fn read_bin(&self, key: &str) -> anyhow::Result<Option<Vec<u8>>> {
151        self.read_from_column(key.as_bytes(), DbColumn::Settings)
152    }
153
154    fn write_bin(&self, key: &str, value: &[u8]) -> anyhow::Result<()> {
155        self.write_to_column(key.as_bytes(), value, DbColumn::Settings)
156    }
157
158    fn exists(&self, key: &str) -> anyhow::Result<bool> {
159        self.db
160            .get_size(DbColumn::Settings as u8, key.as_bytes())
161            .map(|size| size.is_some())
162            .context("error checking if key exists")
163    }
164
165    fn setting_keys(&self) -> anyhow::Result<Vec<String>> {
166        let mut iter = self.db.iter(DbColumn::Settings as u8)?;
167        let mut keys = vec![];
168        while let Some((key, _)) = iter.next()? {
169            keys.push(String::from_utf8(key)?);
170        }
171        Ok(keys)
172    }
173}
174
impl super::HeaviestTipsetKeyProvider for ParityDb {
    /// Reads the heaviest tipset key stored under [`super::setting_keys::HEAD_KEY`],
    /// erroring if no head key has ever been persisted.
    fn heaviest_tipset_key(&self) -> anyhow::Result<TipsetKey> {
        super::SettingsStoreExt::read_obj::<TipsetKey>(self, super::setting_keys::HEAD_KEY)?
            .context("head key not found")
    }

    /// Persists `tsk` under [`super::setting_keys::HEAD_KEY`].
    fn set_heaviest_tipset_key(&self, tsk: &TipsetKey) -> anyhow::Result<()> {
        super::SettingsStoreExt::write_obj(self, super::setting_keys::HEAD_KEY, tsk)
    }
}
185
186impl EthMappingsStore for ParityDb {
187    fn read_bin(&self, key: &EthHash) -> anyhow::Result<Option<Vec<u8>>> {
188        self.read_from_column(key.0.as_bytes(), DbColumn::EthMappings)
189    }
190
191    fn write_bin(&self, key: &EthHash, value: &[u8]) -> anyhow::Result<()> {
192        self.write_to_column(key.0.as_bytes(), value, DbColumn::EthMappings)
193    }
194
195    fn exists(&self, key: &EthHash) -> anyhow::Result<bool> {
196        self.db
197            .get_size(DbColumn::EthMappings as u8, key.0.as_bytes())
198            .map(|size| size.is_some())
199            .context("error checking if key exists")
200    }
201
202    fn get_message_cids(&self) -> anyhow::Result<Vec<(Cid, u64)>> {
203        let mut cids = Vec::new();
204
205        self.db
206            .iter_column_while(DbColumn::EthMappings as u8, |val| {
207                if let Ok(value) = fvm_ipld_encoding::from_slice::<(Cid, u64)>(&val.value) {
208                    cids.push(value);
209                }
210                true
211            })?;
212
213        Ok(cids)
214    }
215
216    fn delete(&self, keys: Vec<EthHash>) -> anyhow::Result<()> {
217        Ok(self.db.commit_changes(keys.into_iter().map(|key| {
218            let bytes = key.0.as_bytes().to_vec();
219            (DbColumn::EthMappings as u8, Operation::Dereference(bytes))
220        }))?)
221    }
222}
223
224fn has_subscribers<T>(tx: &tokio::sync::broadcast::Sender<T>) -> bool {
225    tx.closed().now_or_never().is_none()
226}
227
228impl Blockstore for ParityDb {
229    fn get(&self, k: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
230        let column = Self::choose_column(k);
231        let res = self.read_from_column(k.to_bytes(), column)?;
232        if res.is_some() {
233            return Ok(res);
234        }
235        self.get_persistent(k)
236    }
237
238    fn put_keyed(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()> {
239        let column = Self::choose_column(k);
240        // We can put the data directly into the database without any encoding.
241        self.write_to_column(k.to_bytes(), block, column)?;
242        match &*self.write_ops_broadcast_tx.read() {
243            Some(tx) if has_subscribers(tx) => {
244                let _ = tx.send((*k, block.to_vec()));
245            }
246            _ => {}
247        }
248
249        Ok(())
250    }
251
252    fn put_many_keyed<D, I>(&self, blocks: I) -> anyhow::Result<()>
253    where
254        Self: Sized,
255        D: AsRef<[u8]>,
256        I: IntoIterator<Item = (Cid, D)>,
257    {
258        let tx_opt: &Option<tokio::sync::broadcast::Sender<(cid::CidGeneric<64>, Vec<u8>)>> =
259            &self.write_ops_broadcast_tx.read();
260        let has_subscribers = tx_opt.as_ref().map(has_subscribers).unwrap_or_default();
261        let mut values_for_subscriber = vec![];
262        let values = blocks.into_iter().map(|(k, v)| {
263            let column = Self::choose_column(&k);
264            let v = v.as_ref().to_vec();
265            if has_subscribers {
266                values_for_subscriber.push((k, v.clone()));
267            }
268            (column, k.to_bytes(), v)
269        });
270        let tx = values
271            .into_iter()
272            .map(|(col, k, v)| (col as u8, Operation::Set(k, v)));
273        self.db
274            .commit_changes(tx)
275            .map_err(|e| anyhow!("error bulk writing: {e}"))?;
276        if let Some(tx) = tx_opt {
277            for i in values_for_subscriber {
278                let _ = tx.send(i);
279            }
280        }
281        Ok(())
282    }
283}
284
impl PersistentStore for ParityDb {
    /// Writes a block into [`DbColumn::PersistentGraph`], the column ignored by
    /// the garbage collector (see the `DbColumn` documentation).
    fn put_keyed_persistent(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()> {
        self.write_to_column(k.to_bytes(), block, DbColumn::PersistentGraph)
    }
}
290
291impl BitswapStoreRead for ParityDb {
292    fn contains(&self, cid: &Cid) -> anyhow::Result<bool> {
293        // We need to check both columns because we don't know which one
294        // the data is in. The order is important because most data will
295        // be in the [`DbColumn::GraphDagCborBlake2b256`] column and so
296        // it directly affects performance. If this assumption ever changes
297        // then this code should be modified accordingly.
298        for column in [DbColumn::GraphDagCborBlake2b256, DbColumn::GraphFull] {
299            if self
300                .db
301                .get_size(column as u8, &cid.to_bytes())
302                .context("error checking if key exists")?
303                .is_some()
304            {
305                return Ok(true);
306            }
307        }
308        Ok(false)
309    }
310
311    fn get(&self, cid: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
312        Blockstore::get(self, cid)
313    }
314}
315
impl BitswapStoreReadWrite for ParityDb {
    type Hashes = MultihashCode;

    /// Inserts a bitswap block through the regular [`Blockstore`] write path
    /// (so write-ops subscribers are notified as well).
    fn insert(&self, block: &crate::libp2p_bitswap::Block64<Self::Hashes>) -> anyhow::Result<()> {
        self.put_keyed(block.cid(), block.data())
    }
}
323
324impl DBStatistics for ParityDb {
325    fn get_statistics(&self) -> Option<String> {
326        if !self.statistics_enabled {
327            return None;
328        }
329
330        let mut buf = Vec::new();
331        if let Err(err) = self.db.write_stats_text(&mut buf, None) {
332            warn!("Unable to write database statistics: {err}");
333            return None;
334        }
335
336        match String::from_utf8(buf) {
337            Ok(stats) => Some(stats),
338            Err(e) => {
339                warn!("Malformed statistics: {e}");
340                None
341            }
342        }
343    }
344}
345
/// A single `ParityDb` commit operation paired with its target column index.
type Op = (u8, Operation<Vec<u8>, Vec<u8>>);
347
348impl ParityDb {
349    /// Removes a record.
350    ///
351    /// # Arguments
352    /// * `key` - record identifier
353    #[allow(dead_code)]
354    pub fn dereference_operation(key: &Cid) -> Op {
355        let column = Self::choose_column(key);
356        (column as u8, Operation::Dereference(key.to_bytes()))
357    }
358
359    /// Updates/inserts a record.
360    ///
361    /// # Arguments
362    /// * `column` - column identifier
363    /// * `key` - record identifier
364    /// * `value` - record contents
365    pub fn set_operation(column: u8, key: Vec<u8>, value: Vec<u8>) -> Op {
366        (column, Operation::Set(key, value))
367    }
368
369    // Get data from persistent graph column.
370    fn get_persistent(&self, k: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
371        if self.disable_persistent_fallback {
372            return Ok(None);
373        }
374        self.read_from_column(k.to_bytes(), DbColumn::PersistentGraph)
375    }
376}
377
378impl super::BlockstoreWriteOpsSubscribable for ParityDb {
379    fn subscribe_write_ops(&self) -> tokio::sync::broadcast::Receiver<(Cid, Vec<u8>)> {
380        let tx_lock = self.write_ops_broadcast_tx.read();
381        if let Some(tx) = &*tx_lock {
382            return tx.subscribe();
383        }
384        drop(tx_lock);
385        let (tx, rx) = tokio::sync::broadcast::channel(8192);
386        *self.write_ops_broadcast_tx.write() = Some(tx);
387        rx
388    }
389
390    fn unsubscribe_write_ops(&self) {
391        self.write_ops_broadcast_tx.write().take();
392    }
393}
394
#[cfg(test)]
mod test {
    use super::*;
    use crate::db::{BlockstoreWriteOpsSubscribable, tests::db_utils::parity::TempParityDB};
    use fvm_ipld_encoding::IPLD_RAW;
    use itertools::Itertools as _;
    use nom::AsBytes;
    use std::ops::Deref;

    // NOTE(review): in each test below, `cids[2]` is derived from `data[1]`
    // rather than `data[2]`. This is harmless because `put_keyed` stores bytes
    // without validating the digest against the key, but confirm whether the
    // mismatch was intentional.

    // Verifies that writes are routed to the column implied by the CID's
    // codec/hash, are absent from the sibling graph column, and that reads
    // through the `Blockstore` API are column-transparent.
    #[test]
    fn write_read_different_columns_test() {
        let db = TempParityDB::new();
        let data = [
            b"h'nglui mglw'nafh".to_vec(),
            b"Cthulhu".to_vec(),
            b"R'lyeh wgah'nagl fhtagn!!".to_vec(),
        ];
        let cids = [
            Cid::new_v1(DAG_CBOR, MultihashCode::Blake2b256.digest(&data[0])),
            Cid::new_v1(DAG_CBOR, MultihashCode::Sha2_256.digest(&data[1])),
            Cid::new_v1(IPLD_RAW, MultihashCode::Blake2b256.digest(&data[1])),
        ];

        let cases = [
            (DbColumn::GraphDagCborBlake2b256, cids[0], &data[0]),
            (DbColumn::GraphFull, cids[1], &data[1]),
            (DbColumn::GraphFull, cids[2], &data[2]),
        ];

        for (_, cid, data) in cases {
            db.put_keyed(&cid, data).unwrap();
        }

        for (column, cid, data) in cases {
            let actual = db
                .read_from_column(cid.to_bytes(), column)
                .unwrap()
                .expect("data not found");
            assert_eq!(data, actual.as_bytes());

            // assert that the data is NOT in the other column
            let other_column = match column {
                DbColumn::GraphDagCborBlake2b256 => DbColumn::GraphFull,
                DbColumn::GraphFull => DbColumn::GraphDagCborBlake2b256,
                DbColumn::Settings => panic!("invalid column for IPLD data"),
                DbColumn::EthMappings => panic!("invalid column for IPLD data"),
                DbColumn::PersistentGraph => panic!("invalid column for GC enabled IPLD data"),
            };
            let actual = db.read_from_column(cid.to_bytes(), other_column).unwrap();
            assert!(actual.is_none());

            // Blockstore API usage should be transparent
            let actual = fvm_ipld_blockstore::Blockstore::get(db.as_ref(), &cid)
                .unwrap()
                .expect("data not found");
            assert_eq!(data, actual.as_slice());
        }

        // Check non-IPLD column as well
        db.write_to_column(b"dagon", b"bloop", DbColumn::Settings)
            .unwrap();
        let actual = db
            .read_from_column(b"dagon", DbColumn::Settings)
            .unwrap()
            .expect("data not found");
        assert_eq!(b"bloop", actual.as_bytes());
    }

    // Verifies `choose_column`: only DAG_CBOR + Blake2b256 goes to the fast
    // column; any other codec or hash falls back to `GraphFull`.
    #[test]
    fn choose_column_test() {
        let data = [0u8; 32];
        let cases = [
            (
                Cid::new_v1(DAG_CBOR, MultihashCode::Blake2b256.digest(&data)),
                DbColumn::GraphDagCborBlake2b256,
            ),
            (
                Cid::new_v1(
                    fvm_ipld_encoding::CBOR,
                    MultihashCode::Blake2b256.digest(&data),
                ),
                DbColumn::GraphFull,
            ),
            (
                Cid::new_v1(DAG_CBOR, MultihashCode::Sha2_256.digest(&data)),
                DbColumn::GraphFull,
            ),
        ];

        for (cid, expected) in cases {
            let actual = ParityDb::choose_column(&cid);
            assert_eq!(expected, actual);
        }
    }

    // Verifies the persistent-column fallback: reads hit `PersistentGraph`
    // when the GC-enabled columns are empty, and the GC-enabled copy shadows
    // the persistent one once written.
    #[test]
    fn persistent_tests() {
        let db = TempParityDB::new();
        let data = [
            b"h'nglui mglw'nafh".to_vec(),
            b"Cthulhu".to_vec(),
            b"R'lyeh wgah'nagl fhtagn!!".to_vec(),
        ];

        // Persistent entries get a trailing 0xFF byte so they are
        // distinguishable from the GC-enabled entries.
        let persistent_data = data
            .clone()
            .into_iter()
            .map(|mut entry| {
                entry.push(255);
                entry
            })
            .collect_vec();

        let cids = [
            Cid::new_v1(DAG_CBOR, MultihashCode::Blake2b256.digest(&data[0])),
            Cid::new_v1(DAG_CBOR, MultihashCode::Sha2_256.digest(&data[1])),
            Cid::new_v1(IPLD_RAW, MultihashCode::Blake2b256.digest(&data[1])),
        ];

        for idx in 0..3 {
            let cid = &cids[idx];
            let persistent_entry = &persistent_data[idx];
            let data_entry = &data[idx];
            db.put_keyed_persistent(cid, persistent_entry).unwrap();
            // Check that we get persistent data if the data is otherwise absent from the GC enabled
            // storage.
            assert_eq!(
                Blockstore::get(db.deref(), cid).unwrap(),
                Some(persistent_entry.clone())
            );
            assert!(
                db.read_from_column(cid.to_bytes(), DbColumn::PersistentGraph)
                    .unwrap()
                    .is_some()
            );
            db.put_keyed(cid, data_entry).unwrap();
            assert_eq!(
                Blockstore::get(db.deref(), cid).unwrap(),
                Some(data_entry.clone())
            );
        }
    }

    // Verifies write-ops subscription lifecycle: channel is created lazily,
    // all receivers see every write, and dropping receivers / unsubscribing
    // is observable through `has_subscribers` and the stored sender.
    #[test]
    fn subscription_tests() {
        let db = TempParityDB::new();
        assert!(db.write_ops_broadcast_tx.read().is_none());
        let data = [
            b"h'nglui mglw'nafh".to_vec(),
            b"Cthulhu".to_vec(),
            b"R'lyeh wgah'nagl fhtagn!!".to_vec(),
        ];

        let cids = [
            Cid::new_v1(DAG_CBOR, MultihashCode::Blake2b256.digest(&data[0])),
            Cid::new_v1(DAG_CBOR, MultihashCode::Sha2_256.digest(&data[1])),
            Cid::new_v1(IPLD_RAW, MultihashCode::Blake2b256.digest(&data[1])),
        ];

        let mut rx1 = db.subscribe_write_ops();
        let mut rx2 = db.subscribe_write_ops();

        assert!(has_subscribers(
            db.write_ops_broadcast_tx.read().as_ref().unwrap()
        ));

        for (idx, cid) in cids.iter().enumerate() {
            let data_entry = &data[idx];
            db.put_keyed(cid, data_entry).unwrap();
            assert_eq!(rx1.blocking_recv().unwrap(), (*cid, data_entry.clone()));
            assert_eq!(rx2.blocking_recv().unwrap(), (*cid, data_entry.clone()));
        }

        drop(rx1);
        drop(rx2);

        assert!(!has_subscribers(
            db.write_ops_broadcast_tx.read().as_ref().unwrap()
        ));

        db.unsubscribe_write_ops();

        assert!(db.write_ops_broadcast_tx.read().is_none());
    }
}