chainrpc_core/
pending_pool.rs1use std::collections::HashSet;
8use std::sync::Mutex;
9
10use serde_json::Value;
11
12use crate::error::TransportError;
13use crate::request::JsonRpcRequest;
14use crate::transport::RpcTransport;
15
16#[derive(Debug, Clone)]
22pub struct PendingPoolConfig {
23 pub poll_interval_ms: u64,
25 pub max_monitored: usize,
27}
28
29impl Default for PendingPoolConfig {
30 fn default() -> Self {
31 Self {
32 poll_interval_ms: 2000,
33 max_monitored: 256,
34 }
35 }
36}
37
38#[derive(Debug, Clone, PartialEq, Eq)]
44pub enum PendingTxStatus {
45 Pending,
47 Included { block_number: u64 },
49 NotFound,
51}
52
53impl std::fmt::Display for PendingTxStatus {
54 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
55 match self {
56 Self::Pending => write!(f, "pending"),
57 Self::Included { block_number } => write!(f, "included(block={block_number})"),
58 Self::NotFound => write!(f, "not_found"),
59 }
60 }
61}
62
63pub struct PendingPoolMonitor {
73 config: PendingPoolConfig,
74 watched: Mutex<HashSet<String>>,
75}
76
77impl PendingPoolMonitor {
78 pub fn new(config: PendingPoolConfig) -> Self {
80 Self {
81 config,
82 watched: Mutex::new(HashSet::new()),
83 }
84 }
85
86 pub fn watch(&self, tx_hash: String) -> bool {
91 let mut watched = self.watched.lock().unwrap();
92 if watched.len() >= self.config.max_monitored {
93 return false;
94 }
95 watched.insert(tx_hash)
96 }
97
98 pub fn unwatch(&self, tx_hash: &str) {
100 let mut watched = self.watched.lock().unwrap();
101 watched.remove(tx_hash);
102 }
103
104 pub fn watched(&self) -> Vec<String> {
106 let watched = self.watched.lock().unwrap();
107 watched.iter().cloned().collect()
108 }
109
110 pub fn count(&self) -> usize {
112 let watched = self.watched.lock().unwrap();
113 watched.len()
114 }
115
116 pub fn poll_interval_ms(&self) -> u64 {
118 self.config.poll_interval_ms
119 }
120
121 pub async fn check_status(
130 transport: &dyn RpcTransport,
131 tx_hash: &str,
132 ) -> Result<PendingTxStatus, TransportError> {
133 let receipt_req = JsonRpcRequest::auto(
135 "eth_getTransactionReceipt",
136 vec![Value::String(tx_hash.to_string())],
137 );
138 let receipt_resp = transport.send(receipt_req).await?;
139 let receipt_value = receipt_resp.into_result().map_err(TransportError::Rpc)?;
140
141 if !receipt_value.is_null() {
142 if let Some(block_hex) = receipt_value.get("blockNumber").and_then(|v| v.as_str()) {
144 let block_number =
145 u64::from_str_radix(block_hex.trim_start_matches("0x"), 16).unwrap_or(0);
146 return Ok(PendingTxStatus::Included { block_number });
147 }
148 return Ok(PendingTxStatus::Included { block_number: 0 });
150 }
151
152 let tx_req = JsonRpcRequest::auto(
154 "eth_getTransactionByHash",
155 vec![Value::String(tx_hash.to_string())],
156 );
157 let tx_resp = transport.send(tx_req).await?;
158 let tx_value = tx_resp.into_result().map_err(TransportError::Rpc)?;
159
160 if tx_value.is_null() {
161 Ok(PendingTxStatus::NotFound)
162 } else {
163 Ok(PendingTxStatus::Pending)
164 }
165 }
166}
167
168#[cfg(test)]
173mod tests {
174 use super::*;
175
176 #[test]
177 fn monitor_watch_unwatch() {
178 let monitor = PendingPoolMonitor::new(PendingPoolConfig::default());
179
180 assert!(monitor.watch("0xabc".to_string()));
182 assert_eq!(monitor.count(), 1);
183
184 assert!(!monitor.watch("0xabc".to_string()));
186 assert_eq!(monitor.count(), 1);
187
188 assert!(monitor.watch("0xdef".to_string()));
190 assert_eq!(monitor.count(), 2);
191
192 monitor.unwatch("0xabc");
194 assert_eq!(monitor.count(), 1);
195
196 let list = monitor.watched();
198 assert_eq!(list.len(), 1);
199 assert!(list.contains(&"0xdef".to_string()));
200 }
201
202 #[test]
203 fn monitor_max_capacity() {
204 let config = PendingPoolConfig {
205 poll_interval_ms: 1000,
206 max_monitored: 2,
207 };
208 let monitor = PendingPoolMonitor::new(config);
209
210 assert!(monitor.watch("0x1".to_string()));
211 assert!(monitor.watch("0x2".to_string()));
212 assert!(!monitor.watch("0x3".to_string()));
214 assert_eq!(monitor.count(), 2);
215
216 monitor.unwatch("0x1");
218 assert!(monitor.watch("0x3".to_string()));
219 assert_eq!(monitor.count(), 2);
220 }
221
222 #[test]
223 fn pending_status_enum() {
224 let pending = PendingTxStatus::Pending;
225 assert_eq!(pending.to_string(), "pending");
226
227 let included = PendingTxStatus::Included { block_number: 42 };
228 assert_eq!(included.to_string(), "included(block=42)");
229
230 let not_found = PendingTxStatus::NotFound;
231 assert_eq!(not_found.to_string(), "not_found");
232
233 assert_eq!(PendingTxStatus::Pending, PendingTxStatus::Pending);
235 assert_ne!(PendingTxStatus::Pending, PendingTxStatus::NotFound);
236 assert_eq!(
237 PendingTxStatus::Included { block_number: 10 },
238 PendingTxStatus::Included { block_number: 10 },
239 );
240 assert_ne!(
241 PendingTxStatus::Included { block_number: 10 },
242 PendingTxStatus::Included { block_number: 20 },
243 );
244 }
245}