use std::collections::BTreeMap;
use std::rc::Rc;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, Mutex};
use log::{info, warn};
use soroban_env_host::storage::{EntryWithLiveUntil, SnapshotSource};
use soroban_env_host::xdr::{
AccountId, ContractDataDurability, LedgerEntry, LedgerEntryData, LedgerKey, LedgerKeyAccount,
Limits, PublicKey, ReadXdr, ScAddress, ScVal, SequenceNumber, WriteXdr,
};
use soroban_env_host::HostError;
use crate::rpc::RpcClient;
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum FetchMode {
Strict,
Lenient,
}
type CachedBytes = (Vec<u8>, Option<u32>);
pub struct RpcSnapshotSource {
cache: Mutex<BTreeMap<LedgerKey, Option<CachedBytes>>>,
client: Arc<RpcClient>,
fetch_count: AtomicU32,
fetch_mode: FetchMode,
}
const _: fn() = || {
fn assert_send<T: Send>() {}
fn assert_sync<T: Sync>() {}
assert_send::<RpcSnapshotSource>();
assert_sync::<RpcSnapshotSource>();
};
impl RpcSnapshotSource {
pub fn new(client: Arc<RpcClient>) -> Self {
Self {
cache: Mutex::new(BTreeMap::new()),
client,
fetch_count: AtomicU32::new(0),
fetch_mode: FetchMode::Strict,
}
}
pub fn with_fetch_mode(mut self, mode: FetchMode) -> Self {
self.fetch_mode = mode;
self
}
pub fn preload(
&self,
entries: impl IntoIterator<Item = (LedgerKey, LedgerEntry, Option<u32>)>,
) {
let mut cache = self.cache.lock().expect("cache mutex poisoned");
for (key, entry, live_until) in entries {
cache.insert(key, Some((encode_entry(&entry), live_until)));
}
}
pub fn fetch_count(&self) -> u32 {
self.fetch_count.load(Ordering::Relaxed)
}
pub fn set_entry(&self, key: LedgerKey, entry: LedgerEntry, live_until: Option<u32>) {
let bytes = encode_entry(&entry);
let mut cache = self.cache.lock().expect("cache mutex poisoned");
cache.insert(key, Some((bytes, live_until)));
}
pub fn bump_account_seq(&self, account_id: &AccountId) -> Option<i64> {
let key = LedgerKey::Account(LedgerKeyAccount {
account_id: account_id.clone(),
});
let mut cache = self.cache.lock().expect("cache mutex poisoned");
let cached = cache.get_mut(&key)?;
let bytes_and_ttl = cached.as_mut()?;
let bytes = &bytes_and_ttl.0;
let mut entry = LedgerEntry::from_xdr(bytes, Limits::none()).ok()?;
let new_seq = match &mut entry.data {
LedgerEntryData::Account(account) => {
let SequenceNumber(current) = account.seq_num;
let next = current.wrapping_add(1);
account.seq_num = SequenceNumber(next);
next
}
_ => return None,
};
let new_bytes = entry.to_xdr(Limits::none()).ok()?;
bytes_and_ttl.0 = new_bytes;
Some(new_seq)
}
pub fn apply_changes<I>(&self, changes: I) -> u32
where
I: IntoIterator<Item = soroban_env_host::e2e_invoke::LedgerEntryChange>,
{
let mut cache = self.cache.lock().expect("cache mutex poisoned");
let mut applied: u32 = 0;
for change in changes {
if change.read_only {
continue;
}
let key = LedgerKey::from_xdr(&change.encoded_key, Limits::none())
.unwrap_or_else(|e| panic!("apply_changes: bad LedgerKey from host: {e}"));
match change.encoded_new_value {
Some(bytes) => {
let live_until = change.ttl_change.as_ref().map(|t| t.new_live_until_ledger);
cache.insert(key, Some((bytes, live_until)));
}
None => {
cache.insert(key, None);
}
}
applied = applied.saturating_add(1);
}
applied
}
pub fn entries(&self) -> Vec<(LedgerKey, LedgerEntry, Option<u32>)> {
let raw: Vec<(LedgerKey, Vec<u8>, Option<u32>)> = {
let cache = self.cache.lock().expect("cache mutex poisoned");
cache
.iter()
.filter_map(|(key, val)| {
val.as_ref()
.map(|(bytes, live_until)| (key.clone(), bytes.clone(), *live_until))
})
.collect()
};
raw.into_iter()
.map(|(key, bytes, live_until)| (key, decode_entry(&bytes), live_until))
.collect()
}
fn fetch_from_rpc(&self, key: &LedgerKey) -> Option<EntryWithLiveUntil> {
let count = self.fetch_count.fetch_add(1, Ordering::Relaxed) + 1;
info!("soroban-fork: fetch #{count}: {}", key_display(key));
let result: Option<EntryWithLiveUntil> = match self.client.fetch_entry(key) {
Ok(Some(fetched)) => Some((Rc::new(fetched.entry), fetched.live_until)),
Ok(None) => {
info!("soroban-fork: fetch #{count}: not found on ledger");
None
}
Err(e) => match self.fetch_mode {
FetchMode::Strict => {
panic!("soroban-fork: RPC fetch #{count} failed (strict): {e}")
}
FetchMode::Lenient => {
warn!("soroban-fork: RPC fetch #{count} error (lenient): {e}");
None
}
},
};
let cached = result
.as_ref()
.map(|(rc, live_until)| (encode_entry(rc.as_ref()), *live_until));
self.cache
.lock()
.expect("cache mutex poisoned")
.insert(key.clone(), cached);
result
}
}
impl SnapshotSource for RpcSnapshotSource {
fn get(
&self,
key: &Rc<LedgerKey>,
) -> std::result::Result<Option<EntryWithLiveUntil>, HostError> {
let cached = self
.cache
.lock()
.expect("cache mutex poisoned")
.get(key.as_ref())
.cloned();
if let Some(value) = cached {
return Ok(value.map(|(bytes, live_until)| (Rc::new(decode_entry(&bytes)), live_until)));
}
Ok(self.fetch_from_rpc(key.as_ref()))
}
}
fn encode_entry(entry: &LedgerEntry) -> Vec<u8> {
entry
.to_xdr(Limits::none())
.unwrap_or_else(|e| panic!("soroban-fork: LedgerEntry encode failed (structural bug): {e}"))
}
fn decode_entry(bytes: &[u8]) -> LedgerEntry {
LedgerEntry::from_xdr(bytes, Limits::none()).unwrap_or_else(|e| {
panic!(
"soroban-fork: cached LedgerEntry decode failed — cache corruption or \
XDR-version mismatch: {e}"
)
})
}
fn key_display(key: &LedgerKey) -> String {
match key {
LedgerKey::ContractData(cd) => {
let addr = sc_address_short(&cd.contract);
if cd.key == ScVal::LedgerKeyContractInstance {
format!("ContractData({addr}, instance)")
} else {
let dur = match cd.durability {
ContractDataDurability::Temporary => "temp",
ContractDataDurability::Persistent => "persistent",
};
format!("ContractData({addr}, {dur})")
}
}
LedgerKey::ContractCode(cc) => {
let h = &cc.hash.0;
format!(
"ContractCode({:02x}{:02x}{:02x}{:02x}...)",
h[0], h[1], h[2], h[3]
)
}
LedgerKey::Account(a) => {
format!("Account({})", account_id_short(&a.account_id))
}
LedgerKey::Trustline(t) => {
format!("Trustline({})", account_id_short(&t.account_id))
}
LedgerKey::ConfigSetting(_) => "ConfigSetting".to_string(),
LedgerKey::Ttl(_) => "Ttl".to_string(),
_ => "Other".to_string(),
}
}
fn sc_address_short(addr: &ScAddress) -> String {
let full = match addr {
ScAddress::Contract(hash) => {
format!("{}", stellar_strkey::Contract(hash.0.clone().into()))
}
ScAddress::Account(id) => account_id_full(id),
_ => "???".to_string(),
};
abbreviate(&full)
}
fn account_id_short(id: &soroban_env_host::xdr::AccountId) -> String {
abbreviate(&account_id_full(id))
}
fn account_id_full(id: &soroban_env_host::xdr::AccountId) -> String {
let PublicKey::PublicKeyTypeEd25519(k) = &id.0;
format!("{}", stellar_strkey::ed25519::PublicKey(k.0))
}
fn abbreviate(s: &str) -> String {
if s.len() > 12 {
format!("{}...{}", &s[..4], &s[s.len() - 4..])
} else {
s.to_string()
}
}
#[cfg(test)]
mod tests {
use super::*;
use soroban_env_host::xdr::{
ConfigSettingEntry, ConfigSettingId, LedgerEntryData, LedgerEntryExt,
LedgerKeyConfigSetting,
};
fn dummy_client() -> Arc<RpcClient> {
Arc::new(
RpcClient::new("http://localhost:0", crate::rpc::RpcConfig::default())
.expect("client construction should not fail"),
)
}
fn dummy_entry(last_modified: u32) -> (LedgerKey, LedgerEntry, Option<u32>) {
let key = LedgerKey::ConfigSetting(LedgerKeyConfigSetting {
config_setting_id: ConfigSettingId::ContractMaxSizeBytes,
});
let entry = LedgerEntry {
last_modified_ledger_seq: last_modified,
data: LedgerEntryData::ConfigSetting(ConfigSettingEntry::ContractMaxSizeBytes(65_536)),
ext: LedgerEntryExt::V0,
};
(key, entry, None)
}
#[test]
fn abbreviate_short_string_is_unchanged() {
assert_eq!(abbreviate("abc"), "abc");
assert_eq!(abbreviate("12345678"), "12345678");
}
#[test]
fn abbreviate_long_string_collapses_middle() {
let full = "GABCDEFGHIJKLMNOPQRSTUVWXYZ01234567890ABCDEFGHIJKLMNOPQR";
let short = abbreviate(full);
assert_eq!(short, "GABC...OPQR");
assert!(short.len() < full.len());
}
#[test]
fn key_display_renders_config_setting() {
let key = LedgerKey::ConfigSetting(LedgerKeyConfigSetting {
config_setting_id: ConfigSettingId::ContractMaxSizeBytes,
});
assert_eq!(key_display(&key), "ConfigSetting");
}
#[test]
fn fetch_mode_default_is_strict() {
let src = RpcSnapshotSource::new(dummy_client());
assert_eq!(src.fetch_mode, FetchMode::Strict);
}
#[test]
fn xdr_round_trip_preserves_ledger_entry() {
let (_, entry, _) = dummy_entry(42);
let encoded = encode_entry(&entry);
let decoded = decode_entry(&encoded);
assert_eq!(entry, decoded);
assert_eq!(encoded, encode_entry(&decoded));
}
#[test]
fn preload_then_entries_round_trips() {
let src = RpcSnapshotSource::new(dummy_client());
let original = vec![dummy_entry(7)];
src.preload(original.clone());
let exported = src.entries();
assert_eq!(exported.len(), 1);
assert_eq!(exported[0].0, original[0].0);
assert_eq!(exported[0].1, original[0].1);
assert_eq!(exported[0].2, original[0].2);
}
#[test]
fn get_returns_preloaded_entry() {
let src = RpcSnapshotSource::new(dummy_client());
let (key, entry, live_until) = dummy_entry(99);
src.preload(vec![(key.clone(), entry.clone(), live_until)]);
let key_rc = Rc::new(key);
let result = src.get(&key_rc).expect("get should not error");
let (got_entry, got_live_until) = result.expect("preloaded entry should be present");
assert_eq!(got_entry.as_ref(), &entry);
assert_eq!(got_live_until, live_until);
assert_eq!(src.fetch_count(), 0);
}
#[test]
fn concurrent_reads_of_preloaded_entry_are_race_free() {
use std::thread;
let src = Arc::new(RpcSnapshotSource::new(dummy_client()));
let (key, entry, live_until) = dummy_entry(123);
src.preload(vec![(key.clone(), entry.clone(), live_until)]);
let mut handles = Vec::new();
for _ in 0..8 {
let src = Arc::clone(&src);
let key = key.clone();
let entry = entry.clone();
handles.push(thread::spawn(move || {
let key_rc = Rc::new(key);
for _ in 0..100 {
let got = src
.get(&key_rc)
.expect("get should not error")
.expect("entry should be present");
assert_eq!(got.0.as_ref(), &entry);
}
}));
}
for h in handles {
h.join().expect("worker thread panicked");
}
assert_eq!(src.fetch_count(), 0, "no RPC fetches should have fired");
}
}