soroban_fork/source.rs
1//! `SnapshotSource` implementation that fetches ledger entries on demand.
2//!
3//! The VM asks for a ledger entry via [`SnapshotSource::get`]; we check the
4//! in-memory cache first and, on miss, defer to an [`RpcClient`]. Results
5//! (including confirmed-missing entries) are memoized so the second lookup
6//! of the same key is always local.
7//!
8//! # Thread-safety & internal representation
9//!
10//! Cached entries live as **XDR-encoded bytes** in a `Mutex<BTreeMap>`,
11//! decoded back into a fresh `Rc<LedgerEntry>` only when the SDK's
12//! [`SnapshotSource::get`] hands one out across the trait boundary.
13//! Storing bytes (rather than `Rc<LedgerEntry>` directly) is what lets the
14//! struct be `Send + Sync`: `Rc` is intentionally single-threaded, so any
15//! `Rc` inside a shared field would taint the whole type. With bytes, the
16//! shared cache is fully thread-safe; the per-call `Rc` is created on the
17//! consumer's thread inside `get` and never crosses a boundary.
18//!
19//! Decode cost on a cache hit is the XDR-parse of one `LedgerEntry`
20//! (microseconds for typical entries). The cost is paid on every call to
21//! `get`, including hits — if profiling ever shows this on a hot path, a
22//! per-thread parsed-entry memoization layer can be added without changing
23//! the public API.
24
25use std::collections::BTreeMap;
26use std::rc::Rc;
27use std::sync::atomic::{AtomicU32, Ordering};
28use std::sync::{Arc, Mutex};
29
30use log::{info, warn};
31use soroban_env_host::storage::{EntryWithLiveUntil, SnapshotSource};
32use soroban_env_host::xdr::{
33 AccountId, ContractDataDurability, LedgerEntry, LedgerEntryData, LedgerKey, LedgerKeyAccount,
34 Limits, PublicKey, ReadXdr, ScAddress, ScVal, SequenceNumber, WriteXdr,
35};
36use soroban_env_host::HostError;
37
38use crate::rpc::RpcClient;
39
/// Policy for how the source reacts to transport failures inside the VM loop.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum FetchMode {
    /// Escalate a transport error into a panic. The right choice for tests,
    /// where a failed fetch is always a real bug and the failing key should
    /// be visible immediately.
    Strict,
    /// Log the transport error and report the entry as absent (`None`).
    /// Acceptable when the VM observing a non-existent entry is tolerable —
    /// e.g. probing endpoints that aren't strictly required over a flaky RPC.
    Lenient,
}
51
/// Internal cache value: XDR-encoded `LedgerEntry` bytes plus the
/// `live_until` ledger hint from the RPC (`None` when no TTL hint was
/// supplied). Storing bytes (not a parsed `LedgerEntry` wrapped in `Rc`)
/// is what gives the source its `Send + Sync` guarantee — see the
/// module-level docs.
type CachedBytes = (Vec<u8>, Option<u32>);
57
/// A [`SnapshotSource`] backed by a Soroban RPC + local cache.
///
/// Cache semantics (the map's `Option<CachedBytes>` value):
/// - `Some(Some(entry))` → we've seen it, entry exists.
/// - `Some(None)` → we've asked, RPC said the entry doesn't exist. Negative
///   cache — stops us re-asking for keys we know are absent.
/// - `None` → we haven't asked yet.
///
/// Thread-safety: this type is `Send + Sync`. Wrap it in [`Arc`] to share
/// across threads — for example, between a future RPC-server worker pool
/// and the `Env` instances handed to each request. The SDK's
/// `SnapshotSource` trait still expects an [`Rc`] at its boundary, so a
/// fresh `Rc<LedgerEntry>` is built per `get` call from cached bytes; the
/// `Rc` never escapes its caller's thread.
pub struct RpcSnapshotSource {
    // XDR-encoded entries keyed by ledger key; see the Option semantics above.
    cache: Mutex<BTreeMap<LedgerKey, Option<CachedBytes>>>,
    // Transport used on cache misses; `Arc` so it can be shared with other
    // harnesses (see `new`).
    client: Arc<RpcClient>,
    // Count of RPC fetch *attempts* (preloads excluded); relaxed ordering,
    // read via `fetch_count()`.
    fetch_count: AtomicU32,
    // Error policy for transport failures; `Strict` by default (see `new`).
    fetch_mode: FetchMode,
}
78
79// Compile-time guarantee that the source is `Send + Sync`. Living at
80// module scope (not inside `cfg(test)`) means a future change that
81// reintroduces `Rc`/`RefCell` breaks `cargo build`, not just `cargo test`
82// — the safety net runs in every developer's local edit cycle.
83const _: fn() = || {
84 fn assert_send<T: Send>() {}
85 fn assert_sync<T: Sync>() {}
86 assert_send::<RpcSnapshotSource>();
87 assert_sync::<RpcSnapshotSource>();
88};
89
impl RpcSnapshotSource {
    /// Wrap the given RPC client. `Arc` so the source can be cloned cheaply
    /// and shared with other harnesses (e.g. a pre-warmer).
    ///
    /// Starts with an empty cache, a zero fetch counter, and
    /// [`FetchMode::Strict`]; use [`Self::with_fetch_mode`] to opt down.
    pub fn new(client: Arc<RpcClient>) -> Self {
        Self {
            cache: Mutex::new(BTreeMap::new()),
            client,
            fetch_count: AtomicU32::new(0),
            fetch_mode: FetchMode::Strict,
        }
    }

    /// Set the fetch mode. Builder-style — consumes `self` and returns it.
    pub fn with_fetch_mode(mut self, mode: FetchMode) -> Self {
        self.fetch_mode = mode;
        self
    }

    /// Pre-populate the cache from a snapshot file. Entries loaded this
    /// way do not count towards `fetch_count`.
    ///
    /// Each entry is XDR-encoded before insertion (see module docs for why
    /// the cache stores bytes). Existing cache entries for the same keys
    /// are overwritten.
    pub fn preload(
        &self,
        entries: impl IntoIterator<Item = (LedgerKey, LedgerEntry, Option<u32>)>,
    ) {
        let mut cache = self.cache.lock().expect("cache mutex poisoned");
        for (key, entry, live_until) in entries {
            cache.insert(key, Some((encode_entry(&entry), live_until)));
        }
    }

    /// How many RPC fetches this source has *attempted* since creation
    /// (counts both successful and failed attempts — the counter
    /// increments before the network call, so a connect timeout still
    /// shows up here). Useful for asserting cache hit-rates in tests.
    pub fn fetch_count(&self) -> u32 {
        self.fetch_count.load(Ordering::Relaxed)
    }

    /// Force-write a single `LedgerEntry` into the cache, replacing
    /// whatever was there (or creating a fresh entry if the key was
    /// absent). Powers the JSON-RPC `fork_setLedgerEntry` extension
    /// — clients hand us an XDR-encoded entry, we trust them and
    /// install it. Subsequent reads (including via the host's
    /// recording-mode storage in `simulateTransaction` /
    /// `sendTransaction`) see the new entry.
    ///
    /// Stellar's storage model maps every piece of network state to
    /// one `LedgerEntry` per key, so this single primitive covers
    /// every flavor of fork-mode state mutation: oracle-price
    /// rewrites (a `ContractData` entry), token balance overrides
    /// (a `Trustline` or `ContractData` entry), contract code
    /// replacement (a `ContractCode` entry). Higher-level wrappers
    /// (`setBalance`, `setCode`, etc.) compose on top.
    ///
    /// `live_until` carries forward an optional TTL hint — pass
    /// `None` for entries that don't have one (Account, Trustline)
    /// or when the test doesn't care about expiry.
    pub fn set_entry(&self, key: LedgerKey, entry: LedgerEntry, live_until: Option<u32>) {
        // Encode before taking the lock — keeps the critical section short.
        let bytes = encode_entry(&entry);
        let mut cache = self.cache.lock().expect("cache mutex poisoned");
        cache.insert(key, Some((bytes, live_until)));
    }

    /// Bump the `seq_num` of an Account ledger entry that's already
    /// in the cache. Returns the new sequence on success, `None` if
    /// the account isn't cached or the cached entry isn't an
    /// `AccountEntry` (so the caller can decide whether absence is
    /// an error or just "first send from a never-touched account").
    ///
    /// Stellar's transaction validation expects `tx.seq_num ==
    /// account.seq_num + 1` and post-success leaves the account at
    /// `tx.seq_num`. The fork's trust mode skips the pre-check but
    /// still must increment so the *next* envelope a JS-SDK client
    /// builds (via `getAccount` → `tx.seq_num + 1`) lines up with
    /// what the host expects.
    pub fn bump_account_seq(&self, account_id: &AccountId) -> Option<i64> {
        let key = LedgerKey::Account(LedgerKeyAccount {
            account_id: account_id.clone(),
        });
        // Lock held across decode + re-encode so the read-modify-write is
        // atomic with respect to concurrent `get`/`set_entry` callers.
        let mut cache = self.cache.lock().expect("cache mutex poisoned");
        let cached = cache.get_mut(&key)?;
        // `None` here is the negative-cache marker (account confirmed absent).
        let bytes_and_ttl = cached.as_mut()?;
        let bytes = &bytes_and_ttl.0;
        // NOTE(review): `.ok()?` folds a decode failure into the same `None`
        // as "not cached", whereas `decode_entry` treats bad cached bytes as
        // an unrecoverable panic — confirm whether the laxer policy here is
        // intentional.
        let mut entry = LedgerEntry::from_xdr(bytes, Limits::none()).ok()?;
        let new_seq = match &mut entry.data {
            LedgerEntryData::Account(account) => {
                let SequenceNumber(current) = account.seq_num;
                // wrapping_add: seq overflow is not a panic path here.
                let next = current.wrapping_add(1);
                account.seq_num = SequenceNumber(next);
                next
            }
            // Cached entry under an Account key that isn't an AccountEntry:
            // report "not bumpable" rather than panicking.
            _ => return None,
        };
        let new_bytes = entry.to_xdr(Limits::none()).ok()?;
        bytes_and_ttl.0 = new_bytes;
        Some(new_seq)
    }

    /// Apply a batch of `LedgerEntryChange`s back to the cache so that
    /// subsequent reads see the writes. Powers the JSON-RPC server's
    /// `sendTransaction` — recording-mode invocation gives us a list
    /// of changes; we walk them and update the cached bytes.
    ///
    /// Returns the number of changes applied (read-only changes excluded).
    ///
    /// Semantics per change:
    /// - `read_only == true` → ignored. The host won't have produced
    ///   a `new_value` and TTL bumps don't change entry contents.
    /// - `encoded_new_value == Some(bytes)` → overwrite the cached
    ///   entry. Existing live-until is kept (TTL changes are tracked
    ///   separately by the host but not yet plumbed here).
    /// - `encoded_new_value == None` (read-write) → entry was
    ///   removed; flip the cache to the negative-cache `None` marker
    ///   so subsequent `get`s see absence locally without re-asking
    ///   upstream RPC.
    ///
    /// `key` and `entry` decode panics in this method are the same
    /// "structural bug, not recoverable" class as elsewhere in this
    /// file: bytes came directly from the host that just produced
    /// them, so a decode failure means the host violated its own
    /// XDR-shape invariant.
    pub fn apply_changes<I>(&self, changes: I) -> u32
    where
        I: IntoIterator<Item = soroban_env_host::e2e_invoke::LedgerEntryChange>,
    {
        let mut cache = self.cache.lock().expect("cache mutex poisoned");
        let mut applied: u32 = 0;
        for change in changes {
            if change.read_only {
                continue;
            }
            let key = LedgerKey::from_xdr(&change.encoded_key, Limits::none())
                .unwrap_or_else(|e| panic!("apply_changes: bad LedgerKey from host: {e}"));
            match change.encoded_new_value {
                Some(bytes) => {
                    // Carry the TTL hint along when the host reported one.
                    let live_until = change.ttl_change.as_ref().map(|t| t.new_live_until_ledger);
                    cache.insert(key, Some((bytes, live_until)));
                }
                None => {
                    // Deletion → negative-cache marker, not a map removal.
                    cache.insert(key, None);
                }
            }
            applied = applied.saturating_add(1);
        }
        applied
    }

    /// Export the cache for persistence. Negative-cache entries (confirmed
    /// missing) are intentionally omitted — they aren't useful across
    /// processes and bloat the on-disk snapshot.
    ///
    /// The decode step runs **outside** the cache lock: we snapshot raw
    /// bytes under the lock, release it, then parse. With a 10k-entry
    /// fork this avoids blocking concurrent `get` and `fetch` calls for
    /// the duration of a few-ms parse loop.
    pub fn entries(&self) -> Vec<(LedgerKey, LedgerEntry, Option<u32>)> {
        let raw: Vec<(LedgerKey, Vec<u8>, Option<u32>)> = {
            let cache = self.cache.lock().expect("cache mutex poisoned");
            cache
                .iter()
                .filter_map(|(key, val)| {
                    // `val == None` → negative-cache marker, skipped here.
                    val.as_ref()
                        .map(|(bytes, live_until)| (key.clone(), bytes.clone(), *live_until))
                })
                .collect()
        };
        // Lock released; parsing happens on our own time.
        raw.into_iter()
            .map(|(key, bytes, live_until)| (key, decode_entry(&bytes), live_until))
            .collect()
    }

    /// Issue an RPC fetch and memoize whatever we get — including a
    /// `None` on Lenient errors, matching the original RefCell-era
    /// behavior. Caching the negative result on a Lenient error means
    /// later `get`s return `None` immediately without retrying; users
    /// who want retries should rebuild the env (the cache is per-Source).
    fn fetch_from_rpc(&self, key: &LedgerKey) -> Option<EntryWithLiveUntil> {
        // `fetch_add` returns the prior value; `+ 1` gives a 1-based
        // monotonic fetch number that survives concurrent racers.
        let count = self.fetch_count.fetch_add(1, Ordering::Relaxed) + 1;

        info!("soroban-fork: fetch #{count}: {}", key_display(key));

        let result: Option<EntryWithLiveUntil> = match self.client.fetch_entry(key) {
            Ok(Some(fetched)) => Some((Rc::new(fetched.entry), fetched.live_until)),
            Ok(None) => {
                info!("soroban-fork: fetch #{count}: not found on ledger");
                None
            }
            Err(e) => match self.fetch_mode {
                FetchMode::Strict => {
                    // Panicking inside `SnapshotSource::get` would be caught
                    // and reformatted by the host; surfacing via panic here
                    // keeps the error readable in standard test output.
                    panic!("soroban-fork: RPC fetch #{count} failed (strict): {e}")
                }
                FetchMode::Lenient => {
                    warn!("soroban-fork: RPC fetch #{count} error (lenient): {e}");
                    None
                }
            },
        };

        // Encode the positive case into bytes for shared storage; persist
        // negative case as `None` so re-asks for absent keys are local.
        // NOTE(review): the cache lock is not held across the network call,
        // so a concurrent `set_entry` landing between fetch and insert is
        // overwritten by the (possibly older) RPC result — confirm this
        // window is acceptable for fork-mode use.
        let cached = result
            .as_ref()
            .map(|(rc, live_until)| (encode_entry(rc.as_ref()), *live_until));
        self.cache
            .lock()
            .expect("cache mutex poisoned")
            .insert(key.clone(), cached);

        result
    }
}
304
305impl SnapshotSource for RpcSnapshotSource {
306 fn get(
307 &self,
308 key: &Rc<LedgerKey>,
309 ) -> std::result::Result<Option<EntryWithLiveUntil>, HostError> {
310 // Lock briefly: take a clone of the cached value (cheap memcpy of a
311 // small Vec), drop the lock, decode outside the critical section.
312 // Holding the lock across decode would needlessly serialise
313 // concurrent readers.
314 let cached = self
315 .cache
316 .lock()
317 .expect("cache mutex poisoned")
318 .get(key.as_ref())
319 .cloned();
320
321 if let Some(value) = cached {
322 return Ok(value.map(|(bytes, live_until)| (Rc::new(decode_entry(&bytes)), live_until)));
323 }
324
325 Ok(self.fetch_from_rpc(key.as_ref()))
326 }
327}
328
329// ---------------------------------------------------------------------------
330// XDR codec helpers
331// ---------------------------------------------------------------------------
332
333/// Encode a `LedgerEntry` to its XDR byte representation.
334///
335/// Panics if encoding fails. With `Limits::none()` there are no size caps,
336/// so a failure here means the input was structurally invalid (e.g.
337/// malformed XDR enum discriminant) — that's a bug in whoever produced the
338/// `LedgerEntry`, not a runtime condition we can recover from gracefully.
339fn encode_entry(entry: &LedgerEntry) -> Vec<u8> {
340 entry
341 .to_xdr(Limits::none())
342 .unwrap_or_else(|e| panic!("soroban-fork: LedgerEntry encode failed (structural bug): {e}"))
343}
344
345/// Decode an XDR-encoded `LedgerEntry`.
346///
347/// Panics if decoding fails. The bytes always come from `encode_entry`
348/// in this module (RPC fetches and JSON-loaded `LedgerSnapshot` entries
349/// are both routed through `encode_entry` before they hit the cache),
350/// so a failure here means the cache contents are corrupted — memory
351/// damage, a process bug, or someone replaced the bytes externally.
352/// Not recoverable.
353fn decode_entry(bytes: &[u8]) -> LedgerEntry {
354 LedgerEntry::from_xdr(bytes, Limits::none()).unwrap_or_else(|e| {
355 panic!(
356 "soroban-fork: cached LedgerEntry decode failed — cache corruption or \
357 XDR-version mismatch: {e}"
358 )
359 })
360}
361
362// ---------------------------------------------------------------------------
363// Diagnostic formatting helpers — kept private but exercised by unit tests.
364// ---------------------------------------------------------------------------
365
366fn key_display(key: &LedgerKey) -> String {
367 match key {
368 LedgerKey::ContractData(cd) => {
369 let addr = sc_address_short(&cd.contract);
370 if cd.key == ScVal::LedgerKeyContractInstance {
371 format!("ContractData({addr}, instance)")
372 } else {
373 let dur = match cd.durability {
374 ContractDataDurability::Temporary => "temp",
375 ContractDataDurability::Persistent => "persistent",
376 };
377 format!("ContractData({addr}, {dur})")
378 }
379 }
380 LedgerKey::ContractCode(cc) => {
381 let h = &cc.hash.0;
382 format!(
383 "ContractCode({:02x}{:02x}{:02x}{:02x}...)",
384 h[0], h[1], h[2], h[3]
385 )
386 }
387 LedgerKey::Account(a) => {
388 format!("Account({})", account_id_short(&a.account_id))
389 }
390 LedgerKey::Trustline(t) => {
391 format!("Trustline({})", account_id_short(&t.account_id))
392 }
393 LedgerKey::ConfigSetting(_) => "ConfigSetting".to_string(),
394 LedgerKey::Ttl(_) => "Ttl".to_string(),
395 _ => "Other".to_string(),
396 }
397}
398
399fn sc_address_short(addr: &ScAddress) -> String {
400 // `stellar_strkey`'s `Display` impl writes to `fmt::Formatter`, returning
401 // `std::String` via `format!`. Calling `.to_string()` directly on some
402 // of these types picks up a `heapless::String<N>` via the `SerdeSeq`
403 // surface — we always want the heap-allocating std one.
404 let full = match addr {
405 ScAddress::Contract(hash) => {
406 format!("{}", stellar_strkey::Contract(hash.0.clone().into()))
407 }
408 ScAddress::Account(id) => account_id_full(id),
409 _ => "???".to_string(),
410 };
411 abbreviate(&full)
412}
413
414fn account_id_short(id: &soroban_env_host::xdr::AccountId) -> String {
415 abbreviate(&account_id_full(id))
416}
417
418fn account_id_full(id: &soroban_env_host::xdr::AccountId) -> String {
419 let PublicKey::PublicKeyTypeEd25519(k) = &id.0;
420 format!("{}", stellar_strkey::ed25519::PublicKey(k.0))
421}
422
/// Shorten a strkey-style identifier for log output.
///
/// Strings longer than 12 bytes collapse to `HEAD...TAIL` — the first four
/// and last four characters. Shorter strings pass through unchanged.
///
/// Fix: the original sliced `&s[..4]` / `&s[s.len() - 4..]` unconditionally,
/// which panics if byte offset 4 (or `len - 4`) falls inside a multi-byte
/// UTF-8 character. Strkeys are ASCII so real inputs were safe, but a log
/// helper must never be the panic source — the char-boundary guard returns
/// the string untouched in that case and changes nothing for ASCII input.
fn abbreviate(s: &str) -> String {
    if s.len() > 12 && s.is_char_boundary(4) && s.is_char_boundary(s.len() - 4) {
        format!("{}...{}", &s[..4], &s[s.len() - 4..])
    } else {
        s.to_string()
    }
}
430
431// ---------------------------------------------------------------------------
432// Tests — pure, no network required.
433// ---------------------------------------------------------------------------
434
#[cfg(test)]
mod tests {
    use super::*;
    use soroban_env_host::xdr::{
        ConfigSettingEntry, ConfigSettingId, LedgerEntryData, LedgerEntryExt,
        LedgerKeyConfigSetting,
    };

    /// Client pointed at an unroutable port. Tests below never fetch
    /// (fetch_count stays 0), so — presumably — no connection is attempted
    /// at construction time; the `expect` pins that assumption.
    fn dummy_client() -> Arc<RpcClient> {
        Arc::new(
            RpcClient::new("http://localhost:0", crate::rpc::RpcConfig::default())
                .expect("client construction should not fail"),
        )
    }

    /// A minimal, fully-constructible (key, entry, TTL) triple: a
    /// ConfigSetting entry needs no accounts or contract state.
    fn dummy_entry(last_modified: u32) -> (LedgerKey, LedgerEntry, Option<u32>) {
        let key = LedgerKey::ConfigSetting(LedgerKeyConfigSetting {
            config_setting_id: ConfigSettingId::ContractMaxSizeBytes,
        });
        let entry = LedgerEntry {
            last_modified_ledger_seq: last_modified,
            data: LedgerEntryData::ConfigSetting(ConfigSettingEntry::ContractMaxSizeBytes(65_536)),
            ext: LedgerEntryExt::V0,
        };
        (key, entry, None)
    }

    #[test]
    fn abbreviate_short_string_is_unchanged() {
        assert_eq!(abbreviate("abc"), "abc");
        assert_eq!(abbreviate("12345678"), "12345678");
    }

    #[test]
    fn abbreviate_long_string_collapses_middle() {
        // 56-char Stellar address shape — first 4 + last 4 chars, with "..."
        // between. Keep as a concrete-value assertion rather than a shape
        // assertion: this is exactly what humans see in the logs, so an
        // accidental format change should fail loudly here.
        let full = "GABCDEFGHIJKLMNOPQRSTUVWXYZ01234567890ABCDEFGHIJKLMNOPQR";
        let short = abbreviate(full);
        assert_eq!(short, "GABC...OPQR");
        assert!(short.len() < full.len());
    }

    #[test]
    fn key_display_renders_config_setting() {
        let key = LedgerKey::ConfigSetting(LedgerKeyConfigSetting {
            config_setting_id: ConfigSettingId::ContractMaxSizeBytes,
        });
        assert_eq!(key_display(&key), "ConfigSetting");
    }

    #[test]
    fn fetch_mode_default_is_strict() {
        // Documented contract: new sources start in Strict until explicitly
        // opted down. Keep this test as a guard against silent regressions.
        let src = RpcSnapshotSource::new(dummy_client());
        assert_eq!(src.fetch_mode, FetchMode::Strict);
    }

    #[test]
    fn xdr_round_trip_preserves_ledger_entry() {
        // The cache stores XDR bytes; correctness depends on encode/decode
        // being a true identity. If a future XDR-codec change ever broke
        // round-tripping, every cache hit would silently corrupt state —
        // this test pins the invariant.
        let (_, entry, _) = dummy_entry(42);
        let encoded = encode_entry(&entry);
        let decoded = decode_entry(&encoded);
        assert_eq!(entry, decoded);
        // And re-encoding the decoded value must reproduce the same bytes.
        assert_eq!(encoded, encode_entry(&decoded));
    }

    #[test]
    fn preload_then_entries_round_trips() {
        // preload → entries must be lossless for positive entries.
        let src = RpcSnapshotSource::new(dummy_client());
        let original = vec![dummy_entry(7)];
        src.preload(original.clone());
        let exported = src.entries();
        assert_eq!(exported.len(), 1);
        assert_eq!(exported[0].0, original[0].0);
        assert_eq!(exported[0].1, original[0].1);
        assert_eq!(exported[0].2, original[0].2);
    }

    #[test]
    fn get_returns_preloaded_entry() {
        let src = RpcSnapshotSource::new(dummy_client());
        let (key, entry, live_until) = dummy_entry(99);
        src.preload(vec![(key.clone(), entry.clone(), live_until)]);

        let key_rc = Rc::new(key);
        let result = src.get(&key_rc).expect("get should not error");
        let (got_entry, got_live_until) = result.expect("preloaded entry should be present");
        assert_eq!(got_entry.as_ref(), &entry);
        assert_eq!(got_live_until, live_until);
        // Preloads do not count as fetches.
        assert_eq!(src.fetch_count(), 0);
    }

    #[test]
    fn concurrent_reads_of_preloaded_entry_are_race_free() {
        // Eight threads racing through `get` on the same preloaded key.
        // No fetch should fire (count stays 0). With RefCell the borrow
        // would panic; with Mutex<bytes>, we get correct shared access.
        use std::thread;
        let src = Arc::new(RpcSnapshotSource::new(dummy_client()));
        let (key, entry, live_until) = dummy_entry(123);
        src.preload(vec![(key.clone(), entry.clone(), live_until)]);

        let mut handles = Vec::new();
        for _ in 0..8 {
            let src = Arc::clone(&src);
            let key = key.clone();
            let entry = entry.clone();
            handles.push(thread::spawn(move || {
                // The `Rc` key is created on this worker thread and never
                // crosses a thread boundary — only the Arc'd source does.
                let key_rc = Rc::new(key);
                for _ in 0..100 {
                    let got = src
                        .get(&key_rc)
                        .expect("get should not error")
                        .expect("entry should be present");
                    assert_eq!(got.0.as_ref(), &entry);
                }
            }));
        }
        for h in handles {
            h.join().expect("worker thread panicked");
        }
        assert_eq!(src.fetch_count(), 0, "no RPC fetches should have fired");
    }
}
568}