1use std::cell::RefCell;
8use std::collections::HashSet;
9use std::io::{Read, Write};
10use std::path::Path;
11use std::sync::Arc;
12use std::{io, vec};
13
14use anyhow::Result;
15use node_data::ledger::{
16 Block, Fault, Header, Label, SpendingId, SpentTransaction, Transaction,
17};
18use node_data::message::{payload, ConsensusHeader};
19use node_data::Serializable;
20use rocksdb::{
21 AsColumnFamilyRef, BlockBasedOptions, ColumnFamily, ColumnFamilyDescriptor,
22 DBAccess, DBRawIteratorWithThreadMode, IteratorMode, LogLevel,
23 OptimisticTransactionDB, OptimisticTransactionOptions, Options,
24 WriteOptions,
25};
26use tracing::info;
27
28use super::{
29 ConsensusStorage, DatabaseOptions, Ledger, LightBlock, Metadata, Persist,
30 DB,
31};
32use crate::database::Mempool;
33
34const CF_LEDGER_HEADER: &str = "cf_ledger_header";
35const CF_LEDGER_TXS: &str = "cf_ledger_txs";
36const CF_LEDGER_FAULTS: &str = "cf_ledger_faults";
37const CF_LEDGER_HEIGHT: &str = "cf_ledger_height";
38const CF_CANDIDATES: &str = "cf_candidates";
39const CF_CANDIDATES_HEIGHT: &str = "cf_candidates_height";
40const CF_VALIDATION_RESULTS: &str = "cf_validation_results";
41const CF_MEMPOOL: &str = "cf_mempool";
42const CF_MEMPOOL_SPENDING_ID: &str = "cf_mempool_spending_id";
43const CF_MEMPOOL_FEES: &str = "cf_mempool_fees";
44const CF_METADATA: &str = "cf_metadata";
45
46const DB_FOLDER_NAME: &str = "chain.db";
47
48pub const MD_HASH_KEY: &[u8] = b"hash_key";
50pub const MD_STATE_ROOT_KEY: &[u8] = b"state_hash_key";
51pub const MD_AVG_VALIDATION: &[u8] = b"avg_validation_time";
52pub const MD_AVG_RATIFICATION: &[u8] = b"avg_ratification_time";
53pub const MD_AVG_PROPOSAL: &[u8] = b"avg_proposal_time";
54pub const MD_LAST_ITER: &[u8] = b"consensus_last_iter";
55
56#[derive(Clone)]
57pub struct Backend {
58 rocksdb: Arc<OptimisticTransactionDB>,
59}
60
61impl Backend {
62 fn begin_tx(&self) -> DBTransaction<'_, OptimisticTransactionDB> {
63 let write_options = WriteOptions::default();
65 let tx_options = OptimisticTransactionOptions::default();
66
67 let inner = self.rocksdb.transaction_opt(&write_options, &tx_options);
68
69 let ledger_cf = self
71 .rocksdb
72 .cf_handle(CF_LEDGER_HEADER)
73 .expect("ledger_header column family must exist");
74
75 let ledger_txs_cf = self
76 .rocksdb
77 .cf_handle(CF_LEDGER_TXS)
78 .expect("CF_LEDGER_TXS column family must exist");
79
80 let ledger_faults_cf = self
81 .rocksdb
82 .cf_handle(CF_LEDGER_FAULTS)
83 .expect("CF_LEDGER_FAULTS column family must exist");
84
85 let candidates_cf = self
86 .rocksdb
87 .cf_handle(CF_CANDIDATES)
88 .expect("candidates column family must exist");
89
90 let candidates_height_cf = self
91 .rocksdb
92 .cf_handle(CF_CANDIDATES_HEIGHT)
93 .expect("candidates_height column family must exist");
94
95 let validation_results_cf = self
96 .rocksdb
97 .cf_handle(CF_VALIDATION_RESULTS)
98 .expect("validation result column family must exist");
99
100 let mempool_cf = self
101 .rocksdb
102 .cf_handle(CF_MEMPOOL)
103 .expect("mempool column family must exist");
104
105 let spending_id_cf = self
106 .rocksdb
107 .cf_handle(CF_MEMPOOL_SPENDING_ID)
108 .expect("CF_MEMPOOL_SPENDING_ID column family must exist");
109
110 let fees_cf = self
111 .rocksdb
112 .cf_handle(CF_MEMPOOL_FEES)
113 .expect("CF_MEMPOOL_FEES column family must exist");
114
115 let ledger_height_cf = self
116 .rocksdb
117 .cf_handle(CF_LEDGER_HEIGHT)
118 .expect("CF_LEDGER_HEIGHT column family must exist");
119
120 let metadata_cf = self
121 .rocksdb
122 .cf_handle(CF_METADATA)
123 .expect("CF_METADATA column family must exist");
124
125 DBTransaction::<'_, OptimisticTransactionDB> {
126 inner,
127 candidates_cf,
128 candidates_height_cf,
129 validation_results_cf,
130 ledger_cf,
131 ledger_txs_cf,
132 ledger_faults_cf,
133 mempool_cf,
134 spending_id_cf,
135 fees_cf,
136 ledger_height_cf,
137 metadata_cf,
138 cumulative_inner_size: RefCell::new(0),
139 }
140 }
141}
142
143impl DB for Backend {
144 type P<'a> = DBTransaction<'a, OptimisticTransactionDB>;
145
146 fn create_or_open<T>(path: T, db_opts: DatabaseOptions) -> Self
147 where
148 T: AsRef<Path>,
149 {
150 let path = path.as_ref().join(DB_FOLDER_NAME);
151 info!("Opening database in {path:?}, {:?} ", db_opts);
152
153 let mut blocks_cf_opts = Options::default();
156 blocks_cf_opts.create_if_missing(true);
157 blocks_cf_opts.create_missing_column_families(true);
158 blocks_cf_opts.set_level_compaction_dynamic_level_bytes(true);
159 blocks_cf_opts
160 .set_write_buffer_size(db_opts.blocks_cf_max_write_buffer_size);
161
162 if db_opts.enable_debug {
163 blocks_cf_opts.set_log_level(LogLevel::Info);
164 blocks_cf_opts.set_dump_malloc_stats(true);
165 blocks_cf_opts.enable_statistics();
166 }
167
168 if db_opts.blocks_cf_disable_block_cache {
169 let mut block_opts = BlockBasedOptions::default();
170 block_opts.disable_cache();
171 blocks_cf_opts.set_block_based_table_factory(&block_opts);
172 }
173
174 let mut mp_opts = blocks_cf_opts.clone();
177 mp_opts.set_manual_wal_flush(true);
179 mp_opts.create_if_missing(true);
180 mp_opts.create_missing_column_families(true);
181 mp_opts.set_write_buffer_size(db_opts.mempool_cf_max_write_buffer_size);
182
183 if db_opts.enable_debug {
184 mp_opts.set_log_level(LogLevel::Info);
185 mp_opts.set_dump_malloc_stats(true);
186 mp_opts.enable_statistics();
187 }
188
189 let cfs = vec![
190 ColumnFamilyDescriptor::new(
191 CF_LEDGER_HEADER,
192 blocks_cf_opts.clone(),
193 ),
194 ColumnFamilyDescriptor::new(CF_LEDGER_TXS, blocks_cf_opts.clone()),
195 ColumnFamilyDescriptor::new(
196 CF_LEDGER_FAULTS,
197 blocks_cf_opts.clone(),
198 ),
199 ColumnFamilyDescriptor::new(
200 CF_LEDGER_HEIGHT,
201 blocks_cf_opts.clone(),
202 ),
203 ColumnFamilyDescriptor::new(CF_CANDIDATES, blocks_cf_opts.clone()),
204 ColumnFamilyDescriptor::new(
205 CF_CANDIDATES_HEIGHT,
206 blocks_cf_opts.clone(),
207 ),
208 ColumnFamilyDescriptor::new(
209 CF_VALIDATION_RESULTS,
210 blocks_cf_opts.clone(),
211 ),
212 ColumnFamilyDescriptor::new(CF_METADATA, blocks_cf_opts.clone()),
213 ColumnFamilyDescriptor::new(CF_MEMPOOL, mp_opts.clone()),
214 ColumnFamilyDescriptor::new(
215 CF_MEMPOOL_SPENDING_ID,
216 mp_opts.clone(),
217 ),
218 ColumnFamilyDescriptor::new(CF_MEMPOOL_FEES, mp_opts.clone()),
219 ];
220
221 Self {
222 rocksdb: Arc::new(
223 OptimisticTransactionDB::open_cf_descriptors(
224 &blocks_cf_opts,
225 path,
226 cfs,
227 )
228 .expect("should be a valid database in {path}"),
229 ),
230 }
231 }
232
233 fn view<F, T>(&self, f: F) -> T
234 where
235 F: for<'a> FnOnce(&Self::P<'a>) -> T,
236 {
237 let tx = self.begin_tx();
239
240 let ret = f(&tx);
242 tx.rollback().expect("rollback to succeed for readonly");
243 ret
244 }
245
246 fn update<F, T>(&self, execute: F) -> Result<T>
247 where
248 F: for<'a> FnOnce(&mut Self::P<'a>) -> Result<T>,
249 {
250 self.update_dry_run(false, execute)
251 }
252
253 fn update_dry_run<F, T>(&self, dry_run: bool, execute: F) -> Result<T>
254 where
255 F: for<'a> FnOnce(&mut Self::P<'a>) -> Result<T>,
256 {
257 let mut tx = self.begin_tx();
259
260 let ret = execute(&mut tx)?;
263
264 if dry_run {
265 tx.rollback()?;
266 } else {
267 tx.commit()?;
269 }
270
271 Ok(ret)
272 }
273
274 fn close(&mut self) {}
275}
276
277pub struct DBTransaction<'db, DB: DBAccess> {
278 inner: rocksdb::Transaction<'db, DB>,
279 cumulative_inner_size: RefCell<usize>,
281
282 candidates_cf: &'db ColumnFamily,
285 candidates_height_cf: &'db ColumnFamily,
286 validation_results_cf: &'db ColumnFamily,
288
289 ledger_cf: &'db ColumnFamily,
291 ledger_faults_cf: &'db ColumnFamily,
292 ledger_txs_cf: &'db ColumnFamily,
293 ledger_height_cf: &'db ColumnFamily,
294
295 mempool_cf: &'db ColumnFamily,
297 spending_id_cf: &'db ColumnFamily,
298 fees_cf: &'db ColumnFamily,
299
300 metadata_cf: &'db ColumnFamily,
301}
302
303impl<'db, DB: DBAccess> Ledger for DBTransaction<'db, DB> {
304 fn store_block(
305 &mut self,
306 header: &Header,
307 txs: &[SpentTransaction],
308 faults: &[Fault],
309 label: Label,
310 ) -> Result<usize> {
311 {
315 let cf = self.ledger_cf;
316
317 let mut buf = vec![];
318 LightBlock {
319 header: header.clone(),
320 transactions_ids: txs.iter().map(|t| t.inner.id()).collect(),
321 faults_ids: faults.iter().map(|f| f.id()).collect(),
322 }
323 .write(&mut buf)?;
324
325 self.put_cf(cf, header.hash, buf)?;
326 }
327
328 self.op_write(MD_HASH_KEY, header.hash)?;
330 self.op_write(MD_STATE_ROOT_KEY, header.state_hash)?;
331
332 {
334 let cf = self.ledger_txs_cf;
335
336 for tx in txs {
338 let mut d = vec![];
339 tx.write(&mut d)?;
340 self.put_cf(cf, tx.inner.id(), d)?;
341 }
342 }
343
344 {
346 let cf = self.ledger_faults_cf;
347
348 for f in faults {
350 let mut d = vec![];
351 f.write(&mut d)?;
352 self.put_cf(cf, f.id(), d)?;
353 }
354 }
355 self.store_block_label(header.height, &header.hash, label)?;
356
357 Ok(self.get_size())
358 }
359
360 fn faults_by_block(&self, start_height: u64) -> Result<Vec<Fault>> {
361 let mut faults = vec![];
362 let mut hash = self
363 .op_read(MD_HASH_KEY)?
364 .ok_or(anyhow::anyhow!("Cannot read tip"))?;
365
366 loop {
367 let block = self.light_block(&hash)?.ok_or(anyhow::anyhow!(
368 "Cannot read block {}",
369 hex::encode(&hash)
370 ))?;
371
372 let block_height = block.header.height;
373
374 if block_height >= start_height {
375 hash = block.header.prev_block_hash.to_vec();
376 faults.extend(self.faults(&block.faults_ids)?);
377 } else {
378 break;
379 }
380
381 if block_height == 0 {
382 break;
383 }
384 }
385 Ok(faults)
386 }
387
388 fn store_block_label(
389 &mut self,
390 height: u64,
391 hash: &[u8; 32],
392 label: Label,
393 ) -> Result<()> {
394 let mut buf = vec![];
396 buf.write_all(hash)?;
397 label.write(&mut buf)?;
398
399 self.put_cf(self.ledger_height_cf, height.to_le_bytes(), buf)?;
400 Ok(())
401 }
402
403 fn delete_block(&mut self, b: &Block) -> Result<()> {
404 self.inner.delete_cf(
405 self.ledger_height_cf,
406 b.header().height.to_le_bytes(),
407 )?;
408
409 for tx in b.txs() {
410 self.inner.delete_cf(self.ledger_txs_cf, tx.id())?;
411 }
412 for f in b.faults() {
413 self.inner.delete_cf(self.ledger_faults_cf, f.id())?;
414 }
415
416 self.inner.delete_cf(self.ledger_cf, b.header().hash)?;
417
418 Ok(())
419 }
420
421 fn block_exists(&self, hash: &[u8]) -> Result<bool> {
422 Ok(self.inner.get_cf(self.ledger_cf, hash)?.is_some())
423 }
424
425 fn faults(&self, faults_ids: &[[u8; 32]]) -> Result<Vec<Fault>> {
426 if faults_ids.is_empty() {
427 return Ok(vec![]);
428 }
429 let ids = faults_ids
430 .iter()
431 .map(|id| (self.ledger_faults_cf, id))
432 .collect::<Vec<_>>();
433
434 let faults_buffer = self.inner.multi_get_cf(ids);
436
437 let mut faults = vec![];
438 for buf in faults_buffer {
439 let buf = buf?.unwrap();
440 let fault = Fault::read(&mut &buf[..])?;
441 faults.push(fault);
442 }
443
444 Ok(faults)
445 }
446
447 fn block(&self, hash: &[u8]) -> Result<Option<Block>> {
448 match self.inner.get_cf(self.ledger_cf, hash)? {
449 Some(blob) => {
450 let record = LightBlock::read(&mut &blob[..])?;
451
452 let txs_buffers = self.inner.multi_get_cf(
454 record
455 .transactions_ids
456 .iter()
457 .map(|id| (self.ledger_txs_cf, id))
458 .collect::<Vec<(&ColumnFamily, &[u8; 32])>>(),
459 );
460
461 let mut txs = vec![];
462 for buf in txs_buffers {
463 let buf = buf?.unwrap();
464 let tx = SpentTransaction::read(&mut &buf[..])?;
465 txs.push(tx.inner);
466 }
467
468 let faults_buffer = self.inner.multi_get_cf(
470 record
471 .faults_ids
472 .iter()
473 .map(|id| (self.ledger_faults_cf, id))
474 .collect::<Vec<(&ColumnFamily, &[u8; 32])>>(),
475 );
476 let mut faults = vec![];
477 for buf in faults_buffer {
478 let buf = buf?.unwrap();
479 let fault = Fault::read(&mut &buf[..])?;
480 faults.push(fault);
481 }
482
483 Ok(Some(
484 Block::new(record.header, txs, faults)
485 .expect("block should be valid"),
486 ))
487 }
488 None => Ok(None),
489 }
490 }
491
492 fn light_block(&self, hash: &[u8]) -> Result<Option<LightBlock>> {
493 match self.inner.get_cf(self.ledger_cf, hash)? {
494 Some(blob) => {
495 let record = LightBlock::read(&mut &blob[..])?;
496 Ok(Some(record))
497 }
498 None => Ok(None),
499 }
500 }
501
502 fn block_header(&self, hash: &[u8]) -> Result<Option<Header>> {
503 match self.inner.get_cf(self.ledger_cf, hash)? {
504 Some(blob) => {
505 let record = Header::read(&mut &blob[..])?;
506 Ok(Some(record))
507 }
508 None => Ok(None),
509 }
510 }
511
512 fn block_hash_by_height(&self, height: u64) -> Result<Option<[u8; 32]>> {
513 Ok(self
514 .inner
515 .get_cf(self.ledger_height_cf, height.to_le_bytes())?
516 .map(|h| {
517 const LEN: usize = 32;
518 let mut hash = [0u8; LEN];
519 hash.copy_from_slice(&h.as_slice()[0..LEN]);
520 hash
521 }))
522 }
523
524 fn ledger_tx(&self, tx_id: &[u8]) -> Result<Option<SpentTransaction>> {
525 let tx = self
526 .inner
527 .get_cf(self.ledger_txs_cf, tx_id)?
528 .map(|blob| SpentTransaction::read(&mut &blob[..]))
529 .transpose()?;
530
531 Ok(tx)
532 }
533
534 fn ledger_txs(
541 &self,
542 tx_ids: Vec<&[u8; 32]>,
543 ) -> Result<Vec<SpentTransaction>> {
544 let cf = self.ledger_txs_cf;
545
546 let ids = tx_ids.into_iter().map(|id| (cf, id)).collect::<Vec<_>>();
547
548 let multi_get_results = self.inner.multi_get_cf(ids);
549
550 let mut spent_transactions =
551 Vec::with_capacity(multi_get_results.len());
552 for result in multi_get_results.into_iter() {
553 let opt_blob = result.map_err(|e| {
554 std::io::Error::new(std::io::ErrorKind::Other, e)
555 })?;
556
557 let Some(blob) = opt_blob else {
558 return Err(anyhow::anyhow!(
559 "At least one Transaction ID was not found"
560 ));
561 };
562
563 let stx = SpentTransaction::read(&mut &blob[..])?;
564
565 spent_transactions.push(stx);
566 }
567
568 Ok(spent_transactions)
569 }
570
571 fn ledger_tx_exists(&self, tx_id: &[u8]) -> Result<bool> {
577 Ok(self.inner.get_cf(self.ledger_txs_cf, tx_id)?.is_some())
578 }
579
580 fn block_by_height(&self, height: u64) -> Result<Option<Block>> {
581 let hash = self.block_hash_by_height(height)?;
582 let block = match hash {
583 Some(hash) => self.block(&hash)?,
584 None => None,
585 };
586 Ok(block)
587 }
588
589 fn block_label_by_height(
590 &self,
591 height: u64,
592 ) -> Result<Option<([u8; 32], Label)>> {
593 const HASH_LEN: usize = 32;
594 Ok(self
595 .inner
596 .get_cf(self.ledger_height_cf, height.to_le_bytes())?
597 .map(|h| {
598 let mut hash = [0u8; HASH_LEN];
599 hash.copy_from_slice(&h.as_slice()[0..HASH_LEN]);
600
601 let label_buff = h[HASH_LEN..].to_vec();
602 Label::read(&mut &label_buff[..]).map(|label| (hash, label))
603 })
604 .transpose()?)
605 }
606}
607
608impl<'db, DB: DBAccess> ConsensusStorage for DBTransaction<'db, DB> {
610 fn store_candidate(&mut self, b: Block) -> Result<()> {
621 let mut serialized = vec![];
622 b.write(&mut serialized)?;
623
624 self.inner
625 .put_cf(self.candidates_cf, b.header().hash, serialized)?;
626
627 let key = serialize_key(b.header().height, b.header().hash)?;
628 self.inner
629 .put_cf(self.candidates_height_cf, key, b.header().hash)?;
630
631 Ok(())
632 }
633
634 fn candidate(&self, hash: &[u8]) -> Result<Option<Block>> {
645 if let Some(blob) = self.inner.get_cf(self.candidates_cf, hash)? {
646 let b = Block::read(&mut &blob[..])?;
647 return Ok(Some(b));
648 }
649
650 Ok(None)
652 }
653
654 fn candidate_by_iteration(
655 &self,
656 consensus_header: &ConsensusHeader,
657 ) -> Result<Option<Block>> {
658 let iter = self
659 .inner
660 .iterator_cf(self.candidates_cf, IteratorMode::Start);
661
662 for (_, blob) in iter.map(Result::unwrap) {
663 let b = Block::read(&mut &blob[..])?;
664
665 let header = b.header();
666 if header.prev_block_hash == consensus_header.prev_block_hash
667 && header.iteration == consensus_header.iteration
668 {
669 return Ok(Some(b));
670 }
671 }
672
673 Ok(None)
674 }
675
676 fn delete_candidate<F>(&mut self, closure: F) -> Result<()>
687 where
688 F: FnOnce(u64) -> bool + std::marker::Copy,
689 {
690 let iter = self
691 .inner
692 .iterator_cf(self.candidates_height_cf, IteratorMode::Start);
693
694 for (key, hash) in iter.map(Result::unwrap) {
695 let (height, _) = deserialize_key(&mut &key.to_vec()[..])?;
696 if closure(height) {
697 self.inner.delete_cf(self.candidates_cf, hash)?;
698 self.inner.delete_cf(self.candidates_height_cf, key)?;
699 }
700 }
701
702 Ok(())
703 }
704
705 fn count_candidates(&self) -> usize {
706 let iter = self
707 .inner
708 .iterator_cf(self.candidates_height_cf, IteratorMode::Start);
709
710 iter.count()
711 }
712
713 fn clear_candidates(&mut self) -> Result<()> {
720 self.delete_candidate(|_| true)
721 }
722
723 fn store_validation_result(
734 &mut self,
735 consensus_header: &ConsensusHeader,
736 validation_result: &payload::ValidationResult,
737 ) -> Result<()> {
738 let mut serialized = vec![];
739 validation_result.write(&mut serialized)?;
740
741 let key = serialize_iter_key(consensus_header)?;
742 self.inner
743 .put_cf(self.validation_results_cf, key, serialized)?;
744
745 Ok(())
746 }
747
748 fn validation_result(
760 &self,
761 consensus_header: &ConsensusHeader,
762 ) -> Result<Option<payload::ValidationResult>> {
763 let key = serialize_iter_key(consensus_header)?;
764 if let Some(blob) =
765 self.inner.get_cf(self.validation_results_cf, key)?
766 {
767 let validation_result =
768 payload::ValidationResult::read(&mut &blob[..])?;
769 return Ok(Some(validation_result));
770 }
771
772 Ok(None)
774 }
775
776 fn delete_validation_results<F>(&mut self, closure: F) -> Result<()>
788 where
789 F: FnOnce([u8; 32]) -> bool + std::marker::Copy,
790 {
791 let iter = self
792 .inner
793 .iterator_cf(self.validation_results_cf, IteratorMode::Start);
794
795 for (key, _) in iter.map(Result::unwrap) {
796 let (prev_block_hash, _) =
797 deserialize_iter_key(&mut &key.to_vec()[..])?;
798 if closure(prev_block_hash) {
799 self.inner.delete_cf(self.validation_results_cf, key)?;
800 }
801 }
802
803 Ok(())
804 }
805
806 fn count_validation_results(&self) -> usize {
807 let iter = self
808 .inner
809 .iterator_cf(self.validation_results_cf, IteratorMode::Start);
810
811 iter.count()
812 }
813
814 fn clear_validation_results(&mut self) -> Result<()> {
821 self.delete_validation_results(|_| true)
822 }
823}
824
825impl<'db, DB: DBAccess> Persist for DBTransaction<'db, DB> {
826 fn clear_database(&mut self) -> Result<()> {
828 let iter = self.inner.iterator_cf(self.ledger_cf, IteratorMode::Start);
830
831 for (key, _) in iter.map(Result::unwrap) {
833 self.inner.delete_cf(self.ledger_cf, key)?;
834 }
835
836 self.clear_candidates()?;
837 self.clear_validation_results()?;
838 Ok(())
839 }
840
841 fn commit(self) -> Result<()> {
842 if let Err(e) = self.inner.commit() {
843 return Err(anyhow::Error::new(e).context("failed to commit"));
844 }
845
846 Ok(())
847 }
848
849 fn rollback(self) -> Result<()> {
850 if let Err(e) = self.inner.rollback() {
851 return Err(anyhow::Error::new(e).context("failed to rollback"));
852 }
853
854 Ok(())
855 }
856}
857
858impl<'db, DB: DBAccess> Mempool for DBTransaction<'db, DB> {
859 fn store_mempool_tx(
860 &mut self,
861 tx: &Transaction,
862 timestamp: u64,
863 ) -> Result<()> {
864 let mut tx_data = vec![];
866 tx.write(&mut tx_data)?;
867
868 let hash = tx.id();
869 self.put_cf(self.mempool_cf, hash, tx_data)?;
870
871 for n in tx.to_spend_ids() {
874 let key = n.to_bytes();
875 self.put_cf(self.spending_id_cf, key, hash)?;
876 }
877
878 let timestamp = timestamp.to_be_bytes();
879
880 self.put_cf(
884 self.fees_cf,
885 serialize_key(tx.gas_price(), hash)?,
886 timestamp,
887 )?;
888
889 Ok(())
890 }
891
892 fn mempool_tx(&self, hash: [u8; 32]) -> Result<Option<Transaction>> {
893 let data = self.inner.get_cf(self.mempool_cf, hash)?;
894
895 match data {
896 None => Ok(None),
898 Some(blob) => Ok(Some(Transaction::read(&mut &blob.to_vec()[..])?)),
899 }
900 }
901
902 fn mempool_tx_exists(&self, h: [u8; 32]) -> Result<bool> {
903 Ok(self.inner.get_cf(self.mempool_cf, h)?.is_some())
904 }
905
906 fn delete_mempool_tx(
907 &mut self,
908 h: [u8; 32],
909 cascade: bool,
910 ) -> Result<Vec<[u8; 32]>> {
911 let mut deleted = vec![];
912 let tx = self.mempool_tx(h)?;
913 if let Some(tx) = tx {
914 let hash = tx.id();
915
916 self.inner.delete_cf(self.mempool_cf, hash)?;
917
918 for n in tx.to_spend_ids() {
921 let key = n.to_bytes();
922 self.inner.delete_cf(self.spending_id_cf, key)?;
923 }
924
925 self.inner.delete_cf(
927 self.fees_cf,
928 serialize_key(tx.gas_price(), hash)?,
929 )?;
930
931 deleted.push(h);
932
933 if cascade {
934 let mut dependants = vec![];
935 let mut next_spending_id = tx.next_spending_id();
938 while let Some(spending_id) = next_spending_id {
939 next_spending_id = spending_id.next();
940 let next_txs =
941 self.mempool_txs_by_spendable_ids(&[spending_id]);
942 if next_txs.is_empty() {
943 break;
944 }
945 dependants.extend(next_txs);
946 }
947
948 for tx_id in dependants {
950 let cascade_deleted =
951 self.delete_mempool_tx(tx_id, false)?;
952 deleted.extend(cascade_deleted);
953 }
954 }
955 }
956
957 Ok(deleted)
958 }
959
960 fn mempool_txs_by_spendable_ids(
961 &self,
962 n: &[SpendingId],
963 ) -> HashSet<[u8; 32]> {
964 n.iter()
965 .filter_map(|n| {
966 match self.inner.get_cf(self.spending_id_cf, n.to_bytes()) {
967 Ok(Some(tx_id)) => tx_id.try_into().ok(),
968 _ => None,
969 }
970 })
971 .collect()
972 }
973
974 fn mempool_txs_sorted_by_fee(
975 &self,
976 ) -> Result<Box<dyn Iterator<Item = Transaction> + '_>> {
977 let iter = MemPoolIterator::new(&self.inner, self.fees_cf, self);
978
979 Ok(Box::new(iter))
980 }
981
982 fn mempool_txs_ids_sorted_by_fee(
983 &self,
984 ) -> Result<Box<dyn Iterator<Item = (u64, [u8; 32])> + '_>> {
985 let iter = MemPoolFeeIterator::new(&self.inner, self.fees_cf, true);
986
987 Ok(Box::new(iter))
988 }
989
990 fn mempool_txs_ids_sorted_by_low_fee(
991 &self,
992 ) -> Result<Box<dyn Iterator<Item = (u64, [u8; 32])> + '_>> {
993 let iter = MemPoolFeeIterator::new(&self.inner, self.fees_cf, false);
994
995 Ok(Box::new(iter))
996 }
997
998 fn mempool_expired_txs(&self, timestamp: u64) -> Result<Vec<[u8; 32]>> {
1000 let mut iter = self.inner.raw_iterator_cf(self.fees_cf);
1001 iter.seek_to_first();
1002 let mut txs_list = vec![];
1003
1004 while iter.valid() {
1005 if let Some(key) = iter.key() {
1006 let (_, tx_id) = deserialize_key(&mut &key.to_vec()[..])?;
1007
1008 let tx_timestamp = u64::from_be_bytes(
1009 iter.value()
1010 .ok_or_else(|| {
1011 io::Error::new(
1012 io::ErrorKind::InvalidData,
1013 "no value",
1014 )
1015 })?
1016 .try_into()
1017 .map_err(|_| {
1018 io::Error::new(
1019 io::ErrorKind::InvalidData,
1020 "invalid data",
1021 )
1022 })?,
1023 );
1024
1025 if tx_timestamp <= timestamp {
1026 txs_list.push(tx_id);
1027 }
1028 }
1029
1030 iter.next();
1031 }
1032
1033 Ok(txs_list)
1034 }
1035
1036 fn mempool_txs_ids(&self) -> Result<Vec<[u8; 32]>> {
1037 let mut iter = self.inner.raw_iterator_cf(self.fees_cf);
1038 iter.seek_to_last();
1039
1040 let mut txs_list = vec![];
1041
1042 while iter.valid() {
1044 if let Some(key) = iter.key() {
1045 let (_, tx_id) = deserialize_key(&mut &key.to_vec()[..])?;
1046
1047 txs_list.push(tx_id);
1048 }
1049
1050 iter.prev();
1051 }
1052
1053 Ok(txs_list)
1054 }
1055
1056 fn mempool_txs_count(&self) -> usize {
1057 self.inner
1058 .iterator_cf(self.mempool_cf, IteratorMode::Start)
1059 .count()
1060 }
1061}
1062
1063pub struct MemPoolIterator<'db, DB: DBAccess, M: Mempool> {
1064 iter: MemPoolFeeIterator<'db, DB>,
1065 mempool: &'db M,
1066}
1067
1068impl<'db, DB: DBAccess, M: Mempool> MemPoolIterator<'db, DB, M> {
1069 fn new(
1070 db: &'db rocksdb::Transaction<DB>,
1071 fees_cf: &ColumnFamily,
1072 mempool: &'db M,
1073 ) -> Self {
1074 let iter = MemPoolFeeIterator::new(db, fees_cf, true);
1075 MemPoolIterator { iter, mempool }
1076 }
1077}
1078
1079impl<DB: DBAccess, M: Mempool> Iterator for MemPoolIterator<'_, DB, M> {
1080 type Item = Transaction;
1081 fn next(&mut self) -> Option<Self::Item> {
1082 self.iter.next().and_then(|(_, tx_id)| {
1083 self.mempool.mempool_tx(tx_id).ok().flatten()
1084 })
1085 }
1086}
1087
1088pub struct MemPoolFeeIterator<'db, DB: DBAccess> {
1089 iter: DBRawIteratorWithThreadMode<'db, rocksdb::Transaction<'db, DB>>,
1090 fee_desc: bool,
1091}
1092
1093impl<'db, DB: DBAccess> MemPoolFeeIterator<'db, DB> {
1094 fn new(
1095 db: &'db rocksdb::Transaction<DB>,
1096 fees_cf: &ColumnFamily,
1097 fee_desc: bool,
1098 ) -> Self {
1099 let mut iter = db.raw_iterator_cf(fees_cf);
1100 if fee_desc {
1101 iter.seek_to_last();
1102 };
1103 MemPoolFeeIterator { iter, fee_desc }
1104 }
1105}
1106
1107impl<DB: DBAccess> Iterator for MemPoolFeeIterator<'_, DB> {
1108 type Item = (u64, [u8; 32]);
1109 fn next(&mut self) -> Option<Self::Item> {
1110 match self.iter.valid() {
1111 true => {
1112 if let Some(key) = self.iter.key() {
1113 let (gas_price, hash) =
1114 deserialize_key(&mut &key.to_vec()[..]).ok()?;
1115 if self.fee_desc {
1116 self.iter.prev();
1117 } else {
1118 self.iter.next();
1119 }
1120 Some((gas_price, hash))
1121 } else {
1122 None
1123 }
1124 }
1125 false => None,
1126 }
1127 }
1128}
1129
1130impl<'db, DB: DBAccess> std::fmt::Debug for DBTransaction<'db, DB> {
1131 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1132 let iter = self.inner.iterator_cf(self.ledger_cf, IteratorMode::Start);
1134
1135 iter.map(Result::unwrap).try_for_each(|(hash, _)| {
1136 if let Ok(Some(blob)) = self.inner.get_cf(self.ledger_cf, &hash[..])
1137 {
1138 let b = Block::read(&mut &blob[..]).unwrap_or_default();
1139 writeln!(f, "ledger_block [{}]: {:#?}", b.header().height, b)
1140 } else {
1141 Ok(())
1142 }
1143 })?;
1144
1145 let iter = self
1147 .inner
1148 .iterator_cf(self.candidates_cf, IteratorMode::Start);
1149
1150 let results: std::fmt::Result =
1151 iter.map(Result::unwrap).try_for_each(|(hash, _)| {
1152 if let Ok(Some(blob)) =
1153 self.inner.get_cf(self.candidates_cf, &hash[..])
1154 {
1155 let b = Block::read(&mut &blob[..]).unwrap_or_default();
1156 writeln!(
1157 f,
1158 "candidate_block [{}]: {:#?}",
1159 b.header().height,
1160 b
1161 )
1162 } else {
1163 Ok(())
1164 }
1165 });
1166
1167 results
1168 }
1169}
1170
1171impl<'db, DB: DBAccess> Metadata for DBTransaction<'db, DB> {
1172 fn op_write<T: AsRef<[u8]>>(&mut self, key: &[u8], value: T) -> Result<()> {
1173 self.put_cf(self.metadata_cf, key, value)?;
1174 Ok(())
1175 }
1176
1177 fn op_read(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1178 self.inner.get_cf(self.metadata_cf, key).map_err(Into::into)
1179 }
1180}
1181
1182impl<'db, DB: DBAccess> DBTransaction<'db, DB> {
1183 fn put_cf<K: AsRef<[u8]>, V: AsRef<[u8]>>(
1186 &self,
1187 cf: &impl AsColumnFamilyRef,
1188 key: K,
1189 value: V,
1190 ) -> Result<()> {
1191 let kv_size = key.as_ref().len() + value.as_ref().len();
1192 self.inner.put_cf(cf, key, value)?;
1193 *self.cumulative_inner_size.borrow_mut() += kv_size;
1194 Ok(())
1195 }
1196
1197 pub fn get_size(&self) -> usize {
1198 *self.cumulative_inner_size.borrow()
1199 }
1200}
1201
1202fn serialize_key(value: u64, hash: [u8; 32]) -> std::io::Result<Vec<u8>> {
1203 let mut w = vec![];
1204 std::io::Write::write_all(&mut w, &value.to_be_bytes())?;
1205 std::io::Write::write_all(&mut w, &hash)?;
1206 Ok(w)
1207}
1208
1209fn deserialize_key<R: Read>(r: &mut R) -> Result<(u64, [u8; 32])> {
1210 let mut buf = [0u8; 8];
1211 r.read_exact(&mut buf)?;
1212 let value = u64::from_be_bytes(buf);
1213 let mut hash = [0u8; 32];
1214 r.read_exact(&mut hash[..])?;
1215
1216 Ok((value, hash))
1217}
1218
1219fn serialize_iter_key(ch: &ConsensusHeader) -> std::io::Result<Vec<u8>> {
1220 let mut w = vec![];
1221 std::io::Write::write_all(&mut w, &ch.prev_block_hash)?;
1222 std::io::Write::write_all(&mut w, &[ch.iteration])?;
1223 Ok(w)
1224}
1225
1226fn deserialize_iter_key<R: Read>(r: &mut R) -> Result<([u8; 32], u8)> {
1227 let mut prev_block_hash = [0u8; 32];
1228 r.read_exact(&mut prev_block_hash)?;
1229
1230 let mut iter_byte = [0u8; 1];
1231 r.read_exact(&mut iter_byte)?;
1232 let iteration = u8::from_be_bytes(iter_byte);
1233
1234 Ok((prev_block_hash, iteration))
1235}
1236
1237impl node_data::Serializable for LightBlock {
1238 fn write<W: Write>(&self, w: &mut W) -> io::Result<()> {
1239 self.header.write(w)?;
1241
1242 let len = self.transactions_ids.len() as u32;
1244 w.write_all(&len.to_le_bytes())?;
1245
1246 for tx_id in &self.transactions_ids {
1248 w.write_all(tx_id)?;
1249 }
1250
1251 let len = self.faults_ids.len() as u32;
1253 w.write_all(&len.to_le_bytes())?;
1254
1255 for f_id in &self.faults_ids {
1257 w.write_all(f_id)?;
1258 }
1259
1260 Ok(())
1261 }
1262
1263 fn read<R: Read>(r: &mut R) -> io::Result<Self>
1264 where
1265 Self: Sized,
1266 {
1267 let header = Header::read(r)?;
1269
1270 let len = Self::read_u32_le(r)?;
1272
1273 let mut transactions_ids = vec![];
1275 for _ in 0..len {
1276 let mut tx_id = [0u8; 32];
1277 r.read_exact(&mut tx_id[..])?;
1278
1279 transactions_ids.push(tx_id);
1280 }
1281
1282 let len = Self::read_u32_le(r)?;
1284
1285 let mut faults_ids = vec![];
1287 for _ in 0..len {
1288 let mut f_id = [0u8; 32];
1289 r.read_exact(&mut f_id[..])?;
1290
1291 faults_ids.push(f_id);
1292 }
1293
1294 Ok(Self {
1295 header,
1296 transactions_ids,
1297 faults_ids,
1298 })
1299 }
1300}
1301
1302#[cfg(test)]
1303mod tests {
1304 use fake::{Fake, Faker};
1305 use node_data::ledger;
1306
1307 use super::*;
1308
1309 #[test]
1310 fn test_store_block() {
1311 TestWrapper::new("test_store_block").run(|path| {
1312 let db = Backend::create_or_open(path, DatabaseOptions::default());
1313
1314 let b: Block = Faker.fake();
1315 assert!(!b.txs().is_empty());
1316
1317 let hash = b.header().hash;
1318
1319 assert!(db
1320 .update(|txn| {
1321 txn.store_block(
1322 b.header(),
1323 &to_spent_txs(b.txs()),
1324 b.faults(),
1325 Label::Final(3),
1326 )?;
1327 Ok(())
1328 })
1329 .is_ok());
1330
1331 db.view(|txn| {
1332 let db_blk = txn
1334 .block(&hash)
1335 .expect("Block to be fetched")
1336 .expect("Block to exist");
1337 assert_eq!(db_blk.header().hash, b.header().hash);
1338
1339 for pos in 0..b.txs().len() {
1342 assert_eq!(db_blk.txs()[pos].id(), b.txs()[pos].id());
1343 }
1344
1345 for pos in 0..b.faults().len() {
1348 assert_eq!(db_blk.faults()[pos].id(), b.faults()[pos].id());
1349 }
1350 });
1351
1352 assert!(db
1353 .update(|txn| {
1354 txn.clear_database()?;
1355 Ok(())
1356 })
1357 .is_ok());
1358
1359 db.view(|txn| {
1360 assert!(txn
1361 .block(&hash)
1362 .expect("block to be fetched")
1363 .is_none());
1364 });
1365 });
1366 }
1367
1368 #[test]
1369 fn test_read_only() {
1370 TestWrapper::new("test_read_only").run(|path| {
1371 let db = Backend::create_or_open(path, DatabaseOptions::default());
1372 let b: Block = Faker.fake();
1373 db.update_dry_run(true, |txn| {
1374 txn.store_block(
1375 b.header(),
1376 &to_spent_txs(b.txs()),
1377 b.faults(),
1378 Label::Final(3),
1379 )
1380 })
1381 .expect("block to be stored");
1382 db.view(|txn| {
1383 assert!(txn
1384 .block(&b.header().hash)
1385 .expect("block to be fetched")
1386 .is_none());
1387 });
1388 });
1389 }
1390
1391 #[test]
1392 fn test_transaction_isolation() {
1393 TestWrapper::new("test_transaction_isolation").run(|path| {
1394 let db = Backend::create_or_open(path, DatabaseOptions::default());
1395 let mut b: Block = Faker.fake();
1396 let hash = b.header().hash;
1397
1398 db.view(|txn| {
1399 assert!(db
1402 .update(|inner| {
1403 inner
1404 .store_block(
1405 b.header(),
1406 &to_spent_txs(b.txs()),
1407 b.faults(),
1408 Label::Final(3),
1409 )
1410 .unwrap();
1411
1412 assert!(inner.block(&hash)?.is_some());
1414 assert!(txn.block(&hash)?.is_none());
1417 Ok(())
1418 })
1419 .is_ok());
1420
1421 assert!(txn
1424 .block(&hash)
1425 .expect("block to be fetched")
1426 .is_some());
1427 });
1428
1429 db.view(|txn| {
1431 assert_blocks_eq(
1432 &mut txn
1433 .block(&hash)
1434 .expect("block to be fetched")
1435 .unwrap(),
1436 &mut b,
1437 );
1438 });
1439 });
1440 }
1441
1442 fn assert_blocks_eq(a: &Block, b: &Block) {
1443 assert!(a.header().hash != [0u8; 32]);
1444 assert!(a.header().hash.eq(&b.header().hash));
1445 }
1446
1447 #[test]
1448 fn test_add_mempool_tx() {
1449 TestWrapper::new("test_add_tx").run(|path| {
1450 let db = Backend::create_or_open(path, DatabaseOptions::default());
1451 let t: Transaction = Faker.fake();
1452
1453 assert!(db.update(|txn| { txn.store_mempool_tx(&t, 0) }).is_ok());
1454
1455 db.view(|vq| {
1456 assert!(vq.mempool_tx_exists(t.id()).unwrap());
1457
1458 let fetched_tx = vq
1459 .mempool_tx(t.id())
1460 .expect("valid contract call")
1461 .unwrap();
1462
1463 assert_eq!(
1464 fetched_tx.id(),
1465 t.id(),
1466 "fetched transaction should be the same"
1467 );
1468 });
1469
1470 db.update(|txn| {
1472 let deleted =
1473 txn.delete_mempool_tx(t.id(), false).expect("valid tx");
1474 assert!(deleted.len() == 1);
1475 Ok(())
1476 })
1477 .unwrap();
1478 });
1479 }
1480
1481 #[test]
1482 fn test_mempool_txs_sorted_by_fee() {
1483 TestWrapper::new("test_mempool_txs_sorted_by_fee").run(|path| {
1484 let db = Backend::create_or_open(path, DatabaseOptions::default());
1485 let _rng = rand::thread_rng();
1487 db.update(|txn| {
1488 for _i in 0..10u32 {
1489 let t: Transaction = Faker.fake();
1490 txn.store_mempool_tx(&t, 0)?;
1491 }
1492 Ok(())
1493 })
1494 .unwrap();
1495
1496 db.view(|txn| {
1497 let txs = txn
1498 .mempool_txs_sorted_by_fee()
1499 .expect("iter should return");
1500
1501 let mut last_fee = u64::MAX;
1502 for t in txs {
1503 let fee = t.gas_price();
1504 assert!(
1505 fee <= last_fee,
1506 "tx fees are not in decreasing order"
1507 );
1508 last_fee = fee
1509 }
1510 assert_ne!(last_fee, u64::MAX, "No tx has been processed")
1511 });
1512 });
1513 }
1514
1515 #[test]
1516 fn test_txs_count() {
1517 TestWrapper::new("test_txs_count").run(|path| {
1518 let db = Backend::create_or_open(path, DatabaseOptions::default());
1519
1520 const N: usize = 100;
1521 const D: usize = 50;
1522
1523 let txs: Vec<_> = (0..N)
1524 .map(|i| ledger::faker::gen_dummy_tx(i as u64))
1525 .collect();
1526
1527 db.update(|db| {
1528 assert_eq!(db.mempool_txs_count(), 0);
1529 txs.iter().for_each(|t| {
1530 db.store_mempool_tx(&t, 0).expect("tx should be added")
1531 });
1532 Ok(())
1533 })
1534 .unwrap();
1535
1536 db.update(|db| {
1537 assert_eq!(db.mempool_txs_count(), N);
1539
1540 txs.iter().take(D).for_each(|tx| {
1541 let deleted = db
1542 .delete_mempool_tx(tx.id(), false)
1543 .expect("transaction should be deleted");
1544 assert!(deleted.len() == 1);
1545 });
1546
1547 Ok(())
1548 })
1549 .unwrap();
1550
1551 db.update(|db| {
1553 assert_eq!(db.mempool_txs_count(), N - D);
1554 Ok(())
1555 })
1556 .unwrap();
1557 });
1558 }
1559
1560 #[test]
1561 fn test_max_gas_limit() {
1562 TestWrapper::new("test_block_size_limit").run(|path| {
1563 let db = Backend::create_or_open(path, DatabaseOptions::default());
1564
1565 db.update(|txn| {
1566 for i in 0..10u32 {
1567 let t = ledger::faker::gen_dummy_tx(i as u64);
1568 txn.store_mempool_tx(&t, 0)?;
1569 }
1570 Ok(())
1571 })
1572 .unwrap();
1573
1574 let total_gas_price: u64 = 9 + 8 + 7 + 6 + 5 + 4 + 3 + 2 + 1;
1575 db.view(|txn| {
1576 let txs = txn
1577 .mempool_txs_sorted_by_fee()
1578 .expect("should return all txs")
1579 .map(|t| t.gas_price())
1580 .sum::<u64>();
1581
1582 assert_eq!(txs, total_gas_price);
1583 });
1584 });
1585 }
1586
1587 #[test]
1588 fn test_get_expired_txs() {
1589 TestWrapper::new("test_get_expired_txs").run(|path| {
1590 let db = Backend::create_or_open(path, DatabaseOptions::default());
1591
1592 let mut expiry_list = HashSet::new();
1593 let _ = db.update(|txn| {
1594 (1..101).for_each(|i| {
1595 let t = ledger::faker::gen_dummy_tx(i as u64);
1596 txn.store_mempool_tx(&t, i).expect("tx should be added");
1597 expiry_list.insert(t.id());
1598 });
1599
1600 (1000..1100).for_each(|i| {
1601 let t = ledger::faker::gen_dummy_tx(i as u64);
1602 txn.store_mempool_tx(&t, i).expect("tx should be added");
1603 });
1604
1605 Ok(())
1606 });
1607
1608 db.view(|vq| {
1609 let expired: HashSet<_> = vq
1610 .mempool_expired_txs(100)
1611 .unwrap()
1612 .into_iter()
1613 .map(|id| id)
1614 .collect();
1615
1616 assert_eq!(expiry_list, expired);
1617 });
1618 });
1619 }
1620
1621 fn to_spent_txs(txs: &Vec<Transaction>) -> Vec<SpentTransaction> {
1622 txs.iter()
1623 .map(|t| SpentTransaction {
1624 inner: t.clone(),
1625 block_height: 0,
1626 gas_spent: 0,
1627 err: None,
1628 })
1629 .collect()
1630 }
1631
1632 #[test]
1633 fn test_get_ledger_tx_by_hash() {
1634 TestWrapper::new("test_get_ledger_tx_by_hash").run(|path| {
1635 let db = Backend::create_or_open(path, DatabaseOptions::default());
1636 let b: Block = Faker.fake();
1637 assert!(!b.txs().is_empty());
1638
1639 assert!(db
1641 .update(|txn| {
1642 txn.store_block(
1643 b.header(),
1644 &to_spent_txs(b.txs()),
1645 b.faults(),
1646 Label::Final(3),
1647 )?;
1648 Ok(())
1649 })
1650 .is_ok());
1651
1652 db.view(|v| {
1655 for t in b.txs().iter() {
1656 assert!(v
1657 .ledger_tx(&t.id())
1658 .expect("should not return error")
1659 .expect("should find a transaction")
1660 .inner
1661 .eq(t));
1662 }
1663 });
1664 });
1665 }
1666
1667 #[test]
1668 fn test_fetch_block_hash_by_height() {
1669 TestWrapper::new("test_fetch_block_hash_by_height").run(|path| {
1670 let db = Backend::create_or_open(path, DatabaseOptions::default());
1671 let b: Block = Faker.fake();
1672
1673 assert!(db
1675 .update(|txn| {
1676 txn.store_block(
1677 b.header(),
1678 &to_spent_txs(b.txs()),
1679 b.faults(),
1680 Label::Attested(3),
1681 )?;
1682 Ok(())
1683 })
1684 .is_ok());
1685
1686 db.view(|v| {
1688 assert!(v
1689 .block_hash_by_height(b.header().height)
1690 .expect("should not return error")
1691 .expect("should find a block")
1692 .eq(&b.header().hash));
1693 });
1694 });
1695 }
1696
1697 #[test]
1698 fn test_fetch_block_label_by_height() {
1699 TestWrapper::new("test_fetch_block_hash_by_height").run(|path| {
1700 let db = Backend::create_or_open(path, DatabaseOptions::default());
1701 let b: Block = Faker.fake();
1702
1703 assert!(db
1705 .update(|txn| {
1706 txn.store_block(
1707 b.header(),
1708 &to_spent_txs(b.txs()),
1709 b.faults(),
1710 Label::Attested(3),
1711 )?;
1712 Ok(())
1713 })
1714 .is_ok());
1715
1716 db.view(|v| {
1718 assert!(v
1719 .block_label_by_height(b.header().height)
1720 .expect("should not return error")
1721 .expect("should find a block")
1722 .1
1723 .eq(&Label::Attested(3)));
1724 });
1725 });
1726 }
1727
1728 #[test]
1729 fn test_delete_block() {
1731 let t = TestWrapper::new("test_fetch_block_hash_by_height");
1732 t.run(|path| {
1733 let db = Backend::create_or_open(path, DatabaseOptions::default());
1734 let b: ledger::Block = Faker.fake();
1735
1736 assert!(db
1737 .update(|ut| {
1738 ut.store_block(
1739 b.header(),
1740 &to_spent_txs(b.txs()),
1741 b.faults(),
1742 Label::Final(3),
1743 )?;
1744 Ok(())
1745 })
1746 .is_ok());
1747
1748 assert!(db
1749 .update(|ut| {
1750 ut.delete_block(&b)?;
1751 Ok(())
1752 })
1753 .is_ok());
1754 });
1755
1756 let path = t.get_path();
1757 let opts = Options::default();
1758
1759 let vec = rocksdb::DB::list_cf(&opts, &path).unwrap();
1760 assert!(!vec.is_empty());
1761
1762 let db = rocksdb::DB::open_cf(&opts, &path, vec.clone()).unwrap();
1764 vec.into_iter()
1765 .map(|cf_name| {
1766 if cf_name == CF_METADATA {
1767 return;
1768 }
1769
1770 let cf = db.cf_handle(&cf_name).unwrap();
1771 assert_eq!(
1772 db.iterator_cf(cf, IteratorMode::Start)
1773 .map(Result::unwrap)
1774 .count(),
1775 0
1776 );
1777 })
1778 .for_each(drop);
1779 }
1780
1781 struct TestWrapper(tempfile::TempDir);
1782
1783 impl TestWrapper {
1784 fn new(path: &'static str) -> Self {
1785 Self(
1786 tempfile::TempDir::with_prefix(path)
1787 .expect("Temp directory to be created"),
1788 )
1789 }
1790
1791 pub fn run<F>(&self, test_func: F)
1792 where
1793 F: FnOnce(&Path),
1794 {
1795 test_func(self.0.path());
1796 }
1797
1798 pub fn get_path(&self) -> std::path::PathBuf {
1799 self.0.path().to_owned().join(DB_FOLDER_NAME)
1800 }
1801 }
1802}