soroban-fork 0.9.2

A lazy-loading mainnet/testnet fork server for Soroban tests, inspired by Foundry's Anvil.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
//! Single-threaded actor that owns the `ForkedEnv` and processes RPC
//! commands serially.
//!
//! # Why an actor
//!
//! `soroban_sdk::Env` is `!Send` — its internal `Host = Rc<HostImpl>`
//! pins it to a single thread. We can't share it via `Arc<RwLock<Env>>`,
//! and rebuilding it per-request would invalidate any state the user
//! has accumulated (deal_token, mock_all_auths, future cheatcodes).
//!
//! The fix is the actor pattern: HTTP handlers (running on a multi-thread
//! tokio runtime) send `Command`s through a `tokio::sync::mpsc` channel
//! to one OS thread that owns the `ForkedEnv`. The worker processes
//! commands serially in a blocking loop; each command carries a
//! `oneshot::Sender` for the reply, so handlers `await` on the receiver
//! and the OS thread is freed while the worker computes.
//!
//! # Trade-offs
//!
//! - **Throughput is bounded by single-threaded execution** of contract
//!   calls. Soroban contract execution is μs–ms range, and we're a test
//!   harness, so this is fine.
//! - **A panicking handler kills the worker thread**, dropping the
//!   `Receiver` and breaking the whole server. v0.5 documents this; a
//!   future version may add panic recovery via `catch_unwind` in the
//!   worker loop.
//! - **Cache misses on `getLedgerEntries` block all other requests** for
//!   the duration of the upstream RPC call. Steady-state cache hits are
//!   instant; first-touch latency is one RPC round-trip per uncached key.

use std::collections::HashMap;
use std::rc::Rc;
use std::thread;
use std::time::{SystemTime, UNIX_EPOCH};

use log::{info, warn};
use sha2::{Digest, Sha256};
use soroban_env_host::budget::Budget;
use soroban_env_host::e2e_invoke::{
    invoke_host_function_in_recording_mode, RecordingInvocationAuthMode,
};
use soroban_env_host::storage::SnapshotSource;
use soroban_env_host::xdr::{
    AccountId, ContractEvent, DiagnosticEvent, Hash, HostFunction, LedgerEntry, LedgerKey, Limits,
    ReadXdr, ScVal, SorobanAuthorizationEntry, SorobanResources, TransactionEnvelope,
    TransactionSignaturePayload, TransactionSignaturePayloadTaggedTransaction, WriteXdr,
};
use tokio::sync::{mpsc, oneshot};

use crate::{ForkConfig, ForkError};

/// A unit of work the worker can execute against `ForkedEnv`.
///
/// Each variant carries its inputs and a `oneshot::Sender` for the
/// reply. Using a sum type (rather than `Box<dyn FnOnce>`) keeps the
/// command surface explicit and `Debug`-printable for tracing.
#[derive(Debug)]
pub(crate) enum Command {
    /// Snapshot of network metadata captured at fork-build time. No
    /// upstream call.
    GetNetwork {
        reply: oneshot::Sender<NetworkReply>,
    },
    /// The forked Env's current ledger info. No upstream call.
    GetLatestLedger {
        reply: oneshot::Sender<LatestLedgerReply>,
    },
    /// The fork-point ledger as a single-element Stellar `getLedgers`
    /// page. The fork is a frozen snapshot — there's exactly one ledger
    /// of state, regardless of what the caller's `start_ledger` was, so
    /// the request's `start_ledger` is intentionally not threaded
    /// through; we always answer with our own sequence.
    GetLedgersPage {
        reply: oneshot::Sender<LedgersPageReply>,
    },
    /// Resolve a batch of `LedgerKey`s through the snapshot source —
    /// cache hits are instant; misses trigger upstream RPC fetches that
    /// block the worker for the round-trip.
    GetLedgerEntries {
        keys: Vec<LedgerKey>,
        reply: oneshot::Sender<LedgerEntriesReply>,
    },
    /// Run a host function in recording mode and return everything the
    /// host observed: result, auth requirements, footprint, events,
    /// budget consumption. Does **not** mutate the env's state (the
    /// host primitive constructs its own throwaway sandbox per call).
    ///
    /// `transaction_size_bytes` is the on-the-wire length of the
    /// `TransactionEnvelope` the handler decoded. The worker needs it
    /// to compute the bandwidth + historical-data components of
    /// `minResourceFee`; threading it as a `Command` field keeps fee
    /// math centralised in the worker (where the live fee schedule
    /// lives) rather than splitting it across handler and worker.
    SimulateTransaction {
        host_function: HostFunction,
        source_account: AccountId,
        transaction_size_bytes: u32,
        reply: oneshot::Sender<SimulationReply>,
    },
    /// Apply a transaction's writes to the fork's snapshot source so
    /// subsequent reads see them.
    ///
    /// We run the same `invoke_host_function_in_recording_mode` path as
    /// `simulateTransaction`, but instead of throwing the result away,
    /// we capture `ledger_changes` and feed them back via
    /// `RpcSnapshotSource::apply_changes`. Auth runs in trust mode
    /// (`Recording(false)`) so unsigned envelopes from test code work
    /// without ceremony.
    ///
    /// Receipts are stored on the worker keyed by the canonical
    /// transaction hash — SHA-256 of the `TransactionSignaturePayload`
    /// as computed by `canonical_tx_hash`, with a raw envelope-bytes
    /// hash only as a decode-failure fallback; `GetTransaction` looks
    /// them up by the same key.
    SendTransaction {
        envelope_bytes: Vec<u8>,
        host_function: HostFunction,
        source_account: AccountId,
        reply: oneshot::Sender<SendReply>,
    },
    /// Force-write a `LedgerEntry` directly into the snapshot
    /// source, bypassing any host-level checks. The load-bearing
    /// primitive for stress-testing scenarios — oracle price
    /// manipulation, force-set token balances, replace contract
    /// code, all reduce to one entry write.
    ///
    /// `live_until` carries an optional TTL hint for entries that
    /// have one (`ContractData`, `ContractCode`); pass `None` for
    /// entries that don't (`Account`, `Trustline`).
    SetLedgerEntry {
        key: LedgerKey,
        entry: LedgerEntry,
        live_until: Option<u32>,
        reply: oneshot::Sender<()>,
    },
    /// Close `ledgers` ledgers and bump the close-time by
    /// `timestamp_advance_seconds`. Pushes time-sensitive contract
    /// logic (vesting cliffs, oracle staleness) past thresholds
    /// without orchestrating a real consensus round.
    ///
    /// Stellar talks about *closing* ledgers (finalising one and
    /// moving on); this is that, simulated locally — the fork's
    /// reported sequence + timestamp advance the same way real
    /// ledger close moves them.
    CloseLedgers {
        ledgers: u32,
        timestamp_advance_seconds: u64,
        reply: oneshot::Sender<CloseLedgersReply>,
    },
    /// Look up a previously-applied transaction's receipt by hash.
    /// The hash is the 32-byte canonical transaction hash produced
    /// by `canonical_tx_hash` (SHA-256 of the signature payload);
    /// a raw SHA-256 of the envelope bytes is used only when the
    /// envelope failed to decode at receipt-creation time.
    GetTransaction {
        hash: [u8; 32],
        reply: oneshot::Sender<Option<TxReceipt>>,
    },
}

/// Reply payload for [`Command::GetNetwork`] — backs the `getNetwork`
/// wire response.
#[derive(Debug)]
pub(crate) struct NetworkReply {
    /// Network passphrase, or a synthesised label when the user
    /// overrode `network_id` and the original passphrase is unknown.
    pub(crate) passphrase: String,
    pub(crate) protocol_version: u32,
    /// Hex-encoded SHA-256 of the passphrase (a.k.a. network ID).
    pub(crate) network_id_hex: String,
}

/// Reply payload for [`Command::GetLatestLedger`] — backs the
/// `getLatestLedger` wire response.
#[derive(Debug)]
pub(crate) struct LatestLedgerReply {
    pub(crate) sequence: u32,
    pub(crate) protocol_version: u32,
    /// Synthesised stable identifier for the fork-point ledger. The
    /// real RPC returns a 32-byte ledger hash; we don't have one (we
    /// forked from a snapshot, not a Stellar ledger), so we generate
    /// a deterministic label from the sequence.
    pub(crate) id: String,
}

/// Reply payload for [`Command::GetLedgersPage`] — the fork-point
/// ledger rendered as a single-element `getLedgers` page.
#[derive(Debug)]
pub(crate) struct LedgersPageReply {
    /// The fork's reported ledger sequence.
    pub(crate) sequence: u32,
    /// The fork's reported ledger close time (unix seconds).
    pub(crate) close_time: u64,
}

/// Reply payload for [`Command::GetLedgerEntries`] — backs the
/// `getLedgerEntries` wire response.
#[derive(Debug)]
pub(crate) struct LedgerEntriesReply {
    /// Per-key result, in the same order the keys were given. `None`
    /// means the key is absent from the ledger (and we asked the
    /// upstream RPC to confirm); `Some` carries the entry plus its
    /// optional live-until-ledger TTL hint.
    pub(crate) entries: Vec<Option<(LedgerKey, LedgerEntry, Option<u32>)>>,
    /// Sequence number reported as `latestLedger` on the wire — the
    /// fork's reported ledger.
    pub(crate) latest_ledger: u32,
}

/// Recording-mode simulation outcome — backs the `simulateTransaction`
/// wire response. We avoid carrying `HostError` across the channel (it
/// isn't `Send` in all circumstances and stringifying loses no useful
/// client-facing information) so we map the failure case to a
/// human-readable message at the worker boundary.
#[derive(Debug)]
pub(crate) struct SimulationReply {
    /// `Ok(scval)` on simulation success, `Err(message)` when the host
    /// reported an error during invocation. The wire response carries
    /// the message in the top-level `error` field.
    pub(crate) result: std::result::Result<ScVal, String>,
    /// Recorded auth entries that a real `sendTransaction` would need
    /// to be signed with.
    pub(crate) auth: Vec<SorobanAuthorizationEntry>,
    /// Footprint (read+write keys) and resource accounting (instructions,
    /// disk-read/write bytes). Becomes `transactionData.resources`.
    pub(crate) resources: SorobanResources,
    /// Contract-emitted events.
    pub(crate) contract_events: Vec<ContractEvent>,
    /// Diagnostic events captured if tracing was on (fn_call/fn_return
    /// pairs). Empty otherwise.
    pub(crate) diagnostic_events: Vec<DiagnosticEvent>,
    /// Echoed `latestLedger` for the response wire shape.
    pub(crate) latest_ledger: u32,
    /// Resource fee a real `sendTransaction` would owe at the live
    /// network's fee schedule, summed from non-refundable and
    /// refundable components. `None` when the schedule could not be
    /// resolved — the wire response then omits `minResourceFee`
    /// rather than lying with `"0"`.
    pub(crate) min_resource_fee: Option<i64>,
    /// Real memory in bytes the host's budget metered during the
    /// invocation, queried via `Budget::get_mem_bytes_consumed`.
    /// `None` only on the recording-mode failure path before any
    /// metering happened.
    pub(crate) mem_bytes: Option<u64>,
}

/// Receipt for a transaction the worker actually applied to the
/// snapshot source. Stored in the worker-local receipt map keyed by
/// the canonical transaction hash; `getTransaction` reads from there.
///
/// We avoid building a full `TransactionResult` / `TransactionMeta`
/// XDR for the v0.6 wire shape — those are non-trivial Stellar-core
/// types that real `getTransaction` callers consume. v0.6 emits the
/// invocation's `ScVal` return value and a status string; building
/// out the full meta-XDR is a v0.6.x followup.
#[derive(Clone, Debug)]
pub(crate) struct TxReceipt {
    /// `Ok(scval)` on application success, `Err(message)` when the
    /// host reported an error during execution.
    pub(crate) result: std::result::Result<ScVal, String>,
    /// Original `TransactionEnvelope` bytes — echoed back as
    /// `envelopeXdr` on the wire so clients can verify the receipt
    /// matches the envelope they sent.
    pub(crate) envelope_bytes: Vec<u8>,
    /// Ledger sequence at apply time. Note: `send_transaction` does
    /// not currently advance the fork's reported sequence (see its
    /// doc — no block-by-side-effect yet), so this stays at the
    /// fork-point value unless `closeLedgers` has been called.
    pub(crate) ledger: u32,
    /// Unix-seconds timestamp the receipt was created at. Used by the
    /// wire `createdAt` field.
    pub(crate) created_at: u64,
    /// How many `LedgerEntryChange`s were applied to the snapshot
    /// source. Useful for assertions in smoke tests; surfaced as a
    /// debug info field on the wire.
    pub(crate) applied_changes: u32,
}

/// Reply for `Command::SendTransaction` — the receipt we built plus
/// the hash we keyed it by, so the handler can echo the hash back.
#[derive(Debug)]
pub(crate) struct SendReply {
    /// Canonical transaction hash (see `canonical_tx_hash`); raw
    /// envelope-bytes SHA-256 only on the decode-failure fallback.
    pub(crate) hash: [u8; 32],
    /// The receipt stored in the worker's receipt map under `hash`.
    pub(crate) receipt: TxReceipt,
}

/// Reply for `Command::CloseLedgers` — new ledger sequence +
/// close-time so the wire response can confirm the bump without
/// a follow-up `getLatestLedger` round-trip.
#[derive(Debug)]
pub(crate) struct CloseLedgersReply {
    /// Fork's ledger sequence after the warp.
    pub(crate) new_sequence: u32,
    /// Fork's ledger close-time (unix seconds) after the warp.
    pub(crate) new_close_time: u64,
}

/// Handle to the worker thread. Cloning is cheap (Arc-style internally
/// in `mpsc::Sender`) so handlers can clone freely.
#[derive(Clone)]
pub(crate) struct ActorHandle {
    // Bounded command queue into the worker thread; see `spawn` for
    // the capacity choice.
    tx: mpsc::Sender<Command>,
}

impl ActorHandle {
    /// Send a command and `await` the reply.
    ///
    /// The only failure mode — surfaced as `internal_error` to
    /// clients — is a dead worker (channel closed): either the
    /// command can no longer be enqueued, or the worker dropped the
    /// reply sender before answering. A *full* queue is not an error:
    /// `mpsc::Sender::send` on a bounded channel applies backpressure
    /// by waiting for a slot, which is exactly the behaviour we want
    /// under load (no unbounded queueing).
    pub(crate) async fn send<R>(
        &self,
        build: impl FnOnce(oneshot::Sender<R>) -> Command,
    ) -> Result<R, ActorError> {
        let (reply_tx, reply_rx) = oneshot::channel();
        if self.tx.send(build(reply_tx)).await.is_err() {
            // Receiver dropped — worker thread is gone.
            return Err(ActorError::WorkerGone);
        }
        reply_rx.await.map_err(|_| ActorError::WorkerGone)
    }
}

/// Failure modes when communicating with the worker.
#[derive(Debug, thiserror::Error)]
pub(crate) enum ActorError {
    /// Worker thread is no longer running. Either it panicked or the
    /// server is shutting down. Either way, the only correct behaviour
    /// is to fail subsequent requests fast.
    #[error("worker thread is no longer running")]
    WorkerGone,
}

/// Spawn the worker thread and return a handle that the HTTP layer can
/// use to enqueue commands.
///
/// **The Env is built inside the worker thread, never crossing a thread
/// boundary.** That's the load-bearing constraint of this whole module:
/// `Env` contains `Rc<HostImpl>` which is `!Send`, so any attempt to
/// pass an already-built `ForkedEnv` into `thread::spawn` fails to
/// compile. The trade-off is that build errors surface asynchronously
/// through `ready_rx` — callers must `.await` it before serving.
pub(crate) fn spawn(
    config: ForkConfig,
) -> (
    ActorHandle,
    oneshot::Receiver<std::result::Result<(), ForkError>>,
) {
    // Small bounded queue: when the worker falls behind, handlers
    // back-pressure on `.send().await` instead of growing RAM without
    // bound. 32 in-flight requests is well beyond what a realistic
    // test suite generates against a test-fork server.
    let (tx, rx) = mpsc::channel(32);
    let (ready_tx, ready_rx) = oneshot::channel();

    // A dedicated long-lived OS thread (not `tokio::task::spawn_blocking`)
    // so the !Send Env lives on exactly one thread for its whole life,
    // rather than a pool worker that might migrate between calls.
    thread::Builder::new()
        .name("soroban-fork-worker".into())
        .spawn(move || match config.build() {
            Ok(env) => {
                // Signal readiness first, then serve until the channel
                // closes. Ready-receiver may already be gone; ignore.
                let _ = ready_tx.send(Ok(()));
                worker_loop(env, rx);
            }
            Err(e) => {
                // Build failed: report and let the thread exit; the
                // handle's sends will fail with `WorkerGone`.
                let _ = ready_tx.send(Err(e));
            }
        })
        .expect("spawn soroban-fork-worker thread");

    (ActorHandle { tx }, ready_rx)
}

/// Main loop. Pulls commands from the channel, dispatches, sends
/// replies. Exits when the channel closes (all senders dropped =
/// server shutting down).
///
/// Every reply send is a `let _ =`: a failed send only means the
/// requesting handler dropped its reply receiver (presumably the HTTP
/// client went away), which is not a worker-level error.
fn worker_loop(env: crate::ForkedEnv, mut rx: mpsc::Receiver<Command>) {
    info!("soroban-fork: worker thread started");

    // Per-worker receipt store. The worker owns this; everything that
    // needs to read it goes through `Command::GetTransaction` so we
    // don't have to make `TxReceipt: Send` or wrap in Arc/Mutex.
    // Keyed by the hash returned from `send_transaction`.
    let mut receipts: HashMap<[u8; 32], TxReceipt> = HashMap::new();

    // `blocking_recv` because this is a plain OS thread, not a tokio
    // task; it parks until a command arrives or all senders drop.
    while let Some(cmd) = rx.blocking_recv() {
        match cmd {
            Command::GetNetwork { reply } => {
                let passphrase = env
                    .passphrase()
                    // Passphrase is missing only when the user passed an
                    // explicit `network_id` override. We synthesise a
                    // label so the wire shape stays valid; a real client
                    // that needs the exact passphrase string should not
                    // override `network_id` in the first place.
                    .map(|s| s.to_string())
                    .unwrap_or_else(|| "Forked Soroban Network (custom network_id)".to_string());
                let _ = reply.send(NetworkReply {
                    passphrase,
                    protocol_version: env.protocol_version(),
                    network_id_hex: hex_encode(&env.network_id()),
                });
            }
            Command::GetLatestLedger { reply } => {
                let _ = reply.send(LatestLedgerReply {
                    sequence: env.ledger_sequence(),
                    protocol_version: env.protocol_version(),
                    // Deterministic synthetic id — we have no real
                    // 32-byte ledger hash to report.
                    id: format!("forked-ledger-{}", env.ledger_sequence()),
                });
            }
            Command::GetLedgersPage { reply } => {
                // We have one ledger of state; clients may ask for any
                // `start_ledger` but we always answer with the fork
                // point. The wire shape stays valid either way.
                let _ = reply.send(LedgersPageReply {
                    sequence: env.ledger_sequence(),
                    close_time: env.ledger_close_time(),
                });
            }
            Command::GetLedgerEntries { keys, reply } => {
                // May block on upstream RPC for uncached keys; see the
                // module-level trade-offs note.
                let entries = resolve_ledger_entries(&env, keys);
                let _ = reply.send(LedgerEntriesReply {
                    entries,
                    latest_ledger: env.ledger_sequence(),
                });
            }
            Command::SimulateTransaction {
                host_function,
                source_account,
                transaction_size_bytes,
                reply,
            } => {
                let _ = reply.send(simulate_transaction(
                    &env,
                    host_function,
                    source_account,
                    transaction_size_bytes,
                ));
            }
            Command::SendTransaction {
                envelope_bytes,
                host_function,
                source_account,
                reply,
            } => {
                let send_reply =
                    send_transaction(&env, envelope_bytes, host_function, source_account);
                // Store the receipt before replying so a client that
                // immediately polls `getTransaction` finds it.
                receipts.insert(send_reply.hash, send_reply.receipt.clone());
                let _ = reply.send(send_reply);
            }
            Command::GetTransaction { hash, reply } => {
                let _ = reply.send(receipts.get(&hash).cloned());
            }
            Command::SetLedgerEntry {
                key,
                entry,
                live_until,
                reply,
            } => {
                // Direct write into the snapshot source — no host-level
                // validation by design (stress-testing primitive).
                env.snapshot_source().set_entry(key, entry, live_until);
                let _ = reply.send(());
            }
            Command::CloseLedgers {
                ledgers,
                timestamp_advance_seconds,
                reply,
            } => {
                // `warp` mutates the SDK's live LedgerInfo; our
                // `ledger_sequence()` / `ledger_close_time()`
                // accessors read straight from there, so subsequent
                // `getLatestLedger` calls reflect the new values.
                env.warp(ledgers, timestamp_advance_seconds);
                let _ = reply.send(CloseLedgersReply {
                    new_sequence: env.ledger_sequence(),
                    new_close_time: env.ledger_close_time(),
                });
            }
        }
    }
    warn!("soroban-fork: worker thread shutting down (channel closed)");
    // Tear the env down on this thread (it owns the !Send Env), then
    // log so the exit is visible in traces.
    drop(env);
    info!("soroban-fork: worker thread exited");
}

/// Run `invoke_host_function_in_recording_mode` against the forked
/// snapshot source and translate the result into a `SimulationReply`.
///
/// **No state mutation.** The host primitive builds its own throwaway
/// sandbox internally — calling this twice from the same env is
/// idempotent and side-effect-free, which is exactly the contract
/// `simulateTransaction` promises.
fn simulate_transaction(
    env: &crate::ForkedEnv,
    host_function: HostFunction,
    source_account: AccountId,
    transaction_size_bytes: u32,
) -> SimulationReply {
    use soroban_sdk::testutils::Ledger as _;

    let source: Rc<dyn SnapshotSource> = env.snapshot_source().clone();
    let info = env.env().ledger().get();
    let budget = Budget::default();
    let mut diagnostics: Vec<DiagnosticEvent> = Vec::new();

    let outcome = invoke_host_function_in_recording_mode(
        &budget,
        true, // enable_diagnostics — captures fn_call/fn_return events
        &host_function,
        &source_account,
        // `Recording(false)` = "track auth as the contract calls
        // require_auth(...) and report the entries needed"; `false`
        // allows non-root authorizations (host terminology). Callers
        // wanting enforced signed entries use `sendTransaction`.
        RecordingInvocationAuthMode::Recording(false),
        info,
        source,
        [0u8; 32], // base_prng_seed — deterministic for reproducible simulations
        &mut diagnostics,
    );

    let latest_ledger = env.ledger_sequence();

    // Memory accounting reads from the same Budget the host just
    // charged. On the failure path it may never have been exercised;
    // report `None` there rather than a misleading 0.
    let mem_bytes = budget.get_mem_bytes_consumed().ok();

    match outcome {
        Ok(rec) => {
            // Fee math needs the resources/events before they move
            // into the reply struct.
            let min_resource_fee = compute_min_resource_fee(
                env,
                &rec.resources,
                &rec.contract_events,
                transaction_size_bytes,
            );
            SimulationReply {
                result: rec.invoke_result.map_err(|e| format!("host error: {e}")),
                auth: rec.auth,
                resources: rec.resources,
                contract_events: rec.contract_events,
                diagnostic_events: diagnostics,
                latest_ledger,
                min_resource_fee,
                mem_bytes,
            }
        }
        // Recording-mode-level error (budget exhaustion). The wire
        // response sets `error` and elides everything else.
        Err(e) => SimulationReply {
            result: Err(format!("recording-mode error: {e}")),
            auth: Vec::new(),
            resources: empty_soroban_resources(),
            contract_events: Vec::new(),
            diagnostic_events: diagnostics,
            latest_ledger,
            min_resource_fee: None,
            mem_bytes,
        },
    }
}

/// Apply a transaction to the fork's snapshot source so subsequent
/// reads see its writes. Builds the receipt the worker stores.
///
/// Same recording-mode entry point as [`simulate_transaction`], but
/// instead of discarding `ledger_changes` we feed them back into the
/// source via [`crate::RpcSnapshotSource::apply_changes`]. The host
/// runs with relaxed (trust-mode) auth — `Recording(false)` — so
/// unsigned envelopes from test code apply without ceremony.
///
/// **What this is not.** v0.6 does not advance the fork's reported
/// ledger sequence on each send (no block-by-side-effect yet). State
/// changes persist in the cache; ledger metadata stays at fork-point.
/// Adding ledger advancement is a v0.6.x followup once the
/// timestamp/sequence Cell-vs-warp question is resolved.
fn send_transaction(
    env: &crate::ForkedEnv,
    envelope_bytes: Vec<u8>,
    host_function: HostFunction,
    source_account: AccountId,
) -> SendReply {
    use soroban_sdk::testutils::Ledger as _;

    let source: Rc<dyn SnapshotSource> = env.snapshot_source().clone();
    let info = env.env().ledger().get();
    let budget = Budget::default();
    let mut diagnostics: Vec<DiagnosticEvent> = Vec::new();

    let outcome = invoke_host_function_in_recording_mode(
        &budget,
        true,
        &host_function,
        &source_account,
        RecordingInvocationAuthMode::Recording(false),
        info,
        source,
        [0u8; 32],
        &mut diagnostics,
    );

    // Prefer the wire-canonical hash so clients can predict the receipt
    // key. Decode shouldn't fail here — the envelope already
    // round-tripped through `extract_invoke_op` at the handler — but if
    // it does, fall back to a raw envelope hash so we always return
    // *some* key; the receipt store still works internally even if a
    // JS SDK client can't predict it.
    let hash = canonical_tx_hash(&envelope_bytes, &env.network_id())
        .unwrap_or_else(|_| Sha256::digest(&envelope_bytes).into());

    let ledger = env.ledger_sequence();
    let created_at = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.as_secs())
        .unwrap_or(0);

    let (result, applied_changes) = match outcome {
        Ok(rec) => {
            let result = rec.invoke_result.map_err(|e| format!("host error: {e}"));
            // Only apply changes when the host invocation actually
            // succeeded — a failed invocation produces no semantic
            // writes (the host doesn't expose partial mutations).
            let applied = match &result {
                Ok(_) => env.snapshot_source().apply_changes(rec.ledger_changes),
                Err(_) => 0,
            };
            (result, applied)
        }
        Err(e) => (Err(format!("recording-mode error: {e}")), 0),
    };

    // Bump the source account's seq_num so subsequent envelopes built
    // from `getAccount` → `seq + 1` validate as the host expects. Only
    // on success — a failed invocation should not burn a sequence
    // number, mirroring real Stellar core where a tx that fails
    // validation doesn't advance the account's seq either.
    //
    // `bump_account_seq` returns `None` when the source account isn't
    // cached locally (the "first send from a never-pre-funded mainnet
    // account" case). We log and move on rather than failing the send —
    // the receipt still says SUCCESS for the host invocation itself,
    // which is the truth.
    if result.is_ok() {
        let bumped = env.snapshot_source().bump_account_seq(&source_account);
        if bumped.is_none() {
            log::debug!(
                "soroban-fork: source account {source_account:?} not cached; \
                 seq not bumped (sendTransaction still applied)"
            );
        }
    }

    SendReply {
        hash,
        receipt: TxReceipt {
            result,
            envelope_bytes,
            ledger,
            created_at,
            applied_changes,
        },
    }
}

/// Compute the canonical Stellar transaction hash from an envelope —
/// the same value `stellar-rpc` and the JS SDK compute when polling
/// `getTransaction`. Defined as `sha256(TransactionSignaturePayload)`
/// where the payload carries the network ID and the inner V1 (or
/// FeeBump) transaction.
///
/// Using the wire-canonical hash (rather than `sha256(envelope_bytes)`)
/// matters because clients build their own hash from the envelope
/// they sent and use it as the lookup key. A raw envelope hash would
/// be internally consistent but invisible to those clients.
fn canonical_tx_hash(envelope_bytes: &[u8], network_id: &[u8; 32]) -> Result<[u8; 32], String> {
    let decoded = TransactionEnvelope::from_xdr(envelope_bytes, Limits::none())
        .map_err(|e| format!("envelope decode for hash: {e}"))?;

    let tagged_transaction = match decoded {
        TransactionEnvelope::Tx(v1) => TransactionSignaturePayloadTaggedTransaction::Tx(v1.tx),
        TransactionEnvelope::TxFeeBump(bump) => {
            TransactionSignaturePayloadTaggedTransaction::TxFeeBump(bump.tx)
        }
        // V0 envelopes don't carry Soroban operations — the handler
        // already rejects them, so this arm only fires if a future
        // Command path admits V0. Hash is undefined for our use.
        TransactionEnvelope::TxV0(_) => {
            return Err("V0 envelopes do not have a Soroban-canonical hash".into());
        }
    };

    let payload_xdr = TransactionSignaturePayload {
        network_id: Hash(*network_id),
        tagged_transaction,
    }
    .to_xdr(Limits::none())
    .map_err(|e| format!("payload encode for hash: {e}"))?;

    Ok(Sha256::digest(&payload_xdr).into())
}

/// Minimum resource fee a real `sendTransaction` would owe for this
/// simulation, priced with the live on-chain fee schedule.
///
/// Returns `None` when:
/// - The live fee schedule could not be resolved (RPC error, missing
///   config setting). The wire response then omits `minResourceFee`
///   instead of reporting `"0"` or a half-computed number.
/// - XDR-encoding a contract event for size measurement failed (host
///   internal error; should not happen with well-formed events).
///
/// **Slight underestimate by signature size.** The bandwidth +
/// historical-data components scale with the on-the-wire envelope size,
/// but at simulation time the envelope has no signatures yet (the
/// caller can't sign the tx they're still sizing). A real
/// `sendTransaction` pays roughly 70 extra bytes ×
/// `fee_per_transaction_size_1kb` per signer, so clients copying this
/// number into a signed tx should pad it. `stellar-rpc` makes the same
/// approximation.
fn compute_min_resource_fee(
    env: &crate::ForkedEnv,
    resources: &SorobanResources,
    contract_events: &[ContractEvent],
    transaction_size_bytes: u32,
) -> Option<i64> {
    use soroban_env_host::xdr::{DiagnosticEvent, Limits, WriteXdr};

    let fee_config = match env.fee_configuration() {
        Err(e) => {
            warn!("soroban-fork: minResourceFee skipped — fee schedule unavailable: {e}");
            return None;
        }
        Ok(cfg) => cfg,
    };

    // Stellar core measures the contract-events size as the XDR length
    // of the exact `DiagnosticEvent { in_successful_contract_call: true,
    // event }` wrappers that go on the wire, so we build the same ones.
    let mut events_size: u32 = 0;
    for ev in contract_events {
        let wrapper = DiagnosticEvent {
            in_successful_contract_call: true,
            event: ev.clone(),
        };
        let encoded = match wrapper.to_xdr(Limits::none()) {
            Ok(bytes) => bytes,
            Err(e) => {
                warn!(
                    "soroban-fork: minResourceFee skipped — contract event XDR encode failed: {e}"
                );
                return None;
            }
        };
        events_size = events_size.saturating_add(encoded.len() as u32);
    }

    let footprint = &resources.footprint;
    let ro_entries = footprint.read_only.len() as u32;
    let rw_entries = footprint.read_write.len() as u32;

    let tx_resources = crate::fees::TransactionResources {
        instructions: resources.instructions,
        // Stellar fee math charges every touched entry as a "read"
        // (writes are read-then-written) and the writable ones again
        // as a "write".
        disk_read_entries: ro_entries.saturating_add(rw_entries),
        write_entries: rw_entries,
        disk_read_bytes: resources.disk_read_bytes,
        write_bytes: resources.write_bytes,
        contract_events_size_bytes: events_size,
        transaction_size_bytes,
    };
    let (non_refundable, refundable) =
        crate::fees::compute_transaction_resource_fee(&tx_resources, fee_config);
    Some(non_refundable.saturating_add(refundable))
}

/// Zeroed `SorobanResources` used on the failure path. Mirrors the
/// shape the success path produces so the response serialiser never
/// has to special-case a missing value.
fn empty_soroban_resources() -> SorobanResources {
    use soroban_env_host::xdr::LedgerFootprint;
    let footprint = LedgerFootprint {
        read_only: Vec::new().try_into().expect("empty vec into VecM"),
        read_write: Vec::new().try_into().expect("empty vec into VecM"),
    };
    SorobanResources {
        footprint,
        instructions: 0,
        disk_read_bytes: 0,
        write_bytes: 0,
    }
}

/// Resolve each key through the snapshot source. A cache hit costs a
/// BTreeMap lookup plus an XDR decode; a miss is one RPC round-trip
/// per key (the upstream client batches `getLedgerEntries` during
/// pre-warming, but on-demand lookups go one at a time).
fn resolve_ledger_entries(
    env: &crate::ForkedEnv,
    keys: Vec<LedgerKey>,
) -> Vec<Option<(LedgerKey, LedgerEntry, Option<u32>)>> {
    let source = env.snapshot_source();
    let mut resolved = Vec::with_capacity(keys.len());
    for key in keys {
        let lookup = source.get(&Rc::new(key.clone()));
        let hit = match lookup {
            Ok(Some((entry_rc, live_until))) => Some((key, (*entry_rc).clone(), live_until)),
            Ok(None) => None,
            // A HostError out of SnapshotSource::get is theoretical for
            // us — our impl never returns one (Strict mode panics,
            // Lenient yields None). Should that contract ever change we
            // degrade to "missing" instead of crashing the worker, and
            // the caller just sees a partial response.
            Err(_) => None,
        };
        resolved.push(hit);
    }
    resolved
}

/// Lower-case hex encoder for network IDs and similar 32-byte values.
/// Inline to avoid pulling in a hex-crate dep just for a few uses.
///
/// Encodes via a nibble lookup table instead of `format!("{b:02x}")`:
/// the `format!` form allocated a throwaway `String` per byte, which
/// defeated the point of a hand-rolled zero-dep helper. This version
/// performs exactly one allocation (the preallocated output).
fn hex_encode(bytes: &[u8]) -> String {
    const HEX: &[u8; 16] = b"0123456789abcdef";
    let mut s = String::with_capacity(bytes.len() * 2);
    for &b in bytes {
        // High nibble first, then low — standard big-endian hex order.
        s.push(HEX[(b >> 4) as usize] as char);
        s.push(HEX[(b & 0x0f) as usize] as char);
    }
    s
}