fuel_txpool/service.rs

use crate::{
    Config,
    TxPool,
};
use anyhow::anyhow;
use fuel_core_interfaces::{
    block_importer::ImportBlockBroadcast,
    common::prelude::Bytes32,
    p2p::{
        GossipData,
        P2pRequestEvent,
        TransactionBroadcast,
        TransactionGossipData,
    },
    txpool::{
        self,
        Error,
        TxPoolDb,
        TxPoolMpsc,
        TxStatus,
        TxUpdate,
    },
};
use std::sync::Arc;
use tokio::{
    sync::{
        broadcast,
        mpsc,
        Mutex,
        RwLock,
    },
    task::JoinHandle,
};
use tracing::error;

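/// Builder that collects the configuration, database handle, and channel
/// endpoints needed to construct a [`Service`]. Every dependency except
/// `config` must be set before [`build`](Self::build) is called.
///
/// A minimal wiring sketch (illustrative only: the `db`, `txpool_sender`,
/// receiver, and `network_sender` values come from the node's other services
/// and are elided here):
///
/// ```text
/// let mut builder = ServiceBuilder::new();
/// builder
///     .config(config)
///     .db(db)
///     .txpool_sender(txpool_sender)
///     .txpool_receiver(txpool_receiver)
///     .tx_status_sender(TxStatusChange::new(100))
///     .import_block_event(import_block_receiver)
///     .incoming_tx_receiver(incoming_tx_receiver)
///     .network_sender(network_sender);
/// let service = builder.build()?;
/// service.start().await?;
/// ```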
pub struct ServiceBuilder {
    config: Config,
    db: Option<Box<dyn TxPoolDb>>,
    txpool_sender: Option<txpool::Sender>,
    txpool_receiver: Option<mpsc::Receiver<TxPoolMpsc>>,
    tx_status_sender: Option<TxStatusChange>,
    import_block_receiver: Option<broadcast::Receiver<ImportBlockBroadcast>>,
    incoming_tx_receiver: Option<broadcast::Receiver<TransactionGossipData>>,
    network_sender: Option<mpsc::Sender<P2pRequestEvent>>,
}

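/// Broadcast handles for transaction status notifications: `status_sender`
/// publishes coarse [`TxStatus`] changes, while `update_sender` publishes
/// per-transaction [`TxUpdate`] events keyed by transaction id.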
#[derive(Clone)]
pub struct TxStatusChange {
    status_sender: broadcast::Sender<TxStatus>,
    update_sender: broadcast::Sender<TxUpdate>,
}

impl TxStatusChange {
    pub fn new(capacity: usize) -> Self {
        let (status_sender, _) = broadcast::channel(capacity);
        let (update_sender, _) = broadcast::channel(capacity);
        Self {
            status_sender,
            update_sender,
        }
    }

    pub fn send_complete(&self, id: Bytes32) {
        let _ = self.status_sender.send(TxStatus::Completed);
        self.updated(id);
    }

    pub fn send_submitted(&self, id: Bytes32) {
        let _ = self.status_sender.send(TxStatus::Submitted);
        self.updated(id);
    }

    pub fn send_squeezed_out(&self, id: Bytes32, reason: Error) {
        let _ = self.status_sender.send(TxStatus::SqueezedOut {
            reason: reason.clone(),
        });
        let _ = self.update_sender.send(TxUpdate::squeezed_out(id, reason));
    }

    fn updated(&self, id: Bytes32) {
        let _ = self.update_sender.send(TxUpdate::updated(id));
    }
}

impl Default for ServiceBuilder {
    fn default() -> Self {
        Self::new()
    }
}

impl ServiceBuilder {
    pub fn new() -> Self {
        Self {
            config: Default::default(),
            db: None,
            txpool_sender: None,
            txpool_receiver: None,
            tx_status_sender: None,
            import_block_receiver: None,
            incoming_tx_receiver: None,
            network_sender: None,
        }
    }

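    /// Returns the txpool sender handle.
    ///
    /// # Panics
    /// Panics if `txpool_sender` has not been set yet (the subscribe methods
    /// below likewise panic if `tx_status_sender` is unset).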
    pub fn sender(&self) -> &txpool::Sender {
        self.txpool_sender.as_ref().unwrap()
    }

    pub fn tx_status_subscribe(&self) -> broadcast::Receiver<TxStatus> {
        self.tx_status_sender
            .as_ref()
            .unwrap()
            .status_sender
            .subscribe()
    }

    pub fn tx_change_subscribe(&self) -> broadcast::Receiver<TxUpdate> {
        self.tx_status_sender
            .as_ref()
            .unwrap()
            .update_sender
            .subscribe()
    }

    pub fn db(&mut self, db: Box<dyn TxPoolDb>) -> &mut Self {
        self.db = Some(db);
        self
    }

    pub fn txpool_sender(&mut self, txpool_sender: txpool::Sender) -> &mut Self {
        self.txpool_sender = Some(txpool_sender);
        self
    }

    pub fn txpool_receiver(
        &mut self,
        txpool_receiver: mpsc::Receiver<TxPoolMpsc>,
    ) -> &mut Self {
        self.txpool_receiver = Some(txpool_receiver);
        self
    }

    pub fn tx_status_sender(&mut self, tx_status_sender: TxStatusChange) -> &mut Self {
        self.tx_status_sender = Some(tx_status_sender);
        self
    }

    pub fn incoming_tx_receiver(
        &mut self,
        incoming_tx_receiver: broadcast::Receiver<TransactionGossipData>,
    ) -> &mut Self {
        self.incoming_tx_receiver = Some(incoming_tx_receiver);
        self
    }

    pub fn network_sender(
        &mut self,
        network_sender: mpsc::Sender<P2pRequestEvent>,
    ) -> &mut Self {
        self.network_sender = Some(network_sender);
        self
    }

    pub fn import_block_event(
        &mut self,
        import_block_receiver: broadcast::Receiver<ImportBlockBroadcast>,
    ) -> &mut Self {
        self.import_block_receiver = Some(import_block_receiver);
        self
    }

    pub fn config(&mut self, config: Config) -> &mut Self {
        self.config = config;
        self
    }

    pub fn build(self) -> anyhow::Result<Service> {
        if self.db.is_none()
            || self.import_block_receiver.is_none()
            || self.incoming_tx_receiver.is_none()
            || self.txpool_sender.is_none()
            || self.tx_status_sender.is_none()
            || self.txpool_receiver.is_none()
            || self.network_sender.is_none()
        {
            return Err(anyhow!("One of the context items is not set"));
        }

        let service = Service::new(
            self.txpool_sender.unwrap(),
            self.tx_status_sender.clone().unwrap(),
            Context {
                config: self.config,
                db: Arc::new(self.db.unwrap()),
                txpool_receiver: self.txpool_receiver.unwrap(),
                tx_status_sender: self.tx_status_sender.unwrap(),
                import_block_receiver: self.import_block_receiver.unwrap(),
                incoming_tx_receiver: self.incoming_tx_receiver.unwrap(),
                network_sender: self.network_sender.unwrap(),
            },
        )?;
        Ok(service)
    }
}

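/// State owned by the running service task: the pool configuration and
/// database view, plus every channel the event loop listens on or publishes
/// to.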
pub struct Context {
    pub config: Config,
    pub db: Arc<Box<dyn TxPoolDb>>,
    pub txpool_receiver: mpsc::Receiver<TxPoolMpsc>,
    pub tx_status_sender: TxStatusChange,
    pub import_block_receiver: broadcast::Receiver<ImportBlockBroadcast>,
    pub incoming_tx_receiver: broadcast::Receiver<TransactionGossipData>,
    pub network_sender: mpsc::Sender<P2pRequestEvent>,
}

impl Context {
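    /// Drives the service event loop: inserts transactions gossiped over the
    /// network, serves `TxPoolMpsc` requests, and applies imported-block
    /// updates, until a `Stop` request arrives or a channel closes. Returns
    /// `self` so the service can later be restarted with the same context.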
    pub async fn run(mut self) -> Self {
        let txpool = Arc::new(RwLock::new(TxPool::new(self.config.clone())));

        loop {
            tokio::select! {
                new_transaction = self.incoming_tx_receiver.recv() => {
                    if new_transaction.is_err() {
                        error!("Incoming tx receiver channel closed unexpectedly; shutting down transaction pool service.");
                        break;
                    }

                    let txpool = txpool.clone();
                    let db = self.db.clone();
                    let tx_status_sender = self.tx_status_sender.clone();

                    tokio::spawn(async move {
                        let txpool = txpool.as_ref();
                        if let GossipData { data: Some(TransactionBroadcast::NewTransaction(tx)), .. } = new_transaction.unwrap() {
                            let txs = vec![Arc::new(tx)];
                            TxPool::insert(txpool, db.as_ref().as_ref(), &tx_status_sender, &txs).await;
                        }
                    });
                }

                event = self.txpool_receiver.recv() => {
                    if matches!(event, Some(TxPoolMpsc::Stop) | None) {
                        break;
                    }
                    let txpool = txpool.clone();
                    let db = self.db.clone();
                    let tx_status_sender = self.tx_status_sender.clone();

                    let network_sender = self.network_sender.clone();

                    // Spawning an unbounded number of handler tasks is a
                    // little risky; a semaphore could be added later to cap
                    // the number of concurrent requests.
                    tokio::spawn(async move {
                        let txpool = txpool.as_ref();
                        match event.unwrap() {
                            TxPoolMpsc::PendingNumber { response } => {
                                let _ = response.send(TxPool::pending_number(txpool).await);
                            }
                            TxPoolMpsc::ConsumableGas { response } => {
                                let _ = response.send(TxPool::consumable_gas(txpool).await);
                            }
                            TxPoolMpsc::Includable { response } => {
                                let _ = response.send(TxPool::includable(txpool).await);
                            }
                            TxPoolMpsc::Insert { txs, response } => {
                                let insert = TxPool::insert(txpool, db.as_ref().as_ref(), &tx_status_sender, &txs).await;
                                // Gossip every transaction that was accepted into the pool.
                                for (ret, tx) in insert.iter().zip(txs.into_iter()) {
                                    if ret.is_ok() {
                                        let _ = network_sender.send(P2pRequestEvent::BroadcastNewTransaction {
                                            transaction: tx.clone(),
                                        }).await;
                                    }
                                }
                                let _ = response.send(insert);
                            }
                            TxPoolMpsc::Find { ids, response } => {
                                let _ = response.send(TxPool::find(txpool, &ids).await);
                            }
                            TxPoolMpsc::FindOne { id, response } => {
                                let _ = response.send(TxPool::find_one(txpool, &id).await);
                            }
                            TxPoolMpsc::FindDependent { ids, response } => {
                                let _ = response.send(TxPool::find_dependent(txpool, &ids).await);
                            }
                            TxPoolMpsc::FilterByNegative { ids, response } => {
                                let _ = response.send(TxPool::filter_by_negative(txpool, &ids).await);
                            }
                            TxPoolMpsc::Remove { ids, response } => {
                                let _ = response.send(TxPool::remove(txpool, &tx_status_sender, &ids).await);
                            }
                            // `Stop` is handled above, before the task is spawned.
                            TxPoolMpsc::Stop => {}
                        }
                    });
                }

                block_updated = self.import_block_receiver.recv() => {
                    if let Ok(block_updated) = block_updated {
                        match block_updated {
                            ImportBlockBroadcast::PendingFuelBlockImported { block } => {
                                let txpool = txpool.clone();
                                TxPool::block_update(txpool.as_ref(), &self.tx_status_sender, block).await
                                // TODO: Should this be done in a separate task? Like this:
                                // tokio::spawn( async move {
                                //     TxPool::block_update(txpool.as_ref(), block).await
                                // });
                            },
                            ImportBlockBroadcast::SealedFuelBlockImported { block: _, is_created_by_self: _ } => {
                                // TODO: what to do with sealed blocks?
                                todo!("Sealed block");
                            }
                        };
                    }
                }
            }
        }
        self
    }
}

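/// Handle to the transaction pool service: owns the request sender and the
/// status broadcasters, and manages the background task running the
/// [`Context`] event loop.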
pub struct Service {
    txpool_sender: txpool::Sender,
    tx_status_sender: TxStatusChange,
    join: Mutex<Option<JoinHandle<Context>>>,
    context: Arc<Mutex<Option<Context>>>,
}

impl Service {
    pub fn new(
        txpool_sender: txpool::Sender,
        tx_status_sender: TxStatusChange,
        context: Context,
    ) -> anyhow::Result<Self> {
        Ok(Self {
            txpool_sender,
            tx_status_sender,
            join: Mutex::new(None),
            context: Arc::new(Mutex::new(Some(context))),
        })
    }

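    /// Spawns the [`Context`] event loop on a background task.
    ///
    /// Errors if the service is already running, or if it is still stopping
    /// (the context has been taken but not yet returned).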
    pub async fn start(&self) -> anyhow::Result<()> {
        let mut join = self.join.lock().await;
        if join.is_none() {
            if let Some(context) = self.context.lock().await.take() {
                *join = Some(tokio::spawn(async { context.run().await }));
                Ok(())
            } else {
                Err(anyhow!("Cannot start the TxPool service while it is stopping"))
            }
        } else {
            Err(anyhow!("TxPool service is already started"))
        }
    }

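    /// Sends `TxPoolMpsc::Stop` to the event loop and returns a handle that
    /// resolves once the loop has exited and its [`Context`] has been stored
    /// for a later restart. Returns `None` if the service is not running.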
    pub async fn stop(&self) -> Option<JoinHandle<()>> {
        let mut join = self.join.lock().await;
        let join_handle = join.take();

        if let Some(join_handle) = join_handle {
            let _ = self.txpool_sender.send(TxPoolMpsc::Stop).await;
            let context = self.context.clone();
            Some(tokio::spawn(async move {
                let ret = join_handle.await;
                *context.lock().await = ret.ok();
            }))
        } else {
            None
        }
    }

    pub fn tx_status_subscribe(&self) -> broadcast::Receiver<TxStatus> {
        self.tx_status_sender.status_sender.subscribe()
    }

    pub fn tx_update_subscribe(&self) -> broadcast::Receiver<TxUpdate> {
        self.tx_status_sender.update_sender.subscribe()
    }

    pub fn sender(&self) -> &txpool::Sender {
        &self.txpool_sender
    }
}

#[cfg(test)]
pub mod test_helpers;
#[cfg(test)]
pub mod tests;
#[cfg(test)]
pub mod tests_p2p;