1use std::cell::RefCell;
8use std::collections::HashSet;
9use std::io::{Read, Write};
10use std::path::Path;
11use std::sync::Arc;
12use std::{io, vec};
13
14use anyhow::Result;
15use dusk_core::transfer::data::BlobSidecar;
16use node_data::Serializable;
17use node_data::ledger::{
18 Block, Fault, Header, Label, SpendingId, SpentTransaction, Transaction,
19};
20use node_data::message::{ConsensusHeader, payload};
21use rocksdb::{
22 AsColumnFamilyRef, BlockBasedOptions, ColumnFamily, ColumnFamilyDescriptor,
23 DBAccess, DBRawIteratorWithThreadMode, IteratorMode, LogLevel,
24 OptimisticTransactionDB, OptimisticTransactionOptions, Options,
25 WriteOptions,
26};
27use tracing::info;
28
29use super::{
30 ConsensusStorage, DB, DatabaseOptions, Ledger, LightBlock, Metadata,
31 Persist,
32};
33use crate::database::Mempool;
34
35const CF_LEDGER_HEADER: &str = "cf_ledger_header";
36const CF_LEDGER_TXS: &str = "cf_ledger_txs";
37const CF_LEDGER_BLOBS: &str = "cf_ledger_blobs";
38const CF_LEDGER_BLOBS_HEIGHT: &str = "cf_ledger_blobs_height";
39const CF_LEDGER_FAULTS: &str = "cf_ledger_faults";
40const CF_LEDGER_HEIGHT: &str = "cf_ledger_height";
41const CF_CANDIDATES: &str = "cf_candidates";
42const CF_CANDIDATES_HEIGHT: &str = "cf_candidates_height";
43const CF_VALIDATION_RESULTS: &str = "cf_validation_results";
44const CF_MEMPOOL: &str = "cf_mempool";
45const CF_MEMPOOL_SPENDING_ID: &str = "cf_mempool_spending_id";
46const CF_MEMPOOL_FEES: &str = "cf_mempool_fees";
47const CF_METADATA: &str = "cf_metadata";
48
49const DB_FOLDER_NAME: &str = "chain.db";
50
51pub const MD_HASH_KEY: &[u8] = b"hash_key";
53pub const MD_STATE_ROOT_KEY: &[u8] = b"state_hash_key";
54pub const MD_AVG_VALIDATION: &[u8] = b"avg_validation_time";
55pub const MD_AVG_RATIFICATION: &[u8] = b"avg_ratification_time";
56pub const MD_AVG_PROPOSAL: &[u8] = b"avg_proposal_time";
57pub const MD_LAST_ITER: &[u8] = b"consensus_last_iter";
58
59#[derive(Clone)]
60pub struct Backend {
61 rocksdb: Arc<OptimisticTransactionDB>,
62}
63
64impl Backend {
65 fn begin_tx(&self) -> DBTransaction<'_, OptimisticTransactionDB> {
66 let write_options = WriteOptions::default();
68 let tx_options = OptimisticTransactionOptions::default();
69
70 let inner = self.rocksdb.transaction_opt(&write_options, &tx_options);
71
72 let ledger_cf = self
74 .rocksdb
75 .cf_handle(CF_LEDGER_HEADER)
76 .expect("ledger_header column family must exist");
77
78 let ledger_txs_cf = self
79 .rocksdb
80 .cf_handle(CF_LEDGER_TXS)
81 .expect("CF_LEDGER_TXS column family must exist");
82
83 let ledger_faults_cf = self
84 .rocksdb
85 .cf_handle(CF_LEDGER_FAULTS)
86 .expect("CF_LEDGER_FAULTS column family must exist");
87
88 let candidates_cf = self
89 .rocksdb
90 .cf_handle(CF_CANDIDATES)
91 .expect("candidates column family must exist");
92
93 let candidates_height_cf = self
94 .rocksdb
95 .cf_handle(CF_CANDIDATES_HEIGHT)
96 .expect("candidates_height column family must exist");
97
98 let validation_results_cf = self
99 .rocksdb
100 .cf_handle(CF_VALIDATION_RESULTS)
101 .expect("validation result column family must exist");
102
103 let mempool_cf = self
104 .rocksdb
105 .cf_handle(CF_MEMPOOL)
106 .expect("mempool column family must exist");
107
108 let spending_id_cf = self
109 .rocksdb
110 .cf_handle(CF_MEMPOOL_SPENDING_ID)
111 .expect("CF_MEMPOOL_SPENDING_ID column family must exist");
112
113 let fees_cf = self
114 .rocksdb
115 .cf_handle(CF_MEMPOOL_FEES)
116 .expect("CF_MEMPOOL_FEES column family must exist");
117
118 let ledger_height_cf = self
119 .rocksdb
120 .cf_handle(CF_LEDGER_HEIGHT)
121 .expect("CF_LEDGER_HEIGHT column family must exist");
122
123 let metadata_cf = self
124 .rocksdb
125 .cf_handle(CF_METADATA)
126 .expect("CF_METADATA column family must exist");
127
128 let ledger_blobs_cf = self
129 .rocksdb
130 .cf_handle(CF_LEDGER_BLOBS)
131 .expect("CF_LEDGER_BLOBS column family must exist");
132
133 let ledger_blobs_height_cf = self
134 .rocksdb
135 .cf_handle(CF_LEDGER_BLOBS_HEIGHT)
136 .expect("CF_LEDGER_BLOBS_HEIGHT column family must exist");
137
138 DBTransaction::<'_, OptimisticTransactionDB> {
139 inner,
140 candidates_cf,
141 candidates_height_cf,
142 validation_results_cf,
143 ledger_cf,
144 ledger_txs_cf,
145 ledger_faults_cf,
146 mempool_cf,
147 spending_id_cf,
148 fees_cf,
149 ledger_height_cf,
150 ledger_blobs_cf,
151 ledger_blobs_height_cf,
152 metadata_cf,
153 cumulative_inner_size: RefCell::new(0),
154 }
155 }
156}
157
158impl DB for Backend {
159 type P<'a> = DBTransaction<'a, OptimisticTransactionDB>;
160
161 fn create_or_open<T>(path: T, db_opts: DatabaseOptions) -> Self
162 where
163 T: AsRef<Path>,
164 {
165 let path = path.as_ref().join(DB_FOLDER_NAME);
166 info!("Opening database in {path:?}, {:?} ", db_opts);
167
168 let mut blocks_cf_opts = Options::default();
171 blocks_cf_opts.create_if_missing(db_opts.create_if_missing);
172 blocks_cf_opts.create_missing_column_families(true);
173 blocks_cf_opts.set_level_compaction_dynamic_level_bytes(true);
174 blocks_cf_opts
175 .set_write_buffer_size(db_opts.blocks_cf_max_write_buffer_size);
176
177 if db_opts.enable_debug {
178 blocks_cf_opts.set_log_level(LogLevel::Info);
179 blocks_cf_opts.set_dump_malloc_stats(true);
180 blocks_cf_opts.enable_statistics();
181 }
182
183 if db_opts.blocks_cf_disable_block_cache {
184 let mut block_opts = BlockBasedOptions::default();
185 block_opts.disable_cache();
186 blocks_cf_opts.set_block_based_table_factory(&block_opts);
187 }
188
189 let mut mp_opts = blocks_cf_opts.clone();
192 mp_opts.set_manual_wal_flush(true);
194 mp_opts.create_if_missing(true);
195 mp_opts.create_missing_column_families(true);
196 mp_opts.set_write_buffer_size(db_opts.mempool_cf_max_write_buffer_size);
197
198 if db_opts.enable_debug {
199 mp_opts.set_log_level(LogLevel::Info);
200 mp_opts.set_dump_malloc_stats(true);
201 mp_opts.enable_statistics();
202 }
203
204 let cfs = vec![
205 ColumnFamilyDescriptor::new(
206 CF_LEDGER_HEADER,
207 blocks_cf_opts.clone(),
208 ),
209 ColumnFamilyDescriptor::new(CF_LEDGER_TXS, blocks_cf_opts.clone()),
210 ColumnFamilyDescriptor::new(
211 CF_LEDGER_FAULTS,
212 blocks_cf_opts.clone(),
213 ),
214 ColumnFamilyDescriptor::new(
215 CF_LEDGER_HEIGHT,
216 blocks_cf_opts.clone(),
217 ),
218 ColumnFamilyDescriptor::new(
219 CF_LEDGER_BLOBS,
220 blocks_cf_opts.clone(),
221 ),
222 ColumnFamilyDescriptor::new(
223 CF_LEDGER_BLOBS_HEIGHT,
224 blocks_cf_opts.clone(),
225 ),
226 ColumnFamilyDescriptor::new(CF_CANDIDATES, blocks_cf_opts.clone()),
227 ColumnFamilyDescriptor::new(
228 CF_CANDIDATES_HEIGHT,
229 blocks_cf_opts.clone(),
230 ),
231 ColumnFamilyDescriptor::new(
232 CF_VALIDATION_RESULTS,
233 blocks_cf_opts.clone(),
234 ),
235 ColumnFamilyDescriptor::new(CF_METADATA, blocks_cf_opts.clone()),
236 ColumnFamilyDescriptor::new(CF_MEMPOOL, mp_opts.clone()),
237 ColumnFamilyDescriptor::new(
238 CF_MEMPOOL_SPENDING_ID,
239 mp_opts.clone(),
240 ),
241 ColumnFamilyDescriptor::new(CF_MEMPOOL_FEES, mp_opts.clone()),
242 ];
243
244 Self {
245 rocksdb: Arc::new(
246 OptimisticTransactionDB::open_cf_descriptors(
247 &blocks_cf_opts,
248 &path,
249 cfs,
250 )
251 .unwrap_or_else(|_| {
252 panic!("should be a valid database in {path:?}")
253 }),
254 ),
255 }
256 }
257
258 fn view<F, T>(&self, f: F) -> T
259 where
260 F: for<'a> FnOnce(&Self::P<'a>) -> T,
261 {
262 let tx = self.begin_tx();
264
265 let ret = f(&tx);
267 tx.rollback().expect("rollback to succeed for readonly");
268 ret
269 }
270
271 fn update<F, T>(&self, execute: F) -> Result<T>
272 where
273 F: for<'a> FnOnce(&mut Self::P<'a>) -> Result<T>,
274 {
275 self.update_dry_run(false, execute)
276 }
277
278 fn update_dry_run<F, T>(&self, dry_run: bool, execute: F) -> Result<T>
279 where
280 F: for<'a> FnOnce(&mut Self::P<'a>) -> Result<T>,
281 {
282 let mut tx = self.begin_tx();
284
285 let ret = execute(&mut tx)?;
288
289 if dry_run {
290 tx.rollback()?;
291 } else {
292 tx.commit()?;
294 }
295
296 Ok(ret)
297 }
298
299 fn close(&mut self) {}
300}
301
302pub struct DBTransaction<'db, DB: DBAccess> {
303 inner: rocksdb::Transaction<'db, DB>,
304 cumulative_inner_size: RefCell<usize>,
306
307 candidates_cf: &'db ColumnFamily,
310 candidates_height_cf: &'db ColumnFamily,
311 validation_results_cf: &'db ColumnFamily,
313
314 ledger_cf: &'db ColumnFamily,
316 ledger_faults_cf: &'db ColumnFamily,
317 ledger_txs_cf: &'db ColumnFamily,
318 ledger_height_cf: &'db ColumnFamily,
319 ledger_blobs_cf: &'db ColumnFamily,
320 ledger_blobs_height_cf: &'db ColumnFamily,
321
322 mempool_cf: &'db ColumnFamily,
324 spending_id_cf: &'db ColumnFamily,
325 fees_cf: &'db ColumnFamily,
326
327 metadata_cf: &'db ColumnFamily,
328}
329
330impl<DB: DBAccess> Ledger for DBTransaction<'_, DB> {
331 fn store_block(
332 &mut self,
333 header: &Header,
334 txs: &[SpentTransaction],
335 faults: &[Fault],
336 label: Label,
337 ) -> Result<usize> {
338 {
342 let cf = self.ledger_cf;
343
344 let mut buf = vec![];
345 LightBlock {
346 header: header.clone(),
347 transactions_ids: txs.iter().map(|t| t.inner.id()).collect(),
348 faults_ids: faults.iter().map(|f| f.id()).collect(),
349 }
350 .write(&mut buf)?;
351
352 self.put_cf(cf, header.hash, buf)?;
353 }
354
355 self.op_write(MD_HASH_KEY, header.hash)?;
357 self.op_write(MD_STATE_ROOT_KEY, header.state_hash)?;
358
359 {
361 let cf = self.ledger_txs_cf;
362
363 let mut stored_blobs = Vec::with_capacity(6);
364
365 for tx in txs {
367 let mut d = vec![];
368
369 if tx.inner.inner.blob().is_some() {
370 let mut strip_tx = tx.clone();
371 if let Some(blobs) = strip_tx.inner.inner.strip_blobs() {
372 for (hash, sidecar) in blobs.into_iter() {
373 let sidecar_bytes = sidecar.to_var_bytes();
374 self.store_blob_data(&hash, sidecar_bytes)?;
375 stored_blobs.push(hash);
376 }
377 }
378 strip_tx.write(&mut d)?;
379 } else {
380 tx.write(&mut d)?;
381 }
382 self.put_cf(cf, tx.inner.id(), d)?;
383 }
384
385 if !stored_blobs.is_empty() {
386 self.store_blobs_height(header.height, &stored_blobs)?;
388 }
389 }
390
391 {
393 let cf = self.ledger_faults_cf;
394
395 for f in faults {
397 let mut d = vec![];
398 f.write(&mut d)?;
399 self.put_cf(cf, f.id(), d)?;
400 }
401 }
402 self.store_block_label(header.height, &header.hash, label)?;
403
404 Ok(self.get_size())
405 }
406
407 fn faults_by_block(&self, start_height: u64) -> Result<Vec<Fault>> {
408 let mut faults = vec![];
409 let mut hash = self
410 .op_read(MD_HASH_KEY)?
411 .ok_or(anyhow::anyhow!("Cannot read tip"))?;
412
413 loop {
414 let block = self.light_block(&hash)?.ok_or(anyhow::anyhow!(
415 "Cannot read block {}",
416 hex::encode(&hash)
417 ))?;
418
419 let block_height = block.header.height;
420
421 if block_height >= start_height {
422 hash = block.header.prev_block_hash.to_vec();
423 faults.extend(self.faults(&block.faults_ids)?);
424 } else {
425 break;
426 }
427
428 if block_height == 0 {
429 break;
430 }
431 }
432 Ok(faults)
433 }
434
435 fn store_block_label(
436 &mut self,
437 height: u64,
438 hash: &[u8; 32],
439 label: Label,
440 ) -> Result<()> {
441 let mut buf = vec![];
443 buf.write_all(hash)?;
444 label.write(&mut buf)?;
445
446 self.put_cf(self.ledger_height_cf, height.to_le_bytes(), buf)?;
447 Ok(())
448 }
449
450 fn delete_block(&mut self, b: &Block) -> Result<()> {
451 self.inner.delete_cf(
452 self.ledger_height_cf,
453 b.header().height.to_le_bytes(),
454 )?;
455
456 for tx in b.txs() {
457 self.inner.delete_cf(self.ledger_txs_cf, tx.id())?;
458 }
459 for f in b.faults() {
460 self.inner.delete_cf(self.ledger_faults_cf, f.id())?;
461 }
462
463 self.delete_blobs_by_height(b.header().height)?;
464 self.inner.delete_cf(self.ledger_cf, b.header().hash)?;
465
466 Ok(())
467 }
468
469 fn block_exists(&self, hash: &[u8]) -> Result<bool> {
470 Ok(self.inner.get_cf(self.ledger_cf, hash)?.is_some())
471 }
472
473 fn faults(&self, faults_ids: &[[u8; 32]]) -> Result<Vec<Fault>> {
474 if faults_ids.is_empty() {
475 return Ok(vec![]);
476 }
477 let ids = faults_ids
478 .iter()
479 .map(|id| (self.ledger_faults_cf, id))
480 .collect::<Vec<_>>();
481
482 let faults_buffer = self.inner.multi_get_cf(ids);
484
485 let mut faults = vec![];
486 for buf in faults_buffer {
487 let buf = buf?.unwrap();
488 let fault = Fault::read(&mut &buf[..])?;
489 faults.push(fault);
490 }
491
492 Ok(faults)
493 }
494
495 fn latest_block(&self) -> Result<LightBlock> {
496 let tip_hash = self
497 .op_read(MD_HASH_KEY)?
498 .ok_or(anyhow::anyhow!("Cannot find tip stored in metadata"))?;
499 self.light_block(&tip_hash)?
500 .ok_or(anyhow::anyhow!("Cannot find tip block"))
501 }
502
503 fn blob_data_by_hash(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
504 Ok(self.inner.get_cf(self.ledger_blobs_cf, hash)?)
505 }
506
507 fn store_blob_data(&self, hash: &[u8; 32], data: Vec<u8>) -> Result<()> {
508 self.inner.put_cf(self.ledger_blobs_cf, hash, data)?;
509 Ok(())
510 }
511 fn store_blobs_height(
512 &self,
513 block_height: u64,
514 blob_hashes: &[[u8; 32]],
515 ) -> Result<()> {
516 if blob_hashes.is_empty() {
517 return Ok(());
518 }
519 let blob_hashes_bytes: Vec<_> =
520 blob_hashes.iter().flat_map(|hash| hash.to_vec()).collect();
521 self.inner.put_cf(
522 self.ledger_blobs_height_cf,
523 block_height.to_be_bytes(),
524 blob_hashes_bytes,
525 )?;
526 Ok(())
527 }
528
529 fn delete_blobs_by_height(&self, block_height: u64) -> Result<()> {
530 let blobs_to_delete = self.blobs_by_height(block_height)?;
531 if let Some(blob_hashes) = blobs_to_delete {
532 for hash in blob_hashes {
533 self.inner.delete_cf(self.ledger_blobs_cf, hash)?;
536 }
537 self.inner.delete_cf(
538 self.ledger_blobs_height_cf,
539 block_height.to_be_bytes(),
540 )?;
541 }
542
543 Ok(())
544 }
545
546 fn blobs_by_height(
547 &self,
548 block_height: u64,
549 ) -> Result<Option<Vec<[u8; 32]>>> {
550 let blob_hashes_bytes = self
551 .inner
552 .get_cf(self.ledger_blobs_height_cf, block_height.to_be_bytes())?;
553
554 if let Some(blob_hashes_bytes) = blob_hashes_bytes {
555 let mut blob_hashes = vec![];
556 for chunk in blob_hashes_bytes.chunks(32) {
557 let mut hash = [0u8; 32];
558 hash.copy_from_slice(chunk);
559 blob_hashes.push(hash);
560 }
561 Ok(Some(blob_hashes))
562 } else {
563 Ok(None)
564 }
565 }
566
567 fn block(&self, hash: &[u8]) -> Result<Option<Block>> {
568 match self.inner.get_cf(self.ledger_cf, hash)? {
569 Some(blob) => {
570 let record = LightBlock::read(&mut &blob[..])?;
571
572 let txs_buffers = self.inner.multi_get_cf(
574 record
575 .transactions_ids
576 .iter()
577 .map(|id| (self.ledger_txs_cf, id))
578 .collect::<Vec<(&ColumnFamily, &[u8; 32])>>(),
579 );
580
581 let mut txs = vec![];
582 for buf in txs_buffers {
583 let buf = buf?.unwrap();
584 let mut tx = SpentTransaction::read(&mut &buf[..])?;
585 if let Some(blobs) = tx.inner.inner.blob_mut() {
586 for blob in blobs {
587 let sidecar = self
589 .blob_data_by_hash(&blob.hash)?
590 .map(|bytes| {
591 BlobSidecar::from_buf(&mut &bytes[..])
592 })
593 .transpose()
594 .map_err(|e| {
595 anyhow::anyhow!(
596 "Failed to parse blob sidecar: {e:?}"
597 )
598 })?;
599 blob.data = sidecar;
600 }
601 }
602 txs.push(tx.inner);
603 }
604
605 let faults_buffer = self.inner.multi_get_cf(
607 record
608 .faults_ids
609 .iter()
610 .map(|id| (self.ledger_faults_cf, id))
611 .collect::<Vec<(&ColumnFamily, &[u8; 32])>>(),
612 );
613 let mut faults = vec![];
614 for buf in faults_buffer {
615 let buf = buf?.unwrap();
616 let fault = Fault::read(&mut &buf[..])?;
617 faults.push(fault);
618 }
619
620 Ok(Some(
621 Block::new(record.header, txs, faults)
622 .expect("block should be valid"),
623 ))
624 }
625 None => Ok(None),
626 }
627 }
628
629 fn light_block(&self, hash: &[u8]) -> Result<Option<LightBlock>> {
630 match self.inner.get_cf(self.ledger_cf, hash)? {
631 Some(blob) => {
632 let record = LightBlock::read(&mut &blob[..])?;
633 Ok(Some(record))
634 }
635 None => Ok(None),
636 }
637 }
638
639 fn block_header(&self, hash: &[u8]) -> Result<Option<Header>> {
640 match self.inner.get_cf(self.ledger_cf, hash)? {
641 Some(blob) => {
642 let record = Header::read(&mut &blob[..])?;
643 Ok(Some(record))
644 }
645 None => Ok(None),
646 }
647 }
648
649 fn block_hash_by_height(&self, height: u64) -> Result<Option<[u8; 32]>> {
650 Ok(self
651 .inner
652 .get_cf(self.ledger_height_cf, height.to_le_bytes())?
653 .map(|h| {
654 const LEN: usize = 32;
655 let mut hash = [0u8; LEN];
656 hash.copy_from_slice(&h.as_slice()[0..LEN]);
657 hash
658 }))
659 }
660
661 fn ledger_tx(&self, tx_id: &[u8]) -> Result<Option<SpentTransaction>> {
662 let tx = self
663 .inner
664 .get_cf(self.ledger_txs_cf, tx_id)?
665 .map(|blob| SpentTransaction::read(&mut &blob[..]))
666 .transpose()?;
667
668 Ok(tx)
669 }
670
671 fn ledger_txs(
678 &self,
679 tx_ids: Vec<&[u8; 32]>,
680 ) -> Result<Vec<SpentTransaction>> {
681 let cf = self.ledger_txs_cf;
682
683 let ids = tx_ids.into_iter().map(|id| (cf, id)).collect::<Vec<_>>();
684
685 let multi_get_results = self.inner.multi_get_cf(ids);
686
687 let mut spent_transactions =
688 Vec::with_capacity(multi_get_results.len());
689 for result in multi_get_results.into_iter() {
690 let opt_blob = result.map_err(std::io::Error::other)?;
691
692 let Some(blob) = opt_blob else {
693 return Err(anyhow::anyhow!(
694 "At least one Transaction ID was not found"
695 ));
696 };
697
698 let stx = SpentTransaction::read(&mut &blob[..])?;
699
700 spent_transactions.push(stx);
701 }
702
703 Ok(spent_transactions)
704 }
705
706 fn ledger_tx_exists(&self, tx_id: &[u8]) -> Result<bool> {
712 Ok(self.inner.get_cf(self.ledger_txs_cf, tx_id)?.is_some())
713 }
714
715 fn block_by_height(&self, height: u64) -> Result<Option<Block>> {
716 let hash = self.block_hash_by_height(height)?;
717 let block = match hash {
718 Some(hash) => self.block(&hash)?,
719 None => None,
720 };
721 Ok(block)
722 }
723
724 fn block_label_by_height(
725 &self,
726 height: u64,
727 ) -> Result<Option<([u8; 32], Label)>> {
728 const HASH_LEN: usize = 32;
729 Ok(self
730 .inner
731 .get_cf(self.ledger_height_cf, height.to_le_bytes())?
732 .map(|h| {
733 let mut hash = [0u8; HASH_LEN];
734 hash.copy_from_slice(&h.as_slice()[0..HASH_LEN]);
735
736 let label_buff = h[HASH_LEN..].to_vec();
737 Label::read(&mut &label_buff[..]).map(|label| (hash, label))
738 })
739 .transpose()?)
740 }
741}
742
743impl<DB: DBAccess> ConsensusStorage for DBTransaction<'_, DB> {
745 fn store_candidate(&mut self, b: Block) -> Result<()> {
756 let mut serialized = vec![];
757 b.write(&mut serialized)?;
758
759 self.inner
760 .put_cf(self.candidates_cf, b.header().hash, serialized)?;
761
762 let key = serialize_key(b.header().height, b.header().hash)?;
763 self.inner
764 .put_cf(self.candidates_height_cf, key, b.header().hash)?;
765
766 Ok(())
767 }
768
769 fn candidate(&self, hash: &[u8]) -> Result<Option<Block>> {
780 if let Some(blob) = self.inner.get_cf(self.candidates_cf, hash)? {
781 let b = Block::read(&mut &blob[..])?;
782 return Ok(Some(b));
783 }
784
785 Ok(None)
787 }
788
789 fn candidate_by_iteration(
790 &self,
791 consensus_header: &ConsensusHeader,
792 ) -> Result<Option<Block>> {
793 let iter = self
794 .inner
795 .iterator_cf(self.candidates_cf, IteratorMode::Start);
796
797 for (_, blob) in iter.map(Result::unwrap) {
798 let b = Block::read(&mut &blob[..])?;
799
800 let header = b.header();
801 if header.prev_block_hash == consensus_header.prev_block_hash
802 && header.iteration == consensus_header.iteration
803 {
804 return Ok(Some(b));
805 }
806 }
807
808 Ok(None)
809 }
810
811 fn delete_candidate<F>(&mut self, closure: F) -> Result<()>
822 where
823 F: FnOnce(u64) -> bool + std::marker::Copy,
824 {
825 let iter = self
826 .inner
827 .iterator_cf(self.candidates_height_cf, IteratorMode::Start);
828
829 for (key, hash) in iter.map(Result::unwrap) {
830 let (height, _) = deserialize_key(&mut &key.to_vec()[..])?;
831 if closure(height) {
832 self.inner.delete_cf(self.candidates_cf, hash)?;
833 self.inner.delete_cf(self.candidates_height_cf, key)?;
834 }
835 }
836
837 Ok(())
838 }
839
840 fn count_candidates(&self) -> usize {
841 let iter = self
842 .inner
843 .iterator_cf(self.candidates_height_cf, IteratorMode::Start);
844
845 iter.count()
846 }
847
848 fn clear_candidates(&mut self) -> Result<()> {
855 self.delete_candidate(|_| true)
856 }
857
858 fn store_validation_result(
869 &mut self,
870 consensus_header: &ConsensusHeader,
871 validation_result: &payload::ValidationResult,
872 ) -> Result<()> {
873 let mut serialized = vec![];
874 validation_result.write(&mut serialized)?;
875
876 let key = serialize_iter_key(consensus_header)?;
877 self.inner
878 .put_cf(self.validation_results_cf, key, serialized)?;
879
880 Ok(())
881 }
882
883 fn validation_result(
895 &self,
896 consensus_header: &ConsensusHeader,
897 ) -> Result<Option<payload::ValidationResult>> {
898 let key = serialize_iter_key(consensus_header)?;
899 if let Some(blob) =
900 self.inner.get_cf(self.validation_results_cf, key)?
901 {
902 let validation_result =
903 payload::ValidationResult::read(&mut &blob[..])?;
904 return Ok(Some(validation_result));
905 }
906
907 Ok(None)
909 }
910
911 fn delete_validation_results<F>(&mut self, closure: F) -> Result<()>
923 where
924 F: FnOnce([u8; 32]) -> bool + std::marker::Copy,
925 {
926 let iter = self
927 .inner
928 .iterator_cf(self.validation_results_cf, IteratorMode::Start);
929
930 for (key, _) in iter.map(Result::unwrap) {
931 let (prev_block_hash, _) =
932 deserialize_iter_key(&mut &key.to_vec()[..])?;
933 if closure(prev_block_hash) {
934 self.inner.delete_cf(self.validation_results_cf, key)?;
935 }
936 }
937
938 Ok(())
939 }
940
941 fn count_validation_results(&self) -> usize {
942 let iter = self
943 .inner
944 .iterator_cf(self.validation_results_cf, IteratorMode::Start);
945
946 iter.count()
947 }
948
949 fn clear_validation_results(&mut self) -> Result<()> {
956 self.delete_validation_results(|_| true)
957 }
958}
959
960impl<DB: DBAccess> Persist for DBTransaction<'_, DB> {
961 fn clear_database(&mut self) -> Result<()> {
963 let iter = self.inner.iterator_cf(self.ledger_cf, IteratorMode::Start);
965
966 for (key, _) in iter.map(Result::unwrap) {
968 self.inner.delete_cf(self.ledger_cf, key)?;
969 }
970
971 self.clear_candidates()?;
972 self.clear_validation_results()?;
973 Ok(())
974 }
975
976 fn commit(self) -> Result<()> {
977 if let Err(e) = self.inner.commit() {
978 return Err(anyhow::Error::new(e).context("failed to commit"));
979 }
980
981 Ok(())
982 }
983
984 fn rollback(self) -> Result<()> {
985 if let Err(e) = self.inner.rollback() {
986 return Err(anyhow::Error::new(e).context("failed to rollback"));
987 }
988
989 Ok(())
990 }
991}
992
993impl<DB: DBAccess> Mempool for DBTransaction<'_, DB> {
994 fn store_mempool_tx(
995 &mut self,
996 tx: &Transaction,
997 timestamp: u64,
998 ) -> Result<()> {
999 let mut tx_data = vec![];
1001 tx.write(&mut tx_data)?;
1002
1003 let hash = tx.id();
1004 self.put_cf(self.mempool_cf, hash, tx_data)?;
1005
1006 for n in tx.to_spend_ids() {
1009 let key = n.to_bytes();
1010 self.put_cf(self.spending_id_cf, key, hash)?;
1011 }
1012
1013 let timestamp = timestamp.to_be_bytes();
1014
1015 self.put_cf(
1019 self.fees_cf,
1020 serialize_key(tx.gas_price(), hash)?,
1021 timestamp,
1022 )?;
1023
1024 Ok(())
1025 }
1026
1027 fn mempool_tx(&self, hash: [u8; 32]) -> Result<Option<Transaction>> {
1028 let data = self.inner.get_cf(self.mempool_cf, hash)?;
1029
1030 match data {
1031 None => Ok(None),
1033 Some(blob) => Ok(Some(Transaction::read(&mut &blob.to_vec()[..])?)),
1034 }
1035 }
1036
1037 fn mempool_tx_exists(&self, h: [u8; 32]) -> Result<bool> {
1038 Ok(self.inner.get_cf(self.mempool_cf, h)?.is_some())
1039 }
1040
1041 fn delete_mempool_tx(
1042 &mut self,
1043 h: [u8; 32],
1044 cascade: bool,
1045 ) -> Result<Vec<[u8; 32]>> {
1046 let mut deleted = vec![];
1047 let tx = self.mempool_tx(h)?;
1048 if let Some(tx) = tx {
1049 let hash = tx.id();
1050
1051 self.inner.delete_cf(self.mempool_cf, hash)?;
1052
1053 for n in tx.to_spend_ids() {
1056 let key = n.to_bytes();
1057 self.inner.delete_cf(self.spending_id_cf, key)?;
1058 }
1059
1060 self.inner.delete_cf(
1062 self.fees_cf,
1063 serialize_key(tx.gas_price(), hash)?,
1064 )?;
1065
1066 deleted.push(h);
1067
1068 if cascade {
1069 let mut dependants = vec![];
1070 let mut next_spending_id = tx.next_spending_id();
1073 while let Some(spending_id) = next_spending_id {
1074 next_spending_id = spending_id.next();
1075 let next_txs =
1076 self.mempool_txs_by_spendable_ids(&[spending_id]);
1077 if next_txs.is_empty() {
1078 break;
1079 }
1080 dependants.extend(next_txs);
1081 }
1082
1083 for tx_id in dependants {
1085 let cascade_deleted =
1086 self.delete_mempool_tx(tx_id, false)?;
1087 deleted.extend(cascade_deleted);
1088 }
1089 }
1090 }
1091
1092 Ok(deleted)
1093 }
1094
1095 fn mempool_txs_by_spendable_ids(
1096 &self,
1097 n: &[SpendingId],
1098 ) -> HashSet<[u8; 32]> {
1099 n.iter()
1100 .filter_map(|n| {
1101 match self.inner.get_cf(self.spending_id_cf, n.to_bytes()) {
1102 Ok(Some(tx_id)) => tx_id.try_into().ok(),
1103 _ => None,
1104 }
1105 })
1106 .collect()
1107 }
1108
1109 fn mempool_txs_sorted_by_fee(
1110 &self,
1111 ) -> Box<dyn Iterator<Item = Transaction> + '_> {
1112 let iter = MemPoolIterator::new(&self.inner, self.fees_cf, self);
1113
1114 Box::new(iter)
1115 }
1116
1117 fn mempool_txs_ids_sorted_by_fee(
1118 &self,
1119 ) -> Box<dyn Iterator<Item = (u64, [u8; 32])> + '_> {
1120 let iter = MemPoolFeeIterator::new(&self.inner, self.fees_cf, true);
1121
1122 Box::new(iter)
1123 }
1124
1125 fn mempool_txs_ids_sorted_by_low_fee(
1126 &self,
1127 ) -> Box<dyn Iterator<Item = (u64, [u8; 32])> + '_> {
1128 let iter = MemPoolFeeIterator::new(&self.inner, self.fees_cf, false);
1129
1130 Box::new(iter)
1131 }
1132
1133 fn mempool_expired_txs(&self, timestamp: u64) -> Result<Vec<[u8; 32]>> {
1135 let mut iter = self.inner.raw_iterator_cf(self.fees_cf);
1136 iter.seek_to_first();
1137 let mut txs_list = vec![];
1138
1139 while iter.valid() {
1140 if let Some(key) = iter.key() {
1141 let (_, tx_id) = deserialize_key(&mut &key.to_vec()[..])?;
1142
1143 let tx_timestamp = u64::from_be_bytes(
1144 iter.value()
1145 .ok_or_else(|| {
1146 io::Error::new(
1147 io::ErrorKind::InvalidData,
1148 "no value",
1149 )
1150 })?
1151 .try_into()
1152 .map_err(|_| {
1153 io::Error::new(
1154 io::ErrorKind::InvalidData,
1155 "invalid data",
1156 )
1157 })?,
1158 );
1159
1160 if tx_timestamp <= timestamp {
1161 txs_list.push(tx_id);
1162 }
1163 }
1164
1165 iter.next();
1166 }
1167
1168 Ok(txs_list)
1169 }
1170
1171 fn mempool_txs_ids(&self) -> Result<Vec<[u8; 32]>> {
1172 let mut iter = self.inner.raw_iterator_cf(self.fees_cf);
1173 iter.seek_to_last();
1174
1175 let mut txs_list = vec![];
1176
1177 while iter.valid() {
1179 if let Some(key) = iter.key() {
1180 let (_, tx_id) = deserialize_key(&mut &key.to_vec()[..])?;
1181
1182 txs_list.push(tx_id);
1183 }
1184
1185 iter.prev();
1186 }
1187
1188 Ok(txs_list)
1189 }
1190
1191 fn mempool_txs_count(&self) -> usize {
1192 self.inner
1193 .iterator_cf(self.mempool_cf, IteratorMode::Start)
1194 .count()
1195 }
1196}
1197
1198pub struct MemPoolIterator<'db, DB: DBAccess, M: Mempool> {
1199 iter: MemPoolFeeIterator<'db, DB>,
1200 mempool: &'db M,
1201}
1202
1203impl<'db, DB: DBAccess, M: Mempool> MemPoolIterator<'db, DB, M> {
1204 fn new(
1205 db: &'db rocksdb::Transaction<DB>,
1206 fees_cf: &ColumnFamily,
1207 mempool: &'db M,
1208 ) -> Self {
1209 let iter = MemPoolFeeIterator::new(db, fees_cf, true);
1210 MemPoolIterator { iter, mempool }
1211 }
1212}
1213
1214impl<DB: DBAccess, M: Mempool> Iterator for MemPoolIterator<'_, DB, M> {
1215 type Item = Transaction;
1216 fn next(&mut self) -> Option<Self::Item> {
1217 self.iter.next().and_then(|(_, tx_id)| {
1218 self.mempool.mempool_tx(tx_id).ok().flatten()
1219 })
1220 }
1221}
1222
1223pub struct MemPoolFeeIterator<'db, DB: DBAccess> {
1224 iter: DBRawIteratorWithThreadMode<'db, rocksdb::Transaction<'db, DB>>,
1225 fee_desc: bool,
1226}
1227
1228impl<'db, DB: DBAccess> MemPoolFeeIterator<'db, DB> {
1229 fn new(
1230 db: &'db rocksdb::Transaction<DB>,
1231 fees_cf: &ColumnFamily,
1232 fee_desc: bool,
1233 ) -> Self {
1234 let mut iter = db.raw_iterator_cf(fees_cf);
1235 if fee_desc {
1236 iter.seek_to_last();
1237 };
1238 MemPoolFeeIterator { iter, fee_desc }
1239 }
1240}
1241
1242impl<DB: DBAccess> Iterator for MemPoolFeeIterator<'_, DB> {
1243 type Item = (u64, [u8; 32]);
1244 fn next(&mut self) -> Option<Self::Item> {
1245 match self.iter.valid() {
1246 true => {
1247 if let Some(key) = self.iter.key() {
1248 let (gas_price, hash) =
1249 deserialize_key(&mut &key.to_vec()[..]).ok()?;
1250 if self.fee_desc {
1251 self.iter.prev();
1252 } else {
1253 self.iter.next();
1254 }
1255 Some((gas_price, hash))
1256 } else {
1257 None
1258 }
1259 }
1260 false => None,
1261 }
1262 }
1263}
1264
1265impl<DB: DBAccess> std::fmt::Debug for DBTransaction<'_, DB> {
1266 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1267 let iter = self.inner.iterator_cf(self.ledger_cf, IteratorMode::Start);
1269
1270 iter.map(Result::unwrap).try_for_each(|(hash, _)| {
1271 if let Ok(Some(blob)) = self.inner.get_cf(self.ledger_cf, &hash[..])
1272 {
1273 let b = Block::read(&mut &blob[..]).unwrap_or_default();
1274 writeln!(f, "ledger_block [{}]: {:#?}", b.header().height, b)
1275 } else {
1276 Ok(())
1277 }
1278 })?;
1279
1280 let iter = self
1282 .inner
1283 .iterator_cf(self.candidates_cf, IteratorMode::Start);
1284
1285 let results: std::fmt::Result =
1286 iter.map(Result::unwrap).try_for_each(|(hash, _)| {
1287 if let Ok(Some(blob)) =
1288 self.inner.get_cf(self.candidates_cf, &hash[..])
1289 {
1290 let b = Block::read(&mut &blob[..]).unwrap_or_default();
1291 writeln!(
1292 f,
1293 "candidate_block [{}]: {:#?}",
1294 b.header().height,
1295 b
1296 )
1297 } else {
1298 Ok(())
1299 }
1300 });
1301
1302 results
1303 }
1304}
1305
1306impl<DB: DBAccess> Metadata for DBTransaction<'_, DB> {
1307 fn op_write<T: AsRef<[u8]>>(&mut self, key: &[u8], value: T) -> Result<()> {
1308 self.put_cf(self.metadata_cf, key, value)?;
1309 Ok(())
1310 }
1311
1312 fn op_read(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1313 self.inner.get_cf(self.metadata_cf, key).map_err(Into::into)
1314 }
1315}
1316
1317impl<DB: DBAccess> DBTransaction<'_, DB> {
1318 fn put_cf<K: AsRef<[u8]>, V: AsRef<[u8]>>(
1321 &self,
1322 cf: &impl AsColumnFamilyRef,
1323 key: K,
1324 value: V,
1325 ) -> Result<()> {
1326 let kv_size = key.as_ref().len() + value.as_ref().len();
1327 self.inner.put_cf(cf, key, value)?;
1328 *self.cumulative_inner_size.borrow_mut() += kv_size;
1329 Ok(())
1330 }
1331
1332 pub fn get_size(&self) -> usize {
1333 *self.cumulative_inner_size.borrow()
1334 }
1335}
1336
1337fn serialize_key(value: u64, hash: [u8; 32]) -> std::io::Result<Vec<u8>> {
1338 let mut w = vec![];
1339 std::io::Write::write_all(&mut w, &value.to_be_bytes())?;
1340 std::io::Write::write_all(&mut w, &hash)?;
1341 Ok(w)
1342}
1343
1344fn deserialize_key<R: Read>(r: &mut R) -> Result<(u64, [u8; 32])> {
1345 let mut buf = [0u8; 8];
1346 r.read_exact(&mut buf)?;
1347 let value = u64::from_be_bytes(buf);
1348 let mut hash = [0u8; 32];
1349 r.read_exact(&mut hash[..])?;
1350
1351 Ok((value, hash))
1352}
1353
1354fn serialize_iter_key(ch: &ConsensusHeader) -> std::io::Result<Vec<u8>> {
1355 let mut w = vec![];
1356 std::io::Write::write_all(&mut w, &ch.prev_block_hash)?;
1357 std::io::Write::write_all(&mut w, &[ch.iteration])?;
1358 Ok(w)
1359}
1360
1361fn deserialize_iter_key<R: Read>(r: &mut R) -> Result<([u8; 32], u8)> {
1362 let mut prev_block_hash = [0u8; 32];
1363 r.read_exact(&mut prev_block_hash)?;
1364
1365 let mut iter_byte = [0u8; 1];
1366 r.read_exact(&mut iter_byte)?;
1367 let iteration = u8::from_be_bytes(iter_byte);
1368
1369 Ok((prev_block_hash, iteration))
1370}
1371
1372impl node_data::Serializable for LightBlock {
1373 fn write<W: Write>(&self, w: &mut W) -> io::Result<()> {
1374 self.header.write(w)?;
1376
1377 let len = self.transactions_ids.len() as u32;
1379 w.write_all(&len.to_le_bytes())?;
1380
1381 for tx_id in &self.transactions_ids {
1383 w.write_all(tx_id)?;
1384 }
1385
1386 let len = self.faults_ids.len() as u32;
1388 w.write_all(&len.to_le_bytes())?;
1389
1390 for f_id in &self.faults_ids {
1392 w.write_all(f_id)?;
1393 }
1394
1395 Ok(())
1396 }
1397
1398 fn read<R: Read>(r: &mut R) -> io::Result<Self>
1399 where
1400 Self: Sized,
1401 {
1402 let header = Header::read(r)?;
1404
1405 let len = Self::read_u32_le(r)?;
1407
1408 let mut transactions_ids = vec![];
1410 for _ in 0..len {
1411 let mut tx_id = [0u8; 32];
1412 r.read_exact(&mut tx_id[..])?;
1413
1414 transactions_ids.push(tx_id);
1415 }
1416
1417 let len = Self::read_u32_le(r)?;
1419
1420 let mut faults_ids = vec![];
1422 for _ in 0..len {
1423 let mut f_id = [0u8; 32];
1424 r.read_exact(&mut f_id[..])?;
1425
1426 faults_ids.push(f_id);
1427 }
1428
1429 Ok(Self {
1430 header,
1431 transactions_ids,
1432 faults_ids,
1433 })
1434 }
1435}
1436
1437#[cfg(test)]
1438mod tests {
1439 use fake::{Fake, Faker};
1440 use node_data::ledger;
1441
1442 use super::*;
1443
1444 #[test]
1445 fn test_store_block() {
1446 TestWrapper::new("test_store_block").run(|path| {
1447 let db = Backend::create_or_open(path, DatabaseOptions::default());
1448
1449 let b: Block = Faker.fake();
1450 assert!(!b.txs().is_empty());
1451
1452 let hash = b.header().hash;
1453
1454 assert!(
1455 db.update(|txn| {
1456 txn.store_block(
1457 b.header(),
1458 &to_spent_txs(b.txs()),
1459 b.faults(),
1460 Label::Final(3),
1461 )?;
1462 Ok(())
1463 })
1464 .is_ok()
1465 );
1466
1467 db.view(|txn| {
1468 let db_blk = txn
1470 .block(&hash)
1471 .expect("Block to be fetched")
1472 .expect("Block to exist");
1473 assert_eq!(db_blk.header().hash, b.header().hash);
1474
1475 for pos in 0..b.txs().len() {
1478 assert_eq!(db_blk.txs()[pos].id(), b.txs()[pos].id());
1479 }
1480
1481 for pos in 0..b.faults().len() {
1484 assert_eq!(db_blk.faults()[pos].id(), b.faults()[pos].id());
1485 }
1486 });
1487
1488 assert!(
1489 db.update(|txn| {
1490 txn.clear_database()?;
1491 Ok(())
1492 })
1493 .is_ok()
1494 );
1495
1496 db.view(|txn| {
1497 assert!(
1498 txn.block(&hash).expect("block to be fetched").is_none()
1499 );
1500 });
1501 });
1502 }
1503
1504 #[test]
1505 fn test_read_only() {
1506 TestWrapper::new("test_read_only").run(|path| {
1507 let db = Backend::create_or_open(path, DatabaseOptions::default());
1508 let b: Block = Faker.fake();
1509 db.update_dry_run(true, |txn| {
1510 txn.store_block(
1511 b.header(),
1512 &to_spent_txs(b.txs()),
1513 b.faults(),
1514 Label::Final(3),
1515 )
1516 })
1517 .expect("block to be stored");
1518 db.view(|txn| {
1519 assert!(
1520 txn.block(&b.header().hash)
1521 .expect("block to be fetched")
1522 .is_none()
1523 );
1524 });
1525 });
1526 }
1527
1528 #[test]
1529 fn test_transaction_isolation() {
1530 TestWrapper::new("test_transaction_isolation").run(|path| {
1531 let db = Backend::create_or_open(path, DatabaseOptions::default());
1532 let mut b: Block = Faker.fake();
1533 let hash = b.header().hash;
1534
1535 db.view(|txn| {
1536 assert!(
1539 db.update(|inner| {
1540 inner
1541 .store_block(
1542 b.header(),
1543 &to_spent_txs(b.txs()),
1544 b.faults(),
1545 Label::Final(3),
1546 )
1547 .unwrap();
1548
1549 assert!(inner.block(&hash)?.is_some());
1551 assert!(txn.block(&hash)?.is_none());
1554 Ok(())
1555 })
1556 .is_ok()
1557 );
1558
1559 assert!(
1562 txn.block(&hash).expect("block to be fetched").is_some()
1563 );
1564 });
1565
1566 db.view(|txn| {
1568 assert_blocks_eq(
1569 &mut txn
1570 .block(&hash)
1571 .expect("block to be fetched")
1572 .unwrap(),
1573 &mut b,
1574 );
1575 });
1576 });
1577 }
1578
1579 fn assert_blocks_eq(a: &Block, b: &Block) {
1580 assert!(a.header().hash != [0u8; 32]);
1581 assert!(a.header().hash.eq(&b.header().hash));
1582 }
1583
1584 #[test]
1585 fn test_add_mempool_tx() {
1586 TestWrapper::new("test_add_tx").run(|path| {
1587 let db = Backend::create_or_open(path, DatabaseOptions::default());
1588 let t: Transaction = Faker.fake();
1589
1590 assert!(db.update(|txn| { txn.store_mempool_tx(&t, 0) }).is_ok());
1591
1592 db.view(|vq| {
1593 assert!(vq.mempool_tx_exists(t.id()).unwrap());
1594
1595 let fetched_tx = vq
1596 .mempool_tx(t.id())
1597 .expect("valid contract call")
1598 .unwrap();
1599
1600 assert_eq!(
1601 fetched_tx.id(),
1602 t.id(),
1603 "fetched transaction should be the same"
1604 );
1605 });
1606
1607 db.update(|txn| {
1609 let deleted =
1610 txn.delete_mempool_tx(t.id(), false).expect("valid tx");
1611 assert!(deleted.len() == 1);
1612 Ok(())
1613 })
1614 .unwrap();
1615 });
1616 }
1617
1618 #[test]
1619 fn test_mempool_txs_sorted_by_fee() {
1620 TestWrapper::new("test_mempool_txs_sorted_by_fee").run(|path| {
1621 let db = Backend::create_or_open(path, DatabaseOptions::default());
1622 let _rng = rand::thread_rng();
1624 db.update(|txn| {
1625 for _i in 0..10u32 {
1626 let t: Transaction = Faker.fake();
1627 txn.store_mempool_tx(&t, 0)?;
1628 }
1629 Ok(())
1630 })
1631 .unwrap();
1632
1633 db.view(|txn| {
1634 let txs = txn.mempool_txs_sorted_by_fee();
1635
1636 let mut last_fee = u64::MAX;
1637 for t in txs {
1638 let fee = t.gas_price();
1639 assert!(
1640 fee <= last_fee,
1641 "tx fees are not in decreasing order"
1642 );
1643 last_fee = fee
1644 }
1645 assert_ne!(last_fee, u64::MAX, "No tx has been processed")
1646 });
1647 });
1648 }
1649
1650 #[test]
1651 fn test_txs_count() {
1652 TestWrapper::new("test_txs_count").run(|path| {
1653 let db = Backend::create_or_open(path, DatabaseOptions::default());
1654
1655 const N: usize = 100;
1656 const D: usize = 50;
1657
1658 let txs: Vec<_> = (0..N)
1659 .map(|i| ledger::faker::gen_dummy_tx(i as u64))
1660 .collect();
1661
1662 db.update(|db| {
1663 assert_eq!(db.mempool_txs_count(), 0);
1664 txs.iter().for_each(|t| {
1665 db.store_mempool_tx(&t, 0).expect("tx should be added")
1666 });
1667 Ok(())
1668 })
1669 .unwrap();
1670
1671 db.update(|db| {
1672 assert_eq!(db.mempool_txs_count(), N);
1674
1675 txs.iter().take(D).for_each(|tx| {
1676 let deleted = db
1677 .delete_mempool_tx(tx.id(), false)
1678 .expect("transaction should be deleted");
1679 assert!(deleted.len() == 1);
1680 });
1681
1682 Ok(())
1683 })
1684 .unwrap();
1685
1686 db.update(|db| {
1688 assert_eq!(db.mempool_txs_count(), N - D);
1689 Ok(())
1690 })
1691 .unwrap();
1692 });
1693 }
1694
1695 #[test]
1696 fn test_max_gas_limit() {
1697 TestWrapper::new("test_block_size_limit").run(|path| {
1698 let db = Backend::create_or_open(path, DatabaseOptions::default());
1699
1700 db.update(|txn| {
1701 for i in 0..10u32 {
1702 let t = ledger::faker::gen_dummy_tx(i as u64);
1703 txn.store_mempool_tx(&t, 0)?;
1704 }
1705 Ok(())
1706 })
1707 .unwrap();
1708
1709 let total_gas_price: u64 = 9 + 8 + 7 + 6 + 5 + 4 + 3 + 2 + 1;
1710 db.view(|txn| {
1711 let txs = txn
1712 .mempool_txs_sorted_by_fee()
1713 .map(|t| t.gas_price())
1714 .sum::<u64>();
1715
1716 assert_eq!(txs, total_gas_price);
1717 });
1718 });
1719 }
1720
1721 #[test]
1722 fn test_get_expired_txs() {
1723 TestWrapper::new("test_get_expired_txs").run(|path| {
1724 let db = Backend::create_or_open(path, DatabaseOptions::default());
1725
1726 let mut expiry_list = HashSet::new();
1727 let _ = db.update(|txn| {
1728 (1..101).for_each(|i| {
1729 let t = ledger::faker::gen_dummy_tx(i as u64);
1730 txn.store_mempool_tx(&t, i).expect("tx should be added");
1731 expiry_list.insert(t.id());
1732 });
1733
1734 (1000..1100).for_each(|i| {
1735 let t = ledger::faker::gen_dummy_tx(i as u64);
1736 txn.store_mempool_tx(&t, i).expect("tx should be added");
1737 });
1738
1739 Ok(())
1740 });
1741
1742 db.view(|vq| {
1743 let expired: HashSet<_> = vq
1744 .mempool_expired_txs(100)
1745 .unwrap()
1746 .into_iter()
1747 .map(|id| id)
1748 .collect();
1749
1750 assert_eq!(expiry_list, expired);
1751 });
1752 });
1753 }
1754
1755 fn to_spent_txs(txs: &Vec<Transaction>) -> Vec<SpentTransaction> {
1756 txs.iter()
1757 .map(|t| SpentTransaction {
1758 inner: t.clone(),
1759 block_height: 0,
1760 gas_spent: 0,
1761 err: None,
1762 })
1763 .collect()
1764 }
1765
1766 #[test]
1767 fn test_get_ledger_tx_by_hash() {
1768 TestWrapper::new("test_get_ledger_tx_by_hash").run(|path| {
1769 let db = Backend::create_or_open(path, DatabaseOptions::default());
1770 let b: Block = Faker.fake();
1771 assert!(!b.txs().is_empty());
1772
1773 assert!(
1775 db.update(|txn| {
1776 txn.store_block(
1777 b.header(),
1778 &to_spent_txs(b.txs()),
1779 b.faults(),
1780 Label::Final(3),
1781 )?;
1782 Ok(())
1783 })
1784 .is_ok()
1785 );
1786
1787 db.view(|v| {
1790 for t in b.txs().iter() {
1791 assert!(
1792 v.ledger_tx(&t.id())
1793 .expect("should not return error")
1794 .expect("should find a transaction")
1795 .inner
1796 .eq(t)
1797 );
1798 }
1799 });
1800 });
1801 }
1802
1803 #[test]
1804 fn test_fetch_block_hash_by_height() {
1805 TestWrapper::new("test_fetch_block_hash_by_height").run(|path| {
1806 let db = Backend::create_or_open(path, DatabaseOptions::default());
1807 let b: Block = Faker.fake();
1808
1809 assert!(
1811 db.update(|txn| {
1812 txn.store_block(
1813 b.header(),
1814 &to_spent_txs(b.txs()),
1815 b.faults(),
1816 Label::Attested(3),
1817 )?;
1818 Ok(())
1819 })
1820 .is_ok()
1821 );
1822
1823 db.view(|v| {
1825 assert!(
1826 v.block_hash_by_height(b.header().height)
1827 .expect("should not return error")
1828 .expect("should find a block")
1829 .eq(&b.header().hash)
1830 );
1831 });
1832 });
1833 }
1834
1835 #[test]
1836 fn test_fetch_block_label_by_height() {
1837 TestWrapper::new("test_fetch_block_hash_by_height").run(|path| {
1838 let db = Backend::create_or_open(path, DatabaseOptions::default());
1839 let b: Block = Faker.fake();
1840
1841 assert!(
1843 db.update(|txn| {
1844 txn.store_block(
1845 b.header(),
1846 &to_spent_txs(b.txs()),
1847 b.faults(),
1848 Label::Attested(3),
1849 )?;
1850 Ok(())
1851 })
1852 .is_ok()
1853 );
1854
1855 db.view(|v| {
1857 assert!(
1858 v.block_label_by_height(b.header().height)
1859 .expect("should not return error")
1860 .expect("should find a block")
1861 .1
1862 .eq(&Label::Attested(3))
1863 );
1864 });
1865 });
1866 }
1867
1868 #[test]
1869 fn test_delete_block() {
1871 let t = TestWrapper::new("test_fetch_block_hash_by_height");
1872 t.run(|path| {
1873 let db = Backend::create_or_open(path, DatabaseOptions::default());
1874 let b: ledger::Block = Faker.fake();
1875
1876 assert!(
1877 db.update(|ut| {
1878 ut.store_block(
1879 b.header(),
1880 &to_spent_txs(b.txs()),
1881 b.faults(),
1882 Label::Final(3),
1883 )?;
1884 Ok(())
1885 })
1886 .is_ok()
1887 );
1888
1889 assert!(
1890 db.update(|ut| {
1891 ut.delete_block(&b)?;
1892 Ok(())
1893 })
1894 .is_ok()
1895 );
1896 });
1897
1898 let path = t.get_path();
1899 let opts = Options::default();
1900
1901 let vec = rocksdb::DB::list_cf(&opts, &path).unwrap();
1902 assert!(!vec.is_empty());
1903
1904 let db = rocksdb::DB::open_cf(&opts, &path, vec.clone()).unwrap();
1906 vec.into_iter()
1907 .map(|cf_name| {
1908 if cf_name == CF_METADATA {
1909 return;
1910 }
1911
1912 let cf = db.cf_handle(&cf_name).unwrap();
1913 assert_eq!(
1914 db.iterator_cf(cf, IteratorMode::Start)
1915 .map(Result::unwrap)
1916 .count(),
1917 0
1918 );
1919 })
1920 .for_each(drop);
1921 }
1922
1923 struct TestWrapper(tempfile::TempDir);
1924
1925 impl TestWrapper {
1926 fn new(path: &'static str) -> Self {
1927 Self(
1928 tempfile::TempDir::with_prefix(path)
1929 .expect("Temp directory to be created"),
1930 )
1931 }
1932
1933 pub fn run<F>(&self, test_func: F)
1934 where
1935 F: FnOnce(&Path),
1936 {
1937 test_func(self.0.path());
1938 }
1939
1940 pub fn get_path(&self) -> std::path::PathBuf {
1941 self.0.path().to_owned().join(DB_FOLDER_NAME)
1942 }
1943 }
1944}