use crate::consensus::chain_epoch;
use crate::consensus::doms::tx::{TxU, pack, validate};
use crate::utils::Hash;
use crate::utils::rocksdb::RocksDb;
use amadeus_runtime::consensus::bic::{coin, sol};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
#[derive(Debug, Clone)]
pub struct ValidateTxArgs {
pub epoch: u32,
pub segment_vr_hash: Hash,
pub diff_bits: u32,
pub batch_state: BatchState,
}
#[derive(Debug, Clone, Default)]
pub struct BatchState {
chain_nonces: HashMap<Vec<u8>, i128>,
balances: HashMap<Vec<u8>, i128>,
}
#[derive(Debug)]
pub enum TxPoolError {
InvalidNonce { nonce: i128, hash: Hash },
InsufficientBalance { nonce: i128, hash: Hash },
InvalidSol { nonce: i128, hash: Hash },
ValidationError(String),
}
pub struct TxPool {
db: Arc<RocksDb>,
pool: Arc<RwLock<HashMap<Vec<u8>, TxU>>>,
}
impl TxPool {
pub fn new(db: Arc<RocksDb>) -> Self {
Self { db, pool: Arc::new(RwLock::new(HashMap::new())) }
}
pub async fn insert(&self, tx_packed: &[u8]) -> Result<(), TxPoolError> {
match validate(tx_packed, false) {
Ok(txu) => {
let mut pool = self.pool.write().await;
let key = vec![txu.tx.nonce.to_le_bytes().to_vec(), txu.hash.to_vec()].concat();
pool.insert(key, txu);
Ok(())
}
Err(e) => Err(TxPoolError::ValidationError(e.to_string())),
}
}
pub async fn insert_and_broadcast(&self, tx_packed: &[u8]) -> Result<(), TxPoolError> {
self.insert(tx_packed).await?;
Ok(())
}
pub async fn purge_stale(&self) {
let _cur_epoch = chain_epoch(self.db.as_ref());
let mut pool = self.pool.write().await;
pool.retain(|_key, _txu| {
true
});
}
pub fn validate_tx(&self, txu: &TxU, args: &mut ValidateTxArgs) -> Result<(), TxPoolError> {
let signer_vec = txu.tx.signer.to_vec();
let chain_nonce = args.batch_state.chain_nonces.get(&signer_vec).cloned().unwrap_or_else(|| {
crate::consensus::fabric::chain_queries::chain_nonce(self.db.as_ref(), txu.tx.signer.as_ref()).unwrap_or(0)
as i128
});
if chain_nonce != 0 && txu.tx.nonce <= chain_nonce {
return Err(TxPoolError::InvalidNonce { nonce: txu.tx.nonce, hash: txu.hash });
}
args.batch_state.chain_nonces.insert(signer_vec.clone(), txu.tx.nonce);
let balance = args.batch_state.balances.get(&signer_vec).cloned().unwrap_or_else(|| {
crate::consensus::fabric::chain_queries::chain_balance(self.db.as_ref(), txu.tx.signer.as_ref())
});
let exec_cost = txu.exec_cost(args.epoch) as i128;
let fee = coin::to_cents(1) as i128;
let new_balance = balance.saturating_sub(exec_cost).saturating_sub(fee);
if balance < exec_cost.saturating_add(fee) {
return Err(TxPoolError::InsufficientBalance { nonce: txu.tx.nonce, hash: txu.hash });
}
args.batch_state.balances.insert(signer_vec, new_balance);
let action = &txu.tx.action;
if action.function.as_slice() == b"submit_sol" && !action.args.is_empty() {
let sol_bytes = &action.args[0];
if sol_bytes.len() >= 36 {
let sol_epoch = u32::from_le_bytes([sol_bytes[0], sol_bytes[1], sol_bytes[2], sol_bytes[3]]);
let sol_svrh = &sol_bytes[4..36];
if sol_epoch != args.epoch || sol_svrh != &args.segment_vr_hash[..] || sol_bytes.len() != sol::SOL_SIZE
{
return Err(TxPoolError::InvalidSol { nonce: txu.tx.nonce, hash: txu.hash });
}
}
}
Ok(())
}
pub fn validate_tx_batch(&self, txs_packed: &[Vec<u8>]) -> Vec<Vec<u8>> {
let chain_epoch = chain_epoch(self.db.as_ref());
let segment_vr_hash = crate::consensus::fabric::chain_queries::chain_segment_vr_hash(self.db.as_ref())
.and_then(|v| v.try_into().ok())
.unwrap_or([0u8; 32]);
let diff_bits = crate::consensus::fabric::chain_queries::chain_diff_bits(self.db.as_ref());
let mut args = ValidateTxArgs {
epoch: chain_epoch,
segment_vr_hash: Hash::from(segment_vr_hash),
diff_bits: diff_bits as u32,
batch_state: BatchState::default(),
};
let mut good = Vec::new();
for tx_packed in txs_packed {
match validate(tx_packed, false) {
Ok(txu) => {
if self.validate_tx(&txu, &mut args).is_ok() {
good.push(tx_packed.clone());
}
}
Err(_) => continue,
}
}
good
}
pub async fn grab_next_valid(&self, amt: usize) -> Vec<Vec<u8>> {
let chain_epoch = chain_epoch(self.db.as_ref());
let segment_vr_hash = crate::consensus::fabric::chain_queries::chain_segment_vr_hash(self.db.as_ref())
.and_then(|v| v.try_into().ok())
.unwrap_or([0u8; 32]);
let diff_bits = crate::consensus::fabric::chain_queries::chain_diff_bits(self.db.as_ref());
let mut args = ValidateTxArgs {
epoch: chain_epoch,
segment_vr_hash: Hash::from(segment_vr_hash),
diff_bits: diff_bits as u32,
batch_state: BatchState::default(),
};
let mut result = Vec::new();
let mut to_delete = Vec::new();
let pool = self.pool.read().await;
for (key, txu) in pool.iter() {
if result.len() >= amt {
break;
}
match self.validate_tx(txu, &mut args) {
Ok(()) => {
result.push(pack(txu));
}
Err(_) => {
to_delete.push(key.clone());
}
}
}
drop(pool);
if !to_delete.is_empty() {
let mut pool = self.pool.write().await;
for key in to_delete {
pool.remove(&key);
}
}
result
}
pub async fn size(&self) -> usize {
self.pool.read().await.len()
}
pub async fn delete_packed(&self, txs_packed: &[Vec<u8>]) {
if txs_packed.is_empty() {
return;
}
let mut pool = self.pool.write().await;
let mut removed_count = 0;
for tx_packed in txs_packed {
if let Ok(txu) = validate(tx_packed, false) {
let key = vec![txu.tx.nonce.to_le_bytes().to_vec(), txu.hash.to_vec()].concat();
if pool.remove(&key).is_some() {
removed_count += 1;
}
}
}
if removed_count > 0 {
tracing::debug!("removed {} transactions from pool", removed_count);
}
}
}