1use super::ledger_transaction_execution::execute_ledger_transaction;
2use super::Error;
3use clap::Parser;
4use flume;
5use flume::Sender;
6use radix_common::prelude::*;
7use radix_engine::vm::VmModules;
8use radix_substate_store_impls::rocks_db_with_merkle_tree::RocksDBWithMerkleTreeSubstateStore;
9use radix_substate_store_interface::interface::*;
10use radix_transactions::prelude::*;
11use rocksdb::{Direction, IteratorMode, Options, DB};
12use std::path::PathBuf;
13use std::thread;
14use std::time::Duration;
15
/// Replays committed transactions from a state manager database into a local
/// substate database, checking the state root hash after every transaction.
#[derive(Parser, Debug)]
pub struct TxnSync {
    /// Path to the state manager database directory to read committed transactions from
    pub source: PathBuf,
    /// Path to the directory of the local substate database that transactions are applied to
    pub database_dir: PathBuf,

    /// Name of the network to execute against; mainnet is used when omitted
    #[clap(short, long)]
    pub network: Option<String>,
    /// State version to stop at; when omitted, the source keeps being polled for new transactions
    #[clap(short, long)]
    pub max_version: Option<u64>,

    /// Forwarded to transaction execution as its trace flag
    #[clap(long)]
    pub trace: bool,
}
35
36impl TxnSync {
37 pub fn sync(&self) -> Result<(), String> {
38 let network = match &self.network {
39 Some(n) => NetworkDefinition::from_str(n).map_err(Error::ParseNetworkError)?,
40 None => NetworkDefinition::mainnet(),
41 };
42
43 let cur_version = {
44 let database = RocksDBWithMerkleTreeSubstateStore::standard(self.database_dir.clone());
45 let cur_version = database.get_current_version();
46 if cur_version >= self.max_version.unwrap_or(u64::MAX) {
47 return Ok(());
48 }
49 cur_version
50 };
51 let to_version = self.max_version;
52
53 let start = std::time::Instant::now();
54 let (tx, rx) = flume::bounded(10);
55
56 let mut txn_reader = CommittedTxnReader::StateManagerDatabaseDir(self.source.clone());
58 let txn_read_thread_handle =
59 thread::spawn(move || txn_reader.read(cur_version, to_version, tx));
60
61 let mut database = RocksDBWithMerkleTreeSubstateStore::standard(self.database_dir.clone());
63 let trace = self.trace;
64 let txn_write_thread_handle = thread::spawn(move || {
65 let vm_modules = VmModules::default();
66 let iter = rx.iter();
67 for (tx_payload, expected_state_root_hash) in iter {
68 let (_hash, receipt) = execute_ledger_transaction(
69 &database,
70 &vm_modules,
71 &network,
72 &tx_payload,
73 trace,
74 );
75 let state_updates = receipt.into_state_updates();
76 let database_updates = state_updates.create_database_updates();
77
78 let current_version = database.get_current_version();
79 let new_version = current_version + 1;
80 let (_, new_state_root_hash) =
82 radix_substate_store_impls::rocks_db_with_merkle_tree::compute_state_tree_update(
83 &database,
84 current_version,
85 &database_updates,
86 );
87 if new_state_root_hash != expected_state_root_hash {
88 panic!(
89 "State hash mismatch at version {}. Expected {} Actual {}",
90 new_version, expected_state_root_hash, new_state_root_hash
91 );
92 }
93 database.commit(&database_updates);
94
95 if new_version < 1000 || new_version.is_multiple_of(1000) {
97 print_progress(start.elapsed(), new_version, new_state_root_hash);
98 }
99 }
100
101 let duration = start.elapsed();
102 println!("Time elapsed: {:?}", duration);
103 println!("State version: {}", database.get_current_version());
104 println!("State root hash: {}", database.get_current_root_hash());
105 });
106
107 txn_read_thread_handle.join().unwrap()?;
108 txn_write_thread_handle.join().unwrap();
109
110 Ok(())
111 }
112}
113
114fn print_progress(duration: Duration, new_version: u64, new_root: Hash) {
115 let seconds = duration.as_secs() % 60;
116 let minutes = (duration.as_secs() / 60) % 60;
117 let hours = (duration.as_secs() / 60) / 60;
118 println!(
119 "New version: {}, {}, {:0>2}:{:0>2}:{:0>2}",
120 new_version, new_root, hours, minutes, seconds
121 );
122}
123
/// Source that committed ledger transactions are read from.
enum CommittedTxnReader {
    /// Directory of a state manager database; `read` opens it as a RocksDB
    /// secondary instance.
    StateManagerDatabaseDir(PathBuf),
}
127
128impl CommittedTxnReader {
129 fn read(
130 &mut self,
131 from_version: u64,
132 to_version: Option<u64>,
133 tx: Sender<(RawLedgerTransaction, Hash)>,
134 ) -> Result<(), Error> {
135 match self {
136 CommittedTxnReader::StateManagerDatabaseDir(db_dir) => {
137 let temp_dir = tempfile::tempdir().map_err(Error::IOError)?;
138
139 let db = DB::open_cf_as_secondary(
140 &Options::default(),
141 db_dir.as_path(),
142 temp_dir.as_ref(),
143 vec![
144 "raw_ledger_transactions",
145 "committed_transaction_identifiers",
146 ],
147 )
148 .unwrap();
149
150 let mut iter_start_state_version = from_version + 1;
151
152 loop {
153 db.try_catch_up_with_primary()
154 .expect("DB catch up with primary failed");
155 let txn_iter = db.iterator_cf(
156 &db.cf_handle("raw_ledger_transactions").unwrap(),
157 IteratorMode::From(
158 &iter_start_state_version.to_be_bytes(),
159 Direction::Forward,
160 ),
161 );
162 let mut identifiers_iter = db.iterator_cf(
163 &db.cf_handle("committed_transaction_identifiers").unwrap(),
164 IteratorMode::From(
165 &iter_start_state_version.to_be_bytes(),
166 Direction::Forward,
167 ),
168 );
169 for next_txn in txn_iter {
170 let next_txn = next_txn.unwrap();
171 let next_state_version =
172 u64::from_be_bytes(next_txn.0.as_ref().try_into().unwrap());
173
174 let next_identifiers_bytes = identifiers_iter
175 .next()
176 .expect("Missing txn identifiers")
177 .unwrap();
178
179 let next_identifiers: VersionedCommittedTransactionIdentifiers =
180 scrypto_decode(next_identifiers_bytes.1.as_ref()).unwrap();
181 let expected_state_root_hash = next_identifiers
182 .fully_update_and_into_latest_version()
183 .resultant_ledger_hashes
184 .state_root
185 .0;
186
187 tx.send((
188 RawLedgerTransaction::from_slice(&next_txn.1),
189 expected_state_root_hash,
190 ))
191 .unwrap();
192 if let Some(to_version) = to_version {
193 if to_version == next_state_version {
194 return Ok(());
195 }
196 }
197 iter_start_state_version = next_state_version + 1;
198 }
199 thread::sleep(Duration::from_secs(1));
200 }
201 }
202 }
203 }
204}
205
// Versioned wrapper for the values stored in the `committed_transaction_identifiers`
// column family. `CommittedTxnReader::read` decodes each value as this type with
// `scrypto_decode` and converts it via `fully_update_and_into_latest_version()`.
// `CommittedTransactionIdentifiersV1` is the only version declared here.
define_single_versioned! {
    #[derive(Debug, Clone, Sbor)]
    pub VersionedCommittedTransactionIdentifiers(CommittedTransactionIdentifiersVersions) => CommittedTransactionIdentifiers = CommittedTransactionIdentifiersV1
}
210
/// Version 1 content of a `committed_transaction_identifiers` entry.
///
/// NOTE(review): values are decoded with `scrypto_decode`, so the field layout
/// presumably has to match what the producer of the source database writes —
/// confirm before reordering or changing fields.
#[derive(Debug, Clone, Sbor)]
pub struct CommittedTransactionIdentifiersV1 {
    /// Hashes of the ledger transaction, going by the type name; not read in this file.
    pub payload: LedgerTransactionHashes,
    /// Ledger hashes recorded for the transaction; `CommittedTxnReader::read` takes
    /// `state_root` from here as the expected state root hash.
    pub resultant_ledger_hashes: LedgerHashes,
    /// Not read in this file. NOTE(review): presumably the proposer timestamp in
    /// milliseconds since the Unix epoch — confirm.
    pub proposer_timestamp_ms: i64,
}
217
/// The three root hashes carried in `CommittedTransactionIdentifiersV1::resultant_ledger_hashes`.
///
/// Only `state_root` is used in this file; the other two are declared so the
/// value can be decoded in full.
#[derive(PartialEq, Eq, Hash, Clone, Copy, PartialOrd, Ord, Debug, Sbor)]
pub struct LedgerHashes {
    /// State root hash; compared against the root computed after replaying the transaction.
    pub state_root: StateHash,
    /// Not read in this file. NOTE(review): presumably the root of the transaction tree — confirm.
    pub transaction_root: TransactionTreeHash,
    /// Not read in this file. NOTE(review): presumably the root of the receipt tree — confirm.
    pub receipt_root: ReceiptTreeHash,
}
224
// Declares `StateHash` through `define_wrapped_hash!`. It is the type of
// `LedgerHashes::state_root`; `CommittedTxnReader::read` sends its inner value (`.0`)
// as the expected state root `Hash`.
define_wrapped_hash! {
    StateHash
}
228
// Declares `TransactionTreeHash` through `define_wrapped_hash!`. It is the type of
// `LedgerHashes::transaction_root` and is not otherwise used in this file.
define_wrapped_hash! {
    TransactionTreeHash
}
232
// Declares `ReceiptTreeHash` through `define_wrapped_hash!`. It is the type of
// `LedgerHashes::receipt_root` and is not otherwise used in this file.
define_wrapped_hash! {
    ReceiptTreeHash
}