// Copyright (c) Aptos
// SPDX-License-Identifier: Apache-2.0

use crate::{
    transaction_executor::TransactionExecutor, transaction_generator::TransactionGenerator,
    TransactionCommitter,
};
use aptos_config::{
    config::{RocksdbConfig, StoragePrunerConfig},
    utils::get_genesis_txn,
};
use aptos_jellyfish_merkle::metrics::{
    APTOS_JELLYFISH_INTERNAL_ENCODED_BYTES, APTOS_JELLYFISH_LEAF_ENCODED_BYTES,
    APTOS_JELLYFISH_STORAGE_READS,
};
use aptos_vm::AptosVM;
use aptosdb::{metrics::ROCKSDB_PROPERTIES, schema::JELLYFISH_MERKLE_NODE_CF_NAME, AptosDB};
use executor::{
    block_executor::BlockExecutor,
    db_bootstrapper::{generate_waypoint, maybe_bootstrap},
};
use executor_types::BlockExecutorTrait;
use std::{
    fs,
    path::Path,
    sync::{mpsc, Arc},
};
use storage_interface::DbReaderWriter;

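/// Creates a fresh `AptosDB` under `db_dir` (which must not already exist), bootstraps it with a
/// test genesis, then creates `num_accounts` accounts each seeded with `init_account_balance`
/// coins, submitting the minting transactions in blocks of `block_size`. Generation, execution
/// and committing run on separate threads connected by bounded channels, and summary size / IO
/// metrics are printed at the end.
///
/// A minimal usage sketch (the literal values are illustrative only, and `storage_pruner_config`
/// is assumed to be built elsewhere):
///
/// ```ignore
/// run(
///     1_000_000,           /* num_accounts */
///     1_000_000,           /* init_account_balance */
///     500,                 /* block_size */
///     "/tmp/db-benchmark", /* db_dir */
///     storage_pruner_config,
/// );
/// ```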
pub fn run(
    num_accounts: usize,
    init_account_balance: u64,
    block_size: usize,
    db_dir: impl AsRef<Path>,
    storage_pruner_config: StoragePrunerConfig,
) {
    println!("Initializing...");

    if db_dir.as_ref().exists() {
        panic!("data-dir exists already.");
    }
    // Create the data dir (and any missing parent directories).
    fs::create_dir_all(db_dir.as_ref()).unwrap();

    let (config, genesis_key) = aptos_genesis_tool::test_config();
    // Open a brand new AptosDB.
    let (db, db_rw) = DbReaderWriter::wrap(
        AptosDB::open(
            &db_dir,
            false,                 /* readonly */
            storage_pruner_config, /* pruner */
            RocksdbConfig::default(),
        )
        .expect("DB should open."),
    );

    // Bootstrap db with genesis
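    // `generate_waypoint` executes the genesis transaction against the empty DB to derive the
    // waypoint; `maybe_bootstrap` then commits genesis (a no-op if the DB is already bootstrapped).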
    let waypoint = generate_waypoint::<AptosVM>(&db_rw, get_genesis_txn(&config).unwrap()).unwrap();
    maybe_bootstrap::<AptosVM>(&db_rw, get_genesis_txn(&config).unwrap(), waypoint).unwrap();

    let executor = Arc::new(BlockExecutor::new(db_rw));
    let executor_2 = executor.clone();
    let genesis_block_id = executor.committed_block_id();
    let (block_sender, block_receiver) = mpsc::sync_channel(3 /* bound */);
    let (commit_sender, commit_receiver) = mpsc::sync_channel(3 /* bound */);
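    // The three stages talk through small bounded channels, so a slow executor or committer
    // back-pressures the stages before it instead of letting blocks pile up in memory.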

    // Spawn threads to run the transaction generator, executor and committer as a pipeline.
    let gen_thread = std::thread::Builder::new()
        .name("txn_generator".to_string())
        .spawn(move || {
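            // Roughly one seed account per 1,000 target accounts, with a minimum of one.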
            let seed_accounts = (num_accounts / 1000).max(1);
            let mut generator = TransactionGenerator::new_with_sender(
                genesis_key,
                num_accounts,
                seed_accounts,
                block_sender,
            );
            generator.run_mint(init_account_balance, block_size);
            generator
        })
        .expect("Failed to spawn transaction generator thread.");
    let exe_thread = std::thread::Builder::new()
        .name("txn_executor".to_string())
        .spawn(move || {
            let mut exe = TransactionExecutor::new(
                executor,
                genesis_block_id,
                0, /* start_version */
                Some(commit_sender),
            );
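            // Execute incoming blocks until the generator drops its sender and the channel closes.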
            while let Ok(transactions) = block_receiver.recv() {
                exe.execute_block(transactions);
            }
        })
        .expect("Failed to spawn transaction executor thread.");
    let commit_thread = std::thread::Builder::new()
        .name("txn_committer".to_string())
        .spawn(move || {
            let mut committer = TransactionCommitter::new(executor_2, 0, commit_receiver);
            committer.run();
        })
        .expect("Failed to spawn transaction committer thread.");

    // Wait for generator to finish.
    let mut generator = gen_thread.join().unwrap();
    generator.drop_sender();
    // Wait until all transactions are committed.
    exe_thread.join().unwrap();
    commit_thread.join().unwrap();
    // Do a sanity check on the sequence number to make sure all transactions are committed.
    generator.verify_sequence_number(db.clone());

    let final_version = generator.version();
    // Write metadata
    generator.write_meta(&db_dir);

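    // Refresh the ROCKSDB_PROPERTIES gauges so the size figures reported below reflect the final
    // state of the DB.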
    db.update_rocksdb_properties().unwrap();
    let db_size = ROCKSDB_PROPERTIES
        .with_label_values(&[
            JELLYFISH_MERKLE_NODE_CF_NAME,
            "aptos_rocksdb_live_sst_files_size_bytes",
        ])
        .get();
    let data_size = ROCKSDB_PROPERTIES
        .with_label_values(&[
            JELLYFISH_MERKLE_NODE_CF_NAME,
            "aptos_rocksdb_total-sst-files-size",
        ])
        .get();
    let reads = APTOS_JELLYFISH_STORAGE_READS.get();
    let leaf_bytes = APTOS_JELLYFISH_LEAF_ENCODED_BYTES.get();
    let internal_bytes = APTOS_JELLYFISH_INTERNAL_ENCODED_BYTES.get();
    println!("=============FINISHED DB CREATION =============");
    println!(
        "created a AptosDB til version {} with {} accounts.",
        final_version, num_accounts,
    );
    println!("DB dir: {}", db_dir.as_ref().display());
    println!("Jellyfish Merkle physical size: {}", db_size);
    println!("Jellyfish Merkle logical size: {}", data_size);
    println!("Total reads from storage: {}", reads);
    println!(
        "Total written internal nodes value size: {} bytes",
        internal_bytes
    );
    println!("Total written leaf nodes value size: {} bytes", leaf_bytes);
}