1use super::{EthMappingsStore, IndicesStore, PersistentStore, SettingsStore};
5use crate::blocks::TipsetKey;
6use crate::db::{DBStatistics, parity_db_config::ParityDbConfig};
7use crate::libp2p_bitswap::{BitswapStoreRead, BitswapStoreReadWrite};
8use crate::rpc::eth::types::EthHash;
9use crate::utils::multihash::prelude::*;
10use anyhow::{Context as _, anyhow};
11use cid::Cid;
12use futures::FutureExt;
13use fvm_ipld_blockstore::Blockstore;
14use fvm_ipld_encoding::DAG_CBOR;
15use parity_db::{CompressionType, Db, Operation, Options};
16use parking_lot::RwLock;
17use std::path::PathBuf;
18use strum::{Display, EnumIter, FromRepr, IntoEnumIterator};
19use tracing::warn;
20
/// Columns of the `parity_db` database used by [`ParityDb`].
///
/// The enum is `#[repr(u8)]` and every database access in this file casts a variant with
/// `as u8` to obtain the column id. `create_column_options` also builds the column list by
/// iterating the variants, so declaration order is the column order.
///
/// NOTE(review): reordering or inserting variants would change the column ids; presumably
/// that is incompatible with databases created earlier — confirm before touching the order.
#[derive(Copy, Clone, Debug, Display, PartialEq, FromRepr, EnumIter)]
#[repr(u8)]
pub enum DbColumn {
    /// Blocks whose CID uses the `DAG_CBOR` codec together with a `Blake2b256` multihash
    /// (see `ParityDb::choose_column`).
    GraphDagCborBlake2b256,
    /// Blocks with any other codec / multihash combination.
    GraphFull,
    /// Key-value settings, keyed by UTF-8 strings (see the `SettingsStore` impl).
    Settings,
    /// Values keyed by `EthHash` (see the `EthMappingsStore` impl).
    EthMappings,
    /// Blocks written through `PersistentStore::put_keyed_persistent`; `Blockstore::get`
    /// falls back to this column when a block is missing from the graph columns.
    PersistentGraph,
    /// Values keyed by CID bytes (see the `IndicesStore` impl).
    Indices,
}
45
46impl DbColumn {
47 fn create_column_options(compression: CompressionType) -> Vec<parity_db::ColumnOptions> {
48 DbColumn::iter()
49 .map(|col| {
50 match col {
51 DbColumn::GraphDagCborBlake2b256 | DbColumn::PersistentGraph => {
52 parity_db::ColumnOptions {
53 preimage: true,
54 compression,
55 ..Default::default()
56 }
57 }
58 DbColumn::GraphFull => parity_db::ColumnOptions {
59 preimage: true,
60 btree_index: true,
62 compression,
63 ..Default::default()
64 },
65 DbColumn::Settings => parity_db::ColumnOptions {
66 preimage: false,
69 btree_index: true,
71 compression,
72 ..Default::default()
73 },
74 DbColumn::EthMappings => parity_db::ColumnOptions {
75 preimage: false,
76 btree_index: false,
77 compression,
78 ..Default::default()
79 },
80 DbColumn::Indices => parity_db::ColumnOptions {
81 preimage: false,
82 btree_index: false,
83 compression,
84 ..Default::default()
85 },
86 }
87 })
88 .collect()
89 }
90}
91
/// Sender half of the broadcast channel that carries a `(Cid, block bytes)` pair for each
/// block written through the `Blockstore` impl of [`ParityDb`].
type WriteOpsBroadcastTxSender = tokio::sync::broadcast::Sender<(Cid, Vec<u8>)>;
93
/// A store backed by `parity_db`.
///
/// In this file it implements the settings, Ethereum-mapping, indices, blockstore,
/// persistent-store, bitswap and statistics traits, each on top of the columns listed in
/// [`DbColumn`].
pub struct ParityDb {
    /// Underlying database handle.
    pub db: parity_db::Db,
    /// Copy of `Options::stats` taken when the database was opened; when `false`,
    /// `get_statistics` returns `None` without querying the database.
    statistics_enabled: bool,
    /// When `true`, `get_persistent` returns `Ok(None)` without reading, so
    /// `Blockstore::get` does not fall back to the `PersistentGraph` column.
    /// `open` always initialises it to `false`.
    disable_persistent_fallback: bool,
    /// Sender of the write-ops broadcast channel: `None` until `subscribe_write_ops`
    /// creates the channel, and reset to `None` by `unsubscribe_write_ops`.
    write_ops_broadcast_tx: RwLock<Option<WriteOpsBroadcastTxSender>>,
}
101
102impl ParityDb {
103 pub fn to_options(path: PathBuf, config: &ParityDbConfig) -> Options {
104 Options {
105 path,
106 sync_wal: true,
107 sync_data: true,
108 stats: config.enable_statistics,
109 salt: None,
110 columns: DbColumn::create_column_options(CompressionType::Lz4),
111 compression_threshold: [(0, 128)].into_iter().collect(),
112 }
113 }
114
115 pub fn open(path: impl Into<PathBuf>, config: &ParityDbConfig) -> anyhow::Result<Self> {
116 let opts = Self::to_options(path.into(), config);
117 Ok(Self {
118 db: Db::open_or_create(&opts)?,
119 statistics_enabled: opts.stats,
120 disable_persistent_fallback: false,
121 write_ops_broadcast_tx: RwLock::new(None),
122 })
123 }
124
125 fn choose_column(cid: &Cid) -> DbColumn {
128 match cid.codec() {
129 DAG_CBOR if cid.hash().code() == u64::from(MultihashCode::Blake2b256) => {
130 DbColumn::GraphDagCborBlake2b256
131 }
132 _ => DbColumn::GraphFull,
133 }
134 }
135
136 fn read_from_column<K>(&self, key: K, column: DbColumn) -> anyhow::Result<Option<Vec<u8>>>
137 where
138 K: AsRef<[u8]>,
139 {
140 self.db
141 .get(column as u8, key.as_ref())
142 .map_err(|e| anyhow!("error from column {column}: {e}"))
143 }
144
145 fn write_to_column<K, V>(&self, key: K, value: V, column: DbColumn) -> anyhow::Result<()>
146 where
147 K: AsRef<[u8]>,
148 V: AsRef<[u8]>,
149 {
150 let tx = [(column as u8, key.as_ref(), Some(value.as_ref().to_vec()))];
151 self.db
152 .commit(tx)
153 .map_err(|e| anyhow!("error writing to column {column}: {e}"))
154 }
155}
156
157impl SettingsStore for ParityDb {
158 fn read_bin(&self, key: &str) -> anyhow::Result<Option<Vec<u8>>> {
159 self.read_from_column(key.as_bytes(), DbColumn::Settings)
160 }
161
162 fn write_bin(&self, key: &str, value: &[u8]) -> anyhow::Result<()> {
163 self.write_to_column(key.as_bytes(), value, DbColumn::Settings)
164 }
165
166 fn exists(&self, key: &str) -> anyhow::Result<bool> {
167 self.db
168 .get_size(DbColumn::Settings as u8, key.as_bytes())
169 .map(|size| size.is_some())
170 .context("error checking if key exists")
171 }
172
173 fn setting_keys(&self) -> anyhow::Result<Vec<String>> {
174 let mut iter = self.db.iter(DbColumn::Settings as u8)?;
175 let mut keys = vec![];
176 while let Some((key, _)) = iter.next()? {
177 keys.push(String::from_utf8(key)?);
178 }
179 Ok(keys)
180 }
181}
182
183impl super::HeaviestTipsetKeyProvider for ParityDb {
184 fn heaviest_tipset_key(&self) -> anyhow::Result<TipsetKey> {
185 super::SettingsStoreExt::read_obj::<TipsetKey>(self, super::setting_keys::HEAD_KEY)?
186 .context("head key not found")
187 }
188
189 fn set_heaviest_tipset_key(&self, tsk: &TipsetKey) -> anyhow::Result<()> {
190 super::SettingsStoreExt::write_obj(self, super::setting_keys::HEAD_KEY, tsk)
191 }
192}
193
194impl EthMappingsStore for ParityDb {
195 fn read_bin(&self, key: &EthHash) -> anyhow::Result<Option<Vec<u8>>> {
196 self.read_from_column(key.0.as_bytes(), DbColumn::EthMappings)
197 }
198
199 fn write_bin(&self, key: &EthHash, value: &[u8]) -> anyhow::Result<()> {
200 self.write_to_column(key.0.as_bytes(), value, DbColumn::EthMappings)
201 }
202
203 fn exists(&self, key: &EthHash) -> anyhow::Result<bool> {
204 self.db
205 .get_size(DbColumn::EthMappings as u8, key.0.as_bytes())
206 .map(|size| size.is_some())
207 .context("error checking if key exists")
208 }
209
210 fn get_message_cids(&self) -> anyhow::Result<Vec<(Cid, u64)>> {
211 let mut cids = Vec::new();
212
213 self.db
214 .iter_column_while(DbColumn::EthMappings as u8, |val| {
215 if let Ok(value) = fvm_ipld_encoding::from_slice::<(Cid, u64)>(&val.value) {
216 cids.push(value);
217 }
218 true
219 })?;
220
221 Ok(cids)
222 }
223
224 fn delete(&self, keys: Vec<EthHash>) -> anyhow::Result<()> {
225 Ok(self.db.commit_changes(keys.into_iter().map(|key| {
226 let bytes = key.0.as_bytes().to_vec();
227 (DbColumn::EthMappings as u8, Operation::Dereference(bytes))
228 }))?)
229 }
230}
231
232impl IndicesStore for ParityDb {
233 fn read_bin(&self, key: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
234 self.read_from_column(key.to_bytes(), DbColumn::Indices)
235 }
236
237 fn write_bin(&self, key: &Cid, value: &[u8]) -> anyhow::Result<()> {
238 self.write_to_column(key.to_bytes(), value, DbColumn::Indices)
239 }
240
241 fn exists(&self, key: &Cid) -> anyhow::Result<bool> {
242 self.db
243 .get_size(DbColumn::Indices as u8, &key.to_bytes())
244 .map(|size| size.is_some())
245 .context("error checking if key exists")
246 }
247}
248
249fn has_subscribers<T>(tx: &tokio::sync::broadcast::Sender<T>) -> bool {
250 tx.closed().now_or_never().is_none()
251}
252
253impl Blockstore for ParityDb {
254 fn get(&self, k: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
255 let column = Self::choose_column(k);
256 let res = self.read_from_column(k.to_bytes(), column)?;
257 if res.is_some() {
258 return Ok(res);
259 }
260 self.get_persistent(k)
261 }
262
263 fn put_keyed(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()> {
264 let column = Self::choose_column(k);
265 self.write_to_column(k.to_bytes(), block, column)?;
267 match &*self.write_ops_broadcast_tx.read() {
268 Some(tx) if has_subscribers(tx) => {
269 let _ = tx.send((*k, block.to_vec()));
270 }
271 _ => {}
272 }
273
274 Ok(())
275 }
276
277 fn put_many_keyed<D, I>(&self, blocks: I) -> anyhow::Result<()>
278 where
279 Self: Sized,
280 D: AsRef<[u8]>,
281 I: IntoIterator<Item = (Cid, D)>,
282 {
283 let tx_opt: &Option<tokio::sync::broadcast::Sender<(cid::CidGeneric<64>, Vec<u8>)>> =
284 &self.write_ops_broadcast_tx.read();
285 let has_subscribers = tx_opt.as_ref().map(has_subscribers).unwrap_or_default();
286 let mut values_for_subscriber = vec![];
287 let values = blocks.into_iter().map(|(k, v)| {
288 let column = Self::choose_column(&k);
289 let v = v.as_ref().to_vec();
290 if has_subscribers {
291 values_for_subscriber.push((k, v.clone()));
292 }
293 (column, k.to_bytes(), v)
294 });
295 let tx = values
296 .into_iter()
297 .map(|(col, k, v)| (col as u8, Operation::Set(k, v)));
298 self.db
299 .commit_changes(tx)
300 .map_err(|e| anyhow!("error bulk writing: {e}"))?;
301 if let Some(tx) = tx_opt {
302 for i in values_for_subscriber {
303 let _ = tx.send(i);
304 }
305 }
306 Ok(())
307 }
308}
309
310impl PersistentStore for ParityDb {
311 fn put_keyed_persistent(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()> {
312 self.write_to_column(k.to_bytes(), block, DbColumn::PersistentGraph)
313 }
314}
315
316impl BitswapStoreRead for ParityDb {
317 fn contains(&self, cid: &Cid) -> anyhow::Result<bool> {
318 for column in [DbColumn::GraphDagCborBlake2b256, DbColumn::GraphFull] {
324 if self
325 .db
326 .get_size(column as u8, &cid.to_bytes())
327 .context("error checking if key exists")?
328 .is_some()
329 {
330 return Ok(true);
331 }
332 }
333 Ok(false)
334 }
335
336 fn get(&self, cid: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
337 Blockstore::get(self, cid)
338 }
339}
340
341impl BitswapStoreReadWrite for ParityDb {
342 type Hashes = MultihashCode;
343
344 fn insert(&self, block: &crate::libp2p_bitswap::Block64<Self::Hashes>) -> anyhow::Result<()> {
345 self.put_keyed(block.cid(), block.data())
346 }
347}
348
349impl DBStatistics for ParityDb {
350 fn get_statistics(&self) -> Option<String> {
351 if !self.statistics_enabled {
352 return None;
353 }
354
355 let mut buf = Vec::new();
356 if let Err(err) = self.db.write_stats_text(&mut buf, None) {
357 warn!("Unable to write database statistics: {err}");
358 return None;
359 }
360
361 match String::from_utf8(buf) {
362 Ok(stats) => Some(stats),
363 Err(e) => {
364 warn!("Malformed statistics: {e}");
365 None
366 }
367 }
368 }
369}
370
/// A single database change: the column id paired with the `parity_db` operation to apply
/// there. This is the item shape passed to `Db::commit_changes` elsewhere in this file.
type Op = (u8, Operation<Vec<u8>, Vec<u8>>);
372
373impl ParityDb {
374 #[allow(dead_code)]
379 pub fn dereference_operation(key: &Cid) -> Op {
380 let column = Self::choose_column(key);
381 (column as u8, Operation::Dereference(key.to_bytes()))
382 }
383
384 pub fn set_operation(column: u8, key: Vec<u8>, value: Vec<u8>) -> Op {
391 (column, Operation::Set(key, value))
392 }
393
394 fn get_persistent(&self, k: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
396 if self.disable_persistent_fallback {
397 return Ok(None);
398 }
399 self.read_from_column(k.to_bytes(), DbColumn::PersistentGraph)
400 }
401}
402
403impl super::BlockstoreWriteOpsSubscribable for ParityDb {
404 fn subscribe_write_ops(&self) -> tokio::sync::broadcast::Receiver<(Cid, Vec<u8>)> {
405 let tx_lock = self.write_ops_broadcast_tx.read();
406 if let Some(tx) = &*tx_lock {
407 return tx.subscribe();
408 }
409 drop(tx_lock);
410 let (tx, rx) = tokio::sync::broadcast::channel(8192);
411 *self.write_ops_broadcast_tx.write() = Some(tx);
412 rx
413 }
414
415 fn unsubscribe_write_ops(&self) {
416 self.write_ops_broadcast_tx.write().take();
417 }
418}
419
#[cfg(test)]
mod test {
    use super::*;
    use crate::db::{BlockstoreWriteOpsSubscribable, tests::db_utils::parity::TempParityDB};
    use fvm_ipld_encoding::IPLD_RAW;
    use nom::AsBytes;
    use std::ops::Deref;

    /// Three sample payloads together with one CID per payload.
    ///
    /// Each CID is derived from the payload at the same index. The codec / hash
    /// combinations are chosen so that the first CID maps to
    /// `DbColumn::GraphDagCborBlake2b256` and the other two to `DbColumn::GraphFull`.
    fn sample_blocks() -> ([Vec<u8>; 3], [Cid; 3]) {
        let data = [
            b"h'nglui mglw'nafh".to_vec(),
            b"Cthulhu".to_vec(),
            b"R'lyeh wgah'nagl fhtagn!!".to_vec(),
        ];
        let cids = [
            Cid::new_v1(DAG_CBOR, MultihashCode::Blake2b256.digest(&data[0])),
            Cid::new_v1(DAG_CBOR, MultihashCode::Sha2_256.digest(&data[1])),
            Cid::new_v1(IPLD_RAW, MultihashCode::Blake2b256.digest(&data[2])),
        ];
        (data, cids)
    }

    /// Blocks must land in the column selected by `choose_column` and nowhere else.
    #[test]
    fn write_read_different_columns_test() {
        let db = TempParityDB::new();
        let (data, cids) = sample_blocks();

        let cases = [
            (DbColumn::GraphDagCborBlake2b256, cids[0], &data[0]),
            (DbColumn::GraphFull, cids[1], &data[1]),
            (DbColumn::GraphFull, cids[2], &data[2]),
        ];

        for (_, cid, data) in cases {
            db.put_keyed(&cid, data).unwrap();
        }

        for (column, cid, data) in cases {
            let actual = db
                .read_from_column(cid.to_bytes(), column)
                .unwrap()
                .expect("data not found");
            assert_eq!(data, actual.as_bytes());

            // The block must not be present in the other graph column.
            let other_column = match column {
                DbColumn::GraphDagCborBlake2b256 => DbColumn::GraphFull,
                DbColumn::GraphFull => DbColumn::GraphDagCborBlake2b256,
                DbColumn::Settings => panic!("invalid column for IPLD data"),
                DbColumn::EthMappings => panic!("invalid column for IPLD data"),
                DbColumn::PersistentGraph => panic!("invalid column for GC enabled IPLD data"),
                DbColumn::Indices => panic!("invalid indices column for IPLD data"),
            };
            let actual = db.read_from_column(cid.to_bytes(), other_column).unwrap();
            assert!(actual.is_none());

            // The blockstore API must find the block without being told the column.
            let actual = fvm_ipld_blockstore::Blockstore::get(db.as_ref(), &cid)
                .unwrap()
                .expect("data not found");
            assert_eq!(data, actual.as_slice());
        }

        // Non-block columns are reachable through the same low-level helpers.
        db.write_to_column(b"dagon", b"bloop", DbColumn::Settings)
            .unwrap();
        let actual = db
            .read_from_column(b"dagon", DbColumn::Settings)
            .unwrap()
            .expect("data not found");
        assert_eq!(b"bloop", actual.as_bytes());
    }

    /// Only `DAG_CBOR` + `Blake2b256` CIDs go to the dedicated column.
    #[test]
    fn choose_column_test() {
        let data = [0u8; 32];
        let cases = [
            (
                Cid::new_v1(DAG_CBOR, MultihashCode::Blake2b256.digest(&data)),
                DbColumn::GraphDagCborBlake2b256,
            ),
            (
                Cid::new_v1(
                    fvm_ipld_encoding::CBOR,
                    MultihashCode::Blake2b256.digest(&data),
                ),
                DbColumn::GraphFull,
            ),
            (
                Cid::new_v1(DAG_CBOR, MultihashCode::Sha2_256.digest(&data)),
                DbColumn::GraphFull,
            ),
        ];

        for (cid, expected) in cases {
            let actual = ParityDb::choose_column(&cid);
            assert_eq!(expected, actual);
        }
    }

    /// The persistent column is a fallback: it is served until the same CID is written to
    /// a graph column, after which the graph column wins.
    #[test]
    fn persistent_tests() {
        let db = TempParityDB::new();
        let (data, cids) = sample_blocks();

        // Same payloads with a trailing marker byte, to tell the two copies apart.
        let persistent_data = data
            .clone()
            .into_iter()
            .map(|mut entry| {
                entry.push(255);
                entry
            })
            .collect::<Vec<Vec<u8>>>();

        for ((cid, data_entry), persistent_entry) in cids.iter().zip(&data).zip(&persistent_data) {
            db.put_keyed_persistent(cid, persistent_entry).unwrap();
            assert_eq!(
                Blockstore::get(db.deref(), cid).unwrap(),
                Some(persistent_entry.clone())
            );
            assert!(
                db.read_from_column(cid.to_bytes(), DbColumn::PersistentGraph)
                    .unwrap()
                    .is_some()
            );
            db.put_keyed(cid, data_entry).unwrap();
            assert_eq!(
                Blockstore::get(db.deref(), cid).unwrap(),
                Some(data_entry.clone())
            );
        }
    }

    /// Subscribers receive every written block; the sender is created on first
    /// subscription and removed on unsubscription.
    #[test]
    fn subscription_tests() {
        let db = TempParityDB::new();
        let store: &ParityDb = db.db.as_ref().unwrap();
        assert!(store.write_ops_broadcast_tx.read().is_none());

        let (data, cids) = sample_blocks();

        let mut rx1 = store.subscribe_write_ops();
        let mut rx2 = store.subscribe_write_ops();

        assert!(has_subscribers(
            store.write_ops_broadcast_tx.read().as_ref().unwrap()
        ));

        for (cid, data_entry) in cids.iter().zip(&data) {
            db.put_keyed(cid, data_entry).unwrap();
            assert_eq!(rx1.blocking_recv().unwrap(), (*cid, data_entry.clone()));
            assert_eq!(rx2.blocking_recv().unwrap(), (*cid, data_entry.clone()));
        }

        drop(rx1);
        drop(rx2);

        // The sender is still stored, but nobody is listening any more.
        assert!(!has_subscribers(
            store.write_ops_broadcast_tx.read().as_ref().unwrap()
        ));

        store.unsubscribe_write_ops();

        assert!(store.write_ops_broadcast_tx.read().is_none());
    }
}