Skip to main content

amadeus_node/node/
txpool.rs

1use crate::consensus::chain_epoch;
2use crate::consensus::doms::tx::{TxU, pack, validate};
3use crate::utils::Hash;
4use crate::utils::rocksdb::RocksDb;
5use amadeus_runtime::consensus::bic::{coin, sol};
6use std::collections::HashMap;
7use std::sync::Arc;
8use tokio::sync::RwLock;
9
10#[derive(Debug, Clone)]
11pub struct ValidateTxArgs {
12    pub epoch: u32,
13    pub segment_vr_hash: Hash,
14    pub diff_bits: u32,
15    pub batch_state: BatchState,
16}
17
/// Scratch state shared by the transactions of one validation batch, so that
/// later transactions see the effects of earlier ones from the same signer.
#[derive(Clone, Debug, Default)]
pub struct BatchState {
    /// Last nonce recorded per signer (keyed by the signer's bytes).
    chain_nonces: HashMap<Vec<u8>, i128>,
    /// Remaining balance per signer (keyed by the signer's bytes) after costs
    /// of earlier transactions in the batch were deducted.
    balances: HashMap<Vec<u8>, i128>,
}
23
24#[derive(Debug)]
25pub enum TxPoolError {
26    InvalidNonce { nonce: i128, hash: Hash },
27    InsufficientBalance { nonce: i128, hash: Hash },
28    InvalidSol { nonce: i128, hash: Hash },
29    ValidationError(String),
30}
31
32pub struct TxPool {
33    db: Arc<RocksDb>,
34    pool: Arc<RwLock<HashMap<Vec<u8>, TxU>>>,
35}
36
37impl TxPool {
38    pub fn new(db: Arc<RocksDb>) -> Self {
39        Self { db, pool: Arc::new(RwLock::new(HashMap::new())) }
40    }
41
42    pub async fn insert(&self, tx_packed: &[u8]) -> Result<(), TxPoolError> {
43        match validate(tx_packed, false) {
44            Ok(txu) => {
45                let mut pool = self.pool.write().await;
46                let key = vec![txu.tx.nonce.to_le_bytes().to_vec(), txu.hash.to_vec()].concat();
47                pool.insert(key, txu);
48                Ok(())
49            }
50            Err(e) => Err(TxPoolError::ValidationError(e.to_string())),
51        }
52    }
53
54    pub async fn insert_and_broadcast(&self, tx_packed: &[u8]) -> Result<(), TxPoolError> {
55        self.insert(tx_packed).await?;
56        // TODO: Implement broadcast via NodeGen
57        Ok(())
58    }
59
60    pub async fn purge_stale(&self) {
61        let _cur_epoch = chain_epoch(self.db.as_ref());
62        let mut pool = self.pool.write().await;
63
64        // Remove transactions older than 1 epoch
65        // TODO: TX doesn't have epoch field, need to implement proper epoch tracking
66        pool.retain(|_key, _txu| {
67            // For now, keep all transactions until we implement proper epoch tracking
68            true
69        });
70    }
71
72    pub fn validate_tx(&self, txu: &TxU, args: &mut ValidateTxArgs) -> Result<(), TxPoolError> {
73        // Check nonce validity
74        let signer_vec = txu.tx.signer.to_vec();
75        let chain_nonce = args.batch_state.chain_nonces.get(&signer_vec).cloned().unwrap_or_else(|| {
76            crate::consensus::fabric::chain_queries::chain_nonce(self.db.as_ref(), txu.tx.signer.as_ref()).unwrap_or(0)
77                as i128
78        });
79
80        if chain_nonce != 0 && txu.tx.nonce <= chain_nonce {
81            return Err(TxPoolError::InvalidNonce { nonce: txu.tx.nonce, hash: txu.hash });
82        }
83        args.batch_state.chain_nonces.insert(signer_vec.clone(), txu.tx.nonce);
84
85        // Check balance
86        let balance = args.batch_state.balances.get(&signer_vec).cloned().unwrap_or_else(|| {
87            crate::consensus::fabric::chain_queries::chain_balance(self.db.as_ref(), txu.tx.signer.as_ref())
88        });
89
90        let exec_cost = txu.exec_cost(args.epoch) as i128;
91        let fee = coin::to_cents(1) as i128;
92
93        let new_balance = balance.saturating_sub(exec_cost).saturating_sub(fee);
94        if balance < exec_cost.saturating_add(fee) {
95            return Err(TxPoolError::InsufficientBalance { nonce: txu.tx.nonce, hash: txu.hash });
96        }
97        args.batch_state.balances.insert(signer_vec, new_balance);
98
99        let action = &txu.tx.action;
100        if action.function.as_slice() == b"submit_sol" && !action.args.is_empty() {
101            let sol_bytes = &action.args[0];
102            if sol_bytes.len() >= 36 {
103                let sol_epoch = u32::from_le_bytes([sol_bytes[0], sol_bytes[1], sol_bytes[2], sol_bytes[3]]);
104                let sol_svrh = &sol_bytes[4..36];
105
106                if sol_epoch != args.epoch || sol_svrh != &args.segment_vr_hash[..] || sol_bytes.len() != sol::SOL_SIZE
107                {
108                    return Err(TxPoolError::InvalidSol { nonce: txu.tx.nonce, hash: txu.hash });
109                }
110            }
111        }
112
113        Ok(())
114    }
115
116    pub fn validate_tx_batch(&self, txs_packed: &[Vec<u8>]) -> Vec<Vec<u8>> {
117        let chain_epoch = chain_epoch(self.db.as_ref());
118        let segment_vr_hash = crate::consensus::fabric::chain_queries::chain_segment_vr_hash(self.db.as_ref())
119            .and_then(|v| v.try_into().ok())
120            .unwrap_or([0u8; 32]);
121        let diff_bits = crate::consensus::fabric::chain_queries::chain_diff_bits(self.db.as_ref());
122
123        let mut args = ValidateTxArgs {
124            epoch: chain_epoch,
125            segment_vr_hash: Hash::from(segment_vr_hash),
126            diff_bits: diff_bits as u32,
127            batch_state: BatchState::default(),
128        };
129
130        let mut good = Vec::new();
131        for tx_packed in txs_packed {
132            match validate(tx_packed, false) {
133                Ok(txu) => {
134                    if self.validate_tx(&txu, &mut args).is_ok() {
135                        good.push(tx_packed.clone());
136                    }
137                }
138                Err(_) => continue,
139            }
140        }
141
142        good
143    }
144
145    pub async fn grab_next_valid(&self, amt: usize) -> Vec<Vec<u8>> {
146        let chain_epoch = chain_epoch(self.db.as_ref());
147        let segment_vr_hash = crate::consensus::fabric::chain_queries::chain_segment_vr_hash(self.db.as_ref())
148            .and_then(|v| v.try_into().ok())
149            .unwrap_or([0u8; 32]);
150        let diff_bits = crate::consensus::fabric::chain_queries::chain_diff_bits(self.db.as_ref());
151
152        let mut args = ValidateTxArgs {
153            epoch: chain_epoch,
154            segment_vr_hash: Hash::from(segment_vr_hash),
155            diff_bits: diff_bits as u32,
156            batch_state: BatchState::default(),
157        };
158
159        let mut result = Vec::new();
160        let mut to_delete = Vec::new();
161
162        let pool = self.pool.read().await;
163        for (key, txu) in pool.iter() {
164            if result.len() >= amt {
165                break;
166            }
167
168            match self.validate_tx(txu, &mut args) {
169                Ok(()) => {
170                    result.push(pack(txu));
171                }
172                Err(_) => {
173                    to_delete.push(key.clone());
174                }
175            }
176        }
177        drop(pool);
178
179        // Delete stale transactions
180        if !to_delete.is_empty() {
181            let mut pool = self.pool.write().await;
182            for key in to_delete {
183                pool.remove(&key);
184            }
185        }
186
187        result
188    }
189
190    pub async fn size(&self) -> usize {
191        self.pool.read().await.len()
192    }
193
194    /// Delete transactions from pool by their packed representation
195    /// Matches Elixir TXPool.delete_packed - removes transactions that were included in an entry
196    pub async fn delete_packed(&self, txs_packed: &[Vec<u8>]) {
197        if txs_packed.is_empty() {
198            return;
199        }
200
201        let mut pool = self.pool.write().await;
202        let mut removed_count = 0;
203
204        for tx_packed in txs_packed {
205            // try to unpack and validate to get the TxU structure
206            if let Ok(txu) = validate(tx_packed, false) {
207                // construct the key used for storage (nonce || hash)
208                let key = vec![txu.tx.nonce.to_le_bytes().to_vec(), txu.hash.to_vec()].concat();
209                if pool.remove(&key).is_some() {
210                    removed_count += 1;
211                }
212            }
213        }
214
215        if removed_count > 0 {
216            tracing::debug!("removed {} transactions from pool", removed_count);
217        }
218    }
219}