fuel_core_txpool/
shared_state.rs1use std::sync::Arc;
2
3use fuel_core_types::{
4 fuel_tx::{
5 Transaction,
6 TxId,
7 },
8 services::txpool::ArcPoolTx,
9};
10use tokio::sync::{
11 mpsc,
12 oneshot::{
13 self,
14 error::TryRecvError,
15 },
16 watch,
17};
18
19use crate::{
20 Constraints,
21 error::Error,
22 pool::TxPoolStats,
23 pool_worker::{
24 self,
25 PoolReadRequest,
26 },
27 service::{
28 TxInfo,
29 WritePoolRequest,
30 },
31};
32
33#[derive(Clone)]
34pub struct SharedState {
35 pub(crate) write_pool_requests_sender: mpsc::Sender<WritePoolRequest>,
36 pub(crate) select_transactions_requests_sender:
37 mpsc::Sender<pool_worker::PoolExtractBlockTransactions>,
38 pub(crate) request_read_sender: mpsc::Sender<PoolReadRequest>,
39 pub(crate) new_executable_txs_notifier: tokio::sync::watch::Sender<()>,
40 pub(crate) latest_stats: tokio::sync::watch::Receiver<TxPoolStats>,
41}
42
43impl SharedState {
44 pub fn try_insert(&self, transactions: Vec<Transaction>) -> Result<(), Error> {
45 let transactions = transactions.into_iter().map(Arc::new).collect();
46 self.write_pool_requests_sender
47 .try_send(WritePoolRequest::InsertTxs { transactions })
48 .map_err(|_| Error::ServiceQueueFull)?;
49
50 Ok(())
51 }
52
53 pub async fn insert(&self, transaction: Transaction) -> Result<(), Error> {
54 let transaction = Arc::new(transaction);
55 let (sender, receiver) = oneshot::channel();
56
57 self.write_pool_requests_sender
58 .send(WritePoolRequest::InsertTx {
59 transaction,
60 response_channel: sender,
61 })
62 .await
63 .map_err(|_| Error::ServiceCommunicationFailed)?;
64
65 receiver
66 .await
67 .map_err(|_| Error::ServiceCommunicationFailed)?
68 }
69
70 pub fn extract_transactions_for_block(
76 &self,
77 constraints: Constraints,
78 ) -> Result<Vec<ArcPoolTx>, Error> {
79 let (select_transactions_sender, mut select_transactions_receiver) =
80 oneshot::channel();
81 self.select_transactions_requests_sender
82 .try_send(
83 pool_worker::PoolExtractBlockTransactions::ExtractBlockTransactions {
84 constraints,
85 transactions: select_transactions_sender,
86 },
87 )
88 .map_err(|_| Error::ServiceCommunicationFailed)?;
89
90 loop {
91 let result = select_transactions_receiver.try_recv();
92 match result {
93 Ok(txs) => {
94 return Ok(txs);
95 }
96 Err(TryRecvError::Empty) => continue,
97 Err(TryRecvError::Closed) => {
98 return Err(Error::ServiceCommunicationFailed);
99 }
100 }
101 }
102 }
103
104 pub async fn get_tx_ids(&self, max_txs: usize) -> Result<Vec<TxId>, Error> {
105 let (response_channel, result_receiver) = oneshot::channel();
106
107 self.request_read_sender
108 .send(PoolReadRequest::TxIds {
109 max_txs,
110 response_channel,
111 })
112 .await
113 .map_err(|_| Error::ServiceCommunicationFailed)?;
114
115 result_receiver
116 .await
117 .map_err(|_| Error::ServiceCommunicationFailed)
118 }
119
120 pub async fn find_one(&self, tx_id: TxId) -> Result<Option<TxInfo>, Error> {
121 Ok(self.find(vec![tx_id]).await?.pop().flatten())
122 }
123
124 pub async fn find(&self, tx_ids: Vec<TxId>) -> Result<Vec<Option<TxInfo>>, Error> {
125 let (response_channel, result_receiver) = oneshot::channel();
126
127 self.request_read_sender
128 .send(PoolReadRequest::Txs {
129 tx_ids,
130 response_channel,
131 })
132 .await
133 .map_err(|_| Error::ServiceCommunicationFailed)?;
134
135 result_receiver
136 .await
137 .map_err(|_| Error::ServiceCommunicationFailed)
138 }
139
140 pub fn get_new_executable_txs_notifier(&self) -> watch::Receiver<()> {
142 self.new_executable_txs_notifier.subscribe()
143 }
144
145 pub fn latest_stats(&self) -> TxPoolStats {
146 *self.latest_stats.borrow()
147 }
148}