fuel_core_txpool/
shared_state.rs

1use std::sync::Arc;
2
3use fuel_core_types::{
4    fuel_tx::{
5        Transaction,
6        TxId,
7    },
8    services::txpool::ArcPoolTx,
9};
10use tokio::sync::{
11    mpsc,
12    oneshot::{
13        self,
14        error::TryRecvError,
15    },
16    watch,
17};
18
19use crate::{
20    Constraints,
21    error::Error,
22    pool::TxPoolStats,
23    pool_worker::{
24        self,
25        PoolReadRequest,
26    },
27    service::{
28        TxInfo,
29        WritePoolRequest,
30    },
31};
32
33#[derive(Clone)]
34pub struct SharedState {
35    pub(crate) write_pool_requests_sender: mpsc::Sender<WritePoolRequest>,
36    pub(crate) select_transactions_requests_sender:
37        mpsc::Sender<pool_worker::PoolExtractBlockTransactions>,
38    pub(crate) request_read_sender: mpsc::Sender<PoolReadRequest>,
39    pub(crate) new_executable_txs_notifier: tokio::sync::watch::Sender<()>,
40    pub(crate) latest_stats: tokio::sync::watch::Receiver<TxPoolStats>,
41}
42
43impl SharedState {
44    pub fn try_insert(&self, transactions: Vec<Transaction>) -> Result<(), Error> {
45        let transactions = transactions.into_iter().map(Arc::new).collect();
46        self.write_pool_requests_sender
47            .try_send(WritePoolRequest::InsertTxs { transactions })
48            .map_err(|_| Error::ServiceQueueFull)?;
49
50        Ok(())
51    }
52
53    pub async fn insert(&self, transaction: Transaction) -> Result<(), Error> {
54        let transaction = Arc::new(transaction);
55        let (sender, receiver) = oneshot::channel();
56
57        self.write_pool_requests_sender
58            .send(WritePoolRequest::InsertTx {
59                transaction,
60                response_channel: sender,
61            })
62            .await
63            .map_err(|_| Error::ServiceCommunicationFailed)?;
64
65        receiver
66            .await
67            .map_err(|_| Error::ServiceCommunicationFailed)?
68    }
69
70    /// This function has a hot loop inside to acquire transactions for the execution.
71    /// It relies on the prioritization of the `TxPool`
72    /// (it always tries to prioritize the `extract` call over other calls).
73    /// In the future, extraction will be an async function,
74    /// and we can remove this loop and just `await`.
75    pub fn extract_transactions_for_block(
76        &self,
77        constraints: Constraints,
78    ) -> Result<Vec<ArcPoolTx>, Error> {
79        let (select_transactions_sender, mut select_transactions_receiver) =
80            oneshot::channel();
81        self.select_transactions_requests_sender
82            .try_send(
83                pool_worker::PoolExtractBlockTransactions::ExtractBlockTransactions {
84                    constraints,
85                    transactions: select_transactions_sender,
86                },
87            )
88            .map_err(|_| Error::ServiceCommunicationFailed)?;
89
90        loop {
91            let result = select_transactions_receiver.try_recv();
92            match result {
93                Ok(txs) => {
94                    return Ok(txs);
95                }
96                Err(TryRecvError::Empty) => continue,
97                Err(TryRecvError::Closed) => {
98                    return Err(Error::ServiceCommunicationFailed);
99                }
100            }
101        }
102    }
103
104    pub async fn get_tx_ids(&self, max_txs: usize) -> Result<Vec<TxId>, Error> {
105        let (response_channel, result_receiver) = oneshot::channel();
106
107        self.request_read_sender
108            .send(PoolReadRequest::TxIds {
109                max_txs,
110                response_channel,
111            })
112            .await
113            .map_err(|_| Error::ServiceCommunicationFailed)?;
114
115        result_receiver
116            .await
117            .map_err(|_| Error::ServiceCommunicationFailed)
118    }
119
120    pub async fn find_one(&self, tx_id: TxId) -> Result<Option<TxInfo>, Error> {
121        Ok(self.find(vec![tx_id]).await?.pop().flatten())
122    }
123
124    pub async fn find(&self, tx_ids: Vec<TxId>) -> Result<Vec<Option<TxInfo>>, Error> {
125        let (response_channel, result_receiver) = oneshot::channel();
126
127        self.request_read_sender
128            .send(PoolReadRequest::Txs {
129                tx_ids,
130                response_channel,
131            })
132            .await
133            .map_err(|_| Error::ServiceCommunicationFailed)?;
134
135        result_receiver
136            .await
137            .map_err(|_| Error::ServiceCommunicationFailed)
138    }
139
140    /// Get a notifier that is notified when new executable transactions are added to the pool.
141    pub fn get_new_executable_txs_notifier(&self) -> watch::Receiver<()> {
142        self.new_executable_txs_notifier.subscribe()
143    }
144
145    pub fn latest_stats(&self) -> TxPoolStats {
146        *self.latest_stats.borrow()
147    }
148}