1mod gc;
5pub use gc::*;
6
7use super::{EthMappingsStore, PersistentStore, SettingsStore};
8use crate::blocks::{Tipset, TipsetKey};
9use crate::db::{DBStatistics, parity_db_config::ParityDbConfig};
10use crate::libp2p_bitswap::{BitswapStoreRead, BitswapStoreReadWrite};
11use crate::rpc::eth::types::EthHash;
12use crate::utils::{broadcast::has_subscribers, multihash::prelude::*};
13use anyhow::Context as _;
14use bytes::Bytes;
15use cid::Cid;
16use fvm_ipld_blockstore::Blockstore;
17use fvm_ipld_encoding::DAG_CBOR;
18use parity_db::{CompressionType, Db, Operation, Options};
19use parking_lot::RwLock;
20use std::path::PathBuf;
21use strum::{Display, EnumIter, FromRepr, IntoEnumIterator};
22use tracing::warn;
23
/// Logical columns of the ParityDb database.
///
/// Each variant is cast with `as u8` to obtain the column index handed to
/// `parity_db`, so reordering the variants would change which physical column
/// each name refers to.
#[derive(Copy, Clone, Debug, Display, PartialEq, FromRepr, EnumIter)]
#[repr(u8)]
pub enum DbColumn {
    /// Blocks whose CID uses the `DAG_CBOR` codec together with a `Blake2b256`
    /// multihash (see `ParityDb::choose_column`).
    GraphDagCborBlake2b256,
    /// Blocks with any other codec / multihash combination.
    GraphFull,
    /// Key-value pairs written through the `SettingsStore` implementation.
    Settings,
    /// Entries written through the `EthMappingsStore` implementation.
    EthMappings,
    /// Blocks written through `PersistentStore::put_keyed_persistent`; read as a
    /// fallback by `Blockstore::get` when the regular graph column misses.
    PersistentGraph,
}
46
47impl DbColumn {
48 fn create_column_options(compression: CompressionType) -> Vec<parity_db::ColumnOptions> {
49 DbColumn::iter()
50 .map(|col| {
51 match col {
52 DbColumn::GraphDagCborBlake2b256 | DbColumn::PersistentGraph => {
53 parity_db::ColumnOptions {
54 preimage: true,
55 compression,
56 ..Default::default()
57 }
58 }
59 DbColumn::GraphFull => parity_db::ColumnOptions {
60 preimage: true,
61 btree_index: true,
63 compression,
64 ..Default::default()
65 },
66 DbColumn::Settings => parity_db::ColumnOptions {
67 preimage: false,
70 btree_index: true,
72 compression,
73 ..Default::default()
74 },
75 DbColumn::EthMappings => parity_db::ColumnOptions {
76 preimage: false,
77 btree_index: false,
78 compression,
79 ..Default::default()
80 },
81 }
82 })
83 .collect()
84 }
85}
86
/// Sender half of the broadcast channel on which `ParityDb` publishes the
/// `(Cid, Bytes)` pairs it has just written to the blockstore.
type WriteOpsBroadcastTxSender = tokio::sync::broadcast::Sender<Vec<(Cid, Bytes)>>;
88
/// Blockstore, settings store and Ethereum-mappings store backed by a single
/// `parity_db` database.
pub struct ParityDb {
    /// Underlying `parity_db` database handle.
    pub db: parity_db::Db,
    /// Copied from `Options::stats` when the database is opened; when `false`,
    /// `get_statistics` returns `None`.
    statistics_enabled: bool,
    /// When `true`, `get_persistent` returns `None` without reading the
    /// `PersistentGraph` column, so `Blockstore::get` only consults the regular
    /// graph columns. Set to `false` by `open_with_options`.
    disable_persistent_fallback: bool,
    /// Broadcast sender used to publish written blocks. `None` until
    /// `subscribe_write_ops` is first called and reset to `None` by
    /// `unsubscribe_write_ops`.
    write_ops_broadcast_tx: RwLock<Option<WriteOpsBroadcastTxSender>>,
}
96
97impl ParityDb {
98 pub fn to_options(path: impl Into<PathBuf>, config: &ParityDbConfig) -> Options {
99 Options {
100 path: path.into(),
101 sync_wal: true,
102 sync_data: true,
103 stats: config.enable_statistics,
104 salt: None,
105 columns: DbColumn::create_column_options(CompressionType::Lz4),
106 compression_threshold: [(0, 128)].into_iter().collect(),
107 }
108 }
109
110 pub fn open(path: impl Into<PathBuf>, config: &ParityDbConfig) -> anyhow::Result<Self> {
111 let opts = Self::to_options(path.into(), config);
112 Self::open_with_options(&opts)
113 }
114
115 pub fn open_with_options(options: &Options) -> anyhow::Result<Self> {
116 Ok(Self {
117 db: Db::open_or_create(options)?,
118 statistics_enabled: options.stats,
119 disable_persistent_fallback: false,
120 write_ops_broadcast_tx: RwLock::new(None),
121 })
122 }
123
124 fn choose_column(cid: &Cid) -> DbColumn {
127 match cid.codec() {
128 DAG_CBOR if cid.hash().code() == u64::from(MultihashCode::Blake2b256) => {
129 DbColumn::GraphDagCborBlake2b256
130 }
131 _ => DbColumn::GraphFull,
132 }
133 }
134
135 fn read_from_column<K>(&self, key: K, column: DbColumn) -> anyhow::Result<Option<Vec<u8>>>
136 where
137 K: AsRef<[u8]>,
138 {
139 self.db
140 .get(column as u8, key.as_ref())
141 .with_context(|| format!("error from column {column}"))
142 }
143
144 fn write_to_column<K, V>(&self, key: K, value: V, column: DbColumn) -> anyhow::Result<()>
145 where
146 K: AsRef<[u8]>,
147 V: AsRef<[u8]>,
148 {
149 let tx = [(column as u8, key.as_ref(), Some(value.as_ref().to_vec()))];
150 self.db
151 .commit(tx)
152 .with_context(|| format!("error writing to column {column}"))
153 }
154}
155
156impl SettingsStore for ParityDb {
157 fn read_bin(&self, key: &str) -> anyhow::Result<Option<Vec<u8>>> {
158 self.read_from_column(key.as_bytes(), DbColumn::Settings)
159 }
160
161 fn write_bin(&self, key: &str, value: &[u8]) -> anyhow::Result<()> {
162 self.write_to_column(key.as_bytes(), value, DbColumn::Settings)
163 }
164
165 fn exists(&self, key: &str) -> anyhow::Result<bool> {
166 self.db
167 .get_size(DbColumn::Settings as u8, key.as_bytes())
168 .map(|size| size.is_some())
169 .context("error checking if key exists")
170 }
171
172 fn setting_keys(&self) -> anyhow::Result<Vec<String>> {
173 let mut iter = self.db.iter(DbColumn::Settings as u8)?;
174 let mut keys = vec![];
175 while let Some((key, _)) = iter.next()? {
176 keys.push(String::from_utf8(key)?);
177 }
178 Ok(keys)
179 }
180}
181
182impl super::HeaviestTipsetKeyProvider for ParityDb {
183 fn heaviest_tipset_key(&self) -> anyhow::Result<Option<TipsetKey>> {
184 super::SettingsStoreExt::read_obj::<TipsetKey>(self, super::setting_keys::HEAD_KEY)
185 }
186
187 fn set_heaviest_tipset_key(&self, tsk: &TipsetKey) -> anyhow::Result<()> {
188 super::SettingsStoreExt::write_obj(self, super::setting_keys::HEAD_KEY, tsk)
189 }
190}
191
192impl EthMappingsStore for ParityDb {
193 fn read_bin(&self, key: &EthHash) -> anyhow::Result<Option<Vec<u8>>> {
194 self.read_from_column(key.0.as_bytes(), DbColumn::EthMappings)
195 }
196
197 fn write_bin(&self, key: &EthHash, value: &[u8]) -> anyhow::Result<()> {
198 self.write_to_column(key.0.as_bytes(), value, DbColumn::EthMappings)
199 }
200
201 fn exists(&self, key: &EthHash) -> anyhow::Result<bool> {
202 self.db
203 .get_size(DbColumn::EthMappings as u8, key.0.as_bytes())
204 .map(|size| size.is_some())
205 .context("error checking if key exists")
206 }
207
208 fn get_message_cids(&self) -> anyhow::Result<Vec<(Cid, u64)>> {
209 let mut cids = Vec::new();
210
211 self.db
212 .iter_column_while(DbColumn::EthMappings as u8, |val| {
213 if let Ok(value) = fvm_ipld_encoding::from_slice::<(Cid, u64)>(&val.value) {
214 cids.push(value);
215 }
216 true
217 })?;
218
219 Ok(cids)
220 }
221
222 fn delete(&self, keys: Vec<EthHash>) -> anyhow::Result<()> {
223 Ok(self.db.commit_changes(keys.into_iter().map(|key| {
224 let bytes = key.0.as_bytes().to_vec();
225 (DbColumn::EthMappings as u8, Operation::Dereference(bytes))
226 }))?)
227 }
228
229 fn tipset_key_by_epoch(&self, epoch: i64) -> anyhow::Result<Option<TipsetKey>> {
230 let key = epoch.to_le_bytes();
231 if let Some(bytes) = self.read_from_column(key, DbColumn::EthMappings)? {
232 Ok(Some(fvm_ipld_encoding::from_slice(&bytes)?))
233 } else {
234 Ok(None)
235 }
236 }
237
238 fn set_tipset_key_at_epoch(&self, ts: &Tipset) -> anyhow::Result<()> {
239 let key = ts.epoch().to_le_bytes();
240 let bytes = fvm_ipld_encoding::to_vec(ts.key())?;
241 self.write_to_column(key, bytes, DbColumn::EthMappings)
242 }
243}
244
245impl Blockstore for ParityDb {
246 fn get(&self, k: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
247 let column = Self::choose_column(k);
248 let res = self.read_from_column(k.to_bytes(), column)?;
249 if res.is_some() {
250 return Ok(res);
251 }
252 self.get_persistent(k)
253 }
254
255 fn put_keyed(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()> {
256 let column = Self::choose_column(k);
257 self.write_to_column(k.to_bytes(), block, column)?;
259 match &*self.write_ops_broadcast_tx.read() {
260 Some(tx) if has_subscribers(tx) => {
261 let _ = tx.send(vec![(*k, Bytes::copy_from_slice(block))]);
262 }
263 _ => {}
264 }
265
266 Ok(())
267 }
268
269 fn put_many_keyed<D, I>(&self, blocks: I) -> anyhow::Result<()>
270 where
271 Self: Sized,
272 D: AsRef<[u8]>,
273 I: IntoIterator<Item = (Cid, D)>,
274 {
275 let tx_opt = &*self.write_ops_broadcast_tx.read();
276 let has_subscribers = tx_opt.as_ref().map(has_subscribers).unwrap_or_default();
277 let mut values_for_subscriber = vec![];
278 let values = blocks.into_iter().map(|(k, v)| {
279 let column = Self::choose_column(&k);
280 let v = v.as_ref().to_vec();
281 if has_subscribers {
282 values_for_subscriber.push((k, Bytes::copy_from_slice(&v)));
283 }
284 (column, k.to_bytes(), v)
285 });
286 let tx = values
287 .into_iter()
288 .map(|(col, k, v)| (col as u8, Operation::Set(k, v)));
289 self.db.commit_changes(tx).context("error bulk writing")?;
290 if let Some(tx) = tx_opt {
291 let _ = tx.send(values_for_subscriber);
292 }
293 Ok(())
294 }
295}
296
297impl PersistentStore for ParityDb {
298 fn put_keyed_persistent(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()> {
299 self.write_to_column(k.to_bytes(), block, DbColumn::PersistentGraph)
300 }
301}
302
303impl BitswapStoreRead for ParityDb {
304 fn contains(&self, cid: &Cid) -> anyhow::Result<bool> {
305 for column in [DbColumn::GraphDagCborBlake2b256, DbColumn::GraphFull] {
311 if self
312 .db
313 .get_size(column as u8, &cid.to_bytes())
314 .context("error checking if key exists")?
315 .is_some()
316 {
317 return Ok(true);
318 }
319 }
320 Ok(false)
321 }
322
323 fn get(&self, cid: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
324 Blockstore::get(self, cid)
325 }
326}
327
328impl BitswapStoreReadWrite for ParityDb {
329 type Hashes = MultihashCode;
330
331 fn insert(&self, block: &crate::libp2p_bitswap::Block64<Self::Hashes>) -> anyhow::Result<()> {
332 self.put_keyed(block.cid(), block.data())
333 }
334}
335
336impl DBStatistics for ParityDb {
337 fn get_statistics(&self) -> Option<String> {
338 if !self.statistics_enabled {
339 return None;
340 }
341
342 let mut buf = Vec::new();
343 if let Err(err) = self.db.write_stats_text(&mut buf, None) {
344 warn!("Unable to write database statistics: {err}");
345 return None;
346 }
347
348 match String::from_utf8(buf) {
349 Ok(stats) => Some(stats),
350 Err(e) => {
351 warn!("Malformed statistics: {e}");
352 None
353 }
354 }
355 }
356}
357
/// A single database change: the column index paired with the `parity_db`
/// operation (byte-vector key and value) to apply to that column.
type Op = (u8, Operation<Vec<u8>, Vec<u8>>);
359
360impl ParityDb {
361 #[allow(dead_code)]
366 pub fn dereference_operation(key: &Cid) -> Op {
367 let column = Self::choose_column(key);
368 (column as u8, Operation::Dereference(key.to_bytes()))
369 }
370
371 pub fn set_operation(column: u8, key: Vec<u8>, value: Vec<u8>) -> Op {
378 (column, Operation::Set(key, value))
379 }
380
381 fn get_persistent(&self, k: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
383 if self.disable_persistent_fallback {
384 return Ok(None);
385 }
386 self.read_from_column(k.to_bytes(), DbColumn::PersistentGraph)
387 }
388}
389
390impl super::BlockstoreWriteOpsSubscribable for ParityDb {
391 fn subscribe_write_ops(&self) -> tokio::sync::broadcast::Receiver<Vec<(Cid, Bytes)>> {
392 let tx_lock = self.write_ops_broadcast_tx.read();
393 if let Some(tx) = &*tx_lock {
394 return tx.subscribe();
395 }
396 drop(tx_lock);
397 let (tx, rx) = tokio::sync::broadcast::channel(65536);
398 *self.write_ops_broadcast_tx.write() = Some(tx);
399 rx
400 }
401
402 fn unsubscribe_write_ops(&self) {
403 self.write_ops_broadcast_tx.write().take();
404 }
405}
406
407#[cfg(test)]
408mod test {
409 use super::*;
410 use crate::db::{BlockstoreWriteOpsSubscribable, tests::db_utils::parity::TempParityDB};
411 use fvm_ipld_encoding::IPLD_RAW;
412 use itertools::Itertools as _;
413 use nom::AsBytes;
414 use std::ops::Deref;
415
416 #[test]
417 fn write_read_different_columns_test() {
418 let db = TempParityDB::new();
419 let data = [
420 b"h'nglui mglw'nafh".to_vec(),
421 b"Cthulhu".to_vec(),
422 b"R'lyeh wgah'nagl fhtagn!!".to_vec(),
423 ];
424 let cids = [
425 Cid::new_v1(DAG_CBOR, MultihashCode::Blake2b256.digest(&data[0])),
426 Cid::new_v1(DAG_CBOR, MultihashCode::Sha2_256.digest(&data[1])),
427 Cid::new_v1(IPLD_RAW, MultihashCode::Blake2b256.digest(&data[1])),
428 ];
429
430 let cases = [
431 (DbColumn::GraphDagCborBlake2b256, cids[0], &data[0]),
432 (DbColumn::GraphFull, cids[1], &data[1]),
433 (DbColumn::GraphFull, cids[2], &data[2]),
434 ];
435
436 for (_, cid, data) in cases {
437 db.put_keyed(&cid, data).unwrap();
438 }
439
440 for (column, cid, data) in cases {
441 let actual = db
442 .read_from_column(cid.to_bytes(), column)
443 .unwrap()
444 .expect("data not found");
445 assert_eq!(data, actual.as_bytes());
446
447 let other_column = match column {
449 DbColumn::GraphDagCborBlake2b256 => DbColumn::GraphFull,
450 DbColumn::GraphFull => DbColumn::GraphDagCborBlake2b256,
451 DbColumn::Settings => panic!("invalid column for IPLD data"),
452 DbColumn::EthMappings => panic!("invalid column for IPLD data"),
453 DbColumn::PersistentGraph => panic!("invalid column for GC enabled IPLD data"),
454 };
455 let actual = db.read_from_column(cid.to_bytes(), other_column).unwrap();
456 assert!(actual.is_none());
457
458 let actual = fvm_ipld_blockstore::Blockstore::get(db.as_ref(), &cid)
460 .unwrap()
461 .expect("data not found");
462 assert_eq!(data, actual.as_slice());
463 }
464
465 db.write_to_column(b"dagon", b"bloop", DbColumn::Settings)
467 .unwrap();
468 let actual = db
469 .read_from_column(b"dagon", DbColumn::Settings)
470 .unwrap()
471 .expect("data not found");
472 assert_eq!(b"bloop", actual.as_bytes());
473 }
474
475 #[test]
476 fn choose_column_test() {
477 let data = [0u8; 32];
478 let cases = [
479 (
480 Cid::new_v1(DAG_CBOR, MultihashCode::Blake2b256.digest(&data)),
481 DbColumn::GraphDagCborBlake2b256,
482 ),
483 (
484 Cid::new_v1(
485 fvm_ipld_encoding::CBOR,
486 MultihashCode::Blake2b256.digest(&data),
487 ),
488 DbColumn::GraphFull,
489 ),
490 (
491 Cid::new_v1(DAG_CBOR, MultihashCode::Sha2_256.digest(&data)),
492 DbColumn::GraphFull,
493 ),
494 ];
495
496 for (cid, expected) in cases {
497 let actual = ParityDb::choose_column(&cid);
498 assert_eq!(expected, actual);
499 }
500 }
501
502 #[test]
503 fn persistent_tests() {
504 let db = TempParityDB::new();
505 let data = [
506 b"h'nglui mglw'nafh".to_vec(),
507 b"Cthulhu".to_vec(),
508 b"R'lyeh wgah'nagl fhtagn!!".to_vec(),
509 ];
510
511 let persistent_data = data
512 .clone()
513 .into_iter()
514 .map(|mut entry| {
515 entry.push(255);
516 entry
517 })
518 .collect_vec();
519
520 let cids = [
521 Cid::new_v1(DAG_CBOR, MultihashCode::Blake2b256.digest(&data[0])),
522 Cid::new_v1(DAG_CBOR, MultihashCode::Sha2_256.digest(&data[1])),
523 Cid::new_v1(IPLD_RAW, MultihashCode::Blake2b256.digest(&data[1])),
524 ];
525
526 for idx in 0..3 {
527 let cid = &cids[idx];
528 let persistent_entry = &persistent_data[idx];
529 let data_entry = &data[idx];
530 db.put_keyed_persistent(cid, persistent_entry).unwrap();
531 assert_eq!(
534 Blockstore::get(db.deref(), cid).unwrap(),
535 Some(persistent_entry.clone())
536 );
537 assert!(
538 db.read_from_column(cid.to_bytes(), DbColumn::PersistentGraph)
539 .unwrap()
540 .is_some()
541 );
542 db.put_keyed(cid, data_entry).unwrap();
543 assert_eq!(
544 Blockstore::get(db.deref(), cid).unwrap(),
545 Some(data_entry.clone())
546 );
547 }
548 }
549
550 #[test]
551 fn subscription_tests() {
552 let db = TempParityDB::new();
553 assert!(db.write_ops_broadcast_tx.read().is_none());
554 let data = [
555 b"h'nglui mglw'nafh".to_vec(),
556 b"Cthulhu".to_vec(),
557 b"R'lyeh wgah'nagl fhtagn!!".to_vec(),
558 ];
559
560 let cids = [
561 Cid::new_v1(DAG_CBOR, MultihashCode::Blake2b256.digest(&data[0])),
562 Cid::new_v1(DAG_CBOR, MultihashCode::Sha2_256.digest(&data[1])),
563 Cid::new_v1(IPLD_RAW, MultihashCode::Blake2b256.digest(&data[1])),
564 ];
565
566 let mut rx1 = db.subscribe_write_ops();
567 let mut rx2 = db.subscribe_write_ops();
568
569 assert!(has_subscribers(
570 db.write_ops_broadcast_tx.read().as_ref().unwrap()
571 ));
572
573 for (idx, cid) in cids.iter().enumerate() {
574 let data_entry = &data[idx];
575 db.put_keyed(cid, data_entry).unwrap();
576 let expected = vec![(*cid, Bytes::copy_from_slice(data_entry))];
577 assert_eq!(rx1.blocking_recv().unwrap(), expected);
578 assert_eq!(rx2.blocking_recv().unwrap(), expected);
579 }
580
581 drop(rx1);
582 drop(rx2);
583
584 assert!(!has_subscribers(
585 db.write_ops_broadcast_tx.read().as_ref().unwrap()
586 ));
587
588 db.unsubscribe_write_ops();
589
590 assert!(db.write_ops_broadcast_tx.read().is_none());
591 }
592}