oxidized_builder/core/
mempool.rs

1use crate::common::error::AppError;
2use crate::network::provider::WsProvider;
3use alloy::consensus::Transaction as _;
4use alloy::primitives::B256;
5use alloy::providers::Provider;
6use futures::StreamExt;
7use tokio::sync::mpsc::UnboundedSender;
8use tokio::time::{sleep, Duration};
9use crate::core::strategy::StrategyWork;
10
11pub struct MempoolScanner {
12    provider: WsProvider,
13    tx_sender: UnboundedSender<StrategyWork>,
14}
15
16impl MempoolScanner {
17    pub fn new(provider: WsProvider, tx_sender: UnboundedSender<StrategyWork>) -> Self {
18        Self {
19            provider,
20            tx_sender,
21        }
22    }
23
24    pub async fn run(self) -> Result<(), AppError> {
25        tracing::info!("Mempool Scanner started...");
26
27        loop {
28            match self.provider.subscribe_pending_transactions().await {
29                Ok(sub) => {
30                    tracing::info!(target: "mempool", "Subscribed to pendingTransactions");
31                    let mut stream = sub.into_stream();
32                    while let Some(tx_hash) = stream.next().await {
33                        let provider_clone = self.provider.clone();
34                        let sender_clone = self.tx_sender.clone();
35
36                        tokio::spawn(async move {
37                            if let Ok(Some(tx)) = provider_clone.get_transaction_by_hash(tx_hash).await {
38                                if tx.input().len() > 4 {
39                                    let _ = sender_clone.send(StrategyWork::Mempool(tx));
40                                }
41                            }
42                        });
43                    }
44                    tracing::warn!(target: "mempool", "Pending tx subscription ended, retrying after backoff");
45                }
46                Err(e) => {
47                    tracing::warn!(
48                        target: "mempool",
49                        error = %e,
50                        "WS pending sub failed; falling back to polling filter"
51                    );
52                    self.poll_filter_loop().await?;
53                }
54            }
55
56            sleep(Duration::from_secs(2)).await;
57        }
58    }
59}
60
61impl MempoolScanner {
62    async fn poll_filter_loop(&self) -> Result<(), AppError> {
63        let filter_id = self
64            .provider
65            .new_pending_transactions_filter(false)
66            .await
67            .map_err(|err| AppError::Connection(format!("Filter create failed: {}", err)))?;
68
69        loop {
70            match self.provider.get_filter_changes::<B256>(filter_id).await {
71                Ok(hashes) => {
72                    for tx_hash in hashes {
73                        let provider_clone = self.provider.clone();
74                        let sender_clone = self.tx_sender.clone();
75                        tokio::spawn(async move {
76                            if let Ok(Some(tx)) = provider_clone.get_transaction_by_hash(tx_hash).await
77                            {
78                                if tx.input().len() > 4 {
79                                    let _ = sender_clone.send(StrategyWork::Mempool(tx));
80                                }
81                            }
82                        });
83                    }
84                }
85                Err(err) => {
86                    tracing::warn!(
87                        target: "mempool",
88                        error = %err,
89                        "poll get_filter_changes failed"
90                    );
91                    break;
92                }
93            }
94            sleep(Duration::from_millis(1200)).await;
95        }
96
97        Ok(())
98    }
99}