1use super::{EthMappingsStore, PersistentStore, SettingsStore};
5use crate::blocks::TipsetKey;
6use crate::db::{DBStatistics, parity_db_config::ParityDbConfig};
7use crate::libp2p_bitswap::{BitswapStoreRead, BitswapStoreReadWrite};
8use crate::rpc::eth::types::EthHash;
9use crate::utils::multihash::prelude::*;
10use anyhow::{Context as _, anyhow};
11use cid::Cid;
12use futures::FutureExt;
13use fvm_ipld_blockstore::Blockstore;
14use fvm_ipld_encoding::DAG_CBOR;
15use parity_db::{CompressionType, Db, Operation, Options};
16use parking_lot::RwLock;
17use std::path::PathBuf;
18use strum::{Display, EnumIter, FromRepr, IntoEnumIterator};
19use tracing::warn;
20
21#[derive(Copy, Clone, Debug, Display, PartialEq, FromRepr, EnumIter)]
24#[repr(u8)]
25pub enum DbColumn {
26 GraphDagCborBlake2b256,
29 GraphFull,
34 Settings,
36 EthMappings,
38 PersistentGraph,
42}
43
44impl DbColumn {
45 fn create_column_options(compression: CompressionType) -> Vec<parity_db::ColumnOptions> {
46 DbColumn::iter()
47 .map(|col| {
48 match col {
49 DbColumn::GraphDagCborBlake2b256 | DbColumn::PersistentGraph => {
50 parity_db::ColumnOptions {
51 preimage: true,
52 compression,
53 ..Default::default()
54 }
55 }
56 DbColumn::GraphFull => parity_db::ColumnOptions {
57 preimage: true,
58 btree_index: true,
60 compression,
61 ..Default::default()
62 },
63 DbColumn::Settings => parity_db::ColumnOptions {
64 preimage: false,
67 btree_index: true,
69 compression,
70 ..Default::default()
71 },
72 DbColumn::EthMappings => parity_db::ColumnOptions {
73 preimage: false,
74 btree_index: false,
75 compression,
76 ..Default::default()
77 },
78 }
79 })
80 .collect()
81 }
82}
83
/// Sending half of the broadcast channel that carries `(CID, block bytes)`
/// pairs for blocks written through the `Blockstore` implementation.
type WriteOpsBroadcastTxSender = tokio::sync::broadcast::Sender<(Cid, Vec<u8>)>;
85
/// Store backed by a `parity_db` database, with one column per [`DbColumn`]
/// variant.
pub struct ParityDb {
    /// Handle to the underlying database.
    pub db: parity_db::Db,
    /// Copy of the `stats` option the database was opened with; gates
    /// `DBStatistics::get_statistics`.
    statistics_enabled: bool,
    /// When `true`, `get_persistent` returns `Ok(None)` without reading
    /// [`DbColumn::PersistentGraph`]. `open` initialises it to `false`.
    disable_persistent_fallback: bool,
    /// Broadcast sender for write notifications. `None` until
    /// `subscribe_write_ops` creates the channel, and reset to `None` by
    /// `unsubscribe_write_ops`.
    write_ops_broadcast_tx: RwLock<Option<WriteOpsBroadcastTxSender>>,
}
93
94impl ParityDb {
95 pub fn to_options(path: PathBuf, config: &ParityDbConfig) -> Options {
96 Options {
97 path,
98 sync_wal: true,
99 sync_data: true,
100 stats: config.enable_statistics,
101 salt: None,
102 columns: DbColumn::create_column_options(CompressionType::Lz4),
103 compression_threshold: [(0, 128)].into_iter().collect(),
104 }
105 }
106
107 pub fn open(path: impl Into<PathBuf>, config: &ParityDbConfig) -> anyhow::Result<Self> {
108 let opts = Self::to_options(path.into(), config);
109 Ok(Self {
110 db: Db::open_or_create(&opts)?,
111 statistics_enabled: opts.stats,
112 disable_persistent_fallback: false,
113 write_ops_broadcast_tx: RwLock::new(None),
114 })
115 }
116
117 fn choose_column(cid: &Cid) -> DbColumn {
120 match cid.codec() {
121 DAG_CBOR if cid.hash().code() == u64::from(MultihashCode::Blake2b256) => {
122 DbColumn::GraphDagCborBlake2b256
123 }
124 _ => DbColumn::GraphFull,
125 }
126 }
127
128 fn read_from_column<K>(&self, key: K, column: DbColumn) -> anyhow::Result<Option<Vec<u8>>>
129 where
130 K: AsRef<[u8]>,
131 {
132 self.db
133 .get(column as u8, key.as_ref())
134 .map_err(|e| anyhow!("error from column {column}: {e}"))
135 }
136
137 fn write_to_column<K, V>(&self, key: K, value: V, column: DbColumn) -> anyhow::Result<()>
138 where
139 K: AsRef<[u8]>,
140 V: AsRef<[u8]>,
141 {
142 let tx = [(column as u8, key.as_ref(), Some(value.as_ref().to_vec()))];
143 self.db
144 .commit(tx)
145 .map_err(|e| anyhow!("error writing to column {column}: {e}"))
146 }
147}
148
149impl SettingsStore for ParityDb {
150 fn read_bin(&self, key: &str) -> anyhow::Result<Option<Vec<u8>>> {
151 self.read_from_column(key.as_bytes(), DbColumn::Settings)
152 }
153
154 fn write_bin(&self, key: &str, value: &[u8]) -> anyhow::Result<()> {
155 self.write_to_column(key.as_bytes(), value, DbColumn::Settings)
156 }
157
158 fn exists(&self, key: &str) -> anyhow::Result<bool> {
159 self.db
160 .get_size(DbColumn::Settings as u8, key.as_bytes())
161 .map(|size| size.is_some())
162 .context("error checking if key exists")
163 }
164
165 fn setting_keys(&self) -> anyhow::Result<Vec<String>> {
166 let mut iter = self.db.iter(DbColumn::Settings as u8)?;
167 let mut keys = vec![];
168 while let Some((key, _)) = iter.next()? {
169 keys.push(String::from_utf8(key)?);
170 }
171 Ok(keys)
172 }
173}
174
175impl super::HeaviestTipsetKeyProvider for ParityDb {
176 fn heaviest_tipset_key(&self) -> anyhow::Result<TipsetKey> {
177 super::SettingsStoreExt::read_obj::<TipsetKey>(self, super::setting_keys::HEAD_KEY)?
178 .context("head key not found")
179 }
180
181 fn set_heaviest_tipset_key(&self, tsk: &TipsetKey) -> anyhow::Result<()> {
182 super::SettingsStoreExt::write_obj(self, super::setting_keys::HEAD_KEY, tsk)
183 }
184}
185
186impl EthMappingsStore for ParityDb {
187 fn read_bin(&self, key: &EthHash) -> anyhow::Result<Option<Vec<u8>>> {
188 self.read_from_column(key.0.as_bytes(), DbColumn::EthMappings)
189 }
190
191 fn write_bin(&self, key: &EthHash, value: &[u8]) -> anyhow::Result<()> {
192 self.write_to_column(key.0.as_bytes(), value, DbColumn::EthMappings)
193 }
194
195 fn exists(&self, key: &EthHash) -> anyhow::Result<bool> {
196 self.db
197 .get_size(DbColumn::EthMappings as u8, key.0.as_bytes())
198 .map(|size| size.is_some())
199 .context("error checking if key exists")
200 }
201
202 fn get_message_cids(&self) -> anyhow::Result<Vec<(Cid, u64)>> {
203 let mut cids = Vec::new();
204
205 self.db
206 .iter_column_while(DbColumn::EthMappings as u8, |val| {
207 if let Ok(value) = fvm_ipld_encoding::from_slice::<(Cid, u64)>(&val.value) {
208 cids.push(value);
209 }
210 true
211 })?;
212
213 Ok(cids)
214 }
215
216 fn delete(&self, keys: Vec<EthHash>) -> anyhow::Result<()> {
217 Ok(self.db.commit_changes(keys.into_iter().map(|key| {
218 let bytes = key.0.as_bytes().to_vec();
219 (DbColumn::EthMappings as u8, Operation::Dereference(bytes))
220 }))?)
221 }
222}
223
224fn has_subscribers<T>(tx: &tokio::sync::broadcast::Sender<T>) -> bool {
225 tx.closed().now_or_never().is_none()
226}
227
228impl Blockstore for ParityDb {
229 fn get(&self, k: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
230 let column = Self::choose_column(k);
231 let res = self.read_from_column(k.to_bytes(), column)?;
232 if res.is_some() {
233 return Ok(res);
234 }
235 self.get_persistent(k)
236 }
237
238 fn put_keyed(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()> {
239 let column = Self::choose_column(k);
240 self.write_to_column(k.to_bytes(), block, column)?;
242 match &*self.write_ops_broadcast_tx.read() {
243 Some(tx) if has_subscribers(tx) => {
244 let _ = tx.send((*k, block.to_vec()));
245 }
246 _ => {}
247 }
248
249 Ok(())
250 }
251
252 fn put_many_keyed<D, I>(&self, blocks: I) -> anyhow::Result<()>
253 where
254 Self: Sized,
255 D: AsRef<[u8]>,
256 I: IntoIterator<Item = (Cid, D)>,
257 {
258 let tx_opt: &Option<tokio::sync::broadcast::Sender<(cid::CidGeneric<64>, Vec<u8>)>> =
259 &self.write_ops_broadcast_tx.read();
260 let has_subscribers = tx_opt.as_ref().map(has_subscribers).unwrap_or_default();
261 let mut values_for_subscriber = vec![];
262 let values = blocks.into_iter().map(|(k, v)| {
263 let column = Self::choose_column(&k);
264 let v = v.as_ref().to_vec();
265 if has_subscribers {
266 values_for_subscriber.push((k, v.clone()));
267 }
268 (column, k.to_bytes(), v)
269 });
270 let tx = values
271 .into_iter()
272 .map(|(col, k, v)| (col as u8, Operation::Set(k, v)));
273 self.db
274 .commit_changes(tx)
275 .map_err(|e| anyhow!("error bulk writing: {e}"))?;
276 if let Some(tx) = tx_opt {
277 for i in values_for_subscriber {
278 let _ = tx.send(i);
279 }
280 }
281 Ok(())
282 }
283}
284
285impl PersistentStore for ParityDb {
286 fn put_keyed_persistent(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()> {
287 self.write_to_column(k.to_bytes(), block, DbColumn::PersistentGraph)
288 }
289}
290
291impl BitswapStoreRead for ParityDb {
292 fn contains(&self, cid: &Cid) -> anyhow::Result<bool> {
293 for column in [DbColumn::GraphDagCborBlake2b256, DbColumn::GraphFull] {
299 if self
300 .db
301 .get_size(column as u8, &cid.to_bytes())
302 .context("error checking if key exists")?
303 .is_some()
304 {
305 return Ok(true);
306 }
307 }
308 Ok(false)
309 }
310
311 fn get(&self, cid: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
312 Blockstore::get(self, cid)
313 }
314}
315
316impl BitswapStoreReadWrite for ParityDb {
317 type Hashes = MultihashCode;
318
319 fn insert(&self, block: &crate::libp2p_bitswap::Block64<Self::Hashes>) -> anyhow::Result<()> {
320 self.put_keyed(block.cid(), block.data())
321 }
322}
323
324impl DBStatistics for ParityDb {
325 fn get_statistics(&self) -> Option<String> {
326 if !self.statistics_enabled {
327 return None;
328 }
329
330 let mut buf = Vec::new();
331 if let Err(err) = self.db.write_stats_text(&mut buf, None) {
332 warn!("Unable to write database statistics: {err}");
333 return None;
334 }
335
336 match String::from_utf8(buf) {
337 Ok(stats) => Some(stats),
338 Err(e) => {
339 warn!("Malformed statistics: {e}");
340 None
341 }
342 }
343 }
344}
345
/// A single database operation paired with the `u8` index of the column it
/// applies to.
type Op = (u8, Operation<Vec<u8>, Vec<u8>>);
347
348impl ParityDb {
349 #[allow(dead_code)]
354 pub fn dereference_operation(key: &Cid) -> Op {
355 let column = Self::choose_column(key);
356 (column as u8, Operation::Dereference(key.to_bytes()))
357 }
358
359 pub fn set_operation(column: u8, key: Vec<u8>, value: Vec<u8>) -> Op {
366 (column, Operation::Set(key, value))
367 }
368
369 fn get_persistent(&self, k: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
371 if self.disable_persistent_fallback {
372 return Ok(None);
373 }
374 self.read_from_column(k.to_bytes(), DbColumn::PersistentGraph)
375 }
376}
377
378impl super::BlockstoreWriteOpsSubscribable for ParityDb {
379 fn subscribe_write_ops(&self) -> tokio::sync::broadcast::Receiver<(Cid, Vec<u8>)> {
380 let tx_lock = self.write_ops_broadcast_tx.read();
381 if let Some(tx) = &*tx_lock {
382 return tx.subscribe();
383 }
384 drop(tx_lock);
385 let (tx, rx) = tokio::sync::broadcast::channel(8192);
386 *self.write_ops_broadcast_tx.write() = Some(tx);
387 rx
388 }
389
390 fn unsubscribe_write_ops(&self) {
391 self.write_ops_broadcast_tx.write().take();
392 }
393}
394
#[cfg(test)]
mod test {
    use super::*;
    use crate::db::{BlockstoreWriteOpsSubscribable, tests::db_utils::parity::TempParityDB};
    use fvm_ipld_encoding::IPLD_RAW;
    use itertools::Itertools as _;
    use nom::AsBytes;
    use std::ops::Deref;

    /// Three distinct payloads shared by the tests below.
    fn sample_data() -> [Vec<u8>; 3] {
        [
            b"h'nglui mglw'nafh".to_vec(),
            b"Cthulhu".to_vec(),
            b"R'lyeh wgah'nagl fhtagn!!".to_vec(),
        ]
    }

    /// One CID per payload, each derived from the payload it identifies.
    ///
    /// Only the first CID (`DAG_CBOR` + `Blake2b256`) maps to
    /// `DbColumn::GraphDagCborBlake2b256`; the other two map to
    /// `DbColumn::GraphFull`.
    fn sample_cids(data: &[Vec<u8>; 3]) -> [Cid; 3] {
        [
            Cid::new_v1(DAG_CBOR, MultihashCode::Blake2b256.digest(&data[0])),
            Cid::new_v1(DAG_CBOR, MultihashCode::Sha2_256.digest(&data[1])),
            Cid::new_v1(IPLD_RAW, MultihashCode::Blake2b256.digest(&data[2])),
        ]
    }

    /// Blocks written through `put_keyed` land in the column chosen by their
    /// CID and in no other graph column; the settings column round-trips too.
    #[test]
    fn write_read_different_columns_test() {
        let db = TempParityDB::new();
        let data = sample_data();
        let cids = sample_cids(&data);

        let cases = [
            (DbColumn::GraphDagCborBlake2b256, cids[0], &data[0]),
            (DbColumn::GraphFull, cids[1], &data[1]),
            (DbColumn::GraphFull, cids[2], &data[2]),
        ];

        for (_, cid, data) in cases {
            db.put_keyed(&cid, data).unwrap();
        }

        for (column, cid, data) in cases {
            // The block is readable from the expected column...
            let actual = db
                .read_from_column(cid.to_bytes(), column)
                .unwrap()
                .expect("data not found");
            assert_eq!(data, actual.as_bytes());

            // ...and absent from the other graph column.
            let other_column = match column {
                DbColumn::GraphDagCborBlake2b256 => DbColumn::GraphFull,
                DbColumn::GraphFull => DbColumn::GraphDagCborBlake2b256,
                DbColumn::Settings => panic!("invalid column for IPLD data"),
                DbColumn::EthMappings => panic!("invalid column for IPLD data"),
                DbColumn::PersistentGraph => panic!("invalid column for GC enabled IPLD data"),
            };
            let actual = db.read_from_column(cid.to_bytes(), other_column).unwrap();
            assert!(actual.is_none());

            // The `Blockstore` API finds it without being told the column.
            let actual = fvm_ipld_blockstore::Blockstore::get(db.as_ref(), &cid)
                .unwrap()
                .expect("data not found");
            assert_eq!(data, actual.as_slice());
        }

        db.write_to_column(b"dagon", b"bloop", DbColumn::Settings)
            .unwrap();
        let actual = db
            .read_from_column(b"dagon", DbColumn::Settings)
            .unwrap()
            .expect("data not found");
        assert_eq!(b"bloop", actual.as_bytes());
    }

    /// `choose_column` picks the dedicated column only for the
    /// `DAG_CBOR` + `Blake2b256` combination.
    #[test]
    fn choose_column_test() {
        let data = [0u8; 32];
        let cases = [
            (
                Cid::new_v1(DAG_CBOR, MultihashCode::Blake2b256.digest(&data)),
                DbColumn::GraphDagCborBlake2b256,
            ),
            (
                Cid::new_v1(
                    fvm_ipld_encoding::CBOR,
                    MultihashCode::Blake2b256.digest(&data),
                ),
                DbColumn::GraphFull,
            ),
            (
                Cid::new_v1(DAG_CBOR, MultihashCode::Sha2_256.digest(&data)),
                DbColumn::GraphFull,
            ),
        ];

        for (cid, expected) in cases {
            let actual = ParityDb::choose_column(&cid);
            assert_eq!(expected, actual);
        }
    }

    /// A block stored only in the persistent column is served by
    /// `Blockstore::get`; once the same CID is also written to a graph column,
    /// the graph column's value takes precedence.
    #[test]
    fn persistent_tests() {
        let db = TempParityDB::new();
        let data = sample_data();

        // Same payloads with a trailing byte, so the persistent and regular
        // values stored under one CID can be told apart.
        let persistent_data = data
            .clone()
            .into_iter()
            .map(|mut entry| {
                entry.push(255);
                entry
            })
            .collect_vec();

        let cids = sample_cids(&data);

        for ((cid, data_entry), persistent_entry) in cids.iter().zip(&data).zip(&persistent_data) {
            db.put_keyed_persistent(cid, persistent_entry).unwrap();
            assert_eq!(
                Blockstore::get(db.deref(), cid).unwrap(),
                Some(persistent_entry.clone())
            );
            assert!(
                db.read_from_column(cid.to_bytes(), DbColumn::PersistentGraph)
                    .unwrap()
                    .is_some()
            );
            db.put_keyed(cid, data_entry).unwrap();
            assert_eq!(
                Blockstore::get(db.deref(), cid).unwrap(),
                Some(data_entry.clone())
            );
        }
    }

    /// Every subscriber receives each `put_keyed` write; the sender reports no
    /// subscribers once all receivers are dropped and is removed by
    /// `unsubscribe_write_ops`.
    #[test]
    fn subscription_tests() {
        let db = TempParityDB::new();
        assert!(db.write_ops_broadcast_tx.read().is_none());
        let data = sample_data();
        let cids = sample_cids(&data);

        let mut rx1 = db.subscribe_write_ops();
        let mut rx2 = db.subscribe_write_ops();

        assert!(has_subscribers(
            db.write_ops_broadcast_tx.read().as_ref().unwrap()
        ));

        for (cid, data_entry) in cids.iter().zip(&data) {
            db.put_keyed(cid, data_entry).unwrap();
            assert_eq!(rx1.blocking_recv().unwrap(), (*cid, data_entry.clone()));
            assert_eq!(rx2.blocking_recv().unwrap(), (*cid, data_entry.clone()));
        }

        drop(rx1);
        drop(rx2);

        assert!(!has_subscribers(
            db.write_ops_broadcast_tx.read().as_ref().unwrap()
        ));

        db.unsubscribe_write_ops();

        assert!(db.write_ops_broadcast_tx.read().is_none());
    }
}