Skip to main content

dusk_node/database/rocksdb/
mod.rs

1// This Source Code Form is subject to the terms of the Mozilla Public
2// License, v. 2.0. If a copy of the MPL was not distributed with this
3// file, You can obtain one at http://mozilla.org/MPL/2.0/.
4//
5// Copyright (c) DUSK NETWORK. All rights reserved.
6
7use std::cell::RefCell;
8use std::collections::HashSet;
9use std::io::{Read, Write};
10use std::path::Path;
11use std::sync::Arc;
12use std::{io, vec};
13
14use anyhow::Result;
15use dusk_core::transfer::data::BlobSidecar;
16use node_data::Serializable;
17use node_data::ledger::{
18    Block, Fault, Header, Label, LedgerTransaction, SpendingId,
19    SpentTransaction,
20};
21use node_data::message::{ConsensusHeader, payload};
22use rocksdb::{
23    AsColumnFamilyRef, BlockBasedOptions, ColumnFamily, ColumnFamilyDescriptor,
24    DBAccess, DBRawIteratorWithThreadMode, IteratorMode, LogLevel,
25    OptimisticTransactionDB, OptimisticTransactionOptions, Options,
26    WriteOptions,
27};
28use tracing::info;
29
30use super::{
31    ConsensusStorage, DB, DatabaseOptions, Ledger, LightBlock, Metadata,
32    Persist,
33};
34use crate::database::Mempool;
35
36const CF_LEDGER_HEADER: &str = "cf_ledger_header";
37const CF_LEDGER_TXS: &str = "cf_ledger_txs";
38const CF_LEDGER_BLOBS: &str = "cf_ledger_blobs";
39const CF_LEDGER_BLOBS_HEIGHT: &str = "cf_ledger_blobs_height";
40const CF_LEDGER_FAULTS: &str = "cf_ledger_faults";
41const CF_LEDGER_HEIGHT: &str = "cf_ledger_height";
42const CF_CANDIDATES: &str = "cf_candidates";
43const CF_CANDIDATES_HEIGHT: &str = "cf_candidates_height";
44const CF_VALIDATION_RESULTS: &str = "cf_validation_results";
45const CF_MEMPOOL: &str = "cf_mempool";
46const CF_MEMPOOL_SPENDING_ID: &str = "cf_mempool_spending_id";
47const CF_MEMPOOL_FEES: &str = "cf_mempool_fees";
48const CF_METADATA: &str = "cf_metadata";
49
50const DB_FOLDER_NAME: &str = "chain.db";
51
52// List of supported metadata keys
53pub const MD_HASH_KEY: &[u8] = b"hash_key";
54pub const MD_STATE_ROOT_KEY: &[u8] = b"state_hash_key";
55pub const MD_AVG_VALIDATION: &[u8] = b"avg_validation_time";
56pub const MD_AVG_RATIFICATION: &[u8] = b"avg_ratification_time";
57pub const MD_AVG_PROPOSAL: &[u8] = b"avg_proposal_time";
58pub const MD_LAST_ITER: &[u8] = b"consensus_last_iter";
59
/// RocksDB-backed database handle.
///
/// Cloning only clones the inner `Arc`, so every clone refers to the same
/// underlying `OptimisticTransactionDB`.
#[derive(Clone)]
pub struct Backend {
    /// Shared handle to the opened optimistic-transaction database.
    rocksdb: Arc<OptimisticTransactionDB>,
}
64
65impl Backend {
66    fn begin_tx(&self) -> DBTransaction<'_, OptimisticTransactionDB> {
67        // Create a new RocksDB transaction
68        let write_options = WriteOptions::default();
69        let tx_options = OptimisticTransactionOptions::default();
70
71        let inner = self.rocksdb.transaction_opt(&write_options, &tx_options);
72
73        // Borrow column families
74        let ledger_cf = self
75            .rocksdb
76            .cf_handle(CF_LEDGER_HEADER)
77            .expect("ledger_header column family must exist");
78
79        let ledger_txs_cf = self
80            .rocksdb
81            .cf_handle(CF_LEDGER_TXS)
82            .expect("CF_LEDGER_TXS column family must exist");
83
84        let ledger_faults_cf = self
85            .rocksdb
86            .cf_handle(CF_LEDGER_FAULTS)
87            .expect("CF_LEDGER_FAULTS column family must exist");
88
89        let candidates_cf = self
90            .rocksdb
91            .cf_handle(CF_CANDIDATES)
92            .expect("candidates column family must exist");
93
94        let candidates_height_cf = self
95            .rocksdb
96            .cf_handle(CF_CANDIDATES_HEIGHT)
97            .expect("candidates_height column family must exist");
98
99        let validation_results_cf = self
100            .rocksdb
101            .cf_handle(CF_VALIDATION_RESULTS)
102            .expect("validation result column family must exist");
103
104        let mempool_cf = self
105            .rocksdb
106            .cf_handle(CF_MEMPOOL)
107            .expect("mempool column family must exist");
108
109        let spending_id_cf = self
110            .rocksdb
111            .cf_handle(CF_MEMPOOL_SPENDING_ID)
112            .expect("CF_MEMPOOL_SPENDING_ID column family must exist");
113
114        let fees_cf = self
115            .rocksdb
116            .cf_handle(CF_MEMPOOL_FEES)
117            .expect("CF_MEMPOOL_FEES column family must exist");
118
119        let ledger_height_cf = self
120            .rocksdb
121            .cf_handle(CF_LEDGER_HEIGHT)
122            .expect("CF_LEDGER_HEIGHT column family must exist");
123
124        let metadata_cf = self
125            .rocksdb
126            .cf_handle(CF_METADATA)
127            .expect("CF_METADATA column family must exist");
128
129        let ledger_blobs_cf = self
130            .rocksdb
131            .cf_handle(CF_LEDGER_BLOBS)
132            .expect("CF_LEDGER_BLOBS column family must exist");
133
134        let ledger_blobs_height_cf = self
135            .rocksdb
136            .cf_handle(CF_LEDGER_BLOBS_HEIGHT)
137            .expect("CF_LEDGER_BLOBS_HEIGHT column family must exist");
138
139        DBTransaction::<'_, OptimisticTransactionDB> {
140            inner,
141            candidates_cf,
142            candidates_height_cf,
143            validation_results_cf,
144            ledger_cf,
145            ledger_txs_cf,
146            ledger_faults_cf,
147            mempool_cf,
148            spending_id_cf,
149            fees_cf,
150            ledger_height_cf,
151            ledger_blobs_cf,
152            ledger_blobs_height_cf,
153            metadata_cf,
154            cumulative_inner_size: RefCell::new(0),
155        }
156    }
157}
158
159impl DB for Backend {
160    type P<'a> = DBTransaction<'a, OptimisticTransactionDB>;
161
162    fn create_or_open<T>(path: T, db_opts: DatabaseOptions) -> Self
163    where
164        T: AsRef<Path>,
165    {
166        let path = path.as_ref().join(DB_FOLDER_NAME);
167        info!("Opening database in {path:?}, {:?} ", db_opts);
168
169        // A set of options for initializing any blocks-related CF (including
170        // METADATA CF)
171        let mut blocks_cf_opts = Options::default();
172        blocks_cf_opts.create_if_missing(db_opts.create_if_missing);
173        blocks_cf_opts.create_missing_column_families(true);
174        blocks_cf_opts.set_level_compaction_dynamic_level_bytes(true);
175        blocks_cf_opts
176            .set_write_buffer_size(db_opts.blocks_cf_max_write_buffer_size);
177
178        if db_opts.enable_debug {
179            blocks_cf_opts.set_log_level(LogLevel::Info);
180            blocks_cf_opts.set_dump_malloc_stats(true);
181            blocks_cf_opts.enable_statistics();
182        }
183
184        if db_opts.blocks_cf_disable_block_cache {
185            let mut block_opts = BlockBasedOptions::default();
186            block_opts.disable_cache();
187            blocks_cf_opts.set_block_based_table_factory(&block_opts);
188        }
189
190        // Configure CF_MEMPOOL column family, so it benefits from low
191        // write-latency of L0
192        let mut mp_opts = blocks_cf_opts.clone();
193        // Disable WAL by default
194        mp_opts.set_manual_wal_flush(true);
195        mp_opts.create_if_missing(true);
196        mp_opts.create_missing_column_families(true);
197        mp_opts.set_write_buffer_size(db_opts.mempool_cf_max_write_buffer_size);
198
199        if db_opts.enable_debug {
200            mp_opts.set_log_level(LogLevel::Info);
201            mp_opts.set_dump_malloc_stats(true);
202            mp_opts.enable_statistics();
203        }
204
205        let cfs = vec![
206            ColumnFamilyDescriptor::new(
207                CF_LEDGER_HEADER,
208                blocks_cf_opts.clone(),
209            ),
210            ColumnFamilyDescriptor::new(CF_LEDGER_TXS, blocks_cf_opts.clone()),
211            ColumnFamilyDescriptor::new(
212                CF_LEDGER_FAULTS,
213                blocks_cf_opts.clone(),
214            ),
215            ColumnFamilyDescriptor::new(
216                CF_LEDGER_HEIGHT,
217                blocks_cf_opts.clone(),
218            ),
219            ColumnFamilyDescriptor::new(
220                CF_LEDGER_BLOBS,
221                blocks_cf_opts.clone(),
222            ),
223            ColumnFamilyDescriptor::new(
224                CF_LEDGER_BLOBS_HEIGHT,
225                blocks_cf_opts.clone(),
226            ),
227            ColumnFamilyDescriptor::new(CF_CANDIDATES, blocks_cf_opts.clone()),
228            ColumnFamilyDescriptor::new(
229                CF_CANDIDATES_HEIGHT,
230                blocks_cf_opts.clone(),
231            ),
232            ColumnFamilyDescriptor::new(
233                CF_VALIDATION_RESULTS,
234                blocks_cf_opts.clone(),
235            ),
236            ColumnFamilyDescriptor::new(CF_METADATA, blocks_cf_opts.clone()),
237            ColumnFamilyDescriptor::new(CF_MEMPOOL, mp_opts.clone()),
238            ColumnFamilyDescriptor::new(
239                CF_MEMPOOL_SPENDING_ID,
240                mp_opts.clone(),
241            ),
242            ColumnFamilyDescriptor::new(CF_MEMPOOL_FEES, mp_opts.clone()),
243        ];
244
245        Self {
246            rocksdb: Arc::new(
247                OptimisticTransactionDB::open_cf_descriptors(
248                    &blocks_cf_opts,
249                    &path,
250                    cfs,
251                )
252                .unwrap_or_else(|_| {
253                    panic!("should be a valid database in {path:?}")
254                }),
255            ),
256        }
257    }
258
259    fn view<F, T>(&self, f: F) -> T
260    where
261        F: for<'a> FnOnce(&Self::P<'a>) -> T,
262    {
263        // Create a new read-only transaction
264        let tx = self.begin_tx();
265
266        // Execute all read-only transactions in isolation
267        let ret = f(&tx);
268        tx.rollback().expect("rollback to succeed for readonly");
269        ret
270    }
271
272    fn update<F, T>(&self, execute: F) -> Result<T>
273    where
274        F: for<'a> FnOnce(&mut Self::P<'a>) -> Result<T>,
275    {
276        self.update_dry_run(false, execute)
277    }
278
279    fn update_dry_run<F, T>(&self, dry_run: bool, execute: F) -> Result<T>
280    where
281        F: for<'a> FnOnce(&mut Self::P<'a>) -> Result<T>,
282    {
283        // Create read-write transaction
284        let mut tx = self.begin_tx();
285
286        // If f returns err, no commit will be applied into backend
287        // storage
288        let ret = execute(&mut tx)?;
289
290        if dry_run {
291            tx.rollback()?;
292        } else {
293            // Apply changes in atomic way
294            tx.commit()?;
295        }
296
297        Ok(ret)
298    }
299
300    fn close(&mut self) {}
301}
302
/// A RocksDB transaction bundled with borrowed handles to the column
/// families it operates on.
///
/// All handles share the `'db` lifetime with the transaction itself.
pub struct DBTransaction<'db, DB: DBAccess> {
    /// The underlying RocksDB transaction.
    inner: rocksdb::Transaction<'db, DB>,
    /// cumulative size of transaction footprint
    cumulative_inner_size: RefCell<usize>,

    // TODO: pack all column families into a single array
    // Candidates column family
    candidates_cf: &'db ColumnFamily,
    candidates_height_cf: &'db ColumnFamily,
    // ValidationResults column family
    validation_results_cf: &'db ColumnFamily,

    // Ledger column families
    ledger_cf: &'db ColumnFamily,
    ledger_faults_cf: &'db ColumnFamily,
    ledger_txs_cf: &'db ColumnFamily,
    ledger_height_cf: &'db ColumnFamily,
    ledger_blobs_cf: &'db ColumnFamily,
    ledger_blobs_height_cf: &'db ColumnFamily,

    // Mempool column families
    mempool_cf: &'db ColumnFamily,
    spending_id_cf: &'db ColumnFamily,
    fees_cf: &'db ColumnFamily,

    // Metadata column family
    metadata_cf: &'db ColumnFamily,
}
330
331mod blocks;
332mod error;
333mod metadata_indexes;
334mod tx_events;
335mod tx_utils;
336
337use metadata_indexes::{
338    deserialize_iter_key, deserialize_key, serialize_iter_key, serialize_key,
339};
340
341impl node_data::Serializable for LightBlock {
342    fn write<W: Write>(&self, w: &mut W) -> io::Result<()> {
343        // Write block header
344        self.header.write(w)?;
345
346        // Write transactions count
347        let len = self.transactions_ids.len() as u32;
348        w.write_all(&len.to_le_bytes())?;
349
350        // Write transactions hashes
351        for tx_id in &self.transactions_ids {
352            w.write_all(tx_id)?;
353        }
354
355        // Write faults count
356        let len = self.faults_ids.len() as u32;
357        w.write_all(&len.to_le_bytes())?;
358
359        // Write faults id
360        for f_id in &self.faults_ids {
361            w.write_all(f_id)?;
362        }
363
364        Ok(())
365    }
366
367    fn read<R: Read>(r: &mut R) -> io::Result<Self>
368    where
369        Self: Sized,
370    {
371        // Read block header
372        let header = Header::read(r)?;
373
374        // Read transactions count
375        let len = Self::read_u32_le(r)?;
376
377        // Read transactions hashes
378        let mut transactions_ids = vec![];
379        for _ in 0..len {
380            let mut tx_id = [0u8; 32];
381            r.read_exact(&mut tx_id[..])?;
382
383            transactions_ids.push(tx_id);
384        }
385
386        // Read faults count
387        let len = Self::read_u32_le(r)?;
388
389        // Read faults ids
390        let mut faults_ids = vec![];
391        for _ in 0..len {
392            let mut f_id = [0u8; 32];
393            r.read_exact(&mut f_id[..])?;
394
395            faults_ids.push(f_id);
396        }
397
398        Ok(Self {
399            header,
400            transactions_ids,
401            faults_ids,
402        })
403    }
404}
405
406#[cfg(test)]
407mod tests {
408    use dusk_core::signatures::bls::{
409        PublicKey as AccountPublicKey, SecretKey as AccountSecretKey,
410    };
411    use dusk_core::transfer::Transaction as ProtocolTransaction;
412    use dusk_core::transfer::data::{
413        BlobData, BlobDataPart, BlobSidecar, TransactionData,
414    };
415    use fake::{Fake, Faker};
416    use node_data::{hard_fork, ledger};
417    use rand::rngs::StdRng;
418    use rand::{Rng, SeedableRng};
419
420    use super::*;
421
422    fn blob_spent_tx(block_height: u64) -> SpentTransaction {
423        let mut rng = StdRng::seed_from_u64(42);
424        let sender_sk = AccountSecretKey::random(&mut rng);
425        let receiver_pk =
426            Some(AccountPublicKey::from(&AccountSecretKey::random(&mut rng)));
427
428        let blob_data: BlobDataPart = [7u8; 4096 * 32];
429        let sidecar = BlobSidecar {
430            commitment: [8u8; 48],
431            proof: [9u8; 48],
432            data: blob_data,
433        };
434        let blob = BlobData {
435            hash: [10u8; 32],
436            data: Some(sidecar),
437        };
438
439        let protocol = ProtocolTransaction::moonlight(
440            &sender_sk,
441            receiver_pk,
442            rng.r#gen(),
443            rng.r#gen(),
444            1,
445            1,
446            rng.r#gen(),
447            0xFA,
448            Some(TransactionData::from(vec![blob])),
449        )
450        .expect("blob tx should build");
451
452        SpentTransaction {
453            inner: LedgerTransaction::from_protocol_for_ledger(
454                protocol,
455                block_height,
456            ),
457            block_height,
458            gas_spent: 0,
459            err: None,
460        }
461    }
462
463    #[test]
464    fn test_store_block() {
465        TestWrapper::new("test_store_block").run(|path| {
466            let db = Backend::create_or_open(path, DatabaseOptions::default());
467
468            let b: Block = Faker.fake();
469            assert!(!b.txs().is_empty());
470            let spent_txs = to_spent_txs(b.header().height, b.txs());
471
472            let hash = b.header().hash;
473
474            assert!(
475                db.update(|txn| {
476                    txn.store_block(
477                        b.header(),
478                        &spent_txs,
479                        b.faults(),
480                        Label::Final(3),
481                    )?;
482                    Ok(())
483                })
484                .is_ok()
485            );
486
487            db.view(|txn| {
488                // Assert block header is fully fetched from ledger
489                let db_blk = txn
490                    .block(&hash)
491                    .expect("Block to be fetched")
492                    .expect("Block to exist");
493                assert_eq!(db_blk.header().hash, b.header().hash);
494
495                // Assert all transactions are fully fetched from ledger as
496                // well.
497                for (db_tx, spent_tx) in
498                    db_blk.txs().iter().zip(spent_txs.iter())
499                {
500                    assert_eq!(*db_tx, spent_tx.inner);
501                }
502
503                // Assert all faults are fully fetched from ledger as
504                // well.
505                for pos in 0..b.faults().len() {
506                    assert_eq!(db_blk.faults()[pos].id(), b.faults()[pos].id());
507                }
508            });
509
510            assert!(
511                db.update(|txn| {
512                    txn.clear_database()?;
513                    Ok(())
514                })
515                .is_ok()
516            );
517
518            db.view(|txn| {
519                assert!(
520                    txn.block(&hash).expect("block to be fetched").is_none()
521                );
522            });
523        });
524    }
525
526    #[test]
527    fn test_store_block_strips_blob_sidecars() {
528        std::thread::Builder::new()
529            .stack_size(8 * 1024 * 1024)
530            .spawn(|| {
531                TestWrapper::new("test_store_block_strips_blob_sidecars").run(
532                    |path| {
533                        let db = Backend::create_or_open(
534                            path,
535                            DatabaseOptions::default(),
536                        );
537                        let header: Header = Faker.fake();
538                        let spent_tx = blob_spent_tx(header.height);
539                        let blob_hash =
540                            spent_tx.inner.protocol().blob().unwrap()[0].hash;
541                        let expected_sidecar = spent_tx.inner.protocol().blob()
542                            .unwrap()[0]
543                            .data
544                            .clone()
545                            .unwrap();
546
547                        db.update(|txn| {
548                            txn.store_block(
549                                &header,
550                                std::slice::from_ref(&spent_tx),
551                                &[],
552                                Label::Final(header.height),
553                            )?;
554                            Ok(())
555                        })
556                        .expect("block should be stored");
557
558                        db.view(|txn| {
559                            let raw = txn
560                                .inner
561                                .get_cf(txn.ledger_txs_cf, spent_tx.inner.id())
562                                .expect("ledger tx bytes should be readable")
563                                .expect("ledger tx bytes should exist");
564                            let stored = SpentTransaction::read(&mut &raw[..])
565                                .expect("stored spent tx should decode");
566                            assert!(
567                                stored.inner.protocol().blob().unwrap()[0]
568                                    .data
569                                    .is_none(),
570                                "ledger tx record must not persist blob sidecars",
571                            );
572
573                            let stored_sidecar = txn
574                                .blob_data_by_hash(&blob_hash)
575                                .expect("blob sidecar bytes should be readable")
576                                .expect("blob sidecar should be stored separately");
577                            let decoded_sidecar = BlobSidecar::from_buf(
578                                &mut &stored_sidecar[..],
579                            )
580                            .expect("stored blob sidecar should decode");
581                            assert_eq!(decoded_sidecar, expected_sidecar);
582
583                            let block = txn
584                                .block(&header.hash)
585                                .expect("block should be readable")
586                                .expect("block should exist");
587                            assert!(
588                                block.txs()[0].protocol().blob().unwrap()[0]
589                                    .data
590                                    .is_some(),
591                                "blob sidecar should be rehydrated on read",
592                            );
593
594                            Ok::<(), error::RocksDbError>(())
595                        })
596                        .expect("block readback should succeed");
597                    },
598                );
599            })
600            .expect("blob storage test thread should spawn")
601            .join()
602            .expect("blob storage test thread should complete");
603    }
604
605    #[test]
606    fn test_read_only() {
607        TestWrapper::new("test_read_only").run(|path| {
608            let db = Backend::create_or_open(path, DatabaseOptions::default());
609            let b: Block = Faker.fake();
610            let spent_txs = to_spent_txs(b.header().height, b.txs());
611            db.update_dry_run(true, |txn| {
612                txn.store_block(
613                    b.header(),
614                    &spent_txs,
615                    b.faults(),
616                    Label::Final(3),
617                )
618            })
619            .expect("block to be stored");
620            db.view(|txn| {
621                assert!(
622                    txn.block(&b.header().hash)
623                        .expect("block to be fetched")
624                        .is_none()
625                );
626            });
627        });
628    }
629
630    #[test]
631    fn test_transaction_isolation() {
632        TestWrapper::new("test_transaction_isolation").run(|path| {
633            let db = Backend::create_or_open(path, DatabaseOptions::default());
634            let b: Block = Faker.fake();
635            let spent_txs = to_spent_txs(b.header().height, b.txs());
636            let hash = b.header().hash;
637
638            db.view(|txn| {
639                // Simulate a concurrent update is committed during read-only
640                // transaction
641                assert!(
642                    db.update(|inner| {
643                        inner
644                            .store_block(
645                                b.header(),
646                                &spent_txs,
647                                b.faults(),
648                                Label::Final(3),
649                            )
650                            .unwrap();
651
652                        // We support Read-Your-Own-Writes
653                        assert!(inner.block(&hash)?.is_some());
654                        // Data is isolated until the transaction is not
655                        // committed
656                        assert!(txn.block(&hash)?.is_none());
657                        Ok(())
658                    })
659                    .is_ok()
660                );
661
662                // Asserts that the read-only/view transaction get the updated
663                // data after the tx is committed
664                assert!(
665                    txn.block(&hash).expect("block to be fetched").is_some()
666                );
667            });
668
669            // Asserts that update was done
670            db.view(|txn| {
671                assert_blocks_eq(
672                    &txn.block(&hash).expect("block to be fetched").unwrap(),
673                    &b,
674                );
675            });
676        });
677    }
678
679    fn assert_blocks_eq(a: &Block, b: &Block) {
680        assert!(a.header().hash != [0u8; 32]);
681        assert!(a.header().hash.eq(&b.header().hash));
682    }
683
684    #[test]
685    fn test_add_mempool_tx() {
686        TestWrapper::new("test_add_tx").run(|path| {
687            let db = Backend::create_or_open(path, DatabaseOptions::default());
688            let t: LedgerTransaction = Faker.fake();
689
690            assert!(db.update(|txn| { txn.store_mempool_tx(&t, 0) }).is_ok());
691
692            db.view(|vq| {
693                assert!(vq.mempool_tx_exists(t.id()).unwrap());
694
695                let fetched_tx = vq
696                    .mempool_tx(t.id())
697                    .expect("valid contract call")
698                    .unwrap();
699
700                assert_eq!(
701                    fetched_tx.id(),
702                    t.id(),
703                    "fetched transaction should be the same"
704                );
705            });
706
707            // Delete a contract call
708            db.update(|txn| {
709                let deleted =
710                    txn.delete_mempool_tx(t.id(), false).expect("valid tx");
711                assert!(deleted.len() == 1);
712                Ok(())
713            })
714            .unwrap();
715        });
716    }
717
718    #[test]
719    fn test_mempool_txs_sorted_by_fee() {
720        TestWrapper::new("test_mempool_txs_sorted_by_fee").run(|path| {
721            let db = Backend::create_or_open(path, DatabaseOptions::default());
722            // Populate mempool with N contract calls
723            let _rng = rand::thread_rng();
724            db.update(|txn| {
725                for _i in 0..10u32 {
726                    let t: LedgerTransaction = Faker.fake();
727                    txn.store_mempool_tx(&t, 0)?;
728                }
729                Ok(())
730            })
731            .unwrap();
732
733            db.view(|txn| {
734                let txs = txn.mempool_txs_sorted_by_fee();
735
736                let mut last_fee = u64::MAX;
737                for t in txs {
738                    let fee = t.gas_price();
739                    assert!(
740                        fee <= last_fee,
741                        "tx fees are not in decreasing order"
742                    );
743                    last_fee = fee
744                }
745                assert_ne!(last_fee, u64::MAX, "No tx has been processed")
746            });
747        });
748    }
749
750    #[test]
751    fn test_mempool_txs_ids_sorted_by_low_fee() {
752        TestWrapper::new("test_mempool_txs_ids_sorted_by_low_fee").run(
753            |path| {
754                let db =
755                    Backend::create_or_open(path, DatabaseOptions::default());
756
757                db.update(|txn| {
758                    for gas_price in [3, 1, 2] {
759                        let t = ledger::faker::gen_dummy_tx(gas_price);
760                        txn.store_mempool_tx(&t, 0)?;
761                    }
762                    Ok(())
763                })
764                .unwrap();
765
766                db.view(|txn| {
767                    let fees = txn
768                        .mempool_txs_ids_sorted_by_low_fee()
769                        .map(|(fee, _)| fee)
770                        .collect::<Vec<_>>();
771
772                    assert_eq!(fees, vec![1, 2, 3]);
773                });
774            },
775        );
776    }
777
778    #[test]
779    fn test_txs_count() {
780        TestWrapper::new("test_txs_count").run(|path| {
781            let db = Backend::create_or_open(path, DatabaseOptions::default());
782
783            const N: usize = 100;
784            const D: usize = 50;
785
786            let txs: Vec<_> = (0..N)
787                .map(|i| ledger::faker::gen_dummy_tx(i as u64))
788                .collect();
789
790            db.update(|db| {
791                assert_eq!(db.mempool_txs_count(), 0);
792                txs.iter().for_each(|t| {
793                    db.store_mempool_tx(t, 0).expect("tx should be added")
794                });
795                Ok(())
796            })
797            .unwrap();
798
799            db.update(|db| {
800                // Ensure txs count is equal to the number of added tx
801                assert_eq!(db.mempool_txs_count(), N);
802
803                txs.iter().take(D).for_each(|tx| {
804                    let deleted = db
805                        .delete_mempool_tx(tx.id(), false)
806                        .expect("transaction should be deleted");
807                    assert!(deleted.len() == 1);
808                });
809
810                Ok(())
811            })
812            .unwrap();
813
814            // Ensure txs count is updated after the deletion
815            db.update(|db| {
816                assert_eq!(db.mempool_txs_count(), N - D);
817                Ok(())
818            })
819            .unwrap();
820        });
821    }
822
823    #[test]
824    fn test_max_gas_limit() {
825        TestWrapper::new("test_block_size_limit").run(|path| {
826            let db = Backend::create_or_open(path, DatabaseOptions::default());
827
828            db.update(|txn| {
829                for i in 0..10u32 {
830                    let t = ledger::faker::gen_dummy_tx(i as u64);
831                    txn.store_mempool_tx(&t, 0)?;
832                }
833                Ok(())
834            })
835            .unwrap();
836
837            let total_gas_price: u64 = 9 + 8 + 7 + 6 + 5 + 4 + 3 + 2 + 1;
838            db.view(|txn| {
839                let txs = txn
840                    .mempool_txs_sorted_by_fee()
841                    .map(|t| t.gas_price())
842                    .sum::<u64>();
843
844                assert_eq!(txs, total_gas_price);
845            });
846        });
847    }
848
849    #[test]
850    fn test_get_expired_txs() {
851        TestWrapper::new("test_get_expired_txs").run(|path| {
852            let db = Backend::create_or_open(path, DatabaseOptions::default());
853
854            let mut expiry_list = HashSet::new();
855            let _ = db.update(|txn| {
856                (1..101).for_each(|i| {
857                    let t = ledger::faker::gen_dummy_tx(i);
858                    txn.store_mempool_tx(&t, i).expect("tx should be added");
859                    expiry_list.insert(t.id());
860                });
861
862                (1000..1100).for_each(|i| {
863                    let t = ledger::faker::gen_dummy_tx(i);
864                    txn.store_mempool_tx(&t, i).expect("tx should be added");
865                });
866
867                Ok(())
868            });
869
870            db.view(|vq| {
871                let expired: HashSet<_> =
872                    vq.mempool_expired_txs(100).unwrap().into_iter().collect();
873
874                assert_eq!(expiry_list, expired);
875            });
876        });
877    }
878
879    fn to_spent_txs(
880        block_height: u64,
881        txs: &[LedgerTransaction],
882    ) -> Vec<SpentTransaction> {
883        let format = hard_fork::ledger_tx_format_at(block_height);
884        txs.iter()
885            .map(|t| SpentTransaction {
886                inner: LedgerTransaction::from_protocol_with_format(
887                    t.protocol().clone(),
888                    format,
889                ),
890                block_height,
891                gas_spent: 0,
892                err: None,
893            })
894            .collect()
895    }
896
897    #[test]
898    fn test_get_ledger_tx_by_hash() {
899        TestWrapper::new("test_get_ledger_tx_by_hash").run(|path| {
900            let db = Backend::create_or_open(path, DatabaseOptions::default());
901            let b: Block = Faker.fake();
902            assert!(!b.txs().is_empty());
903            let spent_txs = to_spent_txs(b.header().height, b.txs());
904
905            // Store a block
906            assert!(
907                db.update(|txn| {
908                    txn.store_block(
909                        b.header(),
910                        &spent_txs,
911                        b.faults(),
912                        Label::Final(3),
913                    )?;
914                    Ok(())
915                })
916                .is_ok()
917            );
918
919            // Assert all transactions of the accepted (stored) block are
920            // accessible by hash.
921            db.view(|v| {
922                for expected in &spent_txs {
923                    let fetched = v
924                        .ledger_tx(&expected.inner.id())
925                        .expect("should not return error")
926                        .expect("should find a transaction");
927                    assert_eq!(fetched.inner, expected.inner);
928                    assert_eq!(fetched.block_height, expected.block_height);
929                }
930            });
931        });
932    }
933
934    #[test]
935    fn test_fetch_block_hash_by_height() {
936        TestWrapper::new("test_fetch_block_hash_by_height").run(|path| {
937            let db = Backend::create_or_open(path, DatabaseOptions::default());
938            let b: Block = Faker.fake();
939            let spent_txs = to_spent_txs(b.header().height, b.txs());
940
941            // Store a block
942            assert!(
943                db.update(|txn| {
944                    txn.store_block(
945                        b.header(),
946                        &spent_txs,
947                        b.faults(),
948                        Label::Attested(3),
949                    )?;
950                    Ok(())
951                })
952                .is_ok()
953            );
954
955            // Assert block hash is accessible by height.
956            db.view(|v| {
957                assert!(
958                    v.block_hash_by_height(b.header().height)
959                        .expect("should not return error")
960                        .expect("should find a block")
961                        .eq(&b.header().hash)
962                );
963            });
964        });
965    }
966
967    #[test]
968    fn test_fetch_block_label_by_height() {
969        TestWrapper::new("test_fetch_block_hash_by_height").run(|path| {
970            let db = Backend::create_or_open(path, DatabaseOptions::default());
971            let b: Block = Faker.fake();
972            let spent_txs = to_spent_txs(b.header().height, b.txs());
973
974            // Store a block
975            assert!(
976                db.update(|txn| {
977                    txn.store_block(
978                        b.header(),
979                        &spent_txs,
980                        b.faults(),
981                        Label::Attested(3),
982                    )?;
983                    Ok(())
984                })
985                .is_ok()
986            );
987
988            // Assert block hash is accessible by height.
989            db.view(|v| {
990                assert!(
991                    v.block_label_by_height(b.header().height)
992                        .expect("should not return error")
993                        .expect("should find a block")
994                        .1
995                        .eq(&Label::Attested(3))
996                );
997            });
998        });
999    }
1000
1001    #[test]
1002    /// Ensures delete_block fn removes all keys of a single block
1003    fn test_delete_block() {
1004        let t = TestWrapper::new("test_fetch_block_hash_by_height");
1005        t.run(|path| {
1006            let db = Backend::create_or_open(path, DatabaseOptions::default());
1007            let b: ledger::Block = Faker.fake();
1008            let spent_txs = to_spent_txs(b.header().height, b.txs());
1009
1010            assert!(
1011                db.update(|ut| {
1012                    ut.store_block(
1013                        b.header(),
1014                        &spent_txs,
1015                        b.faults(),
1016                        Label::Final(3),
1017                    )?;
1018                    Ok(())
1019                })
1020                .is_ok()
1021            );
1022
1023            assert!(
1024                db.update(|ut| {
1025                    ut.delete_block(&b)?;
1026                    Ok(())
1027                })
1028                .is_ok()
1029            );
1030        });
1031
1032        let path = t.get_path();
1033        let opts = Options::default();
1034
1035        let vec = rocksdb::DB::list_cf(&opts, &path).unwrap();
1036        assert!(!vec.is_empty());
1037
1038        // Ensure no block fields leak after its deletion
1039        let db = rocksdb::DB::open_cf(&opts, &path, vec.clone()).unwrap();
1040        vec.into_iter()
1041            .map(|cf_name| {
1042                if cf_name == CF_METADATA {
1043                    return;
1044                }
1045
1046                let cf = db.cf_handle(&cf_name).unwrap();
1047                assert_eq!(
1048                    db.iterator_cf(cf, IteratorMode::Start)
1049                        .map(Result::unwrap)
1050                        .count(),
1051                    0
1052                );
1053            })
1054            .for_each(drop);
1055    }
1056
1057    struct TestWrapper(tempfile::TempDir);
1058
1059    impl TestWrapper {
1060        fn new(path: &'static str) -> Self {
1061            Self(
1062                tempfile::TempDir::with_prefix(path)
1063                    .expect("Temp directory to be created"),
1064            )
1065        }
1066
1067        pub fn run<F>(&self, test_func: F)
1068        where
1069            F: FnOnce(&Path),
1070        {
1071            test_func(self.0.path());
1072        }
1073
1074        pub fn get_path(&self) -> std::path::PathBuf {
1075            self.0.path().to_owned().join(DB_FOLDER_NAME)
1076        }
1077    }
1078}