use std::collections::HashMap;
use std::rc::Rc;
use std::thread;
use std::time::{SystemTime, UNIX_EPOCH};
use log::{info, warn};
use sha2::{Digest, Sha256};
use soroban_env_host::budget::Budget;
use soroban_env_host::e2e_invoke::{
invoke_host_function_in_recording_mode, RecordingInvocationAuthMode,
};
use soroban_env_host::storage::SnapshotSource;
use soroban_env_host::xdr::{
AccountId, ContractEvent, DiagnosticEvent, Hash, HostFunction, LedgerEntry, LedgerKey, Limits,
ReadXdr, ScVal, SorobanAuthorizationEntry, SorobanResources, TransactionEnvelope,
TransactionSignaturePayload, TransactionSignaturePayloadTaggedTransaction, WriteXdr,
};
use tokio::sync::{mpsc, oneshot};
use crate::{ForkConfig, ForkError};
#[derive(Debug)]
pub(crate) enum Command {
GetNetwork {
reply: oneshot::Sender<NetworkReply>,
},
GetLatestLedger {
reply: oneshot::Sender<LatestLedgerReply>,
},
GetLedgersPage {
reply: oneshot::Sender<LedgersPageReply>,
},
GetLedgerEntries {
keys: Vec<LedgerKey>,
reply: oneshot::Sender<LedgerEntriesReply>,
},
SimulateTransaction {
host_function: HostFunction,
source_account: AccountId,
transaction_size_bytes: u32,
reply: oneshot::Sender<SimulationReply>,
},
SendTransaction {
envelope_bytes: Vec<u8>,
host_function: HostFunction,
source_account: AccountId,
reply: oneshot::Sender<SendReply>,
},
SetLedgerEntry {
key: LedgerKey,
entry: LedgerEntry,
live_until: Option<u32>,
reply: oneshot::Sender<()>,
},
CloseLedgers {
ledgers: u32,
timestamp_advance_seconds: u64,
reply: oneshot::Sender<CloseLedgersReply>,
},
GetTransaction {
hash: [u8; 32],
reply: oneshot::Sender<Option<TxReceipt>>,
},
}
#[derive(Debug)]
pub(crate) struct NetworkReply {
pub(crate) passphrase: String,
pub(crate) protocol_version: u32,
pub(crate) network_id_hex: String,
}
#[derive(Debug)]
pub(crate) struct LatestLedgerReply {
pub(crate) sequence: u32,
pub(crate) protocol_version: u32,
pub(crate) id: String,
}
#[derive(Debug)]
pub(crate) struct LedgersPageReply {
pub(crate) sequence: u32,
pub(crate) close_time: u64,
}
#[derive(Debug)]
pub(crate) struct LedgerEntriesReply {
pub(crate) entries: Vec<Option<(LedgerKey, LedgerEntry, Option<u32>)>>,
pub(crate) latest_ledger: u32,
}
#[derive(Debug)]
pub(crate) struct SimulationReply {
pub(crate) result: std::result::Result<ScVal, String>,
pub(crate) auth: Vec<SorobanAuthorizationEntry>,
pub(crate) resources: SorobanResources,
pub(crate) contract_events: Vec<ContractEvent>,
pub(crate) diagnostic_events: Vec<DiagnosticEvent>,
pub(crate) latest_ledger: u32,
pub(crate) min_resource_fee: Option<i64>,
pub(crate) mem_bytes: Option<u64>,
}
#[derive(Clone, Debug)]
pub(crate) struct TxReceipt {
pub(crate) result: std::result::Result<ScVal, String>,
pub(crate) envelope_bytes: Vec<u8>,
pub(crate) ledger: u32,
pub(crate) created_at: u64,
pub(crate) applied_changes: u32,
}
#[derive(Debug)]
pub(crate) struct SendReply {
pub(crate) hash: [u8; 32],
pub(crate) receipt: TxReceipt,
}
#[derive(Debug)]
pub(crate) struct CloseLedgersReply {
pub(crate) new_sequence: u32,
pub(crate) new_close_time: u64,
}
#[derive(Clone)]
pub(crate) struct ActorHandle {
tx: mpsc::Sender<Command>,
}
impl ActorHandle {
pub(crate) async fn send<R>(
&self,
build: impl FnOnce(oneshot::Sender<R>) -> Command,
) -> Result<R, ActorError> {
let (reply_tx, reply_rx) = oneshot::channel();
let cmd = build(reply_tx);
self.tx
.send(cmd)
.await
.map_err(|_| ActorError::WorkerGone)?;
reply_rx.await.map_err(|_| ActorError::WorkerGone)
}
}
#[derive(Debug, thiserror::Error)]
pub(crate) enum ActorError {
#[error("worker thread is no longer running")]
WorkerGone,
}
pub(crate) fn spawn(
config: ForkConfig,
) -> (
ActorHandle,
oneshot::Receiver<std::result::Result<(), ForkError>>,
) {
let (tx, rx) = mpsc::channel(32);
let (ready_tx, ready_rx) = oneshot::channel();
thread::Builder::new()
.name("soroban-fork-worker".into())
.spawn(move || {
let env = match config.build() {
Ok(env) => {
let _ = ready_tx.send(Ok(()));
env
}
Err(e) => {
let _ = ready_tx.send(Err(e));
return;
}
};
worker_loop(env, rx);
})
.expect("spawn soroban-fork-worker thread");
(ActorHandle { tx }, ready_rx)
}
fn worker_loop(env: crate::ForkedEnv, mut rx: mpsc::Receiver<Command>) {
info!("soroban-fork: worker thread started");
let mut receipts: HashMap<[u8; 32], TxReceipt> = HashMap::new();
while let Some(cmd) = rx.blocking_recv() {
match cmd {
Command::GetNetwork { reply } => {
let passphrase = env
.passphrase()
.map(|s| s.to_string())
.unwrap_or_else(|| "Forked Soroban Network (custom network_id)".to_string());
let _ = reply.send(NetworkReply {
passphrase,
protocol_version: env.protocol_version(),
network_id_hex: hex_encode(&env.network_id()),
});
}
Command::GetLatestLedger { reply } => {
let _ = reply.send(LatestLedgerReply {
sequence: env.ledger_sequence(),
protocol_version: env.protocol_version(),
id: format!("forked-ledger-{}", env.ledger_sequence()),
});
}
Command::GetLedgersPage { reply } => {
let _ = reply.send(LedgersPageReply {
sequence: env.ledger_sequence(),
close_time: env.ledger_close_time(),
});
}
Command::GetLedgerEntries { keys, reply } => {
let entries = resolve_ledger_entries(&env, keys);
let _ = reply.send(LedgerEntriesReply {
entries,
latest_ledger: env.ledger_sequence(),
});
}
Command::SimulateTransaction {
host_function,
source_account,
transaction_size_bytes,
reply,
} => {
let _ = reply.send(simulate_transaction(
&env,
host_function,
source_account,
transaction_size_bytes,
));
}
Command::SendTransaction {
envelope_bytes,
host_function,
source_account,
reply,
} => {
let send_reply =
send_transaction(&env, envelope_bytes, host_function, source_account);
receipts.insert(send_reply.hash, send_reply.receipt.clone());
let _ = reply.send(send_reply);
}
Command::GetTransaction { hash, reply } => {
let _ = reply.send(receipts.get(&hash).cloned());
}
Command::SetLedgerEntry {
key,
entry,
live_until,
reply,
} => {
env.snapshot_source().set_entry(key, entry, live_until);
let _ = reply.send(());
}
Command::CloseLedgers {
ledgers,
timestamp_advance_seconds,
reply,
} => {
env.warp(ledgers, timestamp_advance_seconds);
let _ = reply.send(CloseLedgersReply {
new_sequence: env.ledger_sequence(),
new_close_time: env.ledger_close_time(),
});
}
}
}
warn!("soroban-fork: worker thread shutting down (channel closed)");
drop(env);
info!("soroban-fork: worker thread exited");
}
fn simulate_transaction(
env: &crate::ForkedEnv,
host_function: HostFunction,
source_account: AccountId,
transaction_size_bytes: u32,
) -> SimulationReply {
use soroban_sdk::testutils::Ledger as _;
let snapshot_source: Rc<dyn SnapshotSource> = env.snapshot_source().clone();
let ledger_info = env.env().ledger().get();
let budget = Budget::default();
let auth_mode = RecordingInvocationAuthMode::Recording(false);
let mut diagnostic_events: Vec<DiagnosticEvent> = Vec::new();
let result = invoke_host_function_in_recording_mode(
&budget,
true, &host_function,
&source_account,
auth_mode,
ledger_info,
snapshot_source,
[0u8; 32], &mut diagnostic_events,
);
let latest_ledger = env.ledger_sequence();
let mem_bytes = budget.get_mem_bytes_consumed().ok();
match result {
Ok(rec) => {
let invoke_result = rec.invoke_result.map_err(|e| format!("host error: {e}"));
let min_resource_fee = compute_min_resource_fee(
env,
&rec.resources,
&rec.contract_events,
transaction_size_bytes,
);
SimulationReply {
result: invoke_result,
auth: rec.auth,
resources: rec.resources,
contract_events: rec.contract_events,
diagnostic_events,
latest_ledger,
min_resource_fee,
mem_bytes,
}
}
Err(e) => {
SimulationReply {
result: Err(format!("recording-mode error: {e}")),
auth: Vec::new(),
resources: empty_soroban_resources(),
contract_events: Vec::new(),
diagnostic_events,
latest_ledger,
min_resource_fee: None,
mem_bytes,
}
}
}
}
fn send_transaction(
env: &crate::ForkedEnv,
envelope_bytes: Vec<u8>,
host_function: HostFunction,
source_account: AccountId,
) -> SendReply {
use soroban_sdk::testutils::Ledger as _;
let snapshot_source: Rc<dyn SnapshotSource> = env.snapshot_source().clone();
let ledger_info = env.env().ledger().get();
let budget = Budget::default();
let auth_mode = RecordingInvocationAuthMode::Recording(false);
let mut diagnostic_events: Vec<DiagnosticEvent> = Vec::new();
let invoke_result = invoke_host_function_in_recording_mode(
&budget,
true,
&host_function,
&source_account,
auth_mode,
ledger_info,
snapshot_source,
[0u8; 32],
&mut diagnostic_events,
);
let hash = canonical_tx_hash(&envelope_bytes, &env.network_id()).unwrap_or_else(|_| {
Sha256::digest(&envelope_bytes).into()
});
let ledger = env.ledger_sequence();
let created_at = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0);
let (result, applied_changes) = match invoke_result {
Ok(rec) => {
let result = rec.invoke_result.map_err(|e| format!("host error: {e}"));
let applied = if result.is_ok() {
env.snapshot_source().apply_changes(rec.ledger_changes)
} else {
0
};
(result, applied)
}
Err(e) => (Err(format!("recording-mode error: {e}")), 0),
};
if result.is_ok()
&& env
.snapshot_source()
.bump_account_seq(&source_account)
.is_none()
{
log::debug!(
"soroban-fork: source account {source_account:?} not cached; \
seq not bumped (sendTransaction still applied)"
);
}
let receipt = TxReceipt {
result,
envelope_bytes,
ledger,
created_at,
applied_changes,
};
SendReply { hash, receipt }
}
fn canonical_tx_hash(envelope_bytes: &[u8], network_id: &[u8; 32]) -> Result<[u8; 32], String> {
let envelope = TransactionEnvelope::from_xdr(envelope_bytes, Limits::none())
.map_err(|e| format!("envelope decode for hash: {e}"))?;
let tagged = match envelope {
TransactionEnvelope::Tx(v1) => TransactionSignaturePayloadTaggedTransaction::Tx(v1.tx),
TransactionEnvelope::TxFeeBump(fb) => {
TransactionSignaturePayloadTaggedTransaction::TxFeeBump(fb.tx)
}
TransactionEnvelope::TxV0(_) => {
return Err("V0 envelopes do not have a Soroban-canonical hash".into());
}
};
let payload = TransactionSignaturePayload {
network_id: Hash(*network_id),
tagged_transaction: tagged,
};
let payload_xdr = payload
.to_xdr(Limits::none())
.map_err(|e| format!("payload encode for hash: {e}"))?;
Ok(Sha256::digest(&payload_xdr).into())
}
fn compute_min_resource_fee(
env: &crate::ForkedEnv,
resources: &SorobanResources,
contract_events: &[ContractEvent],
transaction_size_bytes: u32,
) -> Option<i64> {
use soroban_env_host::xdr::{DiagnosticEvent, Limits, WriteXdr};
let fee_config = match env.fee_configuration() {
Ok(cfg) => cfg,
Err(e) => {
warn!("soroban-fork: minResourceFee skipped — fee schedule unavailable: {e}");
return None;
}
};
let mut events_size: u32 = 0;
for ce in contract_events {
let de = DiagnosticEvent {
in_successful_contract_call: true,
event: ce.clone(),
};
match de.to_xdr(Limits::none()) {
Ok(bytes) => {
events_size = events_size.saturating_add(bytes.len() as u32);
}
Err(e) => {
warn!(
"soroban-fork: minResourceFee skipped — contract event XDR encode failed: {e}"
);
return None;
}
}
}
let footprint = &resources.footprint;
let read_only_count = footprint.read_only.len() as u32;
let read_write_count = footprint.read_write.len() as u32;
let tx_resources = crate::fees::TransactionResources {
instructions: resources.instructions,
disk_read_entries: read_only_count.saturating_add(read_write_count),
write_entries: read_write_count,
disk_read_bytes: resources.disk_read_bytes,
write_bytes: resources.write_bytes,
contract_events_size_bytes: events_size,
transaction_size_bytes,
};
let (non_refundable, refundable) =
crate::fees::compute_transaction_resource_fee(&tx_resources, fee_config);
Some(non_refundable.saturating_add(refundable))
}
fn empty_soroban_resources() -> SorobanResources {
use soroban_env_host::xdr::LedgerFootprint;
SorobanResources {
footprint: LedgerFootprint {
read_only: vec![].try_into().expect("empty vec into VecM"),
read_write: vec![].try_into().expect("empty vec into VecM"),
},
instructions: 0,
disk_read_bytes: 0,
write_bytes: 0,
}
}
fn resolve_ledger_entries(
env: &crate::ForkedEnv,
keys: Vec<LedgerKey>,
) -> Vec<Option<(LedgerKey, LedgerEntry, Option<u32>)>> {
let source = env.snapshot_source();
keys.into_iter()
.map(|key| {
let key_rc = Rc::new(key.clone());
match source.get(&key_rc) {
Ok(Some((entry_rc, live_until))) => {
Some((key, entry_rc.as_ref().clone(), live_until))
}
Ok(None) => None,
Err(_) => None,
}
})
.collect()
}
fn hex_encode(bytes: &[u8]) -> String {
let mut s = String::with_capacity(bytes.len() * 2);
for b in bytes {
s.push_str(&format!("{b:02x}"));
}
s
}