dusk_node/database/
rocksdb.rs

1// This Source Code Form is subject to the terms of the Mozilla Public
2// License, v. 2.0. If a copy of the MPL was not distributed with this
3// file, You can obtain one at http://mozilla.org/MPL/2.0/.
4//
5// Copyright (c) DUSK NETWORK. All rights reserved.
6
7use std::cell::RefCell;
8use std::collections::HashSet;
9use std::io::{Read, Write};
10use std::path::Path;
11use std::sync::Arc;
12use std::{io, vec};
13
14use anyhow::Result;
15use node_data::ledger::{
16    Block, Fault, Header, Label, SpendingId, SpentTransaction, Transaction,
17};
18use node_data::message::{payload, ConsensusHeader};
19use node_data::Serializable;
20use rocksdb::{
21    AsColumnFamilyRef, BlockBasedOptions, ColumnFamily, ColumnFamilyDescriptor,
22    DBAccess, DBRawIteratorWithThreadMode, IteratorMode, LogLevel,
23    OptimisticTransactionDB, OptimisticTransactionOptions, Options,
24    WriteOptions,
25};
26use tracing::info;
27
28use super::{
29    ConsensusStorage, DatabaseOptions, Ledger, LightBlock, Metadata, Persist,
30    DB,
31};
32use crate::database::Mempool;
33
// Names of the RocksDB column families registered by `create_or_open` and
// looked up by `Backend::begin_tx`.
const CF_LEDGER_HEADER: &str = "cf_ledger_header";
const CF_LEDGER_TXS: &str = "cf_ledger_txs";
const CF_LEDGER_FAULTS: &str = "cf_ledger_faults";
const CF_LEDGER_HEIGHT: &str = "cf_ledger_height";
const CF_CANDIDATES: &str = "cf_candidates";
const CF_CANDIDATES_HEIGHT: &str = "cf_candidates_height";
const CF_VALIDATION_RESULTS: &str = "cf_validation_results";
const CF_MEMPOOL: &str = "cf_mempool";
const CF_MEMPOOL_SPENDING_ID: &str = "cf_mempool_spending_id";
const CF_MEMPOOL_FEES: &str = "cf_mempool_fees";
const CF_METADATA: &str = "cf_metadata";

// Folder name joined onto the path given to `create_or_open`
const DB_FOLDER_NAME: &str = "chain.db";

// List of supported metadata keys
/// Written by `store_block` with the hash of the stored block header.
pub const MD_HASH_KEY: &[u8] = b"hash_key";
/// Written by `store_block` with the state hash of the stored block header.
pub const MD_STATE_ROOT_KEY: &[u8] = b"state_hash_key";
// NOTE(review): the keys below are not used in the visible part of this
// file; presumably they hold consensus timing/iteration data - confirm.
pub const MD_AVG_VALIDATION: &[u8] = b"avg_validation_time";
pub const MD_AVG_RATIFICATION: &[u8] = b"avg_ratification_time";
pub const MD_AVG_PROPOSAL: &[u8] = b"avg_proposal_time";
pub const MD_LAST_ITER: &[u8] = b"consensus_last_iter";
55
/// RocksDB-backed implementation of the `DB` trait.
///
/// The database handle is held behind an `Arc`, so clones of a `Backend`
/// share the same underlying `OptimisticTransactionDB`.
#[derive(Clone)]
pub struct Backend {
    /// Shared handle to the opened optimistic-transaction database
    rocksdb: Arc<OptimisticTransactionDB>,
}
60
61impl Backend {
62    fn begin_tx(&self) -> DBTransaction<'_, OptimisticTransactionDB> {
63        // Create a new RocksDB transaction
64        let write_options = WriteOptions::default();
65        let tx_options = OptimisticTransactionOptions::default();
66
67        let inner = self.rocksdb.transaction_opt(&write_options, &tx_options);
68
69        // Borrow column families
70        let ledger_cf = self
71            .rocksdb
72            .cf_handle(CF_LEDGER_HEADER)
73            .expect("ledger_header column family must exist");
74
75        let ledger_txs_cf = self
76            .rocksdb
77            .cf_handle(CF_LEDGER_TXS)
78            .expect("CF_LEDGER_TXS column family must exist");
79
80        let ledger_faults_cf = self
81            .rocksdb
82            .cf_handle(CF_LEDGER_FAULTS)
83            .expect("CF_LEDGER_FAULTS column family must exist");
84
85        let candidates_cf = self
86            .rocksdb
87            .cf_handle(CF_CANDIDATES)
88            .expect("candidates column family must exist");
89
90        let candidates_height_cf = self
91            .rocksdb
92            .cf_handle(CF_CANDIDATES_HEIGHT)
93            .expect("candidates_height column family must exist");
94
95        let validation_results_cf = self
96            .rocksdb
97            .cf_handle(CF_VALIDATION_RESULTS)
98            .expect("validation result column family must exist");
99
100        let mempool_cf = self
101            .rocksdb
102            .cf_handle(CF_MEMPOOL)
103            .expect("mempool column family must exist");
104
105        let spending_id_cf = self
106            .rocksdb
107            .cf_handle(CF_MEMPOOL_SPENDING_ID)
108            .expect("CF_MEMPOOL_SPENDING_ID column family must exist");
109
110        let fees_cf = self
111            .rocksdb
112            .cf_handle(CF_MEMPOOL_FEES)
113            .expect("CF_MEMPOOL_FEES column family must exist");
114
115        let ledger_height_cf = self
116            .rocksdb
117            .cf_handle(CF_LEDGER_HEIGHT)
118            .expect("CF_LEDGER_HEIGHT column family must exist");
119
120        let metadata_cf = self
121            .rocksdb
122            .cf_handle(CF_METADATA)
123            .expect("CF_METADATA column family must exist");
124
125        DBTransaction::<'_, OptimisticTransactionDB> {
126            inner,
127            candidates_cf,
128            candidates_height_cf,
129            validation_results_cf,
130            ledger_cf,
131            ledger_txs_cf,
132            ledger_faults_cf,
133            mempool_cf,
134            spending_id_cf,
135            fees_cf,
136            ledger_height_cf,
137            metadata_cf,
138            cumulative_inner_size: RefCell::new(0),
139        }
140    }
141}
142
143impl DB for Backend {
144    type P<'a> = DBTransaction<'a, OptimisticTransactionDB>;
145
146    fn create_or_open<T>(path: T, db_opts: DatabaseOptions) -> Self
147    where
148        T: AsRef<Path>,
149    {
150        let path = path.as_ref().join(DB_FOLDER_NAME);
151        info!("Opening database in {path:?}, {:?} ", db_opts);
152
153        // A set of options for initializing any blocks-related CF (including
154        // METADATA CF)
155        let mut blocks_cf_opts = Options::default();
156        blocks_cf_opts.create_if_missing(true);
157        blocks_cf_opts.create_missing_column_families(true);
158        blocks_cf_opts.set_level_compaction_dynamic_level_bytes(true);
159        blocks_cf_opts
160            .set_write_buffer_size(db_opts.blocks_cf_max_write_buffer_size);
161
162        if db_opts.enable_debug {
163            blocks_cf_opts.set_log_level(LogLevel::Info);
164            blocks_cf_opts.set_dump_malloc_stats(true);
165            blocks_cf_opts.enable_statistics();
166        }
167
168        if db_opts.blocks_cf_disable_block_cache {
169            let mut block_opts = BlockBasedOptions::default();
170            block_opts.disable_cache();
171            blocks_cf_opts.set_block_based_table_factory(&block_opts);
172        }
173
174        // Configure CF_MEMPOOL column family, so it benefits from low
175        // write-latency of L0
176        let mut mp_opts = blocks_cf_opts.clone();
177        // Disable WAL by default
178        mp_opts.set_manual_wal_flush(true);
179        mp_opts.create_if_missing(true);
180        mp_opts.create_missing_column_families(true);
181        mp_opts.set_write_buffer_size(db_opts.mempool_cf_max_write_buffer_size);
182
183        if db_opts.enable_debug {
184            mp_opts.set_log_level(LogLevel::Info);
185            mp_opts.set_dump_malloc_stats(true);
186            mp_opts.enable_statistics();
187        }
188
189        let cfs = vec![
190            ColumnFamilyDescriptor::new(
191                CF_LEDGER_HEADER,
192                blocks_cf_opts.clone(),
193            ),
194            ColumnFamilyDescriptor::new(CF_LEDGER_TXS, blocks_cf_opts.clone()),
195            ColumnFamilyDescriptor::new(
196                CF_LEDGER_FAULTS,
197                blocks_cf_opts.clone(),
198            ),
199            ColumnFamilyDescriptor::new(
200                CF_LEDGER_HEIGHT,
201                blocks_cf_opts.clone(),
202            ),
203            ColumnFamilyDescriptor::new(CF_CANDIDATES, blocks_cf_opts.clone()),
204            ColumnFamilyDescriptor::new(
205                CF_CANDIDATES_HEIGHT,
206                blocks_cf_opts.clone(),
207            ),
208            ColumnFamilyDescriptor::new(
209                CF_VALIDATION_RESULTS,
210                blocks_cf_opts.clone(),
211            ),
212            ColumnFamilyDescriptor::new(CF_METADATA, blocks_cf_opts.clone()),
213            ColumnFamilyDescriptor::new(CF_MEMPOOL, mp_opts.clone()),
214            ColumnFamilyDescriptor::new(
215                CF_MEMPOOL_SPENDING_ID,
216                mp_opts.clone(),
217            ),
218            ColumnFamilyDescriptor::new(CF_MEMPOOL_FEES, mp_opts.clone()),
219        ];
220
221        Self {
222            rocksdb: Arc::new(
223                OptimisticTransactionDB::open_cf_descriptors(
224                    &blocks_cf_opts,
225                    path,
226                    cfs,
227                )
228                .expect("should be a valid database in {path}"),
229            ),
230        }
231    }
232
233    fn view<F, T>(&self, f: F) -> T
234    where
235        F: for<'a> FnOnce(&Self::P<'a>) -> T,
236    {
237        // Create a new read-only transaction
238        let tx = self.begin_tx();
239
240        // Execute all read-only transactions in isolation
241        let ret = f(&tx);
242        tx.rollback().expect("rollback to succeed for readonly");
243        ret
244    }
245
246    fn update<F, T>(&self, execute: F) -> Result<T>
247    where
248        F: for<'a> FnOnce(&mut Self::P<'a>) -> Result<T>,
249    {
250        self.update_dry_run(false, execute)
251    }
252
253    fn update_dry_run<F, T>(&self, dry_run: bool, execute: F) -> Result<T>
254    where
255        F: for<'a> FnOnce(&mut Self::P<'a>) -> Result<T>,
256    {
257        // Create read-write transaction
258        let mut tx = self.begin_tx();
259
260        // If f returns err, no commit will be applied into backend
261        // storage
262        let ret = execute(&mut tx)?;
263
264        if dry_run {
265            tx.rollback()?;
266        } else {
267            // Apply changes in atomic way
268            tx.commit()?;
269        }
270
271        Ok(ret)
272    }
273
    /// Does nothing: this backend has no explicit close step.
    fn close(&mut self) {}
275}
276
/// A RocksDB transaction bundled with handles to the column families it
/// operates on.
///
/// Instances are built by `Backend::begin_tx`.
pub struct DBTransaction<'db, DB: DBAccess> {
    /// Underlying RocksDB transaction
    inner: rocksdb::Transaction<'db, DB>,
    /// cumulative size of transaction footprint
    cumulative_inner_size: RefCell<usize>,

    // TODO: pack all column families into a single array
    // Candidates column family
    candidates_cf: &'db ColumnFamily,
    candidates_height_cf: &'db ColumnFamily,
    // ValidationResults column family
    validation_results_cf: &'db ColumnFamily,

    // Ledger column families
    ledger_cf: &'db ColumnFamily,
    ledger_faults_cf: &'db ColumnFamily,
    ledger_txs_cf: &'db ColumnFamily,
    ledger_height_cf: &'db ColumnFamily,

    // Mempool column families
    mempool_cf: &'db ColumnFamily,
    spending_id_cf: &'db ColumnFamily,
    fees_cf: &'db ColumnFamily,

    // Metadata column family
    metadata_cf: &'db ColumnFamily,
}
302
303impl<'db, DB: DBAccess> Ledger for DBTransaction<'db, DB> {
304    fn store_block(
305        &mut self,
306        header: &Header,
307        txs: &[SpentTransaction],
308        faults: &[Fault],
309        label: Label,
310    ) -> Result<usize> {
311        // COLUMN FAMILY: CF_LEDGER_HEADER
312        // It consists of one record per block - Header record
313        // It also includes single record to store metadata - Register record
314        {
315            let cf = self.ledger_cf;
316
317            let mut buf = vec![];
318            LightBlock {
319                header: header.clone(),
320                transactions_ids: txs.iter().map(|t| t.inner.id()).collect(),
321                faults_ids: faults.iter().map(|f| f.id()).collect(),
322            }
323            .write(&mut buf)?;
324
325            self.put_cf(cf, header.hash, buf)?;
326        }
327
328        // Update metadata values
329        self.op_write(MD_HASH_KEY, header.hash)?;
330        self.op_write(MD_STATE_ROOT_KEY, header.state_hash)?;
331
332        // COLUMN FAMILY: CF_LEDGER_TXS
333        {
334            let cf = self.ledger_txs_cf;
335
336            // store all block transactions
337            for tx in txs {
338                let mut d = vec![];
339                tx.write(&mut d)?;
340                self.put_cf(cf, tx.inner.id(), d)?;
341            }
342        }
343
344        // COLUMN FAMILY: CF_LEDGER_FAULTS
345        {
346            let cf = self.ledger_faults_cf;
347
348            // store all block faults
349            for f in faults {
350                let mut d = vec![];
351                f.write(&mut d)?;
352                self.put_cf(cf, f.id(), d)?;
353            }
354        }
355        self.store_block_label(header.height, &header.hash, label)?;
356
357        Ok(self.get_size())
358    }
359
360    fn faults_by_block(&self, start_height: u64) -> Result<Vec<Fault>> {
361        let mut faults = vec![];
362        let mut hash = self
363            .op_read(MD_HASH_KEY)?
364            .ok_or(anyhow::anyhow!("Cannot read tip"))?;
365
366        loop {
367            let block = self.light_block(&hash)?.ok_or(anyhow::anyhow!(
368                "Cannot read block {}",
369                hex::encode(&hash)
370            ))?;
371
372            let block_height = block.header.height;
373
374            if block_height >= start_height {
375                hash = block.header.prev_block_hash.to_vec();
376                faults.extend(self.faults(&block.faults_ids)?);
377            } else {
378                break;
379            }
380
381            if block_height == 0 {
382                break;
383            }
384        }
385        Ok(faults)
386    }
387
388    fn store_block_label(
389        &mut self,
390        height: u64,
391        hash: &[u8; 32],
392        label: Label,
393    ) -> Result<()> {
394        // CF: HEIGHT -> (BLOCK_HASH, BLOCK_LABEL)
395        let mut buf = vec![];
396        buf.write_all(hash)?;
397        label.write(&mut buf)?;
398
399        self.put_cf(self.ledger_height_cf, height.to_le_bytes(), buf)?;
400        Ok(())
401    }
402
403    fn delete_block(&mut self, b: &Block) -> Result<()> {
404        self.inner.delete_cf(
405            self.ledger_height_cf,
406            b.header().height.to_le_bytes(),
407        )?;
408
409        for tx in b.txs() {
410            self.inner.delete_cf(self.ledger_txs_cf, tx.id())?;
411        }
412        for f in b.faults() {
413            self.inner.delete_cf(self.ledger_faults_cf, f.id())?;
414        }
415
416        self.inner.delete_cf(self.ledger_cf, b.header().hash)?;
417
418        Ok(())
419    }
420
421    fn block_exists(&self, hash: &[u8]) -> Result<bool> {
422        Ok(self.inner.get_cf(self.ledger_cf, hash)?.is_some())
423    }
424
425    fn faults(&self, faults_ids: &[[u8; 32]]) -> Result<Vec<Fault>> {
426        if faults_ids.is_empty() {
427            return Ok(vec![]);
428        }
429        let ids = faults_ids
430            .iter()
431            .map(|id| (self.ledger_faults_cf, id))
432            .collect::<Vec<_>>();
433
434        // Retrieve all faults ID with single call
435        let faults_buffer = self.inner.multi_get_cf(ids);
436
437        let mut faults = vec![];
438        for buf in faults_buffer {
439            let buf = buf?.unwrap();
440            let fault = Fault::read(&mut &buf[..])?;
441            faults.push(fault);
442        }
443
444        Ok(faults)
445    }
446
447    fn block(&self, hash: &[u8]) -> Result<Option<Block>> {
448        match self.inner.get_cf(self.ledger_cf, hash)? {
449            Some(blob) => {
450                let record = LightBlock::read(&mut &blob[..])?;
451
452                // Retrieve all transactions buffers with single call
453                let txs_buffers = self.inner.multi_get_cf(
454                    record
455                        .transactions_ids
456                        .iter()
457                        .map(|id| (self.ledger_txs_cf, id))
458                        .collect::<Vec<(&ColumnFamily, &[u8; 32])>>(),
459                );
460
461                let mut txs = vec![];
462                for buf in txs_buffers {
463                    let buf = buf?.unwrap();
464                    let tx = SpentTransaction::read(&mut &buf[..])?;
465                    txs.push(tx.inner);
466                }
467
468                // Retrieve all faults ID with single call
469                let faults_buffer = self.inner.multi_get_cf(
470                    record
471                        .faults_ids
472                        .iter()
473                        .map(|id| (self.ledger_faults_cf, id))
474                        .collect::<Vec<(&ColumnFamily, &[u8; 32])>>(),
475                );
476                let mut faults = vec![];
477                for buf in faults_buffer {
478                    let buf = buf?.unwrap();
479                    let fault = Fault::read(&mut &buf[..])?;
480                    faults.push(fault);
481                }
482
483                Ok(Some(
484                    Block::new(record.header, txs, faults)
485                        .expect("block should be valid"),
486                ))
487            }
488            None => Ok(None),
489        }
490    }
491
492    fn light_block(&self, hash: &[u8]) -> Result<Option<LightBlock>> {
493        match self.inner.get_cf(self.ledger_cf, hash)? {
494            Some(blob) => {
495                let record = LightBlock::read(&mut &blob[..])?;
496                Ok(Some(record))
497            }
498            None => Ok(None),
499        }
500    }
501
502    fn block_header(&self, hash: &[u8]) -> Result<Option<Header>> {
503        match self.inner.get_cf(self.ledger_cf, hash)? {
504            Some(blob) => {
505                let record = Header::read(&mut &blob[..])?;
506                Ok(Some(record))
507            }
508            None => Ok(None),
509        }
510    }
511
512    fn block_hash_by_height(&self, height: u64) -> Result<Option<[u8; 32]>> {
513        Ok(self
514            .inner
515            .get_cf(self.ledger_height_cf, height.to_le_bytes())?
516            .map(|h| {
517                const LEN: usize = 32;
518                let mut hash = [0u8; LEN];
519                hash.copy_from_slice(&h.as_slice()[0..LEN]);
520                hash
521            }))
522    }
523
524    fn ledger_tx(&self, tx_id: &[u8]) -> Result<Option<SpentTransaction>> {
525        let tx = self
526            .inner
527            .get_cf(self.ledger_txs_cf, tx_id)?
528            .map(|blob| SpentTransaction::read(&mut &blob[..]))
529            .transpose()?;
530
531        Ok(tx)
532    }
533
534    /// Returns a list of transactions from the ledger
535    ///
536    /// This function expects a list of transaction IDs that are in the ledger.
537    ///
538    /// It will return an error if any of the transaction IDs are not found in
539    /// the ledger.
540    fn ledger_txs(
541        &self,
542        tx_ids: Vec<&[u8; 32]>,
543    ) -> Result<Vec<SpentTransaction>> {
544        let cf = self.ledger_txs_cf;
545
546        let ids = tx_ids.into_iter().map(|id| (cf, id)).collect::<Vec<_>>();
547
548        let multi_get_results = self.inner.multi_get_cf(ids);
549
550        let mut spent_transactions =
551            Vec::with_capacity(multi_get_results.len());
552        for result in multi_get_results.into_iter() {
553            let opt_blob = result.map_err(|e| {
554                std::io::Error::new(std::io::ErrorKind::Other, e)
555            })?;
556
557            let Some(blob) = opt_blob else {
558                return Err(anyhow::anyhow!(
559                    "At least one Transaction ID was not found"
560                ));
561            };
562
563            let stx = SpentTransaction::read(&mut &blob[..])?;
564
565            spent_transactions.push(stx);
566        }
567
568        Ok(spent_transactions)
569    }
570
571    /// Returns true if the transaction exists in the
572    /// ledger
573    ///
574    /// This is a convenience method that checks if a transaction exists in the
575    /// ledger without unmarshalling the transaction
576    fn ledger_tx_exists(&self, tx_id: &[u8]) -> Result<bool> {
577        Ok(self.inner.get_cf(self.ledger_txs_cf, tx_id)?.is_some())
578    }
579
580    fn block_by_height(&self, height: u64) -> Result<Option<Block>> {
581        let hash = self.block_hash_by_height(height)?;
582        let block = match hash {
583            Some(hash) => self.block(&hash)?,
584            None => None,
585        };
586        Ok(block)
587    }
588
589    fn block_label_by_height(
590        &self,
591        height: u64,
592    ) -> Result<Option<([u8; 32], Label)>> {
593        const HASH_LEN: usize = 32;
594        Ok(self
595            .inner
596            .get_cf(self.ledger_height_cf, height.to_le_bytes())?
597            .map(|h| {
598                let mut hash = [0u8; HASH_LEN];
599                hash.copy_from_slice(&h.as_slice()[0..HASH_LEN]);
600
601                let label_buff = h[HASH_LEN..].to_vec();
602                Label::read(&mut &label_buff[..]).map(|label| (hash, label))
603            })
604            .transpose()?)
605    }
606}
607
/// Implementation of the `ConsensusStorage` trait for
/// `DBTransaction<'db, DB>`.
609impl<'db, DB: DBAccess> ConsensusStorage for DBTransaction<'db, DB> {
610    /// Stores a candidate block in the database.
611    ///
612    /// # Arguments
613    ///
614    /// * `b` - The block to store.
615    ///
616    /// # Returns
617    ///
618    /// Returns `Ok(())` if the block is successfully stored, or an error if the
619    /// operation fails.
620    fn store_candidate(&mut self, b: Block) -> Result<()> {
621        let mut serialized = vec![];
622        b.write(&mut serialized)?;
623
624        self.inner
625            .put_cf(self.candidates_cf, b.header().hash, serialized)?;
626
627        let key = serialize_key(b.header().height, b.header().hash)?;
628        self.inner
629            .put_cf(self.candidates_height_cf, key, b.header().hash)?;
630
631        Ok(())
632    }
633
634    /// Fetches a candidate block from the database.
635    ///
636    /// # Arguments
637    ///
638    /// * `hash` - The hash of the block to fetch.
639    ///
640    /// # Returns
641    ///
642    /// Returns `Ok(Some(block))` if the block is found, `Ok(None)` if the block
643    /// is not found, or an error if the operation fails.
644    fn candidate(&self, hash: &[u8]) -> Result<Option<Block>> {
645        if let Some(blob) = self.inner.get_cf(self.candidates_cf, hash)? {
646            let b = Block::read(&mut &blob[..])?;
647            return Ok(Some(b));
648        }
649
650        // Block not found
651        Ok(None)
652    }
653
654    fn candidate_by_iteration(
655        &self,
656        consensus_header: &ConsensusHeader,
657    ) -> Result<Option<Block>> {
658        let iter = self
659            .inner
660            .iterator_cf(self.candidates_cf, IteratorMode::Start);
661
662        for (_, blob) in iter.map(Result::unwrap) {
663            let b = Block::read(&mut &blob[..])?;
664
665            let header = b.header();
666            if header.prev_block_hash == consensus_header.prev_block_hash
667                && header.iteration == consensus_header.iteration
668            {
669                return Ok(Some(b));
670            }
671        }
672
673        Ok(None)
674    }
675
676    /// Deletes candidate-related items from the database based on a closure.
677    ///
678    /// # Arguments
679    ///
680    /// * `closure` - If the closure returns `true`, the block will be deleted.
681    ///
682    /// # Returns
683    ///
684    /// Returns `Ok(())` if the deletion is successful, or an error if the
685    /// operation fails.
686    fn delete_candidate<F>(&mut self, closure: F) -> Result<()>
687    where
688        F: FnOnce(u64) -> bool + std::marker::Copy,
689    {
690        let iter = self
691            .inner
692            .iterator_cf(self.candidates_height_cf, IteratorMode::Start);
693
694        for (key, hash) in iter.map(Result::unwrap) {
695            let (height, _) = deserialize_key(&mut &key.to_vec()[..])?;
696            if closure(height) {
697                self.inner.delete_cf(self.candidates_cf, hash)?;
698                self.inner.delete_cf(self.candidates_height_cf, key)?;
699            }
700        }
701
702        Ok(())
703    }
704
705    fn count_candidates(&self) -> usize {
706        let iter = self
707            .inner
708            .iterator_cf(self.candidates_height_cf, IteratorMode::Start);
709
710        iter.count()
711    }
712
713    /// Deletes all items from the `CF_CANDIDATES` column family.
714    ///
715    /// # Returns
716    ///
717    /// Returns `Ok(())` if the deletion is successful, or an error if the
718    /// operation fails.
719    fn clear_candidates(&mut self) -> Result<()> {
720        self.delete_candidate(|_| true)
721    }
722
723    /// Stores a ValidationResult in the database.
724    ///
725    /// # Arguments
726    ///
727    /// * `vr` - The ValidationResult to store.
728    ///
729    /// # Returns
730    ///
731    /// Returns `Ok(())` if the ValidationResult is successfully stored, or an
732    /// error if the operation fails.
733    fn store_validation_result(
734        &mut self,
735        consensus_header: &ConsensusHeader,
736        validation_result: &payload::ValidationResult,
737    ) -> Result<()> {
738        let mut serialized = vec![];
739        validation_result.write(&mut serialized)?;
740
741        let key = serialize_iter_key(consensus_header)?;
742        self.inner
743            .put_cf(self.validation_results_cf, key, serialized)?;
744
745        Ok(())
746    }
747
748    /// Fetches a ValidationResult from the database.
749    ///
750    /// # Arguments
751    ///
752    /// * `consensus_header` - The ConsensusHeader of the ValidationResult.
753    ///
754    /// # Returns
755    ///
756    /// Returns `Ok(Some(ValidationResult))` if the ValidationResult is found,
757    /// `Ok(None)` if the ValidationResult is not found, or an error if the
758    /// operation fails.
759    fn validation_result(
760        &self,
761        consensus_header: &ConsensusHeader,
762    ) -> Result<Option<payload::ValidationResult>> {
763        let key = serialize_iter_key(consensus_header)?;
764        if let Some(blob) =
765            self.inner.get_cf(self.validation_results_cf, key)?
766        {
767            let validation_result =
768                payload::ValidationResult::read(&mut &blob[..])?;
769            return Ok(Some(validation_result));
770        }
771
772        // ValidationResult not found
773        Ok(None)
774    }
775
776    /// Deletes ValidationResult items from the database based on a closure.
777    ///
778    /// # Arguments
779    ///
780    /// * `closure` - If the closure returns `true`, the ValidationResult will
781    ///   be deleted.
782    ///
783    /// # Returns
784    ///
785    /// Returns `Ok(())` if the deletion is successful, or an error if the
786    /// operation fails.
787    fn delete_validation_results<F>(&mut self, closure: F) -> Result<()>
788    where
789        F: FnOnce([u8; 32]) -> bool + std::marker::Copy,
790    {
791        let iter = self
792            .inner
793            .iterator_cf(self.validation_results_cf, IteratorMode::Start);
794
795        for (key, _) in iter.map(Result::unwrap) {
796            let (prev_block_hash, _) =
797                deserialize_iter_key(&mut &key.to_vec()[..])?;
798            if closure(prev_block_hash) {
799                self.inner.delete_cf(self.validation_results_cf, key)?;
800            }
801        }
802
803        Ok(())
804    }
805
806    fn count_validation_results(&self) -> usize {
807        let iter = self
808            .inner
809            .iterator_cf(self.validation_results_cf, IteratorMode::Start);
810
811        iter.count()
812    }
813
814    /// Deletes all items from the `CF_VALIDATION_RESULTS` column family.
815    ///
816    /// # Returns
817    ///
818    /// Returns `Ok(())` if the deletion is successful, or an error if the
819    /// operation fails.
820    fn clear_validation_results(&mut self) -> Result<()> {
821        self.delete_validation_results(|_| true)
822    }
823}
824
825impl<'db, DB: DBAccess> Persist for DBTransaction<'db, DB> {
826    /// Deletes all items from both CF_LEDGER and CF_CANDIDATES column families
827    fn clear_database(&mut self) -> Result<()> {
828        // Create an iterator over the column family CF_LEDGER
829        let iter = self.inner.iterator_cf(self.ledger_cf, IteratorMode::Start);
830
831        // Iterate through the CF_LEDGER column family and delete all items
832        for (key, _) in iter.map(Result::unwrap) {
833            self.inner.delete_cf(self.ledger_cf, key)?;
834        }
835
836        self.clear_candidates()?;
837        self.clear_validation_results()?;
838        Ok(())
839    }
840
841    fn commit(self) -> Result<()> {
842        if let Err(e) = self.inner.commit() {
843            return Err(anyhow::Error::new(e).context("failed to commit"));
844        }
845
846        Ok(())
847    }
848
849    fn rollback(self) -> Result<()> {
850        if let Err(e) = self.inner.rollback() {
851            return Err(anyhow::Error::new(e).context("failed to rollback"));
852        }
853
854        Ok(())
855    }
856}
857
858impl<'db, DB: DBAccess> Mempool for DBTransaction<'db, DB> {
859    fn store_mempool_tx(
860        &mut self,
861        tx: &Transaction,
862        timestamp: u64,
863    ) -> Result<()> {
864        // Map Hash to serialized transaction
865        let mut tx_data = vec![];
866        tx.write(&mut tx_data)?;
867
868        let hash = tx.id();
869        self.put_cf(self.mempool_cf, hash, tx_data)?;
870
871        // Add Secondary indexes //
872        // Spending Ids
873        for n in tx.to_spend_ids() {
874            let key = n.to_bytes();
875            self.put_cf(self.spending_id_cf, key, hash)?;
876        }
877
878        let timestamp = timestamp.to_be_bytes();
879
880        // Map Fee_Hash to Timestamp
881        // Key pair is used to facilitate sort-by-fee
882        // Also, the timestamp is used to remove expired transactions
883        self.put_cf(
884            self.fees_cf,
885            serialize_key(tx.gas_price(), hash)?,
886            timestamp,
887        )?;
888
889        Ok(())
890    }
891
892    fn mempool_tx(&self, hash: [u8; 32]) -> Result<Option<Transaction>> {
893        let data = self.inner.get_cf(self.mempool_cf, hash)?;
894
895        match data {
896            // None has a meaning key not found
897            None => Ok(None),
898            Some(blob) => Ok(Some(Transaction::read(&mut &blob.to_vec()[..])?)),
899        }
900    }
901
902    fn mempool_tx_exists(&self, h: [u8; 32]) -> Result<bool> {
903        Ok(self.inner.get_cf(self.mempool_cf, h)?.is_some())
904    }
905
906    fn delete_mempool_tx(
907        &mut self,
908        h: [u8; 32],
909        cascade: bool,
910    ) -> Result<Vec<[u8; 32]>> {
911        let mut deleted = vec![];
912        let tx = self.mempool_tx(h)?;
913        if let Some(tx) = tx {
914            let hash = tx.id();
915
916            self.inner.delete_cf(self.mempool_cf, hash)?;
917
918            // Delete Secondary indexes
919            // Delete spendingids (nullifiers or nonce)
920            for n in tx.to_spend_ids() {
921                let key = n.to_bytes();
922                self.inner.delete_cf(self.spending_id_cf, key)?;
923            }
924
925            // Delete Fee_Hash
926            self.inner.delete_cf(
927                self.fees_cf,
928                serialize_key(tx.gas_price(), hash)?,
929            )?;
930
931            deleted.push(h);
932
933            if cascade {
934                let mut dependants = vec![];
935                // Get the next spending id (aka next nonce tx)
936                // retrieve tx_id and delete it
937                let mut next_spending_id = tx.next_spending_id();
938                while let Some(spending_id) = next_spending_id {
939                    next_spending_id = spending_id.next();
940                    let next_txs =
941                        self.mempool_txs_by_spendable_ids(&[spending_id]);
942                    if next_txs.is_empty() {
943                        break;
944                    }
945                    dependants.extend(next_txs);
946                }
947
948                // delete all dependants
949                for tx_id in dependants {
950                    let cascade_deleted =
951                        self.delete_mempool_tx(tx_id, false)?;
952                    deleted.extend(cascade_deleted);
953                }
954            }
955        }
956
957        Ok(deleted)
958    }
959
960    fn mempool_txs_by_spendable_ids(
961        &self,
962        n: &[SpendingId],
963    ) -> HashSet<[u8; 32]> {
964        n.iter()
965            .filter_map(|n| {
966                match self.inner.get_cf(self.spending_id_cf, n.to_bytes()) {
967                    Ok(Some(tx_id)) => tx_id.try_into().ok(),
968                    _ => None,
969                }
970            })
971            .collect()
972    }
973
974    fn mempool_txs_sorted_by_fee(
975        &self,
976    ) -> Result<Box<dyn Iterator<Item = Transaction> + '_>> {
977        let iter = MemPoolIterator::new(&self.inner, self.fees_cf, self);
978
979        Ok(Box::new(iter))
980    }
981
982    fn mempool_txs_ids_sorted_by_fee(
983        &self,
984    ) -> Result<Box<dyn Iterator<Item = (u64, [u8; 32])> + '_>> {
985        let iter = MemPoolFeeIterator::new(&self.inner, self.fees_cf, true);
986
987        Ok(Box::new(iter))
988    }
989
990    fn mempool_txs_ids_sorted_by_low_fee(
991        &self,
992    ) -> Result<Box<dyn Iterator<Item = (u64, [u8; 32])> + '_>> {
993        let iter = MemPoolFeeIterator::new(&self.inner, self.fees_cf, false);
994
995        Ok(Box::new(iter))
996    }
997
998    /// Get all expired transactions hashes.
999    fn mempool_expired_txs(&self, timestamp: u64) -> Result<Vec<[u8; 32]>> {
1000        let mut iter = self.inner.raw_iterator_cf(self.fees_cf);
1001        iter.seek_to_first();
1002        let mut txs_list = vec![];
1003
1004        while iter.valid() {
1005            if let Some(key) = iter.key() {
1006                let (_, tx_id) = deserialize_key(&mut &key.to_vec()[..])?;
1007
1008                let tx_timestamp = u64::from_be_bytes(
1009                    iter.value()
1010                        .ok_or_else(|| {
1011                            io::Error::new(
1012                                io::ErrorKind::InvalidData,
1013                                "no value",
1014                            )
1015                        })?
1016                        .try_into()
1017                        .map_err(|_| {
1018                            io::Error::new(
1019                                io::ErrorKind::InvalidData,
1020                                "invalid data",
1021                            )
1022                        })?,
1023                );
1024
1025                if tx_timestamp <= timestamp {
1026                    txs_list.push(tx_id);
1027                }
1028            }
1029
1030            iter.next();
1031        }
1032
1033        Ok(txs_list)
1034    }
1035
1036    fn mempool_txs_ids(&self) -> Result<Vec<[u8; 32]>> {
1037        let mut iter = self.inner.raw_iterator_cf(self.fees_cf);
1038        iter.seek_to_last();
1039
1040        let mut txs_list = vec![];
1041
1042        // Iterate all keys from the end in reverse lexicographic order
1043        while iter.valid() {
1044            if let Some(key) = iter.key() {
1045                let (_, tx_id) = deserialize_key(&mut &key.to_vec()[..])?;
1046
1047                txs_list.push(tx_id);
1048            }
1049
1050            iter.prev();
1051        }
1052
1053        Ok(txs_list)
1054    }
1055
1056    fn mempool_txs_count(&self) -> usize {
1057        self.inner
1058            .iterator_cf(self.mempool_cf, IteratorMode::Start)
1059            .count()
1060    }
1061}
1062
1063pub struct MemPoolIterator<'db, DB: DBAccess, M: Mempool> {
1064    iter: MemPoolFeeIterator<'db, DB>,
1065    mempool: &'db M,
1066}
1067
1068impl<'db, DB: DBAccess, M: Mempool> MemPoolIterator<'db, DB, M> {
1069    fn new(
1070        db: &'db rocksdb::Transaction<DB>,
1071        fees_cf: &ColumnFamily,
1072        mempool: &'db M,
1073    ) -> Self {
1074        let iter = MemPoolFeeIterator::new(db, fees_cf, true);
1075        MemPoolIterator { iter, mempool }
1076    }
1077}
1078
1079impl<DB: DBAccess, M: Mempool> Iterator for MemPoolIterator<'_, DB, M> {
1080    type Item = Transaction;
1081    fn next(&mut self) -> Option<Self::Item> {
1082        self.iter.next().and_then(|(_, tx_id)| {
1083            self.mempool.mempool_tx(tx_id).ok().flatten()
1084        })
1085    }
1086}
1087
1088pub struct MemPoolFeeIterator<'db, DB: DBAccess> {
1089    iter: DBRawIteratorWithThreadMode<'db, rocksdb::Transaction<'db, DB>>,
1090    fee_desc: bool,
1091}
1092
1093impl<'db, DB: DBAccess> MemPoolFeeIterator<'db, DB> {
1094    fn new(
1095        db: &'db rocksdb::Transaction<DB>,
1096        fees_cf: &ColumnFamily,
1097        fee_desc: bool,
1098    ) -> Self {
1099        let mut iter = db.raw_iterator_cf(fees_cf);
1100        if fee_desc {
1101            iter.seek_to_last();
1102        };
1103        MemPoolFeeIterator { iter, fee_desc }
1104    }
1105}
1106
1107impl<DB: DBAccess> Iterator for MemPoolFeeIterator<'_, DB> {
1108    type Item = (u64, [u8; 32]);
1109    fn next(&mut self) -> Option<Self::Item> {
1110        match self.iter.valid() {
1111            true => {
1112                if let Some(key) = self.iter.key() {
1113                    let (gas_price, hash) =
1114                        deserialize_key(&mut &key.to_vec()[..]).ok()?;
1115                    if self.fee_desc {
1116                        self.iter.prev();
1117                    } else {
1118                        self.iter.next();
1119                    }
1120                    Some((gas_price, hash))
1121                } else {
1122                    None
1123                }
1124            }
1125            false => None,
1126        }
1127    }
1128}
1129
1130impl<'db, DB: DBAccess> std::fmt::Debug for DBTransaction<'db, DB> {
1131    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1132        //  Print ledger blocks
1133        let iter = self.inner.iterator_cf(self.ledger_cf, IteratorMode::Start);
1134
1135        iter.map(Result::unwrap).try_for_each(|(hash, _)| {
1136            if let Ok(Some(blob)) = self.inner.get_cf(self.ledger_cf, &hash[..])
1137            {
1138                let b = Block::read(&mut &blob[..]).unwrap_or_default();
1139                writeln!(f, "ledger_block [{}]: {:#?}", b.header().height, b)
1140            } else {
1141                Ok(())
1142            }
1143        })?;
1144
1145        // Print candidate blocks
1146        let iter = self
1147            .inner
1148            .iterator_cf(self.candidates_cf, IteratorMode::Start);
1149
1150        let results: std::fmt::Result =
1151            iter.map(Result::unwrap).try_for_each(|(hash, _)| {
1152                if let Ok(Some(blob)) =
1153                    self.inner.get_cf(self.candidates_cf, &hash[..])
1154                {
1155                    let b = Block::read(&mut &blob[..]).unwrap_or_default();
1156                    writeln!(
1157                        f,
1158                        "candidate_block [{}]: {:#?}",
1159                        b.header().height,
1160                        b
1161                    )
1162                } else {
1163                    Ok(())
1164                }
1165            });
1166
1167        results
1168    }
1169}
1170
1171impl<'db, DB: DBAccess> Metadata for DBTransaction<'db, DB> {
1172    fn op_write<T: AsRef<[u8]>>(&mut self, key: &[u8], value: T) -> Result<()> {
1173        self.put_cf(self.metadata_cf, key, value)?;
1174        Ok(())
1175    }
1176
1177    fn op_read(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1178        self.inner.get_cf(self.metadata_cf, key).map_err(Into::into)
1179    }
1180}
1181
1182impl<'db, DB: DBAccess> DBTransaction<'db, DB> {
1183    /// A thin wrapper around inner.put_cf that calculates a db transaction
1184    /// disk footprint
1185    fn put_cf<K: AsRef<[u8]>, V: AsRef<[u8]>>(
1186        &self,
1187        cf: &impl AsColumnFamilyRef,
1188        key: K,
1189        value: V,
1190    ) -> Result<()> {
1191        let kv_size = key.as_ref().len() + value.as_ref().len();
1192        self.inner.put_cf(cf, key, value)?;
1193        *self.cumulative_inner_size.borrow_mut() += kv_size;
1194        Ok(())
1195    }
1196
1197    pub fn get_size(&self) -> usize {
1198        *self.cumulative_inner_size.borrow()
1199    }
1200}
1201
/// Builds a 40-byte key: `value` in big-endian order followed by `hash`.
///
/// The big-endian prefix makes the byte-wise order of the keys follow the
/// numeric order of `value`.
fn serialize_key(value: u64, hash: [u8; 32]) -> std::io::Result<Vec<u8>> {
    let prefix = value.to_be_bytes();

    let mut key = Vec::with_capacity(prefix.len() + hash.len());
    key.extend_from_slice(&prefix);
    key.extend_from_slice(&hash);

    Ok(key)
}
1208
1209fn deserialize_key<R: Read>(r: &mut R) -> Result<(u64, [u8; 32])> {
1210    let mut buf = [0u8; 8];
1211    r.read_exact(&mut buf)?;
1212    let value = u64::from_be_bytes(buf);
1213    let mut hash = [0u8; 32];
1214    r.read_exact(&mut hash[..])?;
1215
1216    Ok((value, hash))
1217}
1218
1219fn serialize_iter_key(ch: &ConsensusHeader) -> std::io::Result<Vec<u8>> {
1220    let mut w = vec![];
1221    std::io::Write::write_all(&mut w, &ch.prev_block_hash)?;
1222    std::io::Write::write_all(&mut w, &[ch.iteration])?;
1223    Ok(w)
1224}
1225
1226fn deserialize_iter_key<R: Read>(r: &mut R) -> Result<([u8; 32], u8)> {
1227    let mut prev_block_hash = [0u8; 32];
1228    r.read_exact(&mut prev_block_hash)?;
1229
1230    let mut iter_byte = [0u8; 1];
1231    r.read_exact(&mut iter_byte)?;
1232    let iteration = u8::from_be_bytes(iter_byte);
1233
1234    Ok((prev_block_hash, iteration))
1235}
1236
1237impl node_data::Serializable for LightBlock {
1238    fn write<W: Write>(&self, w: &mut W) -> io::Result<()> {
1239        // Write block header
1240        self.header.write(w)?;
1241
1242        // Write transactions count
1243        let len = self.transactions_ids.len() as u32;
1244        w.write_all(&len.to_le_bytes())?;
1245
1246        // Write transactions hashes
1247        for tx_id in &self.transactions_ids {
1248            w.write_all(tx_id)?;
1249        }
1250
1251        // Write faults count
1252        let len = self.faults_ids.len() as u32;
1253        w.write_all(&len.to_le_bytes())?;
1254
1255        // Write faults id
1256        for f_id in &self.faults_ids {
1257            w.write_all(f_id)?;
1258        }
1259
1260        Ok(())
1261    }
1262
1263    fn read<R: Read>(r: &mut R) -> io::Result<Self>
1264    where
1265        Self: Sized,
1266    {
1267        // Read block header
1268        let header = Header::read(r)?;
1269
1270        // Read transactions count
1271        let len = Self::read_u32_le(r)?;
1272
1273        // Read transactions hashes
1274        let mut transactions_ids = vec![];
1275        for _ in 0..len {
1276            let mut tx_id = [0u8; 32];
1277            r.read_exact(&mut tx_id[..])?;
1278
1279            transactions_ids.push(tx_id);
1280        }
1281
1282        // Read faults count
1283        let len = Self::read_u32_le(r)?;
1284
1285        // Read faults ids
1286        let mut faults_ids = vec![];
1287        for _ in 0..len {
1288            let mut f_id = [0u8; 32];
1289            r.read_exact(&mut f_id[..])?;
1290
1291            faults_ids.push(f_id);
1292        }
1293
1294        Ok(Self {
1295            header,
1296            transactions_ids,
1297            faults_ids,
1298        })
1299    }
1300}
1301
1302#[cfg(test)]
1303mod tests {
1304    use fake::{Fake, Faker};
1305    use node_data::ledger;
1306
1307    use super::*;
1308
1309    #[test]
1310    fn test_store_block() {
1311        TestWrapper::new("test_store_block").run(|path| {
1312            let db = Backend::create_or_open(path, DatabaseOptions::default());
1313
1314            let b: Block = Faker.fake();
1315            assert!(!b.txs().is_empty());
1316
1317            let hash = b.header().hash;
1318
1319            assert!(db
1320                .update(|txn| {
1321                    txn.store_block(
1322                        b.header(),
1323                        &to_spent_txs(b.txs()),
1324                        b.faults(),
1325                        Label::Final(3),
1326                    )?;
1327                    Ok(())
1328                })
1329                .is_ok());
1330
1331            db.view(|txn| {
1332                // Assert block header is fully fetched from ledger
1333                let db_blk = txn
1334                    .block(&hash)
1335                    .expect("Block to be fetched")
1336                    .expect("Block to exist");
1337                assert_eq!(db_blk.header().hash, b.header().hash);
1338
1339                // Assert all transactions are fully fetched from ledger as
1340                // well.
1341                for pos in 0..b.txs().len() {
1342                    assert_eq!(db_blk.txs()[pos].id(), b.txs()[pos].id());
1343                }
1344
1345                // Assert all faults are fully fetched from ledger as
1346                // well.
1347                for pos in 0..b.faults().len() {
1348                    assert_eq!(db_blk.faults()[pos].id(), b.faults()[pos].id());
1349                }
1350            });
1351
1352            assert!(db
1353                .update(|txn| {
1354                    txn.clear_database()?;
1355                    Ok(())
1356                })
1357                .is_ok());
1358
1359            db.view(|txn| {
1360                assert!(txn
1361                    .block(&hash)
1362                    .expect("block to be fetched")
1363                    .is_none());
1364            });
1365        });
1366    }
1367
1368    #[test]
1369    fn test_read_only() {
1370        TestWrapper::new("test_read_only").run(|path| {
1371            let db = Backend::create_or_open(path, DatabaseOptions::default());
1372            let b: Block = Faker.fake();
1373            db.update_dry_run(true, |txn| {
1374                txn.store_block(
1375                    b.header(),
1376                    &to_spent_txs(b.txs()),
1377                    b.faults(),
1378                    Label::Final(3),
1379                )
1380            })
1381            .expect("block to be stored");
1382            db.view(|txn| {
1383                assert!(txn
1384                    .block(&b.header().hash)
1385                    .expect("block to be fetched")
1386                    .is_none());
1387            });
1388        });
1389    }
1390
1391    #[test]
1392    fn test_transaction_isolation() {
1393        TestWrapper::new("test_transaction_isolation").run(|path| {
1394            let db = Backend::create_or_open(path, DatabaseOptions::default());
1395            let mut b: Block = Faker.fake();
1396            let hash = b.header().hash;
1397
1398            db.view(|txn| {
1399                // Simulate a concurrent update is committed during read-only
1400                // transaction
1401                assert!(db
1402                    .update(|inner| {
1403                        inner
1404                            .store_block(
1405                                b.header(),
1406                                &to_spent_txs(b.txs()),
1407                                b.faults(),
1408                                Label::Final(3),
1409                            )
1410                            .unwrap();
1411
1412                        // We support Read-Your-Own-Writes
1413                        assert!(inner.block(&hash)?.is_some());
1414                        // Data is isolated until the transaction is not
1415                        // committed
1416                        assert!(txn.block(&hash)?.is_none());
1417                        Ok(())
1418                    })
1419                    .is_ok());
1420
1421                // Asserts that the read-only/view transaction get the updated
1422                // data after the tx is committed
1423                assert!(txn
1424                    .block(&hash)
1425                    .expect("block to be fetched")
1426                    .is_some());
1427            });
1428
1429            // Asserts that update was done
1430            db.view(|txn| {
1431                assert_blocks_eq(
1432                    &mut txn
1433                        .block(&hash)
1434                        .expect("block to be fetched")
1435                        .unwrap(),
1436                    &mut b,
1437                );
1438            });
1439        });
1440    }
1441
1442    fn assert_blocks_eq(a: &Block, b: &Block) {
1443        assert!(a.header().hash != [0u8; 32]);
1444        assert!(a.header().hash.eq(&b.header().hash));
1445    }
1446
1447    #[test]
1448    fn test_add_mempool_tx() {
1449        TestWrapper::new("test_add_tx").run(|path| {
1450            let db = Backend::create_or_open(path, DatabaseOptions::default());
1451            let t: Transaction = Faker.fake();
1452
1453            assert!(db.update(|txn| { txn.store_mempool_tx(&t, 0) }).is_ok());
1454
1455            db.view(|vq| {
1456                assert!(vq.mempool_tx_exists(t.id()).unwrap());
1457
1458                let fetched_tx = vq
1459                    .mempool_tx(t.id())
1460                    .expect("valid contract call")
1461                    .unwrap();
1462
1463                assert_eq!(
1464                    fetched_tx.id(),
1465                    t.id(),
1466                    "fetched transaction should be the same"
1467                );
1468            });
1469
1470            // Delete a contract call
1471            db.update(|txn| {
1472                let deleted =
1473                    txn.delete_mempool_tx(t.id(), false).expect("valid tx");
1474                assert!(deleted.len() == 1);
1475                Ok(())
1476            })
1477            .unwrap();
1478        });
1479    }
1480
1481    #[test]
1482    fn test_mempool_txs_sorted_by_fee() {
1483        TestWrapper::new("test_mempool_txs_sorted_by_fee").run(|path| {
1484            let db = Backend::create_or_open(path, DatabaseOptions::default());
1485            // Populate mempool with N contract calls
1486            let _rng = rand::thread_rng();
1487            db.update(|txn| {
1488                for _i in 0..10u32 {
1489                    let t: Transaction = Faker.fake();
1490                    txn.store_mempool_tx(&t, 0)?;
1491                }
1492                Ok(())
1493            })
1494            .unwrap();
1495
1496            db.view(|txn| {
1497                let txs = txn
1498                    .mempool_txs_sorted_by_fee()
1499                    .expect("iter should return");
1500
1501                let mut last_fee = u64::MAX;
1502                for t in txs {
1503                    let fee = t.gas_price();
1504                    assert!(
1505                        fee <= last_fee,
1506                        "tx fees are not in decreasing order"
1507                    );
1508                    last_fee = fee
1509                }
1510                assert_ne!(last_fee, u64::MAX, "No tx has been processed")
1511            });
1512        });
1513    }
1514
1515    #[test]
1516    fn test_txs_count() {
1517        TestWrapper::new("test_txs_count").run(|path| {
1518            let db = Backend::create_or_open(path, DatabaseOptions::default());
1519
1520            const N: usize = 100;
1521            const D: usize = 50;
1522
1523            let txs: Vec<_> = (0..N)
1524                .map(|i| ledger::faker::gen_dummy_tx(i as u64))
1525                .collect();
1526
1527            db.update(|db| {
1528                assert_eq!(db.mempool_txs_count(), 0);
1529                txs.iter().for_each(|t| {
1530                    db.store_mempool_tx(&t, 0).expect("tx should be added")
1531                });
1532                Ok(())
1533            })
1534            .unwrap();
1535
1536            db.update(|db| {
1537                // Ensure txs count is equal to the number of added tx
1538                assert_eq!(db.mempool_txs_count(), N);
1539
1540                txs.iter().take(D).for_each(|tx| {
1541                    let deleted = db
1542                        .delete_mempool_tx(tx.id(), false)
1543                        .expect("transaction should be deleted");
1544                    assert!(deleted.len() == 1);
1545                });
1546
1547                Ok(())
1548            })
1549            .unwrap();
1550
1551            // Ensure txs count is updated after the deletion
1552            db.update(|db| {
1553                assert_eq!(db.mempool_txs_count(), N - D);
1554                Ok(())
1555            })
1556            .unwrap();
1557        });
1558    }
1559
1560    #[test]
1561    fn test_max_gas_limit() {
1562        TestWrapper::new("test_block_size_limit").run(|path| {
1563            let db = Backend::create_or_open(path, DatabaseOptions::default());
1564
1565            db.update(|txn| {
1566                for i in 0..10u32 {
1567                    let t = ledger::faker::gen_dummy_tx(i as u64);
1568                    txn.store_mempool_tx(&t, 0)?;
1569                }
1570                Ok(())
1571            })
1572            .unwrap();
1573
1574            let total_gas_price: u64 = 9 + 8 + 7 + 6 + 5 + 4 + 3 + 2 + 1;
1575            db.view(|txn| {
1576                let txs = txn
1577                    .mempool_txs_sorted_by_fee()
1578                    .expect("should return all txs")
1579                    .map(|t| t.gas_price())
1580                    .sum::<u64>();
1581
1582                assert_eq!(txs, total_gas_price);
1583            });
1584        });
1585    }
1586
1587    #[test]
1588    fn test_get_expired_txs() {
1589        TestWrapper::new("test_get_expired_txs").run(|path| {
1590            let db = Backend::create_or_open(path, DatabaseOptions::default());
1591
1592            let mut expiry_list = HashSet::new();
1593            let _ = db.update(|txn| {
1594                (1..101).for_each(|i| {
1595                    let t = ledger::faker::gen_dummy_tx(i as u64);
1596                    txn.store_mempool_tx(&t, i).expect("tx should be added");
1597                    expiry_list.insert(t.id());
1598                });
1599
1600                (1000..1100).for_each(|i| {
1601                    let t = ledger::faker::gen_dummy_tx(i as u64);
1602                    txn.store_mempool_tx(&t, i).expect("tx should be added");
1603                });
1604
1605                Ok(())
1606            });
1607
1608            db.view(|vq| {
1609                let expired: HashSet<_> = vq
1610                    .mempool_expired_txs(100)
1611                    .unwrap()
1612                    .into_iter()
1613                    .map(|id| id)
1614                    .collect();
1615
1616                assert_eq!(expiry_list, expired);
1617            });
1618        });
1619    }
1620
1621    fn to_spent_txs(txs: &Vec<Transaction>) -> Vec<SpentTransaction> {
1622        txs.iter()
1623            .map(|t| SpentTransaction {
1624                inner: t.clone(),
1625                block_height: 0,
1626                gas_spent: 0,
1627                err: None,
1628            })
1629            .collect()
1630    }
1631
1632    #[test]
1633    fn test_get_ledger_tx_by_hash() {
1634        TestWrapper::new("test_get_ledger_tx_by_hash").run(|path| {
1635            let db = Backend::create_or_open(path, DatabaseOptions::default());
1636            let b: Block = Faker.fake();
1637            assert!(!b.txs().is_empty());
1638
1639            // Store a block
1640            assert!(db
1641                .update(|txn| {
1642                    txn.store_block(
1643                        b.header(),
1644                        &to_spent_txs(b.txs()),
1645                        b.faults(),
1646                        Label::Final(3),
1647                    )?;
1648                    Ok(())
1649                })
1650                .is_ok());
1651
1652            // Assert all transactions of the accepted (stored) block are
1653            // accessible by hash.
1654            db.view(|v| {
1655                for t in b.txs().iter() {
1656                    assert!(v
1657                        .ledger_tx(&t.id())
1658                        .expect("should not return error")
1659                        .expect("should find a transaction")
1660                        .inner
1661                        .eq(t));
1662                }
1663            });
1664        });
1665    }
1666
1667    #[test]
1668    fn test_fetch_block_hash_by_height() {
1669        TestWrapper::new("test_fetch_block_hash_by_height").run(|path| {
1670            let db = Backend::create_or_open(path, DatabaseOptions::default());
1671            let b: Block = Faker.fake();
1672
1673            // Store a block
1674            assert!(db
1675                .update(|txn| {
1676                    txn.store_block(
1677                        b.header(),
1678                        &to_spent_txs(b.txs()),
1679                        b.faults(),
1680                        Label::Attested(3),
1681                    )?;
1682                    Ok(())
1683                })
1684                .is_ok());
1685
1686            // Assert block hash is accessible by height.
1687            db.view(|v| {
1688                assert!(v
1689                    .block_hash_by_height(b.header().height)
1690                    .expect("should not return error")
1691                    .expect("should find a block")
1692                    .eq(&b.header().hash));
1693            });
1694        });
1695    }
1696
1697    #[test]
1698    fn test_fetch_block_label_by_height() {
1699        TestWrapper::new("test_fetch_block_hash_by_height").run(|path| {
1700            let db = Backend::create_or_open(path, DatabaseOptions::default());
1701            let b: Block = Faker.fake();
1702
1703            // Store a block
1704            assert!(db
1705                .update(|txn| {
1706                    txn.store_block(
1707                        b.header(),
1708                        &to_spent_txs(b.txs()),
1709                        b.faults(),
1710                        Label::Attested(3),
1711                    )?;
1712                    Ok(())
1713                })
1714                .is_ok());
1715
1716            // Assert block hash is accessible by height.
1717            db.view(|v| {
1718                assert!(v
1719                    .block_label_by_height(b.header().height)
1720                    .expect("should not return error")
1721                    .expect("should find a block")
1722                    .1
1723                    .eq(&Label::Attested(3)));
1724            });
1725        });
1726    }
1727
1728    #[test]
1729    /// Ensures delete_block fn removes all keys of a single block
1730    fn test_delete_block() {
1731        let t = TestWrapper::new("test_fetch_block_hash_by_height");
1732        t.run(|path| {
1733            let db = Backend::create_or_open(path, DatabaseOptions::default());
1734            let b: ledger::Block = Faker.fake();
1735
1736            assert!(db
1737                .update(|ut| {
1738                    ut.store_block(
1739                        b.header(),
1740                        &to_spent_txs(b.txs()),
1741                        b.faults(),
1742                        Label::Final(3),
1743                    )?;
1744                    Ok(())
1745                })
1746                .is_ok());
1747
1748            assert!(db
1749                .update(|ut| {
1750                    ut.delete_block(&b)?;
1751                    Ok(())
1752                })
1753                .is_ok());
1754        });
1755
1756        let path = t.get_path();
1757        let opts = Options::default();
1758
1759        let vec = rocksdb::DB::list_cf(&opts, &path).unwrap();
1760        assert!(!vec.is_empty());
1761
1762        // Ensure no block fields leak after its deletion
1763        let db = rocksdb::DB::open_cf(&opts, &path, vec.clone()).unwrap();
1764        vec.into_iter()
1765            .map(|cf_name| {
1766                if cf_name == CF_METADATA {
1767                    return;
1768                }
1769
1770                let cf = db.cf_handle(&cf_name).unwrap();
1771                assert_eq!(
1772                    db.iterator_cf(cf, IteratorMode::Start)
1773                        .map(Result::unwrap)
1774                        .count(),
1775                    0
1776                );
1777            })
1778            .for_each(drop);
1779    }
1780
1781    struct TestWrapper(tempfile::TempDir);
1782
1783    impl TestWrapper {
1784        fn new(path: &'static str) -> Self {
1785            Self(
1786                tempfile::TempDir::with_prefix(path)
1787                    .expect("Temp directory to be created"),
1788            )
1789        }
1790
1791        pub fn run<F>(&self, test_func: F)
1792        where
1793            F: FnOnce(&Path),
1794        {
1795            test_func(self.0.path());
1796        }
1797
1798        pub fn get_path(&self) -> std::path::PathBuf {
1799            self.0.path().to_owned().join(DB_FOLDER_NAME)
1800        }
1801    }
1802}