forest/db/parity_db.rs

// Copyright 2019-2025 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

use super::{EthMappingsStore, IndicesStore, PersistentStore, SettingsStore};
use crate::blocks::TipsetKey;
use crate::db::{DBStatistics, parity_db_config::ParityDbConfig};
use crate::libp2p_bitswap::{BitswapStoreRead, BitswapStoreReadWrite};
use crate::rpc::eth::types::EthHash;
use crate::utils::multihash::prelude::*;
use anyhow::{Context as _, anyhow};
use cid::Cid;
use futures::FutureExt;
use fvm_ipld_blockstore::Blockstore;
use fvm_ipld_encoding::DAG_CBOR;
use parity_db::{CompressionType, Db, Operation, Options};
use parking_lot::RwLock;
use std::path::PathBuf;
use strum::{Display, EnumIter, FromRepr, IntoEnumIterator};
use tracing::warn;

/// This is specific to Forest's `ParityDb` usage.
/// It is used to determine which column to use for a given entry type.
#[derive(Copy, Clone, Debug, Display, PartialEq, FromRepr, EnumIter)]
#[repr(u8)]
pub enum DbColumn {
    /// Column for storing IPLD data with `Blake2b256` hash and `DAG_CBOR` codec.
    /// Most entries in the `blockstore` will be stored in this column.
    GraphDagCborBlake2b256,
    /// Column for storing other IPLD data (different codec or hash function).
    /// It allows for key retrieval at the cost of degraded performance. Given that
    /// there will be a small number of entries in this column, the performance
    /// degradation is negligible.
    GraphFull,
    /// Column for storing Forest-specific settings.
    Settings,
    /// Column for storing Ethereum mappings.
    EthMappings,
    /// Column for storing IPLD data that has to be ignored by the garbage collector.
    /// Anything stored in this column can be considered permanent, unless manually
    /// deleted.
    PersistentGraph,
    /// Column for storing indexed values.
    Indices,
}
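
// Illustrative sketch (not part of the runtime flow): because of `#[repr(u8)]`
// and the derived `FromRepr`, the column indices handed to `parity_db` round-trip
// to and from this enum, e.g.
//
//     assert_eq!(DbColumn::GraphDagCborBlake2b256 as u8, 0);
//     assert_eq!(DbColumn::from_repr(1), Some(DbColumn::GraphFull));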

impl DbColumn {
    fn create_column_options(compression: CompressionType) -> Vec<parity_db::ColumnOptions> {
        DbColumn::iter()
            .map(|col| {
                match col {
                    DbColumn::GraphDagCborBlake2b256 | DbColumn::PersistentGraph => {
                        parity_db::ColumnOptions {
                            preimage: true,
                            compression,
                            ..Default::default()
                        }
                    }
                    DbColumn::GraphFull => parity_db::ColumnOptions {
                        preimage: true,
                        // This is needed for key retrieval.
                        btree_index: true,
                        compression,
                        ..Default::default()
                    },
                    DbColumn::Settings => parity_db::ColumnOptions {
                        // Explicitly disable preimage for the settings column;
                        // otherwise entries could not be overwritten.
                        preimage: false,
                        // This is needed for key retrieval.
                        btree_index: true,
                        compression,
                        ..Default::default()
                    },
                    DbColumn::EthMappings => parity_db::ColumnOptions {
                        preimage: false,
                        btree_index: false,
                        compression,
                        ..Default::default()
                    },
                    DbColumn::Indices => parity_db::ColumnOptions {
                        preimage: false,
                        btree_index: false,
                        compression,
                        ..Default::default()
                    },
                }
            })
            .collect()
    }
}

type WriteOpsBroadcastTxSender = tokio::sync::broadcast::Sender<(Cid, Vec<u8>)>;

pub struct ParityDb {
    pub db: parity_db::Db,
    statistics_enabled: bool,
    // This is needed to maintain backwards-compatibility for pre-persistent-column migrations.
    disable_persistent_fallback: bool,
    write_ops_broadcast_tx: RwLock<Option<WriteOpsBroadcastTxSender>>,
}

impl ParityDb {
    pub fn to_options(path: PathBuf, config: &ParityDbConfig) -> Options {
        Options {
            path,
            sync_wal: true,
            sync_data: true,
            stats: config.enable_statistics,
            salt: None,
            columns: DbColumn::create_column_options(CompressionType::Lz4),
            compression_threshold: [(0, 128)].into_iter().collect(),
        }
    }

    pub fn open(path: impl Into<PathBuf>, config: &ParityDbConfig) -> anyhow::Result<Self> {
        let opts = Self::to_options(path.into(), config);
        Ok(Self {
            db: Db::open_or_create(&opts)?,
            statistics_enabled: opts.stats,
            disable_persistent_fallback: false,
            write_ops_broadcast_tx: RwLock::new(None),
        })
    }
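
    // Usage sketch (hypothetical path and data; assumes `ParityDbConfig`
    // provides a `Default` implementation):
    //
    //     let db = ParityDb::open("/tmp/forest-db", &ParityDbConfig::default())?;
    //     db.put_keyed(&cid, &encoded_block)?;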

    /// Returns an appropriate column variant based on the information
    /// in the Cid.
    fn choose_column(cid: &Cid) -> DbColumn {
        match cid.codec() {
            DAG_CBOR if cid.hash().code() == u64::from(MultihashCode::Blake2b256) => {
                DbColumn::GraphDagCborBlake2b256
            }
            _ => DbColumn::GraphFull,
        }
    }
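
    // For example, a CID built as `Cid::new_v1(DAG_CBOR, Blake2b256.digest(..))`
    // is routed to `GraphDagCborBlake2b256`, while any other codec/hash
    // combination falls back to `GraphFull` (see `choose_column_test` below).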

    fn read_from_column<K>(&self, key: K, column: DbColumn) -> anyhow::Result<Option<Vec<u8>>>
    where
        K: AsRef<[u8]>,
    {
        self.db
            .get(column as u8, key.as_ref())
            .map_err(|e| anyhow!("error from column {column}: {e}"))
    }

    fn write_to_column<K, V>(&self, key: K, value: V, column: DbColumn) -> anyhow::Result<()>
    where
        K: AsRef<[u8]>,
        V: AsRef<[u8]>,
    {
        let tx = [(column as u8, key.as_ref(), Some(value.as_ref().to_vec()))];
        self.db
            .commit(tx)
            .map_err(|e| anyhow!("error writing to column {column}: {e}"))
    }
}

impl SettingsStore for ParityDb {
    fn read_bin(&self, key: &str) -> anyhow::Result<Option<Vec<u8>>> {
        self.read_from_column(key.as_bytes(), DbColumn::Settings)
    }

    fn write_bin(&self, key: &str, value: &[u8]) -> anyhow::Result<()> {
        self.write_to_column(key.as_bytes(), value, DbColumn::Settings)
    }

    fn exists(&self, key: &str) -> anyhow::Result<bool> {
        self.db
            .get_size(DbColumn::Settings as u8, key.as_bytes())
            .map(|size| size.is_some())
            .context("error checking if key exists")
    }

    fn setting_keys(&self) -> anyhow::Result<Vec<String>> {
        let mut iter = self.db.iter(DbColumn::Settings as u8)?;
        let mut keys = vec![];
        while let Some((key, _)) = iter.next()? {
            keys.push(String::from_utf8(key)?);
        }
        Ok(keys)
    }
}
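
// Settings-column sketch (hypothetical key/value; `read_bin`/`write_bin` operate
// on raw bytes, while the typed `SettingsStoreExt::{read_obj, write_obj}` helpers
// used below layer (de)serialization on top of them):
//
//     SettingsStore::write_bin(&db, "greeting", b"hello")?;
//     assert_eq!(SettingsStore::read_bin(&db, "greeting")?, Some(b"hello".to_vec()));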

impl super::HeaviestTipsetKeyProvider for ParityDb {
    fn heaviest_tipset_key(&self) -> anyhow::Result<TipsetKey> {
        super::SettingsStoreExt::read_obj::<TipsetKey>(self, super::setting_keys::HEAD_KEY)?
            .context("head key not found")
    }

    fn set_heaviest_tipset_key(&self, tsk: &TipsetKey) -> anyhow::Result<()> {
        super::SettingsStoreExt::write_obj(self, super::setting_keys::HEAD_KEY, tsk)
    }
}

impl EthMappingsStore for ParityDb {
    fn read_bin(&self, key: &EthHash) -> anyhow::Result<Option<Vec<u8>>> {
        self.read_from_column(key.0.as_bytes(), DbColumn::EthMappings)
    }

    fn write_bin(&self, key: &EthHash, value: &[u8]) -> anyhow::Result<()> {
        self.write_to_column(key.0.as_bytes(), value, DbColumn::EthMappings)
    }

    fn exists(&self, key: &EthHash) -> anyhow::Result<bool> {
        self.db
            .get_size(DbColumn::EthMappings as u8, key.0.as_bytes())
            .map(|size| size.is_some())
            .context("error checking if key exists")
    }

    fn get_message_cids(&self) -> anyhow::Result<Vec<(Cid, u64)>> {
        let mut cids = Vec::new();

        self.db
            .iter_column_while(DbColumn::EthMappings as u8, |val| {
                if let Ok(value) = fvm_ipld_encoding::from_slice::<(Cid, u64)>(&val.value) {
                    cids.push(value);
                }
                true
            })?;

        Ok(cids)
    }

    fn delete(&self, keys: Vec<EthHash>) -> anyhow::Result<()> {
        Ok(self.db.commit_changes(keys.into_iter().map(|key| {
            let bytes = key.0.as_bytes().to_vec();
            (DbColumn::EthMappings as u8, Operation::Dereference(bytes))
        }))?)
    }
}

impl IndicesStore for ParityDb {
    fn read_bin(&self, key: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
        self.read_from_column(key.to_bytes(), DbColumn::Indices)
    }

    fn write_bin(&self, key: &Cid, value: &[u8]) -> anyhow::Result<()> {
        self.write_to_column(key.to_bytes(), value, DbColumn::Indices)
    }

    fn exists(&self, key: &Cid) -> anyhow::Result<bool> {
        self.db
            .get_size(DbColumn::Indices as u8, &key.to_bytes())
            .map(|size| size.is_some())
            .context("error checking if key exists")
    }
}

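// `Sender::closed()` resolves only once every receiver has been dropped, so
// polling it with `now_or_never()` is a cheap, non-blocking way of asking
// "is at least one subscriber still listening?" without awaiting.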
fn has_subscribers<T>(tx: &tokio::sync::broadcast::Sender<T>) -> bool {
    tx.closed().now_or_never().is_none()
}

impl Blockstore for ParityDb {
    fn get(&self, k: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
        let column = Self::choose_column(k);
        let res = self.read_from_column(k.to_bytes(), column)?;
        if res.is_some() {
            return Ok(res);
        }
        self.get_persistent(k)
    }

    fn put_keyed(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()> {
        let column = Self::choose_column(k);
        // We can put the data directly into the database without any encoding.
        self.write_to_column(k.to_bytes(), block, column)?;
        match &*self.write_ops_broadcast_tx.read() {
            Some(tx) if has_subscribers(tx) => {
                let _ = tx.send((*k, block.to_vec()));
            }
            _ => {}
        }

        Ok(())
    }

    fn put_many_keyed<D, I>(&self, blocks: I) -> anyhow::Result<()>
    where
        Self: Sized,
        D: AsRef<[u8]>,
        I: IntoIterator<Item = (Cid, D)>,
    {
        let tx_opt: &Option<tokio::sync::broadcast::Sender<(cid::CidGeneric<64>, Vec<u8>)>> =
            &self.write_ops_broadcast_tx.read();
        let has_subscribers = tx_opt.as_ref().map(has_subscribers).unwrap_or_default();
        let mut values_for_subscriber = vec![];
        let values = blocks.into_iter().map(|(k, v)| {
            let column = Self::choose_column(&k);
            let v = v.as_ref().to_vec();
            if has_subscribers {
                values_for_subscriber.push((k, v.clone()));
            }
            (column, k.to_bytes(), v)
        });
        let tx = values
            .into_iter()
            .map(|(col, k, v)| (col as u8, Operation::Set(k, v)));
        self.db
            .commit_changes(tx)
            .map_err(|e| anyhow!("error bulk writing: {e}"))?;
        if let Some(tx) = tx_opt {
            for i in values_for_subscriber {
                let _ = tx.send(i);
            }
        }
        Ok(())
    }
}
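
// Bulk-write sketch (hypothetical `blocks` contents; a minimal illustration of
// `put_many_keyed`, which routes each block to its column and commits everything
// in a single `commit_changes` batch, notifying write-ops subscribers afterwards):
//
//     let blocks: Vec<(Cid, Vec<u8>)> = vec![(cid, encoded_block)];
//     db.put_many_keyed(blocks)?;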

impl PersistentStore for ParityDb {
    fn put_keyed_persistent(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()> {
        self.write_to_column(k.to_bytes(), block, DbColumn::PersistentGraph)
    }
}

impl BitswapStoreRead for ParityDb {
    fn contains(&self, cid: &Cid) -> anyhow::Result<bool> {
        // We need to check both columns because we don't know which one
        // the data is in. The order is important because most data will
        // be in the [`DbColumn::GraphDagCborBlake2b256`] column and so
        // it directly affects performance. If this assumption ever changes
        // then this code should be modified accordingly.
        for column in [DbColumn::GraphDagCborBlake2b256, DbColumn::GraphFull] {
            if self
                .db
                .get_size(column as u8, &cid.to_bytes())
                .context("error checking if key exists")?
                .is_some()
            {
                return Ok(true);
            }
        }
        Ok(false)
    }

    fn get(&self, cid: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
        Blockstore::get(self, cid)
    }
}

impl BitswapStoreReadWrite for ParityDb {
    type Hashes = MultihashCode;

    fn insert(&self, block: &crate::libp2p_bitswap::Block64<Self::Hashes>) -> anyhow::Result<()> {
        self.put_keyed(block.cid(), block.data())
    }
}

impl DBStatistics for ParityDb {
    fn get_statistics(&self) -> Option<String> {
        if !self.statistics_enabled {
            return None;
        }

        let mut buf = Vec::new();
        if let Err(err) = self.db.write_stats_text(&mut buf, None) {
            warn!("Unable to write database statistics: {err}");
            return None;
        }

        match String::from_utf8(buf) {
            Ok(stats) => Some(stats),
            Err(e) => {
                warn!("Malformed statistics: {e}");
                None
            }
        }
    }
}

type Op = (u8, Operation<Vec<u8>, Vec<u8>>);

impl ParityDb {
    /// Removes a record.
    ///
    /// # Arguments
    /// * `key` - record identifier
    #[allow(dead_code)]
    pub fn dereference_operation(key: &Cid) -> Op {
        let column = Self::choose_column(key);
        (column as u8, Operation::Dereference(key.to_bytes()))
    }

    /// Updates/inserts a record.
    ///
    /// # Arguments
    /// * `column` - column identifier
    /// * `key` - record identifier
    /// * `value` - record contents
    pub fn set_operation(column: u8, key: Vec<u8>, value: Vec<u8>) -> Op {
        (column, Operation::Set(key, value))
    }

    // Get data from persistent graph column.
    fn get_persistent(&self, k: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
        if self.disable_persistent_fallback {
            return Ok(None);
        }
        self.read_from_column(k.to_bytes(), DbColumn::PersistentGraph)
    }
}
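
// Batch-operation sketch (assumption: callers build `Op` values with
// `dereference_operation`/`set_operation` and apply them via `Db::commit_changes`,
// mirroring what `delete` does for the Ethereum-mappings column above):
//
//     let ops = unreachable_cids.iter().map(ParityDb::dereference_operation);
//     db.db.commit_changes(ops)?;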

impl super::BlockstoreWriteOpsSubscribable for ParityDb {
    fn subscribe_write_ops(&self) -> tokio::sync::broadcast::Receiver<(Cid, Vec<u8>)> {
        let tx_lock = self.write_ops_broadcast_tx.read();
        if let Some(tx) = &*tx_lock {
            return tx.subscribe();
        }
        drop(tx_lock);
        let (tx, rx) = tokio::sync::broadcast::channel(8192);
        *self.write_ops_broadcast_tx.write() = Some(tx);
        rx
    }

    fn unsubscribe_write_ops(&self) {
        self.write_ops_broadcast_tx.write().take();
    }
}
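
// Subscription sketch (hypothetical consumer; the receiver is a plain
// `tokio::sync::broadcast::Receiver`, so a slow consumer may observe
// `RecvError::Lagged` if more than 8192 writes pile up):
//
//     let mut rx = db.subscribe_write_ops();
//     while let Ok((cid, block)) = rx.recv().await {
//         // forward the freshly written block elsewhere
//     }
//     db.unsubscribe_write_ops();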

#[cfg(test)]
mod test {
    use super::*;
    use crate::db::{BlockstoreWriteOpsSubscribable, tests::db_utils::parity::TempParityDB};
    use fvm_ipld_encoding::IPLD_RAW;
    use nom::AsBytes;
    use std::ops::Deref;

    #[test]
    fn write_read_different_columns_test() {
        let db = TempParityDB::new();
        let data = [
            b"h'nglui mglw'nafh".to_vec(),
            b"Cthulhu".to_vec(),
            b"R'lyeh wgah'nagl fhtagn!!".to_vec(),
        ];
        let cids = [
            Cid::new_v1(DAG_CBOR, MultihashCode::Blake2b256.digest(&data[0])),
            Cid::new_v1(DAG_CBOR, MultihashCode::Sha2_256.digest(&data[1])),
            Cid::new_v1(IPLD_RAW, MultihashCode::Blake2b256.digest(&data[1])),
        ];

        let cases = [
            (DbColumn::GraphDagCborBlake2b256, cids[0], &data[0]),
            (DbColumn::GraphFull, cids[1], &data[1]),
            (DbColumn::GraphFull, cids[2], &data[2]),
        ];

        for (_, cid, data) in cases {
            db.put_keyed(&cid, data).unwrap();
        }

        for (column, cid, data) in cases {
            let actual = db
                .read_from_column(cid.to_bytes(), column)
                .unwrap()
                .expect("data not found");
            assert_eq!(data, actual.as_bytes());

            // assert that the data is NOT in the other column
            let other_column = match column {
                DbColumn::GraphDagCborBlake2b256 => DbColumn::GraphFull,
                DbColumn::GraphFull => DbColumn::GraphDagCborBlake2b256,
                DbColumn::Settings => panic!("invalid column for IPLD data"),
                DbColumn::EthMappings => panic!("invalid column for IPLD data"),
                DbColumn::PersistentGraph => panic!("invalid column for GC enabled IPLD data"),
                DbColumn::Indices => panic!("invalid indices column for IPLD data"),
            };
            let actual = db.read_from_column(cid.to_bytes(), other_column).unwrap();
            assert!(actual.is_none());

            // Blockstore API usage should be transparent
            let actual = fvm_ipld_blockstore::Blockstore::get(db.as_ref(), &cid)
                .unwrap()
                .expect("data not found");
            assert_eq!(data, actual.as_slice());
        }

        // Check non-IPLD column as well
        db.write_to_column(b"dagon", b"bloop", DbColumn::Settings)
            .unwrap();
        let actual = db
            .read_from_column(b"dagon", DbColumn::Settings)
            .unwrap()
            .expect("data not found");
        assert_eq!(b"bloop", actual.as_bytes());
    }

    #[test]
    fn choose_column_test() {
        let data = [0u8; 32];
        let cases = [
            (
                Cid::new_v1(DAG_CBOR, MultihashCode::Blake2b256.digest(&data)),
                DbColumn::GraphDagCborBlake2b256,
            ),
            (
                Cid::new_v1(
                    fvm_ipld_encoding::CBOR,
                    MultihashCode::Blake2b256.digest(&data),
                ),
                DbColumn::GraphFull,
            ),
            (
                Cid::new_v1(DAG_CBOR, MultihashCode::Sha2_256.digest(&data)),
                DbColumn::GraphFull,
            ),
        ];

        for (cid, expected) in cases {
            let actual = ParityDb::choose_column(&cid);
            assert_eq!(expected, actual);
        }
    }

    #[test]
    fn persistent_tests() {
        let db = TempParityDB::new();
        let data = [
            b"h'nglui mglw'nafh".to_vec(),
            b"Cthulhu".to_vec(),
            b"R'lyeh wgah'nagl fhtagn!!".to_vec(),
        ];

        let persistent_data = data
            .clone()
            .into_iter()
            .map(|mut entry| {
                entry.push(255);
                entry
            })
            .collect::<Vec<Vec<u8>>>();

        let cids = [
            Cid::new_v1(DAG_CBOR, MultihashCode::Blake2b256.digest(&data[0])),
            Cid::new_v1(DAG_CBOR, MultihashCode::Sha2_256.digest(&data[1])),
            Cid::new_v1(IPLD_RAW, MultihashCode::Blake2b256.digest(&data[1])),
        ];

        for idx in 0..3 {
            let cid = &cids[idx];
            let persistent_entry = &persistent_data[idx];
            let data_entry = &data[idx];
            db.put_keyed_persistent(cid, persistent_entry).unwrap();
            // Check that we get persistent data if the data is otherwise absent from the GC enabled
            // storage.
            assert_eq!(
                Blockstore::get(db.deref(), cid).unwrap(),
                Some(persistent_entry.clone())
            );
            assert!(
                db.read_from_column(cid.to_bytes(), DbColumn::PersistentGraph)
                    .unwrap()
                    .is_some()
            );
            db.put_keyed(cid, data_entry).unwrap();
            assert_eq!(
                Blockstore::get(db.deref(), cid).unwrap(),
                Some(data_entry.clone())
            );
        }
    }

    #[test]
    fn subscription_tests() {
        let db = TempParityDB::new();
        assert!(
            db.db
                .as_ref()
                .unwrap()
                .write_ops_broadcast_tx
                .read()
                .is_none()
        );
        let data = [
            b"h'nglui mglw'nafh".to_vec(),
            b"Cthulhu".to_vec(),
            b"R'lyeh wgah'nagl fhtagn!!".to_vec(),
        ];

        let cids = [
            Cid::new_v1(DAG_CBOR, MultihashCode::Blake2b256.digest(&data[0])),
            Cid::new_v1(DAG_CBOR, MultihashCode::Sha2_256.digest(&data[1])),
            Cid::new_v1(IPLD_RAW, MultihashCode::Blake2b256.digest(&data[1])),
        ];

        let mut rx1 = db.db.as_ref().unwrap().subscribe_write_ops();
        let mut rx2 = db.db.as_ref().unwrap().subscribe_write_ops();

        assert!(has_subscribers(
            db.db
                .as_ref()
                .unwrap()
                .write_ops_broadcast_tx
                .read()
                .as_ref()
                .unwrap()
        ));

        for (idx, cid) in cids.iter().enumerate() {
            let data_entry = &data[idx];
            db.put_keyed(cid, data_entry).unwrap();
            assert_eq!(rx1.blocking_recv().unwrap(), (*cid, data_entry.clone()));
            assert_eq!(rx2.blocking_recv().unwrap(), (*cid, data_entry.clone()));
        }

        drop(rx1);
        drop(rx2);

        assert!(!has_subscribers(
            db.db
                .as_ref()
                .unwrap()
                .write_ops_broadcast_tx
                .read()
                .as_ref()
                .unwrap()
        ));

        db.db.as_ref().unwrap().unsubscribe_write_ops();

        assert!(
            db.db
                .as_ref()
                .unwrap()
                .write_ops_broadcast_tx
                .read()
                .is_none()
        );
    }
}