Skip to main content

soroban_fork/
rpc.rs

//! Soroban JSON-RPC transport.
//!
//! A thin typed client around the handful of RPC methods the fork harness
//! actually needs: `getLedgerEntries`, `getLatestLedger`, `getLedgers`, and
//! `getNetwork`. Configurable retry + HTTP timeouts live here; XDR
//! encode/decode sits here too so the rest of the crate never touches wire
//! formats directly.
7
8use std::cell::Cell;
9use std::time::{Duration, SystemTime, UNIX_EPOCH};
10
11use base64::{engine::general_purpose::STANDARD as BASE64, Engine};
12use log::{debug, warn};
13use serde::de::DeserializeOwned;
14use serde::{Deserialize, Serialize};
15use sha2::{Digest, Sha256};
16use soroban_env_host::xdr::{
17    LedgerEntry, LedgerEntryData, LedgerEntryExt, LedgerKey, Limits, ReadXdr, WriteXdr,
18};
19
20use crate::error::{ForkError, Result};
21
/// Maximum length of an HTTP response body snippet embedded in a
/// transport error. Soroban RPC error pages from edge caches (Cloudflare,
/// gateway.fm) can be large; truncating keeps logs readable.
///
/// Despite the `_BYTES` suffix, the limit is handed to `truncate_chars`,
/// which counts `char`s rather than bytes, so a snippet may occupy up to
/// four times this many bytes of UTF-8.
const ERROR_BODY_TRUNCATE_BYTES: usize = 256;
26
/// Tuning knobs for the RPC transport layer: retry budget, backoff,
/// HTTP timeout, and `getLedgerEntries` batch size.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RpcConfig {
    /// Maximum number of retries for transient failures: network errors,
    /// HTTP 408 / 425 / 429, any 5xx response, and response bodies that
    /// fail to decode. Other non-success statuses fail fast (they're
    /// caller-error, retrying won't help). Total attempts is
    /// `retries + 1`.
    pub retries: u32,
    /// Base delay between retries. The actual sleep is
    /// `base * 2^attempt + rand(0..base)` — exponential backoff plus a
    /// bounded additive jitter, so concurrent test runners don't
    /// synchronise their retries into a thundering herd against a
    /// degraded RPC.
    pub base_retry_delay: Duration,
    /// Per-request HTTP timeout. `None` delegates to reqwest's default.
    pub request_timeout: Option<Duration>,
    /// `getLedgerEntries` batch size. Soroban RPC caps this at 200; we
    /// default to the same and expose it for testing seams. Should be at
    /// least 1 — a slice cannot be split into chunks of zero keys.
    pub max_keys_per_request: usize,
}
46
47impl Default for RpcConfig {
48    fn default() -> Self {
49        Self {
50            retries: 3,
51            base_retry_delay: Duration::from_millis(300),
52            request_timeout: Some(Duration::from_secs(30)),
53            max_keys_per_request: 200,
54        }
55    }
56}
57
58/// A single fetched ledger entry, with the live-until hint from the RPC.
59pub struct FetchedEntry {
60    /// The parsed entry body.
61    pub entry: LedgerEntry,
62    /// Ledger sequence at which this entry expires, if applicable.
63    pub live_until: Option<u32>,
64}
65
/// Snapshot of the latest closed ledger as reported by the RPC.
///
/// `close_time` is fetched separately via `getLedgers` because
/// `getLatestLedger` does not include it in its response — the extra
/// round-trip happens once at fork build time.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct LatestLedger {
    /// Latest closed ledger sequence.
    pub sequence: u32,
    /// Current protocol version the network is running.
    pub protocol_version: u32,
    /// Unix-seconds close time of the latest ledger. Defaulting the
    /// forked `Env`'s timestamp to this value keeps tests reproducible —
    /// wall-clock defaults make every run depend on when it was started.
    pub close_time: u64,
}
81
/// Network metadata from the `getNetwork` RPC response.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct NetworkMetadata {
    /// The network passphrase, e.g. `"Test SDF Network ; September 2015"`.
    pub passphrase: String,
    /// SHA-256 of the passphrase, as required by the VM's `LedgerInfo`.
    pub network_id: [u8; 32],
}
89
90/// Typed RPC client. One instance per fork, reused across fetches so
91/// reqwest can pool connections to the RPC.
92pub struct RpcClient {
93    http: reqwest::blocking::Client,
94    url: String,
95    config: RpcConfig,
96}
97
98impl RpcClient {
99    /// Build an RPC client for `url` with the given transport config.
100    pub fn new(url: impl Into<String>, config: RpcConfig) -> Result<Self> {
101        let mut builder = reqwest::blocking::ClientBuilder::new();
102        if let Some(timeout) = config.request_timeout {
103            builder = builder.timeout(timeout);
104        }
105        let http = builder.build()?;
106        Ok(Self {
107            http,
108            url: url.into(),
109            config,
110        })
111    }
112
113    /// Retrieve the latest ledger sequence, protocol version, and close time.
114    ///
115    /// Issues two RPC calls: `getLatestLedger` (sequence + protocol) and
116    /// `getLedgers` for that sequence (close time). Both succeed or the
117    /// whole call fails — partial state would let the build proceed with
118    /// a wall-clock timestamp, which is exactly the silent fallback this
119    /// API is designed to avoid.
120    pub fn get_latest_ledger(&self) -> Result<LatestLedger> {
121        let response: JsonRpcResponse<GetLatestLedgerResult> =
122            self.rpc_post("getLatestLedger", serde_json::json!({}))?;
123        let result = response.into_result()?;
124        let close_time = self.get_ledger_close_time(result.sequence)?;
125        Ok(LatestLedger {
126            sequence: result.sequence,
127            protocol_version: result.protocol_version,
128            close_time,
129        })
130    }
131
132    /// Fetch the close time (Unix seconds) of a specific ledger via
133    /// `getLedgers`. Returns an error if the ledger is outside the RPC's
134    /// retention window.
135    pub fn get_ledger_close_time(&self, sequence: u32) -> Result<u64> {
136        let response: JsonRpcResponse<GetLedgersResult> = self.rpc_post(
137            "getLedgers",
138            serde_json::json!({
139                "startLedger": sequence,
140                "pagination": { "limit": 1 },
141            }),
142        )?;
143        let result = response.into_result()?;
144        let ledger = result.ledgers.into_iter().next().ok_or_else(|| {
145            ForkError::RpcError(format!("getLedgers returned no entry for {sequence}"))
146        })?;
147        ledger.ledger_close_time.parse::<u64>().map_err(|e| {
148            ForkError::RpcError(format!(
149                "getLedgers returned non-numeric ledgerCloseTime '{}': {e}",
150                ledger.ledger_close_time
151            ))
152        })
153    }
154
155    /// Retrieve network metadata. The `network_id` is computed as
156    /// SHA-256 of the returned passphrase (per Stellar convention).
157    pub fn get_network(&self) -> Result<NetworkMetadata> {
158        let response: JsonRpcResponse<GetNetworkResult> =
159            self.rpc_post("getNetwork", serde_json::json!({}))?;
160        let result = response.into_result()?;
161        let mut hasher = Sha256::new();
162        hasher.update(result.passphrase.as_bytes());
163        let network_id: [u8; 32] = hasher.finalize().into();
164        Ok(NetworkMetadata {
165            passphrase: result.passphrase,
166            network_id,
167        })
168    }
169
170    /// Fetch ledger entries in batches of `max_keys_per_request`. Missing
171    /// keys are simply absent from the returned vector — no error.
172    pub fn fetch_entries(&self, keys: &[LedgerKey]) -> Result<Vec<FetchedEntry>> {
173        let mut results = Vec::new();
174        for chunk in keys.chunks(self.config.max_keys_per_request) {
175            let encoded_keys = chunk.iter().map(encode_key).collect::<Result<Vec<_>>>()?;
176            let response: JsonRpcResponse<GetLedgerEntriesResult> = self.rpc_post(
177                "getLedgerEntries",
178                serde_json::json!({ "keys": encoded_keys }),
179            )?;
180            let result = response.into_result()?;
181            if let Some(entries) = result.entries {
182                for wire in entries {
183                    results.push(decode_entry(wire)?);
184                }
185            }
186        }
187        Ok(results)
188    }
189
190    /// Convenience: fetch a single key. Returns `None` if absent.
191    pub fn fetch_entry(&self, key: &LedgerKey) -> Result<Option<FetchedEntry>> {
192        Ok(self.fetch_entries(std::slice::from_ref(key))?.pop())
193    }
194
195    fn rpc_post<T: DeserializeOwned>(
196        &self,
197        method: &str,
198        params: serde_json::Value,
199    ) -> Result<JsonRpcResponse<T>> {
200        let request = JsonRpcRequest {
201            jsonrpc: "2.0",
202            id: 1,
203            method,
204            params,
205        };
206
207        let total_attempts = self.config.retries + 1;
208        let mut last_error: Option<ForkError> = None;
209
210        for attempt in 0..total_attempts {
211            match self.try_once::<T>(&request) {
212                Ok(parsed) => return Ok(parsed),
213                Err(RetryDecision::Retry(err)) if attempt + 1 < total_attempts => {
214                    let delay = backoff_delay(self.config.base_retry_delay, attempt);
215                    warn!(
216                        "soroban-fork: RPC {method} failed (attempt {}/{}): {err}; \
217                         retrying in {delay:?}",
218                        attempt + 1,
219                        total_attempts
220                    );
221                    std::thread::sleep(delay);
222                    last_error = Some(err);
223                }
224                Err(RetryDecision::Retry(err)) | Err(RetryDecision::Fatal(err)) => {
225                    return Err(err);
226                }
227            }
228        }
229
230        Err(last_error.unwrap_or_else(|| {
231            ForkError::Transport("retry loop exhausted with no recorded error".into())
232        }))
233    }
234
235    fn try_once<T: DeserializeOwned>(
236        &self,
237        request: &JsonRpcRequest<'_>,
238    ) -> std::result::Result<JsonRpcResponse<T>, RetryDecision> {
239        let response = self
240            .http
241            .post(&self.url)
242            .json(request)
243            .send()
244            .map_err(|e| RetryDecision::Retry(ForkError::from(e)))?;
245
246        let status = response.status();
247        let code = status.as_u16();
248        // 408 Request Timeout, 425 Too Early, 429 Too Many Requests + 5xx
249        // are the canonical "try again" set. Everything else 4xx is a
250        // permanent caller error (bad auth, bad URL, bad JSON-RPC method).
251        let retryable = matches!(code, 408 | 425 | 429) || status.is_server_error();
252        if retryable {
253            let body = response_body_snippet(response);
254            return Err(RetryDecision::Retry(ForkError::Transport(format!(
255                "HTTP {status}: {body}"
256            ))));
257        }
258        if !status.is_success() {
259            let body = response_body_snippet(response);
260            return Err(RetryDecision::Fatal(ForkError::Transport(format!(
261                "HTTP {status}: {body}"
262            ))));
263        }
264
265        response
266            .json::<JsonRpcResponse<T>>()
267            .map_err(|e| RetryDecision::Retry(ForkError::from(e)))
268    }
269}
270
271/// Best-effort extraction of a short, printable body snippet for error
272/// messages. Returns `<no body>` if the body can't be read.
273fn response_body_snippet(response: reqwest::blocking::Response) -> String {
274    match response.text() {
275        Ok(body) => {
276            let trimmed = body.trim();
277            if trimmed.is_empty() {
278                "<empty body>".to_string()
279            } else {
280                truncate_chars(trimmed, ERROR_BODY_TRUNCATE_BYTES)
281            }
282        }
283        Err(_) => "<no body>".to_string(),
284    }
285}
286
/// Truncate `s` to at most `max_chars` characters, appending `…` when
/// anything was cut off.
///
/// The cut is made on a character (not byte) boundary, so a multi-byte
/// UTF-8 sequence is never split. Input that already fits is returned
/// unchanged, without the ellipsis.
fn truncate_chars(s: &str, max_chars: usize) -> String {
    // The byte offset of the (max_chars + 1)-th character, if it exists,
    // is exactly where the kept prefix ends. Looking it up touches at
    // most `max_chars + 1` characters instead of counting the whole
    // (possibly very large) string and then walking it again.
    match s.char_indices().nth(max_chars) {
        None => s.to_string(),
        Some((cut, _)) => {
            let mut out = String::with_capacity(cut + '…'.len_utf8());
            out.push_str(&s[..cut]);
            out.push('…');
            out
        }
    }
}
297
298enum RetryDecision {
299    /// Transient failure — caller should back off and retry if budget remains.
300    Retry(ForkError),
301    /// Permanent failure — no point retrying.
302    Fatal(ForkError),
303}
304
305/// Exponential backoff with full jitter: each delay is uniformly sampled
306/// from `[base * 2^attempt, base * 2^attempt + base)`. The jitter prevents
307/// a fleet of concurrent tests from synchronising their retries into a
308/// thundering-herd pattern when the RPC is briefly degraded.
309///
310/// Kept as a free function so it's unit-testable without standing up an
311/// `RpcClient`.
312fn backoff_delay(base: Duration, attempt: u32) -> Duration {
313    // Saturating pow prevents overflow on absurd retry counts.
314    let factor = 2u32.saturating_pow(attempt);
315    let exponential = base.saturating_mul(factor);
316    let jitter = jitter_under(base);
317    exponential.saturating_add(jitter)
318}
319
320/// Random `Duration` in `[0, max)`. Returns `Duration::ZERO` if `max` is
321/// zero. Uses a thread-local xorshift64* seeded from the system clock —
322/// good enough for spreading retries, not cryptographically random.
323fn jitter_under(max: Duration) -> Duration {
324    let max_nanos = max.as_nanos() as u64;
325    if max_nanos == 0 {
326        return Duration::ZERO;
327    }
328    Duration::from_nanos(next_rng_u64() % max_nanos)
329}
330
331thread_local! {
332    static RNG_STATE: Cell<u64> = Cell::new(seed_rng());
333}
334
/// Produce a non-zero seed for the thread-local jitter generator.
///
/// Two sources are XOR-ed together:
/// * the wall clock (nanoseconds since the Unix epoch, scrambled by an
///   odd multiplier), with a fixed fallback if the clock reads earlier
///   than the epoch;
/// * the output of a freshly keyed standard-library hasher, so threads
///   that initialise within the same clock tick still get different
///   seeds.
///
/// The lowest bit is forced on because xorshift never leaves the
/// all-zero state.
fn seed_rng() -> u64 {
    let clock = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        // Truncating to the low 64 bits is intentional: only the
        // fast-changing bits matter for seeding.
        .map(|d| (d.as_nanos() as u64).wrapping_mul(0x9E37_79B9_7F4A_7C15))
        .unwrap_or(0xDEAD_BEEF_CAFE_BABE);
    // Fully qualified paths keep the file's import list untouched.
    let keyed = std::collections::hash_map::RandomState::new();
    let entropy = std::hash::Hasher::finish(&std::hash::BuildHasher::build_hasher(&keyed));
    (clock ^ entropy) | 1 // ensure non-zero — xorshift would degenerate on 0
}
342
343fn next_rng_u64() -> u64 {
344    RNG_STATE.with(|cell| {
345        let mut x = cell.get();
346        // xorshift64 — Marsaglia 2003. State is never zero (seeded with |1
347        // and the transform preserves non-zero), so degeneracy isn't a risk.
348        x ^= x << 13;
349        x ^= x >> 7;
350        x ^= x << 17;
351        cell.set(x);
352        x
353    })
354}
355
356fn encode_key(key: &LedgerKey) -> Result<String> {
357    let bytes = key
358        .to_xdr(Limits::none())
359        .map_err(|e| ForkError::Xdr(format!("encode LedgerKey: {e}")))?;
360    Ok(BASE64.encode(&bytes))
361}
362
363fn decode_entry(wire: EntryResult) -> Result<FetchedEntry> {
364    let entry_bytes = BASE64.decode(&wire.xdr)?;
365    // RPC returns `LedgerEntryData`, not a full `LedgerEntry`. We reconstruct
366    // the wrapper using the per-entry `lastModifiedLedgerSeq` the server
367    // delivers alongside.
368    let entry_data = LedgerEntryData::from_xdr(&entry_bytes, Limits::none())
369        .map_err(|e| ForkError::Xdr(format!("decode LedgerEntryData: {e}")))?;
370    let entry = LedgerEntry {
371        last_modified_ledger_seq: wire.last_modified_ledger_seq,
372        data: entry_data,
373        ext: LedgerEntryExt::V0,
374    };
375    debug!(
376        "soroban-fork: decoded entry, last_modified={}, live_until={:?}",
377        wire.last_modified_ledger_seq, wire.live_until_ledger_seq
378    );
379    Ok(FetchedEntry {
380        entry,
381        live_until: wire.live_until_ledger_seq,
382    })
383}
384
385// ---------------------------------------------------------------------------
386// Wire types — isolated from the rest of the crate.
387// ---------------------------------------------------------------------------
388
389#[derive(Serialize)]
390struct JsonRpcRequest<'a> {
391    jsonrpc: &'a str,
392    id: u64,
393    method: &'a str,
394    params: serde_json::Value,
395}
396
397#[derive(Deserialize)]
398struct JsonRpcResponse<T> {
399    result: Option<T>,
400    error: Option<serde_json::Value>,
401}
402
403impl<T> JsonRpcResponse<T> {
404    fn into_result(self) -> Result<T> {
405        if let Some(err) = self.error {
406            return Err(ForkError::RpcError(err.to_string()));
407        }
408        self.result.ok_or(ForkError::RpcNoResult)
409    }
410}
411
412#[derive(Deserialize)]
413#[serde(rename_all = "camelCase")]
414struct GetLedgerEntriesResult {
415    entries: Option<Vec<EntryResult>>,
416    #[allow(dead_code)]
417    latest_ledger: u32,
418}
419
420#[derive(Deserialize)]
421#[serde(rename_all = "camelCase")]
422struct EntryResult {
423    #[allow(dead_code)]
424    key: String,
425    xdr: String,
426    last_modified_ledger_seq: u32,
427    live_until_ledger_seq: Option<u32>,
428}
429
430#[derive(Deserialize)]
431#[serde(rename_all = "camelCase")]
432struct GetLatestLedgerResult {
433    #[allow(dead_code)]
434    id: String,
435    protocol_version: u32,
436    sequence: u32,
437}
438
439#[derive(Deserialize)]
440#[serde(rename_all = "camelCase")]
441struct GetNetworkResult {
442    passphrase: String,
443    #[allow(dead_code)]
444    #[serde(default)]
445    friendbot_url: Option<String>,
446    #[allow(dead_code)]
447    #[serde(default)]
448    protocol_version: Option<u32>,
449}
450
451#[derive(Deserialize)]
452#[serde(rename_all = "camelCase")]
453struct GetLedgersResult {
454    ledgers: Vec<LedgerInfoWire>,
455}
456
457#[derive(Deserialize)]
458#[serde(rename_all = "camelCase")]
459struct LedgerInfoWire {
460    #[allow(dead_code)]
461    sequence: u32,
462    /// `getLedgers` returns this as a string-encoded Unix-seconds value
463    /// (per the Stellar RPC OpenRPC spec). We parse to `u64` at the
464    /// `RpcClient::get_ledger_close_time` boundary so callers get a
465    /// numeric type and obvious failure on protocol drift.
466    ledger_close_time: String,
467}
468
469// ---------------------------------------------------------------------------
470// Unit tests — no network required.
471// ---------------------------------------------------------------------------
472
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn backoff_delay_doubles_each_attempt_within_jitter_window() {
        // With additive jitter, the delay falls in
        // [base * 2^attempt, base * (2^attempt + 1)). Sample many times to
        // smoke out off-by-one boundary mistakes in the jitter
        // implementation.
        let base = Duration::from_millis(100);
        for attempt in 0u32..4 {
            let factor = 2u32.pow(attempt);
            let lower = base * factor;
            let upper = base * (factor + 1);
            for _ in 0..32 {
                let d = backoff_delay(base, attempt);
                assert!(
                    d >= lower && d < upper,
                    "attempt {attempt}: {d:?} not in [{lower:?}, {upper:?})"
                );
            }
        }
    }

    #[test]
    fn backoff_delay_saturates_on_absurd_attempt() {
        let base = Duration::from_secs(1);
        // 2^100 overflows u32; saturating arithmetic prevents panic.
        let d = backoff_delay(base, 100);
        // Any finite result is acceptable; the important property is "doesn't panic".
        assert!(d >= base);
    }

    #[test]
    fn backoff_delay_zero_base_is_zero() {
        // Edge case: zero base => zero delay, no panic from `% 0`.
        assert_eq!(backoff_delay(Duration::ZERO, 0), Duration::ZERO);
        assert_eq!(backoff_delay(Duration::ZERO, 5), Duration::ZERO);
    }

    #[test]
    fn jitter_under_stays_below_max() {
        let max = Duration::from_millis(10);
        for _ in 0..256 {
            assert!(jitter_under(max) < max);
        }
        assert_eq!(jitter_under(Duration::ZERO), Duration::ZERO);
    }

    #[test]
    fn rng_state_never_becomes_zero() {
        // The seed has its low bit forced on, and xorshift maps non-zero
        // states to non-zero states.
        assert_eq!(seed_rng() & 1, 1);
        for _ in 0..256 {
            assert_ne!(next_rng_u64(), 0);
        }
    }

    #[test]
    fn truncate_chars_handles_multibyte_safely() {
        // Cyrillic + emoji: each char is 2-4 bytes. Naive byte slicing
        // would panic on a UTF-8 boundary.
        let s = "тест🚀тест";
        let out = truncate_chars(s, 5);
        // 5 chars + ellipsis (more than 5 chars in the source).
        assert!(out.ends_with('…'));
        assert_eq!(out.chars().count(), 6);
        assert_eq!(out, "тест🚀…");
    }

    #[test]
    fn truncate_chars_short_input_unchanged() {
        assert_eq!(truncate_chars("abc", 10), "abc");
        assert_eq!(truncate_chars("", 10), "");
    }

    #[test]
    fn truncate_chars_exact_length_is_not_truncated() {
        // Boundary: input with exactly `max_chars` characters must come
        // back untouched, without an ellipsis.
        assert_eq!(truncate_chars("abcde", 5), "abcde");
        assert_eq!(truncate_chars("тест🚀", 5), "тест🚀");
    }

    #[test]
    fn truncate_chars_zero_limit_keeps_only_ellipsis() {
        assert_eq!(truncate_chars("abc", 0), "…");
        assert_eq!(truncate_chars("", 0), "");
    }

    #[test]
    fn json_rpc_response_into_result_returns_result_when_ok() {
        let response: JsonRpcResponse<u32> = JsonRpcResponse {
            result: Some(42),
            error: None,
        };
        assert_eq!(response.into_result().unwrap(), 42);
    }

    #[test]
    fn json_rpc_response_into_result_propagates_error_field() {
        let response: JsonRpcResponse<u32> = JsonRpcResponse {
            result: None,
            error: Some(serde_json::json!({"code": -32000, "message": "boom"})),
        };
        let err = response.into_result().unwrap_err();
        assert!(matches!(err, ForkError::RpcError(_)));
    }

    #[test]
    fn json_rpc_response_into_result_errors_when_no_result_no_error() {
        let response: JsonRpcResponse<u32> = JsonRpcResponse {
            result: None,
            error: None,
        };
        let err = response.into_result().unwrap_err();
        assert!(matches!(err, ForkError::RpcNoResult));
    }

    #[test]
    fn rpc_config_default_has_sensible_values() {
        let cfg = RpcConfig::default();
        assert!(cfg.retries >= 1);
        assert!(cfg.base_retry_delay >= Duration::from_millis(50));
        assert_eq!(cfg.max_keys_per_request, 200);
        assert!(cfg.request_timeout.is_some());
    }
}