1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
use std::cmp::Reverse;
use std::sync::Arc;

use crate::Error;
use crate::{subscribers::MultiSubscriber, types::*, Config, TxPool as TxPoolImpl};
use fuel_core_interfaces::txpool::{Subscriber, TxPool, TxPoolDb};

use async_trait::async_trait;
use std::collections::HashMap;
use tokio::sync::RwLock;

/// Service wrapper around the transaction pool.
///
/// It bundles the pool itself, the database handle that is handed to the pool on insertion,
/// and the set of subscribers that get told about inserted and removed transactions.
pub struct TxPoolService {
    /// The pool implementation, guarded by an async `RwLock` so that lookups take a read lock
    /// while insertions, removals and block updates take a write lock.
    txpool: RwLock<TxPoolImpl>,
    /// Database handle passed to the pool whenever a transaction is inserted.
    db: Box<dyn TxPoolDb>,
    /// Subscribers that are notified about inserted and removed transactions.
    subs: MultiSubscriber,
}

impl TxPoolService {
    /// Builds a service with an empty pool created from `config`, the given database handle
    /// and no subscribers registered yet.
    pub fn new(db: Box<dyn TxPoolDb>, config: Arc<Config>) -> Self {
        let txpool = RwLock::new(TxPoolImpl::new(config));
        let subs = MultiSubscriber::new();

        Self { txpool, db, subs }
    }
}

#[async_trait]
impl TxPool for TxPoolService {
    /// import tx
    async fn insert(&self, txs: Vec<ArcTx>) -> Vec<anyhow::Result<Vec<ArcTx>>> {
        // insert inside pool

        // Check that data is okay (witness match input/output, and if recovered signatures ara valid).
        // should be done before transaction comes to txpool, or before it enters RwLocked region.
        let mut res = Vec::new();
        for tx in txs.iter() {
            let mut pool = self.txpool.write().await;
            res.push(pool.insert(tx.clone(), self.db.as_ref()).await)
        }
        // announce to subscribers
        for (ret, tx) in res.iter().zip(txs.into_iter()) {
            match ret {
                Ok(removed) => {
                    for removed in removed {
                        // small todo there is possibility to have removal reason (ReplacedByHigherGas, DependencyRemoved)
                        // but for now it is okay to just use Error::Removed.
                        self.subs.removed(removed.clone(), &Error::Removed).await;
                    }
                    self.subs.inserted(tx).await;
                }
                Err(_) => {}
            }
        }
        res
    }

    /// find all tx by its hash
    async fn find(&self, hashes: &[TxId]) -> Vec<Option<ArcTx>> {
        let mut res = Vec::with_capacity(hashes.len());
        let pool = self.txpool.read().await;
        for hash in hashes {
            res.push(pool.txs().get(hash).cloned());
        }
        res
    }

    /// find all dependent tx and return them with requsted dependencies in one list sorted by Price.
    async fn find_dependent(&self, hashes: &[TxId]) -> Vec<ArcTx> {
        let mut seen = HashMap::new();
        {
            let pool = self.txpool.read().await;
            for hash in hashes {
                if let Some(tx) = pool.txs().get(hash).cloned() {
                    pool.dependency().find_dependent(tx, &mut seen, pool.txs());
                }
            }
        }
        let mut list: Vec<ArcTx> = seen.into_iter().map(|(_, tx)| tx).collect();
        // sort from high to low price
        list.sort_by_key(|tx| Reverse(tx.gas_price()));

        list
    }

    /// Iterete over `hashes` and return all hashes that we dont have.
    async fn filter_by_negative(&self, tx_ids: &[TxId]) -> Vec<TxId> {
        let mut res = Vec::new();
        let pool = self.txpool.read().await;
        for tx_id in tx_ids {
            if pool.txs().get(tx_id).is_none() {
                res.push(*tx_id)
            }
        }
        res
    }

    /// Return all sorted transactions that are includable in next block.
    /// This is going to be heavy operation, use it with only when needed.
    async fn includable(&self) -> Vec<ArcTx> {
        let pool = self.txpool.read().await;
        pool.sorted_includable()
    }

    /// When block is updated we need to receive all spend outputs and remove them from txpool
    async fn block_update(&self /*spend_outputs: [Input], added_outputs: [AddedOutputs]*/) {
        self.txpool.write().await.block_update()
    }

    /// remove transaction from pool needed on user demand. Low priority
    async fn remove(&self, tx_ids: &[TxId]) {
        let mut removed = Vec::new();
        for tx_id in tx_ids {
            let rem = { self.txpool.write().await.remove_by_tx_id(tx_id) };
            removed.extend(rem.into_iter());
        }
        for removed in removed {
            self.subs.removed(removed, &Error::Removed).await
        }
    }

    async fn subscribe(&self, sub: Arc<dyn Subscriber>) {
        self.subs.sub(sub);
    }
}

/// Tests for `TxPoolService`, run against the dummy database helpers.
#[cfg(test)]
pub mod tests {
    use super::*;
    use fuel_core_interfaces::db::helpers::*;

    /// `filter_by_negative` must return only the ids that are missing from the pool.
    #[tokio::test]
    async fn test_filter_by_negative() {
        let config = Arc::new(Config::default());
        let db = Box::new(DummyDB::filled());

        let tx1_hash = *TX_ID1;
        let tx2_hash = *TX_ID2;
        let tx3_hash = *TX_ID3;

        let tx1 = Arc::new(DummyDB::dummy_tx(tx1_hash));
        let tx2 = Arc::new(DummyDB::dummy_tx(tx2_hash));

        let service = TxPoolService::new(db, config);
        let out = service.insert(vec![tx1, tx2]).await;
        assert_eq!(out.len(), 2, "Shoud be len 2:{:?}", out);
        assert!(out[0].is_ok(), "Tx1 should be OK, got err:{:?}", out);
        assert!(out[1].is_ok(), "Tx2 should be OK, got err:{:?}", out);

        // tx1 was inserted, tx3 never was, so only tx3 should come back.
        let out = service.filter_by_negative(&[tx1_hash, tx3_hash]).await;
        assert_eq!(out.len(), 1, "Shoud be len 1:{:?}", out);
        assert_eq!(out[0], tx3_hash, "Found tx id match{:?}", out);
    }

    /// `find` must return one entry per requested hash: `Some` for known transactions and
    /// `None` for unknown ones.
    #[tokio::test]
    async fn test_find() {
        let config = Arc::new(Config::default());
        let db = Box::new(DummyDB::filled());

        let tx1_hash = *TX_ID1;
        let tx2_hash = *TX_ID2;
        let tx3_hash = *TX_ID3;

        let tx1 = Arc::new(DummyDB::dummy_tx(tx1_hash));
        let tx2 = Arc::new(DummyDB::dummy_tx(tx2_hash));

        let service = TxPoolService::new(db, config);
        let out = service.insert(vec![tx1, tx2]).await;
        assert_eq!(out.len(), 2, "Shoud be len 2:{:?}", out);
        assert!(out[0].is_ok(), "Tx1 should be OK, got err:{:?}", out);
        assert!(out[1].is_ok(), "Tx2 should be OK, got err:{:?}", out);

        // tx1 is in the pool, tx3 is not.
        let out = service.find(&[tx1_hash, tx3_hash]).await;
        assert_eq!(out.len(), 2, "Shoud be len 2:{:?}", out);
        assert!(out[0].is_some(), "Tx1 should be some:{:?}", out);
        let id = out[0].as_ref().unwrap().id();
        assert_eq!(id, tx1_hash, "Found tx id match{:?}", out);
        assert!(out[1].is_none(), "Tx3 should not be found:{:?}", out);
    }

    /// A subscriber must be told about every inserted transaction and about every transaction
    /// removed as a consequence of an explicit removal.
    #[tokio::test]
    async fn simple_insert_removal_subscription() {
        let config = Arc::new(Config::default());
        let db = Box::new(DummyDB::filled());

        // Subscriber that just records, in order, what it was notified about.
        struct Subs {
            pub new_tx: RwLock<Vec<ArcTx>>,
            pub rem_tx: RwLock<Vec<ArcTx>>,
        }

        #[async_trait]
        impl Subscriber for Subs {
            async fn inserted(&self, tx: ArcTx) {
                self.new_tx.write().await.push(tx);
            }

            async fn inserted_on_block_revert(&self, _tx: ArcTx) {}

            async fn removed(&self, tx: ArcTx, _error: &Error) {
                self.rem_tx.write().await.push(tx);
            }
        }

        let sub = Arc::new(Subs {
            new_tx: RwLock::new(Vec::new()),
            rem_tx: RwLock::new(Vec::new()),
        });

        let tx1_hash = *TX_ID1;
        let tx2_hash = *TX_ID2;

        let tx1 = Arc::new(DummyDB::dummy_tx(tx1_hash));
        let tx2 = Arc::new(DummyDB::dummy_tx(tx2_hash));

        let service = TxPoolService::new(db, config);
        // Subscribe before inserting so that the insertions are observed.
        service.subscribe(sub.clone()).await;
        let out = service.insert(vec![tx1, tx2]).await;
        assert!(out[0].is_ok(), "Tx1 should be OK, got err:{:?}", out);
        assert!(out[1].is_ok(), "Tx2 should be OK, got err:{:?}", out);
        {
            let added = sub.new_tx.read().await;
            assert_eq!(added.len(), 2, "Sub should contains two new tx");
            assert_eq!(added[0].id(), tx1_hash, "First added should be tx1");
            assert_eq!(added[1].id(), tx2_hash, "Second added should be tx2");
        }
        service.remove(&[tx1_hash]).await;
        {
            // Removing tx1 also removes tx2, because tx2 depends on it.
            let removed = sub.rem_tx.read().await;
            assert_eq!(removed.len(), 2, "Sub should contains two removed tx");
            assert_eq!(removed[0].id(), tx1_hash, "First removed should be tx1");
            assert_eq!(removed[1].id(), tx2_hash, "Second removed should be tx2");
        }
    }
}