dusk_node/database/
rocksdb.rs

1// This Source Code Form is subject to the terms of the Mozilla Public
2// License, v. 2.0. If a copy of the MPL was not distributed with this
3// file, You can obtain one at http://mozilla.org/MPL/2.0/.
4//
5// Copyright (c) DUSK NETWORK. All rights reserved.
6
7use std::cell::RefCell;
8use std::collections::HashSet;
9use std::io::{Read, Write};
10use std::path::Path;
11use std::sync::Arc;
12use std::{io, vec};
13
14use anyhow::Result;
15use dusk_core::transfer::data::BlobSidecar;
16use node_data::ledger::{
17    Block, Fault, Header, Label, SpendingId, SpentTransaction, Transaction,
18};
19use node_data::message::{payload, ConsensusHeader};
20use node_data::Serializable;
21use rocksdb::{
22    AsColumnFamilyRef, BlockBasedOptions, ColumnFamily, ColumnFamilyDescriptor,
23    DBAccess, DBRawIteratorWithThreadMode, IteratorMode, LogLevel,
24    OptimisticTransactionDB, OptimisticTransactionOptions, Options,
25    WriteOptions,
26};
27use tracing::info;
28
29use super::{
30    ConsensusStorage, DatabaseOptions, Ledger, LightBlock, Metadata, Persist,
31    DB,
32};
33use crate::database::Mempool;
34
// Column family names. Each one is registered in `create_or_open` and looked
// up again in `begin_tx`.

/// Block records, keyed by block hash.
const CF_LEDGER_HEADER: &str = "cf_ledger_header";
/// Serialized spent transactions, keyed by transaction id.
const CF_LEDGER_TXS: &str = "cf_ledger_txs";
/// Serialized blob sidecars, keyed by blob hash.
const CF_LEDGER_BLOBS: &str = "cf_ledger_blobs";
/// Concatenated blob hashes, keyed by big-endian block height.
const CF_LEDGER_BLOBS_HEIGHT: &str = "cf_ledger_blobs_height";
/// Serialized faults, keyed by fault id.
const CF_LEDGER_FAULTS: &str = "cf_ledger_faults";
/// Block hash followed by block label, keyed by little-endian block height.
const CF_LEDGER_HEIGHT: &str = "cf_ledger_height";
/// Serialized candidate blocks, keyed by block hash.
const CF_CANDIDATES: &str = "cf_candidates";
/// Candidate block hashes, keyed by a (height, hash) composite key.
const CF_CANDIDATES_HEIGHT: &str = "cf_candidates_height";
/// Serialized validation results, keyed by a consensus-header derived key.
const CF_VALIDATION_RESULTS: &str = "cf_validation_results";
/// Mempool column family.
const CF_MEMPOOL: &str = "cf_mempool";
/// Mempool spending-id column family.
const CF_MEMPOOL_SPENDING_ID: &str = "cf_mempool_spending_id";
/// Mempool fees column family.
const CF_MEMPOOL_FEES: &str = "cf_mempool_fees";
/// Metadata column family.
const CF_METADATA: &str = "cf_metadata";

/// Name of the folder, relative to the configured path, holding the database.
const DB_FOLDER_NAME: &str = "chain.db";

// List of supported metadata keys

/// Metadata key updated with the block hash on every `store_block`.
pub const MD_HASH_KEY: &[u8] = b"hash_key";
/// Metadata key updated with the block state hash on every `store_block`.
pub const MD_STATE_ROOT_KEY: &[u8] = b"state_hash_key";
/// Metadata key: average validation time.
pub const MD_AVG_VALIDATION: &[u8] = b"avg_validation_time";
/// Metadata key: average ratification time.
pub const MD_AVG_RATIFICATION: &[u8] = b"avg_ratification_time";
/// Metadata key: average proposal time.
pub const MD_AVG_PROPOSAL: &[u8] = b"avg_proposal_time";
/// Metadata key: last consensus iteration.
pub const MD_LAST_ITER: &[u8] = b"consensus_last_iter";
58
59#[derive(Clone)]
60pub struct Backend {
61    rocksdb: Arc<OptimisticTransactionDB>,
62}
63
64impl Backend {
65    fn begin_tx(&self) -> DBTransaction<'_, OptimisticTransactionDB> {
66        // Create a new RocksDB transaction
67        let write_options = WriteOptions::default();
68        let tx_options = OptimisticTransactionOptions::default();
69
70        let inner = self.rocksdb.transaction_opt(&write_options, &tx_options);
71
72        // Borrow column families
73        let ledger_cf = self
74            .rocksdb
75            .cf_handle(CF_LEDGER_HEADER)
76            .expect("ledger_header column family must exist");
77
78        let ledger_txs_cf = self
79            .rocksdb
80            .cf_handle(CF_LEDGER_TXS)
81            .expect("CF_LEDGER_TXS column family must exist");
82
83        let ledger_faults_cf = self
84            .rocksdb
85            .cf_handle(CF_LEDGER_FAULTS)
86            .expect("CF_LEDGER_FAULTS column family must exist");
87
88        let candidates_cf = self
89            .rocksdb
90            .cf_handle(CF_CANDIDATES)
91            .expect("candidates column family must exist");
92
93        let candidates_height_cf = self
94            .rocksdb
95            .cf_handle(CF_CANDIDATES_HEIGHT)
96            .expect("candidates_height column family must exist");
97
98        let validation_results_cf = self
99            .rocksdb
100            .cf_handle(CF_VALIDATION_RESULTS)
101            .expect("validation result column family must exist");
102
103        let mempool_cf = self
104            .rocksdb
105            .cf_handle(CF_MEMPOOL)
106            .expect("mempool column family must exist");
107
108        let spending_id_cf = self
109            .rocksdb
110            .cf_handle(CF_MEMPOOL_SPENDING_ID)
111            .expect("CF_MEMPOOL_SPENDING_ID column family must exist");
112
113        let fees_cf = self
114            .rocksdb
115            .cf_handle(CF_MEMPOOL_FEES)
116            .expect("CF_MEMPOOL_FEES column family must exist");
117
118        let ledger_height_cf = self
119            .rocksdb
120            .cf_handle(CF_LEDGER_HEIGHT)
121            .expect("CF_LEDGER_HEIGHT column family must exist");
122
123        let metadata_cf = self
124            .rocksdb
125            .cf_handle(CF_METADATA)
126            .expect("CF_METADATA column family must exist");
127
128        let ledger_blobs_cf = self
129            .rocksdb
130            .cf_handle(CF_LEDGER_BLOBS)
131            .expect("CF_LEDGER_BLOBS column family must exist");
132
133        let ledger_blobs_height_cf = self
134            .rocksdb
135            .cf_handle(CF_LEDGER_BLOBS_HEIGHT)
136            .expect("CF_LEDGER_BLOBS_HEIGHT column family must exist");
137
138        DBTransaction::<'_, OptimisticTransactionDB> {
139            inner,
140            candidates_cf,
141            candidates_height_cf,
142            validation_results_cf,
143            ledger_cf,
144            ledger_txs_cf,
145            ledger_faults_cf,
146            mempool_cf,
147            spending_id_cf,
148            fees_cf,
149            ledger_height_cf,
150            ledger_blobs_cf,
151            ledger_blobs_height_cf,
152            metadata_cf,
153            cumulative_inner_size: RefCell::new(0),
154        }
155    }
156}
157
158impl DB for Backend {
159    type P<'a> = DBTransaction<'a, OptimisticTransactionDB>;
160
161    fn create_or_open<T>(path: T, db_opts: DatabaseOptions) -> Self
162    where
163        T: AsRef<Path>,
164    {
165        let path = path.as_ref().join(DB_FOLDER_NAME);
166        info!("Opening database in {path:?}, {:?} ", db_opts);
167
168        // A set of options for initializing any blocks-related CF (including
169        // METADATA CF)
170        let mut blocks_cf_opts = Options::default();
171        blocks_cf_opts.create_if_missing(db_opts.create_if_missing);
172        blocks_cf_opts.create_missing_column_families(true);
173        blocks_cf_opts.set_level_compaction_dynamic_level_bytes(true);
174        blocks_cf_opts
175            .set_write_buffer_size(db_opts.blocks_cf_max_write_buffer_size);
176
177        if db_opts.enable_debug {
178            blocks_cf_opts.set_log_level(LogLevel::Info);
179            blocks_cf_opts.set_dump_malloc_stats(true);
180            blocks_cf_opts.enable_statistics();
181        }
182
183        if db_opts.blocks_cf_disable_block_cache {
184            let mut block_opts = BlockBasedOptions::default();
185            block_opts.disable_cache();
186            blocks_cf_opts.set_block_based_table_factory(&block_opts);
187        }
188
189        // Configure CF_MEMPOOL column family, so it benefits from low
190        // write-latency of L0
191        let mut mp_opts = blocks_cf_opts.clone();
192        // Disable WAL by default
193        mp_opts.set_manual_wal_flush(true);
194        mp_opts.create_if_missing(true);
195        mp_opts.create_missing_column_families(true);
196        mp_opts.set_write_buffer_size(db_opts.mempool_cf_max_write_buffer_size);
197
198        if db_opts.enable_debug {
199            mp_opts.set_log_level(LogLevel::Info);
200            mp_opts.set_dump_malloc_stats(true);
201            mp_opts.enable_statistics();
202        }
203
204        let cfs = vec![
205            ColumnFamilyDescriptor::new(
206                CF_LEDGER_HEADER,
207                blocks_cf_opts.clone(),
208            ),
209            ColumnFamilyDescriptor::new(CF_LEDGER_TXS, blocks_cf_opts.clone()),
210            ColumnFamilyDescriptor::new(
211                CF_LEDGER_FAULTS,
212                blocks_cf_opts.clone(),
213            ),
214            ColumnFamilyDescriptor::new(
215                CF_LEDGER_HEIGHT,
216                blocks_cf_opts.clone(),
217            ),
218            ColumnFamilyDescriptor::new(
219                CF_LEDGER_BLOBS,
220                blocks_cf_opts.clone(),
221            ),
222            ColumnFamilyDescriptor::new(
223                CF_LEDGER_BLOBS_HEIGHT,
224                blocks_cf_opts.clone(),
225            ),
226            ColumnFamilyDescriptor::new(CF_CANDIDATES, blocks_cf_opts.clone()),
227            ColumnFamilyDescriptor::new(
228                CF_CANDIDATES_HEIGHT,
229                blocks_cf_opts.clone(),
230            ),
231            ColumnFamilyDescriptor::new(
232                CF_VALIDATION_RESULTS,
233                blocks_cf_opts.clone(),
234            ),
235            ColumnFamilyDescriptor::new(CF_METADATA, blocks_cf_opts.clone()),
236            ColumnFamilyDescriptor::new(CF_MEMPOOL, mp_opts.clone()),
237            ColumnFamilyDescriptor::new(
238                CF_MEMPOOL_SPENDING_ID,
239                mp_opts.clone(),
240            ),
241            ColumnFamilyDescriptor::new(CF_MEMPOOL_FEES, mp_opts.clone()),
242        ];
243
244        Self {
245            rocksdb: Arc::new(
246                OptimisticTransactionDB::open_cf_descriptors(
247                    &blocks_cf_opts,
248                    &path,
249                    cfs,
250                )
251                .unwrap_or_else(|_| {
252                    panic!("should be a valid database in {path:?}")
253                }),
254            ),
255        }
256    }
257
258    fn view<F, T>(&self, f: F) -> T
259    where
260        F: for<'a> FnOnce(&Self::P<'a>) -> T,
261    {
262        // Create a new read-only transaction
263        let tx = self.begin_tx();
264
265        // Execute all read-only transactions in isolation
266        let ret = f(&tx);
267        tx.rollback().expect("rollback to succeed for readonly");
268        ret
269    }
270
271    fn update<F, T>(&self, execute: F) -> Result<T>
272    where
273        F: for<'a> FnOnce(&mut Self::P<'a>) -> Result<T>,
274    {
275        self.update_dry_run(false, execute)
276    }
277
278    fn update_dry_run<F, T>(&self, dry_run: bool, execute: F) -> Result<T>
279    where
280        F: for<'a> FnOnce(&mut Self::P<'a>) -> Result<T>,
281    {
282        // Create read-write transaction
283        let mut tx = self.begin_tx();
284
285        // If f returns err, no commit will be applied into backend
286        // storage
287        let ret = execute(&mut tx)?;
288
289        if dry_run {
290            tx.rollback()?;
291        } else {
292            // Apply changes in atomic way
293            tx.commit()?;
294        }
295
296        Ok(ret)
297    }
298
299    fn close(&mut self) {}
300}
301
302pub struct DBTransaction<'db, DB: DBAccess> {
303    inner: rocksdb::Transaction<'db, DB>,
304    /// cumulative size of transaction footprint
305    cumulative_inner_size: RefCell<usize>,
306
307    // TODO: pack all column families into a single array
308    // Candidates column family
309    candidates_cf: &'db ColumnFamily,
310    candidates_height_cf: &'db ColumnFamily,
311    // ValidationResults column family
312    validation_results_cf: &'db ColumnFamily,
313
314    // Ledger column families
315    ledger_cf: &'db ColumnFamily,
316    ledger_faults_cf: &'db ColumnFamily,
317    ledger_txs_cf: &'db ColumnFamily,
318    ledger_height_cf: &'db ColumnFamily,
319    ledger_blobs_cf: &'db ColumnFamily,
320    ledger_blobs_height_cf: &'db ColumnFamily,
321
322    // Mempool column families
323    mempool_cf: &'db ColumnFamily,
324    spending_id_cf: &'db ColumnFamily,
325    fees_cf: &'db ColumnFamily,
326
327    metadata_cf: &'db ColumnFamily,
328}
329
330impl<DB: DBAccess> Ledger for DBTransaction<'_, DB> {
331    fn store_block(
332        &mut self,
333        header: &Header,
334        txs: &[SpentTransaction],
335        faults: &[Fault],
336        label: Label,
337    ) -> Result<usize> {
338        // COLUMN FAMILY: CF_LEDGER_HEADER
339        // It consists of one record per block - Header record
340        // It also includes single record to store metadata - Register record
341        {
342            let cf = self.ledger_cf;
343
344            let mut buf = vec![];
345            LightBlock {
346                header: header.clone(),
347                transactions_ids: txs.iter().map(|t| t.inner.id()).collect(),
348                faults_ids: faults.iter().map(|f| f.id()).collect(),
349            }
350            .write(&mut buf)?;
351
352            self.put_cf(cf, header.hash, buf)?;
353        }
354
355        // Update metadata values
356        self.op_write(MD_HASH_KEY, header.hash)?;
357        self.op_write(MD_STATE_ROOT_KEY, header.state_hash)?;
358
359        // COLUMN FAMILY: CF_LEDGER_TXS
360        {
361            let cf = self.ledger_txs_cf;
362
363            let mut stored_blobs = Vec::with_capacity(6);
364
365            // store all block transactions
366            for tx in txs {
367                let mut d = vec![];
368
369                if tx.inner.inner.blob().is_some() {
370                    let mut strip_tx = tx.clone();
371                    if let Some(blobs) = strip_tx.inner.inner.strip_blobs() {
372                        for (hash, sidecar) in blobs.into_iter() {
373                            let sidecar_bytes = sidecar.to_var_bytes();
374                            self.store_blob_data(&hash, sidecar_bytes)?;
375                            stored_blobs.push(hash);
376                        }
377                    }
378                    strip_tx.write(&mut d)?;
379                } else {
380                    tx.write(&mut d)?;
381                }
382                self.put_cf(cf, tx.inner.id(), d)?;
383            }
384
385            if !stored_blobs.is_empty() {
386                // Store all blobs hashes in the ledger
387                self.store_blobs_height(header.height, &stored_blobs)?;
388            }
389        }
390
391        // COLUMN FAMILY: CF_LEDGER_FAULTS
392        {
393            let cf = self.ledger_faults_cf;
394
395            // store all block faults
396            for f in faults {
397                let mut d = vec![];
398                f.write(&mut d)?;
399                self.put_cf(cf, f.id(), d)?;
400            }
401        }
402        self.store_block_label(header.height, &header.hash, label)?;
403
404        Ok(self.get_size())
405    }
406
407    fn faults_by_block(&self, start_height: u64) -> Result<Vec<Fault>> {
408        let mut faults = vec![];
409        let mut hash = self
410            .op_read(MD_HASH_KEY)?
411            .ok_or(anyhow::anyhow!("Cannot read tip"))?;
412
413        loop {
414            let block = self.light_block(&hash)?.ok_or(anyhow::anyhow!(
415                "Cannot read block {}",
416                hex::encode(&hash)
417            ))?;
418
419            let block_height = block.header.height;
420
421            if block_height >= start_height {
422                hash = block.header.prev_block_hash.to_vec();
423                faults.extend(self.faults(&block.faults_ids)?);
424            } else {
425                break;
426            }
427
428            if block_height == 0 {
429                break;
430            }
431        }
432        Ok(faults)
433    }
434
435    fn store_block_label(
436        &mut self,
437        height: u64,
438        hash: &[u8; 32],
439        label: Label,
440    ) -> Result<()> {
441        // CF: HEIGHT -> (BLOCK_HASH, BLOCK_LABEL)
442        let mut buf = vec![];
443        buf.write_all(hash)?;
444        label.write(&mut buf)?;
445
446        self.put_cf(self.ledger_height_cf, height.to_le_bytes(), buf)?;
447        Ok(())
448    }
449
450    fn delete_block(&mut self, b: &Block) -> Result<()> {
451        self.inner.delete_cf(
452            self.ledger_height_cf,
453            b.header().height.to_le_bytes(),
454        )?;
455
456        for tx in b.txs() {
457            self.inner.delete_cf(self.ledger_txs_cf, tx.id())?;
458        }
459        for f in b.faults() {
460            self.inner.delete_cf(self.ledger_faults_cf, f.id())?;
461        }
462
463        self.delete_blobs_by_height(b.header().height)?;
464        self.inner.delete_cf(self.ledger_cf, b.header().hash)?;
465
466        Ok(())
467    }
468
469    fn block_exists(&self, hash: &[u8]) -> Result<bool> {
470        Ok(self.inner.get_cf(self.ledger_cf, hash)?.is_some())
471    }
472
473    fn faults(&self, faults_ids: &[[u8; 32]]) -> Result<Vec<Fault>> {
474        if faults_ids.is_empty() {
475            return Ok(vec![]);
476        }
477        let ids = faults_ids
478            .iter()
479            .map(|id| (self.ledger_faults_cf, id))
480            .collect::<Vec<_>>();
481
482        // Retrieve all faults ID with single call
483        let faults_buffer = self.inner.multi_get_cf(ids);
484
485        let mut faults = vec![];
486        for buf in faults_buffer {
487            let buf = buf?.unwrap();
488            let fault = Fault::read(&mut &buf[..])?;
489            faults.push(fault);
490        }
491
492        Ok(faults)
493    }
494
495    fn latest_block(&self) -> Result<LightBlock> {
496        let tip_hash = self
497            .op_read(MD_HASH_KEY)?
498            .ok_or(anyhow::anyhow!("Cannot find tip stored in metadata"))?;
499        self.light_block(&tip_hash)?
500            .ok_or(anyhow::anyhow!("Cannot find tip block"))
501    }
502
503    fn blob_data_by_hash(&self, hash: &[u8; 32]) -> Result<Option<Vec<u8>>> {
504        Ok(self.inner.get_cf(self.ledger_blobs_cf, hash)?)
505    }
506
507    fn store_blob_data(&self, hash: &[u8; 32], data: Vec<u8>) -> Result<()> {
508        self.inner.put_cf(self.ledger_blobs_cf, hash, data)?;
509        Ok(())
510    }
511    fn store_blobs_height(
512        &self,
513        block_height: u64,
514        blob_hashes: &[[u8; 32]],
515    ) -> Result<()> {
516        if blob_hashes.is_empty() {
517            return Ok(());
518        }
519        let blob_hashes_bytes: Vec<_> =
520            blob_hashes.iter().flat_map(|hash| hash.to_vec()).collect();
521        self.inner.put_cf(
522            self.ledger_blobs_height_cf,
523            block_height.to_be_bytes(),
524            blob_hashes_bytes,
525        )?;
526        Ok(())
527    }
528
529    fn delete_blobs_by_height(&self, block_height: u64) -> Result<()> {
530        let blobs_to_delete = self.blobs_by_height(block_height)?;
531        if let Some(blob_hashes) = blobs_to_delete {
532            for hash in blob_hashes {
533                // What happen if the blobs also exists linked to another
534                // transaction?
535                self.inner.delete_cf(self.ledger_blobs_cf, hash)?;
536            }
537            self.inner.delete_cf(
538                self.ledger_blobs_height_cf,
539                block_height.to_be_bytes(),
540            )?;
541        }
542
543        Ok(())
544    }
545
546    fn blobs_by_height(
547        &self,
548        block_height: u64,
549    ) -> Result<Option<Vec<[u8; 32]>>> {
550        let blob_hashes_bytes = self
551            .inner
552            .get_cf(self.ledger_blobs_height_cf, block_height.to_be_bytes())?;
553
554        if let Some(blob_hashes_bytes) = blob_hashes_bytes {
555            let mut blob_hashes = vec![];
556            for chunk in blob_hashes_bytes.chunks(32) {
557                let mut hash = [0u8; 32];
558                hash.copy_from_slice(chunk);
559                blob_hashes.push(hash);
560            }
561            Ok(Some(blob_hashes))
562        } else {
563            Ok(None)
564        }
565    }
566
567    fn block(&self, hash: &[u8]) -> Result<Option<Block>> {
568        match self.inner.get_cf(self.ledger_cf, hash)? {
569            Some(blob) => {
570                let record = LightBlock::read(&mut &blob[..])?;
571
572                // Retrieve all transactions buffers with single call
573                let txs_buffers = self.inner.multi_get_cf(
574                    record
575                        .transactions_ids
576                        .iter()
577                        .map(|id| (self.ledger_txs_cf, id))
578                        .collect::<Vec<(&ColumnFamily, &[u8; 32])>>(),
579                );
580
581                let mut txs = vec![];
582                for buf in txs_buffers {
583                    let buf = buf?.unwrap();
584                    let mut tx = SpentTransaction::read(&mut &buf[..])?;
585                    if let Some(blobs) = tx.inner.inner.blob_mut() {
586                        for blob in blobs {
587                            // Retrieve blob data from the ledger
588                            let sidecar = self
589                                .blob_data_by_hash(&blob.hash)?
590                                .map(|bytes| {
591                                    BlobSidecar::from_buf(&mut &bytes[..])
592                                })
593                                .transpose()
594                                .map_err(|e| {
595                                    anyhow::anyhow!(
596                                        "Failed to parse blob sidecar: {e:?}"
597                                    )
598                                })?;
599                            blob.data = sidecar;
600                        }
601                    }
602                    txs.push(tx.inner);
603                }
604
605                // Retrieve all faults ID with single call
606                let faults_buffer = self.inner.multi_get_cf(
607                    record
608                        .faults_ids
609                        .iter()
610                        .map(|id| (self.ledger_faults_cf, id))
611                        .collect::<Vec<(&ColumnFamily, &[u8; 32])>>(),
612                );
613                let mut faults = vec![];
614                for buf in faults_buffer {
615                    let buf = buf?.unwrap();
616                    let fault = Fault::read(&mut &buf[..])?;
617                    faults.push(fault);
618                }
619
620                Ok(Some(
621                    Block::new(record.header, txs, faults)
622                        .expect("block should be valid"),
623                ))
624            }
625            None => Ok(None),
626        }
627    }
628
629    fn light_block(&self, hash: &[u8]) -> Result<Option<LightBlock>> {
630        match self.inner.get_cf(self.ledger_cf, hash)? {
631            Some(blob) => {
632                let record = LightBlock::read(&mut &blob[..])?;
633                Ok(Some(record))
634            }
635            None => Ok(None),
636        }
637    }
638
639    fn block_header(&self, hash: &[u8]) -> Result<Option<Header>> {
640        match self.inner.get_cf(self.ledger_cf, hash)? {
641            Some(blob) => {
642                let record = Header::read(&mut &blob[..])?;
643                Ok(Some(record))
644            }
645            None => Ok(None),
646        }
647    }
648
649    fn block_hash_by_height(&self, height: u64) -> Result<Option<[u8; 32]>> {
650        Ok(self
651            .inner
652            .get_cf(self.ledger_height_cf, height.to_le_bytes())?
653            .map(|h| {
654                const LEN: usize = 32;
655                let mut hash = [0u8; LEN];
656                hash.copy_from_slice(&h.as_slice()[0..LEN]);
657                hash
658            }))
659    }
660
661    fn ledger_tx(&self, tx_id: &[u8]) -> Result<Option<SpentTransaction>> {
662        let tx = self
663            .inner
664            .get_cf(self.ledger_txs_cf, tx_id)?
665            .map(|blob| SpentTransaction::read(&mut &blob[..]))
666            .transpose()?;
667
668        Ok(tx)
669    }
670
671    /// Returns a list of transactions from the ledger
672    ///
673    /// This function expects a list of transaction IDs that are in the ledger.
674    ///
675    /// It will return an error if any of the transaction IDs are not found in
676    /// the ledger.
677    fn ledger_txs(
678        &self,
679        tx_ids: Vec<&[u8; 32]>,
680    ) -> Result<Vec<SpentTransaction>> {
681        let cf = self.ledger_txs_cf;
682
683        let ids = tx_ids.into_iter().map(|id| (cf, id)).collect::<Vec<_>>();
684
685        let multi_get_results = self.inner.multi_get_cf(ids);
686
687        let mut spent_transactions =
688            Vec::with_capacity(multi_get_results.len());
689        for result in multi_get_results.into_iter() {
690            let opt_blob = result.map_err(|e| {
691                std::io::Error::new(std::io::ErrorKind::Other, e)
692            })?;
693
694            let Some(blob) = opt_blob else {
695                return Err(anyhow::anyhow!(
696                    "At least one Transaction ID was not found"
697                ));
698            };
699
700            let stx = SpentTransaction::read(&mut &blob[..])?;
701
702            spent_transactions.push(stx);
703        }
704
705        Ok(spent_transactions)
706    }
707
708    /// Returns true if the transaction exists in the
709    /// ledger
710    ///
711    /// This is a convenience method that checks if a transaction exists in the
712    /// ledger without unmarshalling the transaction
713    fn ledger_tx_exists(&self, tx_id: &[u8]) -> Result<bool> {
714        Ok(self.inner.get_cf(self.ledger_txs_cf, tx_id)?.is_some())
715    }
716
717    fn block_by_height(&self, height: u64) -> Result<Option<Block>> {
718        let hash = self.block_hash_by_height(height)?;
719        let block = match hash {
720            Some(hash) => self.block(&hash)?,
721            None => None,
722        };
723        Ok(block)
724    }
725
726    fn block_label_by_height(
727        &self,
728        height: u64,
729    ) -> Result<Option<([u8; 32], Label)>> {
730        const HASH_LEN: usize = 32;
731        Ok(self
732            .inner
733            .get_cf(self.ledger_height_cf, height.to_le_bytes())?
734            .map(|h| {
735                let mut hash = [0u8; HASH_LEN];
736                hash.copy_from_slice(&h.as_slice()[0..HASH_LEN]);
737
738                let label_buff = h[HASH_LEN..].to_vec();
739                Label::read(&mut &label_buff[..]).map(|label| (hash, label))
740            })
741            .transpose()?)
742    }
743}
744
/// Implementation of the `ConsensusStorage` trait for
/// `DBTransaction<'db, DB>`.
746impl<DB: DBAccess> ConsensusStorage for DBTransaction<'_, DB> {
747    /// Stores a candidate block in the database.
748    ///
749    /// # Arguments
750    ///
751    /// * `b` - The block to store.
752    ///
753    /// # Returns
754    ///
755    /// Returns `Ok(())` if the block is successfully stored, or an error if the
756    /// operation fails.
757    fn store_candidate(&mut self, b: Block) -> Result<()> {
758        let mut serialized = vec![];
759        b.write(&mut serialized)?;
760
761        self.inner
762            .put_cf(self.candidates_cf, b.header().hash, serialized)?;
763
764        let key = serialize_key(b.header().height, b.header().hash)?;
765        self.inner
766            .put_cf(self.candidates_height_cf, key, b.header().hash)?;
767
768        Ok(())
769    }
770
771    /// Fetches a candidate block from the database.
772    ///
773    /// # Arguments
774    ///
775    /// * `hash` - The hash of the block to fetch.
776    ///
777    /// # Returns
778    ///
779    /// Returns `Ok(Some(block))` if the block is found, `Ok(None)` if the block
780    /// is not found, or an error if the operation fails.
781    fn candidate(&self, hash: &[u8]) -> Result<Option<Block>> {
782        if let Some(blob) = self.inner.get_cf(self.candidates_cf, hash)? {
783            let b = Block::read(&mut &blob[..])?;
784            return Ok(Some(b));
785        }
786
787        // Block not found
788        Ok(None)
789    }
790
791    fn candidate_by_iteration(
792        &self,
793        consensus_header: &ConsensusHeader,
794    ) -> Result<Option<Block>> {
795        let iter = self
796            .inner
797            .iterator_cf(self.candidates_cf, IteratorMode::Start);
798
799        for (_, blob) in iter.map(Result::unwrap) {
800            let b = Block::read(&mut &blob[..])?;
801
802            let header = b.header();
803            if header.prev_block_hash == consensus_header.prev_block_hash
804                && header.iteration == consensus_header.iteration
805            {
806                return Ok(Some(b));
807            }
808        }
809
810        Ok(None)
811    }
812
813    /// Deletes candidate-related items from the database based on a closure.
814    ///
815    /// # Arguments
816    ///
817    /// * `closure` - If the closure returns `true`, the block will be deleted.
818    ///
819    /// # Returns
820    ///
821    /// Returns `Ok(())` if the deletion is successful, or an error if the
822    /// operation fails.
823    fn delete_candidate<F>(&mut self, closure: F) -> Result<()>
824    where
825        F: FnOnce(u64) -> bool + std::marker::Copy,
826    {
827        let iter = self
828            .inner
829            .iterator_cf(self.candidates_height_cf, IteratorMode::Start);
830
831        for (key, hash) in iter.map(Result::unwrap) {
832            let (height, _) = deserialize_key(&mut &key.to_vec()[..])?;
833            if closure(height) {
834                self.inner.delete_cf(self.candidates_cf, hash)?;
835                self.inner.delete_cf(self.candidates_height_cf, key)?;
836            }
837        }
838
839        Ok(())
840    }
841
842    fn count_candidates(&self) -> usize {
843        let iter = self
844            .inner
845            .iterator_cf(self.candidates_height_cf, IteratorMode::Start);
846
847        iter.count()
848    }
849
850    /// Deletes all items from the `CF_CANDIDATES` column family.
851    ///
852    /// # Returns
853    ///
854    /// Returns `Ok(())` if the deletion is successful, or an error if the
855    /// operation fails.
856    fn clear_candidates(&mut self) -> Result<()> {
857        self.delete_candidate(|_| true)
858    }
859
860    /// Stores a ValidationResult in the database.
861    ///
862    /// # Arguments
863    ///
864    /// * `vr` - The ValidationResult to store.
865    ///
866    /// # Returns
867    ///
868    /// Returns `Ok(())` if the ValidationResult is successfully stored, or an
869    /// error if the operation fails.
870    fn store_validation_result(
871        &mut self,
872        consensus_header: &ConsensusHeader,
873        validation_result: &payload::ValidationResult,
874    ) -> Result<()> {
875        let mut serialized = vec![];
876        validation_result.write(&mut serialized)?;
877
878        let key = serialize_iter_key(consensus_header)?;
879        self.inner
880            .put_cf(self.validation_results_cf, key, serialized)?;
881
882        Ok(())
883    }
884
885    /// Fetches a ValidationResult from the database.
886    ///
887    /// # Arguments
888    ///
889    /// * `consensus_header` - The ConsensusHeader of the ValidationResult.
890    ///
891    /// # Returns
892    ///
893    /// Returns `Ok(Some(ValidationResult))` if the ValidationResult is found,
894    /// `Ok(None)` if the ValidationResult is not found, or an error if the
895    /// operation fails.
896    fn validation_result(
897        &self,
898        consensus_header: &ConsensusHeader,
899    ) -> Result<Option<payload::ValidationResult>> {
900        let key = serialize_iter_key(consensus_header)?;
901        if let Some(blob) =
902            self.inner.get_cf(self.validation_results_cf, key)?
903        {
904            let validation_result =
905                payload::ValidationResult::read(&mut &blob[..])?;
906            return Ok(Some(validation_result));
907        }
908
909        // ValidationResult not found
910        Ok(None)
911    }
912
913    /// Deletes ValidationResult items from the database based on a closure.
914    ///
915    /// # Arguments
916    ///
917    /// * `closure` - If the closure returns `true`, the ValidationResult will
918    ///   be deleted.
919    ///
920    /// # Returns
921    ///
922    /// Returns `Ok(())` if the deletion is successful, or an error if the
923    /// operation fails.
924    fn delete_validation_results<F>(&mut self, closure: F) -> Result<()>
925    where
926        F: FnOnce([u8; 32]) -> bool + std::marker::Copy,
927    {
928        let iter = self
929            .inner
930            .iterator_cf(self.validation_results_cf, IteratorMode::Start);
931
932        for (key, _) in iter.map(Result::unwrap) {
933            let (prev_block_hash, _) =
934                deserialize_iter_key(&mut &key.to_vec()[..])?;
935            if closure(prev_block_hash) {
936                self.inner.delete_cf(self.validation_results_cf, key)?;
937            }
938        }
939
940        Ok(())
941    }
942
943    fn count_validation_results(&self) -> usize {
944        let iter = self
945            .inner
946            .iterator_cf(self.validation_results_cf, IteratorMode::Start);
947
948        iter.count()
949    }
950
951    /// Deletes all items from the `CF_VALIDATION_RESULTS` column family.
952    ///
953    /// # Returns
954    ///
955    /// Returns `Ok(())` if the deletion is successful, or an error if the
956    /// operation fails.
957    fn clear_validation_results(&mut self) -> Result<()> {
958        self.delete_validation_results(|_| true)
959    }
960}
961
962impl<DB: DBAccess> Persist for DBTransaction<'_, DB> {
963    /// Deletes all items from both CF_LEDGER and CF_CANDIDATES column families
964    fn clear_database(&mut self) -> Result<()> {
965        // Create an iterator over the column family CF_LEDGER
966        let iter = self.inner.iterator_cf(self.ledger_cf, IteratorMode::Start);
967
968        // Iterate through the CF_LEDGER column family and delete all items
969        for (key, _) in iter.map(Result::unwrap) {
970            self.inner.delete_cf(self.ledger_cf, key)?;
971        }
972
973        self.clear_candidates()?;
974        self.clear_validation_results()?;
975        Ok(())
976    }
977
978    fn commit(self) -> Result<()> {
979        if let Err(e) = self.inner.commit() {
980            return Err(anyhow::Error::new(e).context("failed to commit"));
981        }
982
983        Ok(())
984    }
985
986    fn rollback(self) -> Result<()> {
987        if let Err(e) = self.inner.rollback() {
988            return Err(anyhow::Error::new(e).context("failed to rollback"));
989        }
990
991        Ok(())
992    }
993}
994
995impl<DB: DBAccess> Mempool for DBTransaction<'_, DB> {
996    fn store_mempool_tx(
997        &mut self,
998        tx: &Transaction,
999        timestamp: u64,
1000    ) -> Result<()> {
1001        // Map Hash to serialized transaction
1002        let mut tx_data = vec![];
1003        tx.write(&mut tx_data)?;
1004
1005        let hash = tx.id();
1006        self.put_cf(self.mempool_cf, hash, tx_data)?;
1007
1008        // Add Secondary indexes //
1009        // Spending Ids
1010        for n in tx.to_spend_ids() {
1011            let key = n.to_bytes();
1012            self.put_cf(self.spending_id_cf, key, hash)?;
1013        }
1014
1015        let timestamp = timestamp.to_be_bytes();
1016
1017        // Map Fee_Hash to Timestamp
1018        // Key pair is used to facilitate sort-by-fee
1019        // Also, the timestamp is used to remove expired transactions
1020        self.put_cf(
1021            self.fees_cf,
1022            serialize_key(tx.gas_price(), hash)?,
1023            timestamp,
1024        )?;
1025
1026        Ok(())
1027    }
1028
1029    fn mempool_tx(&self, hash: [u8; 32]) -> Result<Option<Transaction>> {
1030        let data = self.inner.get_cf(self.mempool_cf, hash)?;
1031
1032        match data {
1033            // None has a meaning key not found
1034            None => Ok(None),
1035            Some(blob) => Ok(Some(Transaction::read(&mut &blob.to_vec()[..])?)),
1036        }
1037    }
1038
1039    fn mempool_tx_exists(&self, h: [u8; 32]) -> Result<bool> {
1040        Ok(self.inner.get_cf(self.mempool_cf, h)?.is_some())
1041    }
1042
1043    fn delete_mempool_tx(
1044        &mut self,
1045        h: [u8; 32],
1046        cascade: bool,
1047    ) -> Result<Vec<[u8; 32]>> {
1048        let mut deleted = vec![];
1049        let tx = self.mempool_tx(h)?;
1050        if let Some(tx) = tx {
1051            let hash = tx.id();
1052
1053            self.inner.delete_cf(self.mempool_cf, hash)?;
1054
1055            // Delete Secondary indexes
1056            // Delete spendingids (nullifiers or nonce)
1057            for n in tx.to_spend_ids() {
1058                let key = n.to_bytes();
1059                self.inner.delete_cf(self.spending_id_cf, key)?;
1060            }
1061
1062            // Delete Fee_Hash
1063            self.inner.delete_cf(
1064                self.fees_cf,
1065                serialize_key(tx.gas_price(), hash)?,
1066            )?;
1067
1068            deleted.push(h);
1069
1070            if cascade {
1071                let mut dependants = vec![];
1072                // Get the next spending id (aka next nonce tx)
1073                // retrieve tx_id and delete it
1074                let mut next_spending_id = tx.next_spending_id();
1075                while let Some(spending_id) = next_spending_id {
1076                    next_spending_id = spending_id.next();
1077                    let next_txs =
1078                        self.mempool_txs_by_spendable_ids(&[spending_id]);
1079                    if next_txs.is_empty() {
1080                        break;
1081                    }
1082                    dependants.extend(next_txs);
1083                }
1084
1085                // delete all dependants
1086                for tx_id in dependants {
1087                    let cascade_deleted =
1088                        self.delete_mempool_tx(tx_id, false)?;
1089                    deleted.extend(cascade_deleted);
1090                }
1091            }
1092        }
1093
1094        Ok(deleted)
1095    }
1096
1097    fn mempool_txs_by_spendable_ids(
1098        &self,
1099        n: &[SpendingId],
1100    ) -> HashSet<[u8; 32]> {
1101        n.iter()
1102            .filter_map(|n| {
1103                match self.inner.get_cf(self.spending_id_cf, n.to_bytes()) {
1104                    Ok(Some(tx_id)) => tx_id.try_into().ok(),
1105                    _ => None,
1106                }
1107            })
1108            .collect()
1109    }
1110
1111    fn mempool_txs_sorted_by_fee(
1112        &self,
1113    ) -> Box<dyn Iterator<Item = Transaction> + '_> {
1114        let iter = MemPoolIterator::new(&self.inner, self.fees_cf, self);
1115
1116        Box::new(iter)
1117    }
1118
1119    fn mempool_txs_ids_sorted_by_fee(
1120        &self,
1121    ) -> Box<dyn Iterator<Item = (u64, [u8; 32])> + '_> {
1122        let iter = MemPoolFeeIterator::new(&self.inner, self.fees_cf, true);
1123
1124        Box::new(iter)
1125    }
1126
1127    fn mempool_txs_ids_sorted_by_low_fee(
1128        &self,
1129    ) -> Box<dyn Iterator<Item = (u64, [u8; 32])> + '_> {
1130        let iter = MemPoolFeeIterator::new(&self.inner, self.fees_cf, false);
1131
1132        Box::new(iter)
1133    }
1134
1135    /// Get all expired transactions hashes.
1136    fn mempool_expired_txs(&self, timestamp: u64) -> Result<Vec<[u8; 32]>> {
1137        let mut iter = self.inner.raw_iterator_cf(self.fees_cf);
1138        iter.seek_to_first();
1139        let mut txs_list = vec![];
1140
1141        while iter.valid() {
1142            if let Some(key) = iter.key() {
1143                let (_, tx_id) = deserialize_key(&mut &key.to_vec()[..])?;
1144
1145                let tx_timestamp = u64::from_be_bytes(
1146                    iter.value()
1147                        .ok_or_else(|| {
1148                            io::Error::new(
1149                                io::ErrorKind::InvalidData,
1150                                "no value",
1151                            )
1152                        })?
1153                        .try_into()
1154                        .map_err(|_| {
1155                            io::Error::new(
1156                                io::ErrorKind::InvalidData,
1157                                "invalid data",
1158                            )
1159                        })?,
1160                );
1161
1162                if tx_timestamp <= timestamp {
1163                    txs_list.push(tx_id);
1164                }
1165            }
1166
1167            iter.next();
1168        }
1169
1170        Ok(txs_list)
1171    }
1172
1173    fn mempool_txs_ids(&self) -> Result<Vec<[u8; 32]>> {
1174        let mut iter = self.inner.raw_iterator_cf(self.fees_cf);
1175        iter.seek_to_last();
1176
1177        let mut txs_list = vec![];
1178
1179        // Iterate all keys from the end in reverse lexicographic order
1180        while iter.valid() {
1181            if let Some(key) = iter.key() {
1182                let (_, tx_id) = deserialize_key(&mut &key.to_vec()[..])?;
1183
1184                txs_list.push(tx_id);
1185            }
1186
1187            iter.prev();
1188        }
1189
1190        Ok(txs_list)
1191    }
1192
1193    fn mempool_txs_count(&self) -> usize {
1194        self.inner
1195            .iterator_cf(self.mempool_cf, IteratorMode::Start)
1196            .count()
1197    }
1198}
1199
/// Iterator yielding full mempool [`Transaction`]s.
///
/// It walks the fee index through a [`MemPoolFeeIterator`] and resolves each
/// transaction id with `Mempool::mempool_tx`.
pub struct MemPoolIterator<'db, DB: DBAccess, M: Mempool> {
    /// Underlying iterator over `(gas_price, tx_id)` fee-index entries.
    iter: MemPoolFeeIterator<'db, DB>,
    /// Mempool used to load the transaction behind each id.
    mempool: &'db M,
}
1204
1205impl<'db, DB: DBAccess, M: Mempool> MemPoolIterator<'db, DB, M> {
1206    fn new(
1207        db: &'db rocksdb::Transaction<DB>,
1208        fees_cf: &ColumnFamily,
1209        mempool: &'db M,
1210    ) -> Self {
1211        let iter = MemPoolFeeIterator::new(db, fees_cf, true);
1212        MemPoolIterator { iter, mempool }
1213    }
1214}
1215
1216impl<DB: DBAccess, M: Mempool> Iterator for MemPoolIterator<'_, DB, M> {
1217    type Item = Transaction;
1218    fn next(&mut self) -> Option<Self::Item> {
1219        self.iter.next().and_then(|(_, tx_id)| {
1220            self.mempool.mempool_tx(tx_id).ok().flatten()
1221        })
1222    }
1223}
1224
/// Iterator over the keys of the mempool fee index, yielding
/// `(gas_price, tx_id)` pairs decoded from each key.
pub struct MemPoolFeeIterator<'db, DB: DBAccess> {
    /// Raw RocksDB iterator over the fees column family.
    iter: DBRawIteratorWithThreadMode<'db, rocksdb::Transaction<'db, DB>>,
    /// When `true` the iterator steps backwards (`prev`) after each item,
    /// otherwise forwards (`next`).
    fee_desc: bool,
}
1229
1230impl<'db, DB: DBAccess> MemPoolFeeIterator<'db, DB> {
1231    fn new(
1232        db: &'db rocksdb::Transaction<DB>,
1233        fees_cf: &ColumnFamily,
1234        fee_desc: bool,
1235    ) -> Self {
1236        let mut iter = db.raw_iterator_cf(fees_cf);
1237        if fee_desc {
1238            iter.seek_to_last();
1239        };
1240        MemPoolFeeIterator { iter, fee_desc }
1241    }
1242}
1243
1244impl<DB: DBAccess> Iterator for MemPoolFeeIterator<'_, DB> {
1245    type Item = (u64, [u8; 32]);
1246    fn next(&mut self) -> Option<Self::Item> {
1247        match self.iter.valid() {
1248            true => {
1249                if let Some(key) = self.iter.key() {
1250                    let (gas_price, hash) =
1251                        deserialize_key(&mut &key.to_vec()[..]).ok()?;
1252                    if self.fee_desc {
1253                        self.iter.prev();
1254                    } else {
1255                        self.iter.next();
1256                    }
1257                    Some((gas_price, hash))
1258                } else {
1259                    None
1260                }
1261            }
1262            false => None,
1263        }
1264    }
1265}
1266
1267impl<DB: DBAccess> std::fmt::Debug for DBTransaction<'_, DB> {
1268    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1269        //  Print ledger blocks
1270        let iter = self.inner.iterator_cf(self.ledger_cf, IteratorMode::Start);
1271
1272        iter.map(Result::unwrap).try_for_each(|(hash, _)| {
1273            if let Ok(Some(blob)) = self.inner.get_cf(self.ledger_cf, &hash[..])
1274            {
1275                let b = Block::read(&mut &blob[..]).unwrap_or_default();
1276                writeln!(f, "ledger_block [{}]: {:#?}", b.header().height, b)
1277            } else {
1278                Ok(())
1279            }
1280        })?;
1281
1282        // Print candidate blocks
1283        let iter = self
1284            .inner
1285            .iterator_cf(self.candidates_cf, IteratorMode::Start);
1286
1287        let results: std::fmt::Result =
1288            iter.map(Result::unwrap).try_for_each(|(hash, _)| {
1289                if let Ok(Some(blob)) =
1290                    self.inner.get_cf(self.candidates_cf, &hash[..])
1291                {
1292                    let b = Block::read(&mut &blob[..]).unwrap_or_default();
1293                    writeln!(
1294                        f,
1295                        "candidate_block [{}]: {:#?}",
1296                        b.header().height,
1297                        b
1298                    )
1299                } else {
1300                    Ok(())
1301                }
1302            });
1303
1304        results
1305    }
1306}
1307
1308impl<DB: DBAccess> Metadata for DBTransaction<'_, DB> {
1309    fn op_write<T: AsRef<[u8]>>(&mut self, key: &[u8], value: T) -> Result<()> {
1310        self.put_cf(self.metadata_cf, key, value)?;
1311        Ok(())
1312    }
1313
1314    fn op_read(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
1315        self.inner.get_cf(self.metadata_cf, key).map_err(Into::into)
1316    }
1317}
1318
1319impl<DB: DBAccess> DBTransaction<'_, DB> {
1320    /// A thin wrapper around inner.put_cf that calculates a db transaction
1321    /// disk footprint
1322    fn put_cf<K: AsRef<[u8]>, V: AsRef<[u8]>>(
1323        &self,
1324        cf: &impl AsColumnFamilyRef,
1325        key: K,
1326        value: V,
1327    ) -> Result<()> {
1328        let kv_size = key.as_ref().len() + value.as_ref().len();
1329        self.inner.put_cf(cf, key, value)?;
1330        *self.cumulative_inner_size.borrow_mut() += kv_size;
1331        Ok(())
1332    }
1333
1334    pub fn get_size(&self) -> usize {
1335        *self.cumulative_inner_size.borrow()
1336    }
1337}
1338
/// Builds a 40-byte key: the big-endian bytes of `value` followed by `hash`.
///
/// The big-endian prefix makes the byte-wise order of the keys follow the
/// numeric order of `value`.
fn serialize_key(value: u64, hash: [u8; 32]) -> std::io::Result<Vec<u8>> {
    let prefix = value.to_be_bytes();

    let mut key = Vec::with_capacity(prefix.len() + hash.len());
    key.extend_from_slice(&prefix);
    key.extend_from_slice(&hash);

    Ok(key)
}
1345
1346fn deserialize_key<R: Read>(r: &mut R) -> Result<(u64, [u8; 32])> {
1347    let mut buf = [0u8; 8];
1348    r.read_exact(&mut buf)?;
1349    let value = u64::from_be_bytes(buf);
1350    let mut hash = [0u8; 32];
1351    r.read_exact(&mut hash[..])?;
1352
1353    Ok((value, hash))
1354}
1355
1356fn serialize_iter_key(ch: &ConsensusHeader) -> std::io::Result<Vec<u8>> {
1357    let mut w = vec![];
1358    std::io::Write::write_all(&mut w, &ch.prev_block_hash)?;
1359    std::io::Write::write_all(&mut w, &[ch.iteration])?;
1360    Ok(w)
1361}
1362
1363fn deserialize_iter_key<R: Read>(r: &mut R) -> Result<([u8; 32], u8)> {
1364    let mut prev_block_hash = [0u8; 32];
1365    r.read_exact(&mut prev_block_hash)?;
1366
1367    let mut iter_byte = [0u8; 1];
1368    r.read_exact(&mut iter_byte)?;
1369    let iteration = u8::from_be_bytes(iter_byte);
1370
1371    Ok((prev_block_hash, iteration))
1372}
1373
1374impl node_data::Serializable for LightBlock {
1375    fn write<W: Write>(&self, w: &mut W) -> io::Result<()> {
1376        // Write block header
1377        self.header.write(w)?;
1378
1379        // Write transactions count
1380        let len = self.transactions_ids.len() as u32;
1381        w.write_all(&len.to_le_bytes())?;
1382
1383        // Write transactions hashes
1384        for tx_id in &self.transactions_ids {
1385            w.write_all(tx_id)?;
1386        }
1387
1388        // Write faults count
1389        let len = self.faults_ids.len() as u32;
1390        w.write_all(&len.to_le_bytes())?;
1391
1392        // Write faults id
1393        for f_id in &self.faults_ids {
1394            w.write_all(f_id)?;
1395        }
1396
1397        Ok(())
1398    }
1399
1400    fn read<R: Read>(r: &mut R) -> io::Result<Self>
1401    where
1402        Self: Sized,
1403    {
1404        // Read block header
1405        let header = Header::read(r)?;
1406
1407        // Read transactions count
1408        let len = Self::read_u32_le(r)?;
1409
1410        // Read transactions hashes
1411        let mut transactions_ids = vec![];
1412        for _ in 0..len {
1413            let mut tx_id = [0u8; 32];
1414            r.read_exact(&mut tx_id[..])?;
1415
1416            transactions_ids.push(tx_id);
1417        }
1418
1419        // Read faults count
1420        let len = Self::read_u32_le(r)?;
1421
1422        // Read faults ids
1423        let mut faults_ids = vec![];
1424        for _ in 0..len {
1425            let mut f_id = [0u8; 32];
1426            r.read_exact(&mut f_id[..])?;
1427
1428            faults_ids.push(f_id);
1429        }
1430
1431        Ok(Self {
1432            header,
1433            transactions_ids,
1434            faults_ids,
1435        })
1436    }
1437}
1438
1439#[cfg(test)]
1440mod tests {
1441    use fake::{Fake, Faker};
1442    use node_data::ledger;
1443
1444    use super::*;
1445
1446    #[test]
1447    fn test_store_block() {
1448        TestWrapper::new("test_store_block").run(|path| {
1449            let db = Backend::create_or_open(path, DatabaseOptions::default());
1450
1451            let b: Block = Faker.fake();
1452            assert!(!b.txs().is_empty());
1453
1454            let hash = b.header().hash;
1455
1456            assert!(db
1457                .update(|txn| {
1458                    txn.store_block(
1459                        b.header(),
1460                        &to_spent_txs(b.txs()),
1461                        b.faults(),
1462                        Label::Final(3),
1463                    )?;
1464                    Ok(())
1465                })
1466                .is_ok());
1467
1468            db.view(|txn| {
1469                // Assert block header is fully fetched from ledger
1470                let db_blk = txn
1471                    .block(&hash)
1472                    .expect("Block to be fetched")
1473                    .expect("Block to exist");
1474                assert_eq!(db_blk.header().hash, b.header().hash);
1475
1476                // Assert all transactions are fully fetched from ledger as
1477                // well.
1478                for pos in 0..b.txs().len() {
1479                    assert_eq!(db_blk.txs()[pos].id(), b.txs()[pos].id());
1480                }
1481
1482                // Assert all faults are fully fetched from ledger as
1483                // well.
1484                for pos in 0..b.faults().len() {
1485                    assert_eq!(db_blk.faults()[pos].id(), b.faults()[pos].id());
1486                }
1487            });
1488
1489            assert!(db
1490                .update(|txn| {
1491                    txn.clear_database()?;
1492                    Ok(())
1493                })
1494                .is_ok());
1495
1496            db.view(|txn| {
1497                assert!(txn
1498                    .block(&hash)
1499                    .expect("block to be fetched")
1500                    .is_none());
1501            });
1502        });
1503    }
1504
1505    #[test]
1506    fn test_read_only() {
1507        TestWrapper::new("test_read_only").run(|path| {
1508            let db = Backend::create_or_open(path, DatabaseOptions::default());
1509            let b: Block = Faker.fake();
1510            db.update_dry_run(true, |txn| {
1511                txn.store_block(
1512                    b.header(),
1513                    &to_spent_txs(b.txs()),
1514                    b.faults(),
1515                    Label::Final(3),
1516                )
1517            })
1518            .expect("block to be stored");
1519            db.view(|txn| {
1520                assert!(txn
1521                    .block(&b.header().hash)
1522                    .expect("block to be fetched")
1523                    .is_none());
1524            });
1525        });
1526    }
1527
1528    #[test]
1529    fn test_transaction_isolation() {
1530        TestWrapper::new("test_transaction_isolation").run(|path| {
1531            let db = Backend::create_or_open(path, DatabaseOptions::default());
1532            let mut b: Block = Faker.fake();
1533            let hash = b.header().hash;
1534
1535            db.view(|txn| {
1536                // Simulate a concurrent update is committed during read-only
1537                // transaction
1538                assert!(db
1539                    .update(|inner| {
1540                        inner
1541                            .store_block(
1542                                b.header(),
1543                                &to_spent_txs(b.txs()),
1544                                b.faults(),
1545                                Label::Final(3),
1546                            )
1547                            .unwrap();
1548
1549                        // We support Read-Your-Own-Writes
1550                        assert!(inner.block(&hash)?.is_some());
1551                        // Data is isolated until the transaction is not
1552                        // committed
1553                        assert!(txn.block(&hash)?.is_none());
1554                        Ok(())
1555                    })
1556                    .is_ok());
1557
1558                // Asserts that the read-only/view transaction get the updated
1559                // data after the tx is committed
1560                assert!(txn
1561                    .block(&hash)
1562                    .expect("block to be fetched")
1563                    .is_some());
1564            });
1565
1566            // Asserts that update was done
1567            db.view(|txn| {
1568                assert_blocks_eq(
1569                    &mut txn
1570                        .block(&hash)
1571                        .expect("block to be fetched")
1572                        .unwrap(),
1573                    &mut b,
1574                );
1575            });
1576        });
1577    }
1578
1579    fn assert_blocks_eq(a: &Block, b: &Block) {
1580        assert!(a.header().hash != [0u8; 32]);
1581        assert!(a.header().hash.eq(&b.header().hash));
1582    }
1583
1584    #[test]
1585    fn test_add_mempool_tx() {
1586        TestWrapper::new("test_add_tx").run(|path| {
1587            let db = Backend::create_or_open(path, DatabaseOptions::default());
1588            let t: Transaction = Faker.fake();
1589
1590            assert!(db.update(|txn| { txn.store_mempool_tx(&t, 0) }).is_ok());
1591
1592            db.view(|vq| {
1593                assert!(vq.mempool_tx_exists(t.id()).unwrap());
1594
1595                let fetched_tx = vq
1596                    .mempool_tx(t.id())
1597                    .expect("valid contract call")
1598                    .unwrap();
1599
1600                assert_eq!(
1601                    fetched_tx.id(),
1602                    t.id(),
1603                    "fetched transaction should be the same"
1604                );
1605            });
1606
1607            // Delete a contract call
1608            db.update(|txn| {
1609                let deleted =
1610                    txn.delete_mempool_tx(t.id(), false).expect("valid tx");
1611                assert!(deleted.len() == 1);
1612                Ok(())
1613            })
1614            .unwrap();
1615        });
1616    }
1617
1618    #[test]
1619    fn test_mempool_txs_sorted_by_fee() {
1620        TestWrapper::new("test_mempool_txs_sorted_by_fee").run(|path| {
1621            let db = Backend::create_or_open(path, DatabaseOptions::default());
1622            // Populate mempool with N contract calls
1623            let _rng = rand::thread_rng();
1624            db.update(|txn| {
1625                for _i in 0..10u32 {
1626                    let t: Transaction = Faker.fake();
1627                    txn.store_mempool_tx(&t, 0)?;
1628                }
1629                Ok(())
1630            })
1631            .unwrap();
1632
1633            db.view(|txn| {
1634                let txs = txn.mempool_txs_sorted_by_fee();
1635
1636                let mut last_fee = u64::MAX;
1637                for t in txs {
1638                    let fee = t.gas_price();
1639                    assert!(
1640                        fee <= last_fee,
1641                        "tx fees are not in decreasing order"
1642                    );
1643                    last_fee = fee
1644                }
1645                assert_ne!(last_fee, u64::MAX, "No tx has been processed")
1646            });
1647        });
1648    }
1649
1650    #[test]
1651    fn test_txs_count() {
1652        TestWrapper::new("test_txs_count").run(|path| {
1653            let db = Backend::create_or_open(path, DatabaseOptions::default());
1654
1655            const N: usize = 100;
1656            const D: usize = 50;
1657
1658            let txs: Vec<_> = (0..N)
1659                .map(|i| ledger::faker::gen_dummy_tx(i as u64))
1660                .collect();
1661
1662            db.update(|db| {
1663                assert_eq!(db.mempool_txs_count(), 0);
1664                txs.iter().for_each(|t| {
1665                    db.store_mempool_tx(&t, 0).expect("tx should be added")
1666                });
1667                Ok(())
1668            })
1669            .unwrap();
1670
1671            db.update(|db| {
1672                // Ensure txs count is equal to the number of added tx
1673                assert_eq!(db.mempool_txs_count(), N);
1674
1675                txs.iter().take(D).for_each(|tx| {
1676                    let deleted = db
1677                        .delete_mempool_tx(tx.id(), false)
1678                        .expect("transaction should be deleted");
1679                    assert!(deleted.len() == 1);
1680                });
1681
1682                Ok(())
1683            })
1684            .unwrap();
1685
1686            // Ensure txs count is updated after the deletion
1687            db.update(|db| {
1688                assert_eq!(db.mempool_txs_count(), N - D);
1689                Ok(())
1690            })
1691            .unwrap();
1692        });
1693    }
1694
1695    #[test]
1696    fn test_max_gas_limit() {
1697        TestWrapper::new("test_block_size_limit").run(|path| {
1698            let db = Backend::create_or_open(path, DatabaseOptions::default());
1699
1700            db.update(|txn| {
1701                for i in 0..10u32 {
1702                    let t = ledger::faker::gen_dummy_tx(i as u64);
1703                    txn.store_mempool_tx(&t, 0)?;
1704                }
1705                Ok(())
1706            })
1707            .unwrap();
1708
1709            let total_gas_price: u64 = 9 + 8 + 7 + 6 + 5 + 4 + 3 + 2 + 1;
1710            db.view(|txn| {
1711                let txs = txn
1712                    .mempool_txs_sorted_by_fee()
1713                    .map(|t| t.gas_price())
1714                    .sum::<u64>();
1715
1716                assert_eq!(txs, total_gas_price);
1717            });
1718        });
1719    }
1720
1721    #[test]
1722    fn test_get_expired_txs() {
1723        TestWrapper::new("test_get_expired_txs").run(|path| {
1724            let db = Backend::create_or_open(path, DatabaseOptions::default());
1725
1726            let mut expiry_list = HashSet::new();
1727            let _ = db.update(|txn| {
1728                (1..101).for_each(|i| {
1729                    let t = ledger::faker::gen_dummy_tx(i as u64);
1730                    txn.store_mempool_tx(&t, i).expect("tx should be added");
1731                    expiry_list.insert(t.id());
1732                });
1733
1734                (1000..1100).for_each(|i| {
1735                    let t = ledger::faker::gen_dummy_tx(i as u64);
1736                    txn.store_mempool_tx(&t, i).expect("tx should be added");
1737                });
1738
1739                Ok(())
1740            });
1741
1742            db.view(|vq| {
1743                let expired: HashSet<_> = vq
1744                    .mempool_expired_txs(100)
1745                    .unwrap()
1746                    .into_iter()
1747                    .map(|id| id)
1748                    .collect();
1749
1750                assert_eq!(expiry_list, expired);
1751            });
1752        });
1753    }
1754
1755    fn to_spent_txs(txs: &Vec<Transaction>) -> Vec<SpentTransaction> {
1756        txs.iter()
1757            .map(|t| SpentTransaction {
1758                inner: t.clone(),
1759                block_height: 0,
1760                gas_spent: 0,
1761                err: None,
1762            })
1763            .collect()
1764    }
1765
1766    #[test]
1767    fn test_get_ledger_tx_by_hash() {
1768        TestWrapper::new("test_get_ledger_tx_by_hash").run(|path| {
1769            let db = Backend::create_or_open(path, DatabaseOptions::default());
1770            let b: Block = Faker.fake();
1771            assert!(!b.txs().is_empty());
1772
1773            // Store a block
1774            assert!(db
1775                .update(|txn| {
1776                    txn.store_block(
1777                        b.header(),
1778                        &to_spent_txs(b.txs()),
1779                        b.faults(),
1780                        Label::Final(3),
1781                    )?;
1782                    Ok(())
1783                })
1784                .is_ok());
1785
1786            // Assert all transactions of the accepted (stored) block are
1787            // accessible by hash.
1788            db.view(|v| {
1789                for t in b.txs().iter() {
1790                    assert!(v
1791                        .ledger_tx(&t.id())
1792                        .expect("should not return error")
1793                        .expect("should find a transaction")
1794                        .inner
1795                        .eq(t));
1796                }
1797            });
1798        });
1799    }
1800
1801    #[test]
1802    fn test_fetch_block_hash_by_height() {
1803        TestWrapper::new("test_fetch_block_hash_by_height").run(|path| {
1804            let db = Backend::create_or_open(path, DatabaseOptions::default());
1805            let b: Block = Faker.fake();
1806
1807            // Store a block
1808            assert!(db
1809                .update(|txn| {
1810                    txn.store_block(
1811                        b.header(),
1812                        &to_spent_txs(b.txs()),
1813                        b.faults(),
1814                        Label::Attested(3),
1815                    )?;
1816                    Ok(())
1817                })
1818                .is_ok());
1819
1820            // Assert block hash is accessible by height.
1821            db.view(|v| {
1822                assert!(v
1823                    .block_hash_by_height(b.header().height)
1824                    .expect("should not return error")
1825                    .expect("should find a block")
1826                    .eq(&b.header().hash));
1827            });
1828        });
1829    }
1830
1831    #[test]
1832    fn test_fetch_block_label_by_height() {
1833        TestWrapper::new("test_fetch_block_hash_by_height").run(|path| {
1834            let db = Backend::create_or_open(path, DatabaseOptions::default());
1835            let b: Block = Faker.fake();
1836
1837            // Store a block
1838            assert!(db
1839                .update(|txn| {
1840                    txn.store_block(
1841                        b.header(),
1842                        &to_spent_txs(b.txs()),
1843                        b.faults(),
1844                        Label::Attested(3),
1845                    )?;
1846                    Ok(())
1847                })
1848                .is_ok());
1849
1850            // Assert block hash is accessible by height.
1851            db.view(|v| {
1852                assert!(v
1853                    .block_label_by_height(b.header().height)
1854                    .expect("should not return error")
1855                    .expect("should find a block")
1856                    .1
1857                    .eq(&Label::Attested(3)));
1858            });
1859        });
1860    }
1861
1862    #[test]
1863    /// Ensures delete_block fn removes all keys of a single block
1864    fn test_delete_block() {
1865        let t = TestWrapper::new("test_fetch_block_hash_by_height");
1866        t.run(|path| {
1867            let db = Backend::create_or_open(path, DatabaseOptions::default());
1868            let b: ledger::Block = Faker.fake();
1869
1870            assert!(db
1871                .update(|ut| {
1872                    ut.store_block(
1873                        b.header(),
1874                        &to_spent_txs(b.txs()),
1875                        b.faults(),
1876                        Label::Final(3),
1877                    )?;
1878                    Ok(())
1879                })
1880                .is_ok());
1881
1882            assert!(db
1883                .update(|ut| {
1884                    ut.delete_block(&b)?;
1885                    Ok(())
1886                })
1887                .is_ok());
1888        });
1889
1890        let path = t.get_path();
1891        let opts = Options::default();
1892
1893        let vec = rocksdb::DB::list_cf(&opts, &path).unwrap();
1894        assert!(!vec.is_empty());
1895
1896        // Ensure no block fields leak after its deletion
1897        let db = rocksdb::DB::open_cf(&opts, &path, vec.clone()).unwrap();
1898        vec.into_iter()
1899            .map(|cf_name| {
1900                if cf_name == CF_METADATA {
1901                    return;
1902                }
1903
1904                let cf = db.cf_handle(&cf_name).unwrap();
1905                assert_eq!(
1906                    db.iterator_cf(cf, IteratorMode::Start)
1907                        .map(Result::unwrap)
1908                        .count(),
1909                    0
1910                );
1911            })
1912            .for_each(drop);
1913    }
1914
1915    struct TestWrapper(tempfile::TempDir);
1916
1917    impl TestWrapper {
1918        fn new(path: &'static str) -> Self {
1919            Self(
1920                tempfile::TempDir::with_prefix(path)
1921                    .expect("Temp directory to be created"),
1922            )
1923        }
1924
1925        pub fn run<F>(&self, test_func: F)
1926        where
1927            F: FnOnce(&Path),
1928        {
1929            test_func(self.0.path());
1930        }
1931
1932        pub fn get_path(&self) -> std::path::PathBuf {
1933            self.0.path().to_owned().join(DB_FOLDER_NAME)
1934        }
1935    }
1936}