1use std::cell::Cell;
9use std::time::{Duration, SystemTime, UNIX_EPOCH};
10
11use base64::{engine::general_purpose::STANDARD as BASE64, Engine};
12use log::{debug, warn};
13use serde::de::DeserializeOwned;
14use serde::{Deserialize, Serialize};
15use sha2::{Digest, Sha256};
16use soroban_env_host::xdr::{
17 LedgerEntry, LedgerEntryData, LedgerEntryExt, LedgerKey, Limits, ReadXdr, WriteXdr,
18};
19
20use crate::error::{ForkError, Result};
21
/// Upper bound on how much of an HTTP error body is embedded in an error
/// message. Despite the `_BYTES` suffix, the value is handed to
/// `truncate_chars`, which counts `char`s rather than bytes.
const ERROR_BODY_TRUNCATE_BYTES: usize = 256;
26
/// Tunables for [`RpcClient`]: retry policy, HTTP timeout and batch size.
#[derive(Clone, Debug)]
pub struct RpcConfig {
    /// Number of retries after the first attempt, so a request is tried at
    /// most `retries + 1` times.
    pub retries: u32,
    /// Base delay of the retry backoff: the wait before retry `n` (counting
    /// from 0) is `base_retry_delay * 2^n` plus a random jitter smaller than
    /// `base_retry_delay`.
    pub base_retry_delay: Duration,
    /// Timeout installed on the HTTP client when `Some`; with `None` the
    /// client builder's own default is left untouched.
    pub request_timeout: Option<Duration>,
    /// Chunk size used to split key lists across `getLedgerEntries`
    /// requests. Expected to be at least 1.
    pub max_keys_per_request: usize,
}
46
47impl Default for RpcConfig {
48 fn default() -> Self {
49 Self {
50 retries: 3,
51 base_retry_delay: Duration::from_millis(300),
52 request_timeout: Some(Duration::from_secs(30)),
53 max_keys_per_request: 200,
54 }
55 }
56}
57
58pub struct FetchedEntry {
60 pub entry: LedgerEntry,
62 pub live_until: Option<u32>,
64}
65
/// Summary of the most recent ledger known to the RPC server.
///
/// Plain data, so it derives the usual value-type traits (`Clone`, `Debug`,
/// `PartialEq`, `Eq`) for logging and comparisons in tests.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct LatestLedger {
    /// Ledger sequence number as reported by `getLatestLedger`.
    pub sequence: u32,
    /// Protocol version reported alongside the sequence.
    pub protocol_version: u32,
    /// Close time of the ledger, parsed from the decimal `ledgerCloseTime`
    /// string returned by `getLedgers` (presumably Unix seconds — confirm
    /// against the RPC documentation).
    pub close_time: u64,
}
81
/// Identity of the network the RPC server is attached to.
///
/// Plain data, so it derives the usual value-type traits (`Clone`, `Debug`,
/// `PartialEq`, `Eq`).
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct NetworkMetadata {
    /// Network passphrase exactly as returned by `getNetwork`.
    pub passphrase: String,
    /// SHA-256 digest of the passphrase bytes.
    pub network_id: [u8; 32],
}
89
/// Blocking JSON-RPC client bound to a single endpoint URL.
pub struct RpcClient {
    /// Underlying blocking HTTP client, built once in `new`.
    http: reqwest::blocking::Client,
    /// Endpoint every request is POSTed to.
    url: String,
    /// Retry, timeout and batching settings supplied at construction.
    config: RpcConfig,
}
97
98impl RpcClient {
99 pub fn new(url: impl Into<String>, config: RpcConfig) -> Result<Self> {
101 let mut builder = reqwest::blocking::ClientBuilder::new();
102 if let Some(timeout) = config.request_timeout {
103 builder = builder.timeout(timeout);
104 }
105 let http = builder.build()?;
106 Ok(Self {
107 http,
108 url: url.into(),
109 config,
110 })
111 }
112
113 pub fn get_latest_ledger(&self) -> Result<LatestLedger> {
121 let response: JsonRpcResponse<GetLatestLedgerResult> =
122 self.rpc_post("getLatestLedger", serde_json::json!({}))?;
123 let result = response.into_result()?;
124 let close_time = self.get_ledger_close_time(result.sequence)?;
125 Ok(LatestLedger {
126 sequence: result.sequence,
127 protocol_version: result.protocol_version,
128 close_time,
129 })
130 }
131
132 pub fn get_ledger_close_time(&self, sequence: u32) -> Result<u64> {
136 let response: JsonRpcResponse<GetLedgersResult> = self.rpc_post(
137 "getLedgers",
138 serde_json::json!({
139 "startLedger": sequence,
140 "pagination": { "limit": 1 },
141 }),
142 )?;
143 let result = response.into_result()?;
144 let ledger = result.ledgers.into_iter().next().ok_or_else(|| {
145 ForkError::RpcError(format!("getLedgers returned no entry for {sequence}"))
146 })?;
147 ledger.ledger_close_time.parse::<u64>().map_err(|e| {
148 ForkError::RpcError(format!(
149 "getLedgers returned non-numeric ledgerCloseTime '{}': {e}",
150 ledger.ledger_close_time
151 ))
152 })
153 }
154
155 pub fn get_network(&self) -> Result<NetworkMetadata> {
158 let response: JsonRpcResponse<GetNetworkResult> =
159 self.rpc_post("getNetwork", serde_json::json!({}))?;
160 let result = response.into_result()?;
161 let mut hasher = Sha256::new();
162 hasher.update(result.passphrase.as_bytes());
163 let network_id: [u8; 32] = hasher.finalize().into();
164 Ok(NetworkMetadata {
165 passphrase: result.passphrase,
166 network_id,
167 })
168 }
169
170 pub fn fetch_entries(&self, keys: &[LedgerKey]) -> Result<Vec<FetchedEntry>> {
173 let mut results = Vec::new();
174 for chunk in keys.chunks(self.config.max_keys_per_request) {
175 let encoded_keys = chunk.iter().map(encode_key).collect::<Result<Vec<_>>>()?;
176 let response: JsonRpcResponse<GetLedgerEntriesResult> = self.rpc_post(
177 "getLedgerEntries",
178 serde_json::json!({ "keys": encoded_keys }),
179 )?;
180 let result = response.into_result()?;
181 if let Some(entries) = result.entries {
182 for wire in entries {
183 results.push(decode_entry(wire)?);
184 }
185 }
186 }
187 Ok(results)
188 }
189
190 pub fn fetch_entry(&self, key: &LedgerKey) -> Result<Option<FetchedEntry>> {
192 Ok(self.fetch_entries(std::slice::from_ref(key))?.pop())
193 }
194
195 fn rpc_post<T: DeserializeOwned>(
196 &self,
197 method: &str,
198 params: serde_json::Value,
199 ) -> Result<JsonRpcResponse<T>> {
200 let request = JsonRpcRequest {
201 jsonrpc: "2.0",
202 id: 1,
203 method,
204 params,
205 };
206
207 let total_attempts = self.config.retries + 1;
208 let mut last_error: Option<ForkError> = None;
209
210 for attempt in 0..total_attempts {
211 match self.try_once::<T>(&request) {
212 Ok(parsed) => return Ok(parsed),
213 Err(RetryDecision::Retry(err)) if attempt + 1 < total_attempts => {
214 let delay = backoff_delay(self.config.base_retry_delay, attempt);
215 warn!(
216 "soroban-fork: RPC {method} failed (attempt {}/{}): {err}; \
217 retrying in {delay:?}",
218 attempt + 1,
219 total_attempts
220 );
221 std::thread::sleep(delay);
222 last_error = Some(err);
223 }
224 Err(RetryDecision::Retry(err)) | Err(RetryDecision::Fatal(err)) => {
225 return Err(err);
226 }
227 }
228 }
229
230 Err(last_error.unwrap_or_else(|| {
231 ForkError::Transport("retry loop exhausted with no recorded error".into())
232 }))
233 }
234
235 fn try_once<T: DeserializeOwned>(
236 &self,
237 request: &JsonRpcRequest<'_>,
238 ) -> std::result::Result<JsonRpcResponse<T>, RetryDecision> {
239 let response = self
240 .http
241 .post(&self.url)
242 .json(request)
243 .send()
244 .map_err(|e| RetryDecision::Retry(ForkError::from(e)))?;
245
246 let status = response.status();
247 let code = status.as_u16();
248 let retryable = matches!(code, 408 | 425 | 429) || status.is_server_error();
252 if retryable {
253 let body = response_body_snippet(response);
254 return Err(RetryDecision::Retry(ForkError::Transport(format!(
255 "HTTP {status}: {body}"
256 ))));
257 }
258 if !status.is_success() {
259 let body = response_body_snippet(response);
260 return Err(RetryDecision::Fatal(ForkError::Transport(format!(
261 "HTTP {status}: {body}"
262 ))));
263 }
264
265 response
266 .json::<JsonRpcResponse<T>>()
267 .map_err(|e| RetryDecision::Retry(ForkError::from(e)))
268 }
269}
270
271fn response_body_snippet(response: reqwest::blocking::Response) -> String {
274 match response.text() {
275 Ok(body) => {
276 let trimmed = body.trim();
277 if trimmed.is_empty() {
278 "<empty body>".to_string()
279 } else {
280 truncate_chars(trimmed, ERROR_BODY_TRUNCATE_BYTES)
281 }
282 }
283 Err(_) => "<no body>".to_string(),
284 }
285}
286
/// Returns `s` limited to its first `max_chars` characters.
///
/// Strings with at most `max_chars` characters come back unchanged; longer
/// ones are cut on a character boundary and get a trailing `…` appended.
fn truncate_chars(s: &str, max_chars: usize) -> String {
    // The byte offset of character number `max_chars` (0-based) is exactly
    // where the kept prefix ends; if there is no such character, `s` already
    // fits.
    match s.char_indices().nth(max_chars) {
        None => s.to_owned(),
        Some((cut, _)) => {
            let mut clipped = String::with_capacity(cut + '…'.len_utf8());
            clipped.push_str(&s[..cut]);
            clipped.push('…');
            clipped
        }
    }
}
297
/// Classification of a failed single HTTP attempt, telling the retry loop
/// whether another attempt is worthwhile.
enum RetryDecision {
    /// The failure may succeed on retry: send errors, HTTP 408/425/429/5xx,
    /// and response-body decode errors.
    Retry(ForkError),
    /// The failure is returned to the caller immediately: any other
    /// non-success HTTP status.
    Fatal(ForkError),
}
304
305fn backoff_delay(base: Duration, attempt: u32) -> Duration {
313 let factor = 2u32.saturating_pow(attempt);
315 let exponential = base.saturating_mul(factor);
316 let jitter = jitter_under(base);
317 exponential.saturating_add(jitter)
318}
319
320fn jitter_under(max: Duration) -> Duration {
324 let max_nanos = max.as_nanos() as u64;
325 if max_nanos == 0 {
326 return Duration::ZERO;
327 }
328 Duration::from_nanos(next_rng_u64() % max_nanos)
329}
330
thread_local! {
    /// Per-thread state of the jitter generator; initialised from
    /// `seed_rng` the first time a thread touches it.
    static RNG_STATE: Cell<u64> = Cell::new(seed_rng());
}
334
/// Produces the initial generator state from the wall clock.
///
/// The nanoseconds since the Unix epoch (deliberately truncated to 64 bits)
/// are multiplied by a fixed odd constant; if the clock reads earlier than
/// the epoch a fixed fallback is used instead. The lowest bit is then forced
/// on, so the seed is never zero — a zero state would stay zero forever
/// under the shift/xor steps in `next_rng_u64`.
fn seed_rng() -> u64 {
    let mixed = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => (elapsed.as_nanos() as u64).wrapping_mul(0x9E3779B97F4A7C15),
        Err(_) => 0xDEAD_BEEF_CAFE_BABE,
    };
    mixed | 1
}
342
343fn next_rng_u64() -> u64 {
344 RNG_STATE.with(|cell| {
345 let mut x = cell.get();
346 x ^= x << 13;
349 x ^= x >> 7;
350 x ^= x << 17;
351 cell.set(x);
352 x
353 })
354}
355
356fn encode_key(key: &LedgerKey) -> Result<String> {
357 let bytes = key
358 .to_xdr(Limits::none())
359 .map_err(|e| ForkError::Xdr(format!("encode LedgerKey: {e}")))?;
360 Ok(BASE64.encode(&bytes))
361}
362
363fn decode_entry(wire: EntryResult) -> Result<FetchedEntry> {
364 let entry_bytes = BASE64.decode(&wire.xdr)?;
365 let entry_data = LedgerEntryData::from_xdr(&entry_bytes, Limits::none())
369 .map_err(|e| ForkError::Xdr(format!("decode LedgerEntryData: {e}")))?;
370 let entry = LedgerEntry {
371 last_modified_ledger_seq: wire.last_modified_ledger_seq,
372 data: entry_data,
373 ext: LedgerEntryExt::V0,
374 };
375 debug!(
376 "soroban-fork: decoded entry, last_modified={}, live_until={:?}",
377 wire.last_modified_ledger_seq, wire.live_until_ledger_seq
378 );
379 Ok(FetchedEntry {
380 entry,
381 live_until: wire.live_until_ledger_seq,
382 })
383}
384
/// Request envelope serialized as the JSON body of every RPC call.
#[derive(Serialize)]
struct JsonRpcRequest<'a> {
    /// Protocol version string; `rpc_post` always sends `"2.0"`.
    jsonrpc: &'a str,
    /// Request identifier; `rpc_post` always sends `1`.
    id: u64,
    /// Name of the remote method, e.g. `getLatestLedger`.
    method: &'a str,
    /// Method parameters as an arbitrary JSON value.
    params: serde_json::Value,
}
396
/// Response envelope deserialized from the JSON body of every RPC reply,
/// generic over the method-specific `result` payload.
#[derive(Deserialize)]
struct JsonRpcResponse<T> {
    /// Method-specific payload; `None` when absent or `null`.
    result: Option<T>,
    /// Raw error object, kept as untyped JSON; `None` when absent or `null`.
    error: Option<serde_json::Value>,
}
402
403impl<T> JsonRpcResponse<T> {
404 fn into_result(self) -> Result<T> {
405 if let Some(err) = self.error {
406 return Err(ForkError::RpcError(err.to_string()));
407 }
408 self.result.ok_or(ForkError::RpcNoResult)
409 }
410}
411
/// `result` payload of a `getLedgerEntries` response (camelCase on the wire).
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct GetLedgerEntriesResult {
    /// Returned entries; `fetch_entries` treats a missing list like an empty
    /// one.
    entries: Option<Vec<EntryResult>>,
    /// Read from `latestLedger`; must be present for deserialization to
    /// succeed but is not otherwise used in this file.
    #[allow(dead_code)]
    latest_ledger: u32,
}
419
/// One element of the `entries` array in a `getLedgerEntries` response
/// (camelCase on the wire).
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct EntryResult {
    /// Deserialized from `key` but not read in this file.
    #[allow(dead_code)]
    key: String,
    /// Base64 text that `decode_entry` decodes and parses as
    /// `LedgerEntryData` XDR.
    xdr: String,
    /// Read from `lastModifiedLedgerSeq`; copied into the rebuilt
    /// `LedgerEntry`.
    last_modified_ledger_seq: u32,
    /// Read from `liveUntilLedgerSeq`; `None` when the server omits it.
    live_until_ledger_seq: Option<u32>,
}
429
/// `result` payload of a `getLatestLedger` response (camelCase on the wire).
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct GetLatestLedgerResult {
    /// Deserialized from `id` but not read in this file.
    #[allow(dead_code)]
    id: String,
    /// Read from `protocolVersion`.
    protocol_version: u32,
    /// Ledger sequence number; also used to look up the close time.
    sequence: u32,
}
438
/// `result` payload of a `getNetwork` response (camelCase on the wire).
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct GetNetworkResult {
    /// Network passphrase; hashed with SHA-256 to obtain the network id.
    passphrase: String,
    /// Optional `friendbotUrl`; defaults to `None` and is not read in this
    /// file.
    #[allow(dead_code)]
    #[serde(default)]
    friendbot_url: Option<String>,
    /// Optional `protocolVersion`; defaults to `None` and is not read in
    /// this file.
    #[allow(dead_code)]
    #[serde(default)]
    protocol_version: Option<u32>,
}
450
/// `result` payload of a `getLedgers` response (camelCase on the wire).
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct GetLedgersResult {
    /// Returned ledgers; `get_ledger_close_time` only looks at the first.
    ledgers: Vec<LedgerInfoWire>,
}
456
/// One element of the `ledgers` array in a `getLedgers` response (camelCase
/// on the wire).
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct LedgerInfoWire {
    /// Deserialized from `sequence` but not read in this file.
    #[allow(dead_code)]
    sequence: u32,
    /// `ledgerCloseTime` as sent by the server: a string that
    /// `get_ledger_close_time` parses as a decimal `u64`.
    ledger_close_time: String,
}
468
#[cfg(test)]
mod tests {
    use super::*;

    /// For attempts 0..4 the delay must fall in
    /// `[base * 2^attempt, base * (2^attempt + 1))`.
    #[test]
    fn backoff_delay_doubles_each_attempt_within_jitter_window() {
        let base = Duration::from_millis(100);
        for attempt in 0u32..4 {
            let factor = 2u32.pow(attempt);
            let (lower, upper) = (base * factor, base * (factor + 1));
            // Sample repeatedly because the jitter component is random.
            (0..32).for_each(|_| {
                let d = backoff_delay(base, attempt);
                assert!(
                    d >= lower && d < upper,
                    "attempt {attempt}: {d:?} not in [{lower:?}, {upper:?})"
                );
            });
        }
    }

    /// A huge attempt number must neither panic nor fall below the base.
    #[test]
    fn backoff_delay_saturates_on_absurd_attempt() {
        let one_second = Duration::from_secs(1);
        assert!(backoff_delay(one_second, 100) >= one_second);
    }

    /// With a zero base there is nothing to scale and no jitter window.
    #[test]
    fn backoff_delay_zero_base_is_zero() {
        for attempt in [0u32, 5] {
            assert_eq!(backoff_delay(Duration::ZERO, attempt), Duration::ZERO);
        }
    }

    /// Cutting inside multi-byte text must stay on character boundaries.
    #[test]
    fn truncate_chars_handles_multibyte_safely() {
        let clipped = truncate_chars("тест🚀тест", 5);
        assert!(clipped.ends_with('…'));
        // Five kept characters plus the ellipsis.
        assert_eq!(clipped.chars().count(), 6);
    }

    /// Inputs within the limit come back untouched.
    #[test]
    fn truncate_chars_short_input_unchanged() {
        for (input, expected) in [("abc", "abc"), ("", "")] {
            assert_eq!(truncate_chars(input, 10), expected);
        }
    }

    /// A populated `result` with no `error` is returned as-is.
    #[test]
    fn json_rpc_response_into_result_returns_result_when_ok() {
        let ok = JsonRpcResponse::<u32> {
            result: Some(42),
            error: None,
        };
        assert_eq!(ok.into_result().unwrap(), 42);
    }

    /// An `error` object is surfaced as `ForkError::RpcError`.
    #[test]
    fn json_rpc_response_into_result_propagates_error_field() {
        let failed = JsonRpcResponse::<u32> {
            result: None,
            error: Some(serde_json::json!({"code": -32000, "message": "boom"})),
        };
        assert!(matches!(
            failed.into_result(),
            Err(ForkError::RpcError(_))
        ));
    }

    /// Neither `result` nor `error` maps to `ForkError::RpcNoResult`.
    #[test]
    fn json_rpc_response_into_result_errors_when_no_result_no_error() {
        let empty = JsonRpcResponse::<u32> {
            result: None,
            error: None,
        };
        assert!(matches!(
            empty.into_result(),
            Err(ForkError::RpcNoResult)
        ));
    }

    /// Guards the defaults against accidental regressions.
    #[test]
    fn rpc_config_default_has_sensible_values() {
        let RpcConfig {
            retries,
            base_retry_delay,
            request_timeout,
            max_keys_per_request,
        } = RpcConfig::default();
        assert!(retries >= 1);
        assert!(base_retry_delay >= Duration::from_millis(50));
        assert_eq!(max_keys_per_request, 200);
        assert!(request_timeout.is_some());
    }
}