1use std::cell::RefCell;
8use std::collections::HashSet;
9use std::io::{Read, Write};
10use std::path::Path;
11use std::sync::Arc;
12use std::{io, vec};
13
14use anyhow::Result;
15use dusk_core::transfer::data::BlobSidecar;
16use node_data::ledger::{
17 Block, Fault, Header, Label, SpendingId, SpentTransaction, Transaction,
18};
19use node_data::message::{payload, ConsensusHeader};
20use node_data::Serializable;
21use rocksdb::{
22 AsColumnFamilyRef, BlockBasedOptions, ColumnFamily, ColumnFamilyDescriptor,
23 DBAccess, DBRawIteratorWithThreadMode, IteratorMode, LogLevel,
24 OptimisticTransactionDB, OptimisticTransactionOptions, Options,
25 WriteOptions,
26};
27use tracing::info;
28
29use super::{
30 ConsensusStorage, DatabaseOptions, Ledger, LightBlock, Metadata, Persist,
31 DB,
32};
33use crate::database::Mempool;
34
// --- Ledger column families -------------------------------------------------

/// Light block records (header + tx/fault ids), keyed by block hash.
const CF_LEDGER_HEADER: &str = "cf_ledger_header";
/// Serialized spent transactions, keyed by transaction id.
const CF_LEDGER_TXS: &str = "cf_ledger_txs";
/// Serialized blob sidecars, keyed by blob hash.
const CF_LEDGER_BLOBS: &str = "cf_ledger_blobs";
/// Concatenated blob hashes, keyed by big-endian block height.
const CF_LEDGER_BLOBS_HEIGHT: &str = "cf_ledger_blobs_height";
/// Serialized faults, keyed by fault id.
const CF_LEDGER_FAULTS: &str = "cf_ledger_faults";
/// Block hash followed by the block label, keyed by little-endian height.
const CF_LEDGER_HEIGHT: &str = "cf_ledger_height";

// --- Consensus column families ----------------------------------------------

/// Serialized candidate blocks, keyed by block hash.
const CF_CANDIDATES: &str = "cf_candidates";
/// Candidate hashes, keyed by `height || hash`.
const CF_CANDIDATES_HEIGHT: &str = "cf_candidates_height";
/// Serialized validation results, keyed by `prev_block_hash || iteration`.
const CF_VALIDATION_RESULTS: &str = "cf_validation_results";

// --- Mempool column families ------------------------------------------------

/// Serialized mempool transactions, keyed by transaction id.
const CF_MEMPOOL: &str = "cf_mempool";
/// Transaction ids, keyed by the serialized spending id they consume.
const CF_MEMPOOL_SPENDING_ID: &str = "cf_mempool_spending_id";
/// Insertion timestamps, keyed by `gas_price || tx_id`.
const CF_MEMPOOL_FEES: &str = "cf_mempool_fees";

/// Free-form key/value metadata (see the `MD_*` keys below).
const CF_METADATA: &str = "cf_metadata";

/// Name of the folder, relative to the node path, that holds the database.
const DB_FOLDER_NAME: &str = "chain.db";

/// Metadata key under which the hash of the most recently stored block is
/// kept.
pub const MD_HASH_KEY: &[u8] = b"hash_key";
/// Metadata key under which the state hash of the most recently stored block
/// is kept.
pub const MD_STATE_ROOT_KEY: &[u8] = b"state_hash_key";
// NOTE(review): the remaining keys are not used in this part of the file;
// presumably they store consensus timing statistics and the last iteration.
pub const MD_AVG_VALIDATION: &[u8] = b"avg_validation_time";
pub const MD_AVG_RATIFICATION: &[u8] = b"avg_ratification_time";
pub const MD_AVG_PROPOSAL: &[u8] = b"avg_proposal_time";
pub const MD_LAST_ITER: &[u8] = b"consensus_last_iter";
58
59#[derive(Clone)]
60pub struct Backend {
61 rocksdb: Arc<OptimisticTransactionDB>,
62}
63
64impl Backend {
65 fn begin_tx(&self) -> DBTransaction<'_, OptimisticTransactionDB> {
66 let write_options = WriteOptions::default();
68 let tx_options = OptimisticTransactionOptions::default();
69
70 let inner = self.rocksdb.transaction_opt(&write_options, &tx_options);
71
72 let ledger_cf = self
74 .rocksdb
75 .cf_handle(CF_LEDGER_HEADER)
76 .expect("ledger_header column family must exist");
77
78 let ledger_txs_cf = self
79 .rocksdb
80 .cf_handle(CF_LEDGER_TXS)
81 .expect("CF_LEDGER_TXS column family must exist");
82
83 let ledger_faults_cf = self
84 .rocksdb
85 .cf_handle(CF_LEDGER_FAULTS)
86 .expect("CF_LEDGER_FAULTS column family must exist");
87
88 let candidates_cf = self
89 .rocksdb
90 .cf_handle(CF_CANDIDATES)
91 .expect("candidates column family must exist");
92
93 let candidates_height_cf = self
94 .rocksdb
95 .cf_handle(CF_CANDIDATES_HEIGHT)
96 .expect("candidates_height column family must exist");
97
98 let validation_results_cf = self
99 .rocksdb
100 .cf_handle(CF_VALIDATION_RESULTS)
101 .expect("validation result column family must exist");
102
103 let mempool_cf = self
104 .rocksdb
105 .cf_handle(CF_MEMPOOL)
106 .expect("mempool column family must exist");
107
108 let spending_id_cf = self
109 .rocksdb
110 .cf_handle(CF_MEMPOOL_SPENDING_ID)
111 .expect("CF_MEMPOOL_SPENDING_ID column family must exist");
112
113 let fees_cf = self
114 .rocksdb
115 .cf_handle(CF_MEMPOOL_FEES)
116 .expect("CF_MEMPOOL_FEES column family must exist");
117
118 let ledger_height_cf = self
119 .rocksdb
120 .cf_handle(CF_LEDGER_HEIGHT)
121 .expect("CF_LEDGER_HEIGHT column family must exist");
122
123 let metadata_cf = self
124 .rocksdb
125 .cf_handle(CF_METADATA)
126 .expect("CF_METADATA column family must exist");
127
128 let ledger_blobs_cf = self
129 .rocksdb
130 .cf_handle(CF_LEDGER_BLOBS)
131 .expect("CF_LEDGER_BLOBS column family must exist");
132
133 let ledger_blobs_height_cf = self
134 .rocksdb
135 .cf_handle(CF_LEDGER_BLOBS_HEIGHT)
136 .expect("CF_LEDGER_BLOBS_HEIGHT column family must exist");
137
138 DBTransaction::<'_, OptimisticTransactionDB> {
139 inner,
140 candidates_cf,
141 candidates_height_cf,
142 validation_results_cf,
143 ledger_cf,
144 ledger_txs_cf,
145 ledger_faults_cf,
146 mempool_cf,
147 spending_id_cf,
148 fees_cf,
149 ledger_height_cf,
150 ledger_blobs_cf,
151 ledger_blobs_height_cf,
152 metadata_cf,
153 cumulative_inner_size: RefCell::new(0),
154 }
155 }
156}
157
158impl DB for Backend {
159 type P<'a> = DBTransaction<'a, OptimisticTransactionDB>;
160
161 fn create_or_open<T>(path: T, db_opts: DatabaseOptions) -> Self
162 where
163 T: AsRef<Path>,
164 {
165 let path = path.as_ref().join(DB_FOLDER_NAME);
166 info!("Opening database in {path:?}, {:?} ", db_opts);
167
168 let mut blocks_cf_opts = Options::default();
171 blocks_cf_opts.create_if_missing(db_opts.create_if_missing);
172 blocks_cf_opts.create_missing_column_families(true);
173 blocks_cf_opts.set_level_compaction_dynamic_level_bytes(true);
174 blocks_cf_opts
175 .set_write_buffer_size(db_opts.blocks_cf_max_write_buffer_size);
176
177 if db_opts.enable_debug {
178 blocks_cf_opts.set_log_level(LogLevel::Info);
179 blocks_cf_opts.set_dump_malloc_stats(true);
180 blocks_cf_opts.enable_statistics();
181 }
182
183 if db_opts.blocks_cf_disable_block_cache {
184 let mut block_opts = BlockBasedOptions::default();
185 block_opts.disable_cache();
186 blocks_cf_opts.set_block_based_table_factory(&block_opts);
187 }
188
189 let mut mp_opts = blocks_cf_opts.clone();
192 mp_opts.set_manual_wal_flush(true);
194 mp_opts.create_if_missing(true);
195 mp_opts.create_missing_column_families(true);
196 mp_opts.set_write_buffer_size(db_opts.mempool_cf_max_write_buffer_size);
197
198 if db_opts.enable_debug {
199 mp_opts.set_log_level(LogLevel::Info);
200 mp_opts.set_dump_malloc_stats(true);
201 mp_opts.enable_statistics();
202 }
203
204 let cfs = vec![
205 ColumnFamilyDescriptor::new(
206 CF_LEDGER_HEADER,
207 blocks_cf_opts.clone(),
208 ),
209 ColumnFamilyDescriptor::new(CF_LEDGER_TXS, blocks_cf_opts.clone()),
210 ColumnFamilyDescriptor::new(
211 CF_LEDGER_FAULTS,
212 blocks_cf_opts.clone(),
213 ),
214 ColumnFamilyDescriptor::new(
215 CF_LEDGER_HEIGHT,
216 blocks_cf_opts.clone(),
217 ),
218 ColumnFamilyDescriptor::new(
219 CF_LEDGER_BLOBS,
220 blocks_cf_opts.clone(),
221 ),
222 ColumnFamilyDescriptor::new(
223 CF_LEDGER_BLOBS_HEIGHT,
224 blocks_cf_opts.clone(),
225 ),
226 ColumnFamilyDescriptor::new(CF_CANDIDATES, blocks_cf_opts.clone()),
227 ColumnFamilyDescriptor::new(
228 CF_CANDIDATES_HEIGHT,
229 blocks_cf_opts.clone(),
230 ),
231 ColumnFamilyDescriptor::new(
232 CF_VALIDATION_RESULTS,
233 blocks_cf_opts.clone(),
234 ),
235 ColumnFamilyDescriptor::new(CF_METADATA, blocks_cf_opts.clone()),
236 ColumnFamilyDescriptor::new(CF_MEMPOOL, mp_opts.clone()),
237 ColumnFamilyDescriptor::new(
238 CF_MEMPOOL_SPENDING_ID,
239 mp_opts.clone(),
240 ),
241 ColumnFamilyDescriptor::new(CF_MEMPOOL_FEES, mp_opts.clone()),
242 ];
243
244 Self {
245 rocksdb: Arc::new(
246 OptimisticTransactionDB::open_cf_descriptors(
247 &blocks_cf_opts,
248 &path,
249 cfs,
250 )
251 .unwrap_or_else(|_| {
252 panic!("should be a valid database in {path:?}")
253 }),
254 ),
255 }
256 }
257
258 fn view<F, T>(&self, f: F) -> T
259 where
260 F: for<'a> FnOnce(&Self::P<'a>) -> T,
261 {
262 let tx = self.begin_tx();
264
265 let ret = f(&tx);
267 tx.rollback().expect("rollback to succeed for readonly");
268 ret
269 }
270
271 fn update<F, T>(&self, execute: F) -> Result<T>
272 where
273 F: for<'a> FnOnce(&mut Self::P<'a>) -> Result<T>,
274 {
275 self.update_dry_run(false, execute)
276 }
277
278 fn update_dry_run<F, T>(&self, dry_run: bool, execute: F) -> Result<T>
279 where
280 F: for<'a> FnOnce(&mut Self::P<'a>) -> Result<T>,
281 {
282 let mut tx = self.begin_tx();
284
285 let ret = execute(&mut tx)?;
288
289 if dry_run {
290 tx.rollback()?;
291 } else {
292 tx.commit()?;
294 }
295
296 Ok(ret)
297 }
298
299 fn close(&mut self) {}
300}
301
302pub struct DBTransaction<'db, DB: DBAccess> {
303 inner: rocksdb::Transaction<'db, DB>,
304 cumulative_inner_size: RefCell<usize>,
306
307 candidates_cf: &'db ColumnFamily,
310 candidates_height_cf: &'db ColumnFamily,
311 validation_results_cf: &'db ColumnFamily,
313
314 ledger_cf: &'db ColumnFamily,
316 ledger_faults_cf: &'db ColumnFamily,
317 ledger_txs_cf: &'db ColumnFamily,
318 ledger_height_cf: &'db ColumnFamily,
319 ledger_blobs_cf: &'db ColumnFamily,
320 ledger_blobs_height_cf: &'db ColumnFamily,
321
322 mempool_cf: &'db ColumnFamily,
324 spending_id_cf: &'db ColumnFamily,
325 fees_cf: &'db ColumnFamily,
326
327 metadata_cf: &'db ColumnFamily,
328}
329
330impl<DB: DBAccess> Ledger for DBTransaction<'_, DB> {
331 fn store_block(
332 &mut self,
333 header: &Header,
334 txs: &[SpentTransaction],
335 faults: &[Fault],
336 label: Label,
337 ) -> Result<usize> {
338 {
342 let cf = self.ledger_cf;
343
344 let mut buf = vec![];
345 LightBlock {
346 header: header.clone(),
347 transactions_ids: txs.iter().map(|t| t.inner.id()).collect(),
348 faults_ids: faults.iter().map(|f| f.id()).collect(),
349 }
350 .write(&mut buf)?;
351
352 self.put_cf(cf, header.hash, buf)?;
353 }
354
355 self.op_write(MD_HASH_KEY, header.hash)?;
357 self.op_write(MD_STATE_ROOT_KEY, header.state_hash)?;
358
359 {
361 let cf = self.ledger_txs_cf;
362
363 let mut stored_blobs = Vec::with_capacity(6);
364
365 for tx in txs {
367 let mut d = vec![];
368
369 if tx.inner.inner.blob().is_some() {
370 let mut strip_tx = tx.clone();
371 if let Some(blobs) = strip_tx.inner.inner.strip_blobs() {
372 for (hash, sidecar) in blobs.into_iter() {
373 let sidecar_bytes = sidecar.to_var_bytes();
374 self.store_blob_data(&hash, sidecar_bytes)?;
375 stored_blobs.push(hash);
376 }
377 }
378 strip_tx.write(&mut d)?;
379 } else {
380 tx.write(&mut d)?;
381 }
382 self.put_cf(cf, tx.inner.id(), d)?;
383 }
384
385 if !stored_blobs.is_empty() {
386 self.store_blobs_height(header.height, &stored_blobs)?;
388 }
389 }
390
391 {
393 let cf = self.ledger_faults_cf;
394
395 for f in faults {
397 let mut d = vec![];
398 f.write(&mut d)?;
399 self.put_cf(cf, f.id(), d)?;
400 }
401 }
402 self.store_block_label(header.height, &header.hash, label)?;
403
404 Ok(self.get_size())
405 }
406
407 fn faults_by_block(&self, start_height: u64) -> Result<Vec<Fault>> {
408 let mut faults = vec![];
409 let mut hash = self
410 .op_read(MD_HASH_KEY)?
411 .ok_or(anyhow::anyhow!("Cannot read tip"))?;
412
413 loop {
414 let block = self.light_block(&hash)?.ok_or(anyhow::anyhow!(
415 "Cannot read block {}",
416 hex::encode(&hash)
417 ))?;
418
419 let block_height = block.header.height;
420
421 if block_height >= start_height {
422 hash = block.header.prev_block_hash.to_vec();
423 faults.extend(self.faults(&block.faults_ids)?);
424 } else {
425 break;
426 }
427
428 if block_height == 0 {
429 break;
430 }
431 }
432 Ok(faults)
433 }
434
435 fn store_block_label(
436 &mut self,
437 height: u64,
438 hash: &[u8; 32],
439 label: Label,
440 ) -> Result<()> {
441 let mut buf = vec![];
443 buf.write_all(hash)?;
444 label.write(&mut buf)?;
445
446 self.put_cf(self.ledger_height_cf, height.to_le_bytes(), buf)?;
447 Ok(())
448 }
449
450 fn delete_block(&mut self, b: &Block) -> Result<()> {
451 self.inner.delete_cf(
452 self.ledger_height_cf,
453 b.header().height.to_le_bytes(),
454 )?;
455
456 for tx in b.txs() {
457 self.inner.delete_cf(self.ledger_txs_cf, tx.id())?;
458 }
459 for f in b.faults() {
460 self.inner.delete_cf(self.ledger_faults_cf, f.id())?;
461 }
462
463 self.delete_blobs_by_height(b.header().height)?;
464 self.inner.delete_cf(self.ledger_cf, b.header().hash)?;
465
466 Ok(())
467 }
468
469 fn block_exists(&self, hash: &[u8]) -> Result<bool> {
470 Ok(self.inner.get_cf(self.ledger_cf, hash)?.is_some())
471 }
472
473 fn faults(&self, faults_ids: &[[u8; 32]]) -> Result<Vec<Fault>> {
474 if faults_ids.is_empty() {
475 return Ok(vec![]);
476 }
477 let ids = faults_ids
478 .iter()
479 .map(|id| (self.ledger_faults_cf, id))
480 .collect::<Vec<_>>();
481
482 let faults_buffer = self.inner.multi_get_cf(ids);
484
485 let mut faults = vec![];
486 for buf in faults_buffer {
487 let buf = buf?.unwrap();
488 let fault = Fault::read(&mut &buf[..])?;
489 faults.push(fault);
490 }
491
492 Ok(faults)
493 }
494
495 fn latest_block(&self) -> Result<LightBlock> {
496 let tip_hash = self
497 .op_read(MD_HASH_KEY)?
498 .ok_or(anyhow::anyhow!("Cannot find tip stored in metadata"))?;
499 self.light_block(&tip_hash)?
500 .ok_or(anyhow::anyhow!("Cannot find tip block"))
501 }
502
503 fn blob_data_by_hash(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
504 Ok(self.inner.get_cf(self.ledger_blobs_cf, hash)?)
505 }
506
507 fn store_blob_data(&self, hash: &[u8; 32], data: Vec<u8>) -> Result<()> {
508 self.inner.put_cf(self.ledger_blobs_cf, hash, data)?;
509 Ok(())
510 }
511 fn store_blobs_height(
512 &self,
513 block_height: u64,
514 blob_hashes: &[[u8; 32]],
515 ) -> Result<()> {
516 if blob_hashes.is_empty() {
517 return Ok(());
518 }
519 let blob_hashes_bytes: Vec<_> =
520 blob_hashes.iter().flat_map(|hash| hash.to_vec()).collect();
521 self.inner.put_cf(
522 self.ledger_blobs_height_cf,
523 block_height.to_be_bytes(),
524 blob_hashes_bytes,
525 )?;
526 Ok(())
527 }
528
529 fn delete_blobs_by_height(&self, block_height: u64) -> Result<()> {
530 let blobs_to_delete = self.blobs_by_height(block_height)?;
531 if let Some(blob_hashes) = blobs_to_delete {
532 for hash in blob_hashes {
533 self.inner.delete_cf(self.ledger_blobs_cf, hash)?;
536 }
537 self.inner.delete_cf(
538 self.ledger_blobs_height_cf,
539 block_height.to_be_bytes(),
540 )?;
541 }
542
543 Ok(())
544 }
545
546 fn blobs_by_height(
547 &self,
548 block_height: u64,
549 ) -> Result<Option<Vec<[u8; 32]>>> {
550 let blob_hashes_bytes = self
551 .inner
552 .get_cf(self.ledger_blobs_height_cf, block_height.to_be_bytes())?;
553
554 if let Some(blob_hashes_bytes) = blob_hashes_bytes {
555 let mut blob_hashes = vec![];
556 for chunk in blob_hashes_bytes.chunks(32) {
557 let mut hash = [0u8; 32];
558 hash.copy_from_slice(chunk);
559 blob_hashes.push(hash);
560 }
561 Ok(Some(blob_hashes))
562 } else {
563 Ok(None)
564 }
565 }
566
567 fn block(&self, hash: &[u8]) -> Result<Option<Block>> {
568 match self.inner.get_cf(self.ledger_cf, hash)? {
569 Some(blob) => {
570 let record = LightBlock::read(&mut &blob[..])?;
571
572 let txs_buffers = self.inner.multi_get_cf(
574 record
575 .transactions_ids
576 .iter()
577 .map(|id| (self.ledger_txs_cf, id))
578 .collect::<Vec<(&ColumnFamily, &[u8; 32])>>(),
579 );
580
581 let mut txs = vec![];
582 for buf in txs_buffers {
583 let buf = buf?.unwrap();
584 let mut tx = SpentTransaction::read(&mut &buf[..])?;
585 if let Some(blobs) = tx.inner.inner.blob_mut() {
586 for blob in blobs {
587 let sidecar = self
589 .blob_data_by_hash(&blob.hash)?
590 .map(|bytes| {
591 BlobSidecar::from_buf(&mut &bytes[..])
592 })
593 .transpose()
594 .map_err(|e| {
595 anyhow::anyhow!(
596 "Failed to parse blob sidecar: {e:?}"
597 )
598 })?;
599 blob.data = sidecar;
600 }
601 }
602 txs.push(tx.inner);
603 }
604
605 let faults_buffer = self.inner.multi_get_cf(
607 record
608 .faults_ids
609 .iter()
610 .map(|id| (self.ledger_faults_cf, id))
611 .collect::<Vec<(&ColumnFamily, &[u8; 32])>>(),
612 );
613 let mut faults = vec![];
614 for buf in faults_buffer {
615 let buf = buf?.unwrap();
616 let fault = Fault::read(&mut &buf[..])?;
617 faults.push(fault);
618 }
619
620 Ok(Some(
621 Block::new(record.header, txs, faults)
622 .expect("block should be valid"),
623 ))
624 }
625 None => Ok(None),
626 }
627 }
628
629 fn light_block(&self, hash: &[u8]) -> Result<Option<LightBlock>> {
630 match self.inner.get_cf(self.ledger_cf, hash)? {
631 Some(blob) => {
632 let record = LightBlock::read(&mut &blob[..])?;
633 Ok(Some(record))
634 }
635 None => Ok(None),
636 }
637 }
638
639 fn block_header(&self, hash: &[u8]) -> Result<Option<Header>> {
640 match self.inner.get_cf(self.ledger_cf, hash)? {
641 Some(blob) => {
642 let record = Header::read(&mut &blob[..])?;
643 Ok(Some(record))
644 }
645 None => Ok(None),
646 }
647 }
648
649 fn block_hash_by_height(&self, height: u64) -> Result<Option<[u8; 32]>> {
650 Ok(self
651 .inner
652 .get_cf(self.ledger_height_cf, height.to_le_bytes())?
653 .map(|h| {
654 const LEN: usize = 32;
655 let mut hash = [0u8; LEN];
656 hash.copy_from_slice(&h.as_slice()[0..LEN]);
657 hash
658 }))
659 }
660
661 fn ledger_tx(&self, tx_id: &[u8]) -> Result<Option<SpentTransaction>> {
662 let tx = self
663 .inner
664 .get_cf(self.ledger_txs_cf, tx_id)?
665 .map(|blob| SpentTransaction::read(&mut &blob[..]))
666 .transpose()?;
667
668 Ok(tx)
669 }
670
671 fn ledger_txs(
678 &self,
679 tx_ids: Vec<&[u8; 32]>,
680 ) -> Result<Vec<SpentTransaction>> {
681 let cf = self.ledger_txs_cf;
682
683 let ids = tx_ids.into_iter().map(|id| (cf, id)).collect::<Vec<_>>();
684
685 let multi_get_results = self.inner.multi_get_cf(ids);
686
687 let mut spent_transactions =
688 Vec::with_capacity(multi_get_results.len());
689 for result in multi_get_results.into_iter() {
690 let opt_blob = result.map_err(|e| {
691 std::io::Error::new(std::io::ErrorKind::Other, e)
692 })?;
693
694 let Some(blob) = opt_blob else {
695 return Err(anyhow::anyhow!(
696 "At least one Transaction ID was not found"
697 ));
698 };
699
700 let stx = SpentTransaction::read(&mut &blob[..])?;
701
702 spent_transactions.push(stx);
703 }
704
705 Ok(spent_transactions)
706 }
707
708 fn ledger_tx_exists(&self, tx_id: &[u8]) -> Result<bool> {
714 Ok(self.inner.get_cf(self.ledger_txs_cf, tx_id)?.is_some())
715 }
716
717 fn block_by_height(&self, height: u64) -> Result<Option<Block>> {
718 let hash = self.block_hash_by_height(height)?;
719 let block = match hash {
720 Some(hash) => self.block(&hash)?,
721 None => None,
722 };
723 Ok(block)
724 }
725
726 fn block_label_by_height(
727 &self,
728 height: u64,
729 ) -> Result<Option<([u8; 32], Label)>> {
730 const HASH_LEN: usize = 32;
731 Ok(self
732 .inner
733 .get_cf(self.ledger_height_cf, height.to_le_bytes())?
734 .map(|h| {
735 let mut hash = [0u8; HASH_LEN];
736 hash.copy_from_slice(&h.as_slice()[0..HASH_LEN]);
737
738 let label_buff = h[HASH_LEN..].to_vec();
739 Label::read(&mut &label_buff[..]).map(|label| (hash, label))
740 })
741 .transpose()?)
742 }
743}
744
745impl<DB: DBAccess> ConsensusStorage for DBTransaction<'_, DB> {
747 fn store_candidate(&mut self, b: Block) -> Result<()> {
758 let mut serialized = vec![];
759 b.write(&mut serialized)?;
760
761 self.inner
762 .put_cf(self.candidates_cf, b.header().hash, serialized)?;
763
764 let key = serialize_key(b.header().height, b.header().hash)?;
765 self.inner
766 .put_cf(self.candidates_height_cf, key, b.header().hash)?;
767
768 Ok(())
769 }
770
771 fn candidate(&self, hash: &[u8]) -> Result<Option<Block>> {
782 if let Some(blob) = self.inner.get_cf(self.candidates_cf, hash)? {
783 let b = Block::read(&mut &blob[..])?;
784 return Ok(Some(b));
785 }
786
787 Ok(None)
789 }
790
791 fn candidate_by_iteration(
792 &self,
793 consensus_header: &ConsensusHeader,
794 ) -> Result<Option<Block>> {
795 let iter = self
796 .inner
797 .iterator_cf(self.candidates_cf, IteratorMode::Start);
798
799 for (_, blob) in iter.map(Result::unwrap) {
800 let b = Block::read(&mut &blob[..])?;
801
802 let header = b.header();
803 if header.prev_block_hash == consensus_header.prev_block_hash
804 && header.iteration == consensus_header.iteration
805 {
806 return Ok(Some(b));
807 }
808 }
809
810 Ok(None)
811 }
812
813 fn delete_candidate<F>(&mut self, closure: F) -> Result<()>
824 where
825 F: FnOnce(u64) -> bool + std::marker::Copy,
826 {
827 let iter = self
828 .inner
829 .iterator_cf(self.candidates_height_cf, IteratorMode::Start);
830
831 for (key, hash) in iter.map(Result::unwrap) {
832 let (height, _) = deserialize_key(&mut &key.to_vec()[..])?;
833 if closure(height) {
834 self.inner.delete_cf(self.candidates_cf, hash)?;
835 self.inner.delete_cf(self.candidates_height_cf, key)?;
836 }
837 }
838
839 Ok(())
840 }
841
842 fn count_candidates(&self) -> usize {
843 let iter = self
844 .inner
845 .iterator_cf(self.candidates_height_cf, IteratorMode::Start);
846
847 iter.count()
848 }
849
850 fn clear_candidates(&mut self) -> Result<()> {
857 self.delete_candidate(|_| true)
858 }
859
860 fn store_validation_result(
871 &mut self,
872 consensus_header: &ConsensusHeader,
873 validation_result: &payload::ValidationResult,
874 ) -> Result<()> {
875 let mut serialized = vec![];
876 validation_result.write(&mut serialized)?;
877
878 let key = serialize_iter_key(consensus_header)?;
879 self.inner
880 .put_cf(self.validation_results_cf, key, serialized)?;
881
882 Ok(())
883 }
884
885 fn validation_result(
897 &self,
898 consensus_header: &ConsensusHeader,
899 ) -> Result<Option<payload::ValidationResult>> {
900 let key = serialize_iter_key(consensus_header)?;
901 if let Some(blob) =
902 self.inner.get_cf(self.validation_results_cf, key)?
903 {
904 let validation_result =
905 payload::ValidationResult::read(&mut &blob[..])?;
906 return Ok(Some(validation_result));
907 }
908
909 Ok(None)
911 }
912
913 fn delete_validation_results<F>(&mut self, closure: F) -> Result<()>
925 where
926 F: FnOnce([u8; 32]) -> bool + std::marker::Copy,
927 {
928 let iter = self
929 .inner
930 .iterator_cf(self.validation_results_cf, IteratorMode::Start);
931
932 for (key, _) in iter.map(Result::unwrap) {
933 let (prev_block_hash, _) =
934 deserialize_iter_key(&mut &key.to_vec()[..])?;
935 if closure(prev_block_hash) {
936 self.inner.delete_cf(self.validation_results_cf, key)?;
937 }
938 }
939
940 Ok(())
941 }
942
943 fn count_validation_results(&self) -> usize {
944 let iter = self
945 .inner
946 .iterator_cf(self.validation_results_cf, IteratorMode::Start);
947
948 iter.count()
949 }
950
951 fn clear_validation_results(&mut self) -> Result<()> {
958 self.delete_validation_results(|_| true)
959 }
960}
961
962impl<DB: DBAccess> Persist for DBTransaction<'_, DB> {
963 fn clear_database(&mut self) -> Result<()> {
965 let iter = self.inner.iterator_cf(self.ledger_cf, IteratorMode::Start);
967
968 for (key, _) in iter.map(Result::unwrap) {
970 self.inner.delete_cf(self.ledger_cf, key)?;
971 }
972
973 self.clear_candidates()?;
974 self.clear_validation_results()?;
975 Ok(())
976 }
977
978 fn commit(self) -> Result<()> {
979 if let Err(e) = self.inner.commit() {
980 return Err(anyhow::Error::new(e).context("failed to commit"));
981 }
982
983 Ok(())
984 }
985
986 fn rollback(self) -> Result<()> {
987 if let Err(e) = self.inner.rollback() {
988 return Err(anyhow::Error::new(e).context("failed to rollback"));
989 }
990
991 Ok(())
992 }
993}
994
995impl<DB: DBAccess> Mempool for DBTransaction<'_, DB> {
996 fn store_mempool_tx(
997 &mut self,
998 tx: &Transaction,
999 timestamp: u64,
1000 ) -> Result<()> {
1001 let mut tx_data = vec![];
1003 tx.write(&mut tx_data)?;
1004
1005 let hash = tx.id();
1006 self.put_cf(self.mempool_cf, hash, tx_data)?;
1007
1008 for n in tx.to_spend_ids() {
1011 let key = n.to_bytes();
1012 self.put_cf(self.spending_id_cf, key, hash)?;
1013 }
1014
1015 let timestamp = timestamp.to_be_bytes();
1016
1017 self.put_cf(
1021 self.fees_cf,
1022 serialize_key(tx.gas_price(), hash)?,
1023 timestamp,
1024 )?;
1025
1026 Ok(())
1027 }
1028
1029 fn mempool_tx(&self, hash: [u8; 32]) -> Result<Option<Transaction>> {
1030 let data = self.inner.get_cf(self.mempool_cf, hash)?;
1031
1032 match data {
1033 None => Ok(None),
1035 Some(blob) => Ok(Some(Transaction::read(&mut &blob.to_vec()[..])?)),
1036 }
1037 }
1038
1039 fn mempool_tx_exists(&self, h: [u8; 32]) -> Result<bool> {
1040 Ok(self.inner.get_cf(self.mempool_cf, h)?.is_some())
1041 }
1042
1043 fn delete_mempool_tx(
1044 &mut self,
1045 h: [u8; 32],
1046 cascade: bool,
1047 ) -> Result<Vec<[u8; 32]>> {
1048 let mut deleted = vec![];
1049 let tx = self.mempool_tx(h)?;
1050 if let Some(tx) = tx {
1051 let hash = tx.id();
1052
1053 self.inner.delete_cf(self.mempool_cf, hash)?;
1054
1055 for n in tx.to_spend_ids() {
1058 let key = n.to_bytes();
1059 self.inner.delete_cf(self.spending_id_cf, key)?;
1060 }
1061
1062 self.inner.delete_cf(
1064 self.fees_cf,
1065 serialize_key(tx.gas_price(), hash)?,
1066 )?;
1067
1068 deleted.push(h);
1069
1070 if cascade {
1071 let mut dependants = vec![];
1072 let mut next_spending_id = tx.next_spending_id();
1075 while let Some(spending_id) = next_spending_id {
1076 next_spending_id = spending_id.next();
1077 let next_txs =
1078 self.mempool_txs_by_spendable_ids(&[spending_id]);
1079 if next_txs.is_empty() {
1080 break;
1081 }
1082 dependants.extend(next_txs);
1083 }
1084
1085 for tx_id in dependants {
1087 let cascade_deleted =
1088 self.delete_mempool_tx(tx_id, false)?;
1089 deleted.extend(cascade_deleted);
1090 }
1091 }
1092 }
1093
1094 Ok(deleted)
1095 }
1096
1097 fn mempool_txs_by_spendable_ids(
1098 &self,
1099 n: &[SpendingId],
1100 ) -> HashSet<[u8; 32]> {
1101 n.iter()
1102 .filter_map(|n| {
1103 match self.inner.get_cf(self.spending_id_cf, n.to_bytes()) {
1104 Ok(Some(tx_id)) => tx_id.try_into().ok(),
1105 _ => None,
1106 }
1107 })
1108 .collect()
1109 }
1110
1111 fn mempool_txs_sorted_by_fee(
1112 &self,
1113 ) -> Box<dyn Iterator<Item = Transaction> + '_> {
1114 let iter = MemPoolIterator::new(&self.inner, self.fees_cf, self);
1115
1116 Box::new(iter)
1117 }
1118
1119 fn mempool_txs_ids_sorted_by_fee(
1120 &self,
1121 ) -> Box<dyn Iterator<Item = (u64, [u8; 32])> + '_> {
1122 let iter = MemPoolFeeIterator::new(&self.inner, self.fees_cf, true);
1123
1124 Box::new(iter)
1125 }
1126
1127 fn mempool_txs_ids_sorted_by_low_fee(
1128 &self,
1129 ) -> Box<dyn Iterator<Item = (u64, [u8; 32])> + '_> {
1130 let iter = MemPoolFeeIterator::new(&self.inner, self.fees_cf, false);
1131
1132 Box::new(iter)
1133 }
1134
1135 fn mempool_expired_txs(&self, timestamp: u64) -> Result<Vec<[u8; 32]>> {
1137 let mut iter = self.inner.raw_iterator_cf(self.fees_cf);
1138 iter.seek_to_first();
1139 let mut txs_list = vec![];
1140
1141 while iter.valid() {
1142 if let Some(key) = iter.key() {
1143 let (_, tx_id) = deserialize_key(&mut &key.to_vec()[..])?;
1144
1145 let tx_timestamp = u64::from_be_bytes(
1146 iter.value()
1147 .ok_or_else(|| {
1148 io::Error::new(
1149 io::ErrorKind::InvalidData,
1150 "no value",
1151 )
1152 })?
1153 .try_into()
1154 .map_err(|_| {
1155 io::Error::new(
1156 io::ErrorKind::InvalidData,
1157 "invalid data",
1158 )
1159 })?,
1160 );
1161
1162 if tx_timestamp <= timestamp {
1163 txs_list.push(tx_id);
1164 }
1165 }
1166
1167 iter.next();
1168 }
1169
1170 Ok(txs_list)
1171 }
1172
1173 fn mempool_txs_ids(&self) -> Result<Vec<[u8; 32]>> {
1174 let mut iter = self.inner.raw_iterator_cf(self.fees_cf);
1175 iter.seek_to_last();
1176
1177 let mut txs_list = vec![];
1178
1179 while iter.valid() {
1181 if let Some(key) = iter.key() {
1182 let (_, tx_id) = deserialize_key(&mut &key.to_vec()[..])?;
1183
1184 txs_list.push(tx_id);
1185 }
1186
1187 iter.prev();
1188 }
1189
1190 Ok(txs_list)
1191 }
1192
1193 fn mempool_txs_count(&self) -> usize {
1194 self.inner
1195 .iterator_cf(self.mempool_cf, IteratorMode::Start)
1196 .count()
1197 }
1198}
1199
1200pub struct MemPoolIterator<'db, DB: DBAccess, M: Mempool> {
1201 iter: MemPoolFeeIterator<'db, DB>,
1202 mempool: &'db M,
1203}
1204
1205impl<'db, DB: DBAccess, M: Mempool> MemPoolIterator<'db, DB, M> {
1206 fn new(
1207 db: &'db rocksdb::Transaction<DB>,
1208 fees_cf: &ColumnFamily,
1209 mempool: &'db M,
1210 ) -> Self {
1211 let iter = MemPoolFeeIterator::new(db, fees_cf, true);
1212 MemPoolIterator { iter, mempool }
1213 }
1214}
1215
1216impl<DB: DBAccess, M: Mempool> Iterator for MemPoolIterator<'_, DB, M> {
1217 type Item = Transaction;
1218 fn next(&mut self) -> Option<Self::Item> {
1219 self.iter.next().and_then(|(_, tx_id)| {
1220 self.mempool.mempool_tx(tx_id).ok().flatten()
1221 })
1222 }
1223}
1224
1225pub struct MemPoolFeeIterator<'db, DB: DBAccess> {
1226 iter: DBRawIteratorWithThreadMode<'db, rocksdb::Transaction<'db, DB>>,
1227 fee_desc: bool,
1228}
1229
1230impl<'db, DB: DBAccess> MemPoolFeeIterator<'db, DB> {
1231 fn new(
1232 db: &'db rocksdb::Transaction<DB>,
1233 fees_cf: &ColumnFamily,
1234 fee_desc: bool,
1235 ) -> Self {
1236 let mut iter = db.raw_iterator_cf(fees_cf);
1237 if fee_desc {
1238 iter.seek_to_last();
1239 };
1240 MemPoolFeeIterator { iter, fee_desc }
1241 }
1242}
1243
1244impl<DB: DBAccess> Iterator for MemPoolFeeIterator<'_, DB> {
1245 type Item = (u64, [u8; 32]);
1246 fn next(&mut self) -> Option<Self::Item> {
1247 match self.iter.valid() {
1248 true => {
1249 if let Some(key) = self.iter.key() {
1250 let (gas_price, hash) =
1251 deserialize_key(&mut &key.to_vec()[..]).ok()?;
1252 if self.fee_desc {
1253 self.iter.prev();
1254 } else {
1255 self.iter.next();
1256 }
1257 Some((gas_price, hash))
1258 } else {
1259 None
1260 }
1261 }
1262 false => None,
1263 }
1264 }
1265}
1266
1267impl<DB: DBAccess> std::fmt::Debug for DBTransaction<'_, DB> {
1268 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1269 let iter = self.inner.iterator_cf(self.ledger_cf, IteratorMode::Start);
1271
1272 iter.map(Result::unwrap).try_for_each(|(hash, _)| {
1273 if let Ok(Some(blob)) = self.inner.get_cf(self.ledger_cf, &hash[..])
1274 {
1275 let b = Block::read(&mut &blob[..]).unwrap_or_default();
1276 writeln!(f, "ledger_block [{}]: {:#?}", b.header().height, b)
1277 } else {
1278 Ok(())
1279 }
1280 })?;
1281
1282 let iter = self
1284 .inner
1285 .iterator_cf(self.candidates_cf, IteratorMode::Start);
1286
1287 let results: std::fmt::Result =
1288 iter.map(Result::unwrap).try_for_each(|(hash, _)| {
1289 if let Ok(Some(blob)) =
1290 self.inner.get_cf(self.candidates_cf, &hash[..])
1291 {
1292 let b = Block::read(&mut &blob[..]).unwrap_or_default();
1293 writeln!(
1294 f,
1295 "candidate_block [{}]: {:#?}",
1296 b.header().height,
1297 b
1298 )
1299 } else {
1300 Ok(())
1301 }
1302 });
1303
1304 results
1305 }
1306}
1307
1308impl<DB: DBAccess> Metadata for DBTransaction<'_, DB> {
1309 fn op_write<T: AsRef<[u8]>>(&mut self, key: &[u8], value: T) -> Result<()> {
1310 self.put_cf(self.metadata_cf, key, value)?;
1311 Ok(())
1312 }
1313
1314 fn op_read(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1315 self.inner.get_cf(self.metadata_cf, key).map_err(Into::into)
1316 }
1317}
1318
1319impl<DB: DBAccess> DBTransaction<'_, DB> {
1320 fn put_cf<K: AsRef<[u8]>, V: AsRef<[u8]>>(
1323 &self,
1324 cf: &impl AsColumnFamilyRef,
1325 key: K,
1326 value: V,
1327 ) -> Result<()> {
1328 let kv_size = key.as_ref().len() + value.as_ref().len();
1329 self.inner.put_cf(cf, key, value)?;
1330 *self.cumulative_inner_size.borrow_mut() += kv_size;
1331 Ok(())
1332 }
1333
1334 pub fn get_size(&self) -> usize {
1335 *self.cumulative_inner_size.borrow()
1336 }
1337}
1338
/// Builds a 40-byte composite key: the big-endian bytes of `value` followed
/// by `hash`.
fn serialize_key(value: u64, hash: [u8; 32]) -> std::io::Result<Vec<u8>> {
    let prefix = value.to_be_bytes();

    let mut key = Vec::with_capacity(prefix.len() + hash.len());
    key.extend_from_slice(&prefix);
    key.extend_from_slice(&hash);

    Ok(key)
}
1345
1346fn deserialize_key<R: Read>(r: &mut R) -> Result<(u64, [u8; 32])> {
1347 let mut buf = [0u8; 8];
1348 r.read_exact(&mut buf)?;
1349 let value = u64::from_be_bytes(buf);
1350 let mut hash = [0u8; 32];
1351 r.read_exact(&mut hash[..])?;
1352
1353 Ok((value, hash))
1354}
1355
1356fn serialize_iter_key(ch: &ConsensusHeader) -> std::io::Result<Vec<u8>> {
1357 let mut w = vec![];
1358 std::io::Write::write_all(&mut w, &ch.prev_block_hash)?;
1359 std::io::Write::write_all(&mut w, &[ch.iteration])?;
1360 Ok(w)
1361}
1362
1363fn deserialize_iter_key<R: Read>(r: &mut R) -> Result<([u8; 32], u8)> {
1364 let mut prev_block_hash = [0u8; 32];
1365 r.read_exact(&mut prev_block_hash)?;
1366
1367 let mut iter_byte = [0u8; 1];
1368 r.read_exact(&mut iter_byte)?;
1369 let iteration = u8::from_be_bytes(iter_byte);
1370
1371 Ok((prev_block_hash, iteration))
1372}
1373
1374impl node_data::Serializable for LightBlock {
1375 fn write<W: Write>(&self, w: &mut W) -> io::Result<()> {
1376 self.header.write(w)?;
1378
1379 let len = self.transactions_ids.len() as u32;
1381 w.write_all(&len.to_le_bytes())?;
1382
1383 for tx_id in &self.transactions_ids {
1385 w.write_all(tx_id)?;
1386 }
1387
1388 let len = self.faults_ids.len() as u32;
1390 w.write_all(&len.to_le_bytes())?;
1391
1392 for f_id in &self.faults_ids {
1394 w.write_all(f_id)?;
1395 }
1396
1397 Ok(())
1398 }
1399
1400 fn read<R: Read>(r: &mut R) -> io::Result<Self>
1401 where
1402 Self: Sized,
1403 {
1404 let header = Header::read(r)?;
1406
1407 let len = Self::read_u32_le(r)?;
1409
1410 let mut transactions_ids = vec![];
1412 for _ in 0..len {
1413 let mut tx_id = [0u8; 32];
1414 r.read_exact(&mut tx_id[..])?;
1415
1416 transactions_ids.push(tx_id);
1417 }
1418
1419 let len = Self::read_u32_le(r)?;
1421
1422 let mut faults_ids = vec![];
1424 for _ in 0..len {
1425 let mut f_id = [0u8; 32];
1426 r.read_exact(&mut f_id[..])?;
1427
1428 faults_ids.push(f_id);
1429 }
1430
1431 Ok(Self {
1432 header,
1433 transactions_ids,
1434 faults_ids,
1435 })
1436 }
1437}
1438
1439#[cfg(test)]
1440mod tests {
1441 use fake::{Fake, Faker};
1442 use node_data::ledger;
1443
1444 use super::*;
1445
1446 #[test]
1447 fn test_store_block() {
1448 TestWrapper::new("test_store_block").run(|path| {
1449 let db = Backend::create_or_open(path, DatabaseOptions::default());
1450
1451 let b: Block = Faker.fake();
1452 assert!(!b.txs().is_empty());
1453
1454 let hash = b.header().hash;
1455
1456 assert!(db
1457 .update(|txn| {
1458 txn.store_block(
1459 b.header(),
1460 &to_spent_txs(b.txs()),
1461 b.faults(),
1462 Label::Final(3),
1463 )?;
1464 Ok(())
1465 })
1466 .is_ok());
1467
1468 db.view(|txn| {
1469 let db_blk = txn
1471 .block(&hash)
1472 .expect("Block to be fetched")
1473 .expect("Block to exist");
1474 assert_eq!(db_blk.header().hash, b.header().hash);
1475
1476 for pos in 0..b.txs().len() {
1479 assert_eq!(db_blk.txs()[pos].id(), b.txs()[pos].id());
1480 }
1481
1482 for pos in 0..b.faults().len() {
1485 assert_eq!(db_blk.faults()[pos].id(), b.faults()[pos].id());
1486 }
1487 });
1488
1489 assert!(db
1490 .update(|txn| {
1491 txn.clear_database()?;
1492 Ok(())
1493 })
1494 .is_ok());
1495
1496 db.view(|txn| {
1497 assert!(txn
1498 .block(&hash)
1499 .expect("block to be fetched")
1500 .is_none());
1501 });
1502 });
1503 }
1504
1505 #[test]
1506 fn test_read_only() {
1507 TestWrapper::new("test_read_only").run(|path| {
1508 let db = Backend::create_or_open(path, DatabaseOptions::default());
1509 let b: Block = Faker.fake();
1510 db.update_dry_run(true, |txn| {
1511 txn.store_block(
1512 b.header(),
1513 &to_spent_txs(b.txs()),
1514 b.faults(),
1515 Label::Final(3),
1516 )
1517 })
1518 .expect("block to be stored");
1519 db.view(|txn| {
1520 assert!(txn
1521 .block(&b.header().hash)
1522 .expect("block to be fetched")
1523 .is_none());
1524 });
1525 });
1526 }
1527
1528 #[test]
1529 fn test_transaction_isolation() {
1530 TestWrapper::new("test_transaction_isolation").run(|path| {
1531 let db = Backend::create_or_open(path, DatabaseOptions::default());
1532 let mut b: Block = Faker.fake();
1533 let hash = b.header().hash;
1534
1535 db.view(|txn| {
1536 assert!(db
1539 .update(|inner| {
1540 inner
1541 .store_block(
1542 b.header(),
1543 &to_spent_txs(b.txs()),
1544 b.faults(),
1545 Label::Final(3),
1546 )
1547 .unwrap();
1548
1549 assert!(inner.block(&hash)?.is_some());
1551 assert!(txn.block(&hash)?.is_none());
1554 Ok(())
1555 })
1556 .is_ok());
1557
1558 assert!(txn
1561 .block(&hash)
1562 .expect("block to be fetched")
1563 .is_some());
1564 });
1565
1566 db.view(|txn| {
1568 assert_blocks_eq(
1569 &mut txn
1570 .block(&hash)
1571 .expect("block to be fetched")
1572 .unwrap(),
1573 &mut b,
1574 );
1575 });
1576 });
1577 }
1578
1579 fn assert_blocks_eq(a: &Block, b: &Block) {
1580 assert!(a.header().hash != [0u8; 32]);
1581 assert!(a.header().hash.eq(&b.header().hash));
1582 }
1583
1584 #[test]
1585 fn test_add_mempool_tx() {
1586 TestWrapper::new("test_add_tx").run(|path| {
1587 let db = Backend::create_or_open(path, DatabaseOptions::default());
1588 let t: Transaction = Faker.fake();
1589
1590 assert!(db.update(|txn| { txn.store_mempool_tx(&t, 0) }).is_ok());
1591
1592 db.view(|vq| {
1593 assert!(vq.mempool_tx_exists(t.id()).unwrap());
1594
1595 let fetched_tx = vq
1596 .mempool_tx(t.id())
1597 .expect("valid contract call")
1598 .unwrap();
1599
1600 assert_eq!(
1601 fetched_tx.id(),
1602 t.id(),
1603 "fetched transaction should be the same"
1604 );
1605 });
1606
1607 db.update(|txn| {
1609 let deleted =
1610 txn.delete_mempool_tx(t.id(), false).expect("valid tx");
1611 assert!(deleted.len() == 1);
1612 Ok(())
1613 })
1614 .unwrap();
1615 });
1616 }
1617
1618 #[test]
1619 fn test_mempool_txs_sorted_by_fee() {
1620 TestWrapper::new("test_mempool_txs_sorted_by_fee").run(|path| {
1621 let db = Backend::create_or_open(path, DatabaseOptions::default());
1622 let _rng = rand::thread_rng();
1624 db.update(|txn| {
1625 for _i in 0..10u32 {
1626 let t: Transaction = Faker.fake();
1627 txn.store_mempool_tx(&t, 0)?;
1628 }
1629 Ok(())
1630 })
1631 .unwrap();
1632
1633 db.view(|txn| {
1634 let txs = txn.mempool_txs_sorted_by_fee();
1635
1636 let mut last_fee = u64::MAX;
1637 for t in txs {
1638 let fee = t.gas_price();
1639 assert!(
1640 fee <= last_fee,
1641 "tx fees are not in decreasing order"
1642 );
1643 last_fee = fee
1644 }
1645 assert_ne!(last_fee, u64::MAX, "No tx has been processed")
1646 });
1647 });
1648 }
1649
1650 #[test]
1651 fn test_txs_count() {
1652 TestWrapper::new("test_txs_count").run(|path| {
1653 let db = Backend::create_or_open(path, DatabaseOptions::default());
1654
1655 const N: usize = 100;
1656 const D: usize = 50;
1657
1658 let txs: Vec<_> = (0..N)
1659 .map(|i| ledger::faker::gen_dummy_tx(i as u64))
1660 .collect();
1661
1662 db.update(|db| {
1663 assert_eq!(db.mempool_txs_count(), 0);
1664 txs.iter().for_each(|t| {
1665 db.store_mempool_tx(&t, 0).expect("tx should be added")
1666 });
1667 Ok(())
1668 })
1669 .unwrap();
1670
1671 db.update(|db| {
1672 assert_eq!(db.mempool_txs_count(), N);
1674
1675 txs.iter().take(D).for_each(|tx| {
1676 let deleted = db
1677 .delete_mempool_tx(tx.id(), false)
1678 .expect("transaction should be deleted");
1679 assert!(deleted.len() == 1);
1680 });
1681
1682 Ok(())
1683 })
1684 .unwrap();
1685
1686 db.update(|db| {
1688 assert_eq!(db.mempool_txs_count(), N - D);
1689 Ok(())
1690 })
1691 .unwrap();
1692 });
1693 }
1694
1695 #[test]
1696 fn test_max_gas_limit() {
1697 TestWrapper::new("test_block_size_limit").run(|path| {
1698 let db = Backend::create_or_open(path, DatabaseOptions::default());
1699
1700 db.update(|txn| {
1701 for i in 0..10u32 {
1702 let t = ledger::faker::gen_dummy_tx(i as u64);
1703 txn.store_mempool_tx(&t, 0)?;
1704 }
1705 Ok(())
1706 })
1707 .unwrap();
1708
1709 let total_gas_price: u64 = 9 + 8 + 7 + 6 + 5 + 4 + 3 + 2 + 1;
1710 db.view(|txn| {
1711 let txs = txn
1712 .mempool_txs_sorted_by_fee()
1713 .map(|t| t.gas_price())
1714 .sum::<u64>();
1715
1716 assert_eq!(txs, total_gas_price);
1717 });
1718 });
1719 }
1720
1721 #[test]
1722 fn test_get_expired_txs() {
1723 TestWrapper::new("test_get_expired_txs").run(|path| {
1724 let db = Backend::create_or_open(path, DatabaseOptions::default());
1725
1726 let mut expiry_list = HashSet::new();
1727 let _ = db.update(|txn| {
1728 (1..101).for_each(|i| {
1729 let t = ledger::faker::gen_dummy_tx(i as u64);
1730 txn.store_mempool_tx(&t, i).expect("tx should be added");
1731 expiry_list.insert(t.id());
1732 });
1733
1734 (1000..1100).for_each(|i| {
1735 let t = ledger::faker::gen_dummy_tx(i as u64);
1736 txn.store_mempool_tx(&t, i).expect("tx should be added");
1737 });
1738
1739 Ok(())
1740 });
1741
1742 db.view(|vq| {
1743 let expired: HashSet<_> = vq
1744 .mempool_expired_txs(100)
1745 .unwrap()
1746 .into_iter()
1747 .map(|id| id)
1748 .collect();
1749
1750 assert_eq!(expiry_list, expired);
1751 });
1752 });
1753 }
1754
1755 fn to_spent_txs(txs: &Vec<Transaction>) -> Vec<SpentTransaction> {
1756 txs.iter()
1757 .map(|t| SpentTransaction {
1758 inner: t.clone(),
1759 block_height: 0,
1760 gas_spent: 0,
1761 err: None,
1762 })
1763 .collect()
1764 }
1765
1766 #[test]
1767 fn test_get_ledger_tx_by_hash() {
1768 TestWrapper::new("test_get_ledger_tx_by_hash").run(|path| {
1769 let db = Backend::create_or_open(path, DatabaseOptions::default());
1770 let b: Block = Faker.fake();
1771 assert!(!b.txs().is_empty());
1772
1773 assert!(db
1775 .update(|txn| {
1776 txn.store_block(
1777 b.header(),
1778 &to_spent_txs(b.txs()),
1779 b.faults(),
1780 Label::Final(3),
1781 )?;
1782 Ok(())
1783 })
1784 .is_ok());
1785
1786 db.view(|v| {
1789 for t in b.txs().iter() {
1790 assert!(v
1791 .ledger_tx(&t.id())
1792 .expect("should not return error")
1793 .expect("should find a transaction")
1794 .inner
1795 .eq(t));
1796 }
1797 });
1798 });
1799 }
1800
1801 #[test]
1802 fn test_fetch_block_hash_by_height() {
1803 TestWrapper::new("test_fetch_block_hash_by_height").run(|path| {
1804 let db = Backend::create_or_open(path, DatabaseOptions::default());
1805 let b: Block = Faker.fake();
1806
1807 assert!(db
1809 .update(|txn| {
1810 txn.store_block(
1811 b.header(),
1812 &to_spent_txs(b.txs()),
1813 b.faults(),
1814 Label::Attested(3),
1815 )?;
1816 Ok(())
1817 })
1818 .is_ok());
1819
1820 db.view(|v| {
1822 assert!(v
1823 .block_hash_by_height(b.header().height)
1824 .expect("should not return error")
1825 .expect("should find a block")
1826 .eq(&b.header().hash));
1827 });
1828 });
1829 }
1830
1831 #[test]
1832 fn test_fetch_block_label_by_height() {
1833 TestWrapper::new("test_fetch_block_hash_by_height").run(|path| {
1834 let db = Backend::create_or_open(path, DatabaseOptions::default());
1835 let b: Block = Faker.fake();
1836
1837 assert!(db
1839 .update(|txn| {
1840 txn.store_block(
1841 b.header(),
1842 &to_spent_txs(b.txs()),
1843 b.faults(),
1844 Label::Attested(3),
1845 )?;
1846 Ok(())
1847 })
1848 .is_ok());
1849
1850 db.view(|v| {
1852 assert!(v
1853 .block_label_by_height(b.header().height)
1854 .expect("should not return error")
1855 .expect("should find a block")
1856 .1
1857 .eq(&Label::Attested(3)));
1858 });
1859 });
1860 }
1861
1862 #[test]
1863 fn test_delete_block() {
1865 let t = TestWrapper::new("test_fetch_block_hash_by_height");
1866 t.run(|path| {
1867 let db = Backend::create_or_open(path, DatabaseOptions::default());
1868 let b: ledger::Block = Faker.fake();
1869
1870 assert!(db
1871 .update(|ut| {
1872 ut.store_block(
1873 b.header(),
1874 &to_spent_txs(b.txs()),
1875 b.faults(),
1876 Label::Final(3),
1877 )?;
1878 Ok(())
1879 })
1880 .is_ok());
1881
1882 assert!(db
1883 .update(|ut| {
1884 ut.delete_block(&b)?;
1885 Ok(())
1886 })
1887 .is_ok());
1888 });
1889
1890 let path = t.get_path();
1891 let opts = Options::default();
1892
1893 let vec = rocksdb::DB::list_cf(&opts, &path).unwrap();
1894 assert!(!vec.is_empty());
1895
1896 let db = rocksdb::DB::open_cf(&opts, &path, vec.clone()).unwrap();
1898 vec.into_iter()
1899 .map(|cf_name| {
1900 if cf_name == CF_METADATA {
1901 return;
1902 }
1903
1904 let cf = db.cf_handle(&cf_name).unwrap();
1905 assert_eq!(
1906 db.iterator_cf(cf, IteratorMode::Start)
1907 .map(Result::unwrap)
1908 .count(),
1909 0
1910 );
1911 })
1912 .for_each(drop);
1913 }
1914
1915 struct TestWrapper(tempfile::TempDir);
1916
1917 impl TestWrapper {
1918 fn new(path: &'static str) -> Self {
1919 Self(
1920 tempfile::TempDir::with_prefix(path)
1921 .expect("Temp directory to be created"),
1922 )
1923 }
1924
1925 pub fn run<F>(&self, test_func: F)
1926 where
1927 F: FnOnce(&Path),
1928 {
1929 test_func(self.0.path());
1930 }
1931
1932 pub fn get_path(&self) -> std::path::PathBuf {
1933 self.0.path().to_owned().join(DB_FOLDER_NAME)
1934 }
1935 }
1936}