oxidized_builder/core/
mempool.rs1use crate::common::error::AppError;
2use crate::network::provider::WsProvider;
3use alloy::consensus::Transaction as _;
4use alloy::primitives::B256;
5use alloy::providers::Provider;
6use futures::StreamExt;
7use tokio::sync::mpsc::UnboundedSender;
8use tokio::time::{sleep, Duration};
9use crate::core::strategy::StrategyWork;
10
11pub struct MempoolScanner {
12 provider: WsProvider,
13 tx_sender: UnboundedSender<StrategyWork>,
14}
15
16impl MempoolScanner {
17 pub fn new(provider: WsProvider, tx_sender: UnboundedSender<StrategyWork>) -> Self {
18 Self {
19 provider,
20 tx_sender,
21 }
22 }
23
24 pub async fn run(self) -> Result<(), AppError> {
25 tracing::info!("Mempool Scanner started...");
26
27 loop {
28 match self.provider.subscribe_pending_transactions().await {
29 Ok(sub) => {
30 tracing::info!(target: "mempool", "Subscribed to pendingTransactions");
31 let mut stream = sub.into_stream();
32 while let Some(tx_hash) = stream.next().await {
33 let provider_clone = self.provider.clone();
34 let sender_clone = self.tx_sender.clone();
35
36 tokio::spawn(async move {
37 if let Ok(Some(tx)) = provider_clone.get_transaction_by_hash(tx_hash).await {
38 if tx.input().len() > 4 {
39 let _ = sender_clone.send(StrategyWork::Mempool(tx));
40 }
41 }
42 });
43 }
44 tracing::warn!(target: "mempool", "Pending tx subscription ended, retrying after backoff");
45 }
46 Err(e) => {
47 tracing::warn!(
48 target: "mempool",
49 error = %e,
50 "WS pending sub failed; falling back to polling filter"
51 );
52 self.poll_filter_loop().await?;
53 }
54 }
55
56 sleep(Duration::from_secs(2)).await;
57 }
58 }
59}
60
61impl MempoolScanner {
62 async fn poll_filter_loop(&self) -> Result<(), AppError> {
63 let filter_id = self
64 .provider
65 .new_pending_transactions_filter(false)
66 .await
67 .map_err(|err| AppError::Connection(format!("Filter create failed: {}", err)))?;
68
69 loop {
70 match self.provider.get_filter_changes::<B256>(filter_id).await {
71 Ok(hashes) => {
72 for tx_hash in hashes {
73 let provider_clone = self.provider.clone();
74 let sender_clone = self.tx_sender.clone();
75 tokio::spawn(async move {
76 if let Ok(Some(tx)) = provider_clone.get_transaction_by_hash(tx_hash).await
77 {
78 if tx.input().len() > 4 {
79 let _ = sender_clone.send(StrategyWork::Mempool(tx));
80 }
81 }
82 });
83 }
84 }
85 Err(err) => {
86 tracing::warn!(
87 target: "mempool",
88 error = %err,
89 "poll get_filter_changes failed"
90 );
91 break;
92 }
93 }
94 sleep(Duration::from_millis(1200)).await;
95 }
96
97 Ok(())
98 }
99}