1use crate::{
2 Config,
3 TxPool,
4};
5use anyhow::anyhow;
6use fuel_core_interfaces::{
7 block_importer::ImportBlockBroadcast,
8 common::prelude::Bytes32,
9 p2p::{
10 GossipData,
11 P2pRequestEvent,
12 TransactionBroadcast,
13 TransactionGossipData,
14 },
15 txpool::{
16 self,
17 Error,
18 TxPoolDb,
19 TxPoolMpsc,
20 TxStatus,
21 TxUpdate,
22 },
23};
24use std::sync::Arc;
25use tokio::{
26 sync::{
27 broadcast,
28 mpsc,
29 Mutex,
30 RwLock,
31 },
32 task::JoinHandle,
33};
34use tracing::error;
35
36pub struct ServiceBuilder {
37 config: Config,
38 db: Option<Box<dyn TxPoolDb>>,
39 txpool_sender: Option<txpool::Sender>,
40 txpool_receiver: Option<mpsc::Receiver<TxPoolMpsc>>,
41 tx_status_sender: Option<TxStatusChange>,
42 import_block_receiver: Option<broadcast::Receiver<ImportBlockBroadcast>>,
43 incoming_tx_receiver: Option<broadcast::Receiver<TransactionGossipData>>,
44 network_sender: Option<mpsc::Sender<P2pRequestEvent>>,
45}
46
47#[derive(Clone)]
48pub struct TxStatusChange {
49 status_sender: broadcast::Sender<TxStatus>,
50 update_sender: broadcast::Sender<TxUpdate>,
51}
52
53impl TxStatusChange {
54 pub fn new(capacity: usize) -> Self {
55 let (status_sender, _) = broadcast::channel(capacity);
56 let (update_sender, _) = broadcast::channel(capacity);
57 Self {
58 status_sender,
59 update_sender,
60 }
61 }
62 pub fn send_complete(&self, id: Bytes32) {
63 let _ = self.status_sender.send(TxStatus::Completed);
64 self.updated(id);
65 }
66
67 pub fn send_submitted(&self, id: Bytes32) {
68 let _ = self.status_sender.send(TxStatus::Submitted);
69 self.updated(id);
70 }
71
72 pub fn send_squeezed_out(&self, id: Bytes32, reason: Error) {
73 let _ = self.status_sender.send(TxStatus::SqueezedOut {
74 reason: reason.clone(),
75 });
76 let _ = self.update_sender.send(TxUpdate::squeezed_out(id, reason));
77 }
78
79 fn updated(&self, id: Bytes32) {
80 let _ = self.update_sender.send(TxUpdate::updated(id));
81 }
82}
83
84impl Default for ServiceBuilder {
85 fn default() -> Self {
86 Self::new()
87 }
88}
89
90impl ServiceBuilder {
91 pub fn new() -> Self {
92 Self {
93 config: Default::default(),
94 db: None,
95 txpool_sender: None,
96 txpool_receiver: None,
97 tx_status_sender: None,
98 import_block_receiver: None,
99 incoming_tx_receiver: None,
100 network_sender: None,
101 }
102 }
103
104 pub fn sender(&self) -> &txpool::Sender {
105 self.txpool_sender.as_ref().unwrap()
106 }
107
108 pub fn tx_status_subscribe(&self) -> broadcast::Receiver<TxStatus> {
109 self.tx_status_sender
110 .as_ref()
111 .unwrap()
112 .status_sender
113 .subscribe()
114 }
115
116 pub fn tx_change_subscribe(&self) -> broadcast::Receiver<TxUpdate> {
117 self.tx_status_sender
118 .as_ref()
119 .unwrap()
120 .update_sender
121 .subscribe()
122 }
123
124 pub fn db(&mut self, db: Box<dyn TxPoolDb>) -> &mut Self {
125 self.db = Some(db);
126 self
127 }
128
129 pub fn txpool_sender(&mut self, txpool_sender: txpool::Sender) -> &mut Self {
130 self.txpool_sender = Some(txpool_sender);
131 self
132 }
133
134 pub fn txpool_receiver(
135 &mut self,
136 txpool_receiver: mpsc::Receiver<TxPoolMpsc>,
137 ) -> &mut Self {
138 self.txpool_receiver = Some(txpool_receiver);
139 self
140 }
141
142 pub fn tx_status_sender(&mut self, tx_status_sender: TxStatusChange) -> &mut Self {
143 self.tx_status_sender = Some(tx_status_sender);
144 self
145 }
146
147 pub fn incoming_tx_receiver(
148 &mut self,
149 incoming_tx_receiver: broadcast::Receiver<TransactionGossipData>,
150 ) -> &mut Self {
151 self.incoming_tx_receiver = Some(incoming_tx_receiver);
152 self
153 }
154
155 pub fn network_sender(
156 &mut self,
157 network_sender: mpsc::Sender<P2pRequestEvent>,
158 ) -> &mut Self {
159 self.network_sender = Some(network_sender);
160 self
161 }
162
163 pub fn import_block_event(
164 &mut self,
165 import_block_receiver: broadcast::Receiver<ImportBlockBroadcast>,
166 ) -> &mut Self {
167 self.import_block_receiver = Some(import_block_receiver);
168 self
169 }
170
171 pub fn config(&mut self, config: Config) -> &mut Self {
172 self.config = config;
173 self
174 }
175
176 pub fn build(self) -> anyhow::Result<Service> {
177 if self.db.is_none()
178 || self.import_block_receiver.is_none()
179 || self.incoming_tx_receiver.is_none()
180 || self.txpool_sender.is_none()
181 || self.tx_status_sender.is_none()
182 || self.txpool_receiver.is_none()
183 || self.network_sender.is_none()
184 {
185 return Err(anyhow!("One of context items are not set"))
186 }
187
188 let service = Service::new(
189 self.txpool_sender.unwrap(),
190 self.tx_status_sender.clone().unwrap(),
191 Context {
192 config: self.config,
193 db: Arc::new(self.db.unwrap()),
194 txpool_receiver: self.txpool_receiver.unwrap(),
195 tx_status_sender: self.tx_status_sender.unwrap(),
196 import_block_receiver: self.import_block_receiver.unwrap(),
197 incoming_tx_receiver: self.incoming_tx_receiver.unwrap(),
198 network_sender: self.network_sender.unwrap(),
199 },
200 )?;
201 Ok(service)
202 }
203}
204
205pub struct Context {
206 pub config: Config,
207 pub db: Arc<Box<dyn TxPoolDb>>,
208 pub txpool_receiver: mpsc::Receiver<TxPoolMpsc>,
209 pub tx_status_sender: TxStatusChange,
210 pub import_block_receiver: broadcast::Receiver<ImportBlockBroadcast>,
211 pub incoming_tx_receiver: broadcast::Receiver<TransactionGossipData>,
212 pub network_sender: mpsc::Sender<P2pRequestEvent>,
213}
214
215impl Context {
216 pub async fn run(mut self) -> Self {
217 let txpool = Arc::new(RwLock::new(TxPool::new(self.config.clone())));
218
219 loop {
220 tokio::select! {
221 new_transaction = self.incoming_tx_receiver.recv() => {
222 if new_transaction.is_err() {
223 error!("Incoming tx receiver channel closed unexpectedly; shutting down transaction pool service.");
224 break;
225 }
226
227 let txpool = txpool.clone();
228 let db = self.db.clone();
229 let tx_status_sender = self.tx_status_sender.clone();
230
231 tokio::spawn( async move {
232 let txpool = txpool.as_ref();
233 if let GossipData { data: Some(TransactionBroadcast::NewTransaction ( tx )), .. } = new_transaction.unwrap() {
234 let txs = vec!(Arc::new(tx));
235 TxPool::insert(txpool, db.as_ref().as_ref(), &tx_status_sender, &txs).await;
236 }
237 });
238 }
239
240 event = self.txpool_receiver.recv() => {
241 if matches!(event,Some(TxPoolMpsc::Stop) | None) {
242 break;
243 }
244 let txpool = txpool.clone();
245 let db = self.db.clone();
246 let tx_status_sender = self.tx_status_sender.clone();
247
248 let network_sender = self.network_sender.clone();
249
250 tokio::spawn( async move {
252 let txpool = txpool.as_ref();
253 match event.unwrap() {
254 TxPoolMpsc::PendingNumber { response } => {
255 let _ = response.send(TxPool::pending_number(txpool).await);
256 }
257 TxPoolMpsc::ConsumableGas { response } => {
258 let _ = response.send(TxPool::consumable_gas(txpool).await);
259 }
260 TxPoolMpsc::Includable { response } => {
261 let _ = response.send(TxPool::includable(txpool).await);
262 }
263 TxPoolMpsc::Insert { txs, response } => {
264 let insert = TxPool::insert(txpool, db.as_ref().as_ref(), &tx_status_sender, &txs).await;
265 for (ret, tx) in insert.iter().zip(txs.into_iter()) {
266 match ret {
267 Ok(_) => {
268 let _ = network_sender.send(P2pRequestEvent::BroadcastNewTransaction {
269 transaction: tx.clone(),
270 }).await;
271 }
272 Err(_) => {}
273 }
274 }
275 let _ = response.send(insert);
276 }
277 TxPoolMpsc::Find { ids, response } => {
278 let _ = response.send(TxPool::find(txpool,&ids).await);
279 }
280 TxPoolMpsc::FindOne { id, response } => {
281 let _ = response.send(TxPool::find_one(txpool,&id).await);
282 }
283 TxPoolMpsc::FindDependent { ids, response } => {
284 let _ = response.send(TxPool::find_dependent(txpool,&ids).await);
285 }
286 TxPoolMpsc::FilterByNegative { ids, response } => {
287 let _ = response.send(TxPool::filter_by_negative(txpool,&ids).await);
288 }
289 TxPoolMpsc::Remove { ids, response } => {
290 let _ = response.send(TxPool::remove(txpool, &tx_status_sender ,&ids).await);
291 }
292 TxPoolMpsc::Stop => {}
293 }});
294 }
295
296 block_updated = self.import_block_receiver.recv() => {
297 if let Ok(block_updated) = block_updated {
298 match block_updated {
299 ImportBlockBroadcast::PendingFuelBlockImported { block } => {
300 let txpool = txpool.clone();
301 TxPool::block_update(txpool.as_ref(), &self.tx_status_sender, block).await
302 },
307 ImportBlockBroadcast::SealedFuelBlockImported { block: _, is_created_by_self: _ } => {
308 todo!("Sealed block");
310 }
311 };
312 }
313 }
314 }
315 }
316 self
317 }
318}
319
320pub struct Service {
321 txpool_sender: txpool::Sender,
322 tx_status_sender: TxStatusChange,
323 join: Mutex<Option<JoinHandle<Context>>>,
324 context: Arc<Mutex<Option<Context>>>,
325}
326
327impl Service {
328 pub fn new(
329 txpool_sender: txpool::Sender,
330 tx_status_sender: TxStatusChange,
331 context: Context,
332 ) -> anyhow::Result<Self> {
333 Ok(Self {
334 txpool_sender,
335 tx_status_sender,
336 join: Mutex::new(None),
337 context: Arc::new(Mutex::new(Some(context))),
338 })
339 }
340
341 pub async fn start(&self) -> anyhow::Result<()> {
342 let mut join = self.join.lock().await;
343 if join.is_none() {
344 if let Some(context) = self.context.lock().await.take() {
345 *join = Some(tokio::spawn(async { context.run().await }));
346 Ok(())
347 } else {
348 Err(anyhow!("Starting TxPool service that is stopping"))
349 }
350 } else {
351 Err(anyhow!("Service TxPool is already started"))
352 }
353 }
354
355 pub async fn stop(&self) -> Option<JoinHandle<()>> {
356 let mut join = self.join.lock().await;
357 let join_handle = join.take();
358
359 if let Some(join_handle) = join_handle {
360 let _ = self.txpool_sender.send(TxPoolMpsc::Stop).await;
361 let context = self.context.clone();
362 Some(tokio::spawn(async move {
363 let ret = join_handle.await;
364 *context.lock().await = ret.ok();
365 }))
366 } else {
367 None
368 }
369 }
370
371 pub fn tx_status_subscribe(&self) -> broadcast::Receiver<TxStatus> {
372 self.tx_status_sender.status_sender.subscribe()
373 }
374
375 pub fn tx_update_subscribe(&self) -> broadcast::Receiver<TxUpdate> {
376 self.tx_status_sender.update_sender.subscribe()
377 }
378
379 pub fn sender(&self) -> &txpool::Sender {
380 &self.txpool_sender
381 }
382}
383
384#[cfg(test)]
385pub mod test_helpers;
386#[cfg(test)]
387pub mod tests;
388#[cfg(test)]
389pub mod tests_p2p;