Skip to main content

chainrpc_core/
tx_lifecycle.rs

1//! Transaction lifecycle management wired into [`RpcTransport`].
2//!
3//! This module provides async helper functions that compose the
4//! [`TxTracker`] / [`ReceiptPoller`] primitives from [`crate::tx`] with
5//! a live [`RpcTransport`] to drive the full send-track-confirm lifecycle.
6
7use serde_json::Value;
8
9use crate::error::TransportError;
10use crate::request::JsonRpcRequest;
11use crate::transport::RpcTransport;
12use crate::tx::{ReceiptPoller, TrackedTx, TxStatus, TxTracker};
13
14// ---------------------------------------------------------------------------
15// poll_receipt
16// ---------------------------------------------------------------------------
17
18/// Poll for a transaction receipt with exponential backoff.
19///
20/// Uses the [`ReceiptPoller`] to determine the delay between attempts.
21/// Calls `eth_getTransactionReceipt` on the transport.
22///
23/// Returns `Ok(Some(receipt))` when a receipt is found, `Ok(None)` if the
24/// poller's maximum attempts are exhausted, or `Err` on transport failure.
25pub async fn poll_receipt(
26    transport: &dyn RpcTransport,
27    tx_hash: &str,
28    poller: &ReceiptPoller,
29) -> Result<Option<Value>, TransportError> {
30    let mut attempt: u32 = 1;
31
32    loop {
33        let delay = match poller.delay_for_attempt(attempt) {
34            Some(d) => d,
35            None => return Ok(None), // max attempts exceeded
36        };
37
38        // Wait before querying (except on the very first attempt).
39        if attempt > 1 {
40            tokio::time::sleep(delay).await;
41        }
42
43        let req = JsonRpcRequest::auto(
44            "eth_getTransactionReceipt",
45            vec![Value::String(tx_hash.to_string())],
46        );
47        let resp = transport.send(req).await?;
48        let value = resp.into_result().map_err(TransportError::Rpc)?;
49
50        if !value.is_null() {
51            return Ok(Some(value));
52        }
53
54        attempt += 1;
55    }
56}
57
58// ---------------------------------------------------------------------------
59// send_and_track
60// ---------------------------------------------------------------------------
61
62/// Send a raw transaction and automatically track it.
63///
64/// 1. Sends via `eth_sendRawTransaction`.
65/// 2. Extracts the returned transaction hash from the RPC response.
66/// 3. Creates a [`TrackedTx`] and registers it with the [`TxTracker`].
67/// 4. Returns the transaction hash.
68pub async fn send_and_track(
69    transport: &dyn RpcTransport,
70    tracker: &TxTracker,
71    raw_tx: &str,
72    from: &str,
73    nonce: u64,
74) -> Result<String, TransportError> {
75    let req = JsonRpcRequest::auto(
76        "eth_sendRawTransaction",
77        vec![Value::String(raw_tx.to_string())],
78    );
79    let resp = transport.send(req).await?;
80    let result = resp.into_result().map_err(TransportError::Rpc)?;
81
82    let tx_hash = result
83        .as_str()
84        .ok_or_else(|| {
85            TransportError::Other("eth_sendRawTransaction did not return a string hash".into())
86        })?
87        .to_string();
88
89    let now = std::time::SystemTime::now()
90        .duration_since(std::time::UNIX_EPOCH)
91        .unwrap_or_default()
92        .as_secs();
93
94    let tracked = TrackedTx {
95        tx_hash: tx_hash.clone(),
96        from: from.to_string(),
97        nonce,
98        submitted_at: now,
99        status: TxStatus::Pending,
100        gas_price: None,
101        max_fee: None,
102        max_priority_fee: None,
103        last_checked: now,
104    };
105
106    tracker.track(tracked);
107    Ok(tx_hash)
108}
109
110// ---------------------------------------------------------------------------
111// refresh_status
112// ---------------------------------------------------------------------------
113
114/// Check the status of a tracked transaction against the chain.
115///
116/// Queries `eth_getTransactionReceipt` and updates the tracker:
117/// - If a receipt with a `blockNumber` is found, the status becomes
118///   [`TxStatus::Included`].
119/// - If the receipt is `null`, the status remains [`TxStatus::Pending`].
120///
121/// Returns the newly determined status.
122pub async fn refresh_status(
123    transport: &dyn RpcTransport,
124    tracker: &TxTracker,
125    tx_hash: &str,
126) -> Result<TxStatus, TransportError> {
127    let req = JsonRpcRequest::auto(
128        "eth_getTransactionReceipt",
129        vec![Value::String(tx_hash.to_string())],
130    );
131    let resp = transport.send(req).await?;
132    let value = resp.into_result().map_err(TransportError::Rpc)?;
133
134    let status = if value.is_null() {
135        TxStatus::Pending
136    } else {
137        let block_number = value
138            .get("blockNumber")
139            .and_then(|v| v.as_str())
140            .and_then(|hex| u64::from_str_radix(hex.trim_start_matches("0x"), 16).ok())
141            .unwrap_or(0);
142
143        let block_hash = value
144            .get("blockHash")
145            .and_then(|v| v.as_str())
146            .unwrap_or("0x0")
147            .to_string();
148
149        TxStatus::Included {
150            block_number,
151            block_hash,
152        }
153    };
154
155    tracker.update_status(tx_hash, status.clone());
156    Ok(status)
157}
158
159// ---------------------------------------------------------------------------
160// detect_stuck
161// ---------------------------------------------------------------------------
162
163/// Detect and return stuck transactions.
164///
165/// Delegates to [`TxTracker::stuck`] using the supplied `current_time`.
166/// For each stuck transaction, queries `eth_getTransactionCount` for the
167/// sender to help diagnose nonce-based replacement or dropping (the caller
168/// can use this information to decide on remediation).
169///
170/// Returns the list of stuck [`TrackedTx`] entries.
171pub async fn detect_stuck(
172    transport: &dyn RpcTransport,
173    tracker: &TxTracker,
174    current_time: u64,
175) -> Vec<TrackedTx> {
176    let stuck = tracker.stuck(current_time);
177
178    // For each unique sender, refresh the on-chain nonce in the tracker.
179    let mut seen_senders = std::collections::HashSet::new();
180    for tx in &stuck {
181        if seen_senders.insert(tx.from.clone()) {
182            let req = JsonRpcRequest::auto(
183                "eth_getTransactionCount",
184                vec![
185                    Value::String(tx.from.clone()),
186                    Value::String("latest".to_string()),
187                ],
188            );
189            if let Ok(resp) = transport.send(req).await {
190                if let Ok(val) = resp.into_result() {
191                    if let Some(hex) = val.as_str() {
192                        if let Ok(nonce) = u64::from_str_radix(hex.trim_start_matches("0x"), 16) {
193                            tracker.set_nonce(&tx.from, nonce);
194                        }
195                    }
196                }
197            }
198        }
199    }
200
201    stuck
202}
203
204// ===========================================================================
205// Tests
206// ===========================================================================
207
#[cfg(test)]
mod tests {
    use super::*;
    use crate::request::{JsonRpcResponse, RpcId};
    use crate::tx::TxTrackerConfig;
    use async_trait::async_trait;
    use std::collections::HashMap;
    use std::sync::Mutex;

    // -----------------------------------------------------------------------
    // Test doubles and fixtures
    // -----------------------------------------------------------------------

    /// Transport stub that answers every request with a canned JSON value
    /// looked up by method name; unknown methods get `null`.
    struct MockTransport {
        /// Canned results, keyed by JSON-RPC method name.
        responses: Mutex<HashMap<String, Value>>,
    }

    impl MockTransport {
        /// Create a mock with no canned responses.
        fn new() -> Self {
            Self {
                responses: Mutex::new(HashMap::new()),
            }
        }

        /// Register (or overwrite) the result returned for `method`.
        fn set_response(&self, method: &str, value: Value) {
            self.responses
                .lock()
                .unwrap()
                .insert(method.to_string(), value);
        }
    }

    #[async_trait]
    impl RpcTransport for MockTransport {
        async fn send(&self, req: JsonRpcRequest) -> Result<JsonRpcResponse, TransportError> {
            let canned = self
                .responses
                .lock()
                .unwrap()
                .get(&req.method)
                .cloned()
                .unwrap_or(Value::Null);

            Ok(JsonRpcResponse {
                jsonrpc: "2.0".into(),
                id: RpcId::Number(1),
                result: Some(canned),
                error: None,
            })
        }

        fn url(&self) -> &str {
            "mock://lifecycle"
        }
    }

    /// Build a pending legacy-priced transaction from `0xAlice` whose
    /// `submitted_at` and `last_checked` are both `submitted_at`.
    fn pending_tx_from_alice(tx_hash: &str, nonce: u64, submitted_at: u64) -> TrackedTx {
        TrackedTx {
            tx_hash: tx_hash.to_string(),
            from: "0xAlice".to_string(),
            nonce,
            submitted_at,
            status: TxStatus::Pending,
            gas_price: Some(20_000_000_000),
            max_fee: None,
            max_priority_fee: None,
            last_checked: submitted_at,
        }
    }

    // -----------------------------------------------------------------------
    // Tests
    // -----------------------------------------------------------------------

    #[tokio::test]
    async fn send_and_track_records_tx() {
        let transport = MockTransport::new();
        transport.set_response("eth_sendRawTransaction", Value::String("0xdeadbeef".into()));
        let tracker = TxTracker::new(TxTrackerConfig::default());

        let hash = send_and_track(&transport, &tracker, "0xraw_data", "0xAlice", 42)
            .await
            .expect("send_and_track should succeed");

        assert_eq!(hash, "0xdeadbeef");
        assert_eq!(tracker.count(), 1);

        let tracked = tracker.get("0xdeadbeef").expect("tx should be tracked");
        assert_eq!(tracked.from, "0xAlice");
        assert_eq!(tracked.nonce, 42);
        assert_eq!(tracked.status, TxStatus::Pending);
    }

    #[tokio::test]
    async fn detect_stuck_returns_old_txs() {
        let transport = MockTransport::new();
        // eth_getTransactionCount answers with a hex-encoded nonce.
        transport.set_response("eth_getTransactionCount", Value::String("0x5".into()));

        let tracker = TxTracker::new(TxTrackerConfig {
            stuck_timeout_secs: 60,
            ..Default::default()
        });

        // One transaction submitted at t=1000 and one at t=1090.
        tracker.track(pending_tx_from_alice("0xold", 3, 1000));
        tracker.track(pending_tx_from_alice("0xnew", 4, 1090));

        // At t=1100 with a 60s timeout, 0xold has been pending for 100s
        // (stuck) while 0xnew has been pending for only 10s (not stuck).
        let stuck = detect_stuck(&transport, &tracker, 1100).await;
        assert_eq!(stuck.len(), 1);
        assert_eq!(stuck[0].tx_hash, "0xold");

        // The nonce tracker should have been updated from the transport response.
        assert_eq!(tracker.next_nonce("0xAlice"), Some(6)); // 0x5 = 5, next = 6
    }
}
330}