1use serde_json::Value;
8
9use crate::error::TransportError;
10use crate::request::JsonRpcRequest;
11use crate::transport::RpcTransport;
12use crate::tx::{ReceiptPoller, TrackedTx, TxStatus, TxTracker};
13
14pub async fn poll_receipt(
26 transport: &dyn RpcTransport,
27 tx_hash: &str,
28 poller: &ReceiptPoller,
29) -> Result<Option<Value>, TransportError> {
30 let mut attempt: u32 = 1;
31
32 loop {
33 let delay = match poller.delay_for_attempt(attempt) {
34 Some(d) => d,
35 None => return Ok(None), };
37
38 if attempt > 1 {
40 tokio::time::sleep(delay).await;
41 }
42
43 let req = JsonRpcRequest::auto(
44 "eth_getTransactionReceipt",
45 vec![Value::String(tx_hash.to_string())],
46 );
47 let resp = transport.send(req).await?;
48 let value = resp.into_result().map_err(TransportError::Rpc)?;
49
50 if !value.is_null() {
51 return Ok(Some(value));
52 }
53
54 attempt += 1;
55 }
56}
57
58pub async fn send_and_track(
69 transport: &dyn RpcTransport,
70 tracker: &TxTracker,
71 raw_tx: &str,
72 from: &str,
73 nonce: u64,
74) -> Result<String, TransportError> {
75 let req = JsonRpcRequest::auto(
76 "eth_sendRawTransaction",
77 vec![Value::String(raw_tx.to_string())],
78 );
79 let resp = transport.send(req).await?;
80 let result = resp.into_result().map_err(TransportError::Rpc)?;
81
82 let tx_hash = result
83 .as_str()
84 .ok_or_else(|| {
85 TransportError::Other("eth_sendRawTransaction did not return a string hash".into())
86 })?
87 .to_string();
88
89 let now = std::time::SystemTime::now()
90 .duration_since(std::time::UNIX_EPOCH)
91 .unwrap_or_default()
92 .as_secs();
93
94 let tracked = TrackedTx {
95 tx_hash: tx_hash.clone(),
96 from: from.to_string(),
97 nonce,
98 submitted_at: now,
99 status: TxStatus::Pending,
100 gas_price: None,
101 max_fee: None,
102 max_priority_fee: None,
103 last_checked: now,
104 };
105
106 tracker.track(tracked);
107 Ok(tx_hash)
108}
109
110pub async fn refresh_status(
123 transport: &dyn RpcTransport,
124 tracker: &TxTracker,
125 tx_hash: &str,
126) -> Result<TxStatus, TransportError> {
127 let req = JsonRpcRequest::auto(
128 "eth_getTransactionReceipt",
129 vec![Value::String(tx_hash.to_string())],
130 );
131 let resp = transport.send(req).await?;
132 let value = resp.into_result().map_err(TransportError::Rpc)?;
133
134 let status = if value.is_null() {
135 TxStatus::Pending
136 } else {
137 let block_number = value
138 .get("blockNumber")
139 .and_then(|v| v.as_str())
140 .and_then(|hex| u64::from_str_radix(hex.trim_start_matches("0x"), 16).ok())
141 .unwrap_or(0);
142
143 let block_hash = value
144 .get("blockHash")
145 .and_then(|v| v.as_str())
146 .unwrap_or("0x0")
147 .to_string();
148
149 TxStatus::Included {
150 block_number,
151 block_hash,
152 }
153 };
154
155 tracker.update_status(tx_hash, status.clone());
156 Ok(status)
157}
158
159pub async fn detect_stuck(
172 transport: &dyn RpcTransport,
173 tracker: &TxTracker,
174 current_time: u64,
175) -> Vec<TrackedTx> {
176 let stuck = tracker.stuck(current_time);
177
178 let mut seen_senders = std::collections::HashSet::new();
180 for tx in &stuck {
181 if seen_senders.insert(tx.from.clone()) {
182 let req = JsonRpcRequest::auto(
183 "eth_getTransactionCount",
184 vec![
185 Value::String(tx.from.clone()),
186 Value::String("latest".to_string()),
187 ],
188 );
189 if let Ok(resp) = transport.send(req).await {
190 if let Ok(val) = resp.into_result() {
191 if let Some(hex) = val.as_str() {
192 if let Ok(nonce) = u64::from_str_radix(hex.trim_start_matches("0x"), 16) {
193 tracker.set_nonce(&tx.from, nonce);
194 }
195 }
196 }
197 }
198 }
199 }
200
201 stuck
202}
203
#[cfg(test)]
mod tests {
    use super::*;
    use crate::request::{JsonRpcResponse, RpcId};
    use crate::tx::TxTrackerConfig;
    use async_trait::async_trait;
    use std::collections::HashMap;
    use std::sync::Mutex;

    /// In-memory transport that answers each request with a canned value
    /// looked up by RPC method name.
    struct MockTransport {
        /// Canned results keyed by method name.
        responses: Mutex<HashMap<String, Value>>,
    }

    impl MockTransport {
        /// Creates a transport with no canned responses.
        fn new() -> Self {
            Self {
                responses: Mutex::new(HashMap::new()),
            }
        }

        /// Registers (or replaces) the result returned for `method`.
        fn set_response(&self, method: &str, value: Value) {
            self.responses
                .lock()
                .unwrap()
                .insert(method.to_string(), value);
        }
    }

    #[async_trait]
    impl RpcTransport for MockTransport {
        /// Always succeeds; methods without a canned value get `Value::Null`.
        async fn send(&self, req: JsonRpcRequest) -> Result<JsonRpcResponse, TransportError> {
            let canned = self
                .responses
                .lock()
                .unwrap()
                .get(&req.method)
                .cloned()
                .unwrap_or(Value::Null);

            Ok(JsonRpcResponse {
                jsonrpc: "2.0".into(),
                id: RpcId::Number(1),
                result: Some(canned),
                error: None,
            })
        }

        fn url(&self) -> &str {
            "mock://lifecycle"
        }
    }

    /// Builds a pending transaction from `0xAlice` whose `submitted_at` and
    /// `last_checked` are both `timestamp`.
    fn pending_tx(hash: &str, nonce: u64, timestamp: u64) -> TrackedTx {
        TrackedTx {
            tx_hash: hash.to_string(),
            from: "0xAlice".to_string(),
            nonce,
            submitted_at: timestamp,
            status: TxStatus::Pending,
            gas_price: Some(20_000_000_000),
            max_fee: None,
            max_priority_fee: None,
            last_checked: timestamp,
        }
    }

    /// A successful broadcast returns the node's hash and leaves exactly one
    /// pending entry in the tracker with the supplied sender and nonce.
    #[tokio::test]
    async fn send_and_track_records_tx() {
        let transport = MockTransport::new();
        transport.set_response("eth_sendRawTransaction", Value::String("0xdeadbeef".into()));

        let tracker = TxTracker::new(TxTrackerConfig::default());

        let returned_hash = send_and_track(&transport, &tracker, "0xraw_data", "0xAlice", 42)
            .await
            .expect("send_and_track should succeed");

        assert_eq!(returned_hash, "0xdeadbeef");
        assert_eq!(tracker.count(), 1);

        let entry = tracker.get("0xdeadbeef").expect("tx should be tracked");
        assert_eq!(entry.from, "0xAlice");
        assert_eq!(entry.nonce, 42);
        assert_eq!(entry.status, TxStatus::Pending);
    }

    /// With a 60 s stuck timeout and a current time of 1100, only the
    /// transaction submitted at 1000 is expected back; the one submitted at
    /// 1090 is not.
    #[tokio::test]
    async fn detect_stuck_returns_old_txs() {
        let transport = MockTransport::new();
        transport.set_response("eth_getTransactionCount", Value::String("0x5".into()));

        let tracker = TxTracker::new(TxTrackerConfig {
            stuck_timeout_secs: 60,
            ..Default::default()
        });

        tracker.track(pending_tx("0xold", 3, 1000));
        tracker.track(pending_tx("0xnew", 4, 1090));

        let stuck = detect_stuck(&transport, &tracker, 1100).await;
        assert_eq!(stuck.len(), 1);
        assert_eq!(stuck[0].tx_hash, "0xold");

        // NOTE(review): the expected value depends on how `TxTracker`
        // combines `set_nonce` with tracked nonces, which is not visible
        // from this file; confirm against `TxTracker::next_nonce`.
        assert_eq!(tracker.next_nonce("0xAlice"), Some(6));
    }
}