1use std::cell::RefCell;
8use std::collections::HashSet;
9use std::io::{Read, Write};
10use std::path::Path;
11use std::sync::Arc;
12use std::{io, vec};
13
14use anyhow::Result;
15use dusk_core::transfer::data::BlobSidecar;
16use node_data::ledger::{
17 Block, Fault, Header, Label, SpendingId, SpentTransaction, Transaction,
18};
19use node_data::message::{payload, ConsensusHeader};
20use node_data::Serializable;
21use rocksdb::{
22 AsColumnFamilyRef, BlockBasedOptions, ColumnFamily, ColumnFamilyDescriptor,
23 DBAccess, DBRawIteratorWithThreadMode, IteratorMode, LogLevel,
24 OptimisticTransactionDB, OptimisticTransactionOptions, Options,
25 WriteOptions,
26};
27use tracing::info;
28
29use super::{
30 ConsensusStorage, DatabaseOptions, Ledger, LightBlock, Metadata, Persist,
31 DB,
32};
33use crate::database::Mempool;
34
35const CF_LEDGER_HEADER: &str = "cf_ledger_header";
36const CF_LEDGER_TXS: &str = "cf_ledger_txs";
37const CF_LEDGER_BLOBS: &str = "cf_ledger_blobs";
38const CF_LEDGER_BLOBS_HEIGHT: &str = "cf_ledger_blobs_height";
39const CF_LEDGER_FAULTS: &str = "cf_ledger_faults";
40const CF_LEDGER_HEIGHT: &str = "cf_ledger_height";
41const CF_CANDIDATES: &str = "cf_candidates";
42const CF_CANDIDATES_HEIGHT: &str = "cf_candidates_height";
43const CF_VALIDATION_RESULTS: &str = "cf_validation_results";
44const CF_MEMPOOL: &str = "cf_mempool";
45const CF_MEMPOOL_SPENDING_ID: &str = "cf_mempool_spending_id";
46const CF_MEMPOOL_FEES: &str = "cf_mempool_fees";
47const CF_METADATA: &str = "cf_metadata";
48
49const DB_FOLDER_NAME: &str = "chain.db";
50
51pub const MD_HASH_KEY: &[u8] = b"hash_key";
53pub const MD_STATE_ROOT_KEY: &[u8] = b"state_hash_key";
54pub const MD_AVG_VALIDATION: &[u8] = b"avg_validation_time";
55pub const MD_AVG_RATIFICATION: &[u8] = b"avg_ratification_time";
56pub const MD_AVG_PROPOSAL: &[u8] = b"avg_proposal_time";
57pub const MD_LAST_ITER: &[u8] = b"consensus_last_iter";
58
59#[derive(Clone)]
60pub struct Backend {
61 rocksdb: Arc<OptimisticTransactionDB>,
62}
63
64impl Backend {
65 fn begin_tx(&self) -> DBTransaction<'_, OptimisticTransactionDB> {
66 let write_options = WriteOptions::default();
68 let tx_options = OptimisticTransactionOptions::default();
69
70 let inner = self.rocksdb.transaction_opt(&write_options, &tx_options);
71
72 let ledger_cf = self
74 .rocksdb
75 .cf_handle(CF_LEDGER_HEADER)
76 .expect("ledger_header column family must exist");
77
78 let ledger_txs_cf = self
79 .rocksdb
80 .cf_handle(CF_LEDGER_TXS)
81 .expect("CF_LEDGER_TXS column family must exist");
82
83 let ledger_faults_cf = self
84 .rocksdb
85 .cf_handle(CF_LEDGER_FAULTS)
86 .expect("CF_LEDGER_FAULTS column family must exist");
87
88 let candidates_cf = self
89 .rocksdb
90 .cf_handle(CF_CANDIDATES)
91 .expect("candidates column family must exist");
92
93 let candidates_height_cf = self
94 .rocksdb
95 .cf_handle(CF_CANDIDATES_HEIGHT)
96 .expect("candidates_height column family must exist");
97
98 let validation_results_cf = self
99 .rocksdb
100 .cf_handle(CF_VALIDATION_RESULTS)
101 .expect("validation result column family must exist");
102
103 let mempool_cf = self
104 .rocksdb
105 .cf_handle(CF_MEMPOOL)
106 .expect("mempool column family must exist");
107
108 let spending_id_cf = self
109 .rocksdb
110 .cf_handle(CF_MEMPOOL_SPENDING_ID)
111 .expect("CF_MEMPOOL_SPENDING_ID column family must exist");
112
113 let fees_cf = self
114 .rocksdb
115 .cf_handle(CF_MEMPOOL_FEES)
116 .expect("CF_MEMPOOL_FEES column family must exist");
117
118 let ledger_height_cf = self
119 .rocksdb
120 .cf_handle(CF_LEDGER_HEIGHT)
121 .expect("CF_LEDGER_HEIGHT column family must exist");
122
123 let metadata_cf = self
124 .rocksdb
125 .cf_handle(CF_METADATA)
126 .expect("CF_METADATA column family must exist");
127
128 let ledger_blobs_cf = self
129 .rocksdb
130 .cf_handle(CF_LEDGER_BLOBS)
131 .expect("CF_LEDGER_BLOBS column family must exist");
132
133 let ledger_blobs_height_cf = self
134 .rocksdb
135 .cf_handle(CF_LEDGER_BLOBS_HEIGHT)
136 .expect("CF_LEDGER_BLOBS_HEIGHT column family must exist");
137
138 DBTransaction::<'_, OptimisticTransactionDB> {
139 inner,
140 candidates_cf,
141 candidates_height_cf,
142 validation_results_cf,
143 ledger_cf,
144 ledger_txs_cf,
145 ledger_faults_cf,
146 mempool_cf,
147 spending_id_cf,
148 fees_cf,
149 ledger_height_cf,
150 ledger_blobs_cf,
151 ledger_blobs_height_cf,
152 metadata_cf,
153 cumulative_inner_size: RefCell::new(0),
154 }
155 }
156}
157
158impl DB for Backend {
159 type P<'a> = DBTransaction<'a, OptimisticTransactionDB>;
160
161 fn create_or_open<T>(path: T, db_opts: DatabaseOptions) -> Self
162 where
163 T: AsRef<Path>,
164 {
165 let path = path.as_ref().join(DB_FOLDER_NAME);
166 info!("Opening database in {path:?}, {:?} ", db_opts);
167
168 let mut blocks_cf_opts = Options::default();
171 blocks_cf_opts.create_if_missing(db_opts.create_if_missing);
172 blocks_cf_opts.create_missing_column_families(true);
173 blocks_cf_opts.set_level_compaction_dynamic_level_bytes(true);
174 blocks_cf_opts
175 .set_write_buffer_size(db_opts.blocks_cf_max_write_buffer_size);
176
177 if db_opts.enable_debug {
178 blocks_cf_opts.set_log_level(LogLevel::Info);
179 blocks_cf_opts.set_dump_malloc_stats(true);
180 blocks_cf_opts.enable_statistics();
181 }
182
183 if db_opts.blocks_cf_disable_block_cache {
184 let mut block_opts = BlockBasedOptions::default();
185 block_opts.disable_cache();
186 blocks_cf_opts.set_block_based_table_factory(&block_opts);
187 }
188
189 let mut mp_opts = blocks_cf_opts.clone();
192 mp_opts.set_manual_wal_flush(true);
194 mp_opts.create_if_missing(true);
195 mp_opts.create_missing_column_families(true);
196 mp_opts.set_write_buffer_size(db_opts.mempool_cf_max_write_buffer_size);
197
198 if db_opts.enable_debug {
199 mp_opts.set_log_level(LogLevel::Info);
200 mp_opts.set_dump_malloc_stats(true);
201 mp_opts.enable_statistics();
202 }
203
204 let cfs = vec![
205 ColumnFamilyDescriptor::new(
206 CF_LEDGER_HEADER,
207 blocks_cf_opts.clone(),
208 ),
209 ColumnFamilyDescriptor::new(CF_LEDGER_TXS, blocks_cf_opts.clone()),
210 ColumnFamilyDescriptor::new(
211 CF_LEDGER_FAULTS,
212 blocks_cf_opts.clone(),
213 ),
214 ColumnFamilyDescriptor::new(
215 CF_LEDGER_HEIGHT,
216 blocks_cf_opts.clone(),
217 ),
218 ColumnFamilyDescriptor::new(
219 CF_LEDGER_BLOBS,
220 blocks_cf_opts.clone(),
221 ),
222 ColumnFamilyDescriptor::new(
223 CF_LEDGER_BLOBS_HEIGHT,
224 blocks_cf_opts.clone(),
225 ),
226 ColumnFamilyDescriptor::new(CF_CANDIDATES, blocks_cf_opts.clone()),
227 ColumnFamilyDescriptor::new(
228 CF_CANDIDATES_HEIGHT,
229 blocks_cf_opts.clone(),
230 ),
231 ColumnFamilyDescriptor::new(
232 CF_VALIDATION_RESULTS,
233 blocks_cf_opts.clone(),
234 ),
235 ColumnFamilyDescriptor::new(CF_METADATA, blocks_cf_opts.clone()),
236 ColumnFamilyDescriptor::new(CF_MEMPOOL, mp_opts.clone()),
237 ColumnFamilyDescriptor::new(
238 CF_MEMPOOL_SPENDING_ID,
239 mp_opts.clone(),
240 ),
241 ColumnFamilyDescriptor::new(CF_MEMPOOL_FEES, mp_opts.clone()),
242 ];
243
244 Self {
245 rocksdb: Arc::new(
246 OptimisticTransactionDB::open_cf_descriptors(
247 &blocks_cf_opts,
248 &path,
249 cfs,
250 )
251 .unwrap_or_else(|_| {
252 panic!("should be a valid database in {path:?}")
253 }),
254 ),
255 }
256 }
257
258 fn view<F, T>(&self, f: F) -> T
259 where
260 F: for<'a> FnOnce(&Self::P<'a>) -> T,
261 {
262 let tx = self.begin_tx();
264
265 let ret = f(&tx);
267 tx.rollback().expect("rollback to succeed for readonly");
268 ret
269 }
270
271 fn update<F, T>(&self, execute: F) -> Result<T>
272 where
273 F: for<'a> FnOnce(&mut Self::P<'a>) -> Result<T>,
274 {
275 self.update_dry_run(false, execute)
276 }
277
278 fn update_dry_run<F, T>(&self, dry_run: bool, execute: F) -> Result<T>
279 where
280 F: for<'a> FnOnce(&mut Self::P<'a>) -> Result<T>,
281 {
282 let mut tx = self.begin_tx();
284
285 let ret = execute(&mut tx)?;
288
289 if dry_run {
290 tx.rollback()?;
291 } else {
292 tx.commit()?;
294 }
295
296 Ok(ret)
297 }
298
299 fn close(&mut self) {}
300}
301
302pub struct DBTransaction<'db, DB: DBAccess> {
303 inner: rocksdb::Transaction<'db, DB>,
304 cumulative_inner_size: RefCell<usize>,
306
307 candidates_cf: &'db ColumnFamily,
310 candidates_height_cf: &'db ColumnFamily,
311 validation_results_cf: &'db ColumnFamily,
313
314 ledger_cf: &'db ColumnFamily,
316 ledger_faults_cf: &'db ColumnFamily,
317 ledger_txs_cf: &'db ColumnFamily,
318 ledger_height_cf: &'db ColumnFamily,
319 ledger_blobs_cf: &'db ColumnFamily,
320 ledger_blobs_height_cf: &'db ColumnFamily,
321
322 mempool_cf: &'db ColumnFamily,
324 spending_id_cf: &'db ColumnFamily,
325 fees_cf: &'db ColumnFamily,
326
327 metadata_cf: &'db ColumnFamily,
328}
329
330impl<DB: DBAccess> Ledger for DBTransaction<'_, DB> {
331 fn store_block(
332 &mut self,
333 header: &Header,
334 txs: &[SpentTransaction],
335 faults: &[Fault],
336 label: Label,
337 ) -> Result<usize> {
338 {
342 let cf = self.ledger_cf;
343
344 let mut buf = vec![];
345 LightBlock {
346 header: header.clone(),
347 transactions_ids: txs.iter().map(|t| t.inner.id()).collect(),
348 faults_ids: faults.iter().map(|f| f.id()).collect(),
349 }
350 .write(&mut buf)?;
351
352 self.put_cf(cf, header.hash, buf)?;
353 }
354
355 self.op_write(MD_HASH_KEY, header.hash)?;
357 self.op_write(MD_STATE_ROOT_KEY, header.state_hash)?;
358
359 {
361 let cf = self.ledger_txs_cf;
362
363 let mut stored_blobs = Vec::with_capacity(6);
364
365 for tx in txs {
367 let mut d = vec![];
368
369 if tx.inner.inner.blob().is_some() {
370 let mut strip_tx = tx.clone();
371 if let Some(blobs) = strip_tx.inner.inner.strip_blobs() {
372 for (hash, sidecar) in blobs.into_iter() {
373 let sidecar_bytes = sidecar.to_var_bytes();
374 self.store_blob_data(&hash, sidecar_bytes)?;
375 stored_blobs.push(hash);
376 }
377 }
378 strip_tx.write(&mut d)?;
379 } else {
380 tx.write(&mut d)?;
381 }
382 self.put_cf(cf, tx.inner.id(), d)?;
383 }
384
385 if !stored_blobs.is_empty() {
386 self.store_blobs_height(header.height, &stored_blobs)?;
388 }
389 }
390
391 {
393 let cf = self.ledger_faults_cf;
394
395 for f in faults {
397 let mut d = vec![];
398 f.write(&mut d)?;
399 self.put_cf(cf, f.id(), d)?;
400 }
401 }
402 self.store_block_label(header.height, &header.hash, label)?;
403
404 Ok(self.get_size())
405 }
406
407 fn faults_by_block(&self, start_height: u64) -> Result<Vec<Fault>> {
408 let mut faults = vec![];
409 let mut hash = self
410 .op_read(MD_HASH_KEY)?
411 .ok_or(anyhow::anyhow!("Cannot read tip"))?;
412
413 loop {
414 let block = self.light_block(&hash)?.ok_or(anyhow::anyhow!(
415 "Cannot read block {}",
416 hex::encode(&hash)
417 ))?;
418
419 let block_height = block.header.height;
420
421 if block_height >= start_height {
422 hash = block.header.prev_block_hash.to_vec();
423 faults.extend(self.faults(&block.faults_ids)?);
424 } else {
425 break;
426 }
427
428 if block_height == 0 {
429 break;
430 }
431 }
432 Ok(faults)
433 }
434
435 fn store_block_label(
436 &mut self,
437 height: u64,
438 hash: &[u8; 32],
439 label: Label,
440 ) -> Result<()> {
441 let mut buf = vec![];
443 buf.write_all(hash)?;
444 label.write(&mut buf)?;
445
446 self.put_cf(self.ledger_height_cf, height.to_le_bytes(), buf)?;
447 Ok(())
448 }
449
450 fn delete_block(&mut self, b: &Block) -> Result<()> {
451 self.inner.delete_cf(
452 self.ledger_height_cf,
453 b.header().height.to_le_bytes(),
454 )?;
455
456 for tx in b.txs() {
457 self.inner.delete_cf(self.ledger_txs_cf, tx.id())?;
458 }
459 for f in b.faults() {
460 self.inner.delete_cf(self.ledger_faults_cf, f.id())?;
461 }
462
463 self.delete_blobs_by_height(b.header().height)?;
464 self.inner.delete_cf(self.ledger_cf, b.header().hash)?;
465
466 Ok(())
467 }
468
469 fn block_exists(&self, hash: &[u8]) -> Result<bool> {
470 Ok(self.inner.get_cf(self.ledger_cf, hash)?.is_some())
471 }
472
473 fn faults(&self, faults_ids: &[[u8; 32]]) -> Result<Vec<Fault>> {
474 if faults_ids.is_empty() {
475 return Ok(vec![]);
476 }
477 let ids = faults_ids
478 .iter()
479 .map(|id| (self.ledger_faults_cf, id))
480 .collect::<Vec<_>>();
481
482 let faults_buffer = self.inner.multi_get_cf(ids);
484
485 let mut faults = vec![];
486 for buf in faults_buffer {
487 let buf = buf?.unwrap();
488 let fault = Fault::read(&mut &buf[..])?;
489 faults.push(fault);
490 }
491
492 Ok(faults)
493 }
494
495 fn blob_data_by_hash(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
496 Ok(self.inner.get_cf(self.ledger_blobs_cf, hash)?)
497 }
498
499 fn store_blob_data(&self, hash: &[u8; 32], data: Vec<u8>) -> Result<()> {
500 self.inner.put_cf(self.ledger_blobs_cf, hash, data)?;
501 Ok(())
502 }
503 fn store_blobs_height(
504 &self,
505 block_height: u64,
506 blob_hashes: &[[u8; 32]],
507 ) -> Result<()> {
508 if blob_hashes.is_empty() {
509 return Ok(());
510 }
511 let blob_hashes_bytes: Vec<_> =
512 blob_hashes.iter().flat_map(|hash| hash.to_vec()).collect();
513 self.inner.put_cf(
514 self.ledger_blobs_height_cf,
515 block_height.to_be_bytes(),
516 blob_hashes_bytes,
517 )?;
518 Ok(())
519 }
520
521 fn delete_blobs_by_height(&self, block_height: u64) -> Result<()> {
522 let blobs_to_delete = self.blobs_by_height(block_height)?;
523 if let Some(blob_hashes) = blobs_to_delete {
524 for hash in blob_hashes {
525 self.inner.delete_cf(self.ledger_blobs_cf, hash)?;
528 }
529 self.inner.delete_cf(
530 self.ledger_blobs_height_cf,
531 block_height.to_be_bytes(),
532 )?;
533 }
534
535 Ok(())
536 }
537
538 fn blobs_by_height(
539 &self,
540 block_height: u64,
541 ) -> Result<Option<Vec<[u8; 32]>>> {
542 let blob_hashes_bytes = self
543 .inner
544 .get_cf(self.ledger_blobs_height_cf, block_height.to_be_bytes())?;
545
546 if let Some(blob_hashes_bytes) = blob_hashes_bytes {
547 let mut blob_hashes = vec![];
548 for chunk in blob_hashes_bytes.chunks(32) {
549 let mut hash = [0u8; 32];
550 hash.copy_from_slice(chunk);
551 blob_hashes.push(hash);
552 }
553 Ok(Some(blob_hashes))
554 } else {
555 Ok(None)
556 }
557 }
558
559 fn block(&self, hash: &[u8]) -> Result<Option<Block>> {
560 match self.inner.get_cf(self.ledger_cf, hash)? {
561 Some(blob) => {
562 let record = LightBlock::read(&mut &blob[..])?;
563
564 let txs_buffers = self.inner.multi_get_cf(
566 record
567 .transactions_ids
568 .iter()
569 .map(|id| (self.ledger_txs_cf, id))
570 .collect::<Vec<(&ColumnFamily, &[u8; 32])>>(),
571 );
572
573 let mut txs = vec![];
574 for buf in txs_buffers {
575 let buf = buf?.unwrap();
576 let mut tx = SpentTransaction::read(&mut &buf[..])?;
577 if let Some(blobs) = tx.inner.inner.blob_mut() {
578 for blob in blobs {
579 let sidecar = self
581 .blob_data_by_hash(&blob.hash)?
582 .map(|bytes| {
583 BlobSidecar::from_buf(&mut &bytes[..])
584 })
585 .transpose()
586 .map_err(|e| {
587 anyhow::anyhow!(
588 "Failed to parse blob sidecar: {e:?}"
589 )
590 })?;
591 blob.data = sidecar;
592 }
593 }
594 txs.push(tx.inner);
595 }
596
597 let faults_buffer = self.inner.multi_get_cf(
599 record
600 .faults_ids
601 .iter()
602 .map(|id| (self.ledger_faults_cf, id))
603 .collect::<Vec<(&ColumnFamily, &[u8; 32])>>(),
604 );
605 let mut faults = vec![];
606 for buf in faults_buffer {
607 let buf = buf?.unwrap();
608 let fault = Fault::read(&mut &buf[..])?;
609 faults.push(fault);
610 }
611
612 Ok(Some(
613 Block::new(record.header, txs, faults)
614 .expect("block should be valid"),
615 ))
616 }
617 None => Ok(None),
618 }
619 }
620
621 fn light_block(&self, hash: &[u8]) -> Result<Option<LightBlock>> {
622 match self.inner.get_cf(self.ledger_cf, hash)? {
623 Some(blob) => {
624 let record = LightBlock::read(&mut &blob[..])?;
625 Ok(Some(record))
626 }
627 None => Ok(None),
628 }
629 }
630
631 fn block_header(&self, hash: &[u8]) -> Result<Option<Header>> {
632 match self.inner.get_cf(self.ledger_cf, hash)? {
633 Some(blob) => {
634 let record = Header::read(&mut &blob[..])?;
635 Ok(Some(record))
636 }
637 None => Ok(None),
638 }
639 }
640
641 fn block_hash_by_height(&self, height: u64) -> Result<Option<[u8; 32]>> {
642 Ok(self
643 .inner
644 .get_cf(self.ledger_height_cf, height.to_le_bytes())?
645 .map(|h| {
646 const LEN: usize = 32;
647 let mut hash = [0u8; LEN];
648 hash.copy_from_slice(&h.as_slice()[0..LEN]);
649 hash
650 }))
651 }
652
653 fn ledger_tx(&self, tx_id: &[u8]) -> Result<Option<SpentTransaction>> {
654 let tx = self
655 .inner
656 .get_cf(self.ledger_txs_cf, tx_id)?
657 .map(|blob| SpentTransaction::read(&mut &blob[..]))
658 .transpose()?;
659
660 Ok(tx)
661 }
662
663 fn ledger_txs(
670 &self,
671 tx_ids: Vec<&[u8; 32]>,
672 ) -> Result<Vec<SpentTransaction>> {
673 let cf = self.ledger_txs_cf;
674
675 let ids = tx_ids.into_iter().map(|id| (cf, id)).collect::<Vec<_>>();
676
677 let multi_get_results = self.inner.multi_get_cf(ids);
678
679 let mut spent_transactions =
680 Vec::with_capacity(multi_get_results.len());
681 for result in multi_get_results.into_iter() {
682 let opt_blob = result.map_err(|e| {
683 std::io::Error::new(std::io::ErrorKind::Other, e)
684 })?;
685
686 let Some(blob) = opt_blob else {
687 return Err(anyhow::anyhow!(
688 "At least one Transaction ID was not found"
689 ));
690 };
691
692 let stx = SpentTransaction::read(&mut &blob[..])?;
693
694 spent_transactions.push(stx);
695 }
696
697 Ok(spent_transactions)
698 }
699
700 fn ledger_tx_exists(&self, tx_id: &[u8]) -> Result<bool> {
706 Ok(self.inner.get_cf(self.ledger_txs_cf, tx_id)?.is_some())
707 }
708
709 fn block_by_height(&self, height: u64) -> Result<Option<Block>> {
710 let hash = self.block_hash_by_height(height)?;
711 let block = match hash {
712 Some(hash) => self.block(&hash)?,
713 None => None,
714 };
715 Ok(block)
716 }
717
718 fn block_label_by_height(
719 &self,
720 height: u64,
721 ) -> Result<Option<([u8; 32], Label)>> {
722 const HASH_LEN: usize = 32;
723 Ok(self
724 .inner
725 .get_cf(self.ledger_height_cf, height.to_le_bytes())?
726 .map(|h| {
727 let mut hash = [0u8; HASH_LEN];
728 hash.copy_from_slice(&h.as_slice()[0..HASH_LEN]);
729
730 let label_buff = h[HASH_LEN..].to_vec();
731 Label::read(&mut &label_buff[..]).map(|label| (hash, label))
732 })
733 .transpose()?)
734 }
735}
736
737impl<DB: DBAccess> ConsensusStorage for DBTransaction<'_, DB> {
739 fn store_candidate(&mut self, b: Block) -> Result<()> {
750 let mut serialized = vec![];
751 b.write(&mut serialized)?;
752
753 self.inner
754 .put_cf(self.candidates_cf, b.header().hash, serialized)?;
755
756 let key = serialize_key(b.header().height, b.header().hash)?;
757 self.inner
758 .put_cf(self.candidates_height_cf, key, b.header().hash)?;
759
760 Ok(())
761 }
762
763 fn candidate(&self, hash: &[u8]) -> Result<Option<Block>> {
774 if let Some(blob) = self.inner.get_cf(self.candidates_cf, hash)? {
775 let b = Block::read(&mut &blob[..])?;
776 return Ok(Some(b));
777 }
778
779 Ok(None)
781 }
782
783 fn candidate_by_iteration(
784 &self,
785 consensus_header: &ConsensusHeader,
786 ) -> Result<Option<Block>> {
787 let iter = self
788 .inner
789 .iterator_cf(self.candidates_cf, IteratorMode::Start);
790
791 for (_, blob) in iter.map(Result::unwrap) {
792 let b = Block::read(&mut &blob[..])?;
793
794 let header = b.header();
795 if header.prev_block_hash == consensus_header.prev_block_hash
796 && header.iteration == consensus_header.iteration
797 {
798 return Ok(Some(b));
799 }
800 }
801
802 Ok(None)
803 }
804
805 fn delete_candidate<F>(&mut self, closure: F) -> Result<()>
816 where
817 F: FnOnce(u64) -> bool + std::marker::Copy,
818 {
819 let iter = self
820 .inner
821 .iterator_cf(self.candidates_height_cf, IteratorMode::Start);
822
823 for (key, hash) in iter.map(Result::unwrap) {
824 let (height, _) = deserialize_key(&mut &key.to_vec()[..])?;
825 if closure(height) {
826 self.inner.delete_cf(self.candidates_cf, hash)?;
827 self.inner.delete_cf(self.candidates_height_cf, key)?;
828 }
829 }
830
831 Ok(())
832 }
833
834 fn count_candidates(&self) -> usize {
835 let iter = self
836 .inner
837 .iterator_cf(self.candidates_height_cf, IteratorMode::Start);
838
839 iter.count()
840 }
841
842 fn clear_candidates(&mut self) -> Result<()> {
849 self.delete_candidate(|_| true)
850 }
851
852 fn store_validation_result(
863 &mut self,
864 consensus_header: &ConsensusHeader,
865 validation_result: &payload::ValidationResult,
866 ) -> Result<()> {
867 let mut serialized = vec![];
868 validation_result.write(&mut serialized)?;
869
870 let key = serialize_iter_key(consensus_header)?;
871 self.inner
872 .put_cf(self.validation_results_cf, key, serialized)?;
873
874 Ok(())
875 }
876
877 fn validation_result(
889 &self,
890 consensus_header: &ConsensusHeader,
891 ) -> Result<Option<payload::ValidationResult>> {
892 let key = serialize_iter_key(consensus_header)?;
893 if let Some(blob) =
894 self.inner.get_cf(self.validation_results_cf, key)?
895 {
896 let validation_result =
897 payload::ValidationResult::read(&mut &blob[..])?;
898 return Ok(Some(validation_result));
899 }
900
901 Ok(None)
903 }
904
905 fn delete_validation_results<F>(&mut self, closure: F) -> Result<()>
917 where
918 F: FnOnce([u8; 32]) -> bool + std::marker::Copy,
919 {
920 let iter = self
921 .inner
922 .iterator_cf(self.validation_results_cf, IteratorMode::Start);
923
924 for (key, _) in iter.map(Result::unwrap) {
925 let (prev_block_hash, _) =
926 deserialize_iter_key(&mut &key.to_vec()[..])?;
927 if closure(prev_block_hash) {
928 self.inner.delete_cf(self.validation_results_cf, key)?;
929 }
930 }
931
932 Ok(())
933 }
934
935 fn count_validation_results(&self) -> usize {
936 let iter = self
937 .inner
938 .iterator_cf(self.validation_results_cf, IteratorMode::Start);
939
940 iter.count()
941 }
942
943 fn clear_validation_results(&mut self) -> Result<()> {
950 self.delete_validation_results(|_| true)
951 }
952}
953
954impl<DB: DBAccess> Persist for DBTransaction<'_, DB> {
955 fn clear_database(&mut self) -> Result<()> {
957 let iter = self.inner.iterator_cf(self.ledger_cf, IteratorMode::Start);
959
960 for (key, _) in iter.map(Result::unwrap) {
962 self.inner.delete_cf(self.ledger_cf, key)?;
963 }
964
965 self.clear_candidates()?;
966 self.clear_validation_results()?;
967 Ok(())
968 }
969
970 fn commit(self) -> Result<()> {
971 if let Err(e) = self.inner.commit() {
972 return Err(anyhow::Error::new(e).context("failed to commit"));
973 }
974
975 Ok(())
976 }
977
978 fn rollback(self) -> Result<()> {
979 if let Err(e) = self.inner.rollback() {
980 return Err(anyhow::Error::new(e).context("failed to rollback"));
981 }
982
983 Ok(())
984 }
985}
986
987impl<DB: DBAccess> Mempool for DBTransaction<'_, DB> {
988 fn store_mempool_tx(
989 &mut self,
990 tx: &Transaction,
991 timestamp: u64,
992 ) -> Result<()> {
993 let mut tx_data = vec![];
995 tx.write(&mut tx_data)?;
996
997 let hash = tx.id();
998 self.put_cf(self.mempool_cf, hash, tx_data)?;
999
1000 for n in tx.to_spend_ids() {
1003 let key = n.to_bytes();
1004 self.put_cf(self.spending_id_cf, key, hash)?;
1005 }
1006
1007 let timestamp = timestamp.to_be_bytes();
1008
1009 self.put_cf(
1013 self.fees_cf,
1014 serialize_key(tx.gas_price(), hash)?,
1015 timestamp,
1016 )?;
1017
1018 Ok(())
1019 }
1020
1021 fn mempool_tx(&self, hash: [u8; 32]) -> Result<Option<Transaction>> {
1022 let data = self.inner.get_cf(self.mempool_cf, hash)?;
1023
1024 match data {
1025 None => Ok(None),
1027 Some(blob) => Ok(Some(Transaction::read(&mut &blob.to_vec()[..])?)),
1028 }
1029 }
1030
1031 fn mempool_tx_exists(&self, h: [u8; 32]) -> Result<bool> {
1032 Ok(self.inner.get_cf(self.mempool_cf, h)?.is_some())
1033 }
1034
1035 fn delete_mempool_tx(
1036 &mut self,
1037 h: [u8; 32],
1038 cascade: bool,
1039 ) -> Result<Vec<[u8; 32]>> {
1040 let mut deleted = vec![];
1041 let tx = self.mempool_tx(h)?;
1042 if let Some(tx) = tx {
1043 let hash = tx.id();
1044
1045 self.inner.delete_cf(self.mempool_cf, hash)?;
1046
1047 for n in tx.to_spend_ids() {
1050 let key = n.to_bytes();
1051 self.inner.delete_cf(self.spending_id_cf, key)?;
1052 }
1053
1054 self.inner.delete_cf(
1056 self.fees_cf,
1057 serialize_key(tx.gas_price(), hash)?,
1058 )?;
1059
1060 deleted.push(h);
1061
1062 if cascade {
1063 let mut dependants = vec![];
1064 let mut next_spending_id = tx.next_spending_id();
1067 while let Some(spending_id) = next_spending_id {
1068 next_spending_id = spending_id.next();
1069 let next_txs =
1070 self.mempool_txs_by_spendable_ids(&[spending_id]);
1071 if next_txs.is_empty() {
1072 break;
1073 }
1074 dependants.extend(next_txs);
1075 }
1076
1077 for tx_id in dependants {
1079 let cascade_deleted =
1080 self.delete_mempool_tx(tx_id, false)?;
1081 deleted.extend(cascade_deleted);
1082 }
1083 }
1084 }
1085
1086 Ok(deleted)
1087 }
1088
1089 fn mempool_txs_by_spendable_ids(
1090 &self,
1091 n: &[SpendingId],
1092 ) -> HashSet<[u8; 32]> {
1093 n.iter()
1094 .filter_map(|n| {
1095 match self.inner.get_cf(self.spending_id_cf, n.to_bytes()) {
1096 Ok(Some(tx_id)) => tx_id.try_into().ok(),
1097 _ => None,
1098 }
1099 })
1100 .collect()
1101 }
1102
1103 fn mempool_txs_sorted_by_fee(
1104 &self,
1105 ) -> Box<dyn Iterator<Item = Transaction> + '_> {
1106 let iter = MemPoolIterator::new(&self.inner, self.fees_cf, self);
1107
1108 Box::new(iter)
1109 }
1110
1111 fn mempool_txs_ids_sorted_by_fee(
1112 &self,
1113 ) -> Box<dyn Iterator<Item = (u64, [u8; 32])> + '_> {
1114 let iter = MemPoolFeeIterator::new(&self.inner, self.fees_cf, true);
1115
1116 Box::new(iter)
1117 }
1118
1119 fn mempool_txs_ids_sorted_by_low_fee(
1120 &self,
1121 ) -> Box<dyn Iterator<Item = (u64, [u8; 32])> + '_> {
1122 let iter = MemPoolFeeIterator::new(&self.inner, self.fees_cf, false);
1123
1124 Box::new(iter)
1125 }
1126
1127 fn mempool_expired_txs(&self, timestamp: u64) -> Result<Vec<[u8; 32]>> {
1129 let mut iter = self.inner.raw_iterator_cf(self.fees_cf);
1130 iter.seek_to_first();
1131 let mut txs_list = vec![];
1132
1133 while iter.valid() {
1134 if let Some(key) = iter.key() {
1135 let (_, tx_id) = deserialize_key(&mut &key.to_vec()[..])?;
1136
1137 let tx_timestamp = u64::from_be_bytes(
1138 iter.value()
1139 .ok_or_else(|| {
1140 io::Error::new(
1141 io::ErrorKind::InvalidData,
1142 "no value",
1143 )
1144 })?
1145 .try_into()
1146 .map_err(|_| {
1147 io::Error::new(
1148 io::ErrorKind::InvalidData,
1149 "invalid data",
1150 )
1151 })?,
1152 );
1153
1154 if tx_timestamp <= timestamp {
1155 txs_list.push(tx_id);
1156 }
1157 }
1158
1159 iter.next();
1160 }
1161
1162 Ok(txs_list)
1163 }
1164
1165 fn mempool_txs_ids(&self) -> Result<Vec<[u8; 32]>> {
1166 let mut iter = self.inner.raw_iterator_cf(self.fees_cf);
1167 iter.seek_to_last();
1168
1169 let mut txs_list = vec![];
1170
1171 while iter.valid() {
1173 if let Some(key) = iter.key() {
1174 let (_, tx_id) = deserialize_key(&mut &key.to_vec()[..])?;
1175
1176 txs_list.push(tx_id);
1177 }
1178
1179 iter.prev();
1180 }
1181
1182 Ok(txs_list)
1183 }
1184
1185 fn mempool_txs_count(&self) -> usize {
1186 self.inner
1187 .iterator_cf(self.mempool_cf, IteratorMode::Start)
1188 .count()
1189 }
1190}
1191
1192pub struct MemPoolIterator<'db, DB: DBAccess, M: Mempool> {
1193 iter: MemPoolFeeIterator<'db, DB>,
1194 mempool: &'db M,
1195}
1196
1197impl<'db, DB: DBAccess, M: Mempool> MemPoolIterator<'db, DB, M> {
1198 fn new(
1199 db: &'db rocksdb::Transaction<DB>,
1200 fees_cf: &ColumnFamily,
1201 mempool: &'db M,
1202 ) -> Self {
1203 let iter = MemPoolFeeIterator::new(db, fees_cf, true);
1204 MemPoolIterator { iter, mempool }
1205 }
1206}
1207
1208impl<DB: DBAccess, M: Mempool> Iterator for MemPoolIterator<'_, DB, M> {
1209 type Item = Transaction;
1210 fn next(&mut self) -> Option<Self::Item> {
1211 self.iter.next().and_then(|(_, tx_id)| {
1212 self.mempool.mempool_tx(tx_id).ok().flatten()
1213 })
1214 }
1215}
1216
1217pub struct MemPoolFeeIterator<'db, DB: DBAccess> {
1218 iter: DBRawIteratorWithThreadMode<'db, rocksdb::Transaction<'db, DB>>,
1219 fee_desc: bool,
1220}
1221
1222impl<'db, DB: DBAccess> MemPoolFeeIterator<'db, DB> {
1223 fn new(
1224 db: &'db rocksdb::Transaction<DB>,
1225 fees_cf: &ColumnFamily,
1226 fee_desc: bool,
1227 ) -> Self {
1228 let mut iter = db.raw_iterator_cf(fees_cf);
1229 if fee_desc {
1230 iter.seek_to_last();
1231 };
1232 MemPoolFeeIterator { iter, fee_desc }
1233 }
1234}
1235
1236impl<DB: DBAccess> Iterator for MemPoolFeeIterator<'_, DB> {
1237 type Item = (u64, [u8; 32]);
1238 fn next(&mut self) -> Option<Self::Item> {
1239 match self.iter.valid() {
1240 true => {
1241 if let Some(key) = self.iter.key() {
1242 let (gas_price, hash) =
1243 deserialize_key(&mut &key.to_vec()[..]).ok()?;
1244 if self.fee_desc {
1245 self.iter.prev();
1246 } else {
1247 self.iter.next();
1248 }
1249 Some((gas_price, hash))
1250 } else {
1251 None
1252 }
1253 }
1254 false => None,
1255 }
1256 }
1257}
1258
1259impl<DB: DBAccess> std::fmt::Debug for DBTransaction<'_, DB> {
1260 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1261 let iter = self.inner.iterator_cf(self.ledger_cf, IteratorMode::Start);
1263
1264 iter.map(Result::unwrap).try_for_each(|(hash, _)| {
1265 if let Ok(Some(blob)) = self.inner.get_cf(self.ledger_cf, &hash[..])
1266 {
1267 let b = Block::read(&mut &blob[..]).unwrap_or_default();
1268 writeln!(f, "ledger_block [{}]: {:#?}", b.header().height, b)
1269 } else {
1270 Ok(())
1271 }
1272 })?;
1273
1274 let iter = self
1276 .inner
1277 .iterator_cf(self.candidates_cf, IteratorMode::Start);
1278
1279 let results: std::fmt::Result =
1280 iter.map(Result::unwrap).try_for_each(|(hash, _)| {
1281 if let Ok(Some(blob)) =
1282 self.inner.get_cf(self.candidates_cf, &hash[..])
1283 {
1284 let b = Block::read(&mut &blob[..]).unwrap_or_default();
1285 writeln!(
1286 f,
1287 "candidate_block [{}]: {:#?}",
1288 b.header().height,
1289 b
1290 )
1291 } else {
1292 Ok(())
1293 }
1294 });
1295
1296 results
1297 }
1298}
1299
1300impl<DB: DBAccess> Metadata for DBTransaction<'_, DB> {
1301 fn op_write<T: AsRef<[u8]>>(&mut self, key: &[u8], value: T) -> Result<()> {
1302 self.put_cf(self.metadata_cf, key, value)?;
1303 Ok(())
1304 }
1305
1306 fn op_read(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1307 self.inner.get_cf(self.metadata_cf, key).map_err(Into::into)
1308 }
1309}
1310
1311impl<DB: DBAccess> DBTransaction<'_, DB> {
1312 fn put_cf<K: AsRef<[u8]>, V: AsRef<[u8]>>(
1315 &self,
1316 cf: &impl AsColumnFamilyRef,
1317 key: K,
1318 value: V,
1319 ) -> Result<()> {
1320 let kv_size = key.as_ref().len() + value.as_ref().len();
1321 self.inner.put_cf(cf, key, value)?;
1322 *self.cumulative_inner_size.borrow_mut() += kv_size;
1323 Ok(())
1324 }
1325
1326 pub fn get_size(&self) -> usize {
1327 *self.cumulative_inner_size.borrow()
1328 }
1329}
1330
1331fn serialize_key(value: u64, hash: [u8; 32]) -> std::io::Result<Vec<u8>> {
1332 let mut w = vec![];
1333 std::io::Write::write_all(&mut w, &value.to_be_bytes())?;
1334 std::io::Write::write_all(&mut w, &hash)?;
1335 Ok(w)
1336}
1337
1338fn deserialize_key<R: Read>(r: &mut R) -> Result<(u64, [u8; 32])> {
1339 let mut buf = [0u8; 8];
1340 r.read_exact(&mut buf)?;
1341 let value = u64::from_be_bytes(buf);
1342 let mut hash = [0u8; 32];
1343 r.read_exact(&mut hash[..])?;
1344
1345 Ok((value, hash))
1346}
1347
1348fn serialize_iter_key(ch: &ConsensusHeader) -> std::io::Result<Vec<u8>> {
1349 let mut w = vec![];
1350 std::io::Write::write_all(&mut w, &ch.prev_block_hash)?;
1351 std::io::Write::write_all(&mut w, &[ch.iteration])?;
1352 Ok(w)
1353}
1354
1355fn deserialize_iter_key<R: Read>(r: &mut R) -> Result<([u8; 32], u8)> {
1356 let mut prev_block_hash = [0u8; 32];
1357 r.read_exact(&mut prev_block_hash)?;
1358
1359 let mut iter_byte = [0u8; 1];
1360 r.read_exact(&mut iter_byte)?;
1361 let iteration = u8::from_be_bytes(iter_byte);
1362
1363 Ok((prev_block_hash, iteration))
1364}
1365
1366impl node_data::Serializable for LightBlock {
1367 fn write<W: Write>(&self, w: &mut W) -> io::Result<()> {
1368 self.header.write(w)?;
1370
1371 let len = self.transactions_ids.len() as u32;
1373 w.write_all(&len.to_le_bytes())?;
1374
1375 for tx_id in &self.transactions_ids {
1377 w.write_all(tx_id)?;
1378 }
1379
1380 let len = self.faults_ids.len() as u32;
1382 w.write_all(&len.to_le_bytes())?;
1383
1384 for f_id in &self.faults_ids {
1386 w.write_all(f_id)?;
1387 }
1388
1389 Ok(())
1390 }
1391
1392 fn read<R: Read>(r: &mut R) -> io::Result<Self>
1393 where
1394 Self: Sized,
1395 {
1396 let header = Header::read(r)?;
1398
1399 let len = Self::read_u32_le(r)?;
1401
1402 let mut transactions_ids = vec![];
1404 for _ in 0..len {
1405 let mut tx_id = [0u8; 32];
1406 r.read_exact(&mut tx_id[..])?;
1407
1408 transactions_ids.push(tx_id);
1409 }
1410
1411 let len = Self::read_u32_le(r)?;
1413
1414 let mut faults_ids = vec![];
1416 for _ in 0..len {
1417 let mut f_id = [0u8; 32];
1418 r.read_exact(&mut f_id[..])?;
1419
1420 faults_ids.push(f_id);
1421 }
1422
1423 Ok(Self {
1424 header,
1425 transactions_ids,
1426 faults_ids,
1427 })
1428 }
1429}
1430
1431#[cfg(test)]
1432mod tests {
1433 use fake::{Fake, Faker};
1434 use node_data::ledger;
1435
1436 use super::*;
1437
1438 #[test]
1439 fn test_store_block() {
1440 TestWrapper::new("test_store_block").run(|path| {
1441 let db = Backend::create_or_open(path, DatabaseOptions::default());
1442
1443 let b: Block = Faker.fake();
1444 assert!(!b.txs().is_empty());
1445
1446 let hash = b.header().hash;
1447
1448 assert!(db
1449 .update(|txn| {
1450 txn.store_block(
1451 b.header(),
1452 &to_spent_txs(b.txs()),
1453 b.faults(),
1454 Label::Final(3),
1455 )?;
1456 Ok(())
1457 })
1458 .is_ok());
1459
1460 db.view(|txn| {
1461 let db_blk = txn
1463 .block(&hash)
1464 .expect("Block to be fetched")
1465 .expect("Block to exist");
1466 assert_eq!(db_blk.header().hash, b.header().hash);
1467
1468 for pos in 0..b.txs().len() {
1471 assert_eq!(db_blk.txs()[pos].id(), b.txs()[pos].id());
1472 }
1473
1474 for pos in 0..b.faults().len() {
1477 assert_eq!(db_blk.faults()[pos].id(), b.faults()[pos].id());
1478 }
1479 });
1480
1481 assert!(db
1482 .update(|txn| {
1483 txn.clear_database()?;
1484 Ok(())
1485 })
1486 .is_ok());
1487
1488 db.view(|txn| {
1489 assert!(txn
1490 .block(&hash)
1491 .expect("block to be fetched")
1492 .is_none());
1493 });
1494 });
1495 }
1496
1497 #[test]
1498 fn test_read_only() {
1499 TestWrapper::new("test_read_only").run(|path| {
1500 let db = Backend::create_or_open(path, DatabaseOptions::default());
1501 let b: Block = Faker.fake();
1502 db.update_dry_run(true, |txn| {
1503 txn.store_block(
1504 b.header(),
1505 &to_spent_txs(b.txs()),
1506 b.faults(),
1507 Label::Final(3),
1508 )
1509 })
1510 .expect("block to be stored");
1511 db.view(|txn| {
1512 assert!(txn
1513 .block(&b.header().hash)
1514 .expect("block to be fetched")
1515 .is_none());
1516 });
1517 });
1518 }
1519
1520 #[test]
1521 fn test_transaction_isolation() {
1522 TestWrapper::new("test_transaction_isolation").run(|path| {
1523 let db = Backend::create_or_open(path, DatabaseOptions::default());
1524 let mut b: Block = Faker.fake();
1525 let hash = b.header().hash;
1526
1527 db.view(|txn| {
1528 assert!(db
1531 .update(|inner| {
1532 inner
1533 .store_block(
1534 b.header(),
1535 &to_spent_txs(b.txs()),
1536 b.faults(),
1537 Label::Final(3),
1538 )
1539 .unwrap();
1540
1541 assert!(inner.block(&hash)?.is_some());
1543 assert!(txn.block(&hash)?.is_none());
1546 Ok(())
1547 })
1548 .is_ok());
1549
1550 assert!(txn
1553 .block(&hash)
1554 .expect("block to be fetched")
1555 .is_some());
1556 });
1557
1558 db.view(|txn| {
1560 assert_blocks_eq(
1561 &mut txn
1562 .block(&hash)
1563 .expect("block to be fetched")
1564 .unwrap(),
1565 &mut b,
1566 );
1567 });
1568 });
1569 }
1570
1571 fn assert_blocks_eq(a: &Block, b: &Block) {
1572 assert!(a.header().hash != [0u8; 32]);
1573 assert!(a.header().hash.eq(&b.header().hash));
1574 }
1575
1576 #[test]
1577 fn test_add_mempool_tx() {
1578 TestWrapper::new("test_add_tx").run(|path| {
1579 let db = Backend::create_or_open(path, DatabaseOptions::default());
1580 let t: Transaction = Faker.fake();
1581
1582 assert!(db.update(|txn| { txn.store_mempool_tx(&t, 0) }).is_ok());
1583
1584 db.view(|vq| {
1585 assert!(vq.mempool_tx_exists(t.id()).unwrap());
1586
1587 let fetched_tx = vq
1588 .mempool_tx(t.id())
1589 .expect("valid contract call")
1590 .unwrap();
1591
1592 assert_eq!(
1593 fetched_tx.id(),
1594 t.id(),
1595 "fetched transaction should be the same"
1596 );
1597 });
1598
1599 db.update(|txn| {
1601 let deleted =
1602 txn.delete_mempool_tx(t.id(), false).expect("valid tx");
1603 assert!(deleted.len() == 1);
1604 Ok(())
1605 })
1606 .unwrap();
1607 });
1608 }
1609
1610 #[test]
1611 fn test_mempool_txs_sorted_by_fee() {
1612 TestWrapper::new("test_mempool_txs_sorted_by_fee").run(|path| {
1613 let db = Backend::create_or_open(path, DatabaseOptions::default());
1614 let _rng = rand::thread_rng();
1616 db.update(|txn| {
1617 for _i in 0..10u32 {
1618 let t: Transaction = Faker.fake();
1619 txn.store_mempool_tx(&t, 0)?;
1620 }
1621 Ok(())
1622 })
1623 .unwrap();
1624
1625 db.view(|txn| {
1626 let txs = txn.mempool_txs_sorted_by_fee();
1627
1628 let mut last_fee = u64::MAX;
1629 for t in txs {
1630 let fee = t.gas_price();
1631 assert!(
1632 fee <= last_fee,
1633 "tx fees are not in decreasing order"
1634 );
1635 last_fee = fee
1636 }
1637 assert_ne!(last_fee, u64::MAX, "No tx has been processed")
1638 });
1639 });
1640 }
1641
1642 #[test]
1643 fn test_txs_count() {
1644 TestWrapper::new("test_txs_count").run(|path| {
1645 let db = Backend::create_or_open(path, DatabaseOptions::default());
1646
1647 const N: usize = 100;
1648 const D: usize = 50;
1649
1650 let txs: Vec<_> = (0..N)
1651 .map(|i| ledger::faker::gen_dummy_tx(i as u64))
1652 .collect();
1653
1654 db.update(|db| {
1655 assert_eq!(db.mempool_txs_count(), 0);
1656 txs.iter().for_each(|t| {
1657 db.store_mempool_tx(&t, 0).expect("tx should be added")
1658 });
1659 Ok(())
1660 })
1661 .unwrap();
1662
1663 db.update(|db| {
1664 assert_eq!(db.mempool_txs_count(), N);
1666
1667 txs.iter().take(D).for_each(|tx| {
1668 let deleted = db
1669 .delete_mempool_tx(tx.id(), false)
1670 .expect("transaction should be deleted");
1671 assert!(deleted.len() == 1);
1672 });
1673
1674 Ok(())
1675 })
1676 .unwrap();
1677
1678 db.update(|db| {
1680 assert_eq!(db.mempool_txs_count(), N - D);
1681 Ok(())
1682 })
1683 .unwrap();
1684 });
1685 }
1686
1687 #[test]
1688 fn test_max_gas_limit() {
1689 TestWrapper::new("test_block_size_limit").run(|path| {
1690 let db = Backend::create_or_open(path, DatabaseOptions::default());
1691
1692 db.update(|txn| {
1693 for i in 0..10u32 {
1694 let t = ledger::faker::gen_dummy_tx(i as u64);
1695 txn.store_mempool_tx(&t, 0)?;
1696 }
1697 Ok(())
1698 })
1699 .unwrap();
1700
1701 let total_gas_price: u64 = 9 + 8 + 7 + 6 + 5 + 4 + 3 + 2 + 1;
1702 db.view(|txn| {
1703 let txs = txn
1704 .mempool_txs_sorted_by_fee()
1705 .map(|t| t.gas_price())
1706 .sum::<u64>();
1707
1708 assert_eq!(txs, total_gas_price);
1709 });
1710 });
1711 }
1712
1713 #[test]
1714 fn test_get_expired_txs() {
1715 TestWrapper::new("test_get_expired_txs").run(|path| {
1716 let db = Backend::create_or_open(path, DatabaseOptions::default());
1717
1718 let mut expiry_list = HashSet::new();
1719 let _ = db.update(|txn| {
1720 (1..101).for_each(|i| {
1721 let t = ledger::faker::gen_dummy_tx(i as u64);
1722 txn.store_mempool_tx(&t, i).expect("tx should be added");
1723 expiry_list.insert(t.id());
1724 });
1725
1726 (1000..1100).for_each(|i| {
1727 let t = ledger::faker::gen_dummy_tx(i as u64);
1728 txn.store_mempool_tx(&t, i).expect("tx should be added");
1729 });
1730
1731 Ok(())
1732 });
1733
1734 db.view(|vq| {
1735 let expired: HashSet<_> = vq
1736 .mempool_expired_txs(100)
1737 .unwrap()
1738 .into_iter()
1739 .map(|id| id)
1740 .collect();
1741
1742 assert_eq!(expiry_list, expired);
1743 });
1744 });
1745 }
1746
1747 fn to_spent_txs(txs: &Vec<Transaction>) -> Vec<SpentTransaction> {
1748 txs.iter()
1749 .map(|t| SpentTransaction {
1750 inner: t.clone(),
1751 block_height: 0,
1752 gas_spent: 0,
1753 err: None,
1754 })
1755 .collect()
1756 }
1757
1758 #[test]
1759 fn test_get_ledger_tx_by_hash() {
1760 TestWrapper::new("test_get_ledger_tx_by_hash").run(|path| {
1761 let db = Backend::create_or_open(path, DatabaseOptions::default());
1762 let b: Block = Faker.fake();
1763 assert!(!b.txs().is_empty());
1764
1765 assert!(db
1767 .update(|txn| {
1768 txn.store_block(
1769 b.header(),
1770 &to_spent_txs(b.txs()),
1771 b.faults(),
1772 Label::Final(3),
1773 )?;
1774 Ok(())
1775 })
1776 .is_ok());
1777
1778 db.view(|v| {
1781 for t in b.txs().iter() {
1782 assert!(v
1783 .ledger_tx(&t.id())
1784 .expect("should not return error")
1785 .expect("should find a transaction")
1786 .inner
1787 .eq(t));
1788 }
1789 });
1790 });
1791 }
1792
1793 #[test]
1794 fn test_fetch_block_hash_by_height() {
1795 TestWrapper::new("test_fetch_block_hash_by_height").run(|path| {
1796 let db = Backend::create_or_open(path, DatabaseOptions::default());
1797 let b: Block = Faker.fake();
1798
1799 assert!(db
1801 .update(|txn| {
1802 txn.store_block(
1803 b.header(),
1804 &to_spent_txs(b.txs()),
1805 b.faults(),
1806 Label::Attested(3),
1807 )?;
1808 Ok(())
1809 })
1810 .is_ok());
1811
1812 db.view(|v| {
1814 assert!(v
1815 .block_hash_by_height(b.header().height)
1816 .expect("should not return error")
1817 .expect("should find a block")
1818 .eq(&b.header().hash));
1819 });
1820 });
1821 }
1822
1823 #[test]
1824 fn test_fetch_block_label_by_height() {
1825 TestWrapper::new("test_fetch_block_hash_by_height").run(|path| {
1826 let db = Backend::create_or_open(path, DatabaseOptions::default());
1827 let b: Block = Faker.fake();
1828
1829 assert!(db
1831 .update(|txn| {
1832 txn.store_block(
1833 b.header(),
1834 &to_spent_txs(b.txs()),
1835 b.faults(),
1836 Label::Attested(3),
1837 )?;
1838 Ok(())
1839 })
1840 .is_ok());
1841
1842 db.view(|v| {
1844 assert!(v
1845 .block_label_by_height(b.header().height)
1846 .expect("should not return error")
1847 .expect("should find a block")
1848 .1
1849 .eq(&Label::Attested(3)));
1850 });
1851 });
1852 }
1853
1854 #[test]
1855 fn test_delete_block() {
1857 let t = TestWrapper::new("test_fetch_block_hash_by_height");
1858 t.run(|path| {
1859 let db = Backend::create_or_open(path, DatabaseOptions::default());
1860 let b: ledger::Block = Faker.fake();
1861
1862 assert!(db
1863 .update(|ut| {
1864 ut.store_block(
1865 b.header(),
1866 &to_spent_txs(b.txs()),
1867 b.faults(),
1868 Label::Final(3),
1869 )?;
1870 Ok(())
1871 })
1872 .is_ok());
1873
1874 assert!(db
1875 .update(|ut| {
1876 ut.delete_block(&b)?;
1877 Ok(())
1878 })
1879 .is_ok());
1880 });
1881
1882 let path = t.get_path();
1883 let opts = Options::default();
1884
1885 let vec = rocksdb::DB::list_cf(&opts, &path).unwrap();
1886 assert!(!vec.is_empty());
1887
1888 let db = rocksdb::DB::open_cf(&opts, &path, vec.clone()).unwrap();
1890 vec.into_iter()
1891 .map(|cf_name| {
1892 if cf_name == CF_METADATA {
1893 return;
1894 }
1895
1896 let cf = db.cf_handle(&cf_name).unwrap();
1897 assert_eq!(
1898 db.iterator_cf(cf, IteratorMode::Start)
1899 .map(Result::unwrap)
1900 .count(),
1901 0
1902 );
1903 })
1904 .for_each(drop);
1905 }
1906
1907 struct TestWrapper(tempfile::TempDir);
1908
1909 impl TestWrapper {
1910 fn new(path: &'static str) -> Self {
1911 Self(
1912 tempfile::TempDir::with_prefix(path)
1913 .expect("Temp directory to be created"),
1914 )
1915 }
1916
1917 pub fn run<F>(&self, test_func: F)
1918 where
1919 F: FnOnce(&Path),
1920 {
1921 test_func(self.0.path());
1922 }
1923
1924 pub fn get_path(&self) -> std::path::PathBuf {
1925 self.0.path().to_owned().join(DB_FOLDER_NAME)
1926 }
1927 }
1928}