// soroban_fork/source.rs
//! `SnapshotSource` implementation that fetches ledger entries on demand.
//!
//! The VM asks for a ledger entry via [`SnapshotSource::get`]; we check the
//! in-memory cache first and, on miss, defer to an [`RpcClient`]. Results
//! (including confirmed-missing entries) are memoized so the second lookup
//! of the same key is always local.
//!
//! # Thread-safety & internal representation
//!
//! Cached entries live as **XDR-encoded bytes** in a `Mutex<BTreeMap>`,
//! decoded back into a fresh `Rc<LedgerEntry>` only when the SDK's
//! [`SnapshotSource::get`] hands one out across the trait boundary.
//! Storing bytes (rather than `Rc<LedgerEntry>` directly) is what lets the
//! struct be `Send + Sync`: `Rc` is intentionally single-threaded, so any
//! `Rc` inside a shared field would taint the whole type. With bytes, the
//! shared cache is fully thread-safe; the per-call `Rc` is created on the
//! consumer's thread inside `get` and never crosses a boundary.
//!
//! Decode cost on a cache hit is the XDR-parse of one `LedgerEntry`
//! (microseconds for typical entries). The cost is paid on every call to
//! `get`, including hits — if profiling ever shows this on a hot path, a
//! per-thread parsed-entry memoization layer can be added without changing
//! the public API.
24
25use std::collections::BTreeMap;
26use std::rc::Rc;
27use std::sync::atomic::{AtomicU32, Ordering};
28use std::sync::{Arc, Mutex};
29
30use log::{info, warn};
31use soroban_env_host::storage::{EntryWithLiveUntil, SnapshotSource};
32use soroban_env_host::xdr::{
33    AccountId, ContractDataDurability, LedgerEntry, LedgerEntryData, LedgerKey, LedgerKeyAccount,
34    Limits, PublicKey, ReadXdr, ScAddress, ScVal, SequenceNumber, WriteXdr,
35};
36use soroban_env_host::HostError;
37
38use crate::rpc::RpcClient;
39
/// How the source responds to a transport failure inside the VM loop.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum FetchMode {
    /// Turn the error into a panic. Right for tests, where a failed fetch
    /// is always a genuine bug and the failing key should be visible.
    Strict,
    /// Log the error and act as though the entry doesn't exist (`None`).
    /// Right when a flaky RPC is tolerable, e.g. when probing endpoints
    /// that aren't strictly required.
    Lenient,
}
51
/// Internal cache value: XDR-encoded `LedgerEntry` bytes plus the
/// `live_until` ledger hint from the RPC. Storing bytes (not a parsed
/// `LedgerEntry` wrapped in `Rc`) is what gives the source its
/// `Send + Sync` guarantee — see the module-level docs.
/// A `None` hint means the entry type carries no TTL (e.g. Account,
/// Trustline) or no hint was supplied.
type CachedBytes = (Vec<u8>, Option<u32>);
57
58/// A [`SnapshotSource`] backed by a Soroban RPC + local cache.
59///
60/// Cache semantics:
61/// - `Some(Some(entry))` → we've seen it, entry exists.
62/// - `Some(None)` → we've asked, RPC said the entry doesn't exist. Negative
63///   cache — stops us re-asking for keys we know are absent.
64/// - `None` → we haven't asked yet.
65///
66/// Thread-safety: this type is `Send + Sync`. Wrap it in [`Arc`] to share
67/// across threads — for example, between a future RPC-server worker pool
68/// and the `Env` instances handed to each request. The SDK's
69/// `SnapshotSource` trait still expects an [`Rc`] at its boundary, so a
70/// fresh `Rc<LedgerEntry>` is built per `get` call from cached bytes; the
71/// `Rc` never escapes its caller's thread.
72pub struct RpcSnapshotSource {
73    cache: Mutex<BTreeMap<LedgerKey, Option<CachedBytes>>>,
74    client: Arc<RpcClient>,
75    fetch_count: AtomicU32,
76    fetch_mode: FetchMode,
77}
78
79// Compile-time guarantee that the source is `Send + Sync`. Living at
80// module scope (not inside `cfg(test)`) means a future change that
81// reintroduces `Rc`/`RefCell` breaks `cargo build`, not just `cargo test`
82// — the safety net runs in every developer's local edit cycle.
83const _: fn() = || {
84    fn assert_send<T: Send>() {}
85    fn assert_sync<T: Sync>() {}
86    assert_send::<RpcSnapshotSource>();
87    assert_sync::<RpcSnapshotSource>();
88};
89
90impl RpcSnapshotSource {
91    /// Wrap the given RPC client. `Arc` so the source can be cloned cheaply
92    /// and shared with other harnesses (e.g. a pre-warmer).
93    pub fn new(client: Arc<RpcClient>) -> Self {
94        Self {
95            cache: Mutex::new(BTreeMap::new()),
96            client,
97            fetch_count: AtomicU32::new(0),
98            fetch_mode: FetchMode::Strict,
99        }
100    }
101
102    /// Set the fetch mode. Builder-style — consumes `self` and returns it.
103    pub fn with_fetch_mode(mut self, mode: FetchMode) -> Self {
104        self.fetch_mode = mode;
105        self
106    }
107
108    /// Pre-populate the cache from a snapshot file. Entries loaded this
109    /// way do not count towards `fetch_count`.
110    pub fn preload(
111        &self,
112        entries: impl IntoIterator<Item = (LedgerKey, LedgerEntry, Option<u32>)>,
113    ) {
114        let mut cache = self.cache.lock().expect("cache mutex poisoned");
115        for (key, entry, live_until) in entries {
116            cache.insert(key, Some((encode_entry(&entry), live_until)));
117        }
118    }
119
120    /// How many RPC fetches this source has *attempted* since creation
121    /// (counts both successful and failed attempts — the counter
122    /// increments before the network call, so a connect timeout still
123    /// shows up here). Useful for asserting cache hit-rates in tests.
124    pub fn fetch_count(&self) -> u32 {
125        self.fetch_count.load(Ordering::Relaxed)
126    }
127
128    /// Force-write a single `LedgerEntry` into the cache, replacing
129    /// whatever was there (or creating a fresh entry if the key was
130    /// absent). Powers the JSON-RPC `fork_setLedgerEntry` extension
131    /// — clients hand us an XDR-encoded entry, we trust them and
132    /// install it. Subsequent reads (including via the host's
133    /// recording-mode storage in `simulateTransaction` /
134    /// `sendTransaction`) see the new entry.
135    ///
136    /// Stellar's storage model maps every piece of network state to
137    /// one `LedgerEntry` per key, so this single primitive covers
138    /// every flavor of fork-mode state mutation: oracle-price
139    /// rewrites (a `ContractData` entry), token balance overrides
140    /// (a `Trustline` or `ContractData` entry), contract code
141    /// replacement (a `ContractCode` entry). Higher-level wrappers
142    /// (`setBalance`, `setCode`, etc.) compose on top.
143    ///
144    /// `live_until` carries forward an optional TTL hint — pass
145    /// `None` for entries that don't have one (Account, Trustline)
146    /// or when the test doesn't care about expiry.
147    pub fn set_entry(&self, key: LedgerKey, entry: LedgerEntry, live_until: Option<u32>) {
148        let bytes = encode_entry(&entry);
149        let mut cache = self.cache.lock().expect("cache mutex poisoned");
150        cache.insert(key, Some((bytes, live_until)));
151    }
152
153    /// Bump the `seq_num` of an Account ledger entry that's already
154    /// in the cache. Returns the new sequence on success, `None` if
155    /// the account isn't cached or the cached entry isn't an
156    /// `AccountEntry` (so the caller can decide whether absence is
157    /// an error or just "first send from a never-touched account").
158    ///
159    /// Stellar's transaction validation expects `tx.seq_num ==
160    /// account.seq_num + 1` and post-success leaves the account at
161    /// `tx.seq_num`. The fork's trust mode skips the pre-check but
162    /// still must increment so the *next* envelope a JS-SDK client
163    /// builds (via `getAccount` → `tx.seq_num + 1`) lines up with
164    /// what the host expects.
165    pub fn bump_account_seq(&self, account_id: &AccountId) -> Option<i64> {
166        let key = LedgerKey::Account(LedgerKeyAccount {
167            account_id: account_id.clone(),
168        });
169        let mut cache = self.cache.lock().expect("cache mutex poisoned");
170        let cached = cache.get_mut(&key)?;
171        let bytes_and_ttl = cached.as_mut()?;
172        let bytes = &bytes_and_ttl.0;
173        let mut entry = LedgerEntry::from_xdr(bytes, Limits::none()).ok()?;
174        let new_seq = match &mut entry.data {
175            LedgerEntryData::Account(account) => {
176                let SequenceNumber(current) = account.seq_num;
177                let next = current.wrapping_add(1);
178                account.seq_num = SequenceNumber(next);
179                next
180            }
181            _ => return None,
182        };
183        let new_bytes = entry.to_xdr(Limits::none()).ok()?;
184        bytes_and_ttl.0 = new_bytes;
185        Some(new_seq)
186    }
187
188    /// Apply a batch of `LedgerEntryChange`s back to the cache so that
189    /// subsequent reads see the writes. Powers the JSON-RPC server's
190    /// `sendTransaction` — recording-mode invocation gives us a list
191    /// of changes; we walk them and update the cached bytes.
192    ///
193    /// Semantics per change:
194    /// - `read_only == true` → ignored. The host won't have produced
195    ///   a `new_value` and TTL bumps don't change entry contents.
196    /// - `encoded_new_value == Some(bytes)` → overwrite the cached
197    ///   entry. Existing live-until is kept (TTL changes are tracked
198    ///   separately by the host but not yet plumbed here).
199    /// - `encoded_new_value == None` (read-write) → entry was
200    ///   removed; flip the cache to the negative-cache `None` marker
201    ///   so subsequent `get`s see absence locally without re-asking
202    ///   upstream RPC.
203    ///
204    /// `key` and `entry` decode panics in this method are the same
205    /// "structural bug, not recoverable" class as elsewhere in this
206    /// file: bytes came directly from the host that just produced
207    /// them, so a decode failure means the host violated its own
208    /// XDR-shape invariant.
209    pub fn apply_changes<I>(&self, changes: I) -> u32
210    where
211        I: IntoIterator<Item = soroban_env_host::e2e_invoke::LedgerEntryChange>,
212    {
213        let mut cache = self.cache.lock().expect("cache mutex poisoned");
214        let mut applied: u32 = 0;
215        for change in changes {
216            if change.read_only {
217                continue;
218            }
219            let key = LedgerKey::from_xdr(&change.encoded_key, Limits::none())
220                .unwrap_or_else(|e| panic!("apply_changes: bad LedgerKey from host: {e}"));
221            match change.encoded_new_value {
222                Some(bytes) => {
223                    let live_until = change.ttl_change.as_ref().map(|t| t.new_live_until_ledger);
224                    cache.insert(key, Some((bytes, live_until)));
225                }
226                None => {
227                    cache.insert(key, None);
228                }
229            }
230            applied = applied.saturating_add(1);
231        }
232        applied
233    }
234
235    /// Export the cache for persistence. Negative-cache entries (confirmed
236    /// missing) are intentionally omitted — they aren't useful across
237    /// processes and bloat the on-disk snapshot.
238    ///
239    /// The decode step runs **outside** the cache lock: we snapshot raw
240    /// bytes under the lock, release it, then parse. With a 10k-entry
241    /// fork this avoids blocking concurrent `get` and `fetch` calls for
242    /// the duration of a few-ms parse loop.
243    pub fn entries(&self) -> Vec<(LedgerKey, LedgerEntry, Option<u32>)> {
244        let raw: Vec<(LedgerKey, Vec<u8>, Option<u32>)> = {
245            let cache = self.cache.lock().expect("cache mutex poisoned");
246            cache
247                .iter()
248                .filter_map(|(key, val)| {
249                    val.as_ref()
250                        .map(|(bytes, live_until)| (key.clone(), bytes.clone(), *live_until))
251                })
252                .collect()
253        };
254        raw.into_iter()
255            .map(|(key, bytes, live_until)| (key, decode_entry(&bytes), live_until))
256            .collect()
257    }
258
259    /// Issue an RPC fetch and memoize whatever we get — including a
260    /// `None` on Lenient errors, matching the original RefCell-era
261    /// behavior. Caching the negative result on a Lenient error means
262    /// later `get`s return `None` immediately without retrying; users
263    /// who want retries should rebuild the env (the cache is per-Source).
264    fn fetch_from_rpc(&self, key: &LedgerKey) -> Option<EntryWithLiveUntil> {
265        // `fetch_add` returns the prior value; `+ 1` gives a 1-based
266        // monotonic fetch number that survives concurrent racers.
267        let count = self.fetch_count.fetch_add(1, Ordering::Relaxed) + 1;
268
269        info!("soroban-fork: fetch #{count}: {}", key_display(key));
270
271        let result: Option<EntryWithLiveUntil> = match self.client.fetch_entry(key) {
272            Ok(Some(fetched)) => Some((Rc::new(fetched.entry), fetched.live_until)),
273            Ok(None) => {
274                info!("soroban-fork: fetch #{count}: not found on ledger");
275                None
276            }
277            Err(e) => match self.fetch_mode {
278                FetchMode::Strict => {
279                    // Panicking inside `SnapshotSource::get` would be caught
280                    // and reformatted by the host; surfacing via panic here
281                    // keeps the error readable in standard test output.
282                    panic!("soroban-fork: RPC fetch #{count} failed (strict): {e}")
283                }
284                FetchMode::Lenient => {
285                    warn!("soroban-fork: RPC fetch #{count} error (lenient): {e}");
286                    None
287                }
288            },
289        };
290
291        // Encode the positive case into bytes for shared storage; persist
292        // negative case as `None` so re-asks for absent keys are local.
293        let cached = result
294            .as_ref()
295            .map(|(rc, live_until)| (encode_entry(rc.as_ref()), *live_until));
296        self.cache
297            .lock()
298            .expect("cache mutex poisoned")
299            .insert(key.clone(), cached);
300
301        result
302    }
303}
304
305impl SnapshotSource for RpcSnapshotSource {
306    fn get(
307        &self,
308        key: &Rc<LedgerKey>,
309    ) -> std::result::Result<Option<EntryWithLiveUntil>, HostError> {
310        // Lock briefly: take a clone of the cached value (cheap memcpy of a
311        // small Vec), drop the lock, decode outside the critical section.
312        // Holding the lock across decode would needlessly serialise
313        // concurrent readers.
314        let cached = self
315            .cache
316            .lock()
317            .expect("cache mutex poisoned")
318            .get(key.as_ref())
319            .cloned();
320
321        if let Some(value) = cached {
322            return Ok(value.map(|(bytes, live_until)| (Rc::new(decode_entry(&bytes)), live_until)));
323        }
324
325        Ok(self.fetch_from_rpc(key.as_ref()))
326    }
327}
328
329// ---------------------------------------------------------------------------
330// XDR codec helpers
331// ---------------------------------------------------------------------------
332
333/// Encode a `LedgerEntry` to its XDR byte representation.
334///
335/// Panics if encoding fails. With `Limits::none()` there are no size caps,
336/// so a failure here means the input was structurally invalid (e.g.
337/// malformed XDR enum discriminant) — that's a bug in whoever produced the
338/// `LedgerEntry`, not a runtime condition we can recover from gracefully.
339fn encode_entry(entry: &LedgerEntry) -> Vec<u8> {
340    entry
341        .to_xdr(Limits::none())
342        .unwrap_or_else(|e| panic!("soroban-fork: LedgerEntry encode failed (structural bug): {e}"))
343}
344
345/// Decode an XDR-encoded `LedgerEntry`.
346///
347/// Panics if decoding fails. The bytes always come from `encode_entry`
348/// in this module (RPC fetches and JSON-loaded `LedgerSnapshot` entries
349/// are both routed through `encode_entry` before they hit the cache),
350/// so a failure here means the cache contents are corrupted — memory
351/// damage, a process bug, or someone replaced the bytes externally.
352/// Not recoverable.
353fn decode_entry(bytes: &[u8]) -> LedgerEntry {
354    LedgerEntry::from_xdr(bytes, Limits::none()).unwrap_or_else(|e| {
355        panic!(
356            "soroban-fork: cached LedgerEntry decode failed — cache corruption or \
357             XDR-version mismatch: {e}"
358        )
359    })
360}
361
362// ---------------------------------------------------------------------------
363// Diagnostic formatting helpers — kept private but exercised by unit tests.
364// ---------------------------------------------------------------------------
365
366fn key_display(key: &LedgerKey) -> String {
367    match key {
368        LedgerKey::ContractData(cd) => {
369            let addr = sc_address_short(&cd.contract);
370            if cd.key == ScVal::LedgerKeyContractInstance {
371                format!("ContractData({addr}, instance)")
372            } else {
373                let dur = match cd.durability {
374                    ContractDataDurability::Temporary => "temp",
375                    ContractDataDurability::Persistent => "persistent",
376                };
377                format!("ContractData({addr}, {dur})")
378            }
379        }
380        LedgerKey::ContractCode(cc) => {
381            let h = &cc.hash.0;
382            format!(
383                "ContractCode({:02x}{:02x}{:02x}{:02x}...)",
384                h[0], h[1], h[2], h[3]
385            )
386        }
387        LedgerKey::Account(a) => {
388            format!("Account({})", account_id_short(&a.account_id))
389        }
390        LedgerKey::Trustline(t) => {
391            format!("Trustline({})", account_id_short(&t.account_id))
392        }
393        LedgerKey::ConfigSetting(_) => "ConfigSetting".to_string(),
394        LedgerKey::Ttl(_) => "Ttl".to_string(),
395        _ => "Other".to_string(),
396    }
397}
398
399fn sc_address_short(addr: &ScAddress) -> String {
400    // `stellar_strkey`'s `Display` impl writes to `fmt::Formatter`, returning
401    // `std::String` via `format!`. Calling `.to_string()` directly on some
402    // of these types picks up a `heapless::String<N>` via the `SerdeSeq`
403    // surface — we always want the heap-allocating std one.
404    let full = match addr {
405        ScAddress::Contract(hash) => {
406            format!("{}", stellar_strkey::Contract(hash.0.clone().into()))
407        }
408        ScAddress::Account(id) => account_id_full(id),
409        _ => "???".to_string(),
410    };
411    abbreviate(&full)
412}
413
414fn account_id_short(id: &soroban_env_host::xdr::AccountId) -> String {
415    abbreviate(&account_id_full(id))
416}
417
418fn account_id_full(id: &soroban_env_host::xdr::AccountId) -> String {
419    let PublicKey::PublicKeyTypeEd25519(k) = &id.0;
420    format!("{}", stellar_strkey::ed25519::PublicKey(k.0))
421}
422
/// Collapse a long identifier to `head...tail` (first 4 + last 4 chars)
/// for log display; strings of 12 bytes or fewer pass through unchanged.
///
/// Works on characters, not bytes: the previous byte-slicing form
/// (`&s[..4]`, `&s[s.len() - 4..]`) panicked on any input whose 4th or
/// 4th-from-last byte fell inside a multi-byte UTF-8 sequence. Stellar
/// strkeys are ASCII, where byte and char indexing coincide, so ASCII
/// output is byte-for-byte identical to before.
fn abbreviate(s: &str) -> String {
    if s.len() > 12 {
        let chars: Vec<char> = s.chars().collect();
        let head: String = chars[..4.min(chars.len())].iter().collect();
        let tail: String = chars[chars.len().saturating_sub(4)..].iter().collect();
        format!("{head}...{tail}")
    } else {
        s.to_string()
    }
}
430
431// ---------------------------------------------------------------------------
432// Tests — pure, no network required.
433// ---------------------------------------------------------------------------
434
#[cfg(test)]
mod tests {
    use super::*;
    use soroban_env_host::xdr::{
        ConfigSettingEntry, ConfigSettingId, LedgerEntryData, LedgerEntryExt,
        LedgerKeyConfigSetting,
    };

    /// Client pointed at `localhost:0`. Constructing it performs no network
    /// I/O, and no test in this module triggers an actual fetch.
    fn dummy_client() -> Arc<RpcClient> {
        Arc::new(
            RpcClient::new("http://localhost:0", crate::rpc::RpcConfig::default())
                .expect("client construction should not fail"),
        )
    }

    /// A minimal, fully-constructible (key, entry, live_until) triple —
    /// ConfigSetting entries need no account or contract fixtures.
    fn dummy_entry(last_modified: u32) -> (LedgerKey, LedgerEntry, Option<u32>) {
        let key = LedgerKey::ConfigSetting(LedgerKeyConfigSetting {
            config_setting_id: ConfigSettingId::ContractMaxSizeBytes,
        });
        let entry = LedgerEntry {
            last_modified_ledger_seq: last_modified,
            data: LedgerEntryData::ConfigSetting(ConfigSettingEntry::ContractMaxSizeBytes(65_536)),
            ext: LedgerEntryExt::V0,
        };
        (key, entry, None)
    }

    #[test]
    fn abbreviate_short_string_is_unchanged() {
        assert_eq!(abbreviate("abc"), "abc");
        assert_eq!(abbreviate("12345678"), "12345678");
    }

    #[test]
    fn abbreviate_long_string_collapses_middle() {
        // 56-char Stellar address shape — first 4 + last 4 chars, with "..."
        // between. Keep as a concrete-value assertion rather than a shape
        // assertion: this is exactly what humans see in the logs, so an
        // accidental format change should fail loudly here.
        let full = "GABCDEFGHIJKLMNOPQRSTUVWXYZ01234567890ABCDEFGHIJKLMNOPQR";
        let short = abbreviate(full);
        assert_eq!(short, "GABC...OPQR");
        assert!(short.len() < full.len());
    }

    #[test]
    fn key_display_renders_config_setting() {
        let key = LedgerKey::ConfigSetting(LedgerKeyConfigSetting {
            config_setting_id: ConfigSettingId::ContractMaxSizeBytes,
        });
        assert_eq!(key_display(&key), "ConfigSetting");
    }

    #[test]
    fn fetch_mode_default_is_strict() {
        // Documented contract: new sources start in Strict until explicitly
        // opted down. Keep this test as a guard against silent regressions.
        let src = RpcSnapshotSource::new(dummy_client());
        assert_eq!(src.fetch_mode, FetchMode::Strict);
    }

    #[test]
    fn xdr_round_trip_preserves_ledger_entry() {
        // The cache stores XDR bytes; correctness depends on encode/decode
        // being a true identity. If a future XDR-codec change ever broke
        // round-tripping, every cache hit would silently corrupt state —
        // this test pins the invariant.
        let (_, entry, _) = dummy_entry(42);
        let encoded = encode_entry(&entry);
        let decoded = decode_entry(&encoded);
        assert_eq!(entry, decoded);
        // And re-encoding the decoded value must reproduce the same bytes.
        assert_eq!(encoded, encode_entry(&decoded));
    }

    #[test]
    fn preload_then_entries_round_trips() {
        // preload → entries must be lossless for key, entry, and TTL hint.
        let src = RpcSnapshotSource::new(dummy_client());
        let original = vec![dummy_entry(7)];
        src.preload(original.clone());
        let exported = src.entries();
        assert_eq!(exported.len(), 1);
        assert_eq!(exported[0].0, original[0].0);
        assert_eq!(exported[0].1, original[0].1);
        assert_eq!(exported[0].2, original[0].2);
    }

    #[test]
    fn get_returns_preloaded_entry() {
        let src = RpcSnapshotSource::new(dummy_client());
        let (key, entry, live_until) = dummy_entry(99);
        src.preload(vec![(key.clone(), entry.clone(), live_until)]);

        let key_rc = Rc::new(key);
        let result = src.get(&key_rc).expect("get should not error");
        let (got_entry, got_live_until) = result.expect("preloaded entry should be present");
        assert_eq!(got_entry.as_ref(), &entry);
        assert_eq!(got_live_until, live_until);
        // Preloads do not count as fetches.
        assert_eq!(src.fetch_count(), 0);
    }

    #[test]
    fn concurrent_reads_of_preloaded_entry_are_race_free() {
        // Eight threads racing through `get` on the same preloaded key.
        // No fetch should fire (count stays 0). With RefCell the borrow
        // would panic; with Mutex<bytes>, we get correct shared access.
        use std::thread;
        let src = Arc::new(RpcSnapshotSource::new(dummy_client()));
        let (key, entry, live_until) = dummy_entry(123);
        src.preload(vec![(key.clone(), entry.clone(), live_until)]);

        let mut handles = Vec::new();
        for _ in 0..8 {
            let src = Arc::clone(&src);
            let key = key.clone();
            let entry = entry.clone();
            handles.push(thread::spawn(move || {
                // Each thread builds its own Rc<LedgerKey>; Rc never crosses
                // the thread boundary (only the Arc<source> does).
                let key_rc = Rc::new(key);
                for _ in 0..100 {
                    let got = src
                        .get(&key_rc)
                        .expect("get should not error")
                        .expect("entry should be present");
                    assert_eq!(got.0.as_ref(), &entry);
                }
            }));
        }
        for h in handles {
            h.join().expect("worker thread panicked");
        }
        assert_eq!(src.fetch_count(), 0, "no RPC fetches should have fired");
    }
}