amadeus_node/node/
txpool.rs1use crate::consensus::chain_epoch;
2use crate::consensus::doms::tx::{TxU, pack, validate};
3use crate::utils::Hash;
4use crate::utils::rocksdb::RocksDb;
5use amadeus_runtime::consensus::bic::{coin, sol};
6use std::collections::HashMap;
7use std::sync::Arc;
8use tokio::sync::RwLock;
9
10#[derive(Debug, Clone)]
11pub struct ValidateTxArgs {
12 pub epoch: u32,
13 pub segment_vr_hash: Hash,
14 pub diff_bits: u32,
15 pub batch_state: BatchState,
16}
17
/// Per-signer overlay kept while validating a batch of transactions.
///
/// Both maps are keyed by the signer's raw bytes. A missing entry means the
/// value has not been looked up or recorded for that signer in this batch yet.
#[derive(Clone, Debug, Default)]
pub struct BatchState {
    /// Most recent nonce recorded for each signer during this batch.
    chain_nonces: HashMap<Vec<u8>, i128>,
    /// Balance remaining for each signer after the costs recorded during this batch.
    balances: HashMap<Vec<u8>, i128>,
}
23
24#[derive(Debug)]
25pub enum TxPoolError {
26 InvalidNonce { nonce: i128, hash: Hash },
27 InsufficientBalance { nonce: i128, hash: Hash },
28 InvalidSol { nonce: i128, hash: Hash },
29 ValidationError(String),
30}
31
32pub struct TxPool {
33 db: Arc<RocksDb>,
34 pool: Arc<RwLock<HashMap<Vec<u8>, TxU>>>,
35}
36
37impl TxPool {
38 pub fn new(db: Arc<RocksDb>) -> Self {
39 Self { db, pool: Arc::new(RwLock::new(HashMap::new())) }
40 }
41
42 pub async fn insert(&self, tx_packed: &[u8]) -> Result<(), TxPoolError> {
43 match validate(tx_packed, false) {
44 Ok(txu) => {
45 let mut pool = self.pool.write().await;
46 let key = vec![txu.tx.nonce.to_le_bytes().to_vec(), txu.hash.to_vec()].concat();
47 pool.insert(key, txu);
48 Ok(())
49 }
50 Err(e) => Err(TxPoolError::ValidationError(e.to_string())),
51 }
52 }
53
54 pub async fn insert_and_broadcast(&self, tx_packed: &[u8]) -> Result<(), TxPoolError> {
55 self.insert(tx_packed).await?;
56 Ok(())
58 }
59
60 pub async fn purge_stale(&self) {
61 let _cur_epoch = chain_epoch(self.db.as_ref());
62 let mut pool = self.pool.write().await;
63
64 pool.retain(|_key, _txu| {
67 true
69 });
70 }
71
72 pub fn validate_tx(&self, txu: &TxU, args: &mut ValidateTxArgs) -> Result<(), TxPoolError> {
73 let signer_vec = txu.tx.signer.to_vec();
75 let chain_nonce = args.batch_state.chain_nonces.get(&signer_vec).cloned().unwrap_or_else(|| {
76 crate::consensus::fabric::chain_queries::chain_nonce(self.db.as_ref(), txu.tx.signer.as_ref()).unwrap_or(0)
77 as i128
78 });
79
80 if chain_nonce != 0 && txu.tx.nonce <= chain_nonce {
81 return Err(TxPoolError::InvalidNonce { nonce: txu.tx.nonce, hash: txu.hash });
82 }
83 args.batch_state.chain_nonces.insert(signer_vec.clone(), txu.tx.nonce);
84
85 let balance = args.batch_state.balances.get(&signer_vec).cloned().unwrap_or_else(|| {
87 crate::consensus::fabric::chain_queries::chain_balance(self.db.as_ref(), txu.tx.signer.as_ref())
88 });
89
90 let exec_cost = txu.exec_cost(args.epoch) as i128;
91 let fee = coin::to_cents(1) as i128;
92
93 let new_balance = balance.saturating_sub(exec_cost).saturating_sub(fee);
94 if balance < exec_cost.saturating_add(fee) {
95 return Err(TxPoolError::InsufficientBalance { nonce: txu.tx.nonce, hash: txu.hash });
96 }
97 args.batch_state.balances.insert(signer_vec, new_balance);
98
99 let action = &txu.tx.action;
100 if action.function.as_slice() == b"submit_sol" && !action.args.is_empty() {
101 let sol_bytes = &action.args[0];
102 if sol_bytes.len() >= 36 {
103 let sol_epoch = u32::from_le_bytes([sol_bytes[0], sol_bytes[1], sol_bytes[2], sol_bytes[3]]);
104 let sol_svrh = &sol_bytes[4..36];
105
106 if sol_epoch != args.epoch || sol_svrh != &args.segment_vr_hash[..] || sol_bytes.len() != sol::SOL_SIZE
107 {
108 return Err(TxPoolError::InvalidSol { nonce: txu.tx.nonce, hash: txu.hash });
109 }
110 }
111 }
112
113 Ok(())
114 }
115
116 pub fn validate_tx_batch(&self, txs_packed: &[Vec<u8>]) -> Vec<Vec<u8>> {
117 let chain_epoch = chain_epoch(self.db.as_ref());
118 let segment_vr_hash = crate::consensus::fabric::chain_queries::chain_segment_vr_hash(self.db.as_ref())
119 .and_then(|v| v.try_into().ok())
120 .unwrap_or([0u8; 32]);
121 let diff_bits = crate::consensus::fabric::chain_queries::chain_diff_bits(self.db.as_ref());
122
123 let mut args = ValidateTxArgs {
124 epoch: chain_epoch,
125 segment_vr_hash: Hash::from(segment_vr_hash),
126 diff_bits: diff_bits as u32,
127 batch_state: BatchState::default(),
128 };
129
130 let mut good = Vec::new();
131 for tx_packed in txs_packed {
132 match validate(tx_packed, false) {
133 Ok(txu) => {
134 if self.validate_tx(&txu, &mut args).is_ok() {
135 good.push(tx_packed.clone());
136 }
137 }
138 Err(_) => continue,
139 }
140 }
141
142 good
143 }
144
145 pub async fn grab_next_valid(&self, amt: usize) -> Vec<Vec<u8>> {
146 let chain_epoch = chain_epoch(self.db.as_ref());
147 let segment_vr_hash = crate::consensus::fabric::chain_queries::chain_segment_vr_hash(self.db.as_ref())
148 .and_then(|v| v.try_into().ok())
149 .unwrap_or([0u8; 32]);
150 let diff_bits = crate::consensus::fabric::chain_queries::chain_diff_bits(self.db.as_ref());
151
152 let mut args = ValidateTxArgs {
153 epoch: chain_epoch,
154 segment_vr_hash: Hash::from(segment_vr_hash),
155 diff_bits: diff_bits as u32,
156 batch_state: BatchState::default(),
157 };
158
159 let mut result = Vec::new();
160 let mut to_delete = Vec::new();
161
162 let pool = self.pool.read().await;
163 for (key, txu) in pool.iter() {
164 if result.len() >= amt {
165 break;
166 }
167
168 match self.validate_tx(txu, &mut args) {
169 Ok(()) => {
170 result.push(pack(txu));
171 }
172 Err(_) => {
173 to_delete.push(key.clone());
174 }
175 }
176 }
177 drop(pool);
178
179 if !to_delete.is_empty() {
181 let mut pool = self.pool.write().await;
182 for key in to_delete {
183 pool.remove(&key);
184 }
185 }
186
187 result
188 }
189
190 pub async fn size(&self) -> usize {
191 self.pool.read().await.len()
192 }
193
194 pub async fn delete_packed(&self, txs_packed: &[Vec<u8>]) {
197 if txs_packed.is_empty() {
198 return;
199 }
200
201 let mut pool = self.pool.write().await;
202 let mut removed_count = 0;
203
204 for tx_packed in txs_packed {
205 if let Ok(txu) = validate(tx_packed, false) {
207 let key = vec![txu.tx.nonce.to_le_bytes().to_vec(), txu.hash.to_vec()].concat();
209 if pool.remove(&key).is_some() {
210 removed_count += 1;
211 }
212 }
213 }
214
215 if removed_count > 0 {
216 tracing::debug!("removed {} transactions from pool", removed_count);
217 }
218 }
219}