1mod gc;
5pub use gc::*;
6
7use super::{EthMappingsStore, PersistentStore, SettingsStore};
8use crate::blocks::TipsetKey;
9use crate::db::{DBStatistics, parity_db_config::ParityDbConfig};
10use crate::libp2p_bitswap::{BitswapStoreRead, BitswapStoreReadWrite};
11use crate::rpc::eth::types::EthHash;
12use crate::utils::{broadcast::has_subscribers, multihash::prelude::*};
13use anyhow::Context as _;
14use bytes::Bytes;
15use cid::Cid;
16use fvm_ipld_blockstore::Blockstore;
17use fvm_ipld_encoding::DAG_CBOR;
18use parity_db::{CompressionType, Db, Operation, Options};
19use parking_lot::RwLock;
20use std::path::PathBuf;
21use strum::{Display, EnumIter, FromRepr, IntoEnumIterator};
22use tracing::warn;
23
24#[derive(Copy, Clone, Debug, Display, PartialEq, FromRepr, EnumIter)]
27#[repr(u8)]
28pub enum DbColumn {
29 GraphDagCborBlake2b256,
32 GraphFull,
37 Settings,
39 EthMappings,
41 PersistentGraph,
45}
46
47impl DbColumn {
48 fn create_column_options(compression: CompressionType) -> Vec<parity_db::ColumnOptions> {
49 DbColumn::iter()
50 .map(|col| {
51 match col {
52 DbColumn::GraphDagCborBlake2b256 | DbColumn::PersistentGraph => {
53 parity_db::ColumnOptions {
54 preimage: true,
55 compression,
56 ..Default::default()
57 }
58 }
59 DbColumn::GraphFull => parity_db::ColumnOptions {
60 preimage: true,
61 btree_index: true,
63 compression,
64 ..Default::default()
65 },
66 DbColumn::Settings => parity_db::ColumnOptions {
67 preimage: false,
70 btree_index: true,
72 compression,
73 ..Default::default()
74 },
75 DbColumn::EthMappings => parity_db::ColumnOptions {
76 preimage: false,
77 btree_index: false,
78 compression,
79 ..Default::default()
80 },
81 }
82 })
83 .collect()
84 }
85}
86
87type WriteOpsBroadcastTxSender = tokio::sync::broadcast::Sender<Vec<(Cid, Bytes)>>;
88
89pub struct ParityDb {
90 pub db: parity_db::Db,
91 statistics_enabled: bool,
92 disable_persistent_fallback: bool,
94 write_ops_broadcast_tx: RwLock<Option<WriteOpsBroadcastTxSender>>,
95}
96
97impl ParityDb {
98 pub fn to_options(path: impl Into<PathBuf>, config: &ParityDbConfig) -> Options {
99 Options {
100 path: path.into(),
101 sync_wal: true,
102 sync_data: true,
103 stats: config.enable_statistics,
104 salt: None,
105 columns: DbColumn::create_column_options(CompressionType::Lz4),
106 compression_threshold: [(0, 128)].into_iter().collect(),
107 }
108 }
109
110 pub fn open(path: impl Into<PathBuf>, config: &ParityDbConfig) -> anyhow::Result<Self> {
111 let opts = Self::to_options(path.into(), config);
112 Self::open_with_options(&opts)
113 }
114
115 pub fn open_with_options(options: &Options) -> anyhow::Result<Self> {
116 Ok(Self {
117 db: Db::open_or_create(options)?,
118 statistics_enabled: options.stats,
119 disable_persistent_fallback: false,
120 write_ops_broadcast_tx: RwLock::new(None),
121 })
122 }
123
124 fn choose_column(cid: &Cid) -> DbColumn {
127 match cid.codec() {
128 DAG_CBOR if cid.hash().code() == u64::from(MultihashCode::Blake2b256) => {
129 DbColumn::GraphDagCborBlake2b256
130 }
131 _ => DbColumn::GraphFull,
132 }
133 }
134
135 fn read_from_column<K>(&self, key: K, column: DbColumn) -> anyhow::Result<Option<Vec<u8>>>
136 where
137 K: AsRef<[u8]>,
138 {
139 self.db
140 .get(column as u8, key.as_ref())
141 .with_context(|| format!("error from column {column}"))
142 }
143
144 fn write_to_column<K, V>(&self, key: K, value: V, column: DbColumn) -> anyhow::Result<()>
145 where
146 K: AsRef<[u8]>,
147 V: AsRef<[u8]>,
148 {
149 let tx = [(column as u8, key.as_ref(), Some(value.as_ref().to_vec()))];
150 self.db
151 .commit(tx)
152 .with_context(|| format!("error writing to column {column}"))
153 }
154}
155
156impl SettingsStore for ParityDb {
157 fn read_bin(&self, key: &str) -> anyhow::Result<Option<Vec<u8>>> {
158 self.read_from_column(key.as_bytes(), DbColumn::Settings)
159 }
160
161 fn write_bin(&self, key: &str, value: &[u8]) -> anyhow::Result<()> {
162 self.write_to_column(key.as_bytes(), value, DbColumn::Settings)
163 }
164
165 fn exists(&self, key: &str) -> anyhow::Result<bool> {
166 self.db
167 .get_size(DbColumn::Settings as u8, key.as_bytes())
168 .map(|size| size.is_some())
169 .context("error checking if key exists")
170 }
171
172 fn setting_keys(&self) -> anyhow::Result<Vec<String>> {
173 let mut iter = self.db.iter(DbColumn::Settings as u8)?;
174 let mut keys = vec![];
175 while let Some((key, _)) = iter.next()? {
176 keys.push(String::from_utf8(key)?);
177 }
178 Ok(keys)
179 }
180}
181
182impl super::HeaviestTipsetKeyProvider for ParityDb {
183 fn heaviest_tipset_key(&self) -> anyhow::Result<Option<TipsetKey>> {
184 super::SettingsStoreExt::read_obj::<TipsetKey>(self, super::setting_keys::HEAD_KEY)
185 }
186
187 fn set_heaviest_tipset_key(&self, tsk: &TipsetKey) -> anyhow::Result<()> {
188 super::SettingsStoreExt::write_obj(self, super::setting_keys::HEAD_KEY, tsk)
189 }
190}
191
192impl EthMappingsStore for ParityDb {
193 fn read_bin(&self, key: &EthHash) -> anyhow::Result<Option<Vec<u8>>> {
194 self.read_from_column(key.0.as_bytes(), DbColumn::EthMappings)
195 }
196
197 fn write_bin(&self, key: &EthHash, value: &[u8]) -> anyhow::Result<()> {
198 self.write_to_column(key.0.as_bytes(), value, DbColumn::EthMappings)
199 }
200
201 fn exists(&self, key: &EthHash) -> anyhow::Result<bool> {
202 self.db
203 .get_size(DbColumn::EthMappings as u8, key.0.as_bytes())
204 .map(|size| size.is_some())
205 .context("error checking if key exists")
206 }
207
208 fn get_message_cids(&self) -> anyhow::Result<Vec<(Cid, u64)>> {
209 let mut cids = Vec::new();
210
211 self.db
212 .iter_column_while(DbColumn::EthMappings as u8, |val| {
213 if let Ok(value) = fvm_ipld_encoding::from_slice::<(Cid, u64)>(&val.value) {
214 cids.push(value);
215 }
216 true
217 })?;
218
219 Ok(cids)
220 }
221
222 fn delete(&self, keys: Vec<EthHash>) -> anyhow::Result<()> {
223 Ok(self.db.commit_changes(keys.into_iter().map(|key| {
224 let bytes = key.0.as_bytes().to_vec();
225 (DbColumn::EthMappings as u8, Operation::Dereference(bytes))
226 }))?)
227 }
228}
229
230impl Blockstore for ParityDb {
231 fn get(&self, k: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
232 let column = Self::choose_column(k);
233 let res = self.read_from_column(k.to_bytes(), column)?;
234 if res.is_some() {
235 return Ok(res);
236 }
237 self.get_persistent(k)
238 }
239
240 fn put_keyed(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()> {
241 let column = Self::choose_column(k);
242 self.write_to_column(k.to_bytes(), block, column)?;
244 match &*self.write_ops_broadcast_tx.read() {
245 Some(tx) if has_subscribers(tx) => {
246 let _ = tx.send(vec![(*k, Bytes::copy_from_slice(block))]);
247 }
248 _ => {}
249 }
250
251 Ok(())
252 }
253
254 fn put_many_keyed<D, I>(&self, blocks: I) -> anyhow::Result<()>
255 where
256 Self: Sized,
257 D: AsRef<[u8]>,
258 I: IntoIterator<Item = (Cid, D)>,
259 {
260 let tx_opt = &*self.write_ops_broadcast_tx.read();
261 let has_subscribers = tx_opt.as_ref().map(has_subscribers).unwrap_or_default();
262 let mut values_for_subscriber = vec![];
263 let values = blocks.into_iter().map(|(k, v)| {
264 let column = Self::choose_column(&k);
265 let v = v.as_ref().to_vec();
266 if has_subscribers {
267 values_for_subscriber.push((k, Bytes::copy_from_slice(&v)));
268 }
269 (column, k.to_bytes(), v)
270 });
271 let tx = values
272 .into_iter()
273 .map(|(col, k, v)| (col as u8, Operation::Set(k, v)));
274 self.db.commit_changes(tx).context("error bulk writing")?;
275 if let Some(tx) = tx_opt {
276 let _ = tx.send(values_for_subscriber);
277 }
278 Ok(())
279 }
280}
281
282impl PersistentStore for ParityDb {
283 fn put_keyed_persistent(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()> {
284 self.write_to_column(k.to_bytes(), block, DbColumn::PersistentGraph)
285 }
286}
287
288impl BitswapStoreRead for ParityDb {
289 fn contains(&self, cid: &Cid) -> anyhow::Result<bool> {
290 for column in [DbColumn::GraphDagCborBlake2b256, DbColumn::GraphFull] {
296 if self
297 .db
298 .get_size(column as u8, &cid.to_bytes())
299 .context("error checking if key exists")?
300 .is_some()
301 {
302 return Ok(true);
303 }
304 }
305 Ok(false)
306 }
307
308 fn get(&self, cid: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
309 Blockstore::get(self, cid)
310 }
311}
312
313impl BitswapStoreReadWrite for ParityDb {
314 type Hashes = MultihashCode;
315
316 fn insert(&self, block: &crate::libp2p_bitswap::Block64<Self::Hashes>) -> anyhow::Result<()> {
317 self.put_keyed(block.cid(), block.data())
318 }
319}
320
321impl DBStatistics for ParityDb {
322 fn get_statistics(&self) -> Option<String> {
323 if !self.statistics_enabled {
324 return None;
325 }
326
327 let mut buf = Vec::new();
328 if let Err(err) = self.db.write_stats_text(&mut buf, None) {
329 warn!("Unable to write database statistics: {err}");
330 return None;
331 }
332
333 match String::from_utf8(buf) {
334 Ok(stats) => Some(stats),
335 Err(e) => {
336 warn!("Malformed statistics: {e}");
337 None
338 }
339 }
340 }
341}
342
343type Op = (u8, Operation<Vec<u8>, Vec<u8>>);
344
345impl ParityDb {
346 #[allow(dead_code)]
351 pub fn dereference_operation(key: &Cid) -> Op {
352 let column = Self::choose_column(key);
353 (column as u8, Operation::Dereference(key.to_bytes()))
354 }
355
356 pub fn set_operation(column: u8, key: Vec<u8>, value: Vec<u8>) -> Op {
363 (column, Operation::Set(key, value))
364 }
365
366 fn get_persistent(&self, k: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
368 if self.disable_persistent_fallback {
369 return Ok(None);
370 }
371 self.read_from_column(k.to_bytes(), DbColumn::PersistentGraph)
372 }
373}
374
375impl super::BlockstoreWriteOpsSubscribable for ParityDb {
376 fn subscribe_write_ops(&self) -> tokio::sync::broadcast::Receiver<Vec<(Cid, Bytes)>> {
377 let tx_lock = self.write_ops_broadcast_tx.read();
378 if let Some(tx) = &*tx_lock {
379 return tx.subscribe();
380 }
381 drop(tx_lock);
382 let (tx, rx) = tokio::sync::broadcast::channel(65536);
383 *self.write_ops_broadcast_tx.write() = Some(tx);
384 rx
385 }
386
387 fn unsubscribe_write_ops(&self) {
388 self.write_ops_broadcast_tx.write().take();
389 }
390}
391
#[cfg(test)]
mod test {
    use super::*;
    use crate::db::{BlockstoreWriteOpsSubscribable, tests::db_utils::parity::TempParityDB};
    use fvm_ipld_encoding::IPLD_RAW;
    use itertools::Itertools as _;
    use nom::AsBytes;
    use std::ops::Deref;

    /// Three distinct payloads shared by the tests below.
    fn sample_data() -> [Vec<u8>; 3] {
        [
            b"h'nglui mglw'nafh".to_vec(),
            b"Cthulhu".to_vec(),
            b"R'lyeh wgah'nagl fhtagn!!".to_vec(),
        ]
    }

    /// Keys spanning both graph columns.
    ///
    /// The first key targets `GraphDagCborBlake2b256`; the other two target
    /// `GraphFull`. The third key deliberately reuses the digest of `data[1]`.
    fn sample_cids(data: &[Vec<u8>; 3]) -> [Cid; 3] {
        [
            Cid::new_v1(DAG_CBOR, MultihashCode::Blake2b256.digest(&data[0])),
            Cid::new_v1(DAG_CBOR, MultihashCode::Sha2_256.digest(&data[1])),
            Cid::new_v1(IPLD_RAW, MultihashCode::Blake2b256.digest(&data[1])),
        ]
    }

    #[test]
    fn write_read_different_columns_test() {
        let db = TempParityDB::new();
        let data = sample_data();
        let cids = sample_cids(&data);
        let cases = [
            (DbColumn::GraphDagCborBlake2b256, cids[0], &data[0]),
            (DbColumn::GraphFull, cids[1], &data[1]),
            (DbColumn::GraphFull, cids[2], &data[2]),
        ];

        for (_, cid, bytes) in &cases {
            db.put_keyed(cid, bytes).unwrap();
        }

        for &(column, cid, expected) in &cases {
            let key = cid.to_bytes();
            let stored = db
                .read_from_column(&key, column)
                .unwrap()
                .expect("data not found");
            assert_eq!(expected, stored.as_bytes());

            // The block must live only in its own graph column.
            let other_column = match column {
                DbColumn::GraphDagCborBlake2b256 => DbColumn::GraphFull,
                DbColumn::GraphFull => DbColumn::GraphDagCborBlake2b256,
                DbColumn::Settings | DbColumn::EthMappings => {
                    panic!("invalid column for IPLD data")
                }
                DbColumn::PersistentGraph => panic!("invalid column for GC enabled IPLD data"),
            };
            assert!(db.read_from_column(&key, other_column).unwrap().is_none());

            let fetched = fvm_ipld_blockstore::Blockstore::get(db.as_ref(), &cid)
                .unwrap()
                .expect("data not found");
            assert_eq!(expected, fetched.as_slice());
        }

        db.write_to_column(b"dagon", b"bloop", DbColumn::Settings)
            .unwrap();
        let setting = db
            .read_from_column(b"dagon", DbColumn::Settings)
            .unwrap()
            .expect("data not found");
        assert_eq!(b"bloop", setting.as_bytes());
    }

    #[test]
    fn choose_column_test() {
        let zeros = [0u8; 32];
        let expectations = [
            (
                Cid::new_v1(DAG_CBOR, MultihashCode::Blake2b256.digest(&zeros)),
                DbColumn::GraphDagCborBlake2b256,
            ),
            (
                Cid::new_v1(
                    fvm_ipld_encoding::CBOR,
                    MultihashCode::Blake2b256.digest(&zeros),
                ),
                DbColumn::GraphFull,
            ),
            (
                Cid::new_v1(DAG_CBOR, MultihashCode::Sha2_256.digest(&zeros)),
                DbColumn::GraphFull,
            ),
        ];

        for (cid, expected) in expectations {
            assert_eq!(expected, ParityDb::choose_column(&cid));
        }
    }

    #[test]
    fn persistent_tests() {
        let db = TempParityDB::new();
        let data = sample_data();
        // Persistent payloads differ from the regular ones by a trailing 0xFF byte.
        let persistent_data = data
            .iter()
            .cloned()
            .map(|mut entry| {
                entry.push(255);
                entry
            })
            .collect_vec();
        let cids = sample_cids(&data);

        for ((cid, persistent_entry), data_entry) in
            cids.iter().zip(&persistent_data).zip(&data)
        {
            // With only a persistent copy, `get` falls back to it.
            db.put_keyed_persistent(cid, persistent_entry).unwrap();
            assert_eq!(
                Blockstore::get(db.deref(), cid).unwrap(),
                Some(persistent_entry.clone())
            );
            assert!(
                db.read_from_column(cid.to_bytes(), DbColumn::PersistentGraph)
                    .unwrap()
                    .is_some()
            );

            // A regular copy then takes precedence over the persistent one.
            db.put_keyed(cid, data_entry).unwrap();
            assert_eq!(
                Blockstore::get(db.deref(), cid).unwrap(),
                Some(data_entry.clone())
            );
        }
    }

    #[test]
    fn subscription_tests() {
        let db = TempParityDB::new();
        assert!(db.write_ops_broadcast_tx.read().is_none());
        let data = sample_data();
        let cids = sample_cids(&data);

        let mut rx1 = db.subscribe_write_ops();
        let mut rx2 = db.subscribe_write_ops();
        assert!(has_subscribers(
            db.write_ops_broadcast_tx.read().as_ref().unwrap()
        ));

        for (cid, data_entry) in cids.iter().zip(&data) {
            db.put_keyed(cid, data_entry).unwrap();
            let expected = vec![(*cid, Bytes::copy_from_slice(data_entry))];
            for rx in [&mut rx1, &mut rx2] {
                assert_eq!(rx.blocking_recv().unwrap(), expected);
            }
        }

        drop((rx1, rx2));
        assert!(!has_subscribers(
            db.write_ops_broadcast_tx.read().as_ref().unwrap()
        ));

        db.unsubscribe_write_ops();
        assert!(db.write_ops_broadcast_tx.read().is_none());
    }
}