dusk_node/database/
rocksdb.rs

1// This Source Code Form is subject to the terms of the Mozilla Public
2// License, v. 2.0. If a copy of the MPL was not distributed with this
3// file, You can obtain one at http://mozilla.org/MPL/2.0/.
4//
5// Copyright (c) DUSK NETWORK. All rights reserved.
6
7use std::cell::RefCell;
8use std::collections::HashSet;
9use std::io::{Read, Write};
10use std::path::Path;
11use std::sync::Arc;
12use std::{io, vec};
13
14use anyhow::Result;
15use dusk_core::transfer::data::BlobSidecar;
16use node_data::ledger::{
17    Block, Fault, Header, Label, SpendingId, SpentTransaction, Transaction,
18};
19use node_data::message::{payload, ConsensusHeader};
20use node_data::Serializable;
21use rocksdb::{
22    AsColumnFamilyRef, BlockBasedOptions, ColumnFamily, ColumnFamilyDescriptor,
23    DBAccess, DBRawIteratorWithThreadMode, IteratorMode, LogLevel,
24    OptimisticTransactionDB, OptimisticTransactionOptions, Options,
25    WriteOptions,
26};
27use tracing::info;
28
29use super::{
30    ConsensusStorage, DatabaseOptions, Ledger, LightBlock, Metadata, Persist,
31    DB,
32};
33use crate::database::Mempool;
34
// Column family names.

/// Ledger: block hash -> serialized light block record.
const CF_LEDGER_HEADER: &str = "cf_ledger_header";
/// Ledger: transaction id -> serialized spent transaction.
const CF_LEDGER_TXS: &str = "cf_ledger_txs";
/// Ledger: blob hash -> serialized blob sidecar bytes.
const CF_LEDGER_BLOBS: &str = "cf_ledger_blobs";
/// Ledger: block height -> concatenated blob hashes stored at that height.
const CF_LEDGER_BLOBS_HEIGHT: &str = "cf_ledger_blobs_height";
/// Ledger: fault id -> serialized fault.
const CF_LEDGER_FAULTS: &str = "cf_ledger_faults";
/// Ledger: block height -> (block hash, block label).
const CF_LEDGER_HEIGHT: &str = "cf_ledger_height";
/// Consensus: candidate block hash -> serialized candidate block.
const CF_CANDIDATES: &str = "cf_candidates";
/// Consensus: (height, hash) key -> candidate block hash.
const CF_CANDIDATES_HEIGHT: &str = "cf_candidates_height";
/// Consensus: iteration key -> serialized validation result.
const CF_VALIDATION_RESULTS: &str = "cf_validation_results";
/// Mempool column family.
const CF_MEMPOOL: &str = "cf_mempool";
/// Mempool column family indexed by spending id.
const CF_MEMPOOL_SPENDING_ID: &str = "cf_mempool_spending_id";
/// Mempool column family related to fees.
const CF_MEMPOOL_FEES: &str = "cf_mempool_fees";
/// Column family holding the metadata records.
const CF_METADATA: &str = "cf_metadata";

/// Folder, relative to the configured path, where the database lives.
const DB_FOLDER_NAME: &str = "chain.db";

// Supported metadata keys.

/// Metadata key updated with the hash of each stored block.
pub const MD_HASH_KEY: &[u8] = b"hash_key";
/// Metadata key updated with the state hash of each stored block.
pub const MD_STATE_ROOT_KEY: &[u8] = b"state_hash_key";
/// Metadata key: average validation time.
pub const MD_AVG_VALIDATION: &[u8] = b"avg_validation_time";
/// Metadata key: average ratification time.
pub const MD_AVG_RATIFICATION: &[u8] = b"avg_ratification_time";
/// Metadata key: average proposal time.
pub const MD_AVG_PROPOSAL: &[u8] = b"avg_proposal_time";
/// Metadata key: last consensus iteration.
pub const MD_LAST_ITER: &[u8] = b"consensus_last_iter";
58
59#[derive(Clone)]
60pub struct Backend {
61    rocksdb: Arc<OptimisticTransactionDB>,
62}
63
64impl Backend {
65    fn begin_tx(&self) -> DBTransaction<'_, OptimisticTransactionDB> {
66        // Create a new RocksDB transaction
67        let write_options = WriteOptions::default();
68        let tx_options = OptimisticTransactionOptions::default();
69
70        let inner = self.rocksdb.transaction_opt(&write_options, &tx_options);
71
72        // Borrow column families
73        let ledger_cf = self
74            .rocksdb
75            .cf_handle(CF_LEDGER_HEADER)
76            .expect("ledger_header column family must exist");
77
78        let ledger_txs_cf = self
79            .rocksdb
80            .cf_handle(CF_LEDGER_TXS)
81            .expect("CF_LEDGER_TXS column family must exist");
82
83        let ledger_faults_cf = self
84            .rocksdb
85            .cf_handle(CF_LEDGER_FAULTS)
86            .expect("CF_LEDGER_FAULTS column family must exist");
87
88        let candidates_cf = self
89            .rocksdb
90            .cf_handle(CF_CANDIDATES)
91            .expect("candidates column family must exist");
92
93        let candidates_height_cf = self
94            .rocksdb
95            .cf_handle(CF_CANDIDATES_HEIGHT)
96            .expect("candidates_height column family must exist");
97
98        let validation_results_cf = self
99            .rocksdb
100            .cf_handle(CF_VALIDATION_RESULTS)
101            .expect("validation result column family must exist");
102
103        let mempool_cf = self
104            .rocksdb
105            .cf_handle(CF_MEMPOOL)
106            .expect("mempool column family must exist");
107
108        let spending_id_cf = self
109            .rocksdb
110            .cf_handle(CF_MEMPOOL_SPENDING_ID)
111            .expect("CF_MEMPOOL_SPENDING_ID column family must exist");
112
113        let fees_cf = self
114            .rocksdb
115            .cf_handle(CF_MEMPOOL_FEES)
116            .expect("CF_MEMPOOL_FEES column family must exist");
117
118        let ledger_height_cf = self
119            .rocksdb
120            .cf_handle(CF_LEDGER_HEIGHT)
121            .expect("CF_LEDGER_HEIGHT column family must exist");
122
123        let metadata_cf = self
124            .rocksdb
125            .cf_handle(CF_METADATA)
126            .expect("CF_METADATA column family must exist");
127
128        let ledger_blobs_cf = self
129            .rocksdb
130            .cf_handle(CF_LEDGER_BLOBS)
131            .expect("CF_LEDGER_BLOBS column family must exist");
132
133        let ledger_blobs_height_cf = self
134            .rocksdb
135            .cf_handle(CF_LEDGER_BLOBS_HEIGHT)
136            .expect("CF_LEDGER_BLOBS_HEIGHT column family must exist");
137
138        DBTransaction::<'_, OptimisticTransactionDB> {
139            inner,
140            candidates_cf,
141            candidates_height_cf,
142            validation_results_cf,
143            ledger_cf,
144            ledger_txs_cf,
145            ledger_faults_cf,
146            mempool_cf,
147            spending_id_cf,
148            fees_cf,
149            ledger_height_cf,
150            ledger_blobs_cf,
151            ledger_blobs_height_cf,
152            metadata_cf,
153            cumulative_inner_size: RefCell::new(0),
154        }
155    }
156}
157
158impl DB for Backend {
159    type P<'a> = DBTransaction<'a, OptimisticTransactionDB>;
160
161    fn create_or_open<T>(path: T, db_opts: DatabaseOptions) -> Self
162    where
163        T: AsRef<Path>,
164    {
165        let path = path.as_ref().join(DB_FOLDER_NAME);
166        info!("Opening database in {path:?}, {:?} ", db_opts);
167
168        // A set of options for initializing any blocks-related CF (including
169        // METADATA CF)
170        let mut blocks_cf_opts = Options::default();
171        blocks_cf_opts.create_if_missing(db_opts.create_if_missing);
172        blocks_cf_opts.create_missing_column_families(true);
173        blocks_cf_opts.set_level_compaction_dynamic_level_bytes(true);
174        blocks_cf_opts
175            .set_write_buffer_size(db_opts.blocks_cf_max_write_buffer_size);
176
177        if db_opts.enable_debug {
178            blocks_cf_opts.set_log_level(LogLevel::Info);
179            blocks_cf_opts.set_dump_malloc_stats(true);
180            blocks_cf_opts.enable_statistics();
181        }
182
183        if db_opts.blocks_cf_disable_block_cache {
184            let mut block_opts = BlockBasedOptions::default();
185            block_opts.disable_cache();
186            blocks_cf_opts.set_block_based_table_factory(&block_opts);
187        }
188
189        // Configure CF_MEMPOOL column family, so it benefits from low
190        // write-latency of L0
191        let mut mp_opts = blocks_cf_opts.clone();
192        // Disable WAL by default
193        mp_opts.set_manual_wal_flush(true);
194        mp_opts.create_if_missing(true);
195        mp_opts.create_missing_column_families(true);
196        mp_opts.set_write_buffer_size(db_opts.mempool_cf_max_write_buffer_size);
197
198        if db_opts.enable_debug {
199            mp_opts.set_log_level(LogLevel::Info);
200            mp_opts.set_dump_malloc_stats(true);
201            mp_opts.enable_statistics();
202        }
203
204        let cfs = vec![
205            ColumnFamilyDescriptor::new(
206                CF_LEDGER_HEADER,
207                blocks_cf_opts.clone(),
208            ),
209            ColumnFamilyDescriptor::new(CF_LEDGER_TXS, blocks_cf_opts.clone()),
210            ColumnFamilyDescriptor::new(
211                CF_LEDGER_FAULTS,
212                blocks_cf_opts.clone(),
213            ),
214            ColumnFamilyDescriptor::new(
215                CF_LEDGER_HEIGHT,
216                blocks_cf_opts.clone(),
217            ),
218            ColumnFamilyDescriptor::new(
219                CF_LEDGER_BLOBS,
220                blocks_cf_opts.clone(),
221            ),
222            ColumnFamilyDescriptor::new(
223                CF_LEDGER_BLOBS_HEIGHT,
224                blocks_cf_opts.clone(),
225            ),
226            ColumnFamilyDescriptor::new(CF_CANDIDATES, blocks_cf_opts.clone()),
227            ColumnFamilyDescriptor::new(
228                CF_CANDIDATES_HEIGHT,
229                blocks_cf_opts.clone(),
230            ),
231            ColumnFamilyDescriptor::new(
232                CF_VALIDATION_RESULTS,
233                blocks_cf_opts.clone(),
234            ),
235            ColumnFamilyDescriptor::new(CF_METADATA, blocks_cf_opts.clone()),
236            ColumnFamilyDescriptor::new(CF_MEMPOOL, mp_opts.clone()),
237            ColumnFamilyDescriptor::new(
238                CF_MEMPOOL_SPENDING_ID,
239                mp_opts.clone(),
240            ),
241            ColumnFamilyDescriptor::new(CF_MEMPOOL_FEES, mp_opts.clone()),
242        ];
243
244        Self {
245            rocksdb: Arc::new(
246                OptimisticTransactionDB::open_cf_descriptors(
247                    &blocks_cf_opts,
248                    &path,
249                    cfs,
250                )
251                .unwrap_or_else(|_| {
252                    panic!("should be a valid database in {path:?}")
253                }),
254            ),
255        }
256    }
257
258    fn view<F, T>(&self, f: F) -> T
259    where
260        F: for<'a> FnOnce(&Self::P<'a>) -> T,
261    {
262        // Create a new read-only transaction
263        let tx = self.begin_tx();
264
265        // Execute all read-only transactions in isolation
266        let ret = f(&tx);
267        tx.rollback().expect("rollback to succeed for readonly");
268        ret
269    }
270
271    fn update<F, T>(&self, execute: F) -> Result<T>
272    where
273        F: for<'a> FnOnce(&mut Self::P<'a>) -> Result<T>,
274    {
275        self.update_dry_run(false, execute)
276    }
277
278    fn update_dry_run<F, T>(&self, dry_run: bool, execute: F) -> Result<T>
279    where
280        F: for<'a> FnOnce(&mut Self::P<'a>) -> Result<T>,
281    {
282        // Create read-write transaction
283        let mut tx = self.begin_tx();
284
285        // If f returns err, no commit will be applied into backend
286        // storage
287        let ret = execute(&mut tx)?;
288
289        if dry_run {
290            tx.rollback()?;
291        } else {
292            // Apply changes in atomic way
293            tx.commit()?;
294        }
295
296        Ok(ret)
297    }
298
299    fn close(&mut self) {}
300}
301
302pub struct DBTransaction<'db, DB: DBAccess> {
303    inner: rocksdb::Transaction<'db, DB>,
304    /// cumulative size of transaction footprint
305    cumulative_inner_size: RefCell<usize>,
306
307    // TODO: pack all column families into a single array
308    // Candidates column family
309    candidates_cf: &'db ColumnFamily,
310    candidates_height_cf: &'db ColumnFamily,
311    // ValidationResults column family
312    validation_results_cf: &'db ColumnFamily,
313
314    // Ledger column families
315    ledger_cf: &'db ColumnFamily,
316    ledger_faults_cf: &'db ColumnFamily,
317    ledger_txs_cf: &'db ColumnFamily,
318    ledger_height_cf: &'db ColumnFamily,
319    ledger_blobs_cf: &'db ColumnFamily,
320    ledger_blobs_height_cf: &'db ColumnFamily,
321
322    // Mempool column families
323    mempool_cf: &'db ColumnFamily,
324    spending_id_cf: &'db ColumnFamily,
325    fees_cf: &'db ColumnFamily,
326
327    metadata_cf: &'db ColumnFamily,
328}
329
330impl<DB: DBAccess> Ledger for DBTransaction<'_, DB> {
331    fn store_block(
332        &mut self,
333        header: &Header,
334        txs: &[SpentTransaction],
335        faults: &[Fault],
336        label: Label,
337    ) -> Result<usize> {
338        // COLUMN FAMILY: CF_LEDGER_HEADER
339        // It consists of one record per block - Header record
340        // It also includes single record to store metadata - Register record
341        {
342            let cf = self.ledger_cf;
343
344            let mut buf = vec![];
345            LightBlock {
346                header: header.clone(),
347                transactions_ids: txs.iter().map(|t| t.inner.id()).collect(),
348                faults_ids: faults.iter().map(|f| f.id()).collect(),
349            }
350            .write(&mut buf)?;
351
352            self.put_cf(cf, header.hash, buf)?;
353        }
354
355        // Update metadata values
356        self.op_write(MD_HASH_KEY, header.hash)?;
357        self.op_write(MD_STATE_ROOT_KEY, header.state_hash)?;
358
359        // COLUMN FAMILY: CF_LEDGER_TXS
360        {
361            let cf = self.ledger_txs_cf;
362
363            let mut stored_blobs = Vec::with_capacity(6);
364
365            // store all block transactions
366            for tx in txs {
367                let mut d = vec![];
368
369                if tx.inner.inner.blob().is_some() {
370                    let mut strip_tx = tx.clone();
371                    if let Some(blobs) = strip_tx.inner.inner.strip_blobs() {
372                        for (hash, sidecar) in blobs.into_iter() {
373                            let sidecar_bytes = sidecar.to_var_bytes();
374                            self.store_blob_data(&hash, sidecar_bytes)?;
375                            stored_blobs.push(hash);
376                        }
377                    }
378                    strip_tx.write(&mut d)?;
379                } else {
380                    tx.write(&mut d)?;
381                }
382                self.put_cf(cf, tx.inner.id(), d)?;
383            }
384
385            if !stored_blobs.is_empty() {
386                // Store all blobs hashes in the ledger
387                self.store_blobs_height(header.height, &stored_blobs)?;
388            }
389        }
390
391        // COLUMN FAMILY: CF_LEDGER_FAULTS
392        {
393            let cf = self.ledger_faults_cf;
394
395            // store all block faults
396            for f in faults {
397                let mut d = vec![];
398                f.write(&mut d)?;
399                self.put_cf(cf, f.id(), d)?;
400            }
401        }
402        self.store_block_label(header.height, &header.hash, label)?;
403
404        Ok(self.get_size())
405    }
406
407    fn faults_by_block(&self, start_height: u64) -> Result<Vec<Fault>> {
408        let mut faults = vec![];
409        let mut hash = self
410            .op_read(MD_HASH_KEY)?
411            .ok_or(anyhow::anyhow!("Cannot read tip"))?;
412
413        loop {
414            let block = self.light_block(&hash)?.ok_or(anyhow::anyhow!(
415                "Cannot read block {}",
416                hex::encode(&hash)
417            ))?;
418
419            let block_height = block.header.height;
420
421            if block_height >= start_height {
422                hash = block.header.prev_block_hash.to_vec();
423                faults.extend(self.faults(&block.faults_ids)?);
424            } else {
425                break;
426            }
427
428            if block_height == 0 {
429                break;
430            }
431        }
432        Ok(faults)
433    }
434
435    fn store_block_label(
436        &mut self,
437        height: u64,
438        hash: &[u8; 32],
439        label: Label,
440    ) -> Result<()> {
441        // CF: HEIGHT -> (BLOCK_HASH, BLOCK_LABEL)
442        let mut buf = vec![];
443        buf.write_all(hash)?;
444        label.write(&mut buf)?;
445
446        self.put_cf(self.ledger_height_cf, height.to_le_bytes(), buf)?;
447        Ok(())
448    }
449
450    fn delete_block(&mut self, b: &Block) -> Result<()> {
451        self.inner.delete_cf(
452            self.ledger_height_cf,
453            b.header().height.to_le_bytes(),
454        )?;
455
456        for tx in b.txs() {
457            self.inner.delete_cf(self.ledger_txs_cf, tx.id())?;
458        }
459        for f in b.faults() {
460            self.inner.delete_cf(self.ledger_faults_cf, f.id())?;
461        }
462
463        self.delete_blobs_by_height(b.header().height)?;
464        self.inner.delete_cf(self.ledger_cf, b.header().hash)?;
465
466        Ok(())
467    }
468
469    fn block_exists(&self, hash: &[u8]) -> Result<bool> {
470        Ok(self.inner.get_cf(self.ledger_cf, hash)?.is_some())
471    }
472
473    fn faults(&self, faults_ids: &[[u8; 32]]) -> Result<Vec<Fault>> {
474        if faults_ids.is_empty() {
475            return Ok(vec![]);
476        }
477        let ids = faults_ids
478            .iter()
479            .map(|id| (self.ledger_faults_cf, id))
480            .collect::<Vec<_>>();
481
482        // Retrieve all faults ID with single call
483        let faults_buffer = self.inner.multi_get_cf(ids);
484
485        let mut faults = vec![];
486        for buf in faults_buffer {
487            let buf = buf?.unwrap();
488            let fault = Fault::read(&mut &buf[..])?;
489            faults.push(fault);
490        }
491
492        Ok(faults)
493    }
494
495    fn blob_data_by_hash(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
496        Ok(self.inner.get_cf(self.ledger_blobs_cf, hash)?)
497    }
498
499    fn store_blob_data(&self, hash: &[u8; 32], data: Vec<u8>) -> Result<()> {
500        self.inner.put_cf(self.ledger_blobs_cf, hash, data)?;
501        Ok(())
502    }
503    fn store_blobs_height(
504        &self,
505        block_height: u64,
506        blob_hashes: &[[u8; 32]],
507    ) -> Result<()> {
508        if blob_hashes.is_empty() {
509            return Ok(());
510        }
511        let blob_hashes_bytes: Vec<_> =
512            blob_hashes.iter().flat_map(|hash| hash.to_vec()).collect();
513        self.inner.put_cf(
514            self.ledger_blobs_height_cf,
515            block_height.to_be_bytes(),
516            blob_hashes_bytes,
517        )?;
518        Ok(())
519    }
520
521    fn delete_blobs_by_height(&self, block_height: u64) -> Result<()> {
522        let blobs_to_delete = self.blobs_by_height(block_height)?;
523        if let Some(blob_hashes) = blobs_to_delete {
524            for hash in blob_hashes {
525                // What happen if the blobs also exists linked to another
526                // transaction?
527                self.inner.delete_cf(self.ledger_blobs_cf, hash)?;
528            }
529            self.inner.delete_cf(
530                self.ledger_blobs_height_cf,
531                block_height.to_be_bytes(),
532            )?;
533        }
534
535        Ok(())
536    }
537
538    fn blobs_by_height(
539        &self,
540        block_height: u64,
541    ) -> Result<Option<Vec<[u8; 32]>>> {
542        let blob_hashes_bytes = self
543            .inner
544            .get_cf(self.ledger_blobs_height_cf, block_height.to_be_bytes())?;
545
546        if let Some(blob_hashes_bytes) = blob_hashes_bytes {
547            let mut blob_hashes = vec![];
548            for chunk in blob_hashes_bytes.chunks(32) {
549                let mut hash = [0u8; 32];
550                hash.copy_from_slice(chunk);
551                blob_hashes.push(hash);
552            }
553            Ok(Some(blob_hashes))
554        } else {
555            Ok(None)
556        }
557    }
558
559    fn block(&self, hash: &[u8]) -> Result<Option<Block>> {
560        match self.inner.get_cf(self.ledger_cf, hash)? {
561            Some(blob) => {
562                let record = LightBlock::read(&mut &blob[..])?;
563
564                // Retrieve all transactions buffers with single call
565                let txs_buffers = self.inner.multi_get_cf(
566                    record
567                        .transactions_ids
568                        .iter()
569                        .map(|id| (self.ledger_txs_cf, id))
570                        .collect::<Vec<(&ColumnFamily, &[u8; 32])>>(),
571                );
572
573                let mut txs = vec![];
574                for buf in txs_buffers {
575                    let buf = buf?.unwrap();
576                    let mut tx = SpentTransaction::read(&mut &buf[..])?;
577                    if let Some(blobs) = tx.inner.inner.blob_mut() {
578                        for blob in blobs {
579                            // Retrieve blob data from the ledger
580                            let sidecar = self
581                                .blob_data_by_hash(&blob.hash)?
582                                .map(|bytes| {
583                                    BlobSidecar::from_buf(&mut &bytes[..])
584                                })
585                                .transpose()
586                                .map_err(|e| {
587                                    anyhow::anyhow!(
588                                        "Failed to parse blob sidecar: {e:?}"
589                                    )
590                                })?;
591                            blob.data = sidecar;
592                        }
593                    }
594                    txs.push(tx.inner);
595                }
596
597                // Retrieve all faults ID with single call
598                let faults_buffer = self.inner.multi_get_cf(
599                    record
600                        .faults_ids
601                        .iter()
602                        .map(|id| (self.ledger_faults_cf, id))
603                        .collect::<Vec<(&ColumnFamily, &[u8; 32])>>(),
604                );
605                let mut faults = vec![];
606                for buf in faults_buffer {
607                    let buf = buf?.unwrap();
608                    let fault = Fault::read(&mut &buf[..])?;
609                    faults.push(fault);
610                }
611
612                Ok(Some(
613                    Block::new(record.header, txs, faults)
614                        .expect("block should be valid"),
615                ))
616            }
617            None => Ok(None),
618        }
619    }
620
621    fn light_block(&self, hash: &[u8]) -> Result<Option<LightBlock>> {
622        match self.inner.get_cf(self.ledger_cf, hash)? {
623            Some(blob) => {
624                let record = LightBlock::read(&mut &blob[..])?;
625                Ok(Some(record))
626            }
627            None => Ok(None),
628        }
629    }
630
631    fn block_header(&self, hash: &[u8]) -> Result<Option<Header>> {
632        match self.inner.get_cf(self.ledger_cf, hash)? {
633            Some(blob) => {
634                let record = Header::read(&mut &blob[..])?;
635                Ok(Some(record))
636            }
637            None => Ok(None),
638        }
639    }
640
641    fn block_hash_by_height(&self, height: u64) -> Result<Option<[u8; 32]>> {
642        Ok(self
643            .inner
644            .get_cf(self.ledger_height_cf, height.to_le_bytes())?
645            .map(|h| {
646                const LEN: usize = 32;
647                let mut hash = [0u8; LEN];
648                hash.copy_from_slice(&h.as_slice()[0..LEN]);
649                hash
650            }))
651    }
652
653    fn ledger_tx(&self, tx_id: &[u8]) -> Result<Option<SpentTransaction>> {
654        let tx = self
655            .inner
656            .get_cf(self.ledger_txs_cf, tx_id)?
657            .map(|blob| SpentTransaction::read(&mut &blob[..]))
658            .transpose()?;
659
660        Ok(tx)
661    }
662
663    /// Returns a list of transactions from the ledger
664    ///
665    /// This function expects a list of transaction IDs that are in the ledger.
666    ///
667    /// It will return an error if any of the transaction IDs are not found in
668    /// the ledger.
669    fn ledger_txs(
670        &self,
671        tx_ids: Vec<&[u8; 32]>,
672    ) -> Result<Vec<SpentTransaction>> {
673        let cf = self.ledger_txs_cf;
674
675        let ids = tx_ids.into_iter().map(|id| (cf, id)).collect::<Vec<_>>();
676
677        let multi_get_results = self.inner.multi_get_cf(ids);
678
679        let mut spent_transactions =
680            Vec::with_capacity(multi_get_results.len());
681        for result in multi_get_results.into_iter() {
682            let opt_blob = result.map_err(|e| {
683                std::io::Error::new(std::io::ErrorKind::Other, e)
684            })?;
685
686            let Some(blob) = opt_blob else {
687                return Err(anyhow::anyhow!(
688                    "At least one Transaction ID was not found"
689                ));
690            };
691
692            let stx = SpentTransaction::read(&mut &blob[..])?;
693
694            spent_transactions.push(stx);
695        }
696
697        Ok(spent_transactions)
698    }
699
700    /// Returns true if the transaction exists in the
701    /// ledger
702    ///
703    /// This is a convenience method that checks if a transaction exists in the
704    /// ledger without unmarshalling the transaction
705    fn ledger_tx_exists(&self, tx_id: &[u8]) -> Result<bool> {
706        Ok(self.inner.get_cf(self.ledger_txs_cf, tx_id)?.is_some())
707    }
708
709    fn block_by_height(&self, height: u64) -> Result<Option<Block>> {
710        let hash = self.block_hash_by_height(height)?;
711        let block = match hash {
712            Some(hash) => self.block(&hash)?,
713            None => None,
714        };
715        Ok(block)
716    }
717
718    fn block_label_by_height(
719        &self,
720        height: u64,
721    ) -> Result<Option<([u8; 32], Label)>> {
722        const HASH_LEN: usize = 32;
723        Ok(self
724            .inner
725            .get_cf(self.ledger_height_cf, height.to_le_bytes())?
726            .map(|h| {
727                let mut hash = [0u8; HASH_LEN];
728                hash.copy_from_slice(&h.as_slice()[0..HASH_LEN]);
729
730                let label_buff = h[HASH_LEN..].to_vec();
731                Label::read(&mut &label_buff[..]).map(|label| (hash, label))
732            })
733            .transpose()?)
734    }
735}
736
/// Implementation of the `ConsensusStorage` trait for `DBTransaction<'db, DB>`.
738impl<DB: DBAccess> ConsensusStorage for DBTransaction<'_, DB> {
739    /// Stores a candidate block in the database.
740    ///
741    /// # Arguments
742    ///
743    /// * `b` - The block to store.
744    ///
745    /// # Returns
746    ///
747    /// Returns `Ok(())` if the block is successfully stored, or an error if the
748    /// operation fails.
749    fn store_candidate(&mut self, b: Block) -> Result<()> {
750        let mut serialized = vec![];
751        b.write(&mut serialized)?;
752
753        self.inner
754            .put_cf(self.candidates_cf, b.header().hash, serialized)?;
755
756        let key = serialize_key(b.header().height, b.header().hash)?;
757        self.inner
758            .put_cf(self.candidates_height_cf, key, b.header().hash)?;
759
760        Ok(())
761    }
762
763    /// Fetches a candidate block from the database.
764    ///
765    /// # Arguments
766    ///
767    /// * `hash` - The hash of the block to fetch.
768    ///
769    /// # Returns
770    ///
771    /// Returns `Ok(Some(block))` if the block is found, `Ok(None)` if the block
772    /// is not found, or an error if the operation fails.
773    fn candidate(&self, hash: &[u8]) -> Result<Option<Block>> {
774        if let Some(blob) = self.inner.get_cf(self.candidates_cf, hash)? {
775            let b = Block::read(&mut &blob[..])?;
776            return Ok(Some(b));
777        }
778
779        // Block not found
780        Ok(None)
781    }
782
783    fn candidate_by_iteration(
784        &self,
785        consensus_header: &ConsensusHeader,
786    ) -> Result<Option<Block>> {
787        let iter = self
788            .inner
789            .iterator_cf(self.candidates_cf, IteratorMode::Start);
790
791        for (_, blob) in iter.map(Result::unwrap) {
792            let b = Block::read(&mut &blob[..])?;
793
794            let header = b.header();
795            if header.prev_block_hash == consensus_header.prev_block_hash
796                && header.iteration == consensus_header.iteration
797            {
798                return Ok(Some(b));
799            }
800        }
801
802        Ok(None)
803    }
804
805    /// Deletes candidate-related items from the database based on a closure.
806    ///
807    /// # Arguments
808    ///
809    /// * `closure` - If the closure returns `true`, the block will be deleted.
810    ///
811    /// # Returns
812    ///
813    /// Returns `Ok(())` if the deletion is successful, or an error if the
814    /// operation fails.
815    fn delete_candidate<F>(&mut self, closure: F) -> Result<()>
816    where
817        F: FnOnce(u64) -> bool + std::marker::Copy,
818    {
819        let iter = self
820            .inner
821            .iterator_cf(self.candidates_height_cf, IteratorMode::Start);
822
823        for (key, hash) in iter.map(Result::unwrap) {
824            let (height, _) = deserialize_key(&mut &key.to_vec()[..])?;
825            if closure(height) {
826                self.inner.delete_cf(self.candidates_cf, hash)?;
827                self.inner.delete_cf(self.candidates_height_cf, key)?;
828            }
829        }
830
831        Ok(())
832    }
833
834    fn count_candidates(&self) -> usize {
835        let iter = self
836            .inner
837            .iterator_cf(self.candidates_height_cf, IteratorMode::Start);
838
839        iter.count()
840    }
841
842    /// Deletes all items from the `CF_CANDIDATES` column family.
843    ///
844    /// # Returns
845    ///
846    /// Returns `Ok(())` if the deletion is successful, or an error if the
847    /// operation fails.
848    fn clear_candidates(&mut self) -> Result<()> {
849        self.delete_candidate(|_| true)
850    }
851
852    /// Stores a ValidationResult in the database.
853    ///
854    /// # Arguments
855    ///
856    /// * `vr` - The ValidationResult to store.
857    ///
858    /// # Returns
859    ///
860    /// Returns `Ok(())` if the ValidationResult is successfully stored, or an
861    /// error if the operation fails.
862    fn store_validation_result(
863        &mut self,
864        consensus_header: &ConsensusHeader,
865        validation_result: &payload::ValidationResult,
866    ) -> Result<()> {
867        let mut serialized = vec![];
868        validation_result.write(&mut serialized)?;
869
870        let key = serialize_iter_key(consensus_header)?;
871        self.inner
872            .put_cf(self.validation_results_cf, key, serialized)?;
873
874        Ok(())
875    }
876
877    /// Fetches a ValidationResult from the database.
878    ///
879    /// # Arguments
880    ///
881    /// * `consensus_header` - The ConsensusHeader of the ValidationResult.
882    ///
883    /// # Returns
884    ///
885    /// Returns `Ok(Some(ValidationResult))` if the ValidationResult is found,
886    /// `Ok(None)` if the ValidationResult is not found, or an error if the
887    /// operation fails.
888    fn validation_result(
889        &self,
890        consensus_header: &ConsensusHeader,
891    ) -> Result<Option<payload::ValidationResult>> {
892        let key = serialize_iter_key(consensus_header)?;
893        if let Some(blob) =
894            self.inner.get_cf(self.validation_results_cf, key)?
895        {
896            let validation_result =
897                payload::ValidationResult::read(&mut &blob[..])?;
898            return Ok(Some(validation_result));
899        }
900
901        // ValidationResult not found
902        Ok(None)
903    }
904
905    /// Deletes ValidationResult items from the database based on a closure.
906    ///
907    /// # Arguments
908    ///
909    /// * `closure` - If the closure returns `true`, the ValidationResult will
910    ///   be deleted.
911    ///
912    /// # Returns
913    ///
914    /// Returns `Ok(())` if the deletion is successful, or an error if the
915    /// operation fails.
916    fn delete_validation_results<F>(&mut self, closure: F) -> Result<()>
917    where
918        F: FnOnce([u8; 32]) -> bool + std::marker::Copy,
919    {
920        let iter = self
921            .inner
922            .iterator_cf(self.validation_results_cf, IteratorMode::Start);
923
924        for (key, _) in iter.map(Result::unwrap) {
925            let (prev_block_hash, _) =
926                deserialize_iter_key(&mut &key.to_vec()[..])?;
927            if closure(prev_block_hash) {
928                self.inner.delete_cf(self.validation_results_cf, key)?;
929            }
930        }
931
932        Ok(())
933    }
934
935    fn count_validation_results(&self) -> usize {
936        let iter = self
937            .inner
938            .iterator_cf(self.validation_results_cf, IteratorMode::Start);
939
940        iter.count()
941    }
942
943    /// Deletes all items from the `CF_VALIDATION_RESULTS` column family.
944    ///
945    /// # Returns
946    ///
947    /// Returns `Ok(())` if the deletion is successful, or an error if the
948    /// operation fails.
949    fn clear_validation_results(&mut self) -> Result<()> {
950        self.delete_validation_results(|_| true)
951    }
952}
953
954impl<DB: DBAccess> Persist for DBTransaction<'_, DB> {
955    /// Deletes all items from both CF_LEDGER and CF_CANDIDATES column families
956    fn clear_database(&mut self) -> Result<()> {
957        // Create an iterator over the column family CF_LEDGER
958        let iter = self.inner.iterator_cf(self.ledger_cf, IteratorMode::Start);
959
960        // Iterate through the CF_LEDGER column family and delete all items
961        for (key, _) in iter.map(Result::unwrap) {
962            self.inner.delete_cf(self.ledger_cf, key)?;
963        }
964
965        self.clear_candidates()?;
966        self.clear_validation_results()?;
967        Ok(())
968    }
969
970    fn commit(self) -> Result<()> {
971        if let Err(e) = self.inner.commit() {
972            return Err(anyhow::Error::new(e).context("failed to commit"));
973        }
974
975        Ok(())
976    }
977
978    fn rollback(self) -> Result<()> {
979        if let Err(e) = self.inner.rollback() {
980            return Err(anyhow::Error::new(e).context("failed to rollback"));
981        }
982
983        Ok(())
984    }
985}
986
987impl<DB: DBAccess> Mempool for DBTransaction<'_, DB> {
988    fn store_mempool_tx(
989        &mut self,
990        tx: &Transaction,
991        timestamp: u64,
992    ) -> Result<()> {
993        // Map Hash to serialized transaction
994        let mut tx_data = vec![];
995        tx.write(&mut tx_data)?;
996
997        let hash = tx.id();
998        self.put_cf(self.mempool_cf, hash, tx_data)?;
999
1000        // Add Secondary indexes //
1001        // Spending Ids
1002        for n in tx.to_spend_ids() {
1003            let key = n.to_bytes();
1004            self.put_cf(self.spending_id_cf, key, hash)?;
1005        }
1006
1007        let timestamp = timestamp.to_be_bytes();
1008
1009        // Map Fee_Hash to Timestamp
1010        // Key pair is used to facilitate sort-by-fee
1011        // Also, the timestamp is used to remove expired transactions
1012        self.put_cf(
1013            self.fees_cf,
1014            serialize_key(tx.gas_price(), hash)?,
1015            timestamp,
1016        )?;
1017
1018        Ok(())
1019    }
1020
1021    fn mempool_tx(&self, hash: [u8; 32]) -> Result<Option<Transaction>> {
1022        let data = self.inner.get_cf(self.mempool_cf, hash)?;
1023
1024        match data {
1025            // None has a meaning key not found
1026            None => Ok(None),
1027            Some(blob) => Ok(Some(Transaction::read(&mut &blob.to_vec()[..])?)),
1028        }
1029    }
1030
1031    fn mempool_tx_exists(&self, h: [u8; 32]) -> Result<bool> {
1032        Ok(self.inner.get_cf(self.mempool_cf, h)?.is_some())
1033    }
1034
1035    fn delete_mempool_tx(
1036        &mut self,
1037        h: [u8; 32],
1038        cascade: bool,
1039    ) -> Result<Vec<[u8; 32]>> {
1040        let mut deleted = vec![];
1041        let tx = self.mempool_tx(h)?;
1042        if let Some(tx) = tx {
1043            let hash = tx.id();
1044
1045            self.inner.delete_cf(self.mempool_cf, hash)?;
1046
1047            // Delete Secondary indexes
1048            // Delete spendingids (nullifiers or nonce)
1049            for n in tx.to_spend_ids() {
1050                let key = n.to_bytes();
1051                self.inner.delete_cf(self.spending_id_cf, key)?;
1052            }
1053
1054            // Delete Fee_Hash
1055            self.inner.delete_cf(
1056                self.fees_cf,
1057                serialize_key(tx.gas_price(), hash)?,
1058            )?;
1059
1060            deleted.push(h);
1061
1062            if cascade {
1063                let mut dependants = vec![];
1064                // Get the next spending id (aka next nonce tx)
1065                // retrieve tx_id and delete it
1066                let mut next_spending_id = tx.next_spending_id();
1067                while let Some(spending_id) = next_spending_id {
1068                    next_spending_id = spending_id.next();
1069                    let next_txs =
1070                        self.mempool_txs_by_spendable_ids(&[spending_id]);
1071                    if next_txs.is_empty() {
1072                        break;
1073                    }
1074                    dependants.extend(next_txs);
1075                }
1076
1077                // delete all dependants
1078                for tx_id in dependants {
1079                    let cascade_deleted =
1080                        self.delete_mempool_tx(tx_id, false)?;
1081                    deleted.extend(cascade_deleted);
1082                }
1083            }
1084        }
1085
1086        Ok(deleted)
1087    }
1088
1089    fn mempool_txs_by_spendable_ids(
1090        &self,
1091        n: &[SpendingId],
1092    ) -> HashSet<[u8; 32]> {
1093        n.iter()
1094            .filter_map(|n| {
1095                match self.inner.get_cf(self.spending_id_cf, n.to_bytes()) {
1096                    Ok(Some(tx_id)) => tx_id.try_into().ok(),
1097                    _ => None,
1098                }
1099            })
1100            .collect()
1101    }
1102
1103    fn mempool_txs_sorted_by_fee(
1104        &self,
1105    ) -> Box<dyn Iterator<Item = Transaction> + '_> {
1106        let iter = MemPoolIterator::new(&self.inner, self.fees_cf, self);
1107
1108        Box::new(iter)
1109    }
1110
1111    fn mempool_txs_ids_sorted_by_fee(
1112        &self,
1113    ) -> Box<dyn Iterator<Item = (u64, [u8; 32])> + '_> {
1114        let iter = MemPoolFeeIterator::new(&self.inner, self.fees_cf, true);
1115
1116        Box::new(iter)
1117    }
1118
1119    fn mempool_txs_ids_sorted_by_low_fee(
1120        &self,
1121    ) -> Box<dyn Iterator<Item = (u64, [u8; 32])> + '_> {
1122        let iter = MemPoolFeeIterator::new(&self.inner, self.fees_cf, false);
1123
1124        Box::new(iter)
1125    }
1126
1127    /// Get all expired transactions hashes.
1128    fn mempool_expired_txs(&self, timestamp: u64) -> Result<Vec<[u8; 32]>> {
1129        let mut iter = self.inner.raw_iterator_cf(self.fees_cf);
1130        iter.seek_to_first();
1131        let mut txs_list = vec![];
1132
1133        while iter.valid() {
1134            if let Some(key) = iter.key() {
1135                let (_, tx_id) = deserialize_key(&mut &key.to_vec()[..])?;
1136
1137                let tx_timestamp = u64::from_be_bytes(
1138                    iter.value()
1139                        .ok_or_else(|| {
1140                            io::Error::new(
1141                                io::ErrorKind::InvalidData,
1142                                "no value",
1143                            )
1144                        })?
1145                        .try_into()
1146                        .map_err(|_| {
1147                            io::Error::new(
1148                                io::ErrorKind::InvalidData,
1149                                "invalid data",
1150                            )
1151                        })?,
1152                );
1153
1154                if tx_timestamp <= timestamp {
1155                    txs_list.push(tx_id);
1156                }
1157            }
1158
1159            iter.next();
1160        }
1161
1162        Ok(txs_list)
1163    }
1164
1165    fn mempool_txs_ids(&self) -> Result<Vec<[u8; 32]>> {
1166        let mut iter = self.inner.raw_iterator_cf(self.fees_cf);
1167        iter.seek_to_last();
1168
1169        let mut txs_list = vec![];
1170
1171        // Iterate all keys from the end in reverse lexicographic order
1172        while iter.valid() {
1173            if let Some(key) = iter.key() {
1174                let (_, tx_id) = deserialize_key(&mut &key.to_vec()[..])?;
1175
1176                txs_list.push(tx_id);
1177            }
1178
1179            iter.prev();
1180        }
1181
1182        Ok(txs_list)
1183    }
1184
1185    fn mempool_txs_count(&self) -> usize {
1186        self.inner
1187            .iterator_cf(self.mempool_cf, IteratorMode::Start)
1188            .count()
1189    }
1190}
1191
1192pub struct MemPoolIterator<'db, DB: DBAccess, M: Mempool> {
1193    iter: MemPoolFeeIterator<'db, DB>,
1194    mempool: &'db M,
1195}
1196
1197impl<'db, DB: DBAccess, M: Mempool> MemPoolIterator<'db, DB, M> {
1198    fn new(
1199        db: &'db rocksdb::Transaction<DB>,
1200        fees_cf: &ColumnFamily,
1201        mempool: &'db M,
1202    ) -> Self {
1203        let iter = MemPoolFeeIterator::new(db, fees_cf, true);
1204        MemPoolIterator { iter, mempool }
1205    }
1206}
1207
1208impl<DB: DBAccess, M: Mempool> Iterator for MemPoolIterator<'_, DB, M> {
1209    type Item = Transaction;
1210    fn next(&mut self) -> Option<Self::Item> {
1211        self.iter.next().and_then(|(_, tx_id)| {
1212            self.mempool.mempool_tx(tx_id).ok().flatten()
1213        })
1214    }
1215}
1216
1217pub struct MemPoolFeeIterator<'db, DB: DBAccess> {
1218    iter: DBRawIteratorWithThreadMode<'db, rocksdb::Transaction<'db, DB>>,
1219    fee_desc: bool,
1220}
1221
1222impl<'db, DB: DBAccess> MemPoolFeeIterator<'db, DB> {
1223    fn new(
1224        db: &'db rocksdb::Transaction<DB>,
1225        fees_cf: &ColumnFamily,
1226        fee_desc: bool,
1227    ) -> Self {
1228        let mut iter = db.raw_iterator_cf(fees_cf);
1229        if fee_desc {
1230            iter.seek_to_last();
1231        };
1232        MemPoolFeeIterator { iter, fee_desc }
1233    }
1234}
1235
1236impl<DB: DBAccess> Iterator for MemPoolFeeIterator<'_, DB> {
1237    type Item = (u64, [u8; 32]);
1238    fn next(&mut self) -> Option<Self::Item> {
1239        match self.iter.valid() {
1240            true => {
1241                if let Some(key) = self.iter.key() {
1242                    let (gas_price, hash) =
1243                        deserialize_key(&mut &key.to_vec()[..]).ok()?;
1244                    if self.fee_desc {
1245                        self.iter.prev();
1246                    } else {
1247                        self.iter.next();
1248                    }
1249                    Some((gas_price, hash))
1250                } else {
1251                    None
1252                }
1253            }
1254            false => None,
1255        }
1256    }
1257}
1258
1259impl<DB: DBAccess> std::fmt::Debug for DBTransaction<'_, DB> {
1260    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1261        //  Print ledger blocks
1262        let iter = self.inner.iterator_cf(self.ledger_cf, IteratorMode::Start);
1263
1264        iter.map(Result::unwrap).try_for_each(|(hash, _)| {
1265            if let Ok(Some(blob)) = self.inner.get_cf(self.ledger_cf, &hash[..])
1266            {
1267                let b = Block::read(&mut &blob[..]).unwrap_or_default();
1268                writeln!(f, "ledger_block [{}]: {:#?}", b.header().height, b)
1269            } else {
1270                Ok(())
1271            }
1272        })?;
1273
1274        // Print candidate blocks
1275        let iter = self
1276            .inner
1277            .iterator_cf(self.candidates_cf, IteratorMode::Start);
1278
1279        let results: std::fmt::Result =
1280            iter.map(Result::unwrap).try_for_each(|(hash, _)| {
1281                if let Ok(Some(blob)) =
1282                    self.inner.get_cf(self.candidates_cf, &hash[..])
1283                {
1284                    let b = Block::read(&mut &blob[..]).unwrap_or_default();
1285                    writeln!(
1286                        f,
1287                        "candidate_block [{}]: {:#?}",
1288                        b.header().height,
1289                        b
1290                    )
1291                } else {
1292                    Ok(())
1293                }
1294            });
1295
1296        results
1297    }
1298}
1299
1300impl<DB: DBAccess> Metadata for DBTransaction<'_, DB> {
1301    fn op_write<T: AsRef<[u8]>>(&mut self, key: &[u8], value: T) -> Result<()> {
1302        self.put_cf(self.metadata_cf, key, value)?;
1303        Ok(())
1304    }
1305
1306    fn op_read(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1307        self.inner.get_cf(self.metadata_cf, key).map_err(Into::into)
1308    }
1309}
1310
1311impl<DB: DBAccess> DBTransaction<'_, DB> {
1312    /// A thin wrapper around inner.put_cf that calculates a db transaction
1313    /// disk footprint
1314    fn put_cf<K: AsRef<[u8]>, V: AsRef<[u8]>>(
1315        &self,
1316        cf: &impl AsColumnFamilyRef,
1317        key: K,
1318        value: V,
1319    ) -> Result<()> {
1320        let kv_size = key.as_ref().len() + value.as_ref().len();
1321        self.inner.put_cf(cf, key, value)?;
1322        *self.cumulative_inner_size.borrow_mut() += kv_size;
1323        Ok(())
1324    }
1325
1326    pub fn get_size(&self) -> usize {
1327        *self.cumulative_inner_size.borrow()
1328    }
1329}
1330
/// Builds a composite key: the 8 big-endian bytes of `value` followed by the
/// 32 bytes of `hash`.
fn serialize_key(value: u64, hash: [u8; 32]) -> std::io::Result<Vec<u8>> {
    let mut key = Vec::with_capacity(8 + hash.len());
    key.extend_from_slice(&value.to_be_bytes());
    key.extend_from_slice(&hash);
    Ok(key)
}
1337
/// Reads a composite key back: 8 bytes decoded as a big-endian `u64`,
/// followed by a 32-byte hash.
///
/// Fails if the reader cannot supply all 40 bytes.
fn deserialize_key<R: Read>(r: &mut R) -> Result<(u64, [u8; 32])> {
    let mut value_bytes = [0u8; 8];
    let mut hash = [0u8; 32];

    r.read_exact(&mut value_bytes)?;
    r.read_exact(&mut hash)?;

    Ok((u64::from_be_bytes(value_bytes), hash))
}
1347
1348fn serialize_iter_key(ch: &ConsensusHeader) -> std::io::Result<Vec<u8>> {
1349    let mut w = vec![];
1350    std::io::Write::write_all(&mut w, &ch.prev_block_hash)?;
1351    std::io::Write::write_all(&mut w, &[ch.iteration])?;
1352    Ok(w)
1353}
1354
/// Reads a consensus-header key back: a 32-byte previous block hash followed
/// by one iteration byte.
///
/// Fails if the reader cannot supply all 33 bytes.
fn deserialize_iter_key<R: Read>(r: &mut R) -> Result<([u8; 32], u8)> {
    let mut prev_block_hash = [0u8; 32];
    let mut iter_byte = [0u8; 1];

    r.read_exact(&mut prev_block_hash)?;
    r.read_exact(&mut iter_byte)?;

    let [iteration] = iter_byte;
    Ok((prev_block_hash, iteration))
}
1365
1366impl node_data::Serializable for LightBlock {
1367    fn write<W: Write>(&self, w: &mut W) -> io::Result<()> {
1368        // Write block header
1369        self.header.write(w)?;
1370
1371        // Write transactions count
1372        let len = self.transactions_ids.len() as u32;
1373        w.write_all(&len.to_le_bytes())?;
1374
1375        // Write transactions hashes
1376        for tx_id in &self.transactions_ids {
1377            w.write_all(tx_id)?;
1378        }
1379
1380        // Write faults count
1381        let len = self.faults_ids.len() as u32;
1382        w.write_all(&len.to_le_bytes())?;
1383
1384        // Write faults id
1385        for f_id in &self.faults_ids {
1386            w.write_all(f_id)?;
1387        }
1388
1389        Ok(())
1390    }
1391
1392    fn read<R: Read>(r: &mut R) -> io::Result<Self>
1393    where
1394        Self: Sized,
1395    {
1396        // Read block header
1397        let header = Header::read(r)?;
1398
1399        // Read transactions count
1400        let len = Self::read_u32_le(r)?;
1401
1402        // Read transactions hashes
1403        let mut transactions_ids = vec![];
1404        for _ in 0..len {
1405            let mut tx_id = [0u8; 32];
1406            r.read_exact(&mut tx_id[..])?;
1407
1408            transactions_ids.push(tx_id);
1409        }
1410
1411        // Read faults count
1412        let len = Self::read_u32_le(r)?;
1413
1414        // Read faults ids
1415        let mut faults_ids = vec![];
1416        for _ in 0..len {
1417            let mut f_id = [0u8; 32];
1418            r.read_exact(&mut f_id[..])?;
1419
1420            faults_ids.push(f_id);
1421        }
1422
1423        Ok(Self {
1424            header,
1425            transactions_ids,
1426            faults_ids,
1427        })
1428    }
1429}
1430
1431#[cfg(test)]
1432mod tests {
1433    use fake::{Fake, Faker};
1434    use node_data::ledger;
1435
1436    use super::*;
1437
1438    #[test]
1439    fn test_store_block() {
1440        TestWrapper::new("test_store_block").run(|path| {
1441            let db = Backend::create_or_open(path, DatabaseOptions::default());
1442
1443            let b: Block = Faker.fake();
1444            assert!(!b.txs().is_empty());
1445
1446            let hash = b.header().hash;
1447
1448            assert!(db
1449                .update(|txn| {
1450                    txn.store_block(
1451                        b.header(),
1452                        &to_spent_txs(b.txs()),
1453                        b.faults(),
1454                        Label::Final(3),
1455                    )?;
1456                    Ok(())
1457                })
1458                .is_ok());
1459
1460            db.view(|txn| {
1461                // Assert block header is fully fetched from ledger
1462                let db_blk = txn
1463                    .block(&hash)
1464                    .expect("Block to be fetched")
1465                    .expect("Block to exist");
1466                assert_eq!(db_blk.header().hash, b.header().hash);
1467
1468                // Assert all transactions are fully fetched from ledger as
1469                // well.
1470                for pos in 0..b.txs().len() {
1471                    assert_eq!(db_blk.txs()[pos].id(), b.txs()[pos].id());
1472                }
1473
1474                // Assert all faults are fully fetched from ledger as
1475                // well.
1476                for pos in 0..b.faults().len() {
1477                    assert_eq!(db_blk.faults()[pos].id(), b.faults()[pos].id());
1478                }
1479            });
1480
1481            assert!(db
1482                .update(|txn| {
1483                    txn.clear_database()?;
1484                    Ok(())
1485                })
1486                .is_ok());
1487
1488            db.view(|txn| {
1489                assert!(txn
1490                    .block(&hash)
1491                    .expect("block to be fetched")
1492                    .is_none());
1493            });
1494        });
1495    }
1496
1497    #[test]
1498    fn test_read_only() {
1499        TestWrapper::new("test_read_only").run(|path| {
1500            let db = Backend::create_or_open(path, DatabaseOptions::default());
1501            let b: Block = Faker.fake();
1502            db.update_dry_run(true, |txn| {
1503                txn.store_block(
1504                    b.header(),
1505                    &to_spent_txs(b.txs()),
1506                    b.faults(),
1507                    Label::Final(3),
1508                )
1509            })
1510            .expect("block to be stored");
1511            db.view(|txn| {
1512                assert!(txn
1513                    .block(&b.header().hash)
1514                    .expect("block to be fetched")
1515                    .is_none());
1516            });
1517        });
1518    }
1519
1520    #[test]
1521    fn test_transaction_isolation() {
1522        TestWrapper::new("test_transaction_isolation").run(|path| {
1523            let db = Backend::create_or_open(path, DatabaseOptions::default());
1524            let mut b: Block = Faker.fake();
1525            let hash = b.header().hash;
1526
1527            db.view(|txn| {
1528                // Simulate a concurrent update is committed during read-only
1529                // transaction
1530                assert!(db
1531                    .update(|inner| {
1532                        inner
1533                            .store_block(
1534                                b.header(),
1535                                &to_spent_txs(b.txs()),
1536                                b.faults(),
1537                                Label::Final(3),
1538                            )
1539                            .unwrap();
1540
1541                        // We support Read-Your-Own-Writes
1542                        assert!(inner.block(&hash)?.is_some());
1543                        // Data is isolated until the transaction is not
1544                        // committed
1545                        assert!(txn.block(&hash)?.is_none());
1546                        Ok(())
1547                    })
1548                    .is_ok());
1549
1550                // Asserts that the read-only/view transaction get the updated
1551                // data after the tx is committed
1552                assert!(txn
1553                    .block(&hash)
1554                    .expect("block to be fetched")
1555                    .is_some());
1556            });
1557
1558            // Asserts that update was done
1559            db.view(|txn| {
1560                assert_blocks_eq(
1561                    &mut txn
1562                        .block(&hash)
1563                        .expect("block to be fetched")
1564                        .unwrap(),
1565                    &mut b,
1566                );
1567            });
1568        });
1569    }
1570
1571    fn assert_blocks_eq(a: &Block, b: &Block) {
1572        assert!(a.header().hash != [0u8; 32]);
1573        assert!(a.header().hash.eq(&b.header().hash));
1574    }
1575
1576    #[test]
1577    fn test_add_mempool_tx() {
1578        TestWrapper::new("test_add_tx").run(|path| {
1579            let db = Backend::create_or_open(path, DatabaseOptions::default());
1580            let t: Transaction = Faker.fake();
1581
1582            assert!(db.update(|txn| { txn.store_mempool_tx(&t, 0) }).is_ok());
1583
1584            db.view(|vq| {
1585                assert!(vq.mempool_tx_exists(t.id()).unwrap());
1586
1587                let fetched_tx = vq
1588                    .mempool_tx(t.id())
1589                    .expect("valid contract call")
1590                    .unwrap();
1591
1592                assert_eq!(
1593                    fetched_tx.id(),
1594                    t.id(),
1595                    "fetched transaction should be the same"
1596                );
1597            });
1598
1599            // Delete a contract call
1600            db.update(|txn| {
1601                let deleted =
1602                    txn.delete_mempool_tx(t.id(), false).expect("valid tx");
1603                assert!(deleted.len() == 1);
1604                Ok(())
1605            })
1606            .unwrap();
1607        });
1608    }
1609
1610    #[test]
1611    fn test_mempool_txs_sorted_by_fee() {
1612        TestWrapper::new("test_mempool_txs_sorted_by_fee").run(|path| {
1613            let db = Backend::create_or_open(path, DatabaseOptions::default());
1614            // Populate mempool with N contract calls
1615            let _rng = rand::thread_rng();
1616            db.update(|txn| {
1617                for _i in 0..10u32 {
1618                    let t: Transaction = Faker.fake();
1619                    txn.store_mempool_tx(&t, 0)?;
1620                }
1621                Ok(())
1622            })
1623            .unwrap();
1624
1625            db.view(|txn| {
1626                let txs = txn.mempool_txs_sorted_by_fee();
1627
1628                let mut last_fee = u64::MAX;
1629                for t in txs {
1630                    let fee = t.gas_price();
1631                    assert!(
1632                        fee <= last_fee,
1633                        "tx fees are not in decreasing order"
1634                    );
1635                    last_fee = fee
1636                }
1637                assert_ne!(last_fee, u64::MAX, "No tx has been processed")
1638            });
1639        });
1640    }
1641
1642    #[test]
1643    fn test_txs_count() {
1644        TestWrapper::new("test_txs_count").run(|path| {
1645            let db = Backend::create_or_open(path, DatabaseOptions::default());
1646
1647            const N: usize = 100;
1648            const D: usize = 50;
1649
1650            let txs: Vec<_> = (0..N)
1651                .map(|i| ledger::faker::gen_dummy_tx(i as u64))
1652                .collect();
1653
1654            db.update(|db| {
1655                assert_eq!(db.mempool_txs_count(), 0);
1656                txs.iter().for_each(|t| {
1657                    db.store_mempool_tx(&t, 0).expect("tx should be added")
1658                });
1659                Ok(())
1660            })
1661            .unwrap();
1662
1663            db.update(|db| {
1664                // Ensure txs count is equal to the number of added tx
1665                assert_eq!(db.mempool_txs_count(), N);
1666
1667                txs.iter().take(D).for_each(|tx| {
1668                    let deleted = db
1669                        .delete_mempool_tx(tx.id(), false)
1670                        .expect("transaction should be deleted");
1671                    assert!(deleted.len() == 1);
1672                });
1673
1674                Ok(())
1675            })
1676            .unwrap();
1677
1678            // Ensure txs count is updated after the deletion
1679            db.update(|db| {
1680                assert_eq!(db.mempool_txs_count(), N - D);
1681                Ok(())
1682            })
1683            .unwrap();
1684        });
1685    }
1686
1687    #[test]
1688    fn test_max_gas_limit() {
1689        TestWrapper::new("test_block_size_limit").run(|path| {
1690            let db = Backend::create_or_open(path, DatabaseOptions::default());
1691
1692            db.update(|txn| {
1693                for i in 0..10u32 {
1694                    let t = ledger::faker::gen_dummy_tx(i as u64);
1695                    txn.store_mempool_tx(&t, 0)?;
1696                }
1697                Ok(())
1698            })
1699            .unwrap();
1700
1701            let total_gas_price: u64 = 9 + 8 + 7 + 6 + 5 + 4 + 3 + 2 + 1;
1702            db.view(|txn| {
1703                let txs = txn
1704                    .mempool_txs_sorted_by_fee()
1705                    .map(|t| t.gas_price())
1706                    .sum::<u64>();
1707
1708                assert_eq!(txs, total_gas_price);
1709            });
1710        });
1711    }
1712
1713    #[test]
1714    fn test_get_expired_txs() {
1715        TestWrapper::new("test_get_expired_txs").run(|path| {
1716            let db = Backend::create_or_open(path, DatabaseOptions::default());
1717
1718            let mut expiry_list = HashSet::new();
1719            let _ = db.update(|txn| {
1720                (1..101).for_each(|i| {
1721                    let t = ledger::faker::gen_dummy_tx(i as u64);
1722                    txn.store_mempool_tx(&t, i).expect("tx should be added");
1723                    expiry_list.insert(t.id());
1724                });
1725
1726                (1000..1100).for_each(|i| {
1727                    let t = ledger::faker::gen_dummy_tx(i as u64);
1728                    txn.store_mempool_tx(&t, i).expect("tx should be added");
1729                });
1730
1731                Ok(())
1732            });
1733
1734            db.view(|vq| {
1735                let expired: HashSet<_> = vq
1736                    .mempool_expired_txs(100)
1737                    .unwrap()
1738                    .into_iter()
1739                    .map(|id| id)
1740                    .collect();
1741
1742                assert_eq!(expiry_list, expired);
1743            });
1744        });
1745    }
1746
1747    fn to_spent_txs(txs: &Vec<Transaction>) -> Vec<SpentTransaction> {
1748        txs.iter()
1749            .map(|t| SpentTransaction {
1750                inner: t.clone(),
1751                block_height: 0,
1752                gas_spent: 0,
1753                err: None,
1754            })
1755            .collect()
1756    }
1757
1758    #[test]
1759    fn test_get_ledger_tx_by_hash() {
1760        TestWrapper::new("test_get_ledger_tx_by_hash").run(|path| {
1761            let db = Backend::create_or_open(path, DatabaseOptions::default());
1762            let b: Block = Faker.fake();
1763            assert!(!b.txs().is_empty());
1764
1765            // Store a block
1766            assert!(db
1767                .update(|txn| {
1768                    txn.store_block(
1769                        b.header(),
1770                        &to_spent_txs(b.txs()),
1771                        b.faults(),
1772                        Label::Final(3),
1773                    )?;
1774                    Ok(())
1775                })
1776                .is_ok());
1777
1778            // Assert all transactions of the accepted (stored) block are
1779            // accessible by hash.
1780            db.view(|v| {
1781                for t in b.txs().iter() {
1782                    assert!(v
1783                        .ledger_tx(&t.id())
1784                        .expect("should not return error")
1785                        .expect("should find a transaction")
1786                        .inner
1787                        .eq(t));
1788                }
1789            });
1790        });
1791    }
1792
1793    #[test]
1794    fn test_fetch_block_hash_by_height() {
1795        TestWrapper::new("test_fetch_block_hash_by_height").run(|path| {
1796            let db = Backend::create_or_open(path, DatabaseOptions::default());
1797            let b: Block = Faker.fake();
1798
1799            // Store a block
1800            assert!(db
1801                .update(|txn| {
1802                    txn.store_block(
1803                        b.header(),
1804                        &to_spent_txs(b.txs()),
1805                        b.faults(),
1806                        Label::Attested(3),
1807                    )?;
1808                    Ok(())
1809                })
1810                .is_ok());
1811
1812            // Assert block hash is accessible by height.
1813            db.view(|v| {
1814                assert!(v
1815                    .block_hash_by_height(b.header().height)
1816                    .expect("should not return error")
1817                    .expect("should find a block")
1818                    .eq(&b.header().hash));
1819            });
1820        });
1821    }
1822
#[test]
/// Stores a randomly generated block with a known label and checks that
/// the label can be resolved from the block height.
fn test_fetch_block_label_by_height() {
    // The temp-dir prefix matches this test's own name (it was previously
    // copy-pasted from the hash-by-height test).
    TestWrapper::new("test_fetch_block_label_by_height").run(|path| {
        let db = Backend::create_or_open(path, DatabaseOptions::default());
        let b: Block = Faker.fake();

        // Store a block. `expect` is used instead of `assert!(.. .is_ok())`
        // so that the underlying error shows up in the panic message.
        db.update(|txn| {
            txn.store_block(
                b.header(),
                &to_spent_txs(b.txs()),
                b.faults(),
                Label::Attested(3),
            )?;
            Ok(())
        })
        .expect("block should be stored");

        // Assert block label is accessible by height.
        db.view(|v| {
            // The lookup yields a tuple; only its second element (the
            // label) is checked here.
            let (_, stored_label) = v
                .block_label_by_height(b.header().height)
                .expect("should not return error")
                .expect("should find a block");

            assert_eq!(stored_label, Label::Attested(3));
        });
    });
}
1853
#[test]
/// Ensures delete_block fn removes all keys of a single block
fn test_delete_block() {
    // The temp-dir prefix matches this test's own name (it was previously
    // copy-pasted from the hash-by-height test).
    let t = TestWrapper::new("test_delete_block");
    t.run(|path| {
        let db = Backend::create_or_open(path, DatabaseOptions::default());
        let b: ledger::Block = Faker.fake();

        // Store the block, then delete it again in a separate update.
        db.update(|ut| {
            ut.store_block(
                b.header(),
                &to_spent_txs(b.txs()),
                b.faults(),
                Label::Final(3),
            )?;
            Ok(())
        })
        .expect("block should be stored");

        db.update(|ut| {
            ut.delete_block(&b)?;
            Ok(())
        })
        .expect("block should be deleted");
    });

    // The backend handle was dropped at the end of the closure above; reopen
    // the same folder with the plain RocksDB API to inspect the raw contents.
    let path = t.get_path();
    let opts = Options::default();

    let cf_names = rocksdb::DB::list_cf(&opts, &path).unwrap();
    assert!(!cf_names.is_empty());

    // Ensure no block fields leak after its deletion
    let db = rocksdb::DB::open_cf(&opts, &path, &cf_names).unwrap();
    for cf_name in &cf_names {
        // The metadata column family is excluded from the emptiness check.
        if cf_name.as_str() == CF_METADATA {
            continue;
        }

        let cf = db
            .cf_handle(cf_name)
            .expect("listed column family should have a handle");
        let leftover_entries = db
            .iterator_cf(cf, IteratorMode::Start)
            .map(|entry| entry.expect("column family entry should be readable"))
            .count();

        assert_eq!(
            leftover_entries, 0,
            "column family {} still holds entries after block deletion",
            cf_name
        );
    }
}
1906
1907    struct TestWrapper(tempfile::TempDir);
1908
1909    impl TestWrapper {
1910        fn new(path: &'static str) -> Self {
1911            Self(
1912                tempfile::TempDir::with_prefix(path)
1913                    .expect("Temp directory to be created"),
1914            )
1915        }
1916
1917        pub fn run<F>(&self, test_func: F)
1918        where
1919            F: FnOnce(&Path),
1920        {
1921            test_func(self.0.path());
1922        }
1923
1924        pub fn get_path(&self) -> std::path::PathBuf {
1925            self.0.path().to_owned().join(DB_FOLDER_NAME)
1926        }
1927    }
1928}