Skip to main content

commonware_runtime/
deterministic.rs

1//! A deterministic runtime that randomly selects tasks to run based on a seed
2//!
3//! # Panics
4//!
5//! Unless configured otherwise, any task panic will lead to a runtime panic.
6//!
7//! # External Processes
8//!
9//! When testing an application that interacts with some external process, it can appear to
10//! the runtime that progress has stalled because no pending tasks can make progress and/or
11//! that futures resolve at variable latency (which in turn triggers non-deterministic execution).
12//!
13//! To support such applications, the runtime can be built with the `external` feature to both
14//! sleep for each [Config::cycle] (opting to wait if all futures are pending) and to constrain
15//! the resolution latency of any future (with `pace()`).
16//!
17//! **Applications that do not interact with external processes (or are able to mock them) should never
18//! need to enable this feature. It is commonly used when testing consensus with external execution environments
19//! that use their own runtime (but are deterministic over some set of inputs).**
20//!
21//! # Metrics
22//!
23//! This runtime enforces metrics are unique and well-formed:
24//! - Labels must start with `[a-zA-Z]` and contain only `[a-zA-Z0-9_]`
25//! - Metric names must be unique (panics on duplicate registration)
26//!
27//! # Example
28//!
29//! ```rust
30//! use commonware_runtime::{Spawner, Runner, deterministic, Metrics};
31//!
32//! let executor =  deterministic::Runner::default();
33//! executor.start(|context| async move {
34//!     println!("Parent started");
35//!     let result = context.with_label("child").spawn(|_| async move {
36//!         println!("Child started");
37//!         "hello"
38//!     });
39//!     println!("Child result: {:?}", result.await);
40//!     println!("Parent exited");
41//!     println!("Auditor state: {}", context.auditor().state());
42//! });
43//! ```
44
45pub use crate::storage::faulty::Config as FaultConfig;
46use crate::{
47    network::{
48        audited::Network as AuditedNetwork, deterministic::Network as DeterministicNetwork,
49        metered::Network as MeteredNetwork,
50    },
51    storage::{
52        audited::Storage as AuditedStorage, faulty::Storage as FaultyStorage,
53        memory::Storage as MemStorage, metered::Storage as MeteredStorage,
54    },
55    telemetry::metrics::task::Label,
56    utils::{
57        add_attribute,
58        signal::{Signal, Stopper},
59        supervision::Tree,
60        Panicker, Registry, ScopeGuard,
61    },
62    validate_label, BufferPool, BufferPoolConfig, Clock, Error, Execution, Handle, ListenerOf,
63    Metrics as _, Panicked, Spawner as _, METRICS_PREFIX,
64};
65#[cfg(feature = "external")]
66use crate::{Blocker, Pacer};
67use commonware_codec::Encode;
68use commonware_macros::select;
69use commonware_parallel::ThreadPool;
70use commonware_utils::{
71    hex,
72    sync::{Mutex, RwLock},
73    time::SYSTEM_TIME_PRECISION,
74    SystemTimeExt,
75};
76#[cfg(feature = "external")]
77use futures::task::noop_waker;
78use futures::{
79    future::BoxFuture,
80    task::{waker, ArcWake},
81    Future, FutureExt,
82};
83use governor::clock::{Clock as GClock, ReasonablyRealtime};
84#[cfg(feature = "external")]
85use pin_project::pin_project;
86use prometheus_client::{
87    metrics::{counter::Counter, family::Family, gauge::Gauge},
88    registry::{Metric, Registry as PrometheusRegistry},
89};
90use rand::{prelude::SliceRandom, rngs::StdRng, CryptoRng, RngCore, SeedableRng};
91use rand_core::CryptoRngCore;
92use rayon::{ThreadPoolBuildError, ThreadPoolBuilder};
93use sha2::{Digest as _, Sha256};
94use std::{
95    borrow::Cow,
96    collections::{BTreeMap, BinaryHeap, HashMap, HashSet},
97    mem::{replace, take},
98    net::{IpAddr, SocketAddr},
99    num::NonZeroUsize,
100    panic::{catch_unwind, resume_unwind, AssertUnwindSafe},
101    pin::Pin,
102    sync::{Arc, Weak},
103    task::{self, Poll, Waker},
104    time::{Duration, SystemTime, UNIX_EPOCH},
105};
106use tracing::{info_span, trace, Instrument};
107use tracing_opentelemetry::OpenTelemetrySpanExt;
108
/// Runtime-level metrics tracked by the deterministic executor.
#[derive(Debug)]
struct Metrics {
    /// Total number of event-loop iterations completed.
    iterations: Counter,
    /// Total number of tasks spawned, labeled by task.
    tasks_spawned: Family<Label, Counter>,
    /// Number of tasks currently running, labeled by task.
    tasks_running: Family<Label, Gauge>,
    /// Total number of times each task has been polled, labeled by task.
    task_polls: Family<Label, Counter>,

    /// Total amount of data sent over the network.
    network_bandwidth: Counter,
}
118
119impl Metrics {
120    pub fn init(registry: &mut PrometheusRegistry) -> Self {
121        let metrics = Self {
122            iterations: Counter::default(),
123            task_polls: Family::default(),
124            tasks_spawned: Family::default(),
125            tasks_running: Family::default(),
126            network_bandwidth: Counter::default(),
127        };
128        registry.register(
129            "iterations",
130            "Total number of iterations",
131            metrics.iterations.clone(),
132        );
133        registry.register(
134            "tasks_spawned",
135            "Total number of tasks spawned",
136            metrics.tasks_spawned.clone(),
137        );
138        registry.register(
139            "tasks_running",
140            "Number of tasks currently running",
141            metrics.tasks_running.clone(),
142        );
143        registry.register(
144            "task_polls",
145            "Total number of task polls",
146            metrics.task_polls.clone(),
147        );
148        registry.register(
149            "bandwidth",
150            "Total amount of data sent over network",
151            metrics.network_bandwidth.clone(),
152        );
153        metrics
154    }
155}
156
/// A SHA-256 digest (32 bytes).
type Digest = [u8; 32];
159
/// Track the state of the runtime for determinism auditing.
///
/// Every recorded event is folded into a running SHA-256 digest, so two runs
/// that record the same events in the same order report the same [Auditor::state].
pub struct Auditor {
    /// Running digest over all events recorded so far.
    digest: Mutex<Digest>,
}
164
165impl Default for Auditor {
166    fn default() -> Self {
167        Self {
168            digest: Digest::default().into(),
169        }
170    }
171}
172
173impl Auditor {
174    /// Record that an event happened.
175    /// This auditor's hash will be updated with the event's `label` and
176    /// whatever other data is passed in the `payload` closure.
177    pub(crate) fn event<F>(&self, label: &'static [u8], payload: F)
178    where
179        F: FnOnce(&mut Sha256),
180    {
181        let mut digest = self.digest.lock();
182
183        let mut hasher = Sha256::new();
184        hasher.update(digest.as_ref());
185        hasher.update(label);
186        payload(&mut hasher);
187
188        *digest = hasher.finalize().into();
189    }
190
191    /// Generate a representation of the current state of the runtime.
192    ///
193    /// This can be used to ensure that logic running on top
194    /// of the runtime is interacting deterministically.
195    pub fn state(&self) -> String {
196        let hash = self.digest.lock();
197        hex(hash.as_ref())
198    }
199}
200
/// A dynamic RNG that can safely be sent between threads.
///
/// Boxed so callers can supply any [CryptoRngCore] implementation (e.g. a
/// seeded [StdRng], or a fuzzer-controlled source — see [Config::with_rng]).
pub type BoxDynRng = Box<dyn CryptoRngCore + Send + 'static>;
203
/// Configuration for the `deterministic` runtime.
pub struct Config {
    /// Random number generator used for all runtime randomness
    /// (task scheduling and storage fault injection share this RNG).
    rng: BoxDynRng,

    /// The cycle duration determines how much time is advanced after each iteration of the event
    /// loop. This is useful to prevent starvation if some task never yields.
    ///
    /// Must be at least `SYSTEM_TIME_PRECISION` (enforced by [Config::assert]).
    cycle: Duration,

    /// Time the runtime starts at.
    start_time: SystemTime,

    /// If the runtime is still executing at this point (i.e. a test hasn't stopped), panic.
    ///
    /// Measured in simulated time from `start_time`.
    timeout: Option<Duration>,

    /// Whether spawned tasks should catch panics instead of propagating them.
    catch_panics: bool,

    /// Configuration for deterministic storage fault injection.
    /// Defaults to no faults being injected.
    storage_fault_cfg: FaultConfig,

    /// Buffer pool configuration for network I/O.
    network_buffer_pool_cfg: BufferPoolConfig,

    /// Buffer pool configuration for storage I/O.
    storage_buffer_pool_cfg: BufferPoolConfig,
}
232
impl Config {
    /// Returns a new [Config] with default values.
    pub fn new() -> Self {
        cfg_if::cfg_if! {
            if #[cfg(miri)] {
                // Reduce max_per_class to avoid slow atomics under Miri
                let network_buffer_pool_cfg = BufferPoolConfig::for_network()
                    .with_max_per_class(commonware_utils::NZUsize!(32));
                let storage_buffer_pool_cfg = BufferPoolConfig::for_storage()
                    .with_max_per_class(commonware_utils::NZUsize!(32));
            } else {
                let network_buffer_pool_cfg = BufferPoolConfig::for_network();
                let storage_buffer_pool_cfg = BufferPoolConfig::for_storage();
            }
        }

        Self {
            // Fixed default seed keeps runs reproducible out of the box.
            rng: Box::new(StdRng::seed_from_u64(42)),
            cycle: Duration::from_millis(1),
            start_time: UNIX_EPOCH,
            timeout: None,
            catch_panics: false,
            storage_fault_cfg: FaultConfig::default(),
            network_buffer_pool_cfg,
            storage_buffer_pool_cfg,
        }
    }

    // Setters
    /// Seed the runtime's RNG with `seed` (replaces any previously provided RNG).
    pub fn with_seed(self, seed: u64) -> Self {
        self.with_rng(Box::new(StdRng::seed_from_u64(seed)))
    }

    /// Provide the config with a dynamic RNG directly.
    ///
    /// This can be useful for, e.g. fuzzing, where beyond just having randomness,
    /// you might want to control specific bytes of the RNG. By taking in a dynamic
    /// RNG object, any behavior is possible.
    pub fn with_rng(mut self, rng: BoxDynRng) -> Self {
        self.rng = rng;
        self
    }

    /// Set how much simulated time advances after each event-loop iteration.
    pub const fn with_cycle(mut self, cycle: Duration) -> Self {
        self.cycle = cycle;
        self
    }
    /// Set the time the runtime starts at.
    pub const fn with_start_time(mut self, start_time: SystemTime) -> Self {
        self.start_time = start_time;
        self
    }
    /// Panic if the runtime is still executing after `timeout` of simulated time.
    pub const fn with_timeout(mut self, timeout: Option<Duration>) -> Self {
        self.timeout = timeout;
        self
    }
    /// Set whether spawned tasks catch panics instead of propagating them.
    pub const fn with_catch_panics(mut self, catch_panics: bool) -> Self {
        self.catch_panics = catch_panics;
        self
    }
    /// Set the buffer pool configuration for network I/O.
    pub const fn with_network_buffer_pool_config(mut self, cfg: BufferPoolConfig) -> Self {
        self.network_buffer_pool_cfg = cfg;
        self
    }
    /// Set the buffer pool configuration for storage I/O.
    pub const fn with_storage_buffer_pool_config(mut self, cfg: BufferPoolConfig) -> Self {
        self.storage_buffer_pool_cfg = cfg;
        self
    }

    /// Configure storage fault injection.
    ///
    /// When set, the runtime will inject deterministic storage errors based on
    /// the provided configuration. Faults are drawn from the shared RNG, ensuring
    /// reproducible failure patterns for a given seed.
    pub const fn with_storage_fault_config(mut self, faults: FaultConfig) -> Self {
        self.storage_fault_cfg = faults;
        self
    }

    // Getters
    /// The simulated-time advance per event-loop iteration.
    pub const fn cycle(&self) -> Duration {
        self.cycle
    }
    /// The time the runtime starts at.
    pub const fn start_time(&self) -> SystemTime {
        self.start_time
    }
    /// The timeout after which the runtime panics, if any.
    pub const fn timeout(&self) -> Option<Duration> {
        self.timeout
    }
    /// Whether spawned tasks catch panics instead of propagating them.
    pub const fn catch_panics(&self) -> bool {
        self.catch_panics
    }
    /// The buffer pool configuration for network I/O.
    pub const fn network_buffer_pool_config(&self) -> &BufferPoolConfig {
        &self.network_buffer_pool_cfg
    }
    /// The buffer pool configuration for storage I/O.
    pub const fn storage_buffer_pool_config(&self) -> &BufferPoolConfig {
        &self.storage_buffer_pool_cfg
    }

    /// Assert that the configuration is valid.
    ///
    /// Panics if a timeout is set with a zero cycle (the deadline could never
    /// be reached) or if the cycle is finer than the system time precision.
    pub fn assert(&self) {
        assert!(
            self.cycle != Duration::default() || self.timeout.is_none(),
            "cycle duration must be non-zero when timeout is set",
        );
        assert!(
            self.cycle >= SYSTEM_TIME_PRECISION,
            "cycle duration must be greater than or equal to system time precision"
        );
    }
}
356
impl Default for Config {
    /// Equivalent to [Config::new].
    fn default() -> Self {
        Self::new()
    }
}
362
/// A (prefixed_name, attributes) pair identifying a unique metric registration.
///
/// Tracked in [Executor::registered_metrics]; per the module docs, duplicate
/// registrations panic.
type MetricKey = (String, Vec<(String, String)>);
365
/// Deterministic runtime that randomly selects tasks to run based on a seed.
pub struct Executor {
    /// Registry holding runtime and application metrics.
    registry: Mutex<Registry>,
    /// Metric keys already registered (used to enforce uniqueness).
    registered_metrics: Mutex<HashSet<MetricKey>>,
    /// Simulated-time advance per event-loop iteration.
    cycle: Duration,
    /// If set, panic once simulated time reaches this point.
    deadline: Option<SystemTime>,
    /// Runtime metrics (iterations, task polls, ...).
    metrics: Arc<Metrics>,
    /// Determinism auditor updated as tasks are processed.
    auditor: Arc<Auditor>,
    /// Seeded RNG shared by the scheduler (task shuffling) and storage fault injection.
    rng: Arc<Mutex<BoxDynRng>>,
    /// Current simulated time.
    time: Mutex<SystemTime>,
    /// All tasks known to the scheduler.
    tasks: Arc<Tasks>,
    /// Pending sleepers; the heap is expected to surface the earliest deadline
    /// first (Alarm's `Ord` impl, defined elsewhere, must invert the ordering).
    sleeping: Mutex<BinaryHeap<Alarm>>,
    /// Handle used to signal runtime shutdown.
    shutdown: Mutex<Stopper>,
    /// Controls whether task panics are caught or propagated.
    panicker: Panicker,
    /// Static DNS entries (hostname -> addresses).
    dns: Mutex<HashMap<String, Vec<IpAddr>>>,
}
382
impl Executor {
    /// Advance simulated time by [Config::cycle] and return the new time.
    ///
    /// When built with the `external` feature, sleep for [Config::cycle] to let
    /// external processes make progress.
    fn advance_time(&self) -> SystemTime {
        // Real (wall-clock) sleep happens before touching simulated time so
        // external processes can run while no locks are held.
        #[cfg(feature = "external")]
        std::thread::sleep(self.cycle);

        let mut time = self.time.lock();
        *time = time
            .checked_add(self.cycle)
            .expect("executor time overflowed");
        let now = *time;
        trace!(now = now.epoch_millis(), "time advanced");
        now
    }

    /// When idle, jump directly to the next actionable time.
    ///
    /// When built with the `external` feature, never skip ahead (to ensure we poll all pending tasks
    /// every [Config::cycle]).
    fn skip_idle_time(&self, current: SystemTime) -> SystemTime {
        // Only skip when nothing is ready to run right now.
        if cfg!(feature = "external") || self.tasks.ready() != 0 {
            return current;
        }

        // Scope the sleeping lock so it is released before taking the time lock.
        let mut skip_until = None;
        {
            let sleeping = self.sleeping.lock();
            if let Some(next) = sleeping.peek() {
                if next.time > current {
                    skip_until = Some(next.time);
                }
            }
        }

        skip_until.map_or(current, |deadline| {
            let mut time = self.time.lock();
            *time = deadline;
            let now = *time;
            trace!(now = now.epoch_millis(), "time skipped");
            now
        })
    }

    /// Wake any sleepers whose deadlines have elapsed.
    fn wake_ready_sleepers(&self, current: SystemTime) {
        let mut sleeping = self.sleeping.lock();
        // `peek` yields the earliest alarm, so we can stop at the first
        // deadline still in the future.
        while let Some(next) = sleeping.peek() {
            if next.time <= current {
                let sleeper = sleeping.pop().unwrap();
                sleeper.waker.wake();
            } else {
                break;
            }
        }
    }

    /// Ensure the runtime is making progress.
    ///
    /// When built with the `external` feature, always poll pending tasks after the passage of time.
    fn assert_liveness(&self) {
        if cfg!(feature = "external") || self.tasks.ready() != 0 {
            return;
        }

        // No task became ready after advancing time: nothing can ever run again.
        panic!("runtime stalled");
    }
}
453
/// An artifact that can be used to recover the state of the runtime.
///
/// This is useful when mocking unclean shutdown (while retaining deterministic behavior).
pub struct Checkpoint {
    /// Simulated-time advance per event-loop iteration (see [Config::cycle]).
    cycle: Duration,
    /// Absolute simulated time at which the runtime panics if still executing.
    deadline: Option<SystemTime>,
    /// Auditor carried over so determinism checks can span restarts.
    auditor: Arc<Auditor>,
    /// RNG carried over so randomness stays reproducible across restarts.
    rng: Arc<Mutex<BoxDynRng>>,
    /// Simulated time at the moment the checkpoint was taken.
    time: Mutex<SystemTime>,
    /// Storage retained across the restart (mocking durable state).
    storage: Arc<Storage>,
    /// DNS entries (hostname -> addresses) carried over.
    dns: Mutex<HashMap<String, Vec<IpAddr>>>,
    /// Whether spawned tasks should catch panics instead of propagating them.
    catch_panics: bool,
    /// Buffer pool configuration for network I/O.
    network_buffer_pool_cfg: BufferPoolConfig,
    /// Buffer pool configuration for storage I/O.
    storage_buffer_pool_cfg: BufferPoolConfig,
}
469
470impl Checkpoint {
471    /// Get a reference to the [Auditor].
472    pub fn auditor(&self) -> Arc<Auditor> {
473        self.auditor.clone()
474    }
475}
476
/// Whether the runner starts fresh from a [Config] or resumes from a [Checkpoint].
#[allow(clippy::large_enum_variant)]
enum State {
    /// Start a fresh runtime from configuration.
    Config(Config),
    /// Resume a prior runtime's recovered state.
    Checkpoint(Checkpoint),
}
482
/// Implementation of [crate::Runner] for the `deterministic` runtime.
pub struct Runner {
    /// Fresh configuration or recovered checkpoint to boot from.
    state: State,
}
487
impl From<Config> for Runner {
    /// Build a runner from `cfg`; panics if the config is invalid (see [Config::assert]).
    fn from(cfg: Config) -> Self {
        Self::new(cfg)
    }
}
493
impl From<Checkpoint> for Runner {
    /// Resume a runtime from a previously captured [Checkpoint].
    ///
    /// No validation is performed here; the original config was already
    /// validated when the runner that produced the checkpoint was constructed.
    fn from(checkpoint: Checkpoint) -> Self {
        Self {
            state: State::Checkpoint(checkpoint),
        }
    }
}
501
impl Runner {
    /// Initialize a new `deterministic` runtime with the given seed and cycle duration.
    ///
    /// # Panics
    ///
    /// Panics if `cfg` is invalid (see [Config::assert]).
    pub fn new(cfg: Config) -> Self {
        // Ensure config is valid
        cfg.assert();
        Self {
            state: State::Config(cfg),
        }
    }

    /// Initialize a new `deterministic` runtime with the default configuration
    /// and the provided seed.
    pub fn seeded(seed: u64) -> Self {
        Self::new(Config::default().with_seed(seed))
    }

    /// Initialize a new `deterministic` runtime with the default configuration
    /// but exit after the given timeout.
    pub fn timed(timeout: Duration) -> Self {
        let cfg = Config {
            timeout: Some(timeout),
            ..Config::default()
        };
        Self::new(cfg)
    }

    /// Like [crate::Runner::start], but also returns a [Checkpoint] that can be used
    /// to recover the state of the runtime in a subsequent run.
    ///
    /// # Panics
    ///
    /// Panics if the configured timeout elapses ("runtime timeout"), if no task
    /// can make progress ("runtime stalled"), or if a task panics and panics are
    /// not caught (see [Config::with_catch_panics]).
    pub fn start_and_recover<F, Fut>(self, f: F) -> (Fut::Output, Checkpoint)
    where
        F: FnOnce(Context) -> Fut,
        Fut: Future,
    {
        // Setup context and return strong reference to executor
        let (context, executor, panicked) = match self.state {
            State::Config(config) => Context::new(config),
            State::Checkpoint(checkpoint) => Context::recover(checkpoint),
        };

        // Pin root task to the heap
        //
        // Clone what the final Checkpoint needs before `context` is moved into `f`.
        let storage = context.storage.clone();
        let network_buffer_pool_cfg = context.network_buffer_pool.config().clone();
        let storage_buffer_pool_cfg = context.storage_buffer_pool.config().clone();
        let mut root = Box::pin(panicked.interrupt(f(context)));

        // Register the root task
        Tasks::register_root(&executor.tasks);

        // Process tasks until root task completes or progress stalls.
        // Wrap the loop in catch_unwind to ensure task cleanup runs even if the loop or a task panics.
        let result = catch_unwind(AssertUnwindSafe(|| loop {
            // Ensure we have not exceeded our deadline
            {
                let current = executor.time.lock();
                if let Some(deadline) = executor.deadline {
                    if *current >= deadline {
                        drop(current);
                        panic!("runtime timeout");
                    }
                }
            }

            // Drain all ready tasks
            let mut queue = executor.tasks.drain();

            // Shuffle tasks (if more than one)
            if queue.len() > 1 {
                let mut rng = executor.rng.lock();
                queue.shuffle(&mut *rng);
            }

            // Run all snapshotted tasks
            //
            // This approach is more efficient than randomly selecting a task one-at-a-time
            // because it ensures we don't pull the same pending task multiple times in a row (without
            // processing a different task required for other tasks to make progress).
            trace!(
                iter = executor.metrics.iterations.get(),
                tasks = queue.len(),
                "starting loop"
            );
            let mut output = None;
            for id in queue {
                // Lookup the task (it may have completed already)
                let Some(task) = executor.tasks.get(id) else {
                    trace!(id, "skipping missing task");
                    continue;
                };

                // Record task for auditing
                executor.auditor.event(b"process_task", |hasher| {
                    hasher.update(task.id.to_be_bytes());
                    hasher.update(task.label.name().as_bytes());
                });
                executor.metrics.task_polls.get_or_create(&task.label).inc();
                trace!(id, "processing task");

                // Prepare task for polling
                //
                // The waker holds only a weak reference so outstanding wakers
                // can't keep the task queue alive after the runtime exits.
                let waker = waker(Arc::new(TaskWaker {
                    id,
                    tasks: Arc::downgrade(&executor.tasks),
                }));
                let mut cx = task::Context::from_waker(&waker);

                // Poll the task
                match &task.mode {
                    Mode::Root => {
                        // Poll the root task
                        if let Poll::Ready(result) = root.as_mut().poll(&mut cx) {
                            trace!(id, "root task is complete");
                            output = Some(result);
                            break;
                        }
                    }
                    Mode::Work(future) => {
                        // Get the future (if it still exists)
                        let mut fut_opt = future.lock();
                        let Some(fut) = fut_opt.as_mut() else {
                            trace!(id, "skipping already complete task");

                            // Remove the future
                            executor.tasks.remove(id);
                            continue;
                        };

                        // Poll the task
                        if fut.as_mut().poll(&mut cx).is_ready() {
                            trace!(id, "task is complete");

                            // Remove the future
                            executor.tasks.remove(id);
                            *fut_opt = None;
                            continue;
                        }
                    }
                }

                // Try again later if task is still pending
                trace!(id, "task is still pending");
            }

            // If the root task has completed, exit as soon as possible
            if let Some(output) = output {
                break output;
            }

            // Advance time (skipping ahead if no tasks are ready yet)
            let mut current = executor.advance_time();
            current = executor.skip_idle_time(current);

            // Wake sleepers and ensure we continue to make progress
            executor.wake_ready_sleepers(current);
            executor.assert_liveness();

            // Record that we completed another iteration of the event loop.
            executor.metrics.iterations.inc();
        }));

        // Clear remaining tasks from the executor.
        //
        // It is critical that we wait to drop the strong
        // reference to executor until after we have dropped
        // all tasks (as they may attempt to upgrade their weak
        // reference to the executor during drop).
        executor.sleeping.lock().clear(); // included in tasks
        let tasks = executor.tasks.clear();
        for task in tasks {
            let Mode::Work(future) = &task.mode else {
                continue;
            };
            *future.lock() = None;
        }

        // Drop the root task to release any Context references it may still hold.
        // This is necessary when the loop exits early (e.g., timeout) while the
        // root future is still Pending and holds captured variables with Context references.
        drop(root);

        // Assert the context doesn't escape the start() function (behavior
        // is undefined in this case)
        assert!(
            Arc::weak_count(&executor) == 0,
            "executor still has weak references"
        );

        // Handle the result — resume the original panic after cleanup if one was caught.
        let output = match result {
            Ok(output) => output,
            Err(payload) => resume_unwind(payload),
        };

        // Extract the executor from the Arc
        let executor = Arc::into_inner(executor).expect("executor still has strong references");

        // Construct a checkpoint that can be used to restart the runtime
        let checkpoint = Checkpoint {
            cycle: executor.cycle,
            deadline: executor.deadline,
            auditor: executor.auditor,
            rng: executor.rng,
            time: executor.time,
            storage,
            dns: executor.dns,
            catch_panics: executor.panicker.catch(),
            network_buffer_pool_cfg,
            storage_buffer_pool_cfg,
        };

        (output, checkpoint)
    }
}
713
impl Default for Runner {
    /// A runner with the default [Config] (seed 42, 1ms cycle, no timeout).
    fn default() -> Self {
        Self::new(Config::default())
    }
}
719
720impl crate::Runner for Runner {
721    type Context = Context;
722
723    fn start<F, Fut>(self, f: F) -> Fut::Output
724    where
725        F: FnOnce(Self::Context) -> Fut,
726        Fut: Future,
727    {
728        let (output, _) = self.start_and_recover(f);
729        output
730    }
731}
732
/// The mode of a [Task].
enum Mode {
    /// The root task; its future is pinned and driven directly by the runner
    /// rather than stored here.
    Root,
    /// A spawned task owning its future; the option is set to `None` once the
    /// future completes.
    Work(Mutex<Option<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>>),
}
738
/// A future being executed by the [Executor].
struct Task {
    /// Unique, monotonically increasing task id.
    id: u128,
    /// Label used for metrics and auditing.
    label: Label,

    /// Whether this is the root task or spawned work.
    mode: Mode,
}
746
/// A waker for a [Task].
struct TaskWaker {
    /// Id of the task to re-enqueue on wake.
    id: u128,

    /// Weak reference so outstanding wakers cannot keep the task queue alive
    /// after the runtime exits.
    tasks: Weak<Tasks>,
}
753
754impl ArcWake for TaskWaker {
755    fn wake_by_ref(arc_self: &Arc<Self>) {
756        // Upgrade the weak reference to re-enqueue this task.
757        // If upgrade fails, the task queue has been dropped and no action is required.
758        //
759        // This can happen if some data is passed into the runtime and it drops after the runtime exits.
760        if let Some(tasks) = arc_self.tasks.upgrade() {
761            tasks.queue(arc_self.id);
762        }
763    }
764}
765
/// A collection of [Task]s that are being executed by the [Executor].
struct Tasks {
    /// The next task id (monotonically increasing, never reused).
    counter: Mutex<u128>,
    /// Ids of tasks ready to be polled.
    ready: Mutex<Vec<u128>>,
    /// All running tasks, keyed by id.
    running: Mutex<BTreeMap<u128, Arc<Task>>>,
}
775
776impl Tasks {
777    /// Create a new task queue.
778    const fn new() -> Self {
779        Self {
780            counter: Mutex::new(0),
781            ready: Mutex::new(Vec::new()),
782            running: Mutex::new(BTreeMap::new()),
783        }
784    }
785
786    /// Increment the task counter and return the old value.
787    fn increment(&self) -> u128 {
788        let mut counter = self.counter.lock();
789        let old = *counter;
790        *counter = counter.checked_add(1).expect("task counter overflow");
791        old
792    }
793
794    /// Register the root task.
795    ///
796    /// If the root task has already been registered, this function will panic.
797    fn register_root(arc_self: &Arc<Self>) {
798        let id = arc_self.increment();
799        let task = Arc::new(Task {
800            id,
801            label: Label::root(),
802            mode: Mode::Root,
803        });
804        arc_self.register(id, task);
805    }
806
807    /// Register a non-root task to be executed.
808    fn register_work(
809        arc_self: &Arc<Self>,
810        label: Label,
811        future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
812    ) {
813        let id = arc_self.increment();
814        let task = Arc::new(Task {
815            id,
816            label,
817            mode: Mode::Work(Mutex::new(Some(future))),
818        });
819        arc_self.register(id, task);
820    }
821
822    /// Register a new task to be executed.
823    fn register(&self, id: u128, task: Arc<Task>) {
824        // Track as running until completion
825        self.running.lock().insert(id, task);
826
827        // Add to ready
828        self.queue(id);
829    }
830
831    /// Enqueue an already registered task to be executed.
832    fn queue(&self, id: u128) {
833        let mut ready = self.ready.lock();
834        ready.push(id);
835    }
836
837    /// Drain all ready tasks.
838    fn drain(&self) -> Vec<u128> {
839        let mut queue = self.ready.lock();
840        let len = queue.len();
841        replace(&mut *queue, Vec::with_capacity(len))
842    }
843
844    /// The number of ready tasks.
845    fn ready(&self) -> usize {
846        self.ready.lock().len()
847    }
848
849    /// Lookup a task.
850    ///
851    /// We must return cloned here because we cannot hold the running lock while polling a task (will
852    /// deadlock if [Self::register_work] is called).
853    fn get(&self, id: u128) -> Option<Arc<Task>> {
854        let running = self.running.lock();
855        running.get(&id).cloned()
856    }
857
858    /// Remove a task.
859    fn remove(&self, id: u128) {
860        self.running.lock().remove(&id);
861    }
862
863    /// Clear all tasks.
864    fn clear(&self) -> Vec<Arc<Task>> {
865        // Clear ready
866        self.ready.lock().clear();
867
868        // Clear running tasks
869        let running: BTreeMap<u128, Arc<Task>> = {
870            let mut running = self.running.lock();
871            take(&mut *running)
872        };
873        running.into_values().collect()
874    }
875}
876
/// Network stack used by the deterministic runtime: a deterministic core,
/// wrapped in auditing, wrapped in metering.
type Network = MeteredNetwork<AuditedNetwork<DeterministicNetwork>>;
/// Storage stack used by the deterministic runtime: in-memory storage with
/// fault injection, wrapped in auditing, wrapped in metering.
type Storage = MeteredStorage<AuditedStorage<FaultyStorage<MemStorage>>>;
879
/// Implementation of [crate::Spawner], [crate::Clock],
/// [crate::Network], and [crate::Storage] for the `deterministic`
/// runtime.
pub struct Context {
    /// Label identifying this context.
    name: String,
    /// Attribute key/value pairs attached to this context.
    attributes: Vec<(String, String)>,
    /// Optional guard tied to this context's scope.
    scope: Option<Arc<ScopeGuard>>,
    /// Weak handle to the executor; contexts must not outlive the runtime
    /// (enforced by the weak-count assertion at runtime exit).
    executor: Weak<Executor>,
    /// Shared deterministic network stack.
    network: Arc<Network>,
    /// Shared deterministic storage stack.
    storage: Arc<Storage>,
    /// Buffer pool for network I/O.
    network_buffer_pool: BufferPool,
    /// Buffer pool for storage I/O.
    storage_buffer_pool: BufferPool,
    /// This context's node in the supervision tree.
    tree: Arc<Tree>,
    /// Per-context execution settings (reset to default on clone).
    execution: Execution,
    /// Whether this context has been instrumented (reset on clone).
    instrumented: bool,
}
896
impl Clone for Context {
    /// Clone the context, sharing the runtime handles but giving the clone a
    /// fresh supervision-tree node and default execution state.
    fn clone(&self) -> Self {
        // Create a new node under this context's position in the supervision tree.
        let (child, _) = Tree::child(&self.tree);
        Self {
            name: self.name.clone(),
            attributes: self.attributes.clone(),
            scope: self.scope.clone(),
            executor: self.executor.clone(),
            network: self.network.clone(),
            storage: self.storage.clone(),
            network_buffer_pool: self.network_buffer_pool.clone(),
            storage_buffer_pool: self.storage_buffer_pool.clone(),

            // Execution settings and instrumentation do not carry over to the clone.
            tree: child,
            execution: Execution::default(),
            instrumented: false,
        }
    }
}
916
impl Context {
    /// Construct a fresh runtime from `cfg`, returning the root [Context],
    /// the [Executor] that drives it, and the [Panicked] handle used to
    /// observe task panics.
    fn new(cfg: Config) -> (Self, Arc<Executor>, Panicked) {
        // Create a new registry
        let mut registry = Registry::new();
        let runtime_registry = registry.root_mut().sub_registry_with_prefix(METRICS_PREFIX);

        // Initialize runtime
        let metrics = Arc::new(Metrics::init(runtime_registry));
        let start_time = cfg.start_time;
        // Convert the optional timeout into an absolute deadline up front
        let deadline = cfg
            .timeout
            .map(|timeout| start_time.checked_add(timeout).expect("timeout overflowed"));
        let auditor = Arc::new(Auditor::default());

        // Create shared RNG (used by both executor and storage)
        let rng = Arc::new(Mutex::new(cfg.rng));

        // Initialize buffer pools
        let network_buffer_pool = BufferPool::new(
            cfg.network_buffer_pool_cfg.clone(),
            runtime_registry.sub_registry_with_prefix("network_buffer_pool"),
        );
        let storage_buffer_pool = BufferPool::new(
            cfg.storage_buffer_pool_cfg.clone(),
            runtime_registry.sub_registry_with_prefix("storage_buffer_pool"),
        );

        // Create storage fault config (default to disabled if None)
        let storage_fault_config = Arc::new(RwLock::new(cfg.storage_fault_cfg));
        // Storage stack: metered -> audited -> fault-injecting -> in-memory
        let storage = MeteredStorage::new(
            AuditedStorage::new(
                FaultyStorage::new(
                    MemStorage::new(storage_buffer_pool.clone()),
                    rng.clone(),
                    storage_fault_config,
                ),
                auditor.clone(),
            ),
            runtime_registry,
        );

        // Create network
        let network = AuditedNetwork::new(DeterministicNetwork::default(), auditor.clone());
        let network = MeteredNetwork::new(network, runtime_registry);

        // Initialize panicker
        let (panicker, panicked) = Panicker::new(cfg.catch_panics);

        let executor = Arc::new(Executor {
            registry: Mutex::new(registry),
            registered_metrics: Mutex::new(HashSet::new()),
            cycle: cfg.cycle,
            deadline,
            metrics,
            auditor,
            rng,
            time: Mutex::new(start_time),
            tasks: Arc::new(Tasks::new()),
            sleeping: Mutex::new(BinaryHeap::new()),
            shutdown: Mutex::new(Stopper::default()),
            panicker,
            dns: Mutex::new(HashMap::new()),
        });

        (
            Self {
                name: String::new(),
                attributes: Vec::new(),
                scope: None,
                // Weak so contexts never keep the executor alive on their own
                executor: Arc::downgrade(&executor),
                network: Arc::new(network),
                storage: Arc::new(storage),
                network_buffer_pool,
                storage_buffer_pool,
                tree: Tree::root(),
                execution: Execution::default(),
                instrumented: false,
            },
            executor,
            panicked,
        )
    }

    /// Recover the inner state (deadline, metrics, auditor, rng, synced storage, etc.) from the
    /// current runtime and use it to initialize a new instance of the runtime. A recovered runtime
    /// does not inherit the current runtime's pending tasks, unsynced storage, network connections, nor
    /// its shutdown signaler.
    ///
    /// This is useful for performing a deterministic simulation that spans multiple runtime instantiations,
    /// like simulating unclean shutdown (which involves repeatedly halting the runtime at unexpected intervals).
    ///
    /// It is only permitted to call this method after the runtime has finished (i.e. once `start` returns)
    /// and only permitted to do once (otherwise multiple recovered runtimes will share the same inner state).
    /// If either one of these conditions is violated, this method will panic.
    fn recover(checkpoint: Checkpoint) -> (Self, Arc<Executor>, Panicked) {
        // Rebuild metrics
        let mut registry = Registry::new();
        let runtime_registry = registry.root_mut().sub_registry_with_prefix(METRICS_PREFIX);
        let metrics = Arc::new(Metrics::init(runtime_registry));

        // Copy state
        let network =
            AuditedNetwork::new(DeterministicNetwork::default(), checkpoint.auditor.clone());
        let network = MeteredNetwork::new(network, runtime_registry);

        // Initialize buffer pools
        let network_buffer_pool = BufferPool::new(
            checkpoint.network_buffer_pool_cfg.clone(),
            runtime_registry.sub_registry_with_prefix("network_buffer_pool"),
        );
        let storage_buffer_pool = BufferPool::new(
            checkpoint.storage_buffer_pool_cfg.clone(),
            runtime_registry.sub_registry_with_prefix("storage_buffer_pool"),
        );

        // Initialize panicker
        let (panicker, panicked) = Panicker::new(checkpoint.catch_panics);

        let executor = Arc::new(Executor {
            // Copied from the checkpoint
            cycle: checkpoint.cycle,
            deadline: checkpoint.deadline,
            auditor: checkpoint.auditor,
            rng: checkpoint.rng,
            time: checkpoint.time,
            dns: checkpoint.dns,

            // New state for the new runtime
            registry: Mutex::new(registry),
            registered_metrics: Mutex::new(HashSet::new()),
            metrics,
            tasks: Arc::new(Tasks::new()),
            sleeping: Mutex::new(BinaryHeap::new()),
            shutdown: Mutex::new(Stopper::default()),
            panicker,
        });
        (
            Self {
                name: String::new(),
                attributes: Vec::new(),
                scope: None,
                executor: Arc::downgrade(&executor),
                network: Arc::new(network),
                // Synced storage carries over from the prior runtime
                storage: checkpoint.storage,
                network_buffer_pool,
                storage_buffer_pool,
                tree: Tree::root(),
                execution: Execution::default(),
                instrumented: false,
            },
            executor,
            panicked,
        )
    }

    /// Upgrade Weak reference to [Executor].
    ///
    /// Panics if the executor has already been dropped.
    fn executor(&self) -> Arc<Executor> {
        self.executor.upgrade().expect("executor already dropped")
    }

    /// Get a reference to [Metrics].
    fn metrics(&self) -> Arc<Metrics> {
        self.executor().metrics.clone()
    }

    /// Get a reference to the [Auditor].
    pub fn auditor(&self) -> Arc<Auditor> {
        self.executor().auditor.clone()
    }

    /// Compute a [Sha256] digest of all storage contents.
    pub fn storage_audit(&self) -> Digest {
        // Unwrap the metered, audited, and faulty layers to reach the
        // in-memory store that implements `audit`
        self.storage.inner().inner().inner().audit()
    }

    /// Access the storage fault configuration.
    ///
    /// Changes to the returned [`FaultConfig`] take effect immediately for
    /// subsequent storage operations. This allows dynamically enabling or
    /// disabling fault injection during a test.
    pub fn storage_fault_config(&self) -> Arc<RwLock<FaultConfig>> {
        self.storage.inner().inner().config()
    }

    /// Register a DNS mapping for a hostname.
    ///
    /// If `addrs` is `None`, the mapping is removed.
    /// If `addrs` is `Some`, the mapping is added or updated.
    pub fn resolver_register(&self, host: impl Into<String>, addrs: Option<Vec<IpAddr>>) {
        // Update the auditor
        let executor = self.executor();
        let host = host.into();
        executor.auditor.event(b"resolver_register", |hasher| {
            hasher.update(host.as_bytes());
            hasher.update(addrs.encode());
        });

        // Update the DNS mapping
        let mut dns = executor.dns.lock();
        match addrs {
            Some(addrs) => {
                dns.insert(host, addrs);
            }
            None => {
                dns.remove(&host);
            }
        }
    }
}
1126
impl crate::Spawner for Context {
    fn dedicated(mut self) -> Self {
        // Configure the execution mode for the next spawn only
        // (`spawn` resets it back to the default).
        self.execution = Execution::Dedicated;
        self
    }

    fn shared(mut self, blocking: bool) -> Self {
        self.execution = Execution::Shared(blocking);
        self
    }

    fn instrumented(mut self) -> Self {
        // The next spawned task will be wrapped in a tracing span
        self.instrumented = true;
        self
    }

    fn spawn<F, Fut, T>(mut self, f: F) -> Handle<T>
    where
        F: FnOnce(Self) -> Fut + Send + 'static,
        Fut: Future<Output = T> + Send + 'static,
        T: Send + 'static,
    {
        // Get metrics
        let (label, metric) = spawn_metrics!(self);

        // Track supervision before resetting configuration
        let parent = Arc::clone(&self.tree);
        let is_instrumented = self.instrumented;
        self.execution = Execution::default();
        self.instrumented = false;
        // If the supervision tree was already aborted, don't run the task
        let (child, aborted) = Tree::child(&parent);
        if aborted {
            return Handle::closed(metric);
        }
        self.tree = child;

        // Spawn the task (we don't care about Model)
        let executor = self.executor();
        let future: BoxFuture<'_, T> = if is_instrumented {
            // Attach a detached span carrying the task name and attributes
            let span = info_span!(parent: None, "task", name = %label.name());
            for (key, value) in &self.attributes {
                span.set_attribute(key.clone(), value.clone());
            }
            f(self).instrument(span).boxed()
        } else {
            f(self).boxed()
        };
        let (f, handle) = Handle::init(
            future,
            metric,
            executor.panicker.clone(),
            Arc::clone(&parent),
        );
        Tasks::register_work(&executor.tasks, label, Box::pin(f));

        // Register the task on the parent
        if let Some(aborter) = handle.aborter() {
            parent.register(aborter);
        }

        handle
    }

    async fn stop(self, value: i32, timeout: Option<Duration>) -> Result<(), Error> {
        // Record the stop request with the auditor
        let executor = self.executor();
        executor.auditor.event(b"stop", |hasher| {
            hasher.update(value.to_be_bytes());
        });
        let stop_resolved = {
            let mut shutdown = executor.shutdown.lock();
            shutdown.stop(value)
        };

        // Wait for all tasks to complete or the timeout to fire
        let timeout_future = timeout.map_or_else(
            || futures::future::Either::Right(futures::future::pending()),
            |duration| futures::future::Either::Left(self.sleep(duration)),
        );
        select! {
            result = stop_resolved => {
                result.map_err(|_| Error::Closed)?;
                Ok(())
            },
            _ = timeout_future => Err(Error::Timeout),
        }
    }

    fn stopped(&self) -> Signal {
        let executor = self.executor();
        executor.auditor.event(b"stopped", |_| {});
        // Binding first drops the shutdown lock guard before returning
        let stopped = executor.shutdown.lock().stopped();
        stopped
    }
}
1221
impl crate::ThreadPooler for Context {
    /// Build a rayon [ThreadPool] whose workers are spawned as dedicated
    /// tasks on this runtime.
    fn create_thread_pool(
        &self,
        concurrency: NonZeroUsize,
    ) -> Result<ThreadPool, ThreadPoolBuildError> {
        let mut builder = ThreadPoolBuilder::new().num_threads(concurrency.get());

        // Only reuse the current thread when we are not already running
        // inside a rayon worker
        if rayon::current_thread_index().is_none() {
            builder = builder.use_current_thread()
        }

        builder
            .spawn_handler(move |thread| {
                // Each rayon worker runs as a dedicated task on this runtime
                self.with_label("rayon_thread")
                    .dedicated()
                    .spawn(move |_| async move { thread.run() });
                Ok(())
            })
            .build()
            .map(Arc::new)
    }
}
1244
impl crate::Metrics for Context {
    fn with_label(&self, label: &str) -> Self {
        // Validate label format (must match [a-zA-Z][a-zA-Z0-9_]*)
        validate_label(label);

        // Construct the full label name by appending to this context's prefix
        let name = {
            let prefix = self.name.clone();
            if prefix.is_empty() {
                label.to_string()
            } else {
                format!("{prefix}_{label}")
            }
        };
        // The runtime's own metrics namespace is reserved
        assert!(
            !name.starts_with(METRICS_PREFIX),
            "using runtime label is not allowed"
        );
        Self {
            name,
            ..self.clone()
        }
    }

    fn with_attribute(&self, key: &str, value: impl std::fmt::Display) -> Self {
        // Validate label format (must match [a-zA-Z][a-zA-Z0-9_]*)
        validate_label(key);

        // Add the attribute to the list of attributes (panics on duplicate key)
        let mut attributes = self.attributes.clone();
        assert!(
            add_attribute(&mut attributes, key, value),
            "duplicate attribute key: {key}"
        );
        Self {
            attributes,
            ..self.clone()
        }
    }

    fn label(&self) -> String {
        self.name.clone()
    }

    fn register<N: Into<String>, H: Into<String>>(&self, name: N, help: H, metric: impl Metric) {
        // Prepare args
        let name = name.into();
        let help = help.into();

        // Name metric
        let executor = self.executor();
        executor.auditor.event(b"register", |hasher| {
            hasher.update(name.as_bytes());
            hasher.update(help.as_bytes());
            for (k, v) in &self.attributes {
                hasher.update(k.as_bytes());
                hasher.update(v.as_bytes());
            }
        });
        // Prefix the metric with this context's label (if any)
        let prefixed_name = {
            let prefix = &self.name;
            if prefix.is_empty() {
                name
            } else {
                format!("{}_{}", *prefix, name)
            }
        };

        // Check for duplicate registration (O(1) lookup)
        let metric_key = (prefixed_name.clone(), self.attributes.clone());
        let is_new = executor.registered_metrics.lock().insert(metric_key);
        assert!(
            is_new,
            "duplicate metric: {} with attributes {:?}",
            prefixed_name, self.attributes
        );

        // Route to the appropriate registry (root or scoped)
        let mut registry = executor.registry.lock();
        let scoped = registry.get_scope(self.scope.as_ref().map(|s| s.scope_id()));
        // Nest one sub-registry per attribute so the encoded output carries labels
        let sub_registry = self
            .attributes
            .iter()
            .fold(scoped, |reg, (k, v): &(String, String)| {
                reg.sub_registry_with_label((Cow::Owned(k.clone()), Cow::Owned(v.clone())))
            });
        sub_registry.register(prefixed_name, help, metric);
    }

    fn encode(&self) -> String {
        let executor = self.executor();
        executor.auditor.event(b"encode", |_| {});
        // Binding first drops the registry lock guard before returning
        let encoded = executor.registry.lock().encode();
        encoded
    }

    fn with_scope(&self) -> Self {
        let executor = self.executor();
        executor.auditor.event(b"with_scope", |_| {});

        // If already scoped, inherit the existing scope
        if self.scope.is_some() {
            return self.clone();
        }

        // RAII guard removes the scoped registry when all clones drop.
        let weak = self.executor.clone();
        let scope_id = executor.registry.lock().create_scope();
        let guard = Arc::new(ScopeGuard::new(scope_id, move |id| {
            // The executor may already be gone at drop time; skip cleanup then
            if let Some(exec) = weak.upgrade() {
                exec.registry.lock().remove_scope(id);
            }
        }));
        Self {
            scope: Some(guard),
            ..self.clone()
        }
    }
}
1364
1365struct Sleeper {
1366    executor: Weak<Executor>,
1367    time: SystemTime,
1368    registered: bool,
1369}
1370
1371impl Sleeper {
1372    /// Upgrade Weak reference to [Executor].
1373    fn executor(&self) -> Arc<Executor> {
1374        self.executor.upgrade().expect("executor already dropped")
1375    }
1376}
1377
/// Entry in the executor's sleep queue: `waker` should be invoked once the
/// clock reaches `time`.
struct Alarm {
    time: SystemTime,
    waker: Waker,
}
1382
1383impl PartialEq for Alarm {
1384    fn eq(&self, other: &Self) -> bool {
1385        self.time.eq(&other.time)
1386    }
1387}
1388
1389impl Eq for Alarm {}
1390
impl PartialOrd for Alarm {
    // Delegate to `Ord` so the two orderings can never disagree
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
1396
1397impl Ord for Alarm {
1398    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
1399        // Reverse the ordering for min-heap
1400        other.time.cmp(&self.time)
1401    }
1402}
1403
impl Future for Sleeper {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
        let executor = self.executor();
        {
            // Complete immediately if the deterministic clock already
            // reached the target time
            let current_time = *executor.time.lock();
            if current_time >= self.time {
                return Poll::Ready(());
            }
        }
        // Push onto the sleep queue exactly once so the stored waker can be
        // invoked when the clock reaches `self.time`
        if !self.registered {
            self.registered = true;
            executor.sleeping.lock().push(Alarm {
                time: self.time,
                waker: cx.waker().clone(),
            });
        }
        Poll::Pending
    }
}
1425
1426impl Clock for Context {
1427    fn current(&self) -> SystemTime {
1428        *self.executor().time.lock()
1429    }
1430
1431    fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static {
1432        let deadline = self
1433            .current()
1434            .checked_add(duration)
1435            .expect("overflow when setting wake time");
1436        self.sleep_until(deadline)
1437    }
1438
1439    fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static {
1440        Sleeper {
1441            executor: self.executor.clone(),
1442
1443            time: deadline,
1444            registered: false,
1445        }
1446    }
1447}
1448
/// A future that resolves when a given target time is reached.
///
/// If the future is not ready at the target time, the future is blocked until the target time is reached.
#[cfg(feature = "external")]
#[pin_project]
struct Waiter<F: Future> {
    // Weak handle to the executor that owns the deterministic clock
    executor: Weak<Executor>,
    // Absolute time before which the wrapped future may not resolve
    target: SystemTime,
    #[pin]
    future: F,
    // Result cached from the initial noop-waker pre-poll
    ready: Option<F::Output>,
    // Whether the pre-poll has already been performed
    started: bool,
    // Whether an alarm has been pushed onto the executor's sleep queue
    registered: bool,
}
1463
#[cfg(feature = "external")]
impl<F> Future for Waiter<F>
where
    F: Future + Send,
{
    type Output = F::Output;

    fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
        let mut this = self.project();

        // Poll once with a noop waker so the future can register interest or start work
        // without being able to wake this task before the sampled delay expires. Any ready
        // value is cached and only released after the clock reaches `self.target`.
        if !*this.started {
            *this.started = true;
            let waker = noop_waker();
            let mut cx_noop = task::Context::from_waker(&waker);
            if let Poll::Ready(value) = this.future.as_mut().poll(&mut cx_noop) {
                *this.ready = Some(value);
            }
        }

        // Only allow the task to progress once the sampled delay has elapsed.
        let executor = this.executor.upgrade().expect("executor already dropped");
        let current_time = *executor.time.lock();
        if current_time < *this.target {
            // Register exactly once with the deterministic sleeper queue so the executor
            // wakes us once the clock reaches the scheduled target time.
            if !*this.registered {
                *this.registered = true;
                executor.sleeping.lock().push(Alarm {
                    time: *this.target,
                    waker: cx.waker().clone(),
                });
            }
            return Poll::Pending;
        }

        // If the underlying future completed during the noop pre-poll, surface the cached value.
        if let Some(value) = this.ready.take() {
            return Poll::Ready(value);
        }

        // Block the current thread until the future reschedules itself, keeping polling
        // deterministic with respect to executor time.
        //
        // NOTE(review): this blocks the polling thread on an external future;
        // it assumes that future makes progress independently — confirm.
        let blocker = Blocker::new();
        loop {
            let waker = waker(blocker.clone());
            let mut cx_block = task::Context::from_waker(&waker);
            match this.future.as_mut().poll(&mut cx_block) {
                Poll::Ready(value) => {
                    break Poll::Ready(value);
                }
                Poll::Pending => blocker.wait(),
            }
        }
    }
}
1522
#[cfg(feature = "external")]
impl Pacer for Context {
    /// Wrap `future` so it cannot resolve before `latency` has elapsed on
    /// the deterministic clock.
    fn pace<'a, F, T>(&'a self, latency: Duration, future: F) -> impl Future<Output = T> + Send + 'a
    where
        F: Future<Output = T> + Send + 'a,
        T: Send + 'a,
    {
        // Compute target time
        let target = self
            .executor()
            .time
            .lock()
            .checked_add(latency)
            .expect("overflow when setting wake time");

        Waiter {
            executor: self.executor.clone(),
            target,
            future,
            ready: None,
            started: false,
            registered: false,
        }
    }
}
1548
impl GClock for Context {
    type Instant = SystemTime;

    fn now(&self) -> Self::Instant {
        // Same deterministic clock as [Clock::current]
        self.current()
    }
}

// Marker impl: the deterministic clock stands in for real time here.
impl ReasonablyRealtime for Context {}
1558
impl crate::Network for Context {
    type Listener = ListenerOf<Network>;

    // Both operations delegate to the metered + audited deterministic network
    async fn bind(&self, socket: SocketAddr) -> Result<Self::Listener, Error> {
        self.network.bind(socket).await
    }

    async fn dial(
        &self,
        socket: SocketAddr,
    ) -> Result<(crate::SinkOf<Self>, crate::StreamOf<Self>), Error> {
        self.network.dial(socket).await
    }
}
1573
1574impl crate::Resolver for Context {
1575    async fn resolve(&self, host: &str) -> Result<Vec<IpAddr>, Error> {
1576        // Get the record
1577        let executor = self.executor();
1578        let dns = executor.dns.lock();
1579        let result = dns.get(host).cloned();
1580        drop(dns);
1581
1582        // Update the auditor
1583        executor.auditor.event(b"resolve", |hasher| {
1584            hasher.update(host.as_bytes());
1585            hasher.update(result.encode());
1586        });
1587        result.ok_or_else(|| Error::ResolveFailed(host.to_string()))
1588    }
1589}
1590
1591impl RngCore for Context {
1592    fn next_u32(&mut self) -> u32 {
1593        let executor = self.executor();
1594        executor.auditor.event(b"rand", |hasher| {
1595            hasher.update(b"next_u32");
1596        });
1597        let result = executor.rng.lock().next_u32();
1598        result
1599    }
1600
1601    fn next_u64(&mut self) -> u64 {
1602        let executor = self.executor();
1603        executor.auditor.event(b"rand", |hasher| {
1604            hasher.update(b"next_u64");
1605        });
1606        let result = executor.rng.lock().next_u64();
1607        result
1608    }
1609
1610    fn fill_bytes(&mut self, dest: &mut [u8]) {
1611        let executor = self.executor();
1612        executor.auditor.event(b"rand", |hasher| {
1613            hasher.update(b"fill_bytes");
1614        });
1615        executor.rng.lock().fill_bytes(dest);
1616    }
1617
1618    fn try_fill_bytes(&mut self, dest: &mut [u8]) -> Result<(), rand::Error> {
1619        let executor = self.executor();
1620        executor.auditor.event(b"rand", |hasher| {
1621            hasher.update(b"try_fill_bytes");
1622        });
1623        let result = executor.rng.lock().try_fill_bytes(dest);
1624        result
1625    }
1626}
1627
1628impl CryptoRng for Context {}
1629
impl crate::Storage for Context {
    type Blob = <Storage as crate::Storage>::Blob;

    // All operations delegate to the metered/audited/fault-injecting
    // in-memory storage stack
    async fn open_versioned(
        &self,
        partition: &str,
        name: &[u8],
        versions: std::ops::RangeInclusive<u16>,
    ) -> Result<(Self::Blob, u64, u16), Error> {
        self.storage.open_versioned(partition, name, versions).await
    }

    async fn remove(&self, partition: &str, name: Option<&[u8]>) -> Result<(), Error> {
        self.storage.remove(partition, name).await
    }

    async fn scan(&self, partition: &str) -> Result<Vec<Vec<u8>>, Error> {
        self.storage.scan(partition).await
    }
}
1650
impl crate::BufferPooler for Context {
    // Accessors for the pools created at runtime construction/recovery time
    fn network_buffer_pool(&self) -> &crate::BufferPool {
        &self.network_buffer_pool
    }

    fn storage_buffer_pool(&self) -> &crate::BufferPool {
        &self.storage_buffer_pool
    }
}
1660
1661#[cfg(test)]
1662mod tests {
1663    use super::*;
1664    #[cfg(feature = "external")]
1665    use crate::FutureExt;
1666    #[cfg(feature = "external")]
1667    use crate::Spawner;
1668    use crate::{deterministic, reschedule, Blob, Metrics, Resolver, Runner as _, Storage};
1669    use commonware_macros::test_traced;
1670    #[cfg(feature = "external")]
1671    use commonware_utils::channel::mpsc;
1672    use commonware_utils::channel::oneshot;
1673    #[cfg(not(feature = "external"))]
1674    use futures::future::pending;
1675    #[cfg(not(feature = "external"))]
1676    use futures::stream::StreamExt as _;
1677    #[cfg(feature = "external")]
1678    use futures::StreamExt;
1679    use futures::{stream::FuturesUnordered, task::noop_waker};
1680
1681    async fn task(i: usize) -> usize {
1682        for _ in 0..5 {
1683            reschedule().await;
1684        }
1685        i
1686    }
1687
1688    fn run_tasks(tasks: usize, runner: deterministic::Runner) -> (String, Vec<usize>) {
1689        runner.start(|context| async move {
1690            let mut handles = FuturesUnordered::new();
1691            for i in 0..=tasks - 1 {
1692                handles.push(context.clone().spawn(move |_| task(i)));
1693            }
1694
1695            let mut outputs = Vec::new();
1696            while let Some(result) = handles.next().await {
1697                outputs.push(result.unwrap());
1698            }
1699            assert_eq!(outputs.len(), tasks);
1700            (context.auditor().state(), outputs)
1701        })
1702    }
1703
1704    fn run_with_seed(seed: u64) -> (String, Vec<usize>) {
1705        let executor = deterministic::Runner::seeded(seed);
1706        run_tasks(5, executor)
1707    }
1708
1709    #[test]
1710    fn test_same_seed_same_order() {
1711        // Generate initial outputs
1712        let mut outputs = Vec::new();
1713        for seed in 0..1000 {
1714            let output = run_with_seed(seed);
1715            outputs.push(output);
1716        }
1717
1718        // Ensure they match
1719        for seed in 0..1000 {
1720            let output = run_with_seed(seed);
1721            assert_eq!(output, outputs[seed as usize]);
1722        }
1723    }
1724
1725    #[test_traced("TRACE")]
1726    fn test_different_seeds_different_order() {
1727        let output1 = run_with_seed(12345);
1728        let output2 = run_with_seed(54321);
1729        assert_ne!(output1, output2);
1730    }
1731
1732    #[test]
1733    fn test_alarm_min_heap() {
1734        // Populate heap
1735        let now = SystemTime::now();
1736        let alarms = vec![
1737            Alarm {
1738                time: now + Duration::new(10, 0),
1739                waker: noop_waker(),
1740            },
1741            Alarm {
1742                time: now + Duration::new(5, 0),
1743                waker: noop_waker(),
1744            },
1745            Alarm {
1746                time: now + Duration::new(15, 0),
1747                waker: noop_waker(),
1748            },
1749            Alarm {
1750                time: now + Duration::new(5, 0),
1751                waker: noop_waker(),
1752            },
1753        ];
1754        let mut heap = BinaryHeap::new();
1755        for alarm in alarms {
1756            heap.push(alarm);
1757        }
1758
1759        // Verify min-heap
1760        let mut sorted_times = Vec::new();
1761        while let Some(alarm) = heap.pop() {
1762            sorted_times.push(alarm.time);
1763        }
1764        assert_eq!(
1765            sorted_times,
1766            vec![
1767                now + Duration::new(5, 0),
1768                now + Duration::new(5, 0),
1769                now + Duration::new(10, 0),
1770                now + Duration::new(15, 0),
1771            ]
1772        );
1773    }
1774
    #[test]
    #[should_panic(expected = "runtime timeout")]
    fn test_timeout() {
        // A runtime with a 10s budget running an endless sleep loop must hit
        // the configured deadline and panic
        let executor = deterministic::Runner::timed(Duration::from_secs(10));
        executor.start(|context| async move {
            loop {
                context.sleep(Duration::from_secs(1)).await;
            }
        });
    }
1785
    #[test]
    #[should_panic(expected = "cycle duration must be non-zero when timeout is set")]
    fn test_bad_timeout() {
        // A zero cycle cannot advance time toward a timeout, so runner
        // construction must reject the combination
        let cfg = Config {
            timeout: Some(Duration::default()),
            cycle: Duration::default(),
            ..Config::default()
        };
        deterministic::Runner::new(cfg);
    }
1796
    #[test]
    #[should_panic(
        expected = "cycle duration must be greater than or equal to system time precision"
    )]
    fn test_bad_cycle() {
        // Cycles finer than the system time precision are rejected at
        // construction
        let cfg = Config {
            cycle: SYSTEM_TIME_PRECISION - Duration::from_nanos(1),
            ..Config::default()
        };
        deterministic::Runner::new(cfg);
    }
1808
    #[test]
    fn test_recover_synced_storage_persists() {
        // Initialize the first runtime
        let executor1 = deterministic::Runner::default();
        let partition = "test_partition";
        let name = b"test_blob";
        let data = b"Hello, world!";

        // Run some tasks, sync storage, and recover the runtime
        let (state, checkpoint) = executor1.start_and_recover(|context| async move {
            let (blob, _) = context.open(partition, name).await.unwrap();
            blob.write_at(0, data).await.unwrap();
            // Sync so the write survives recovery (unsynced data does not)
            blob.sync().await.unwrap();
            context.auditor().state()
        });

        // Verify auditor state is the same
        assert_eq!(state, checkpoint.auditor.state());

        // Check that synced storage persists after recovery
        let executor = Runner::from(checkpoint);
        executor.start(|context| async move {
            let (blob, len) = context.open(partition, name).await.unwrap();
            assert_eq!(len, data.len() as u64);
            let read = blob.read_at(0, data.len()).await.unwrap();
            assert_eq!(read.coalesce(), data);
        });
    }
1837
1838    #[test]
1839    #[should_panic(expected = "goodbye")]
1840    fn test_recover_panic_handling() {
1841        // Initialize the first runtime
1842        let executor1 = deterministic::Runner::default();
1843        let (_, checkpoint) = executor1.start_and_recover(|_| async move {
1844            reschedule().await;
1845        });
1846
1847        // Ensure that panic setting is preserved
1848        let executor = Runner::from(checkpoint);
1849        executor.start(|_| async move {
1850            panic!("goodbye");
1851        });
1852    }
1853
1854    #[test]
1855    fn test_recover_unsynced_storage_does_not_persist() {
1856        // Initialize the first runtime
1857        let executor = deterministic::Runner::default();
1858        let partition = "test_partition";
1859        let name = b"test_blob";
1860        let data = b"Hello, world!";
1861
1862        // Run some tasks without syncing storage
1863        let (_, checkpoint) = executor.start_and_recover(|context| async move {
1864            let context = context.clone();
1865            let (blob, _) = context.open(partition, name).await.unwrap();
1866            blob.write_at(0, data).await.unwrap();
1867        });
1868
1869        // Recover the runtime
1870        let executor = Runner::from(checkpoint);
1871
1872        // Check that unsynced storage does not persist after recovery
1873        executor.start(|context| async move {
1874            let (_, len) = context.open(partition, name).await.unwrap();
1875            assert_eq!(len, 0);
1876        });
1877    }
1878
1879    #[test]
1880    fn test_recover_dns_mappings_persist() {
1881        // Initialize the first runtime
1882        let executor = deterministic::Runner::default();
1883        let host = "example.com";
1884        let addrs = vec![
1885            IpAddr::V4(std::net::Ipv4Addr::new(192, 168, 1, 1)),
1886            IpAddr::V4(std::net::Ipv4Addr::new(192, 168, 1, 2)),
1887        ];
1888
1889        // Register DNS mapping and recover the runtime
1890        let (state, checkpoint) = executor.start_and_recover({
1891            let addrs = addrs.clone();
1892            |context| async move {
1893                context.resolver_register(host, Some(addrs));
1894                context.auditor().state()
1895            }
1896        });
1897
1898        // Verify auditor state is the same
1899        assert_eq!(state, checkpoint.auditor.state());
1900
1901        // Check that DNS mappings persist after recovery
1902        let executor = Runner::from(checkpoint);
1903        executor.start(move |context| async move {
1904            let resolved = context.resolve(host).await.unwrap();
1905            assert_eq!(resolved, addrs);
1906        });
1907    }
1908
1909    #[test]
1910    fn test_recover_time_persists() {
1911        // Initialize the first runtime
1912        let executor = deterministic::Runner::default();
1913        let duration_to_sleep = Duration::from_secs(10);
1914
1915        // Sleep for some time and recover the runtime
1916        let (time_before_recovery, checkpoint) = executor.start_and_recover(|context| async move {
1917            context.sleep(duration_to_sleep).await;
1918            context.current()
1919        });
1920
1921        // Check that the time advanced correctly before recovery
1922        assert_eq!(
1923            time_before_recovery.duration_since(UNIX_EPOCH).unwrap(),
1924            duration_to_sleep
1925        );
1926
1927        // Check that the time persists after recovery
1928        let executor2 = Runner::from(checkpoint);
1929        executor2.start(move |context| async move {
1930            assert_eq!(context.current(), time_before_recovery);
1931
1932            // Advance time further
1933            context.sleep(duration_to_sleep).await;
1934            assert_eq!(
1935                context.current().duration_since(UNIX_EPOCH).unwrap(),
1936                duration_to_sleep * 2
1937            );
1938        });
1939    }
1940
1941    #[test]
1942    #[should_panic(expected = "executor still has weak references")]
1943    fn test_context_return() {
1944        // Initialize runtime
1945        let executor = deterministic::Runner::default();
1946
1947        // Start runtime
1948        let context = executor.start(|context| async move {
1949            // Attempt to recover before the runtime has finished
1950            context
1951        });
1952
1953        // Should never get this far
1954        drop(context);
1955    }
1956
1957    #[test]
1958    fn test_default_time_zero() {
1959        // Initialize runtime
1960        let executor = deterministic::Runner::default();
1961
1962        executor.start(|context| async move {
1963            // Check that the time is zero
1964            assert_eq!(
1965                context.current().duration_since(UNIX_EPOCH).unwrap(),
1966                Duration::ZERO
1967            );
1968        });
1969    }
1970
1971    #[test]
1972    fn test_start_time() {
1973        // Initialize runtime with default config
1974        let executor_default = deterministic::Runner::default();
1975        executor_default.start(|context| async move {
1976            assert_eq!(context.current(), UNIX_EPOCH);
1977        });
1978
1979        // Initialize runtime with custom start time
1980        let start_time = UNIX_EPOCH + Duration::from_secs(100);
1981        let cfg = Config::default().with_start_time(start_time);
1982        let executor = deterministic::Runner::new(cfg);
1983
1984        executor.start(move |context| async move {
1985            // Check that the time matches the custom start time
1986            assert_eq!(context.current(), start_time);
1987        });
1988    }
1989
1990    #[cfg(not(feature = "external"))]
1991    #[test]
1992    #[should_panic(expected = "runtime stalled")]
1993    fn test_stall() {
1994        // Initialize runtime
1995        let executor = deterministic::Runner::default();
1996
1997        // Start runtime
1998        executor.start(|_| async move {
1999            pending::<()>().await;
2000        });
2001    }
2002
2003    #[cfg(not(feature = "external"))]
2004    #[test]
2005    #[should_panic(expected = "runtime stalled")]
2006    fn test_external_simulated() {
2007        // Initialize runtime
2008        let executor = deterministic::Runner::default();
2009
2010        // Create a thread that waits for 1 second
2011        let (tx, rx) = oneshot::channel();
2012        std::thread::spawn(move || {
2013            std::thread::sleep(Duration::from_secs(1));
2014            tx.send(()).unwrap();
2015        });
2016
2017        // Start runtime
2018        executor.start(|_| async move {
2019            rx.await.unwrap();
2020        });
2021    }
2022
2023    #[cfg(feature = "external")]
2024    #[test]
2025    fn test_external_realtime() {
2026        // Initialize runtime
2027        let executor = deterministic::Runner::default();
2028
2029        // Create a thread that waits for 1 second
2030        let (tx, rx) = oneshot::channel();
2031        std::thread::spawn(move || {
2032            std::thread::sleep(Duration::from_secs(1));
2033            tx.send(()).unwrap();
2034        });
2035
2036        // Start runtime
2037        executor.start(|_| async move {
2038            rx.await.unwrap();
2039        });
2040    }
2041
    #[cfg(feature = "external")]
    #[test]
    fn test_external_realtime_variable() {
        // Verifies that `pace()` bounds the *simulated* latency at which an
        // externally-resolved future completes, independent of the real
        // latency of the external thread that resolves it.
        // Initialize runtime
        let executor = deterministic::Runner::default();

        // Start runtime
        executor.start(|context| async move {
            // Capture both clocks up front so real vs. simulated elapsed time
            // can be compared independently in the assertions below.
            let start_real = SystemTime::now();
            let start_sim = context.current();
            let (first_tx, first_rx) = oneshot::channel();
            let (second_tx, second_rx) = oneshot::channel();
            let (results_tx, mut results_rx) = mpsc::channel(2);

            // External thread #1: sends only after a real 1-second sleep.
            let first_wait = Duration::from_secs(1);
            std::thread::spawn(move || {
                std::thread::sleep(first_wait);
                first_tx.send(()).unwrap();
            });

            // External thread #2: sends (effectively) immediately.
            std::thread::spawn(move || {
                std::thread::sleep(Duration::ZERO);
                second_tx.send(()).unwrap();
            });

            // Task 1 paces its receive with a zero simulated delay: real time
            // must exceed first_wait (the sender slept that long), yet
            // simulated time must remain below first_wait.
            let first = context.clone().spawn({
                let results_tx = results_tx.clone();
                move |context| async move {
                    first_rx.pace(&context, Duration::ZERO).await.unwrap();
                    let elapsed_real = SystemTime::now().duration_since(start_real).unwrap();
                    assert!(elapsed_real > first_wait);
                    let elapsed_sim = context.current().duration_since(start_sim).unwrap();
                    assert!(elapsed_sim < first_wait);
                    results_tx.send(1).await.unwrap();
                }
            });

            // Task 2 paces with a simulated delay of first_wait: even though
            // the external send was immediate, both real and simulated time
            // must advance by at least first_wait before the pace resolves.
            let second = context.clone().spawn(move |context| async move {
                second_rx.pace(&context, first_wait).await.unwrap();
                let elapsed_real = SystemTime::now().duration_since(start_real).unwrap();
                assert!(elapsed_real >= first_wait);
                let elapsed_sim = context.current().duration_since(start_sim).unwrap();
                assert!(elapsed_sim >= first_wait);
                results_tx.send(2).await.unwrap();
            });

            // Wait for both tasks to complete (the await order here does not
            // affect the completion order recorded through results_tx).
            second.await.unwrap();
            first.await.unwrap();

            // The zero-paced task (1) must have completed before the
            // first_wait-paced task (2).
            let mut results = Vec::new();
            for _ in 0..2 {
                results.push(results_rx.recv().await.unwrap());
            }
            assert_eq!(results, vec![1, 2]);
        });
    }
2105
2106    #[cfg(not(feature = "external"))]
2107    #[test]
2108    fn test_simulated_skip() {
2109        // Initialize runtime
2110        let executor = deterministic::Runner::default();
2111
2112        // Start runtime
2113        executor.start(|context| async move {
2114            context.sleep(Duration::from_secs(1)).await;
2115
2116            // Check if we skipped
2117            let metrics = context.encode();
2118            let iterations = metrics
2119                .lines()
2120                .find_map(|line| {
2121                    line.strip_prefix("runtime_iterations_total ")
2122                        .and_then(|value| value.trim().parse::<u64>().ok())
2123                })
2124                .expect("missing runtime_iterations_total metric");
2125            assert!(iterations < 10);
2126        });
2127    }
2128
2129    #[cfg(feature = "external")]
2130    #[test]
2131    fn test_realtime_no_skip() {
2132        // Initialize runtime
2133        let executor = deterministic::Runner::default();
2134
2135        // Start runtime
2136        executor.start(|context| async move {
2137            context.sleep(Duration::from_secs(1)).await;
2138
2139            // Check if we skipped
2140            let metrics = context.encode();
2141            let iterations = metrics
2142                .lines()
2143                .find_map(|line| {
2144                    line.strip_prefix("runtime_iterations_total ")
2145                        .and_then(|value| value.trim().parse::<u64>().ok())
2146                })
2147                .expect("missing runtime_iterations_total metric");
2148            assert!(iterations > 500);
2149        });
2150    }
2151
2152    #[test]
2153    #[should_panic(expected = "label must start with [a-zA-Z]")]
2154    fn test_metrics_label_empty() {
2155        let executor = deterministic::Runner::default();
2156        executor.start(|context| async move {
2157            context.with_label("");
2158        });
2159    }
2160
2161    #[test]
2162    #[should_panic(expected = "label must start with [a-zA-Z]")]
2163    fn test_metrics_label_invalid_first_char() {
2164        let executor = deterministic::Runner::default();
2165        executor.start(|context| async move {
2166            context.with_label("1invalid");
2167        });
2168    }
2169
2170    #[test]
2171    #[should_panic(expected = "label must only contain [a-zA-Z0-9_]")]
2172    fn test_metrics_label_invalid_char() {
2173        let executor = deterministic::Runner::default();
2174        executor.start(|context| async move {
2175            context.with_label("invalid-label");
2176        });
2177    }
2178
2179    #[test]
2180    #[should_panic(expected = "using runtime label is not allowed")]
2181    fn test_metrics_label_reserved_prefix() {
2182        let executor = deterministic::Runner::default();
2183        executor.start(|context| async move {
2184            context.with_label(METRICS_PREFIX);
2185        });
2186    }
2187
2188    #[test]
2189    #[should_panic(expected = "duplicate attribute key: epoch")]
2190    fn test_metrics_duplicate_attribute_panics() {
2191        let executor = deterministic::Runner::default();
2192        executor.start(|context| async move {
2193            let _ = context
2194                .with_label("test")
2195                .with_attribute("epoch", "old")
2196                .with_attribute("epoch", "new");
2197        });
2198    }
2199
2200    #[test]
2201    fn test_storage_fault_injection_and_recovery() {
2202        // Phase 1: Run with 100% sync failure rate
2203        let cfg = deterministic::Config::default().with_storage_fault_config(FaultConfig {
2204            sync_rate: Some(1.0),
2205            ..Default::default()
2206        });
2207
2208        let (result, checkpoint) =
2209            deterministic::Runner::new(cfg).start_and_recover(|ctx| async move {
2210                let (blob, _) = ctx.open("test_fault", b"blob").await.unwrap();
2211                blob.write_at(0, b"data".to_vec()).await.unwrap();
2212                blob.sync().await // This should fail due to fault injection
2213            });
2214
2215        // Verify sync failed
2216        assert!(result.is_err());
2217
2218        // Phase 2: Recover and disable faults explicitly
2219        deterministic::Runner::from(checkpoint).start(|ctx| async move {
2220            // Explicitly disable faults for recovery verification
2221            *ctx.storage_fault_config().write() = FaultConfig::default();
2222
2223            // Data was not synced, so blob should be empty (unsynced writes are lost)
2224            let (blob, len) = ctx.open("test_fault", b"blob").await.unwrap();
2225            assert_eq!(len, 0, "unsynced data should be lost after recovery");
2226
2227            // Now we can write and sync successfully
2228            blob.write_at(0, b"recovered".to_vec()).await.unwrap();
2229            blob.sync()
2230                .await
2231                .expect("sync should succeed with faults disabled");
2232
2233            // Verify data persisted
2234            let read_buf = blob.read_at(0, 9).await.unwrap();
2235            assert_eq!(read_buf.coalesce(), b"recovered");
2236        });
2237    }
2238
2239    #[test]
2240    fn test_storage_fault_dynamic_config() {
2241        let executor = deterministic::Runner::default();
2242        executor.start(|ctx| async move {
2243            let (blob, _) = ctx.open("test_dynamic", b"blob").await.unwrap();
2244
2245            // Initially no faults - sync should succeed
2246            blob.write_at(0, b"initial".to_vec()).await.unwrap();
2247            blob.sync().await.expect("initial sync should succeed");
2248
2249            // Enable sync faults dynamically
2250            let storage_fault_cfg = ctx.storage_fault_config();
2251            storage_fault_cfg.write().sync_rate = Some(1.0);
2252
2253            // Now sync should fail
2254            blob.write_at(0, b"updated".to_vec()).await.unwrap();
2255            let result = blob.sync().await;
2256            assert!(result.is_err(), "sync should fail with faults enabled");
2257
2258            // Disable faults
2259            storage_fault_cfg.write().sync_rate = Some(0.0);
2260
2261            // Sync should succeed again
2262            blob.sync()
2263                .await
2264                .expect("sync should succeed with faults disabled");
2265        });
2266    }
2267
2268    #[test]
2269    fn test_storage_fault_determinism() {
2270        // Run the same sequence twice with the same seed
2271        fn run_with_seed(seed: u64) -> Vec<bool> {
2272            let cfg = deterministic::Config::default()
2273                .with_seed(seed)
2274                .with_storage_fault_config(FaultConfig {
2275                    open_rate: Some(0.5),
2276                    ..Default::default()
2277                });
2278
2279            let runner = deterministic::Runner::new(cfg);
2280            runner.start(|ctx| async move {
2281                let mut results = Vec::new();
2282                for i in 0..20 {
2283                    let name = format!("blob{i}");
2284                    let result = ctx.open("test_determinism", name.as_bytes()).await;
2285                    results.push(result.is_ok());
2286                }
2287                results
2288            })
2289        }
2290
2291        let results1 = run_with_seed(12345);
2292        let results2 = run_with_seed(12345);
2293        assert_eq!(
2294            results1, results2,
2295            "same seed should produce same failure pattern"
2296        );
2297
2298        let results3 = run_with_seed(99999);
2299        assert_ne!(
2300            results1, results3,
2301            "different seeds should produce different patterns"
2302        );
2303    }
2304
    #[test]
    fn test_storage_fault_determinism_multi_task() {
        // Run the same multi-task sequence twice with the same seed.
        // This tests that task shuffling + fault decisions interleave
        // deterministically: both runs must observe the identical per-task
        // success counts, while a different seed must diverge.
        fn run_with_seed(seed: u64) -> Vec<u32> {
            // Mixed fault rates so open/write/sync outcomes all depend on the
            // seeded RNG stream.
            let cfg = deterministic::Config::default()
                .with_seed(seed)
                .with_storage_fault_config(FaultConfig {
                    open_rate: Some(0.5),
                    write_rate: Some(0.3),
                    sync_rate: Some(0.2),
                    ..Default::default()
                });

            let runner = deterministic::Runner::new(cfg);
            runner.start(|ctx| async move {
                // Spawn multiple tasks that do storage operations concurrently
                // so fault decisions interleave with task scheduling.
                let mut handles = Vec::new();
                for i in 0..5 {
                    let ctx = ctx.clone();
                    handles.push(ctx.spawn(move |ctx| async move {
                        // Each task scores one point per successful open,
                        // write, and sync (max 12 across its 4 blobs); write
                        // and sync are only attempted when the open succeeds.
                        let mut successes = 0u32;
                        for j in 0..4 {
                            let name = format!("task{i}_blob{j}");
                            if let Ok((blob, _)) = ctx.open("partition", name.as_bytes()).await {
                                successes += 1;
                                if blob.write_at(0, b"data".to_vec()).await.is_ok() {
                                    successes += 1;
                                }
                                if blob.sync().await.is_ok() {
                                    successes += 1;
                                }
                            }
                        }
                        successes
                    }));
                }

                // Collect results from all tasks (in spawn order, so the
                // resulting Vec is directly comparable across runs).
                let mut results = Vec::new();
                for handle in handles {
                    results.push(handle.await.unwrap());
                }
                results
            })
        }

        let results1 = run_with_seed(42);
        let results2 = run_with_seed(42);
        assert_eq!(
            results1, results2,
            "same seed should produce same multi-task pattern"
        );

        let results3 = run_with_seed(99999);
        assert_ne!(
            results1, results3,
            "different seeds should produce different patterns"
        );
    }
2365}