use std::cell::Cell;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use base64::{engine::general_purpose::STANDARD as BASE64, Engine};
use log::{debug, warn};
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use soroban_env_host::xdr::{
LedgerEntry, LedgerEntryData, LedgerEntryExt, LedgerKey, Limits, ReadXdr, WriteXdr,
};
use crate::error::{ForkError, Result};
const ERROR_BODY_TRUNCATE_BYTES: usize = 256;
#[derive(Clone, Debug)]
pub struct RpcConfig {
pub retries: u32,
pub base_retry_delay: Duration,
pub request_timeout: Option<Duration>,
pub max_keys_per_request: usize,
}
impl Default for RpcConfig {
fn default() -> Self {
Self {
retries: 3,
base_retry_delay: Duration::from_millis(300),
request_timeout: Some(Duration::from_secs(30)),
max_keys_per_request: 200,
}
}
}
pub struct FetchedEntry {
pub entry: LedgerEntry,
pub live_until: Option<u32>,
}
pub struct LatestLedger {
pub sequence: u32,
pub protocol_version: u32,
pub close_time: u64,
}
pub struct NetworkMetadata {
pub passphrase: String,
pub network_id: [u8; 32],
}
pub struct RpcClient {
http: reqwest::blocking::Client,
url: String,
config: RpcConfig,
}
impl RpcClient {
pub fn new(url: impl Into<String>, config: RpcConfig) -> Result<Self> {
let mut builder = reqwest::blocking::ClientBuilder::new();
if let Some(timeout) = config.request_timeout {
builder = builder.timeout(timeout);
}
let http = builder.build()?;
Ok(Self {
http,
url: url.into(),
config,
})
}
pub fn get_latest_ledger(&self) -> Result<LatestLedger> {
let response: JsonRpcResponse<GetLatestLedgerResult> =
self.rpc_post("getLatestLedger", serde_json::json!({}))?;
let result = response.into_result()?;
let close_time = self.get_ledger_close_time(result.sequence)?;
Ok(LatestLedger {
sequence: result.sequence,
protocol_version: result.protocol_version,
close_time,
})
}
pub fn get_ledger_close_time(&self, sequence: u32) -> Result<u64> {
let response: JsonRpcResponse<GetLedgersResult> = self.rpc_post(
"getLedgers",
serde_json::json!({
"startLedger": sequence,
"pagination": { "limit": 1 },
}),
)?;
let result = response.into_result()?;
let ledger = result.ledgers.into_iter().next().ok_or_else(|| {
ForkError::RpcError(format!("getLedgers returned no entry for {sequence}"))
})?;
ledger.ledger_close_time.parse::<u64>().map_err(|e| {
ForkError::RpcError(format!(
"getLedgers returned non-numeric ledgerCloseTime '{}': {e}",
ledger.ledger_close_time
))
})
}
pub fn get_network(&self) -> Result<NetworkMetadata> {
let response: JsonRpcResponse<GetNetworkResult> =
self.rpc_post("getNetwork", serde_json::json!({}))?;
let result = response.into_result()?;
let mut hasher = Sha256::new();
hasher.update(result.passphrase.as_bytes());
let network_id: [u8; 32] = hasher.finalize().into();
Ok(NetworkMetadata {
passphrase: result.passphrase,
network_id,
})
}
pub fn fetch_entries(&self, keys: &[LedgerKey]) -> Result<Vec<FetchedEntry>> {
let mut results = Vec::new();
for chunk in keys.chunks(self.config.max_keys_per_request) {
let encoded_keys = chunk.iter().map(encode_key).collect::<Result<Vec<_>>>()?;
let response: JsonRpcResponse<GetLedgerEntriesResult> = self.rpc_post(
"getLedgerEntries",
serde_json::json!({ "keys": encoded_keys }),
)?;
let result = response.into_result()?;
if let Some(entries) = result.entries {
for wire in entries {
results.push(decode_entry(wire)?);
}
}
}
Ok(results)
}
pub fn fetch_entry(&self, key: &LedgerKey) -> Result<Option<FetchedEntry>> {
Ok(self.fetch_entries(std::slice::from_ref(key))?.pop())
}
fn rpc_post<T: DeserializeOwned>(
&self,
method: &str,
params: serde_json::Value,
) -> Result<JsonRpcResponse<T>> {
let request = JsonRpcRequest {
jsonrpc: "2.0",
id: 1,
method,
params,
};
let total_attempts = self.config.retries + 1;
let mut last_error: Option<ForkError> = None;
for attempt in 0..total_attempts {
match self.try_once::<T>(&request) {
Ok(parsed) => return Ok(parsed),
Err(RetryDecision::Retry(err)) if attempt + 1 < total_attempts => {
let delay = backoff_delay(self.config.base_retry_delay, attempt);
warn!(
"soroban-fork: RPC {method} failed (attempt {}/{}): {err}; \
retrying in {delay:?}",
attempt + 1,
total_attempts
);
std::thread::sleep(delay);
last_error = Some(err);
}
Err(RetryDecision::Retry(err)) | Err(RetryDecision::Fatal(err)) => {
return Err(err);
}
}
}
Err(last_error.unwrap_or_else(|| {
ForkError::Transport("retry loop exhausted with no recorded error".into())
}))
}
fn try_once<T: DeserializeOwned>(
&self,
request: &JsonRpcRequest<'_>,
) -> std::result::Result<JsonRpcResponse<T>, RetryDecision> {
let response = self
.http
.post(&self.url)
.json(request)
.send()
.map_err(|e| RetryDecision::Retry(ForkError::from(e)))?;
let status = response.status();
let code = status.as_u16();
let retryable = matches!(code, 408 | 425 | 429) || status.is_server_error();
if retryable {
let body = response_body_snippet(response);
return Err(RetryDecision::Retry(ForkError::Transport(format!(
"HTTP {status}: {body}"
))));
}
if !status.is_success() {
let body = response_body_snippet(response);
return Err(RetryDecision::Fatal(ForkError::Transport(format!(
"HTTP {status}: {body}"
))));
}
response
.json::<JsonRpcResponse<T>>()
.map_err(|e| RetryDecision::Retry(ForkError::from(e)))
}
}
fn response_body_snippet(response: reqwest::blocking::Response) -> String {
match response.text() {
Ok(body) => {
let trimmed = body.trim();
if trimmed.is_empty() {
"<empty body>".to_string()
} else {
truncate_chars(trimmed, ERROR_BODY_TRUNCATE_BYTES)
}
}
Err(_) => "<no body>".to_string(),
}
}
fn truncate_chars(s: &str, max_chars: usize) -> String {
if s.chars().count() <= max_chars {
return s.to_string();
}
let mut out: String = s.chars().take(max_chars).collect();
out.push('…');
out
}
enum RetryDecision {
Retry(ForkError),
Fatal(ForkError),
}
fn backoff_delay(base: Duration, attempt: u32) -> Duration {
let factor = 2u32.saturating_pow(attempt);
let exponential = base.saturating_mul(factor);
let jitter = jitter_under(base);
exponential.saturating_add(jitter)
}
fn jitter_under(max: Duration) -> Duration {
let max_nanos = max.as_nanos() as u64;
if max_nanos == 0 {
return Duration::ZERO;
}
Duration::from_nanos(next_rng_u64() % max_nanos)
}
thread_local! {
static RNG_STATE: Cell<u64> = Cell::new(seed_rng());
}
fn seed_rng() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| (d.as_nanos() as u64).wrapping_mul(0x9E3779B97F4A7C15))
.unwrap_or(0xDEAD_BEEF_CAFE_BABE)
| 1 }
fn next_rng_u64() -> u64 {
RNG_STATE.with(|cell| {
let mut x = cell.get();
x ^= x << 13;
x ^= x >> 7;
x ^= x << 17;
cell.set(x);
x
})
}
fn encode_key(key: &LedgerKey) -> Result<String> {
let bytes = key
.to_xdr(Limits::none())
.map_err(|e| ForkError::Xdr(format!("encode LedgerKey: {e}")))?;
Ok(BASE64.encode(&bytes))
}
fn decode_entry(wire: EntryResult) -> Result<FetchedEntry> {
let entry_bytes = BASE64.decode(&wire.xdr)?;
let entry_data = LedgerEntryData::from_xdr(&entry_bytes, Limits::none())
.map_err(|e| ForkError::Xdr(format!("decode LedgerEntryData: {e}")))?;
let entry = LedgerEntry {
last_modified_ledger_seq: wire.last_modified_ledger_seq,
data: entry_data,
ext: LedgerEntryExt::V0,
};
debug!(
"soroban-fork: decoded entry, last_modified={}, live_until={:?}",
wire.last_modified_ledger_seq, wire.live_until_ledger_seq
);
Ok(FetchedEntry {
entry,
live_until: wire.live_until_ledger_seq,
})
}
#[derive(Serialize)]
struct JsonRpcRequest<'a> {
jsonrpc: &'a str,
id: u64,
method: &'a str,
params: serde_json::Value,
}
#[derive(Deserialize)]
struct JsonRpcResponse<T> {
result: Option<T>,
error: Option<serde_json::Value>,
}
impl<T> JsonRpcResponse<T> {
fn into_result(self) -> Result<T> {
if let Some(err) = self.error {
return Err(ForkError::RpcError(err.to_string()));
}
self.result.ok_or(ForkError::RpcNoResult)
}
}
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct GetLedgerEntriesResult {
entries: Option<Vec<EntryResult>>,
#[allow(dead_code)]
latest_ledger: u32,
}
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct EntryResult {
#[allow(dead_code)]
key: String,
xdr: String,
last_modified_ledger_seq: u32,
live_until_ledger_seq: Option<u32>,
}
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct GetLatestLedgerResult {
#[allow(dead_code)]
id: String,
protocol_version: u32,
sequence: u32,
}
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct GetNetworkResult {
passphrase: String,
#[allow(dead_code)]
#[serde(default)]
friendbot_url: Option<String>,
#[allow(dead_code)]
#[serde(default)]
protocol_version: Option<u32>,
}
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct GetLedgersResult {
ledgers: Vec<LedgerInfoWire>,
}
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct LedgerInfoWire {
#[allow(dead_code)]
sequence: u32,
ledger_close_time: String,
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn backoff_delay_doubles_each_attempt_within_jitter_window() {
let base = Duration::from_millis(100);
for attempt in 0u32..4 {
let factor = 2u32.pow(attempt);
let lower = base * factor;
let upper = base * (factor + 1);
for _ in 0..32 {
let d = backoff_delay(base, attempt);
assert!(
d >= lower && d < upper,
"attempt {attempt}: {d:?} not in [{lower:?}, {upper:?})"
);
}
}
}
#[test]
fn backoff_delay_saturates_on_absurd_attempt() {
let base = Duration::from_secs(1);
let d = backoff_delay(base, 100);
assert!(d >= base);
}
#[test]
fn backoff_delay_zero_base_is_zero() {
assert_eq!(backoff_delay(Duration::ZERO, 0), Duration::ZERO);
assert_eq!(backoff_delay(Duration::ZERO, 5), Duration::ZERO);
}
#[test]
fn truncate_chars_handles_multibyte_safely() {
let s = "тест🚀тест";
let out = truncate_chars(s, 5);
assert!(out.ends_with('…'));
assert_eq!(out.chars().count(), 6);
}
#[test]
fn truncate_chars_short_input_unchanged() {
assert_eq!(truncate_chars("abc", 10), "abc");
assert_eq!(truncate_chars("", 10), "");
}
#[test]
fn json_rpc_response_into_result_returns_result_when_ok() {
let response: JsonRpcResponse<u32> = JsonRpcResponse {
result: Some(42),
error: None,
};
assert_eq!(response.into_result().unwrap(), 42);
}
#[test]
fn json_rpc_response_into_result_propagates_error_field() {
let response: JsonRpcResponse<u32> = JsonRpcResponse {
result: None,
error: Some(serde_json::json!({"code": -32000, "message": "boom"})),
};
let err = response.into_result().unwrap_err();
assert!(matches!(err, ForkError::RpcError(_)));
}
#[test]
fn json_rpc_response_into_result_errors_when_no_result_no_error() {
let response: JsonRpcResponse<u32> = JsonRpcResponse {
result: None,
error: None,
};
let err = response.into_result().unwrap_err();
assert!(matches!(err, ForkError::RpcNoResult));
}
#[test]
fn rpc_config_default_has_sensible_values() {
let cfg = RpcConfig::default();
assert!(cfg.retries >= 1);
assert!(cfg.base_retry_delay >= Duration::from_millis(50));
assert_eq!(cfg.max_keys_per_request, 200);
assert!(cfg.request_timeout.is_some());
}
}