radix_clis/replay/
cmd_sync.rs

1use super::ledger_transaction_execution::execute_ledger_transaction;
2use super::Error;
3use clap::Parser;
4use flume;
5use flume::Sender;
6use radix_common::prelude::*;
7use radix_engine::vm::VmModules;
8use radix_substate_store_impls::rocks_db_with_merkle_tree::RocksDBWithMerkleTreeSubstateStore;
9use radix_substate_store_interface::interface::*;
10use radix_transactions::prelude::*;
11use rocksdb::{Direction, IteratorMode, Options, DB};
12use std::path::PathBuf;
13use std::thread;
14use std::time::Duration;
15
16/// Run transactions
17#[derive(Parser, Debug)]
18pub struct TxnSync {
19    /// Path to the source Node state manager database
20    pub source: PathBuf,
21    /// Path to a folder for storing state
22    pub database_dir: PathBuf,
23
24    /// The network to use, [mainnet | stokenet]
25    #[clap(short, long)]
26    pub network: Option<String>,
27    /// The max version to execute
28    #[clap(short, long)]
29    pub max_version: Option<u64>,
30
31    /// Trace transaction execution
32    #[clap(long)]
33    pub trace: bool,
34}
35
36impl TxnSync {
37    pub fn sync(&self) -> Result<(), String> {
38        let network = match &self.network {
39            Some(n) => NetworkDefinition::from_str(n).map_err(Error::ParseNetworkError)?,
40            None => NetworkDefinition::mainnet(),
41        };
42
43        let cur_version = {
44            let database = RocksDBWithMerkleTreeSubstateStore::standard(self.database_dir.clone());
45            let cur_version = database.get_current_version();
46            if cur_version >= self.max_version.unwrap_or(u64::MAX) {
47                return Ok(());
48            }
49            cur_version
50        };
51        let to_version = self.max_version;
52
53        let start = std::time::Instant::now();
54        let (tx, rx) = flume::bounded(10);
55
56        // txn reader
57        let mut txn_reader = CommittedTxnReader::StateManagerDatabaseDir(self.source.clone());
58        let txn_read_thread_handle =
59            thread::spawn(move || txn_reader.read(cur_version, to_version, tx));
60
61        // txn executor
62        let mut database = RocksDBWithMerkleTreeSubstateStore::standard(self.database_dir.clone());
63        let trace = self.trace;
64        let txn_write_thread_handle = thread::spawn(move || {
65            let vm_modules = VmModules::default();
66            let iter = rx.iter();
67            for (tx_payload, expected_state_root_hash) in iter {
68                let (_hash, receipt) = execute_ledger_transaction(
69                    &database,
70                    &vm_modules,
71                    &network,
72                    &tx_payload,
73                    trace,
74                );
75                let state_updates = receipt.into_state_updates();
76                let database_updates = state_updates.create_database_updates();
77
78                let current_version = database.get_current_version();
79                let new_version = current_version + 1;
80                // TODO: avoid redundant computation?
81                let (_, new_state_root_hash) =
82                    radix_substate_store_impls::rocks_db_with_merkle_tree::compute_state_tree_update(
83                        &database,
84                        current_version,
85                        &database_updates,
86                    );
87                if new_state_root_hash != expected_state_root_hash {
88                    panic!(
89                        "State hash mismatch at version {}. Expected {} Actual {}",
90                        new_version, expected_state_root_hash, new_state_root_hash
91                    );
92                }
93                database.commit(&database_updates);
94
95                // print progress
96                if new_version < 1000 || new_version.is_multiple_of(1000) {
97                    print_progress(start.elapsed(), new_version, new_state_root_hash);
98                }
99            }
100
101            let duration = start.elapsed();
102            println!("Time elapsed: {:?}", duration);
103            println!("State version: {}", database.get_current_version());
104            println!("State root hash: {}", database.get_current_root_hash());
105        });
106
107        txn_read_thread_handle.join().unwrap()?;
108        txn_write_thread_handle.join().unwrap();
109
110        Ok(())
111    }
112}
113
114fn print_progress(duration: Duration, new_version: u64, new_root: Hash) {
115    let seconds = duration.as_secs() % 60;
116    let minutes = (duration.as_secs() / 60) % 60;
117    let hours = (duration.as_secs() / 60) / 60;
118    println!(
119        "New version: {}, {}, {:0>2}:{:0>2}:{:0>2}",
120        new_version, new_root, hours, minutes, seconds
121    );
122}
123
124enum CommittedTxnReader {
125    StateManagerDatabaseDir(PathBuf),
126}
127
128impl CommittedTxnReader {
129    fn read(
130        &mut self,
131        from_version: u64,
132        to_version: Option<u64>,
133        tx: Sender<(RawLedgerTransaction, Hash)>,
134    ) -> Result<(), Error> {
135        match self {
136            CommittedTxnReader::StateManagerDatabaseDir(db_dir) => {
137                let temp_dir = tempfile::tempdir().map_err(Error::IOError)?;
138
139                let db = DB::open_cf_as_secondary(
140                    &Options::default(),
141                    db_dir.as_path(),
142                    temp_dir.as_ref(),
143                    vec![
144                        "raw_ledger_transactions",
145                        "committed_transaction_identifiers",
146                    ],
147                )
148                .unwrap();
149
150                let mut iter_start_state_version = from_version + 1;
151
152                loop {
153                    db.try_catch_up_with_primary()
154                        .expect("DB catch up with primary failed");
155                    let txn_iter = db.iterator_cf(
156                        &db.cf_handle("raw_ledger_transactions").unwrap(),
157                        IteratorMode::From(
158                            &iter_start_state_version.to_be_bytes(),
159                            Direction::Forward,
160                        ),
161                    );
162                    let mut identifiers_iter = db.iterator_cf(
163                        &db.cf_handle("committed_transaction_identifiers").unwrap(),
164                        IteratorMode::From(
165                            &iter_start_state_version.to_be_bytes(),
166                            Direction::Forward,
167                        ),
168                    );
169                    for next_txn in txn_iter {
170                        let next_txn = next_txn.unwrap();
171                        let next_state_version =
172                            u64::from_be_bytes(next_txn.0.as_ref().try_into().unwrap());
173
174                        let next_identifiers_bytes = identifiers_iter
175                            .next()
176                            .expect("Missing txn identifiers")
177                            .unwrap();
178
179                        let next_identifiers: VersionedCommittedTransactionIdentifiers =
180                            scrypto_decode(next_identifiers_bytes.1.as_ref()).unwrap();
181                        let expected_state_root_hash = next_identifiers
182                            .fully_update_and_into_latest_version()
183                            .resultant_ledger_hashes
184                            .state_root
185                            .0;
186
187                        tx.send((
188                            RawLedgerTransaction::from_slice(&next_txn.1),
189                            expected_state_root_hash,
190                        ))
191                        .unwrap();
192                        if let Some(to_version) = to_version {
193                            if to_version == next_state_version {
194                                return Ok(());
195                            }
196                        }
197                        iter_start_state_version = next_state_version + 1;
198                    }
199                    thread::sleep(Duration::from_secs(1));
200                }
201            }
202        }
203    }
204}
205
206define_single_versioned! {
207    #[derive(Debug, Clone, Sbor)]
208    pub VersionedCommittedTransactionIdentifiers(CommittedTransactionIdentifiersVersions) => CommittedTransactionIdentifiers = CommittedTransactionIdentifiersV1
209}
210
211#[derive(Debug, Clone, Sbor)]
212pub struct CommittedTransactionIdentifiersV1 {
213    pub payload: LedgerTransactionHashes,
214    pub resultant_ledger_hashes: LedgerHashes,
215    pub proposer_timestamp_ms: i64,
216}
217
218#[derive(PartialEq, Eq, Hash, Clone, Copy, PartialOrd, Ord, Debug, Sbor)]
219pub struct LedgerHashes {
220    pub state_root: StateHash,
221    pub transaction_root: TransactionTreeHash,
222    pub receipt_root: ReceiptTreeHash,
223}
224
225define_wrapped_hash! {
226    StateHash
227}
228
229define_wrapped_hash! {
230    TransactionTreeHash
231}
232
233define_wrapped_hash! {
234    ReceiptTreeHash
235}