Skip to main content

dusk_node/database/
rocksdb.rs

1// This Source Code Form is subject to the terms of the Mozilla Public
2// License, v. 2.0. If a copy of the MPL was not distributed with this
3// file, You can obtain one at http://mozilla.org/MPL/2.0/.
4//
5// Copyright (c) DUSK NETWORK. All rights reserved.
6
7use std::cell::RefCell;
8use std::collections::HashSet;
9use std::io::{Read, Write};
10use std::path::Path;
11use std::sync::Arc;
12use std::{io, vec};
13
14use anyhow::Result;
15use dusk_core::transfer::data::BlobSidecar;
16use node_data::Serializable;
17use node_data::ledger::{
18    Block, Fault, Header, Label, SpendingId, SpentTransaction, Transaction,
19};
20use node_data::message::{ConsensusHeader, payload};
21use rocksdb::{
22    AsColumnFamilyRef, BlockBasedOptions, ColumnFamily, ColumnFamilyDescriptor,
23    DBAccess, DBRawIteratorWithThreadMode, IteratorMode, LogLevel,
24    OptimisticTransactionDB, OptimisticTransactionOptions, Options,
25    WriteOptions,
26};
27use tracing::info;
28
29use super::{
30    ConsensusStorage, DB, DatabaseOptions, Ledger, LightBlock, Metadata,
31    Persist,
32};
33use crate::database::Mempool;
34
35const CF_LEDGER_HEADER: &str = "cf_ledger_header";
36const CF_LEDGER_TXS: &str = "cf_ledger_txs";
37const CF_LEDGER_BLOBS: &str = "cf_ledger_blobs";
38const CF_LEDGER_BLOBS_HEIGHT: &str = "cf_ledger_blobs_height";
39const CF_LEDGER_FAULTS: &str = "cf_ledger_faults";
40const CF_LEDGER_HEIGHT: &str = "cf_ledger_height";
41const CF_CANDIDATES: &str = "cf_candidates";
42const CF_CANDIDATES_HEIGHT: &str = "cf_candidates_height";
43const CF_VALIDATION_RESULTS: &str = "cf_validation_results";
44const CF_MEMPOOL: &str = "cf_mempool";
45const CF_MEMPOOL_SPENDING_ID: &str = "cf_mempool_spending_id";
46const CF_MEMPOOL_FEES: &str = "cf_mempool_fees";
47const CF_METADATA: &str = "cf_metadata";
48
49const DB_FOLDER_NAME: &str = "chain.db";
50
51// List of supported metadata keys
52pub const MD_HASH_KEY: &[u8] = b"hash_key";
53pub const MD_STATE_ROOT_KEY: &[u8] = b"state_hash_key";
54pub const MD_AVG_VALIDATION: &[u8] = b"avg_validation_time";
55pub const MD_AVG_RATIFICATION: &[u8] = b"avg_ratification_time";
56pub const MD_AVG_PROPOSAL: &[u8] = b"avg_proposal_time";
57pub const MD_LAST_ITER: &[u8] = b"consensus_last_iter";
58
59#[derive(Clone)]
60pub struct Backend {
61    rocksdb: Arc<OptimisticTransactionDB>,
62}
63
64impl Backend {
65    fn begin_tx(&self) -> DBTransaction<'_, OptimisticTransactionDB> {
66        // Create a new RocksDB transaction
67        let write_options = WriteOptions::default();
68        let tx_options = OptimisticTransactionOptions::default();
69
70        let inner = self.rocksdb.transaction_opt(&write_options, &tx_options);
71
72        // Borrow column families
73        let ledger_cf = self
74            .rocksdb
75            .cf_handle(CF_LEDGER_HEADER)
76            .expect("ledger_header column family must exist");
77
78        let ledger_txs_cf = self
79            .rocksdb
80            .cf_handle(CF_LEDGER_TXS)
81            .expect("CF_LEDGER_TXS column family must exist");
82
83        let ledger_faults_cf = self
84            .rocksdb
85            .cf_handle(CF_LEDGER_FAULTS)
86            .expect("CF_LEDGER_FAULTS column family must exist");
87
88        let candidates_cf = self
89            .rocksdb
90            .cf_handle(CF_CANDIDATES)
91            .expect("candidates column family must exist");
92
93        let candidates_height_cf = self
94            .rocksdb
95            .cf_handle(CF_CANDIDATES_HEIGHT)
96            .expect("candidates_height column family must exist");
97
98        let validation_results_cf = self
99            .rocksdb
100            .cf_handle(CF_VALIDATION_RESULTS)
101            .expect("validation result column family must exist");
102
103        let mempool_cf = self
104            .rocksdb
105            .cf_handle(CF_MEMPOOL)
106            .expect("mempool column family must exist");
107
108        let spending_id_cf = self
109            .rocksdb
110            .cf_handle(CF_MEMPOOL_SPENDING_ID)
111            .expect("CF_MEMPOOL_SPENDING_ID column family must exist");
112
113        let fees_cf = self
114            .rocksdb
115            .cf_handle(CF_MEMPOOL_FEES)
116            .expect("CF_MEMPOOL_FEES column family must exist");
117
118        let ledger_height_cf = self
119            .rocksdb
120            .cf_handle(CF_LEDGER_HEIGHT)
121            .expect("CF_LEDGER_HEIGHT column family must exist");
122
123        let metadata_cf = self
124            .rocksdb
125            .cf_handle(CF_METADATA)
126            .expect("CF_METADATA column family must exist");
127
128        let ledger_blobs_cf = self
129            .rocksdb
130            .cf_handle(CF_LEDGER_BLOBS)
131            .expect("CF_LEDGER_BLOBS column family must exist");
132
133        let ledger_blobs_height_cf = self
134            .rocksdb
135            .cf_handle(CF_LEDGER_BLOBS_HEIGHT)
136            .expect("CF_LEDGER_BLOBS_HEIGHT column family must exist");
137
138        DBTransaction::<'_, OptimisticTransactionDB> {
139            inner,
140            candidates_cf,
141            candidates_height_cf,
142            validation_results_cf,
143            ledger_cf,
144            ledger_txs_cf,
145            ledger_faults_cf,
146            mempool_cf,
147            spending_id_cf,
148            fees_cf,
149            ledger_height_cf,
150            ledger_blobs_cf,
151            ledger_blobs_height_cf,
152            metadata_cf,
153            cumulative_inner_size: RefCell::new(0),
154        }
155    }
156}
157
158impl DB for Backend {
159    type P<'a> = DBTransaction<'a, OptimisticTransactionDB>;
160
161    fn create_or_open<T>(path: T, db_opts: DatabaseOptions) -> Self
162    where
163        T: AsRef<Path>,
164    {
165        let path = path.as_ref().join(DB_FOLDER_NAME);
166        info!("Opening database in {path:?}, {:?} ", db_opts);
167
168        // A set of options for initializing any blocks-related CF (including
169        // METADATA CF)
170        let mut blocks_cf_opts = Options::default();
171        blocks_cf_opts.create_if_missing(db_opts.create_if_missing);
172        blocks_cf_opts.create_missing_column_families(true);
173        blocks_cf_opts.set_level_compaction_dynamic_level_bytes(true);
174        blocks_cf_opts
175            .set_write_buffer_size(db_opts.blocks_cf_max_write_buffer_size);
176
177        if db_opts.enable_debug {
178            blocks_cf_opts.set_log_level(LogLevel::Info);
179            blocks_cf_opts.set_dump_malloc_stats(true);
180            blocks_cf_opts.enable_statistics();
181        }
182
183        if db_opts.blocks_cf_disable_block_cache {
184            let mut block_opts = BlockBasedOptions::default();
185            block_opts.disable_cache();
186            blocks_cf_opts.set_block_based_table_factory(&block_opts);
187        }
188
189        // Configure CF_MEMPOOL column family, so it benefits from low
190        // write-latency of L0
191        let mut mp_opts = blocks_cf_opts.clone();
192        // Disable WAL by default
193        mp_opts.set_manual_wal_flush(true);
194        mp_opts.create_if_missing(true);
195        mp_opts.create_missing_column_families(true);
196        mp_opts.set_write_buffer_size(db_opts.mempool_cf_max_write_buffer_size);
197
198        if db_opts.enable_debug {
199            mp_opts.set_log_level(LogLevel::Info);
200            mp_opts.set_dump_malloc_stats(true);
201            mp_opts.enable_statistics();
202        }
203
204        let cfs = vec![
205            ColumnFamilyDescriptor::new(
206                CF_LEDGER_HEADER,
207                blocks_cf_opts.clone(),
208            ),
209            ColumnFamilyDescriptor::new(CF_LEDGER_TXS, blocks_cf_opts.clone()),
210            ColumnFamilyDescriptor::new(
211                CF_LEDGER_FAULTS,
212                blocks_cf_opts.clone(),
213            ),
214            ColumnFamilyDescriptor::new(
215                CF_LEDGER_HEIGHT,
216                blocks_cf_opts.clone(),
217            ),
218            ColumnFamilyDescriptor::new(
219                CF_LEDGER_BLOBS,
220                blocks_cf_opts.clone(),
221            ),
222            ColumnFamilyDescriptor::new(
223                CF_LEDGER_BLOBS_HEIGHT,
224                blocks_cf_opts.clone(),
225            ),
226            ColumnFamilyDescriptor::new(CF_CANDIDATES, blocks_cf_opts.clone()),
227            ColumnFamilyDescriptor::new(
228                CF_CANDIDATES_HEIGHT,
229                blocks_cf_opts.clone(),
230            ),
231            ColumnFamilyDescriptor::new(
232                CF_VALIDATION_RESULTS,
233                blocks_cf_opts.clone(),
234            ),
235            ColumnFamilyDescriptor::new(CF_METADATA, blocks_cf_opts.clone()),
236            ColumnFamilyDescriptor::new(CF_MEMPOOL, mp_opts.clone()),
237            ColumnFamilyDescriptor::new(
238                CF_MEMPOOL_SPENDING_ID,
239                mp_opts.clone(),
240            ),
241            ColumnFamilyDescriptor::new(CF_MEMPOOL_FEES, mp_opts.clone()),
242        ];
243
244        Self {
245            rocksdb: Arc::new(
246                OptimisticTransactionDB::open_cf_descriptors(
247                    &blocks_cf_opts,
248                    &path,
249                    cfs,
250                )
251                .unwrap_or_else(|_| {
252                    panic!("should be a valid database in {path:?}")
253                }),
254            ),
255        }
256    }
257
258    fn view<F, T>(&self, f: F) -> T
259    where
260        F: for<'a> FnOnce(&Self::P<'a>) -> T,
261    {
262        // Create a new read-only transaction
263        let tx = self.begin_tx();
264
265        // Execute all read-only transactions in isolation
266        let ret = f(&tx);
267        tx.rollback().expect("rollback to succeed for readonly");
268        ret
269    }
270
271    fn update<F, T>(&self, execute: F) -> Result<T>
272    where
273        F: for<'a> FnOnce(&mut Self::P<'a>) -> Result<T>,
274    {
275        self.update_dry_run(false, execute)
276    }
277
278    fn update_dry_run<F, T>(&self, dry_run: bool, execute: F) -> Result<T>
279    where
280        F: for<'a> FnOnce(&mut Self::P<'a>) -> Result<T>,
281    {
282        // Create read-write transaction
283        let mut tx = self.begin_tx();
284
285        // If f returns err, no commit will be applied into backend
286        // storage
287        let ret = execute(&mut tx)?;
288
289        if dry_run {
290            tx.rollback()?;
291        } else {
292            // Apply changes in atomic way
293            tx.commit()?;
294        }
295
296        Ok(ret)
297    }
298
299    fn close(&mut self) {}
300}
301
302pub struct DBTransaction<'db, DB: DBAccess> {
303    inner: rocksdb::Transaction<'db, DB>,
304    /// cumulative size of transaction footprint
305    cumulative_inner_size: RefCell<usize>,
306
307    // TODO: pack all column families into a single array
308    // Candidates column family
309    candidates_cf: &'db ColumnFamily,
310    candidates_height_cf: &'db ColumnFamily,
311    // ValidationResults column family
312    validation_results_cf: &'db ColumnFamily,
313
314    // Ledger column families
315    ledger_cf: &'db ColumnFamily,
316    ledger_faults_cf: &'db ColumnFamily,
317    ledger_txs_cf: &'db ColumnFamily,
318    ledger_height_cf: &'db ColumnFamily,
319    ledger_blobs_cf: &'db ColumnFamily,
320    ledger_blobs_height_cf: &'db ColumnFamily,
321
322    // Mempool column families
323    mempool_cf: &'db ColumnFamily,
324    spending_id_cf: &'db ColumnFamily,
325    fees_cf: &'db ColumnFamily,
326
327    metadata_cf: &'db ColumnFamily,
328}
329
330impl<DB: DBAccess> Ledger for DBTransaction<'_, DB> {
331    fn store_block(
332        &mut self,
333        header: &Header,
334        txs: &[SpentTransaction],
335        faults: &[Fault],
336        label: Label,
337    ) -> Result<usize> {
338        // COLUMN FAMILY: CF_LEDGER_HEADER
339        // It consists of one record per block - Header record
340        // It also includes single record to store metadata - Register record
341        {
342            let cf = self.ledger_cf;
343
344            let mut buf = vec![];
345            LightBlock {
346                header: header.clone(),
347                transactions_ids: txs.iter().map(|t| t.inner.id()).collect(),
348                faults_ids: faults.iter().map(|f| f.id()).collect(),
349            }
350            .write(&mut buf)?;
351
352            self.put_cf(cf, header.hash, buf)?;
353        }
354
355        // Update metadata values
356        self.op_write(MD_HASH_KEY, header.hash)?;
357        self.op_write(MD_STATE_ROOT_KEY, header.state_hash)?;
358
359        // COLUMN FAMILY: CF_LEDGER_TXS
360        {
361            let cf = self.ledger_txs_cf;
362
363            let mut stored_blobs = Vec::with_capacity(6);
364
365            // store all block transactions
366            for tx in txs {
367                let mut d = vec![];
368
369                if tx.inner.inner.blob().is_some() {
370                    let mut strip_tx = tx.clone();
371                    if let Some(blobs) = strip_tx.inner.inner.strip_blobs() {
372                        for (hash, sidecar) in blobs.into_iter() {
373                            let sidecar_bytes = sidecar.to_var_bytes();
374                            self.store_blob_data(&hash, sidecar_bytes)?;
375                            stored_blobs.push(hash);
376                        }
377                    }
378                    strip_tx.write(&mut d)?;
379                } else {
380                    tx.write(&mut d)?;
381                }
382                self.put_cf(cf, tx.inner.id(), d)?;
383            }
384
385            if !stored_blobs.is_empty() {
386                // Store all blobs hashes in the ledger
387                self.store_blobs_height(header.height, &stored_blobs)?;
388            }
389        }
390
391        // COLUMN FAMILY: CF_LEDGER_FAULTS
392        {
393            let cf = self.ledger_faults_cf;
394
395            // store all block faults
396            for f in faults {
397                let mut d = vec![];
398                f.write(&mut d)?;
399                self.put_cf(cf, f.id(), d)?;
400            }
401        }
402        self.store_block_label(header.height, &header.hash, label)?;
403
404        Ok(self.get_size())
405    }
406
407    fn faults_by_block(&self, start_height: u64) -> Result<Vec<Fault>> {
408        let mut faults = vec![];
409        let mut hash = self
410            .op_read(MD_HASH_KEY)?
411            .ok_or(anyhow::anyhow!("Cannot read tip"))?;
412
413        loop {
414            let block = self.light_block(&hash)?.ok_or(anyhow::anyhow!(
415                "Cannot read block {}",
416                hex::encode(&hash)
417            ))?;
418
419            let block_height = block.header.height;
420
421            if block_height >= start_height {
422                hash = block.header.prev_block_hash.to_vec();
423                faults.extend(self.faults(&block.faults_ids)?);
424            } else {
425                break;
426            }
427
428            if block_height == 0 {
429                break;
430            }
431        }
432        Ok(faults)
433    }
434
435    fn store_block_label(
436        &mut self,
437        height: u64,
438        hash: &[u8; 32],
439        label: Label,
440    ) -> Result<()> {
441        // CF: HEIGHT -> (BLOCK_HASH, BLOCK_LABEL)
442        let mut buf = vec![];
443        buf.write_all(hash)?;
444        label.write(&mut buf)?;
445
446        self.put_cf(self.ledger_height_cf, height.to_le_bytes(), buf)?;
447        Ok(())
448    }
449
450    fn delete_block(&mut self, b: &Block) -> Result<()> {
451        self.inner.delete_cf(
452            self.ledger_height_cf,
453            b.header().height.to_le_bytes(),
454        )?;
455
456        for tx in b.txs() {
457            self.inner.delete_cf(self.ledger_txs_cf, tx.id())?;
458        }
459        for f in b.faults() {
460            self.inner.delete_cf(self.ledger_faults_cf, f.id())?;
461        }
462
463        self.delete_blobs_by_height(b.header().height)?;
464        self.inner.delete_cf(self.ledger_cf, b.header().hash)?;
465
466        Ok(())
467    }
468
469    fn block_exists(&self, hash: &[u8]) -> Result<bool> {
470        Ok(self.inner.get_cf(self.ledger_cf, hash)?.is_some())
471    }
472
473    fn faults(&self, faults_ids: &[[u8; 32]]) -> Result<Vec<Fault>> {
474        if faults_ids.is_empty() {
475            return Ok(vec![]);
476        }
477        let ids = faults_ids
478            .iter()
479            .map(|id| (self.ledger_faults_cf, id))
480            .collect::<Vec<_>>();
481
482        // Retrieve all faults ID with single call
483        let faults_buffer = self.inner.multi_get_cf(ids);
484
485        let mut faults = vec![];
486        for buf in faults_buffer {
487            let buf = buf?.unwrap();
488            let fault = Fault::read(&mut &buf[..])?;
489            faults.push(fault);
490        }
491
492        Ok(faults)
493    }
494
495    fn latest_block(&self) -> Result<LightBlock> {
496        let tip_hash = self
497            .op_read(MD_HASH_KEY)?
498            .ok_or(anyhow::anyhow!("Cannot find tip stored in metadata"))?;
499        self.light_block(&tip_hash)?
500            .ok_or(anyhow::anyhow!("Cannot find tip block"))
501    }
502
503    fn blob_data_by_hash(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
504        Ok(self.inner.get_cf(self.ledger_blobs_cf, hash)?)
505    }
506
507    fn store_blob_data(&self, hash: &[u8; 32], data: Vec<u8>) -> Result<()> {
508        self.inner.put_cf(self.ledger_blobs_cf, hash, data)?;
509        Ok(())
510    }
511    fn store_blobs_height(
512        &self,
513        block_height: u64,
514        blob_hashes: &[[u8; 32]],
515    ) -> Result<()> {
516        if blob_hashes.is_empty() {
517            return Ok(());
518        }
519        let blob_hashes_bytes: Vec<_> =
520            blob_hashes.iter().flat_map(|hash| hash.to_vec()).collect();
521        self.inner.put_cf(
522            self.ledger_blobs_height_cf,
523            block_height.to_be_bytes(),
524            blob_hashes_bytes,
525        )?;
526        Ok(())
527    }
528
529    fn delete_blobs_by_height(&self, block_height: u64) -> Result<()> {
530        let blobs_to_delete = self.blobs_by_height(block_height)?;
531        if let Some(blob_hashes) = blobs_to_delete {
532            for hash in blob_hashes {
533                // What happen if the blobs also exists linked to another
534                // transaction?
535                self.inner.delete_cf(self.ledger_blobs_cf, hash)?;
536            }
537            self.inner.delete_cf(
538                self.ledger_blobs_height_cf,
539                block_height.to_be_bytes(),
540            )?;
541        }
542
543        Ok(())
544    }
545
546    fn blobs_by_height(
547        &self,
548        block_height: u64,
549    ) -> Result<Option<Vec<[u8; 32]>>> {
550        let blob_hashes_bytes = self
551            .inner
552            .get_cf(self.ledger_blobs_height_cf, block_height.to_be_bytes())?;
553
554        if let Some(blob_hashes_bytes) = blob_hashes_bytes {
555            let mut blob_hashes = vec![];
556            for chunk in blob_hashes_bytes.chunks(32) {
557                let mut hash = [0u8; 32];
558                hash.copy_from_slice(chunk);
559                blob_hashes.push(hash);
560            }
561            Ok(Some(blob_hashes))
562        } else {
563            Ok(None)
564        }
565    }
566
567    fn block(&self, hash: &[u8]) -> Result<Option<Block>> {
568        match self.inner.get_cf(self.ledger_cf, hash)? {
569            Some(blob) => {
570                let record = LightBlock::read(&mut &blob[..])?;
571
572                // Retrieve all transactions buffers with single call
573                let txs_buffers = self.inner.multi_get_cf(
574                    record
575                        .transactions_ids
576                        .iter()
577                        .map(|id| (self.ledger_txs_cf, id))
578                        .collect::<Vec<(&ColumnFamily, &[u8; 32])>>(),
579                );
580
581                let mut txs = vec![];
582                for buf in txs_buffers {
583                    let buf = buf?.unwrap();
584                    let mut tx = SpentTransaction::read(&mut &buf[..])?;
585                    if let Some(blobs) = tx.inner.inner.blob_mut() {
586                        for blob in blobs {
587                            // Retrieve blob data from the ledger
588                            let sidecar = self
589                                .blob_data_by_hash(&blob.hash)?
590                                .map(|bytes| {
591                                    BlobSidecar::from_buf(&mut &bytes[..])
592                                })
593                                .transpose()
594                                .map_err(|e| {
595                                    anyhow::anyhow!(
596                                        "Failed to parse blob sidecar: {e:?}"
597                                    )
598                                })?;
599                            blob.data = sidecar;
600                        }
601                    }
602                    txs.push(tx.inner);
603                }
604
605                // Retrieve all faults ID with single call
606                let faults_buffer = self.inner.multi_get_cf(
607                    record
608                        .faults_ids
609                        .iter()
610                        .map(|id| (self.ledger_faults_cf, id))
611                        .collect::<Vec<(&ColumnFamily, &[u8; 32])>>(),
612                );
613                let mut faults = vec![];
614                for buf in faults_buffer {
615                    let buf = buf?.unwrap();
616                    let fault = Fault::read(&mut &buf[..])?;
617                    faults.push(fault);
618                }
619
620                Ok(Some(
621                    Block::new(record.header, txs, faults)
622                        .expect("block should be valid"),
623                ))
624            }
625            None => Ok(None),
626        }
627    }
628
629    fn light_block(&self, hash: &[u8]) -> Result<Option<LightBlock>> {
630        match self.inner.get_cf(self.ledger_cf, hash)? {
631            Some(blob) => {
632                let record = LightBlock::read(&mut &blob[..])?;
633                Ok(Some(record))
634            }
635            None => Ok(None),
636        }
637    }
638
639    fn block_header(&self, hash: &[u8]) -> Result<Option<Header>> {
640        match self.inner.get_cf(self.ledger_cf, hash)? {
641            Some(blob) => {
642                let record = Header::read(&mut &blob[..])?;
643                Ok(Some(record))
644            }
645            None => Ok(None),
646        }
647    }
648
649    fn block_hash_by_height(&self, height: u64) -> Result<Option<[u8; 32]>> {
650        Ok(self
651            .inner
652            .get_cf(self.ledger_height_cf, height.to_le_bytes())?
653            .map(|h| {
654                const LEN: usize = 32;
655                let mut hash = [0u8; LEN];
656                hash.copy_from_slice(&h.as_slice()[0..LEN]);
657                hash
658            }))
659    }
660
661    fn ledger_tx(&self, tx_id: &[u8]) -> Result<Option<SpentTransaction>> {
662        let tx = self
663            .inner
664            .get_cf(self.ledger_txs_cf, tx_id)?
665            .map(|blob| SpentTransaction::read(&mut &blob[..]))
666            .transpose()?;
667
668        Ok(tx)
669    }
670
671    /// Returns a list of transactions from the ledger
672    ///
673    /// This function expects a list of transaction IDs that are in the ledger.
674    ///
675    /// It will return an error if any of the transaction IDs are not found in
676    /// the ledger.
677    fn ledger_txs(
678        &self,
679        tx_ids: Vec<&[u8; 32]>,
680    ) -> Result<Vec<SpentTransaction>> {
681        let cf = self.ledger_txs_cf;
682
683        let ids = tx_ids.into_iter().map(|id| (cf, id)).collect::<Vec<_>>();
684
685        let multi_get_results = self.inner.multi_get_cf(ids);
686
687        let mut spent_transactions =
688            Vec::with_capacity(multi_get_results.len());
689        for result in multi_get_results.into_iter() {
690            let opt_blob = result.map_err(std::io::Error::other)?;
691
692            let Some(blob) = opt_blob else {
693                return Err(anyhow::anyhow!(
694                    "At least one Transaction ID was not found"
695                ));
696            };
697
698            let stx = SpentTransaction::read(&mut &blob[..])?;
699
700            spent_transactions.push(stx);
701        }
702
703        Ok(spent_transactions)
704    }
705
706    /// Returns true if the transaction exists in the
707    /// ledger
708    ///
709    /// This is a convenience method that checks if a transaction exists in the
710    /// ledger without unmarshalling the transaction
711    fn ledger_tx_exists(&self, tx_id: &[u8]) -> Result<bool> {
712        Ok(self.inner.get_cf(self.ledger_txs_cf, tx_id)?.is_some())
713    }
714
715    fn block_by_height(&self, height: u64) -> Result<Option<Block>> {
716        let hash = self.block_hash_by_height(height)?;
717        let block = match hash {
718            Some(hash) => self.block(&hash)?,
719            None => None,
720        };
721        Ok(block)
722    }
723
724    fn block_label_by_height(
725        &self,
726        height: u64,
727    ) -> Result<Option<([u8; 32], Label)>> {
728        const HASH_LEN: usize = 32;
729        Ok(self
730            .inner
731            .get_cf(self.ledger_height_cf, height.to_le_bytes())?
732            .map(|h| {
733                let mut hash = [0u8; HASH_LEN];
734                hash.copy_from_slice(&h.as_slice()[0..HASH_LEN]);
735
736                let label_buff = h[HASH_LEN..].to_vec();
737                Label::read(&mut &label_buff[..]).map(|label| (hash, label))
738            })
739            .transpose()?)
740    }
741}
742
743/// Implementation of the `Candidate` trait for `DBTransaction<'db, DB>`.
744impl<DB: DBAccess> ConsensusStorage for DBTransaction<'_, DB> {
745    /// Stores a candidate block in the database.
746    ///
747    /// # Arguments
748    ///
749    /// * `b` - The block to store.
750    ///
751    /// # Returns
752    ///
753    /// Returns `Ok(())` if the block is successfully stored, or an error if the
754    /// operation fails.
755    fn store_candidate(&mut self, b: Block) -> Result<()> {
756        let mut serialized = vec![];
757        b.write(&mut serialized)?;
758
759        self.inner
760            .put_cf(self.candidates_cf, b.header().hash, serialized)?;
761
762        let key = serialize_key(b.header().height, b.header().hash)?;
763        self.inner
764            .put_cf(self.candidates_height_cf, key, b.header().hash)?;
765
766        Ok(())
767    }
768
769    /// Fetches a candidate block from the database.
770    ///
771    /// # Arguments
772    ///
773    /// * `hash` - The hash of the block to fetch.
774    ///
775    /// # Returns
776    ///
777    /// Returns `Ok(Some(block))` if the block is found, `Ok(None)` if the block
778    /// is not found, or an error if the operation fails.
779    fn candidate(&self, hash: &[u8]) -> Result<Option<Block>> {
780        if let Some(blob) = self.inner.get_cf(self.candidates_cf, hash)? {
781            let b = Block::read(&mut &blob[..])?;
782            return Ok(Some(b));
783        }
784
785        // Block not found
786        Ok(None)
787    }
788
789    fn candidate_by_iteration(
790        &self,
791        consensus_header: &ConsensusHeader,
792    ) -> Result<Option<Block>> {
793        let iter = self
794            .inner
795            .iterator_cf(self.candidates_cf, IteratorMode::Start);
796
797        for (_, blob) in iter.map(Result::unwrap) {
798            let b = Block::read(&mut &blob[..])?;
799
800            let header = b.header();
801            if header.prev_block_hash == consensus_header.prev_block_hash
802                && header.iteration == consensus_header.iteration
803            {
804                return Ok(Some(b));
805            }
806        }
807
808        Ok(None)
809    }
810
811    /// Deletes candidate-related items from the database based on a closure.
812    ///
813    /// # Arguments
814    ///
815    /// * `closure` - If the closure returns `true`, the block will be deleted.
816    ///
817    /// # Returns
818    ///
819    /// Returns `Ok(())` if the deletion is successful, or an error if the
820    /// operation fails.
821    fn delete_candidate<F>(&mut self, closure: F) -> Result<()>
822    where
823        F: FnOnce(u64) -> bool + std::marker::Copy,
824    {
825        let iter = self
826            .inner
827            .iterator_cf(self.candidates_height_cf, IteratorMode::Start);
828
829        for (key, hash) in iter.map(Result::unwrap) {
830            let (height, _) = deserialize_key(&mut &key.to_vec()[..])?;
831            if closure(height) {
832                self.inner.delete_cf(self.candidates_cf, hash)?;
833                self.inner.delete_cf(self.candidates_height_cf, key)?;
834            }
835        }
836
837        Ok(())
838    }
839
840    fn count_candidates(&self) -> usize {
841        let iter = self
842            .inner
843            .iterator_cf(self.candidates_height_cf, IteratorMode::Start);
844
845        iter.count()
846    }
847
848    /// Deletes all items from the `CF_CANDIDATES` column family.
849    ///
850    /// # Returns
851    ///
852    /// Returns `Ok(())` if the deletion is successful, or an error if the
853    /// operation fails.
854    fn clear_candidates(&mut self) -> Result<()> {
855        self.delete_candidate(|_| true)
856    }
857
858    /// Stores a ValidationResult in the database.
859    ///
860    /// # Arguments
861    ///
862    /// * `vr` - The ValidationResult to store.
863    ///
864    /// # Returns
865    ///
866    /// Returns `Ok(())` if the ValidationResult is successfully stored, or an
867    /// error if the operation fails.
868    fn store_validation_result(
869        &mut self,
870        consensus_header: &ConsensusHeader,
871        validation_result: &payload::ValidationResult,
872    ) -> Result<()> {
873        let mut serialized = vec![];
874        validation_result.write(&mut serialized)?;
875
876        let key = serialize_iter_key(consensus_header)?;
877        self.inner
878            .put_cf(self.validation_results_cf, key, serialized)?;
879
880        Ok(())
881    }
882
883    /// Fetches a ValidationResult from the database.
884    ///
885    /// # Arguments
886    ///
887    /// * `consensus_header` - The ConsensusHeader of the ValidationResult.
888    ///
889    /// # Returns
890    ///
891    /// Returns `Ok(Some(ValidationResult))` if the ValidationResult is found,
892    /// `Ok(None)` if the ValidationResult is not found, or an error if the
893    /// operation fails.
894    fn validation_result(
895        &self,
896        consensus_header: &ConsensusHeader,
897    ) -> Result<Option<payload::ValidationResult>> {
898        let key = serialize_iter_key(consensus_header)?;
899        if let Some(blob) =
900            self.inner.get_cf(self.validation_results_cf, key)?
901        {
902            let validation_result =
903                payload::ValidationResult::read(&mut &blob[..])?;
904            return Ok(Some(validation_result));
905        }
906
907        // ValidationResult not found
908        Ok(None)
909    }
910
911    /// Deletes ValidationResult items from the database based on a closure.
912    ///
913    /// # Arguments
914    ///
915    /// * `closure` - If the closure returns `true`, the ValidationResult will
916    ///   be deleted.
917    ///
918    /// # Returns
919    ///
920    /// Returns `Ok(())` if the deletion is successful, or an error if the
921    /// operation fails.
922    fn delete_validation_results<F>(&mut self, closure: F) -> Result<()>
923    where
924        F: FnOnce([u8; 32]) -> bool + std::marker::Copy,
925    {
926        let iter = self
927            .inner
928            .iterator_cf(self.validation_results_cf, IteratorMode::Start);
929
930        for (key, _) in iter.map(Result::unwrap) {
931            let (prev_block_hash, _) =
932                deserialize_iter_key(&mut &key.to_vec()[..])?;
933            if closure(prev_block_hash) {
934                self.inner.delete_cf(self.validation_results_cf, key)?;
935            }
936        }
937
938        Ok(())
939    }
940
941    fn count_validation_results(&self) -> usize {
942        let iter = self
943            .inner
944            .iterator_cf(self.validation_results_cf, IteratorMode::Start);
945
946        iter.count()
947    }
948
949    /// Deletes all items from the `CF_VALIDATION_RESULTS` column family.
950    ///
951    /// # Returns
952    ///
953    /// Returns `Ok(())` if the deletion is successful, or an error if the
954    /// operation fails.
955    fn clear_validation_results(&mut self) -> Result<()> {
956        self.delete_validation_results(|_| true)
957    }
958}
959
960impl<DB: DBAccess> Persist for DBTransaction<'_, DB> {
961    /// Deletes all items from both CF_LEDGER and CF_CANDIDATES column families
962    fn clear_database(&mut self) -> Result<()> {
963        // Create an iterator over the column family CF_LEDGER
964        let iter = self.inner.iterator_cf(self.ledger_cf, IteratorMode::Start);
965
966        // Iterate through the CF_LEDGER column family and delete all items
967        for (key, _) in iter.map(Result::unwrap) {
968            self.inner.delete_cf(self.ledger_cf, key)?;
969        }
970
971        self.clear_candidates()?;
972        self.clear_validation_results()?;
973        Ok(())
974    }
975
976    fn commit(self) -> Result<()> {
977        if let Err(e) = self.inner.commit() {
978            return Err(anyhow::Error::new(e).context("failed to commit"));
979        }
980
981        Ok(())
982    }
983
984    fn rollback(self) -> Result<()> {
985        if let Err(e) = self.inner.rollback() {
986            return Err(anyhow::Error::new(e).context("failed to rollback"));
987        }
988
989        Ok(())
990    }
991}
992
993impl<DB: DBAccess> Mempool for DBTransaction<'_, DB> {
994    fn store_mempool_tx(
995        &mut self,
996        tx: &Transaction,
997        timestamp: u64,
998    ) -> Result<()> {
999        // Map Hash to serialized transaction
1000        let mut tx_data = vec![];
1001        tx.write(&mut tx_data)?;
1002
1003        let hash = tx.id();
1004        self.put_cf(self.mempool_cf, hash, tx_data)?;
1005
1006        // Add Secondary indexes //
1007        // Spending Ids
1008        for n in tx.to_spend_ids() {
1009            let key = n.to_bytes();
1010            self.put_cf(self.spending_id_cf, key, hash)?;
1011        }
1012
1013        let timestamp = timestamp.to_be_bytes();
1014
1015        // Map Fee_Hash to Timestamp
1016        // Key pair is used to facilitate sort-by-fee
1017        // Also, the timestamp is used to remove expired transactions
1018        self.put_cf(
1019            self.fees_cf,
1020            serialize_key(tx.gas_price(), hash)?,
1021            timestamp,
1022        )?;
1023
1024        Ok(())
1025    }
1026
1027    fn mempool_tx(&self, hash: [u8; 32]) -> Result<Option<Transaction>> {
1028        let data = self.inner.get_cf(self.mempool_cf, hash)?;
1029
1030        match data {
1031            // None has a meaning key not found
1032            None => Ok(None),
1033            Some(blob) => Ok(Some(Transaction::read(&mut &blob.to_vec()[..])?)),
1034        }
1035    }
1036
1037    fn mempool_tx_exists(&self, h: [u8; 32]) -> Result<bool> {
1038        Ok(self.inner.get_cf(self.mempool_cf, h)?.is_some())
1039    }
1040
1041    fn delete_mempool_tx(
1042        &mut self,
1043        h: [u8; 32],
1044        cascade: bool,
1045    ) -> Result<Vec<[u8; 32]>> {
1046        let mut deleted = vec![];
1047        let tx = self.mempool_tx(h)?;
1048        if let Some(tx) = tx {
1049            let hash = tx.id();
1050
1051            self.inner.delete_cf(self.mempool_cf, hash)?;
1052
1053            // Delete Secondary indexes
1054            // Delete spendingids (nullifiers or nonce)
1055            for n in tx.to_spend_ids() {
1056                let key = n.to_bytes();
1057                self.inner.delete_cf(self.spending_id_cf, key)?;
1058            }
1059
1060            // Delete Fee_Hash
1061            self.inner.delete_cf(
1062                self.fees_cf,
1063                serialize_key(tx.gas_price(), hash)?,
1064            )?;
1065
1066            deleted.push(h);
1067
1068            if cascade {
1069                let mut dependants = vec![];
1070                // Get the next spending id (aka next nonce tx)
1071                // retrieve tx_id and delete it
1072                let mut next_spending_id = tx.next_spending_id();
1073                while let Some(spending_id) = next_spending_id {
1074                    next_spending_id = spending_id.next();
1075                    let next_txs =
1076                        self.mempool_txs_by_spendable_ids(&[spending_id]);
1077                    if next_txs.is_empty() {
1078                        break;
1079                    }
1080                    dependants.extend(next_txs);
1081                }
1082
1083                // delete all dependants
1084                for tx_id in dependants {
1085                    let cascade_deleted =
1086                        self.delete_mempool_tx(tx_id, false)?;
1087                    deleted.extend(cascade_deleted);
1088                }
1089            }
1090        }
1091
1092        Ok(deleted)
1093    }
1094
1095    fn mempool_txs_by_spendable_ids(
1096        &self,
1097        n: &[SpendingId],
1098    ) -> HashSet<[u8; 32]> {
1099        n.iter()
1100            .filter_map(|n| {
1101                match self.inner.get_cf(self.spending_id_cf, n.to_bytes()) {
1102                    Ok(Some(tx_id)) => tx_id.try_into().ok(),
1103                    _ => None,
1104                }
1105            })
1106            .collect()
1107    }
1108
1109    fn mempool_txs_sorted_by_fee(
1110        &self,
1111    ) -> Box<dyn Iterator<Item = Transaction> + '_> {
1112        let iter = MemPoolIterator::new(&self.inner, self.fees_cf, self);
1113
1114        Box::new(iter)
1115    }
1116
1117    fn mempool_txs_ids_sorted_by_fee(
1118        &self,
1119    ) -> Box<dyn Iterator<Item = (u64, [u8; 32])> + '_> {
1120        let iter = MemPoolFeeIterator::new(&self.inner, self.fees_cf, true);
1121
1122        Box::new(iter)
1123    }
1124
1125    fn mempool_txs_ids_sorted_by_low_fee(
1126        &self,
1127    ) -> Box<dyn Iterator<Item = (u64, [u8; 32])> + '_> {
1128        let iter = MemPoolFeeIterator::new(&self.inner, self.fees_cf, false);
1129
1130        Box::new(iter)
1131    }
1132
1133    /// Get all expired transactions hashes.
1134    fn mempool_expired_txs(&self, timestamp: u64) -> Result<Vec<[u8; 32]>> {
1135        let mut iter = self.inner.raw_iterator_cf(self.fees_cf);
1136        iter.seek_to_first();
1137        let mut txs_list = vec![];
1138
1139        while iter.valid() {
1140            if let Some(key) = iter.key() {
1141                let (_, tx_id) = deserialize_key(&mut &key.to_vec()[..])?;
1142
1143                let tx_timestamp = u64::from_be_bytes(
1144                    iter.value()
1145                        .ok_or_else(|| {
1146                            io::Error::new(
1147                                io::ErrorKind::InvalidData,
1148                                "no value",
1149                            )
1150                        })?
1151                        .try_into()
1152                        .map_err(|_| {
1153                            io::Error::new(
1154                                io::ErrorKind::InvalidData,
1155                                "invalid data",
1156                            )
1157                        })?,
1158                );
1159
1160                if tx_timestamp <= timestamp {
1161                    txs_list.push(tx_id);
1162                }
1163            }
1164
1165            iter.next();
1166        }
1167
1168        Ok(txs_list)
1169    }
1170
1171    fn mempool_txs_ids(&self) -> Result<Vec<[u8; 32]>> {
1172        let mut iter = self.inner.raw_iterator_cf(self.fees_cf);
1173        iter.seek_to_last();
1174
1175        let mut txs_list = vec![];
1176
1177        // Iterate all keys from the end in reverse lexicographic order
1178        while iter.valid() {
1179            if let Some(key) = iter.key() {
1180                let (_, tx_id) = deserialize_key(&mut &key.to_vec()[..])?;
1181
1182                txs_list.push(tx_id);
1183            }
1184
1185            iter.prev();
1186        }
1187
1188        Ok(txs_list)
1189    }
1190
1191    fn mempool_txs_count(&self) -> usize {
1192        self.inner
1193            .iterator_cf(self.mempool_cf, IteratorMode::Start)
1194            .count()
1195    }
1196}
1197
1198pub struct MemPoolIterator<'db, DB: DBAccess, M: Mempool> {
1199    iter: MemPoolFeeIterator<'db, DB>,
1200    mempool: &'db M,
1201}
1202
1203impl<'db, DB: DBAccess, M: Mempool> MemPoolIterator<'db, DB, M> {
1204    fn new(
1205        db: &'db rocksdb::Transaction<DB>,
1206        fees_cf: &ColumnFamily,
1207        mempool: &'db M,
1208    ) -> Self {
1209        let iter = MemPoolFeeIterator::new(db, fees_cf, true);
1210        MemPoolIterator { iter, mempool }
1211    }
1212}
1213
1214impl<DB: DBAccess, M: Mempool> Iterator for MemPoolIterator<'_, DB, M> {
1215    type Item = Transaction;
1216    fn next(&mut self) -> Option<Self::Item> {
1217        self.iter.next().and_then(|(_, tx_id)| {
1218            self.mempool.mempool_tx(tx_id).ok().flatten()
1219        })
1220    }
1221}
1222
1223pub struct MemPoolFeeIterator<'db, DB: DBAccess> {
1224    iter: DBRawIteratorWithThreadMode<'db, rocksdb::Transaction<'db, DB>>,
1225    fee_desc: bool,
1226}
1227
1228impl<'db, DB: DBAccess> MemPoolFeeIterator<'db, DB> {
1229    fn new(
1230        db: &'db rocksdb::Transaction<DB>,
1231        fees_cf: &ColumnFamily,
1232        fee_desc: bool,
1233    ) -> Self {
1234        let mut iter = db.raw_iterator_cf(fees_cf);
1235        if fee_desc {
1236            iter.seek_to_last();
1237        };
1238        MemPoolFeeIterator { iter, fee_desc }
1239    }
1240}
1241
1242impl<DB: DBAccess> Iterator for MemPoolFeeIterator<'_, DB> {
1243    type Item = (u64, [u8; 32]);
1244    fn next(&mut self) -> Option<Self::Item> {
1245        match self.iter.valid() {
1246            true => {
1247                if let Some(key) = self.iter.key() {
1248                    let (gas_price, hash) =
1249                        deserialize_key(&mut &key.to_vec()[..]).ok()?;
1250                    if self.fee_desc {
1251                        self.iter.prev();
1252                    } else {
1253                        self.iter.next();
1254                    }
1255                    Some((gas_price, hash))
1256                } else {
1257                    None
1258                }
1259            }
1260            false => None,
1261        }
1262    }
1263}
1264
1265impl<DB: DBAccess> std::fmt::Debug for DBTransaction<'_, DB> {
1266    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1267        //  Print ledger blocks
1268        let iter = self.inner.iterator_cf(self.ledger_cf, IteratorMode::Start);
1269
1270        iter.map(Result::unwrap).try_for_each(|(hash, _)| {
1271            if let Ok(Some(blob)) = self.inner.get_cf(self.ledger_cf, &hash[..])
1272            {
1273                let b = Block::read(&mut &blob[..]).unwrap_or_default();
1274                writeln!(f, "ledger_block [{}]: {:#?}", b.header().height, b)
1275            } else {
1276                Ok(())
1277            }
1278        })?;
1279
1280        // Print candidate blocks
1281        let iter = self
1282            .inner
1283            .iterator_cf(self.candidates_cf, IteratorMode::Start);
1284
1285        let results: std::fmt::Result =
1286            iter.map(Result::unwrap).try_for_each(|(hash, _)| {
1287                if let Ok(Some(blob)) =
1288                    self.inner.get_cf(self.candidates_cf, &hash[..])
1289                {
1290                    let b = Block::read(&mut &blob[..]).unwrap_or_default();
1291                    writeln!(
1292                        f,
1293                        "candidate_block [{}]: {:#?}",
1294                        b.header().height,
1295                        b
1296                    )
1297                } else {
1298                    Ok(())
1299                }
1300            });
1301
1302        results
1303    }
1304}
1305
1306impl<DB: DBAccess> Metadata for DBTransaction<'_, DB> {
1307    fn op_write<T: AsRef<[u8]>>(&mut self, key: &[u8], value: T) -> Result<()> {
1308        self.put_cf(self.metadata_cf, key, value)?;
1309        Ok(())
1310    }
1311
1312    fn op_read(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1313        self.inner.get_cf(self.metadata_cf, key).map_err(Into::into)
1314    }
1315}
1316
1317impl<DB: DBAccess> DBTransaction<'_, DB> {
1318    /// A thin wrapper around inner.put_cf that calculates a db transaction
1319    /// disk footprint
1320    fn put_cf<K: AsRef<[u8]>, V: AsRef<[u8]>>(
1321        &self,
1322        cf: &impl AsColumnFamilyRef,
1323        key: K,
1324        value: V,
1325    ) -> Result<()> {
1326        let kv_size = key.as_ref().len() + value.as_ref().len();
1327        self.inner.put_cf(cf, key, value)?;
1328        *self.cumulative_inner_size.borrow_mut() += kv_size;
1329        Ok(())
1330    }
1331
1332    pub fn get_size(&self) -> usize {
1333        *self.cumulative_inner_size.borrow()
1334    }
1335}
1336
1337fn serialize_key(value: u64, hash: [u8; 32]) -> std::io::Result<Vec<u8>> {
1338    let mut w = vec![];
1339    std::io::Write::write_all(&mut w, &value.to_be_bytes())?;
1340    std::io::Write::write_all(&mut w, &hash)?;
1341    Ok(w)
1342}
1343
1344fn deserialize_key<R: Read>(r: &mut R) -> Result<(u64, [u8; 32])> {
1345    let mut buf = [0u8; 8];
1346    r.read_exact(&mut buf)?;
1347    let value = u64::from_be_bytes(buf);
1348    let mut hash = [0u8; 32];
1349    r.read_exact(&mut hash[..])?;
1350
1351    Ok((value, hash))
1352}
1353
1354fn serialize_iter_key(ch: &ConsensusHeader) -> std::io::Result<Vec<u8>> {
1355    let mut w = vec![];
1356    std::io::Write::write_all(&mut w, &ch.prev_block_hash)?;
1357    std::io::Write::write_all(&mut w, &[ch.iteration])?;
1358    Ok(w)
1359}
1360
1361fn deserialize_iter_key<R: Read>(r: &mut R) -> Result<([u8; 32], u8)> {
1362    let mut prev_block_hash = [0u8; 32];
1363    r.read_exact(&mut prev_block_hash)?;
1364
1365    let mut iter_byte = [0u8; 1];
1366    r.read_exact(&mut iter_byte)?;
1367    let iteration = u8::from_be_bytes(iter_byte);
1368
1369    Ok((prev_block_hash, iteration))
1370}
1371
1372impl node_data::Serializable for LightBlock {
1373    fn write<W: Write>(&self, w: &mut W) -> io::Result<()> {
1374        // Write block header
1375        self.header.write(w)?;
1376
1377        // Write transactions count
1378        let len = self.transactions_ids.len() as u32;
1379        w.write_all(&len.to_le_bytes())?;
1380
1381        // Write transactions hashes
1382        for tx_id in &self.transactions_ids {
1383            w.write_all(tx_id)?;
1384        }
1385
1386        // Write faults count
1387        let len = self.faults_ids.len() as u32;
1388        w.write_all(&len.to_le_bytes())?;
1389
1390        // Write faults id
1391        for f_id in &self.faults_ids {
1392            w.write_all(f_id)?;
1393        }
1394
1395        Ok(())
1396    }
1397
1398    fn read<R: Read>(r: &mut R) -> io::Result<Self>
1399    where
1400        Self: Sized,
1401    {
1402        // Read block header
1403        let header = Header::read(r)?;
1404
1405        // Read transactions count
1406        let len = Self::read_u32_le(r)?;
1407
1408        // Read transactions hashes
1409        let mut transactions_ids = vec![];
1410        for _ in 0..len {
1411            let mut tx_id = [0u8; 32];
1412            r.read_exact(&mut tx_id[..])?;
1413
1414            transactions_ids.push(tx_id);
1415        }
1416
1417        // Read faults count
1418        let len = Self::read_u32_le(r)?;
1419
1420        // Read faults ids
1421        let mut faults_ids = vec![];
1422        for _ in 0..len {
1423            let mut f_id = [0u8; 32];
1424            r.read_exact(&mut f_id[..])?;
1425
1426            faults_ids.push(f_id);
1427        }
1428
1429        Ok(Self {
1430            header,
1431            transactions_ids,
1432            faults_ids,
1433        })
1434    }
1435}
1436
1437#[cfg(test)]
1438mod tests {
1439    use fake::{Fake, Faker};
1440    use node_data::ledger;
1441
1442    use super::*;
1443
1444    #[test]
1445    fn test_store_block() {
1446        TestWrapper::new("test_store_block").run(|path| {
1447            let db = Backend::create_or_open(path, DatabaseOptions::default());
1448
1449            let b: Block = Faker.fake();
1450            assert!(!b.txs().is_empty());
1451
1452            let hash = b.header().hash;
1453
1454            assert!(
1455                db.update(|txn| {
1456                    txn.store_block(
1457                        b.header(),
1458                        &to_spent_txs(b.txs()),
1459                        b.faults(),
1460                        Label::Final(3),
1461                    )?;
1462                    Ok(())
1463                })
1464                .is_ok()
1465            );
1466
1467            db.view(|txn| {
1468                // Assert block header is fully fetched from ledger
1469                let db_blk = txn
1470                    .block(&hash)
1471                    .expect("Block to be fetched")
1472                    .expect("Block to exist");
1473                assert_eq!(db_blk.header().hash, b.header().hash);
1474
1475                // Assert all transactions are fully fetched from ledger as
1476                // well.
1477                for pos in 0..b.txs().len() {
1478                    assert_eq!(db_blk.txs()[pos].id(), b.txs()[pos].id());
1479                }
1480
1481                // Assert all faults are fully fetched from ledger as
1482                // well.
1483                for pos in 0..b.faults().len() {
1484                    assert_eq!(db_blk.faults()[pos].id(), b.faults()[pos].id());
1485                }
1486            });
1487
1488            assert!(
1489                db.update(|txn| {
1490                    txn.clear_database()?;
1491                    Ok(())
1492                })
1493                .is_ok()
1494            );
1495
1496            db.view(|txn| {
1497                assert!(
1498                    txn.block(&hash).expect("block to be fetched").is_none()
1499                );
1500            });
1501        });
1502    }
1503
1504    #[test]
1505    fn test_read_only() {
1506        TestWrapper::new("test_read_only").run(|path| {
1507            let db = Backend::create_or_open(path, DatabaseOptions::default());
1508            let b: Block = Faker.fake();
1509            db.update_dry_run(true, |txn| {
1510                txn.store_block(
1511                    b.header(),
1512                    &to_spent_txs(b.txs()),
1513                    b.faults(),
1514                    Label::Final(3),
1515                )
1516            })
1517            .expect("block to be stored");
1518            db.view(|txn| {
1519                assert!(
1520                    txn.block(&b.header().hash)
1521                        .expect("block to be fetched")
1522                        .is_none()
1523                );
1524            });
1525        });
1526    }
1527
1528    #[test]
1529    fn test_transaction_isolation() {
1530        TestWrapper::new("test_transaction_isolation").run(|path| {
1531            let db = Backend::create_or_open(path, DatabaseOptions::default());
1532            let mut b: Block = Faker.fake();
1533            let hash = b.header().hash;
1534
1535            db.view(|txn| {
1536                // Simulate a concurrent update is committed during read-only
1537                // transaction
1538                assert!(
1539                    db.update(|inner| {
1540                        inner
1541                            .store_block(
1542                                b.header(),
1543                                &to_spent_txs(b.txs()),
1544                                b.faults(),
1545                                Label::Final(3),
1546                            )
1547                            .unwrap();
1548
1549                        // We support Read-Your-Own-Writes
1550                        assert!(inner.block(&hash)?.is_some());
1551                        // Data is isolated until the transaction is not
1552                        // committed
1553                        assert!(txn.block(&hash)?.is_none());
1554                        Ok(())
1555                    })
1556                    .is_ok()
1557                );
1558
1559                // Asserts that the read-only/view transaction get the updated
1560                // data after the tx is committed
1561                assert!(
1562                    txn.block(&hash).expect("block to be fetched").is_some()
1563                );
1564            });
1565
1566            // Asserts that update was done
1567            db.view(|txn| {
1568                assert_blocks_eq(
1569                    &mut txn
1570                        .block(&hash)
1571                        .expect("block to be fetched")
1572                        .unwrap(),
1573                    &mut b,
1574                );
1575            });
1576        });
1577    }
1578
1579    fn assert_blocks_eq(a: &Block, b: &Block) {
1580        assert!(a.header().hash != [0u8; 32]);
1581        assert!(a.header().hash.eq(&b.header().hash));
1582    }
1583
1584    #[test]
1585    fn test_add_mempool_tx() {
1586        TestWrapper::new("test_add_tx").run(|path| {
1587            let db = Backend::create_or_open(path, DatabaseOptions::default());
1588            let t: Transaction = Faker.fake();
1589
1590            assert!(db.update(|txn| { txn.store_mempool_tx(&t, 0) }).is_ok());
1591
1592            db.view(|vq| {
1593                assert!(vq.mempool_tx_exists(t.id()).unwrap());
1594
1595                let fetched_tx = vq
1596                    .mempool_tx(t.id())
1597                    .expect("valid contract call")
1598                    .unwrap();
1599
1600                assert_eq!(
1601                    fetched_tx.id(),
1602                    t.id(),
1603                    "fetched transaction should be the same"
1604                );
1605            });
1606
1607            // Delete a contract call
1608            db.update(|txn| {
1609                let deleted =
1610                    txn.delete_mempool_tx(t.id(), false).expect("valid tx");
1611                assert!(deleted.len() == 1);
1612                Ok(())
1613            })
1614            .unwrap();
1615        });
1616    }
1617
1618    #[test]
1619    fn test_mempool_txs_sorted_by_fee() {
1620        TestWrapper::new("test_mempool_txs_sorted_by_fee").run(|path| {
1621            let db = Backend::create_or_open(path, DatabaseOptions::default());
1622            // Populate mempool with N contract calls
1623            let _rng = rand::thread_rng();
1624            db.update(|txn| {
1625                for _i in 0..10u32 {
1626                    let t: Transaction = Faker.fake();
1627                    txn.store_mempool_tx(&t, 0)?;
1628                }
1629                Ok(())
1630            })
1631            .unwrap();
1632
1633            db.view(|txn| {
1634                let txs = txn.mempool_txs_sorted_by_fee();
1635
1636                let mut last_fee = u64::MAX;
1637                for t in txs {
1638                    let fee = t.gas_price();
1639                    assert!(
1640                        fee <= last_fee,
1641                        "tx fees are not in decreasing order"
1642                    );
1643                    last_fee = fee
1644                }
1645                assert_ne!(last_fee, u64::MAX, "No tx has been processed")
1646            });
1647        });
1648    }
1649
1650    #[test]
1651    fn test_txs_count() {
1652        TestWrapper::new("test_txs_count").run(|path| {
1653            let db = Backend::create_or_open(path, DatabaseOptions::default());
1654
1655            const N: usize = 100;
1656            const D: usize = 50;
1657
1658            let txs: Vec<_> = (0..N)
1659                .map(|i| ledger::faker::gen_dummy_tx(i as u64))
1660                .collect();
1661
1662            db.update(|db| {
1663                assert_eq!(db.mempool_txs_count(), 0);
1664                txs.iter().for_each(|t| {
1665                    db.store_mempool_tx(&t, 0).expect("tx should be added")
1666                });
1667                Ok(())
1668            })
1669            .unwrap();
1670
1671            db.update(|db| {
1672                // Ensure txs count is equal to the number of added tx
1673                assert_eq!(db.mempool_txs_count(), N);
1674
1675                txs.iter().take(D).for_each(|tx| {
1676                    let deleted = db
1677                        .delete_mempool_tx(tx.id(), false)
1678                        .expect("transaction should be deleted");
1679                    assert!(deleted.len() == 1);
1680                });
1681
1682                Ok(())
1683            })
1684            .unwrap();
1685
1686            // Ensure txs count is updated after the deletion
1687            db.update(|db| {
1688                assert_eq!(db.mempool_txs_count(), N - D);
1689                Ok(())
1690            })
1691            .unwrap();
1692        });
1693    }
1694
1695    #[test]
1696    fn test_max_gas_limit() {
1697        TestWrapper::new("test_block_size_limit").run(|path| {
1698            let db = Backend::create_or_open(path, DatabaseOptions::default());
1699
1700            db.update(|txn| {
1701                for i in 0..10u32 {
1702                    let t = ledger::faker::gen_dummy_tx(i as u64);
1703                    txn.store_mempool_tx(&t, 0)?;
1704                }
1705                Ok(())
1706            })
1707            .unwrap();
1708
1709            let total_gas_price: u64 = 9 + 8 + 7 + 6 + 5 + 4 + 3 + 2 + 1;
1710            db.view(|txn| {
1711                let txs = txn
1712                    .mempool_txs_sorted_by_fee()
1713                    .map(|t| t.gas_price())
1714                    .sum::<u64>();
1715
1716                assert_eq!(txs, total_gas_price);
1717            });
1718        });
1719    }
1720
1721    #[test]
1722    fn test_get_expired_txs() {
1723        TestWrapper::new("test_get_expired_txs").run(|path| {
1724            let db = Backend::create_or_open(path, DatabaseOptions::default());
1725
1726            let mut expiry_list = HashSet::new();
1727            let _ = db.update(|txn| {
1728                (1..101).for_each(|i| {
1729                    let t = ledger::faker::gen_dummy_tx(i as u64);
1730                    txn.store_mempool_tx(&t, i).expect("tx should be added");
1731                    expiry_list.insert(t.id());
1732                });
1733
1734                (1000..1100).for_each(|i| {
1735                    let t = ledger::faker::gen_dummy_tx(i as u64);
1736                    txn.store_mempool_tx(&t, i).expect("tx should be added");
1737                });
1738
1739                Ok(())
1740            });
1741
1742            db.view(|vq| {
1743                let expired: HashSet<_> = vq
1744                    .mempool_expired_txs(100)
1745                    .unwrap()
1746                    .into_iter()
1747                    .map(|id| id)
1748                    .collect();
1749
1750                assert_eq!(expiry_list, expired);
1751            });
1752        });
1753    }
1754
1755    fn to_spent_txs(txs: &Vec<Transaction>) -> Vec<SpentTransaction> {
1756        txs.iter()
1757            .map(|t| SpentTransaction {
1758                inner: t.clone(),
1759                block_height: 0,
1760                gas_spent: 0,
1761                err: None,
1762            })
1763            .collect()
1764    }
1765
1766    #[test]
1767    fn test_get_ledger_tx_by_hash() {
1768        TestWrapper::new("test_get_ledger_tx_by_hash").run(|path| {
1769            let db = Backend::create_or_open(path, DatabaseOptions::default());
1770            let b: Block = Faker.fake();
1771            assert!(!b.txs().is_empty());
1772
1773            // Store a block
1774            assert!(
1775                db.update(|txn| {
1776                    txn.store_block(
1777                        b.header(),
1778                        &to_spent_txs(b.txs()),
1779                        b.faults(),
1780                        Label::Final(3),
1781                    )?;
1782                    Ok(())
1783                })
1784                .is_ok()
1785            );
1786
1787            // Assert all transactions of the accepted (stored) block are
1788            // accessible by hash.
1789            db.view(|v| {
1790                for t in b.txs().iter() {
1791                    assert!(
1792                        v.ledger_tx(&t.id())
1793                            .expect("should not return error")
1794                            .expect("should find a transaction")
1795                            .inner
1796                            .eq(t)
1797                    );
1798                }
1799            });
1800        });
1801    }
1802
1803    #[test]
1804    fn test_fetch_block_hash_by_height() {
1805        TestWrapper::new("test_fetch_block_hash_by_height").run(|path| {
1806            let db = Backend::create_or_open(path, DatabaseOptions::default());
1807            let b: Block = Faker.fake();
1808
1809            // Store a block
1810            assert!(
1811                db.update(|txn| {
1812                    txn.store_block(
1813                        b.header(),
1814                        &to_spent_txs(b.txs()),
1815                        b.faults(),
1816                        Label::Attested(3),
1817                    )?;
1818                    Ok(())
1819                })
1820                .is_ok()
1821            );
1822
1823            // Assert block hash is accessible by height.
1824            db.view(|v| {
1825                assert!(
1826                    v.block_hash_by_height(b.header().height)
1827                        .expect("should not return error")
1828                        .expect("should find a block")
1829                        .eq(&b.header().hash)
1830                );
1831            });
1832        });
1833    }
1834
1835    #[test]
1836    fn test_fetch_block_label_by_height() {
1837        TestWrapper::new("test_fetch_block_hash_by_height").run(|path| {
1838            let db = Backend::create_or_open(path, DatabaseOptions::default());
1839            let b: Block = Faker.fake();
1840
1841            // Store a block
1842            assert!(
1843                db.update(|txn| {
1844                    txn.store_block(
1845                        b.header(),
1846                        &to_spent_txs(b.txs()),
1847                        b.faults(),
1848                        Label::Attested(3),
1849                    )?;
1850                    Ok(())
1851                })
1852                .is_ok()
1853            );
1854
1855            // Assert block hash is accessible by height.
1856            db.view(|v| {
1857                assert!(
1858                    v.block_label_by_height(b.header().height)
1859                        .expect("should not return error")
1860                        .expect("should find a block")
1861                        .1
1862                        .eq(&Label::Attested(3))
1863                );
1864            });
1865        });
1866    }
1867
1868    #[test]
1869    /// Ensures delete_block fn removes all keys of a single block
1870    fn test_delete_block() {
1871        let t = TestWrapper::new("test_fetch_block_hash_by_height");
1872        t.run(|path| {
1873            let db = Backend::create_or_open(path, DatabaseOptions::default());
1874            let b: ledger::Block = Faker.fake();
1875
1876            assert!(
1877                db.update(|ut| {
1878                    ut.store_block(
1879                        b.header(),
1880                        &to_spent_txs(b.txs()),
1881                        b.faults(),
1882                        Label::Final(3),
1883                    )?;
1884                    Ok(())
1885                })
1886                .is_ok()
1887            );
1888
1889            assert!(
1890                db.update(|ut| {
1891                    ut.delete_block(&b)?;
1892                    Ok(())
1893                })
1894                .is_ok()
1895            );
1896        });
1897
1898        let path = t.get_path();
1899        let opts = Options::default();
1900
1901        let vec = rocksdb::DB::list_cf(&opts, &path).unwrap();
1902        assert!(!vec.is_empty());
1903
1904        // Ensure no block fields leak after its deletion
1905        let db = rocksdb::DB::open_cf(&opts, &path, vec.clone()).unwrap();
1906        vec.into_iter()
1907            .map(|cf_name| {
1908                if cf_name == CF_METADATA {
1909                    return;
1910                }
1911
1912                let cf = db.cf_handle(&cf_name).unwrap();
1913                assert_eq!(
1914                    db.iterator_cf(cf, IteratorMode::Start)
1915                        .map(Result::unwrap)
1916                        .count(),
1917                    0
1918                );
1919            })
1920            .for_each(drop);
1921    }
1922
1923    struct TestWrapper(tempfile::TempDir);
1924
1925    impl TestWrapper {
1926        fn new(path: &'static str) -> Self {
1927            Self(
1928                tempfile::TempDir::with_prefix(path)
1929                    .expect("Temp directory to be created"),
1930            )
1931        }
1932
1933        pub fn run<F>(&self, test_func: F)
1934        where
1935            F: FnOnce(&Path),
1936        {
1937            test_func(self.0.path());
1938        }
1939
1940        pub fn get_path(&self) -> std::path::PathBuf {
1941            self.0.path().to_owned().join(DB_FOLDER_NAME)
1942        }
1943    }
1944}