// forest/db/parity_db.rs
// Copyright 2019-2026 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

4mod gc;
5pub use gc::*;
6
7use super::{EthMappingsStore, PersistentStore, SettingsStore};
8use crate::blocks::TipsetKey;
9use crate::db::{DBStatistics, parity_db_config::ParityDbConfig};
10use crate::libp2p_bitswap::{BitswapStoreRead, BitswapStoreReadWrite};
11use crate::rpc::eth::types::EthHash;
12use crate::utils::{broadcast::has_subscribers, multihash::prelude::*};
13use anyhow::Context as _;
14use bytes::Bytes;
15use cid::Cid;
16use fvm_ipld_blockstore::Blockstore;
17use fvm_ipld_encoding::DAG_CBOR;
18use parity_db::{CompressionType, Db, Operation, Options};
19use parking_lot::RwLock;
20use std::path::PathBuf;
21use strum::{Display, EnumIter, FromRepr, IntoEnumIterator};
22use tracing::warn;
23
/// This is specific to Forest's `ParityDb` usage.
/// It is used to determine which column to use for a given entry type.
///
/// NOTE: the variant order defines the physical column indices (`#[repr(u8)]`
/// plus the `column as u8` casts throughout this file) — do not reorder,
/// insert, or remove variants without a database migration.
#[derive(Copy, Clone, Debug, Display, PartialEq, FromRepr, EnumIter)]
#[repr(u8)]
pub enum DbColumn {
    /// Column for storing IPLD data with `Blake2b256` hash and `DAG_CBOR` codec.
    /// Most entries in the `blockstore` will be stored in this column.
    GraphDagCborBlake2b256,
    /// Column for storing other IPLD data (different codec or hash function).
    /// It allows for key retrieval at the cost of degraded performance. Given that
    /// there will be a small number of entries in this column, the performance
    /// degradation is negligible.
    GraphFull,
    /// Column for storing Forest-specific settings.
    Settings,
    /// Column for storing Ethereum mappings.
    EthMappings,
    /// Column for storing IPLD data that has to be ignored by the garbage collector.
    /// Anything stored in this column can be considered permanent, unless manually
    /// deleted.
    PersistentGraph,
}
46
47impl DbColumn {
48    fn create_column_options(compression: CompressionType) -> Vec<parity_db::ColumnOptions> {
49        DbColumn::iter()
50            .map(|col| {
51                match col {
52                    DbColumn::GraphDagCborBlake2b256 | DbColumn::PersistentGraph => {
53                        parity_db::ColumnOptions {
54                            preimage: true,
55                            compression,
56                            ..Default::default()
57                        }
58                    }
59                    DbColumn::GraphFull => parity_db::ColumnOptions {
60                        preimage: true,
61                        // This is needed for key retrieval.
62                        btree_index: true,
63                        compression,
64                        ..Default::default()
65                    },
66                    DbColumn::Settings => parity_db::ColumnOptions {
67                        // explicitly disable preimage for settings column
68                        // othewise we are not able to overwrite entries
69                        preimage: false,
70                        // This is needed for key retrieval.
71                        btree_index: true,
72                        compression,
73                        ..Default::default()
74                    },
75                    DbColumn::EthMappings => parity_db::ColumnOptions {
76                        preimage: false,
77                        btree_index: false,
78                        compression,
79                        ..Default::default()
80                    },
81                }
82            })
83            .collect()
84    }
85}
86
/// Broadcast sender used to fan out batches of committed `(Cid, block)` writes
/// to write-op subscribers.
type WriteOpsBroadcastTxSender = tokio::sync::broadcast::Sender<Vec<(Cid, Bytes)>>;
88
pub struct ParityDb {
    /// Handle to the underlying `parity-db` database.
    pub db: parity_db::Db,
    // Mirrors `Options::stats`; when false, `get_statistics` returns `None`.
    statistics_enabled: bool,
    // This is needed to maintain backwards-compatibility for pre-persistent-column migrations.
    disable_persistent_fallback: bool,
    // Lazily created on first `subscribe_write_ops` call; `None` means no
    // channel exists and writes are not broadcast.
    write_ops_broadcast_tx: RwLock<Option<WriteOpsBroadcastTxSender>>,
}
96
97impl ParityDb {
98    pub fn to_options(path: impl Into<PathBuf>, config: &ParityDbConfig) -> Options {
99        Options {
100            path: path.into(),
101            sync_wal: true,
102            sync_data: true,
103            stats: config.enable_statistics,
104            salt: None,
105            columns: DbColumn::create_column_options(CompressionType::Lz4),
106            compression_threshold: [(0, 128)].into_iter().collect(),
107        }
108    }
109
110    pub fn open(path: impl Into<PathBuf>, config: &ParityDbConfig) -> anyhow::Result<Self> {
111        let opts = Self::to_options(path.into(), config);
112        Self::open_with_options(&opts)
113    }
114
115    pub fn open_with_options(options: &Options) -> anyhow::Result<Self> {
116        Ok(Self {
117            db: Db::open_or_create(options)?,
118            statistics_enabled: options.stats,
119            disable_persistent_fallback: false,
120            write_ops_broadcast_tx: RwLock::new(None),
121        })
122    }
123
124    /// Returns an appropriate column variant based on the information
125    /// in the Cid.
126    fn choose_column(cid: &Cid) -> DbColumn {
127        match cid.codec() {
128            DAG_CBOR if cid.hash().code() == u64::from(MultihashCode::Blake2b256) => {
129                DbColumn::GraphDagCborBlake2b256
130            }
131            _ => DbColumn::GraphFull,
132        }
133    }
134
135    fn read_from_column<K>(&self, key: K, column: DbColumn) -> anyhow::Result<Option<Vec<u8>>>
136    where
137        K: AsRef<[u8]>,
138    {
139        self.db
140            .get(column as u8, key.as_ref())
141            .with_context(|| format!("error from column {column}"))
142    }
143
144    fn write_to_column<K, V>(&self, key: K, value: V, column: DbColumn) -> anyhow::Result<()>
145    where
146        K: AsRef<[u8]>,
147        V: AsRef<[u8]>,
148    {
149        let tx = [(column as u8, key.as_ref(), Some(value.as_ref().to_vec()))];
150        self.db
151            .commit(tx)
152            .with_context(|| format!("error writing to column {column}"))
153    }
154}
155
156impl SettingsStore for ParityDb {
157    fn read_bin(&self, key: &str) -> anyhow::Result<Option<Vec<u8>>> {
158        self.read_from_column(key.as_bytes(), DbColumn::Settings)
159    }
160
161    fn write_bin(&self, key: &str, value: &[u8]) -> anyhow::Result<()> {
162        self.write_to_column(key.as_bytes(), value, DbColumn::Settings)
163    }
164
165    fn exists(&self, key: &str) -> anyhow::Result<bool> {
166        self.db
167            .get_size(DbColumn::Settings as u8, key.as_bytes())
168            .map(|size| size.is_some())
169            .context("error checking if key exists")
170    }
171
172    fn setting_keys(&self) -> anyhow::Result<Vec<String>> {
173        let mut iter = self.db.iter(DbColumn::Settings as u8)?;
174        let mut keys = vec![];
175        while let Some((key, _)) = iter.next()? {
176            keys.push(String::from_utf8(key)?);
177        }
178        Ok(keys)
179    }
180}
181
182impl super::HeaviestTipsetKeyProvider for ParityDb {
183    fn heaviest_tipset_key(&self) -> anyhow::Result<Option<TipsetKey>> {
184        super::SettingsStoreExt::read_obj::<TipsetKey>(self, super::setting_keys::HEAD_KEY)
185    }
186
187    fn set_heaviest_tipset_key(&self, tsk: &TipsetKey) -> anyhow::Result<()> {
188        super::SettingsStoreExt::write_obj(self, super::setting_keys::HEAD_KEY, tsk)
189    }
190}
191
192impl EthMappingsStore for ParityDb {
193    fn read_bin(&self, key: &EthHash) -> anyhow::Result<Option<Vec<u8>>> {
194        self.read_from_column(key.0.as_bytes(), DbColumn::EthMappings)
195    }
196
197    fn write_bin(&self, key: &EthHash, value: &[u8]) -> anyhow::Result<()> {
198        self.write_to_column(key.0.as_bytes(), value, DbColumn::EthMappings)
199    }
200
201    fn exists(&self, key: &EthHash) -> anyhow::Result<bool> {
202        self.db
203            .get_size(DbColumn::EthMappings as u8, key.0.as_bytes())
204            .map(|size| size.is_some())
205            .context("error checking if key exists")
206    }
207
208    fn get_message_cids(&self) -> anyhow::Result<Vec<(Cid, u64)>> {
209        let mut cids = Vec::new();
210
211        self.db
212            .iter_column_while(DbColumn::EthMappings as u8, |val| {
213                if let Ok(value) = fvm_ipld_encoding::from_slice::<(Cid, u64)>(&val.value) {
214                    cids.push(value);
215                }
216                true
217            })?;
218
219        Ok(cids)
220    }
221
222    fn delete(&self, keys: Vec<EthHash>) -> anyhow::Result<()> {
223        Ok(self.db.commit_changes(keys.into_iter().map(|key| {
224            let bytes = key.0.as_bytes().to_vec();
225            (DbColumn::EthMappings as u8, Operation::Dereference(bytes))
226        }))?)
227    }
228}
229
230impl Blockstore for ParityDb {
231    fn get(&self, k: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
232        let column = Self::choose_column(k);
233        let res = self.read_from_column(k.to_bytes(), column)?;
234        if res.is_some() {
235            return Ok(res);
236        }
237        self.get_persistent(k)
238    }
239
240    fn put_keyed(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()> {
241        let column = Self::choose_column(k);
242        // We can put the data directly into the database without any encoding.
243        self.write_to_column(k.to_bytes(), block, column)?;
244        match &*self.write_ops_broadcast_tx.read() {
245            Some(tx) if has_subscribers(tx) => {
246                let _ = tx.send(vec![(*k, Bytes::copy_from_slice(block))]);
247            }
248            _ => {}
249        }
250
251        Ok(())
252    }
253
254    fn put_many_keyed<D, I>(&self, blocks: I) -> anyhow::Result<()>
255    where
256        Self: Sized,
257        D: AsRef<[u8]>,
258        I: IntoIterator<Item = (Cid, D)>,
259    {
260        let tx_opt = &*self.write_ops_broadcast_tx.read();
261        let has_subscribers = tx_opt.as_ref().map(has_subscribers).unwrap_or_default();
262        let mut values_for_subscriber = vec![];
263        let values = blocks.into_iter().map(|(k, v)| {
264            let column = Self::choose_column(&k);
265            let v = v.as_ref().to_vec();
266            if has_subscribers {
267                values_for_subscriber.push((k, Bytes::copy_from_slice(&v)));
268            }
269            (column, k.to_bytes(), v)
270        });
271        let tx = values
272            .into_iter()
273            .map(|(col, k, v)| (col as u8, Operation::Set(k, v)));
274        self.db.commit_changes(tx).context("error bulk writing")?;
275        if let Some(tx) = tx_opt {
276            let _ = tx.send(values_for_subscriber);
277        }
278        Ok(())
279    }
280}
281
impl PersistentStore for ParityDb {
    /// Writes a block to the [`DbColumn::PersistentGraph`] column, which the
    /// garbage collector ignores; such entries persist until manually deleted.
    fn put_keyed_persistent(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()> {
        self.write_to_column(k.to_bytes(), block, DbColumn::PersistentGraph)
    }
}
287
288impl BitswapStoreRead for ParityDb {
289    fn contains(&self, cid: &Cid) -> anyhow::Result<bool> {
290        // We need to check both columns because we don't know which one
291        // the data is in. The order is important because most data will
292        // be in the [`DbColumn::GraphDagCborBlake2b256`] column and so
293        // it directly affects performance. If this assumption ever changes
294        // then this code should be modified accordingly.
295        for column in [DbColumn::GraphDagCborBlake2b256, DbColumn::GraphFull] {
296            if self
297                .db
298                .get_size(column as u8, &cid.to_bytes())
299                .context("error checking if key exists")?
300                .is_some()
301            {
302                return Ok(true);
303            }
304        }
305        Ok(false)
306    }
307
308    fn get(&self, cid: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
309        Blockstore::get(self, cid)
310    }
311}
312
impl BitswapStoreReadWrite for ParityDb {
    // Hash codes accepted when inserting bitswap blocks.
    type Hashes = MultihashCode;

    /// Stores a bitswap block through the regular [`Blockstore::put_keyed`]
    /// path, so write-op subscribers are notified as well.
    fn insert(&self, block: &crate::libp2p_bitswap::Block64<Self::Hashes>) -> anyhow::Result<()> {
        self.put_keyed(block.cid(), block.data())
    }
}
320
321impl DBStatistics for ParityDb {
322    fn get_statistics(&self) -> Option<String> {
323        if !self.statistics_enabled {
324            return None;
325        }
326
327        let mut buf = Vec::new();
328        if let Err(err) = self.db.write_stats_text(&mut buf, None) {
329            warn!("Unable to write database statistics: {err}");
330            return None;
331        }
332
333        match String::from_utf8(buf) {
334            Ok(stats) => Some(stats),
335            Err(e) => {
336                warn!("Malformed statistics: {e}");
337                None
338            }
339        }
340    }
341}
342
/// A single `parity-db` commit operation tagged with its target column index.
type Op = (u8, Operation<Vec<u8>, Vec<u8>>);
344
impl ParityDb {
    /// Builds an [`Op`] that removes a record from its CID-appropriate column.
    ///
    /// # Arguments
    /// * `key` - record identifier
    #[allow(dead_code)]
    pub fn dereference_operation(key: &Cid) -> Op {
        let column = Self::choose_column(key);
        (column as u8, Operation::Dereference(key.to_bytes()))
    }

    /// Builds an [`Op`] that updates/inserts a record.
    ///
    /// # Arguments
    /// * `column` - column identifier
    /// * `key` - record identifier
    /// * `value` - record contents
    pub fn set_operation(column: u8, key: Vec<u8>, value: Vec<u8>) -> Op {
        (column, Operation::Set(key, value))
    }

    /// Gets data from the [`DbColumn::PersistentGraph`] column, unless the
    /// fallback is disabled (kept for pre-persistent-column migration
    /// compatibility; see `disable_persistent_fallback`).
    fn get_persistent(&self, k: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
        if self.disable_persistent_fallback {
            return Ok(None);
        }
        self.read_from_column(k.to_bytes(), DbColumn::PersistentGraph)
    }
}
374
375impl super::BlockstoreWriteOpsSubscribable for ParityDb {
376    fn subscribe_write_ops(&self) -> tokio::sync::broadcast::Receiver<Vec<(Cid, Bytes)>> {
377        let tx_lock = self.write_ops_broadcast_tx.read();
378        if let Some(tx) = &*tx_lock {
379            return tx.subscribe();
380        }
381        drop(tx_lock);
382        let (tx, rx) = tokio::sync::broadcast::channel(65536);
383        *self.write_ops_broadcast_tx.write() = Some(tx);
384        rx
385    }
386
387    fn unsubscribe_write_ops(&self) {
388        self.write_ops_broadcast_tx.write().take();
389    }
390}
391
392#[cfg(test)]
393mod test {
394    use super::*;
395    use crate::db::{BlockstoreWriteOpsSubscribable, tests::db_utils::parity::TempParityDB};
396    use fvm_ipld_encoding::IPLD_RAW;
397    use itertools::Itertools as _;
398    use nom::AsBytes;
399    use std::ops::Deref;
400
    #[test]
    fn write_read_different_columns_test() {
        let db = TempParityDB::new();
        let data = [
            b"h'nglui mglw'nafh".to_vec(),
            b"Cthulhu".to_vec(),
            b"R'lyeh wgah'nagl fhtagn!!".to_vec(),
        ];
        // NOTE(review): `cids[2]` digests `data[1]` but is later stored with
        // `data[2]` — presumably fine because the DB never verifies CID
        // preimages, but confirm this mismatch is intentional.
        let cids = [
            Cid::new_v1(DAG_CBOR, MultihashCode::Blake2b256.digest(&data[0])),
            Cid::new_v1(DAG_CBOR, MultihashCode::Sha2_256.digest(&data[1])),
            Cid::new_v1(IPLD_RAW, MultihashCode::Blake2b256.digest(&data[1])),
        ];

        // (expected column, CID, payload) — covers both graph columns.
        let cases = [
            (DbColumn::GraphDagCborBlake2b256, cids[0], &data[0]),
            (DbColumn::GraphFull, cids[1], &data[1]),
            (DbColumn::GraphFull, cids[2], &data[2]),
        ];

        for (_, cid, data) in cases {
            db.put_keyed(&cid, data).unwrap();
        }

        for (column, cid, data) in cases {
            // Data must land in the column `choose_column` selects.
            let actual = db
                .read_from_column(cid.to_bytes(), column)
                .unwrap()
                .expect("data not found");
            assert_eq!(data, actual.as_bytes());

            // assert that the data is NOT in the other column
            let other_column = match column {
                DbColumn::GraphDagCborBlake2b256 => DbColumn::GraphFull,
                DbColumn::GraphFull => DbColumn::GraphDagCborBlake2b256,
                DbColumn::Settings => panic!("invalid column for IPLD data"),
                DbColumn::EthMappings => panic!("invalid column for IPLD data"),
                DbColumn::PersistentGraph => panic!("invalid column for GC enabled IPLD data"),
            };
            let actual = db.read_from_column(cid.to_bytes(), other_column).unwrap();
            assert!(actual.is_none());

            // Blockstore API usage should be transparent
            let actual = fvm_ipld_blockstore::Blockstore::get(db.as_ref(), &cid)
                .unwrap()
                .expect("data not found");
            assert_eq!(data, actual.as_slice());
        }

        // Check non-IPLD column as well
        db.write_to_column(b"dagon", b"bloop", DbColumn::Settings)
            .unwrap();
        let actual = db
            .read_from_column(b"dagon", DbColumn::Settings)
            .unwrap()
            .expect("data not found");
        assert_eq!(b"bloop", actual.as_bytes());
    }
459
460    #[test]
461    fn choose_column_test() {
462        let data = [0u8; 32];
463        let cases = [
464            (
465                Cid::new_v1(DAG_CBOR, MultihashCode::Blake2b256.digest(&data)),
466                DbColumn::GraphDagCborBlake2b256,
467            ),
468            (
469                Cid::new_v1(
470                    fvm_ipld_encoding::CBOR,
471                    MultihashCode::Blake2b256.digest(&data),
472                ),
473                DbColumn::GraphFull,
474            ),
475            (
476                Cid::new_v1(DAG_CBOR, MultihashCode::Sha2_256.digest(&data)),
477                DbColumn::GraphFull,
478            ),
479        ];
480
481        for (cid, expected) in cases {
482            let actual = ParityDb::choose_column(&cid);
483            assert_eq!(expected, actual);
484        }
485    }
486
    #[test]
    fn persistent_tests() {
        let db = TempParityDB::new();
        let data = [
            b"h'nglui mglw'nafh".to_vec(),
            b"Cthulhu".to_vec(),
            b"R'lyeh wgah'nagl fhtagn!!".to_vec(),
        ];

        // Persistent variants of the payloads, distinguishable from the
        // originals by a trailing 0xFF byte.
        let persistent_data = data
            .clone()
            .into_iter()
            .map(|mut entry| {
                entry.push(255);
                entry
            })
            .collect_vec();

        // NOTE(review): `cids[2]` digests `data[1]`, not `data[2]` — presumably
        // harmless since the DB does not verify preimages; confirm intent.
        let cids = [
            Cid::new_v1(DAG_CBOR, MultihashCode::Blake2b256.digest(&data[0])),
            Cid::new_v1(DAG_CBOR, MultihashCode::Sha2_256.digest(&data[1])),
            Cid::new_v1(IPLD_RAW, MultihashCode::Blake2b256.digest(&data[1])),
        ];

        for idx in 0..3 {
            let cid = &cids[idx];
            let persistent_entry = &persistent_data[idx];
            let data_entry = &data[idx];
            db.put_keyed_persistent(cid, persistent_entry).unwrap();
            // Check that we get persistent data if the data is otherwise absent from the GC enabled
            // storage.
            assert_eq!(
                Blockstore::get(db.deref(), cid).unwrap(),
                Some(persistent_entry.clone())
            );
            assert!(
                db.read_from_column(cid.to_bytes(), DbColumn::PersistentGraph)
                    .unwrap()
                    .is_some()
            );
            // Once the same CID exists in the GC-tracked column, that copy
            // shadows the persistent one on reads.
            db.put_keyed(cid, data_entry).unwrap();
            assert_eq!(
                Blockstore::get(db.deref(), cid).unwrap(),
                Some(data_entry.clone())
            );
        }
    }
534
    #[test]
    fn subscription_tests() {
        let db = TempParityDB::new();
        // No broadcast channel exists before the first subscription.
        assert!(db.write_ops_broadcast_tx.read().is_none());
        let data = [
            b"h'nglui mglw'nafh".to_vec(),
            b"Cthulhu".to_vec(),
            b"R'lyeh wgah'nagl fhtagn!!".to_vec(),
        ];

        let cids = [
            Cid::new_v1(DAG_CBOR, MultihashCode::Blake2b256.digest(&data[0])),
            Cid::new_v1(DAG_CBOR, MultihashCode::Sha2_256.digest(&data[1])),
            Cid::new_v1(IPLD_RAW, MultihashCode::Blake2b256.digest(&data[1])),
        ];

        // Both receivers attach to the same lazily-created sender.
        let mut rx1 = db.subscribe_write_ops();
        let mut rx2 = db.subscribe_write_ops();

        assert!(has_subscribers(
            db.write_ops_broadcast_tx.read().as_ref().unwrap()
        ));

        for (idx, cid) in cids.iter().enumerate() {
            let data_entry = &data[idx];
            db.put_keyed(cid, data_entry).unwrap();
            // Each single put is broadcast to every subscriber as a one-element batch.
            let expected = vec![(*cid, Bytes::copy_from_slice(data_entry))];
            assert_eq!(rx1.blocking_recv().unwrap(), expected);
            assert_eq!(rx2.blocking_recv().unwrap(), expected);
        }

        drop(rx1);
        drop(rx2);

        // Dropping all receivers leaves the sender alive but without subscribers.
        assert!(!has_subscribers(
            db.write_ops_broadcast_tx.read().as_ref().unwrap()
        ));

        db.unsubscribe_write_ops();

        // Unsubscribing drops the sender itself.
        assert!(db.write_ops_broadcast_tx.read().is_none());
    }
577}