Skip to main content

chainrpc_core/
pending_pool.rs

1//! Pending transaction pool monitoring.
2//!
3//! [`PendingPoolMonitor`] watches a set of transaction hashes and can query
4//! an [`RpcTransport`] to determine whether each transaction is still pending,
5//! has been included in a block, or has disappeared from the mempool.
6
7use std::collections::HashSet;
8use std::sync::Mutex;
9
10use serde_json::Value;
11
12use crate::error::TransportError;
13use crate::request::JsonRpcRequest;
14use crate::transport::RpcTransport;
15
16// ---------------------------------------------------------------------------
17// Config
18// ---------------------------------------------------------------------------
19
/// Tunable settings for a pending-pool monitor.
///
/// The [`Default`] value polls every 2000 ms and sets the monitoring cap
/// to 256 transactions.
#[derive(Debug, Clone)]
pub struct PendingPoolConfig {
    /// Delay between successive status polls, in milliseconds.
    pub poll_interval_ms: u64,
    /// Upper bound on the number of transactions monitored at the same time.
    pub max_monitored: usize,
}

impl Default for PendingPoolConfig {
    /// Build the stock configuration (2000 ms poll interval, 256 entries).
    fn default() -> Self {
        // Named here so the defaults are easy to spot and adjust.
        const POLL_INTERVAL_MS: u64 = 2_000;
        const MAX_MONITORED: usize = 256;

        PendingPoolConfig {
            poll_interval_ms: POLL_INTERVAL_MS,
            max_monitored: MAX_MONITORED,
        }
    }
}
37
38// ---------------------------------------------------------------------------
39// PendingTxStatus
40// ---------------------------------------------------------------------------
41
/// Lifecycle state observed for a watched transaction.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PendingTxStatus {
    /// No receipt yet; the transaction is still waiting in the mempool.
    Pending,
    /// The transaction was included in the block with this number.
    Included { block_number: u64 },
    /// The transaction could not be found (it may have been dropped).
    NotFound,
}

impl std::fmt::Display for PendingTxStatus {
    /// Writes `pending`, `included(block=N)` or `not_found`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Only the `Included` variant carries data to interpolate; the
        // other two are fixed labels written out verbatim.
        let label = match self {
            Self::Included { block_number } => {
                return write!(f, "included(block={block_number})");
            }
            Self::Pending => "pending",
            Self::NotFound => "not_found",
        };
        f.write_str(label)
    }
}
62
63// ---------------------------------------------------------------------------
64// PendingPoolMonitor
65// ---------------------------------------------------------------------------
66
67/// Monitors pending transactions and reports status changes.
68///
69/// The monitor maintains a thread-safe set of transaction hashes and
70/// provides a static [`check_status`](PendingPoolMonitor::check_status)
71/// method to query a transport for the current state of a transaction.
72pub struct PendingPoolMonitor {
73    config: PendingPoolConfig,
74    watched: Mutex<HashSet<String>>,
75}
76
77impl PendingPoolMonitor {
78    /// Create a new monitor with the given configuration.
79    pub fn new(config: PendingPoolConfig) -> Self {
80        Self {
81            config,
82            watched: Mutex::new(HashSet::new()),
83        }
84    }
85
86    /// Add a transaction hash to monitor.
87    ///
88    /// Returns `true` if the hash was added, `false` if already present or
89    /// if the monitor is at maximum capacity.
90    pub fn watch(&self, tx_hash: String) -> bool {
91        let mut watched = self.watched.lock().unwrap();
92        if watched.len() >= self.config.max_monitored {
93            return false;
94        }
95        watched.insert(tx_hash)
96    }
97
98    /// Remove a transaction from monitoring.
99    pub fn unwatch(&self, tx_hash: &str) {
100        let mut watched = self.watched.lock().unwrap();
101        watched.remove(tx_hash);
102    }
103
104    /// Get all currently watched transaction hashes.
105    pub fn watched(&self) -> Vec<String> {
106        let watched = self.watched.lock().unwrap();
107        watched.iter().cloned().collect()
108    }
109
110    /// Number of transactions being monitored.
111    pub fn count(&self) -> usize {
112        let watched = self.watched.lock().unwrap();
113        watched.len()
114    }
115
116    /// Get the poll interval from the config.
117    pub fn poll_interval_ms(&self) -> u64 {
118        self.config.poll_interval_ms
119    }
120
121    /// Check the status of a single tx by querying the transport.
122    ///
123    /// Calls `eth_getTransactionReceipt` on the transport:
124    /// - If the receipt exists and contains a `blockNumber`, the tx is
125    ///   [`Included`](PendingTxStatus::Included).
126    /// - If the receipt is `null` we fall back to `eth_getTransactionByHash`:
127    ///   - If the tx object is present the tx is still [`Pending`](PendingTxStatus::Pending).
128    ///   - Otherwise it is [`NotFound`](PendingTxStatus::NotFound).
129    pub async fn check_status(
130        transport: &dyn RpcTransport,
131        tx_hash: &str,
132    ) -> Result<PendingTxStatus, TransportError> {
133        // 1. Try eth_getTransactionReceipt.
134        let receipt_req = JsonRpcRequest::auto(
135            "eth_getTransactionReceipt",
136            vec![Value::String(tx_hash.to_string())],
137        );
138        let receipt_resp = transport.send(receipt_req).await?;
139        let receipt_value = receipt_resp.into_result().map_err(TransportError::Rpc)?;
140
141        if !receipt_value.is_null() {
142            // Extract blockNumber from the receipt.
143            if let Some(block_hex) = receipt_value.get("blockNumber").and_then(|v| v.as_str()) {
144                let block_number =
145                    u64::from_str_radix(block_hex.trim_start_matches("0x"), 16).unwrap_or(0);
146                return Ok(PendingTxStatus::Included { block_number });
147            }
148            // Receipt exists but no blockNumber — treat as included at 0.
149            return Ok(PendingTxStatus::Included { block_number: 0 });
150        }
151
152        // 2. Receipt is null — check if the tx itself exists.
153        let tx_req = JsonRpcRequest::auto(
154            "eth_getTransactionByHash",
155            vec![Value::String(tx_hash.to_string())],
156        );
157        let tx_resp = transport.send(tx_req).await?;
158        let tx_value = tx_resp.into_result().map_err(TransportError::Rpc)?;
159
160        if tx_value.is_null() {
161            Ok(PendingTxStatus::NotFound)
162        } else {
163            Ok(PendingTxStatus::Pending)
164        }
165    }
166}
167
168// ===========================================================================
169// Tests
170// ===========================================================================
171
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for building an owned hash string.
    fn hash(s: &str) -> String {
        s.to_owned()
    }

    #[test]
    fn monitor_watch_unwatch() {
        let monitor = PendingPoolMonitor::new(PendingPoolConfig::default());

        // A fresh hash is accepted.
        assert!(monitor.watch(hash("0xabc")));
        assert_eq!(monitor.count(), 1);

        // A duplicate is rejected and leaves the count untouched.
        assert!(!monitor.watch(hash("0xabc")));
        assert_eq!(monitor.count(), 1);

        // A different hash is accepted alongside the first.
        assert!(monitor.watch(hash("0xdef")));
        assert_eq!(monitor.count(), 2);

        // Dropping the first hash leaves only the second one behind.
        monitor.unwatch("0xabc");
        assert_eq!(monitor.count(), 1);
        assert_eq!(monitor.watched(), vec![hash("0xdef")]);
    }

    #[test]
    fn monitor_max_capacity() {
        let monitor = PendingPoolMonitor::new(PendingPoolConfig {
            poll_interval_ms: 1000,
            max_monitored: 2,
        });

        // Fill the monitor up to its limit.
        for h in ["0x1", "0x2"] {
            assert!(monitor.watch(hash(h)));
        }

        // One more is refused while the monitor is full.
        assert!(!monitor.watch(hash("0x3")));
        assert_eq!(monitor.count(), 2);

        // Freeing a slot lets the refused hash in.
        monitor.unwatch("0x1");
        assert!(monitor.watch(hash("0x3")));
        assert_eq!(monitor.count(), 2);
    }

    #[test]
    fn pending_status_enum() {
        // Display output for each variant.
        assert_eq!(PendingTxStatus::Pending.to_string(), "pending");
        assert_eq!(
            PendingTxStatus::Included { block_number: 42 }.to_string(),
            "included(block=42)"
        );
        assert_eq!(PendingTxStatus::NotFound.to_string(), "not_found");

        // Equality distinguishes variants and their payloads.
        assert_eq!(PendingTxStatus::Pending, PendingTxStatus::Pending);
        assert_ne!(PendingTxStatus::Pending, PendingTxStatus::NotFound);

        let at_ten = PendingTxStatus::Included { block_number: 10 };
        assert_eq!(at_ten, PendingTxStatus::Included { block_number: 10 });
        assert_ne!(at_ten, PendingTxStatus::Included { block_number: 20 });
    }
}