use std::cmp::Reverse;
use std::sync::Arc;
use crate::Error;
use crate::{subscribers::MultiSubscriber, types::*, Config, TxPool as TxPoolImpl};
use async_trait::async_trait;
use fuel_core_interfaces::model::{ArcTx, TxInfo};
use fuel_core_interfaces::txpool::{Subscriber, TxPool, TxPoolDb};
use std::collections::HashMap;
use tokio::sync::RwLock;
/// Transaction-pool service: wraps the pool implementation behind an async
/// lock and implements the `TxPool` trait on top of it.
pub struct TxPoolService {
/// The pool itself, guarded by an async read/write lock so that queries can
/// share read access while mutations take the write lock.
txpool: RwLock<TxPoolImpl>,
/// Database handle handed to the pool on every insertion.
db: Box<dyn TxPoolDb>,
/// Subscribers that are notified about inserted and removed transactions.
subs: MultiSubscriber,
}
impl TxPoolService {
    /// Builds a service around a freshly created pool.
    ///
    /// * `db` - database handle that the pool is given on insertion.
    /// * `config` - configuration used to construct the underlying pool.
    ///
    /// The returned service starts with a default (empty) subscriber set.
    pub fn new(db: Box<dyn TxPoolDb>, config: Config) -> Self {
        let txpool = RwLock::new(TxPoolImpl::new(config));
        let subs = MultiSubscriber::default();
        Self { txpool, db, subs }
    }
}
#[async_trait]
impl TxPool for TxPoolService {
async fn insert(&self, txs: Vec<ArcTx>) -> Vec<anyhow::Result<Vec<ArcTx>>> {
let mut res = Vec::new();
for tx in txs.iter() {
let mut pool = self.txpool.write().await;
res.push(pool.insert(tx.clone(), self.db.as_ref()).await)
}
for (ret, tx) in res.iter().zip(txs.into_iter()) {
match ret {
Ok(removed) => {
for removed in removed {
self.subs.removed(removed.clone(), &Error::Removed).await;
}
self.subs.inserted(tx).await;
}
Err(_) => {}
}
}
res
}
async fn find(&self, hashes: &[TxId]) -> Vec<Option<TxInfo>> {
let mut res = Vec::with_capacity(hashes.len());
let pool = self.txpool.read().await;
for hash in hashes {
res.push(pool.txs().get(hash).cloned());
}
res
}
async fn find_one(&self, hash: &TxId) -> Option<TxInfo> {
self.txpool.read().await.txs().get(hash).cloned()
}
async fn find_dependent(&self, hashes: &[TxId]) -> Vec<ArcTx> {
let mut seen = HashMap::new();
{
let pool = self.txpool.read().await;
for hash in hashes {
if let Some(tx) = pool.txs().get(hash) {
pool.dependency()
.find_dependent(tx.tx().clone(), &mut seen, pool.txs());
}
}
}
let mut list: Vec<ArcTx> = seen.into_iter().map(|(_, tx)| tx).collect();
list.sort_by_key(|tx| Reverse(tx.gas_price()));
list
}
async fn filter_by_negative(&self, tx_ids: &[TxId]) -> Vec<TxId> {
let mut res = Vec::new();
let pool = self.txpool.read().await;
for tx_id in tx_ids {
if pool.txs().get(tx_id).is_none() {
res.push(*tx_id)
}
}
res
}
async fn includable(&self) -> Vec<ArcTx> {
let pool = self.txpool.read().await;
pool.sorted_includable()
}
async fn block_update(&self ) {
self.txpool.write().await.block_update()
}
async fn remove(&self, tx_ids: &[TxId]) {
let mut removed = Vec::new();
for tx_id in tx_ids {
let rem = { self.txpool.write().await.remove_by_tx_id(tx_id) };
removed.extend(rem.into_iter());
}
for removed in removed {
self.subs.removed(removed, &Error::Removed).await
}
}
async fn subscribe(&self, sub: Arc<dyn Subscriber>) {
self.subs.sub(sub);
}
}
#[cfg(test)]
pub mod tests {
    use super::*;
    use fuel_core_interfaces::db::helpers::*;

    /// `filter_by_negative` must return only the ids that are absent from
    /// the pool.
    #[tokio::test]
    async fn test_filter_by_negative() {
        let config = Config::default();
        let db = Box::new(DummyDb::filled());

        let tx1_hash = *TX_ID1;
        let tx2_hash = *TX_ID2;
        let tx3_hash = *TX_ID3;

        let tx1 = Arc::new(DummyDb::dummy_tx(tx1_hash));
        let tx2 = Arc::new(DummyDb::dummy_tx(tx2_hash));

        let service = TxPoolService::new(db, config);
        let out = service.insert(vec![tx1, tx2]).await;
        assert_eq!(out.len(), 2, "Should be len 2:{:?}", out);
        assert!(out[0].is_ok(), "Tx1 should be OK, got err:{:?}", out);
        assert!(out[1].is_ok(), "Tx2 should be OK, got err:{:?}", out);

        // tx1 is in the pool, tx3 was never inserted.
        let out = service.filter_by_negative(&[tx1_hash, tx3_hash]).await;
        assert_eq!(out.len(), 1, "Should be len 1:{:?}", out);
        assert_eq!(out[0], tx3_hash, "Found tx id match{:?}", out);
    }

    /// `find` must return `Some` for pooled transactions and `None` for
    /// unknown ids, in the order the ids were requested.
    #[tokio::test]
    async fn test_find() {
        let config = Config::default();
        let db = Box::new(DummyDb::filled());

        let tx1_hash = *TX_ID1;
        let tx2_hash = *TX_ID2;
        let tx3_hash = *TX_ID3;

        let tx1 = Arc::new(DummyDb::dummy_tx(tx1_hash));
        let tx2 = Arc::new(DummyDb::dummy_tx(tx2_hash));

        let service = TxPoolService::new(db, config);
        let out = service.insert(vec![tx1, tx2]).await;
        assert_eq!(out.len(), 2, "Should be len 2:{:?}", out);
        assert!(out[0].is_ok(), "Tx1 should be OK, got err:{:?}", out);
        assert!(out[1].is_ok(), "Tx2 should be OK, got err:{:?}", out);

        let out = service.find(&[tx1_hash, tx3_hash]).await;
        assert_eq!(out.len(), 2, "Should be len 2:{:?}", out);
        assert!(out[0].is_some(), "Tx1 should be some:{:?}", out);
        let id = out[0].as_ref().unwrap().id();
        assert_eq!(id, tx1_hash, "Found tx id match{:?}", out);
        assert!(out[1].is_none(), "Tx3 should not be found:{:?}", out);
    }

    /// A subscriber must see both insertions, and the removals triggered by
    /// removing tx1.
    #[tokio::test]
    async fn simple_insert_removal_subscription() {
        let config = Config::default();
        let db = Box::new(DummyDb::filled());

        /// Test subscriber that records every notification it receives.
        struct Subs {
            pub new_tx: RwLock<Vec<ArcTx>>,
            pub rem_tx: RwLock<Vec<ArcTx>>,
        }

        #[async_trait]
        impl Subscriber for Subs {
            async fn inserted(&self, tx: ArcTx) {
                self.new_tx.write().await.push(tx);
            }

            async fn inserted_on_block_revert(&self, _tx: ArcTx) {}

            async fn removed(&self, tx: ArcTx, _error: &Error) {
                self.rem_tx.write().await.push(tx);
            }
        }

        let sub = Arc::new(Subs {
            new_tx: RwLock::new(Vec::new()),
            rem_tx: RwLock::new(Vec::new()),
        });

        let tx1_hash = *TX_ID1;
        let tx2_hash = *TX_ID2;

        let tx1 = Arc::new(DummyDb::dummy_tx(tx1_hash));
        let tx2 = Arc::new(DummyDb::dummy_tx(tx2_hash));

        let service = TxPoolService::new(db, config);
        service.subscribe(sub.clone()).await;

        let out = service.insert(vec![tx1, tx2]).await;
        assert!(out[0].is_ok(), "Tx1 should be OK, got err:{:?}", out);
        assert!(out[1].is_ok(), "Tx2 should be OK, got err:{:?}", out);
        {
            let added = sub.new_tx.read().await;
            assert_eq!(added.len(), 2, "Sub should contains two new tx");
            assert_eq!(added[0].id(), tx1_hash, "First added should be tx1");
            assert_eq!(added[1].id(), tx2_hash, "Second added should be tx2");
        }

        service.remove(&[tx1_hash]).await;
        {
            // Only tx1 was requested, yet tx2 is expected to be removed as
            // well, presumably because the dummy tx2 depends on tx1
            // (TODO confirm against the DummyDb fixtures).
            let removed = sub.rem_tx.read().await;
            assert_eq!(removed.len(), 2, "Sub should contains two removed tx");
            assert_eq!(removed[0].id(), tx1_hash, "First removed should be tx1");
            assert_eq!(removed[1].id(), tx2_hash, "Second removed should be tx2");
        }
    }
}