Skip to main content

forest/db/
parity_db.rs

1// Copyright 2019-2026 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3
4mod gc;
5pub use gc::*;
6
7use super::{EthMappingsStore, PersistentStore, SettingsStore};
8use crate::blocks::{Tipset, TipsetKey};
9use crate::db::{DBStatistics, parity_db_config::ParityDbConfig};
10use crate::libp2p_bitswap::{BitswapStoreRead, BitswapStoreReadWrite};
11use crate::rpc::eth::types::EthHash;
12use crate::utils::{broadcast::has_subscribers, multihash::prelude::*};
13use anyhow::Context as _;
14use bytes::Bytes;
15use cid::Cid;
16use fvm_ipld_blockstore::Blockstore;
17use fvm_ipld_encoding::DAG_CBOR;
18use parity_db::{CompressionType, Db, Operation, Options};
19use parking_lot::RwLock;
20use std::path::PathBuf;
21use strum::{Display, EnumIter, FromRepr, IntoEnumIterator};
22use tracing::warn;
23
24/// This is specific to Forest's `ParityDb` usage.
25/// It is used to determine which column to use for a given entry type.
26#[derive(Copy, Clone, Debug, Display, PartialEq, FromRepr, EnumIter)]
27#[repr(u8)]
28pub enum DbColumn {
29    /// Column for storing IPLD data with `Blake2b256` hash and `DAG_CBOR` codec.
30    /// Most entries in the `blockstore` will be stored in this column.
31    GraphDagCborBlake2b256,
32    /// Column for storing other IPLD data (different codec or hash function).
33    /// It allows for key retrieval at the cost of degraded performance. Given that
34    /// there will be a small number of entries in this column, the performance
35    /// degradation is negligible.
36    GraphFull,
37    /// Column for storing Forest-specific settings.
38    Settings,
39    /// Column for storing Ethereum mappings.
40    EthMappings,
41    /// Column for storing IPLD data that has to be ignored by the garbage collector.
42    /// Anything stored in this column can be considered permanent, unless manually
43    /// deleted.
44    PersistentGraph,
45}
46
47impl DbColumn {
48    fn create_column_options(compression: CompressionType) -> Vec<parity_db::ColumnOptions> {
49        DbColumn::iter()
50            .map(|col| {
51                match col {
52                    DbColumn::GraphDagCborBlake2b256 | DbColumn::PersistentGraph => {
53                        parity_db::ColumnOptions {
54                            preimage: true,
55                            compression,
56                            ..Default::default()
57                        }
58                    }
59                    DbColumn::GraphFull => parity_db::ColumnOptions {
60                        preimage: true,
61                        // This is needed for key retrieval.
62                        btree_index: true,
63                        compression,
64                        ..Default::default()
65                    },
66                    DbColumn::Settings => parity_db::ColumnOptions {
67                        // explicitly disable preimage for settings column
68                        // othewise we are not able to overwrite entries
69                        preimage: false,
70                        // This is needed for key retrieval.
71                        btree_index: true,
72                        compression,
73                        ..Default::default()
74                    },
75                    DbColumn::EthMappings => parity_db::ColumnOptions {
76                        preimage: false,
77                        btree_index: false,
78                        compression,
79                        ..Default::default()
80                    },
81                }
82            })
83            .collect()
84    }
85}
86
87type WriteOpsBroadcastTxSender = tokio::sync::broadcast::Sender<Vec<(Cid, Bytes)>>;
88
89pub struct ParityDb {
90    pub db: parity_db::Db,
91    statistics_enabled: bool,
92    // This is needed to maintain backwards-compatibility for pre-persistent-column migrations.
93    disable_persistent_fallback: bool,
94    write_ops_broadcast_tx: RwLock<Option<WriteOpsBroadcastTxSender>>,
95}
96
97impl ParityDb {
98    pub fn to_options(path: impl Into<PathBuf>, config: &ParityDbConfig) -> Options {
99        Options {
100            path: path.into(),
101            sync_wal: true,
102            sync_data: true,
103            stats: config.enable_statistics,
104            salt: None,
105            columns: DbColumn::create_column_options(CompressionType::Lz4),
106            compression_threshold: [(0, 128)].into_iter().collect(),
107        }
108    }
109
110    pub fn open(path: impl Into<PathBuf>, config: &ParityDbConfig) -> anyhow::Result<Self> {
111        let opts = Self::to_options(path.into(), config);
112        Self::open_with_options(&opts)
113    }
114
115    pub fn open_with_options(options: &Options) -> anyhow::Result<Self> {
116        Ok(Self {
117            db: Db::open_or_create(options)?,
118            statistics_enabled: options.stats,
119            disable_persistent_fallback: false,
120            write_ops_broadcast_tx: RwLock::new(None),
121        })
122    }
123
124    /// Returns an appropriate column variant based on the information
125    /// in the Cid.
126    fn choose_column(cid: &Cid) -> DbColumn {
127        match cid.codec() {
128            DAG_CBOR if cid.hash().code() == u64::from(MultihashCode::Blake2b256) => {
129                DbColumn::GraphDagCborBlake2b256
130            }
131            _ => DbColumn::GraphFull,
132        }
133    }
134
135    fn read_from_column<K>(&self, key: K, column: DbColumn) -> anyhow::Result<Option<Vec<u8>>>
136    where
137        K: AsRef<[u8]>,
138    {
139        self.db
140            .get(column as u8, key.as_ref())
141            .with_context(|| format!("error from column {column}"))
142    }
143
144    fn write_to_column<K, V>(&self, key: K, value: V, column: DbColumn) -> anyhow::Result<()>
145    where
146        K: AsRef<[u8]>,
147        V: AsRef<[u8]>,
148    {
149        let tx = [(column as u8, key.as_ref(), Some(value.as_ref().to_vec()))];
150        self.db
151            .commit(tx)
152            .with_context(|| format!("error writing to column {column}"))
153    }
154}
155
156impl SettingsStore for ParityDb {
157    fn read_bin(&self, key: &str) -> anyhow::Result<Option<Vec<u8>>> {
158        self.read_from_column(key.as_bytes(), DbColumn::Settings)
159    }
160
161    fn write_bin(&self, key: &str, value: &[u8]) -> anyhow::Result<()> {
162        self.write_to_column(key.as_bytes(), value, DbColumn::Settings)
163    }
164
165    fn exists(&self, key: &str) -> anyhow::Result<bool> {
166        self.db
167            .get_size(DbColumn::Settings as u8, key.as_bytes())
168            .map(|size| size.is_some())
169            .context("error checking if key exists")
170    }
171
172    fn setting_keys(&self) -> anyhow::Result<Vec<String>> {
173        let mut iter = self.db.iter(DbColumn::Settings as u8)?;
174        let mut keys = vec![];
175        while let Some((key, _)) = iter.next()? {
176            keys.push(String::from_utf8(key)?);
177        }
178        Ok(keys)
179    }
180}
181
182impl super::HeaviestTipsetKeyProvider for ParityDb {
183    fn heaviest_tipset_key(&self) -> anyhow::Result<Option<TipsetKey>> {
184        super::SettingsStoreExt::read_obj::<TipsetKey>(self, super::setting_keys::HEAD_KEY)
185    }
186
187    fn set_heaviest_tipset_key(&self, tsk: &TipsetKey) -> anyhow::Result<()> {
188        super::SettingsStoreExt::write_obj(self, super::setting_keys::HEAD_KEY, tsk)
189    }
190}
191
192impl EthMappingsStore for ParityDb {
193    fn read_bin(&self, key: &EthHash) -> anyhow::Result<Option<Vec<u8>>> {
194        self.read_from_column(key.0.as_bytes(), DbColumn::EthMappings)
195    }
196
197    fn write_bin(&self, key: &EthHash, value: &[u8]) -> anyhow::Result<()> {
198        self.write_to_column(key.0.as_bytes(), value, DbColumn::EthMappings)
199    }
200
201    fn exists(&self, key: &EthHash) -> anyhow::Result<bool> {
202        self.db
203            .get_size(DbColumn::EthMappings as u8, key.0.as_bytes())
204            .map(|size| size.is_some())
205            .context("error checking if key exists")
206    }
207
208    fn get_message_cids(&self) -> anyhow::Result<Vec<(Cid, u64)>> {
209        let mut cids = Vec::new();
210
211        self.db
212            .iter_column_while(DbColumn::EthMappings as u8, |val| {
213                if let Ok(value) = fvm_ipld_encoding::from_slice::<(Cid, u64)>(&val.value) {
214                    cids.push(value);
215                }
216                true
217            })?;
218
219        Ok(cids)
220    }
221
222    fn delete(&self, keys: Vec<EthHash>) -> anyhow::Result<()> {
223        Ok(self.db.commit_changes(keys.into_iter().map(|key| {
224            let bytes = key.0.as_bytes().to_vec();
225            (DbColumn::EthMappings as u8, Operation::Dereference(bytes))
226        }))?)
227    }
228
229    fn tipset_key_by_epoch(&self, epoch: i64) -> anyhow::Result<Option<TipsetKey>> {
230        let key = epoch.to_le_bytes();
231        if let Some(bytes) = self.read_from_column(key, DbColumn::EthMappings)? {
232            Ok(Some(fvm_ipld_encoding::from_slice(&bytes)?))
233        } else {
234            Ok(None)
235        }
236    }
237
238    fn set_tipset_key_at_epoch(&self, ts: &Tipset) -> anyhow::Result<()> {
239        let key = ts.epoch().to_le_bytes();
240        let bytes = fvm_ipld_encoding::to_vec(ts.key())?;
241        self.write_to_column(key, bytes, DbColumn::EthMappings)
242    }
243}
244
245impl Blockstore for ParityDb {
246    fn get(&self, k: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
247        let column = Self::choose_column(k);
248        let res = self.read_from_column(k.to_bytes(), column)?;
249        if res.is_some() {
250            return Ok(res);
251        }
252        self.get_persistent(k)
253    }
254
255    fn put_keyed(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()> {
256        let column = Self::choose_column(k);
257        // We can put the data directly into the database without any encoding.
258        self.write_to_column(k.to_bytes(), block, column)?;
259        match &*self.write_ops_broadcast_tx.read() {
260            Some(tx) if has_subscribers(tx) => {
261                let _ = tx.send(vec![(*k, Bytes::copy_from_slice(block))]);
262            }
263            _ => {}
264        }
265
266        Ok(())
267    }
268
269    fn put_many_keyed<D, I>(&self, blocks: I) -> anyhow::Result<()>
270    where
271        Self: Sized,
272        D: AsRef<[u8]>,
273        I: IntoIterator<Item = (Cid, D)>,
274    {
275        let tx_opt = &*self.write_ops_broadcast_tx.read();
276        let has_subscribers = tx_opt.as_ref().map(has_subscribers).unwrap_or_default();
277        let mut values_for_subscriber = vec![];
278        let values = blocks.into_iter().map(|(k, v)| {
279            let column = Self::choose_column(&k);
280            let v = v.as_ref().to_vec();
281            if has_subscribers {
282                values_for_subscriber.push((k, Bytes::copy_from_slice(&v)));
283            }
284            (column, k.to_bytes(), v)
285        });
286        let tx = values
287            .into_iter()
288            .map(|(col, k, v)| (col as u8, Operation::Set(k, v)));
289        self.db.commit_changes(tx).context("error bulk writing")?;
290        if let Some(tx) = tx_opt {
291            let _ = tx.send(values_for_subscriber);
292        }
293        Ok(())
294    }
295}
296
297impl PersistentStore for ParityDb {
298    fn put_keyed_persistent(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()> {
299        self.write_to_column(k.to_bytes(), block, DbColumn::PersistentGraph)
300    }
301}
302
303impl BitswapStoreRead for ParityDb {
304    fn contains(&self, cid: &Cid) -> anyhow::Result<bool> {
305        // We need to check both columns because we don't know which one
306        // the data is in. The order is important because most data will
307        // be in the [`DbColumn::GraphDagCborBlake2b256`] column and so
308        // it directly affects performance. If this assumption ever changes
309        // then this code should be modified accordingly.
310        for column in [DbColumn::GraphDagCborBlake2b256, DbColumn::GraphFull] {
311            if self
312                .db
313                .get_size(column as u8, &cid.to_bytes())
314                .context("error checking if key exists")?
315                .is_some()
316            {
317                return Ok(true);
318            }
319        }
320        Ok(false)
321    }
322
323    fn get(&self, cid: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
324        Blockstore::get(self, cid)
325    }
326}
327
328impl BitswapStoreReadWrite for ParityDb {
329    type Hashes = MultihashCode;
330
331    fn insert(&self, block: &crate::libp2p_bitswap::Block64<Self::Hashes>) -> anyhow::Result<()> {
332        self.put_keyed(block.cid(), block.data())
333    }
334}
335
336impl DBStatistics for ParityDb {
337    fn get_statistics(&self) -> Option<String> {
338        if !self.statistics_enabled {
339            return None;
340        }
341
342        let mut buf = Vec::new();
343        if let Err(err) = self.db.write_stats_text(&mut buf, None) {
344            warn!("Unable to write database statistics: {err}");
345            return None;
346        }
347
348        match String::from_utf8(buf) {
349            Ok(stats) => Some(stats),
350            Err(e) => {
351                warn!("Malformed statistics: {e}");
352                None
353            }
354        }
355    }
356}
357
358type Op = (u8, Operation<Vec<u8>, Vec<u8>>);
359
360impl ParityDb {
361    /// Removes a record.
362    ///
363    /// # Arguments
364    /// * `key` - record identifier
365    #[allow(dead_code)]
366    pub fn dereference_operation(key: &Cid) -> Op {
367        let column = Self::choose_column(key);
368        (column as u8, Operation::Dereference(key.to_bytes()))
369    }
370
371    /// Updates/inserts a record.
372    ///
373    /// # Arguments
374    /// * `column` - column identifier
375    /// * `key` - record identifier
376    /// * `value` - record contents
377    pub fn set_operation(column: u8, key: Vec<u8>, value: Vec<u8>) -> Op {
378        (column, Operation::Set(key, value))
379    }
380
381    // Get data from persistent graph column.
382    fn get_persistent(&self, k: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
383        if self.disable_persistent_fallback {
384            return Ok(None);
385        }
386        self.read_from_column(k.to_bytes(), DbColumn::PersistentGraph)
387    }
388}
389
390impl super::BlockstoreWriteOpsSubscribable for ParityDb {
391    fn subscribe_write_ops(&self) -> tokio::sync::broadcast::Receiver<Vec<(Cid, Bytes)>> {
392        let tx_lock = self.write_ops_broadcast_tx.read();
393        if let Some(tx) = &*tx_lock {
394            return tx.subscribe();
395        }
396        drop(tx_lock);
397        let (tx, rx) = tokio::sync::broadcast::channel(65536);
398        *self.write_ops_broadcast_tx.write() = Some(tx);
399        rx
400    }
401
402    fn unsubscribe_write_ops(&self) {
403        self.write_ops_broadcast_tx.write().take();
404    }
405}
406
407#[cfg(test)]
408mod test {
409    use super::*;
410    use crate::db::{BlockstoreWriteOpsSubscribable, tests::db_utils::parity::TempParityDB};
411    use fvm_ipld_encoding::IPLD_RAW;
412    use itertools::Itertools as _;
413    use nom::AsBytes;
414    use std::ops::Deref;
415
416    #[test]
417    fn write_read_different_columns_test() {
418        let db = TempParityDB::new();
419        let data = [
420            b"h'nglui mglw'nafh".to_vec(),
421            b"Cthulhu".to_vec(),
422            b"R'lyeh wgah'nagl fhtagn!!".to_vec(),
423        ];
424        let cids = [
425            Cid::new_v1(DAG_CBOR, MultihashCode::Blake2b256.digest(&data[0])),
426            Cid::new_v1(DAG_CBOR, MultihashCode::Sha2_256.digest(&data[1])),
427            Cid::new_v1(IPLD_RAW, MultihashCode::Blake2b256.digest(&data[1])),
428        ];
429
430        let cases = [
431            (DbColumn::GraphDagCborBlake2b256, cids[0], &data[0]),
432            (DbColumn::GraphFull, cids[1], &data[1]),
433            (DbColumn::GraphFull, cids[2], &data[2]),
434        ];
435
436        for (_, cid, data) in cases {
437            db.put_keyed(&cid, data).unwrap();
438        }
439
440        for (column, cid, data) in cases {
441            let actual = db
442                .read_from_column(cid.to_bytes(), column)
443                .unwrap()
444                .expect("data not found");
445            assert_eq!(data, actual.as_bytes());
446
447            // assert that the data is NOT in the other column
448            let other_column = match column {
449                DbColumn::GraphDagCborBlake2b256 => DbColumn::GraphFull,
450                DbColumn::GraphFull => DbColumn::GraphDagCborBlake2b256,
451                DbColumn::Settings => panic!("invalid column for IPLD data"),
452                DbColumn::EthMappings => panic!("invalid column for IPLD data"),
453                DbColumn::PersistentGraph => panic!("invalid column for GC enabled IPLD data"),
454            };
455            let actual = db.read_from_column(cid.to_bytes(), other_column).unwrap();
456            assert!(actual.is_none());
457
458            // Blockstore API usage should be transparent
459            let actual = fvm_ipld_blockstore::Blockstore::get(db.as_ref(), &cid)
460                .unwrap()
461                .expect("data not found");
462            assert_eq!(data, actual.as_slice());
463        }
464
465        // Check non-IPLD column as well
466        db.write_to_column(b"dagon", b"bloop", DbColumn::Settings)
467            .unwrap();
468        let actual = db
469            .read_from_column(b"dagon", DbColumn::Settings)
470            .unwrap()
471            .expect("data not found");
472        assert_eq!(b"bloop", actual.as_bytes());
473    }
474
475    #[test]
476    fn choose_column_test() {
477        let data = [0u8; 32];
478        let cases = [
479            (
480                Cid::new_v1(DAG_CBOR, MultihashCode::Blake2b256.digest(&data)),
481                DbColumn::GraphDagCborBlake2b256,
482            ),
483            (
484                Cid::new_v1(
485                    fvm_ipld_encoding::CBOR,
486                    MultihashCode::Blake2b256.digest(&data),
487                ),
488                DbColumn::GraphFull,
489            ),
490            (
491                Cid::new_v1(DAG_CBOR, MultihashCode::Sha2_256.digest(&data)),
492                DbColumn::GraphFull,
493            ),
494        ];
495
496        for (cid, expected) in cases {
497            let actual = ParityDb::choose_column(&cid);
498            assert_eq!(expected, actual);
499        }
500    }
501
502    #[test]
503    fn persistent_tests() {
504        let db = TempParityDB::new();
505        let data = [
506            b"h'nglui mglw'nafh".to_vec(),
507            b"Cthulhu".to_vec(),
508            b"R'lyeh wgah'nagl fhtagn!!".to_vec(),
509        ];
510
511        let persistent_data = data
512            .clone()
513            .into_iter()
514            .map(|mut entry| {
515                entry.push(255);
516                entry
517            })
518            .collect_vec();
519
520        let cids = [
521            Cid::new_v1(DAG_CBOR, MultihashCode::Blake2b256.digest(&data[0])),
522            Cid::new_v1(DAG_CBOR, MultihashCode::Sha2_256.digest(&data[1])),
523            Cid::new_v1(IPLD_RAW, MultihashCode::Blake2b256.digest(&data[1])),
524        ];
525
526        for idx in 0..3 {
527            let cid = &cids[idx];
528            let persistent_entry = &persistent_data[idx];
529            let data_entry = &data[idx];
530            db.put_keyed_persistent(cid, persistent_entry).unwrap();
531            // Check that we get persistent data if the data is otherwise absent from the GC enabled
532            // storage.
533            assert_eq!(
534                Blockstore::get(db.deref(), cid).unwrap(),
535                Some(persistent_entry.clone())
536            );
537            assert!(
538                db.read_from_column(cid.to_bytes(), DbColumn::PersistentGraph)
539                    .unwrap()
540                    .is_some()
541            );
542            db.put_keyed(cid, data_entry).unwrap();
543            assert_eq!(
544                Blockstore::get(db.deref(), cid).unwrap(),
545                Some(data_entry.clone())
546            );
547        }
548    }
549
550    #[test]
551    fn subscription_tests() {
552        let db = TempParityDB::new();
553        assert!(db.write_ops_broadcast_tx.read().is_none());
554        let data = [
555            b"h'nglui mglw'nafh".to_vec(),
556            b"Cthulhu".to_vec(),
557            b"R'lyeh wgah'nagl fhtagn!!".to_vec(),
558        ];
559
560        let cids = [
561            Cid::new_v1(DAG_CBOR, MultihashCode::Blake2b256.digest(&data[0])),
562            Cid::new_v1(DAG_CBOR, MultihashCode::Sha2_256.digest(&data[1])),
563            Cid::new_v1(IPLD_RAW, MultihashCode::Blake2b256.digest(&data[1])),
564        ];
565
566        let mut rx1 = db.subscribe_write_ops();
567        let mut rx2 = db.subscribe_write_ops();
568
569        assert!(has_subscribers(
570            db.write_ops_broadcast_tx.read().as_ref().unwrap()
571        ));
572
573        for (idx, cid) in cids.iter().enumerate() {
574            let data_entry = &data[idx];
575            db.put_keyed(cid, data_entry).unwrap();
576            let expected = vec![(*cid, Bytes::copy_from_slice(data_entry))];
577            assert_eq!(rx1.blocking_recv().unwrap(), expected);
578            assert_eq!(rx2.blocking_recv().unwrap(), expected);
579        }
580
581        drop(rx1);
582        drop(rx2);
583
584        assert!(!has_subscribers(
585            db.write_ops_broadcast_tx.read().as_ref().unwrap()
586        ));
587
588        db.unsubscribe_write_ops();
589
590        assert!(db.write_ops_broadcast_tx.read().is_none());
591    }
592}