Skip to main content

commonware_runtime/
deterministic.rs

//! A deterministic runtime that randomly selects tasks to run based on a seed.
//!
//! # Panics
//!
//! Unless configured otherwise (see [Config::with_catch_panics]), any task panic will lead to a
//! runtime panic.
//!
//! # External Processes
//!
//! When testing an application that interacts with some external process, it can appear to
//! the runtime that progress has stalled (because no pending task can make progress) and/or
//! that futures resolve with variable latency (which in turn triggers non-deterministic execution).
//!
//! To support such applications, the runtime can be built with the `external` feature, which both
//! sleeps for each [Config::cycle] (waiting, rather than declaring a stall, if all futures are
//! pending) and constrains the resolution latency of any future (with `pace()`).
//!
//! **Applications that do not interact with external processes (or are able to mock them) should
//! never need to enable this feature. It is commonly used when testing consensus with external
//! execution environments that use their own runtime (but are deterministic over some set of inputs).**
//!
//! # Metrics
//!
//! This runtime enforces that metrics are unique and well-formed:
//! - Labels must start with `[a-zA-Z]` and contain only `[a-zA-Z0-9_]`
//! - Metric names must be unique (panics on duplicate registration)
//!
//! # Example
//!
//! ```rust
//! use commonware_runtime::{Spawner, Runner, deterministic, Metrics};
//!
//! let executor = deterministic::Runner::default();
//! executor.start(|context| async move {
//!     println!("Parent started");
//!     let result = context.with_label("child").spawn(|_| async move {
//!         println!("Child started");
//!         "hello"
//!     });
//!     println!("Child result: {:?}", result.await);
//!     println!("Parent exited");
//!     println!("Auditor state: {}", context.auditor().state());
//! });
//! ```
44
45pub use crate::storage::faulty::Config as FaultConfig;
46use crate::{
47    network::{
48        audited::Network as AuditedNetwork, deterministic::Network as DeterministicNetwork,
49        metered::Network as MeteredNetwork,
50    },
51    storage::{
52        audited::Storage as AuditedStorage, faulty::Storage as FaultyStorage,
53        memory::Storage as MemStorage, metered::Storage as MeteredStorage,
54    },
55    telemetry::metrics::task::Label,
56    utils::{
57        add_attribute,
58        signal::{Signal, Stopper},
59        supervision::Tree,
60        Panicker, Registry, ScopeGuard,
61    },
62    validate_label, BufferPool, BufferPoolConfig, Clock, Error, Execution, Handle, ListenerOf,
63    Metrics as _, Panicked, Spawner as _, METRICS_PREFIX,
64};
65#[cfg(feature = "external")]
66use crate::{Blocker, Pacer};
67use commonware_codec::Encode;
68use commonware_macros::select;
69use commonware_parallel::ThreadPool;
70use commonware_utils::{
71    hex,
72    sync::{Mutex, RwLock},
73    time::SYSTEM_TIME_PRECISION,
74    SystemTimeExt,
75};
76#[cfg(feature = "external")]
77use futures::task::noop_waker;
78use futures::{
79    future::Either,
80    task::{waker, ArcWake},
81    Future,
82};
83use governor::clock::{Clock as GClock, ReasonablyRealtime};
84#[cfg(feature = "external")]
85use pin_project::pin_project;
86use prometheus_client::{
87    metrics::{counter::Counter, family::Family, gauge::Gauge},
88    registry::{Metric, Registry as PrometheusRegistry},
89};
90use rand::{prelude::SliceRandom, rngs::StdRng, CryptoRng, RngCore, SeedableRng};
91use rand_core::CryptoRngCore;
92use rayon::{ThreadPoolBuildError, ThreadPoolBuilder};
93use sha2::{Digest as _, Sha256};
94use std::{
95    borrow::Cow,
96    collections::{BTreeMap, BinaryHeap, HashMap, HashSet},
97    mem::{replace, take},
98    net::{IpAddr, SocketAddr},
99    num::NonZeroUsize,
100    panic::{catch_unwind, resume_unwind, AssertUnwindSafe},
101    pin::Pin,
102    sync::{Arc, Weak},
103    task::{self, Poll, Waker},
104    time::{Duration, SystemTime, UNIX_EPOCH},
105};
106use tracing::{info_span, trace, Instrument};
107use tracing_opentelemetry::OpenTelemetrySpanExt;
108
109#[derive(Debug)]
110struct Metrics {
111    iterations: Counter,
112    tasks_spawned: Family<Label, Counter>,
113    tasks_running: Family<Label, Gauge>,
114    task_polls: Family<Label, Counter>,
115
116    network_bandwidth: Counter,
117}
118
119impl Metrics {
120    pub fn init(registry: &mut PrometheusRegistry) -> Self {
121        let metrics = Self {
122            iterations: Counter::default(),
123            task_polls: Family::default(),
124            tasks_spawned: Family::default(),
125            tasks_running: Family::default(),
126            network_bandwidth: Counter::default(),
127        };
128        registry.register(
129            "iterations",
130            "Total number of iterations",
131            metrics.iterations.clone(),
132        );
133        registry.register(
134            "tasks_spawned",
135            "Total number of tasks spawned",
136            metrics.tasks_spawned.clone(),
137        );
138        registry.register(
139            "tasks_running",
140            "Number of tasks currently running",
141            metrics.tasks_running.clone(),
142        );
143        registry.register(
144            "task_polls",
145            "Total number of task polls",
146            metrics.task_polls.clone(),
147        );
148        registry.register(
149            "bandwidth",
150            "Total amount of data sent over network",
151            metrics.network_bandwidth.clone(),
152        );
153        metrics
154    }
155}
156
157/// A SHA-256 digest.
158type Digest = [u8; 32];
159
160/// Track the state of the runtime for determinism auditing.
161pub struct Auditor {
162    digest: Mutex<Digest>,
163}
164
165impl Default for Auditor {
166    fn default() -> Self {
167        Self {
168            digest: Digest::default().into(),
169        }
170    }
171}
172
173impl Auditor {
174    /// Record that an event happened.
175    /// This auditor's hash will be updated with the event's `label` and
176    /// whatever other data is passed in the `payload` closure.
177    pub(crate) fn event<F>(&self, label: &'static [u8], payload: F)
178    where
179        F: FnOnce(&mut Sha256),
180    {
181        let mut digest = self.digest.lock();
182
183        let mut hasher = Sha256::new();
184        hasher.update(digest.as_ref());
185        hasher.update(label);
186        payload(&mut hasher);
187
188        *digest = hasher.finalize().into();
189    }
190
191    /// Generate a representation of the current state of the runtime.
192    ///
193    /// This can be used to ensure that logic running on top
194    /// of the runtime is interacting deterministically.
195    pub fn state(&self) -> String {
196        let hash = self.digest.lock();
197        hex(hash.as_ref())
198    }
199}
200
201/// A dynamic RNG that can safely be sent between threads.
202pub type BoxDynRng = Box<dyn CryptoRngCore + Send + 'static>;
203
204/// Configuration for the `deterministic` runtime.
205pub struct Config {
206    /// Random number generator.
207    rng: BoxDynRng,
208
209    /// The cycle duration determines how much time is advanced after each iteration of the event
210    /// loop. This is useful to prevent starvation if some task never yields.
211    cycle: Duration,
212
213    /// Time the runtime starts at.
214    start_time: SystemTime,
215
216    /// If the runtime is still executing at this point (i.e. a test hasn't stopped), panic.
217    timeout: Option<Duration>,
218
219    /// Whether spawned tasks should catch panics instead of propagating them.
220    catch_panics: bool,
221
222    /// Configuration for deterministic storage fault injection.
223    /// Defaults to no faults being injected.
224    storage_fault_cfg: FaultConfig,
225
226    /// Buffer pool configuration for network I/O.
227    network_buffer_pool_cfg: BufferPoolConfig,
228
229    /// Buffer pool configuration for storage I/O.
230    storage_buffer_pool_cfg: BufferPoolConfig,
231}
232
233impl Config {
234    /// Returns a new [Config] with default values.
235    pub fn new() -> Self {
236        cfg_if::cfg_if! {
237            if #[cfg(miri)] {
238                // Reduce max_per_class to avoid slow atomics under Miri
239                let network_buffer_pool_cfg = BufferPoolConfig::for_network()
240                    .with_max_per_class(commonware_utils::NZUsize!(32))
241                    .with_thread_cache_disabled();
242                let storage_buffer_pool_cfg = BufferPoolConfig::for_storage()
243                    .with_max_per_class(commonware_utils::NZUsize!(32))
244                    .with_thread_cache_disabled();
245            } else {
246                let network_buffer_pool_cfg =
247                    BufferPoolConfig::for_network().with_thread_cache_disabled();
248                let storage_buffer_pool_cfg =
249                    BufferPoolConfig::for_storage().with_thread_cache_disabled();
250            }
251        }
252
253        Self {
254            rng: Box::new(StdRng::seed_from_u64(42)),
255            cycle: Duration::from_millis(1),
256            start_time: UNIX_EPOCH,
257            timeout: None,
258            catch_panics: false,
259            storage_fault_cfg: FaultConfig::default(),
260            network_buffer_pool_cfg,
261            storage_buffer_pool_cfg,
262        }
263    }
264
265    // Setters
266    /// See [Config]
267    pub fn with_seed(self, seed: u64) -> Self {
268        self.with_rng(Box::new(StdRng::seed_from_u64(seed)))
269    }
270
271    /// Provide the config with a dynamic RNG directly.
272    ///
273    /// This can be useful for, e.g. fuzzing, where beyond just having randomness,
274    /// you might want to control specific bytes of the RNG. By taking in a dynamic
275    /// RNG object, any behavior is possible.
276    pub fn with_rng(mut self, rng: BoxDynRng) -> Self {
277        self.rng = rng;
278        self
279    }
280
281    /// See [Config]
282    pub const fn with_cycle(mut self, cycle: Duration) -> Self {
283        self.cycle = cycle;
284        self
285    }
286    /// See [Config]
287    pub const fn with_start_time(mut self, start_time: SystemTime) -> Self {
288        self.start_time = start_time;
289        self
290    }
291    /// See [Config]
292    pub const fn with_timeout(mut self, timeout: Option<Duration>) -> Self {
293        self.timeout = timeout;
294        self
295    }
296    /// See [Config]
297    pub const fn with_catch_panics(mut self, catch_panics: bool) -> Self {
298        self.catch_panics = catch_panics;
299        self
300    }
301    /// See [Config]
302    pub const fn with_network_buffer_pool_config(mut self, cfg: BufferPoolConfig) -> Self {
303        self.network_buffer_pool_cfg = cfg;
304        self
305    }
306    /// See [Config]
307    pub const fn with_storage_buffer_pool_config(mut self, cfg: BufferPoolConfig) -> Self {
308        self.storage_buffer_pool_cfg = cfg;
309        self
310    }
311
312    /// Configure storage fault injection.
313    ///
314    /// When set, the runtime will inject deterministic storage errors based on
315    /// the provided configuration. Faults are drawn from the shared RNG, ensuring
316    /// reproducible failure patterns for a given seed.
317    pub const fn with_storage_fault_config(mut self, faults: FaultConfig) -> Self {
318        self.storage_fault_cfg = faults;
319        self
320    }
321
322    // Getters
323    /// See [Config]
324    pub const fn cycle(&self) -> Duration {
325        self.cycle
326    }
327    /// See [Config]
328    pub const fn start_time(&self) -> SystemTime {
329        self.start_time
330    }
331    /// See [Config]
332    pub const fn timeout(&self) -> Option<Duration> {
333        self.timeout
334    }
335    /// See [Config]
336    pub const fn catch_panics(&self) -> bool {
337        self.catch_panics
338    }
339    /// See [Config]
340    pub const fn network_buffer_pool_config(&self) -> &BufferPoolConfig {
341        &self.network_buffer_pool_cfg
342    }
343    /// See [Config]
344    pub const fn storage_buffer_pool_config(&self) -> &BufferPoolConfig {
345        &self.storage_buffer_pool_cfg
346    }
347
348    /// Assert that the configuration is valid.
349    pub fn assert(&self) {
350        assert!(
351            self.cycle != Duration::default() || self.timeout.is_none(),
352            "cycle duration must be non-zero when timeout is set",
353        );
354        assert!(
355            self.cycle >= SYSTEM_TIME_PRECISION,
356            "cycle duration must be greater than or equal to system time precision"
357        );
358        assert!(
359            self.start_time >= UNIX_EPOCH,
360            "start time must be greater than or equal to unix epoch"
361        );
362    }
363}
364
365impl Default for Config {
366    fn default() -> Self {
367        Self::new()
368    }
369}
370
371/// A (prefixed_name, attributes) pair identifying a unique metric registration.
372type MetricKey = (String, Vec<(String, String)>);
373
374/// Deterministic runtime that randomly selects tasks to run based on a seed.
375pub struct Executor {
376    registry: Mutex<Registry>,
377    registered_metrics: Mutex<HashSet<MetricKey>>,
378    cycle: Duration,
379    deadline: Option<SystemTime>,
380    metrics: Arc<Metrics>,
381    auditor: Arc<Auditor>,
382    rng: Arc<Mutex<BoxDynRng>>,
383    time: Mutex<SystemTime>,
384    tasks: Arc<Tasks>,
385    sleeping: Mutex<BinaryHeap<Alarm>>,
386    shutdown: Mutex<Stopper>,
387    panicker: Panicker,
388    dns: Mutex<HashMap<String, Vec<IpAddr>>>,
389}
390
391impl Executor {
392    /// Advance simulated time by [Config::cycle].
393    ///
394    /// When built with the `external` feature, sleep for [Config::cycle] to let
395    /// external processes make progress.
396    fn advance_time(&self) -> SystemTime {
397        #[cfg(feature = "external")]
398        std::thread::sleep(self.cycle);
399
400        let mut time = self.time.lock();
401        *time = time
402            .checked_add(self.cycle)
403            .expect("executor time overflowed");
404        let now = *time;
405        trace!(now = now.epoch_millis(), "time advanced");
406        now
407    }
408
409    /// When idle, jump directly to the next actionable time.
410    ///
411    /// When built with the `external` feature, never skip ahead (to ensure we poll all pending tasks
412    /// every [Config::cycle]).
413    fn skip_idle_time(&self, current: SystemTime) -> SystemTime {
414        if cfg!(feature = "external") || self.tasks.ready() != 0 {
415            return current;
416        }
417
418        let mut skip_until = None;
419        {
420            let sleeping = self.sleeping.lock();
421            if let Some(next) = sleeping.peek() {
422                if next.time > current {
423                    skip_until = Some(next.time);
424                }
425            }
426        }
427
428        skip_until.map_or(current, |deadline| {
429            let mut time = self.time.lock();
430            *time = deadline;
431            let now = *time;
432            trace!(now = now.epoch_millis(), "time skipped");
433            now
434        })
435    }
436
437    /// Wake any sleepers whose deadlines have elapsed.
438    fn wake_ready_sleepers(&self, current: SystemTime) {
439        let mut sleeping = self.sleeping.lock();
440        while let Some(next) = sleeping.peek() {
441            if next.time <= current {
442                let sleeper = sleeping.pop().unwrap();
443                sleeper.waker.wake();
444            } else {
445                break;
446            }
447        }
448    }
449
450    /// Ensure the runtime is making progress.
451    ///
452    /// When built with the `external` feature, always poll pending tasks after the passage of time.
453    fn assert_liveness(&self) {
454        if cfg!(feature = "external") || self.tasks.ready() != 0 {
455            return;
456        }
457
458        panic!("runtime stalled");
459    }
460}
461
462/// An artifact that can be used to recover the state of the runtime.
463///
464/// This is useful when mocking unclean shutdown (while retaining deterministic behavior).
465pub struct Checkpoint {
466    cycle: Duration,
467    deadline: Option<SystemTime>,
468    auditor: Arc<Auditor>,
469    rng: Arc<Mutex<BoxDynRng>>,
470    time: Mutex<SystemTime>,
471    storage: Arc<Storage>,
472    dns: Mutex<HashMap<String, Vec<IpAddr>>>,
473    catch_panics: bool,
474    network_buffer_pool_cfg: BufferPoolConfig,
475    storage_buffer_pool_cfg: BufferPoolConfig,
476}
477
478impl Checkpoint {
479    /// Get a reference to the [Auditor].
480    pub fn auditor(&self) -> Arc<Auditor> {
481        self.auditor.clone()
482    }
483}
484
485#[allow(clippy::large_enum_variant)]
486enum State {
487    Config(Config),
488    Checkpoint(Checkpoint),
489}
490
491/// Implementation of [crate::Runner] for the `deterministic` runtime.
492pub struct Runner {
493    state: State,
494}
495
496impl From<Config> for Runner {
497    fn from(cfg: Config) -> Self {
498        Self::new(cfg)
499    }
500}
501
502impl From<Checkpoint> for Runner {
503    fn from(checkpoint: Checkpoint) -> Self {
504        Self {
505            state: State::Checkpoint(checkpoint),
506        }
507    }
508}
509
510impl Runner {
511    /// Initialize a new `deterministic` runtime with the given seed and cycle duration.
512    pub fn new(cfg: Config) -> Self {
513        // Ensure config is valid
514        cfg.assert();
515        Self {
516            state: State::Config(cfg),
517        }
518    }
519
520    /// Initialize a new `deterministic` runtime with the default configuration
521    /// and the provided seed.
522    pub fn seeded(seed: u64) -> Self {
523        Self::new(Config::default().with_seed(seed))
524    }
525
526    /// Initialize a new `deterministic` runtime with the default configuration
527    /// but exit after the given timeout.
528    pub fn timed(timeout: Duration) -> Self {
529        let cfg = Config {
530            timeout: Some(timeout),
531            ..Config::default()
532        };
533        Self::new(cfg)
534    }
535
536    /// Like [crate::Runner::start], but also returns a [Checkpoint] that can be used
537    /// to recover the state of the runtime in a subsequent run.
538    pub fn start_and_recover<F, Fut>(self, f: F) -> (Fut::Output, Checkpoint)
539    where
540        F: FnOnce(Context) -> Fut,
541        Fut: Future,
542    {
543        // Setup context and return strong reference to executor
544        let (context, executor, panicked) = match self.state {
545            State::Config(config) => Context::new(config),
546            State::Checkpoint(checkpoint) => Context::recover(checkpoint),
547        };
548
549        // Pin root task to the heap
550        let storage = context.storage.clone();
551        let network_buffer_pool_cfg = context.network_buffer_pool.config().clone();
552        let storage_buffer_pool_cfg = context.storage_buffer_pool.config().clone();
553        let mut root = Box::pin(panicked.interrupt(f(context)));
554
555        // Register the root task
556        Tasks::register_root(&executor.tasks);
557
558        // Process tasks until root task completes or progress stalls.
559        // Wrap the loop in catch_unwind to ensure task cleanup runs even if the loop or a task panics.
560        let result = catch_unwind(AssertUnwindSafe(|| loop {
561            // Ensure we have not exceeded our deadline
562            {
563                let current = executor.time.lock();
564                if let Some(deadline) = executor.deadline {
565                    if *current >= deadline {
566                        drop(current);
567                        panic!("runtime timeout");
568                    }
569                }
570            }
571
572            // Drain all ready tasks
573            let mut queue = executor.tasks.drain();
574
575            // Shuffle tasks (if more than one)
576            if queue.len() > 1 {
577                let mut rng = executor.rng.lock();
578                queue.shuffle(&mut *rng);
579            }
580
581            // Run all snapshotted tasks
582            //
583            // This approach is more efficient than randomly selecting a task one-at-a-time
584            // because it ensures we don't pull the same pending task multiple times in a row (without
585            // processing a different task required for other tasks to make progress).
586            trace!(
587                iter = executor.metrics.iterations.get(),
588                tasks = queue.len(),
589                "starting loop"
590            );
591            let mut output = None;
592            for id in queue {
593                // Lookup the task (it may have completed already)
594                let Some(task) = executor.tasks.get(id) else {
595                    trace!(id, "skipping missing task");
596                    continue;
597                };
598
599                // Record task for auditing
600                executor.auditor.event(b"process_task", |hasher| {
601                    hasher.update(task.id.to_be_bytes());
602                    hasher.update(task.label.name().as_bytes());
603                });
604                executor.metrics.task_polls.get_or_create(&task.label).inc();
605                trace!(id, "processing task");
606
607                // Prepare task for polling
608                let waker = waker(Arc::new(TaskWaker {
609                    id,
610                    tasks: Arc::downgrade(&executor.tasks),
611                }));
612                let mut cx = task::Context::from_waker(&waker);
613
614                // Poll the task
615                match &task.mode {
616                    Mode::Root => {
617                        // Poll the root task
618                        if let Poll::Ready(result) = root.as_mut().poll(&mut cx) {
619                            trace!(id, "root task is complete");
620                            output = Some(result);
621                            break;
622                        }
623                    }
624                    Mode::Work(future) => {
625                        // Get the future (if it still exists)
626                        let mut fut_opt = future.lock();
627                        let Some(fut) = fut_opt.as_mut() else {
628                            trace!(id, "skipping already complete task");
629
630                            // Remove the future
631                            executor.tasks.remove(id);
632                            continue;
633                        };
634
635                        // Poll the task
636                        if fut.as_mut().poll(&mut cx).is_ready() {
637                            trace!(id, "task is complete");
638
639                            // Remove the future
640                            executor.tasks.remove(id);
641                            *fut_opt = None;
642                            continue;
643                        }
644                    }
645                }
646
647                // Try again later if task is still pending
648                trace!(id, "task is still pending");
649            }
650
651            // If the root task has completed, exit as soon as possible
652            if let Some(output) = output {
653                break output;
654            }
655
656            // Advance time (skipping ahead if no tasks are ready yet)
657            let mut current = executor.advance_time();
658            current = executor.skip_idle_time(current);
659
660            // Wake sleepers and ensure we continue to make progress
661            executor.wake_ready_sleepers(current);
662            executor.assert_liveness();
663
664            // Record that we completed another iteration of the event loop.
665            executor.metrics.iterations.inc();
666        }));
667
668        // Clear remaining tasks from the executor.
669        //
670        // It is critical that we wait to drop the strong
671        // reference to executor until after we have dropped
672        // all tasks (as they may attempt to upgrade their weak
673        // reference to the executor during drop).
674        executor.sleeping.lock().clear(); // included in tasks
675        let tasks = executor.tasks.clear();
676        for task in tasks {
677            let Mode::Work(future) = &task.mode else {
678                continue;
679            };
680            *future.lock() = None;
681        }
682
683        // Drop the root task to release any Context references it may still hold.
684        // This is necessary when the loop exits early (e.g., timeout) while the
685        // root future is still Pending and holds captured variables with Context references.
686        drop(root);
687
688        // Assert the context doesn't escape the start() function (behavior
689        // is undefined in this case)
690        assert!(
691            Arc::weak_count(&executor) == 0,
692            "executor still has weak references"
693        );
694
695        // Handle the result — resume the original panic after cleanup if one was caught.
696        let output = match result {
697            Ok(output) => output,
698            Err(payload) => resume_unwind(payload),
699        };
700
701        // Extract the executor from the Arc
702        let executor = Arc::into_inner(executor).expect("executor still has strong references");
703
704        // Construct a checkpoint that can be used to restart the runtime
705        let checkpoint = Checkpoint {
706            cycle: executor.cycle,
707            deadline: executor.deadline,
708            auditor: executor.auditor,
709            rng: executor.rng,
710            time: executor.time,
711            storage,
712            dns: executor.dns,
713            catch_panics: executor.panicker.catch(),
714            network_buffer_pool_cfg,
715            storage_buffer_pool_cfg,
716        };
717
718        (output, checkpoint)
719    }
720}
721
722impl Default for Runner {
723    fn default() -> Self {
724        Self::new(Config::default())
725    }
726}
727
728impl crate::Runner for Runner {
729    type Context = Context;
730
731    fn start<F, Fut>(self, f: F) -> Fut::Output
732    where
733        F: FnOnce(Self::Context) -> Fut,
734        Fut: Future,
735    {
736        let (output, _) = self.start_and_recover(f);
737        output
738    }
739}
740
741/// The mode of a [Task].
742enum Mode {
743    Root,
744    Work(Mutex<Option<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>>),
745}
746
747/// A future being executed by the [Executor].
748struct Task {
749    id: u128,
750    label: Label,
751
752    mode: Mode,
753}
754
755/// A waker for a [Task].
756struct TaskWaker {
757    id: u128,
758
759    tasks: Weak<Tasks>,
760}
761
762impl ArcWake for TaskWaker {
763    fn wake_by_ref(arc_self: &Arc<Self>) {
764        // Upgrade the weak reference to re-enqueue this task.
765        // If upgrade fails, the task queue has been dropped and no action is required.
766        //
767        // This can happen if some data is passed into the runtime and it drops after the runtime exits.
768        if let Some(tasks) = arc_self.tasks.upgrade() {
769            tasks.queue(arc_self.id);
770        }
771    }
772}
773
774/// A collection of [Task]s that are being executed by the [Executor].
775struct Tasks {
776    /// The next task id.
777    counter: Mutex<u128>,
778    /// Tasks ready to be polled.
779    ready: Mutex<Vec<u128>>,
780    /// All running tasks.
781    running: Mutex<BTreeMap<u128, Arc<Task>>>,
782}
783
784impl Tasks {
785    /// Create a new task queue.
786    const fn new() -> Self {
787        Self {
788            counter: Mutex::new(0),
789            ready: Mutex::new(Vec::new()),
790            running: Mutex::new(BTreeMap::new()),
791        }
792    }
793
794    /// Increment the task counter and return the old value.
795    fn increment(&self) -> u128 {
796        let mut counter = self.counter.lock();
797        let old = *counter;
798        *counter = counter.checked_add(1).expect("task counter overflow");
799        old
800    }
801
802    /// Register the root task.
803    ///
804    /// If the root task has already been registered, this function will panic.
805    fn register_root(arc_self: &Arc<Self>) {
806        let id = arc_self.increment();
807        let task = Arc::new(Task {
808            id,
809            label: Label::root(),
810            mode: Mode::Root,
811        });
812        arc_self.register(id, task);
813    }
814
815    /// Register a non-root task to be executed.
816    fn register_work(
817        arc_self: &Arc<Self>,
818        label: Label,
819        future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
820    ) {
821        let id = arc_self.increment();
822        let task = Arc::new(Task {
823            id,
824            label,
825            mode: Mode::Work(Mutex::new(Some(future))),
826        });
827        arc_self.register(id, task);
828    }
829
830    /// Register a new task to be executed.
831    fn register(&self, id: u128, task: Arc<Task>) {
832        // Track as running until completion
833        self.running.lock().insert(id, task);
834
835        // Add to ready
836        self.queue(id);
837    }
838
839    /// Enqueue an already registered task to be executed.
840    fn queue(&self, id: u128) {
841        let mut ready = self.ready.lock();
842        ready.push(id);
843    }
844
845    /// Drain all ready tasks.
846    fn drain(&self) -> Vec<u128> {
847        let mut queue = self.ready.lock();
848        let len = queue.len();
849        replace(&mut *queue, Vec::with_capacity(len))
850    }
851
852    /// The number of ready tasks.
853    fn ready(&self) -> usize {
854        self.ready.lock().len()
855    }
856
857    /// Lookup a task.
858    ///
859    /// We must return cloned here because we cannot hold the running lock while polling a task (will
860    /// deadlock if [Self::register_work] is called).
861    fn get(&self, id: u128) -> Option<Arc<Task>> {
862        let running = self.running.lock();
863        running.get(&id).cloned()
864    }
865
866    /// Remove a task.
867    fn remove(&self, id: u128) {
868        self.running.lock().remove(&id);
869    }
870
871    /// Clear all tasks.
872    fn clear(&self) -> Vec<Arc<Task>> {
873        // Clear ready
874        self.ready.lock().clear();
875
876        // Clear running tasks
877        let running: BTreeMap<u128, Arc<Task>> = {
878            let mut running = self.running.lock();
879            take(&mut *running)
880        };
881        running.into_values().collect()
882    }
883}
884
885type Network = MeteredNetwork<AuditedNetwork<DeterministicNetwork>>;
886type Storage = MeteredStorage<AuditedStorage<FaultyStorage<MemStorage>>>;
887
888/// Implementation of [crate::Spawner], [crate::Clock],
889/// [crate::Network], and [crate::Storage] for the `deterministic`
890/// runtime.
891pub struct Context {
892    name: String,
893    attributes: Vec<(String, String)>,
894    scope: Option<Arc<ScopeGuard>>,
895    executor: Weak<Executor>,
896    network: Arc<Network>,
897    storage: Arc<Storage>,
898    network_buffer_pool: BufferPool,
899    storage_buffer_pool: BufferPool,
900    tree: Arc<Tree>,
901    execution: Execution,
902    traced: bool,
903}
904
905impl Clone for Context {
906    fn clone(&self) -> Self {
907        let (child, _) = Tree::child(&self.tree);
908        Self {
909            name: self.name.clone(),
910            attributes: self.attributes.clone(),
911            scope: self.scope.clone(),
912            executor: self.executor.clone(),
913            network: self.network.clone(),
914            storage: self.storage.clone(),
915            network_buffer_pool: self.network_buffer_pool.clone(),
916            storage_buffer_pool: self.storage_buffer_pool.clone(),
917
918            tree: child,
919            execution: Execution::default(),
920            traced: false,
921        }
922    }
923}
924
impl Context {
    /// Build the root context of a brand-new runtime from `cfg`.
    ///
    /// Returns the root [Context] (which only holds a [Weak] reference to the executor),
    /// the strong [Arc] to the [Executor], and the [Panicked] handle created alongside the
    /// runtime's [Panicker].
    ///
    /// # Panics
    ///
    /// Panics with "timeout overflowed" if `cfg.start_time + cfg.timeout` overflows.
    fn new(cfg: Config) -> (Self, Arc<Executor>, Panicked) {
        // Create a new registry
        let mut registry = Registry::new();
        // Runtime-internal metrics are registered under METRICS_PREFIX.
        let runtime_registry = registry.root_mut().sub_registry_with_prefix(METRICS_PREFIX);

        // Initialize runtime
        let metrics = Arc::new(Metrics::init(runtime_registry));
        let start_time = cfg.start_time;
        // Absolute simulated time at which the runtime times out (if a timeout is configured)
        let deadline = cfg
            .timeout
            .map(|timeout| start_time.checked_add(timeout).expect("timeout overflowed"));
        let auditor = Arc::new(Auditor::default());

        // Create shared RNG (used by both executor and storage)
        let rng = Arc::new(Mutex::new(cfg.rng));

        // Initialize buffer pools
        let network_buffer_pool = BufferPool::new(
            cfg.network_buffer_pool_cfg.clone(),
            runtime_registry.sub_registry_with_prefix("network_buffer_pool"),
        );
        let storage_buffer_pool = BufferPool::new(
            cfg.storage_buffer_pool_cfg.clone(),
            runtime_registry.sub_registry_with_prefix("storage_buffer_pool"),
        );

        // Share the storage fault config behind a lock so it can be changed at runtime
        // through `storage_fault_config`.
        let storage_fault_config = Arc::new(RwLock::new(cfg.storage_fault_cfg));
        let storage = MeteredStorage::new(
            AuditedStorage::new(
                FaultyStorage::new(
                    MemStorage::new(storage_buffer_pool.clone()),
                    rng.clone(),
                    storage_fault_config,
                ),
                auditor.clone(),
            ),
            runtime_registry,
        );

        // Create network
        let network = AuditedNetwork::new(DeterministicNetwork::default(), auditor.clone());
        let network = MeteredNetwork::new(network, runtime_registry);

        // Initialize panicker
        let (panicker, panicked) = Panicker::new(cfg.catch_panics);

        // The executor owns the registry, clock, task set, sleeper heap, shutdown state and
        // DNS table; contexts only reference it weakly.
        let executor = Arc::new(Executor {
            registry: Mutex::new(registry),
            registered_metrics: Mutex::new(HashSet::new()),
            cycle: cfg.cycle,
            deadline,
            metrics,
            auditor,
            rng,
            time: Mutex::new(start_time),
            tasks: Arc::new(Tasks::new()),
            sleeping: Mutex::new(BinaryHeap::new()),
            shutdown: Mutex::new(Stopper::default()),
            panicker,
            dns: Mutex::new(HashMap::new()),
        });

        (
            Self {
                name: String::new(),
                attributes: Vec::new(),
                scope: None,
                executor: Arc::downgrade(&executor),
                network: Arc::new(network),
                storage: Arc::new(storage),
                network_buffer_pool,
                storage_buffer_pool,
                tree: Tree::root(),
                execution: Execution::default(),
                traced: false,
            },
            executor,
            panicked,
        )
    }

    /// Recover the inner state (deadline, metrics, auditor, rng, synced storage, etc.) from the
    /// current runtime and use it to initialize a new instance of the runtime. A recovered runtime
    /// does not inherit the current runtime's pending tasks, unsynced storage, network connections, nor
    /// its shutdown signaler.
    ///
    /// This is useful for performing a deterministic simulation that spans multiple runtime instantiations,
    /// like simulating unclean shutdown (which involves repeatedly halting the runtime at unexpected intervals).
    ///
    /// It is only permitted to call this method after the runtime has finished (i.e. once `start` returns)
    /// and only permitted to do once (otherwise multiple recovered runtimes will share the same inner state).
    /// If either one of these conditions is violated, this method will panic.
    ///
    /// NOTE(review): the body below only consumes a [Checkpoint] and contains no such check;
    /// the panics described above are presumably raised where the checkpoint is produced — confirm.
    fn recover(checkpoint: Checkpoint) -> (Self, Arc<Executor>, Panicked) {
        // Rebuild metrics
        let mut registry = Registry::new();
        let runtime_registry = registry.root_mut().sub_registry_with_prefix(METRICS_PREFIX);
        let metrics = Arc::new(Metrics::init(runtime_registry));

        // Copy state
        let network =
            AuditedNetwork::new(DeterministicNetwork::default(), checkpoint.auditor.clone());
        let network = MeteredNetwork::new(network, runtime_registry);

        // Initialize buffer pools
        // NOTE(review): `checkpoint.storage` is reused as-is, so its in-memory storage was
        // built with the previous runtime's storage pool rather than the fresh
        // `storage_buffer_pool` below (presumably harmless, but confirm this is intended).
        let network_buffer_pool = BufferPool::new(
            checkpoint.network_buffer_pool_cfg.clone(),
            runtime_registry.sub_registry_with_prefix("network_buffer_pool"),
        );
        let storage_buffer_pool = BufferPool::new(
            checkpoint.storage_buffer_pool_cfg.clone(),
            runtime_registry.sub_registry_with_prefix("storage_buffer_pool"),
        );

        // Initialize panicker
        let (panicker, panicked) = Panicker::new(checkpoint.catch_panics);

        let executor = Arc::new(Executor {
            // Copied from the checkpoint
            cycle: checkpoint.cycle,
            deadline: checkpoint.deadline,
            auditor: checkpoint.auditor,
            rng: checkpoint.rng,
            time: checkpoint.time,
            dns: checkpoint.dns,

            // New state for the new runtime
            registry: Mutex::new(registry),
            registered_metrics: Mutex::new(HashSet::new()),
            metrics,
            tasks: Arc::new(Tasks::new()),
            sleeping: Mutex::new(BinaryHeap::new()),
            shutdown: Mutex::new(Stopper::default()),
            panicker,
        });
        (
            Self {
                name: String::new(),
                attributes: Vec::new(),
                scope: None,
                executor: Arc::downgrade(&executor),
                network: Arc::new(network),
                storage: checkpoint.storage,
                network_buffer_pool,
                storage_buffer_pool,
                tree: Tree::root(),
                execution: Execution::default(),
                traced: false,
            },
            executor,
            panicked,
        )
    }

    /// Upgrade Weak reference to [Executor].
    ///
    /// # Panics
    ///
    /// Panics with "executor already dropped" if the runtime has been torn down.
    fn executor(&self) -> Arc<Executor> {
        self.executor.upgrade().expect("executor already dropped")
    }

    /// Get a reference to [Metrics].
    fn metrics(&self) -> Arc<Metrics> {
        self.executor().metrics.clone()
    }

    /// Get a reference to the [Auditor].
    pub fn auditor(&self) -> Arc<Auditor> {
        self.executor().auditor.clone()
    }

    /// Compute a [Sha256] digest of all storage contents.
    ///
    /// Reaches through the metered, audited and fault-injection layers to the in-memory
    /// storage at the bottom of the stack.
    pub fn storage_audit(&self) -> Digest {
        self.storage.inner().inner().inner().audit()
    }

    /// Access the storage fault configuration.
    ///
    /// Changes to the returned [`FaultConfig`] take effect immediately for
    /// subsequent storage operations. This allows dynamically enabling or
    /// disabling fault injection during a test.
    pub fn storage_fault_config(&self) -> Arc<RwLock<FaultConfig>> {
        self.storage.inner().inner().config()
    }

    /// Register a DNS mapping for a hostname.
    ///
    /// If `addrs` is `None`, the mapping is removed.
    /// If `addrs` is `Some`, the mapping is added or updated.
    ///
    /// The call (host and addresses) is recorded with the auditor before the table changes.
    pub fn resolver_register(&self, host: impl Into<String>, addrs: Option<Vec<IpAddr>>) {
        // Update the auditor
        let executor = self.executor();
        let host = host.into();
        executor.auditor.event(b"resolver_register", |hasher| {
            hasher.update(host.as_bytes());
            hasher.update(addrs.encode());
        });

        // Update the DNS mapping
        let mut dns = executor.dns.lock();
        match addrs {
            Some(addrs) => {
                dns.insert(host, addrs);
            }
            None => {
                dns.remove(&host);
            }
        }
    }
}
1134
impl crate::Spawner for Context {
    /// Mark the next spawned task as running in [Execution::Dedicated] mode.
    fn dedicated(mut self) -> Self {
        self.execution = Execution::Dedicated;
        self
    }

    /// Mark the next spawned task as running in [Execution::Shared] mode.
    fn shared(mut self, blocking: bool) -> Self {
        self.execution = Execution::Shared(blocking);
        self
    }

    /// Spawn `f` as a new task supervised by this context's tree node.
    ///
    /// If the parent node has already been aborted, no task is scheduled and a closed
    /// [Handle] is returned instead.
    fn spawn<F, Fut, T>(mut self, f: F) -> Handle<T>
    where
        F: FnOnce(Self) -> Fut + Send + 'static,
        Fut: Future<Output = T> + Send + 'static,
        T: Send + 'static,
    {
        // Get metrics
        // (presumably reads this context's label/execution mode, so it must run before the
        // configuration is reset below)
        let (label, metric) = spawn_metrics!(self);

        // Track supervision before resetting configuration
        let parent = Arc::clone(&self.tree);
        let traced = self.traced;
        self.execution = Execution::default();
        self.traced = false;
        let (child, aborted) = Tree::child(&parent);
        if aborted {
            return Handle::closed(metric);
        }
        self.tree = child;

        // Spawn the task (we don't care about Model)
        // The executor is upgraded before `self` is moved into `f`.
        let executor = self.executor();
        let future = if traced {
            // `parent: None` makes this a root span rather than a child of the current span
            let span = info_span!(parent: None, "task", name = %label.name());
            for (key, value) in &self.attributes {
                span.set_attribute(key.clone(), value.clone());
            }
            Either::Left(f(self).instrument(span))
        } else {
            Either::Right(f(self))
        };
        let (f, handle) = Handle::init(
            future,
            metric,
            executor.panicker.clone(),
            Arc::clone(&parent),
        );
        Tasks::register_work(&executor.tasks, label, Box::pin(f));

        // Register the task on the parent
        if let Some(aborter) = handle.aborter() {
            parent.register(aborter);
        }

        handle
    }

    /// Record a stop event with `value`, trigger shutdown, and wait for it to complete.
    ///
    /// # Errors
    ///
    /// Returns [Error::Timeout] if `timeout` (simulated time) elapses first, and
    /// [Error::Closed] if the stop future resolves with an error.
    async fn stop(self, value: i32, timeout: Option<Duration>) -> Result<(), Error> {
        let executor = self.executor();
        executor.auditor.event(b"stop", |hasher| {
            hasher.update(value.to_be_bytes());
        });
        let stop_resolved = {
            let mut shutdown = executor.shutdown.lock();
            shutdown.stop(value)
        };

        // Wait for all tasks to complete or the timeout to fire
        let timeout_future = timeout.map_or_else(
            || futures::future::Either::Right(futures::future::pending()),
            |duration| futures::future::Either::Left(self.sleep(duration)),
        );
        select! {
            result = stop_resolved => {
                result.map_err(|_| Error::Closed)?;
                Ok(())
            },
            _ = timeout_future => Err(Error::Timeout),
        }
    }

    /// Return the shutdown [Signal] (recorded with the auditor).
    fn stopped(&self) -> Signal {
        let executor = self.executor();
        executor.auditor.event(b"stopped", |_| {});
        // Bound to a local so the shutdown lock guard is released before `executor` drops.
        let stopped = executor.shutdown.lock().stopped();
        stopped
    }
}
1224
1225impl crate::ThreadPooler for Context {
1226    fn create_thread_pool(
1227        &self,
1228        concurrency: NonZeroUsize,
1229    ) -> Result<ThreadPool, ThreadPoolBuildError> {
1230        let mut builder = ThreadPoolBuilder::new().num_threads(concurrency.get());
1231
1232        if rayon::current_thread_index().is_none() {
1233            builder = builder.use_current_thread()
1234        }
1235
1236        builder
1237            .spawn_handler(move |thread| {
1238                self.with_label("rayon_thread")
1239                    .dedicated()
1240                    .spawn(move |_| async move { thread.run() });
1241                Ok(())
1242            })
1243            .build()
1244            .map(Arc::new)
1245    }
1246}
1247
1248impl crate::Metrics for Context {
1249    fn label(&self) -> String {
1250        self.name.clone()
1251    }
1252
1253    fn with_label(&self, label: &str) -> Self {
1254        // Validate label format (must match [a-zA-Z][a-zA-Z0-9_]*)
1255        validate_label(label);
1256
1257        // Construct the full label name
1258        let name = {
1259            let prefix = self.name.clone();
1260            if prefix.is_empty() {
1261                label.to_string()
1262            } else {
1263                format!("{prefix}_{label}")
1264            }
1265        };
1266        assert!(
1267            !name.starts_with(METRICS_PREFIX),
1268            "using runtime label is not allowed"
1269        );
1270        Self {
1271            name,
1272            ..self.clone()
1273        }
1274    }
1275
1276    fn with_attribute(&self, key: &str, value: impl std::fmt::Display) -> Self {
1277        // Validate label format (must match [a-zA-Z][a-zA-Z0-9_]*)
1278        validate_label(key);
1279
1280        // Add the attribute to the list of attributes
1281        let mut attributes = self.attributes.clone();
1282        assert!(
1283            add_attribute(&mut attributes, key, value),
1284            "duplicate attribute key: {key}"
1285        );
1286        Self {
1287            attributes,
1288            ..self.clone()
1289        }
1290    }
1291
1292    fn with_scope(&self) -> Self {
1293        let executor = self.executor();
1294        executor.auditor.event(b"with_scope", |_| {});
1295
1296        // If already scoped, inherit the existing scope
1297        if self.scope.is_some() {
1298            return self.clone();
1299        }
1300
1301        // RAII guard removes the scoped registry when all clones drop.
1302        let weak = self.executor.clone();
1303        let scope_id = executor.registry.lock().create_scope();
1304        let guard = Arc::new(ScopeGuard::new(scope_id, move |id| {
1305            if let Some(exec) = weak.upgrade() {
1306                exec.registry.lock().remove_scope(id);
1307            }
1308        }));
1309        Self {
1310            scope: Some(guard),
1311            ..self.clone()
1312        }
1313    }
1314
1315    fn with_span(&self) -> Self {
1316        Self {
1317            traced: true,
1318            ..self.clone()
1319        }
1320    }
1321
1322    fn register<N: Into<String>, H: Into<String>>(&self, name: N, help: H, metric: impl Metric) {
1323        // Prepare args
1324        let name = name.into();
1325        let help = help.into();
1326
1327        // Name metric
1328        let executor = self.executor();
1329        executor.auditor.event(b"register", |hasher| {
1330            hasher.update(name.as_bytes());
1331            hasher.update(help.as_bytes());
1332            for (k, v) in &self.attributes {
1333                hasher.update(k.as_bytes());
1334                hasher.update(v.as_bytes());
1335            }
1336        });
1337        let prefixed_name = {
1338            let prefix = &self.name;
1339            if prefix.is_empty() {
1340                name
1341            } else {
1342                format!("{}_{}", *prefix, name)
1343            }
1344        };
1345
1346        // Check for duplicate registration (O(1) lookup)
1347        let metric_key = (prefixed_name.clone(), self.attributes.clone());
1348        let is_new = executor.registered_metrics.lock().insert(metric_key);
1349        assert!(
1350            is_new,
1351            "duplicate metric: {} with attributes {:?}",
1352            prefixed_name, self.attributes
1353        );
1354
1355        // Route to the appropriate registry (root or scoped)
1356        let mut registry = executor.registry.lock();
1357        let scoped = registry.get_scope(self.scope.as_ref().map(|s| s.scope_id()));
1358        let sub_registry = self
1359            .attributes
1360            .iter()
1361            .fold(scoped, |reg, (k, v): &(String, String)| {
1362                reg.sub_registry_with_label((Cow::Owned(k.clone()), Cow::Owned(v.clone())))
1363            });
1364        sub_registry.register(prefixed_name, help, metric);
1365    }
1366
1367    fn encode(&self) -> String {
1368        let executor = self.executor();
1369        executor.auditor.event(b"encode", |_| {});
1370        let encoded = executor.registry.lock().encode();
1371        encoded
1372    }
1373}
1374
/// Future returned by `Clock::sleep` / `Clock::sleep_until` on [Context]: resolves once
/// the simulated clock reaches `time`.
struct Sleeper {
    /// Non-owning handle used to read the clock and push alarms.
    executor: Weak<Executor>,
    /// Simulated time at which this sleeper becomes ready.
    time: SystemTime,
    /// Whether an [Alarm] has already been pushed onto the executor's sleeping heap.
    registered: bool,
}
1380
1381impl Sleeper {
1382    /// Upgrade Weak reference to [Executor].
1383    fn executor(&self) -> Arc<Executor> {
1384        self.executor.upgrade().expect("executor already dropped")
1385    }
1386}
1387
/// A pending wake-up: `waker` is to be woken once the simulated clock reaches `time`.
struct Alarm {
    time: SystemTime,
    waker: Waker,
}

/// Two alarms are equal when they fire at the same instant; the waker is not compared.
impl PartialEq for Alarm {
    fn eq(&self, other: &Self) -> bool {
        self.time == other.time
    }
}

impl Eq for Alarm {}

impl PartialOrd for Alarm {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(Ord::cmp(self, other))
    }
}

/// Alarms order by *descending* time, so a `BinaryHeap` (a max-heap) pops the earliest
/// alarm first.
impl Ord for Alarm {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.time.cmp(&other.time).reverse()
    }
}
1413
1414impl Future for Sleeper {
1415    type Output = ();
1416
1417    fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
1418        let executor = self.executor();
1419        {
1420            let current_time = *executor.time.lock();
1421            if current_time >= self.time {
1422                return Poll::Ready(());
1423            }
1424        }
1425        if !self.registered {
1426            self.registered = true;
1427            executor.sleeping.lock().push(Alarm {
1428                time: self.time,
1429                waker: cx.waker().clone(),
1430            });
1431        }
1432        Poll::Pending
1433    }
1434}
1435
1436impl Clock for Context {
1437    fn current(&self) -> SystemTime {
1438        *self.executor().time.lock()
1439    }
1440
1441    fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static {
1442        let deadline = self
1443            .current()
1444            .checked_add(duration)
1445            .expect("overflow when setting wake time");
1446        self.sleep_until(deadline)
1447    }
1448
1449    fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static {
1450        Sleeper {
1451            executor: self.executor.clone(),
1452
1453            time: deadline,
1454            registered: false,
1455        }
1456    }
1457}
1458
/// A future that resolves no earlier than a given target (simulated) time.
///
/// The wrapped future is polled once up front (with a noop waker) and any ready value is
/// cached until the target time. If the wrapped future is still not ready once the target
/// time is reached, polling blocks the current thread until it completes.
#[cfg(feature = "external")]
#[pin_project]
struct Waiter<F: Future> {
    /// Non-owning handle used to read the clock and push the alarm.
    executor: Weak<Executor>,
    /// Earliest simulated time at which the output may be released.
    target: SystemTime,
    /// The wrapped future (structurally pinned).
    #[pin]
    future: F,
    /// Output captured by the initial noop-waker poll, if the future was already ready.
    ready: Option<F::Output>,
    /// Whether the initial noop-waker poll has been performed.
    started: bool,
    /// Whether an [Alarm] for `target` has been pushed onto the sleeping heap.
    registered: bool,
}
1473
#[cfg(feature = "external")]
impl<F> Future for Waiter<F>
where
    F: Future + Send,
{
    type Output = F::Output;

    /// Poll the wrapped future, but never resolve before the simulated clock reaches `target`.
    ///
    /// Once `target` has passed, a still-pending inner future is driven to completion by
    /// blocking this thread on a [Blocker]-backed waker.
    fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
        let mut this = self.project();

        // Poll once with a noop waker so the future can register interest or start work
        // without being able to wake this task before the sampled delay expires. Any ready
        // value is cached and only released after the clock reaches `self.target`.
        if !*this.started {
            *this.started = true;
            let waker = noop_waker();
            let mut cx_noop = task::Context::from_waker(&waker);
            if let Poll::Ready(value) = this.future.as_mut().poll(&mut cx_noop) {
                *this.ready = Some(value);
            }
        }

        // Only allow the task to progress once the sampled delay has elapsed.
        let executor = this.executor.upgrade().expect("executor already dropped");
        let current_time = *executor.time.lock();
        if current_time < *this.target {
            // Register exactly once with the deterministic sleeper queue so the executor
            // wakes us once the clock reaches the scheduled target time.
            if !*this.registered {
                *this.registered = true;
                executor.sleeping.lock().push(Alarm {
                    time: *this.target,
                    waker: cx.waker().clone(),
                });
            }
            return Poll::Pending;
        }

        // If the underlying future completed during the noop pre-poll, surface the cached value.
        if let Some(value) = this.ready.take() {
            return Poll::Ready(value);
        }

        // Block the current thread until the future reschedules itself, keeping polling
        // deterministic with respect to executor time.
        // (Each iteration creates a fresh waker backed by the same blocker; `blocker.wait()`
        // presumably parks until that waker is woken.)
        let blocker = Blocker::new();
        loop {
            let waker = waker(blocker.clone());
            let mut cx_block = task::Context::from_waker(&waker);
            match this.future.as_mut().poll(&mut cx_block) {
                Poll::Ready(value) => {
                    break Poll::Ready(value);
                }
                Poll::Pending => blocker.wait(),
            }
        }
    }
}
1532
#[cfg(feature = "external")]
impl Pacer for Context {
    /// Wrap `future` so it cannot resolve before `latency` of simulated time has passed,
    /// measured from the moment `pace` is called.
    ///
    /// # Panics
    ///
    /// Panics with "overflow when setting wake time" if the target time overflows.
    fn pace<'a, F, T>(&'a self, latency: Duration, future: F) -> impl Future<Output = T> + Send + 'a
    where
        F: Future<Output = T> + Send + 'a,
        T: Send + 'a,
    {
        let now = self.current();
        let target = now
            .checked_add(latency)
            .expect("overflow when setting wake time");

        Waiter {
            target,
            future,
            executor: Weak::clone(&self.executor),
            ready: None,
            started: false,
            registered: false,
        }
    }
}
1558
1559impl GClock for Context {
1560    type Instant = SystemTime;
1561
1562    fn now(&self) -> Self::Instant {
1563        self.current()
1564    }
1565}
1566
/// Marker implementation with no methods. Time comes from the `GClock` impl for
/// [Context], i.e. the runtime's simulated clock.
impl ReasonablyRealtime for Context {}
1568
1569impl crate::Network for Context {
1570    type Listener = ListenerOf<Network>;
1571
1572    async fn bind(&self, socket: SocketAddr) -> Result<Self::Listener, Error> {
1573        self.network.bind(socket).await
1574    }
1575
1576    async fn dial(
1577        &self,
1578        socket: SocketAddr,
1579    ) -> Result<(crate::SinkOf<Self>, crate::StreamOf<Self>), Error> {
1580        self.network.dial(socket).await
1581    }
1582}
1583
1584impl crate::Resolver for Context {
1585    async fn resolve(&self, host: &str) -> Result<Vec<IpAddr>, Error> {
1586        // Get the record
1587        let executor = self.executor();
1588        let dns = executor.dns.lock();
1589        let result = dns.get(host).cloned();
1590        drop(dns);
1591
1592        // Update the auditor
1593        executor.auditor.event(b"resolve", |hasher| {
1594            hasher.update(host.as_bytes());
1595            hasher.update(result.encode());
1596        });
1597        result.ok_or_else(|| Error::ResolveFailed(host.to_string()))
1598    }
1599}
1600
1601impl RngCore for Context {
1602    fn next_u32(&mut self) -> u32 {
1603        let executor = self.executor();
1604        executor.auditor.event(b"rand", |hasher| {
1605            hasher.update(b"next_u32");
1606        });
1607        let result = executor.rng.lock().next_u32();
1608        result
1609    }
1610
1611    fn next_u64(&mut self) -> u64 {
1612        let executor = self.executor();
1613        executor.auditor.event(b"rand", |hasher| {
1614            hasher.update(b"next_u64");
1615        });
1616        let result = executor.rng.lock().next_u64();
1617        result
1618    }
1619
1620    fn fill_bytes(&mut self, dest: &mut [u8]) {
1621        let executor = self.executor();
1622        executor.auditor.event(b"rand", |hasher| {
1623            hasher.update(b"fill_bytes");
1624        });
1625        executor.rng.lock().fill_bytes(dest);
1626    }
1627
1628    fn try_fill_bytes(&mut self, dest: &mut [u8]) -> Result<(), rand::Error> {
1629        let executor = self.executor();
1630        executor.auditor.event(b"rand", |hasher| {
1631            hasher.update(b"try_fill_bytes");
1632        });
1633        let result = executor.rng.lock().try_fill_bytes(dest);
1634        result
1635    }
1636}
1637
/// Marker implementation so [Context] can be used where a `CryptoRng` is required.
///
/// NOTE(review): the bytes come from the runtime's seeded RNG and are reproducible from
/// the seed by design. This is only appropriate for deterministic simulation and tests.
impl CryptoRng for Context {}
1639
1640impl crate::Storage for Context {
1641    type Blob = <Storage as crate::Storage>::Blob;
1642
1643    async fn open_versioned(
1644        &self,
1645        partition: &str,
1646        name: &[u8],
1647        versions: std::ops::RangeInclusive<u16>,
1648    ) -> Result<(Self::Blob, u64, u16), Error> {
1649        self.storage.open_versioned(partition, name, versions).await
1650    }
1651
1652    async fn remove(&self, partition: &str, name: Option<&[u8]>) -> Result<(), Error> {
1653        self.storage.remove(partition, name).await
1654    }
1655
1656    async fn scan(&self, partition: &str) -> Result<Vec<Vec<u8>>, Error> {
1657        self.storage.scan(partition).await
1658    }
1659}
1660
1661impl crate::BufferPooler for Context {
1662    fn network_buffer_pool(&self) -> &crate::BufferPool {
1663        &self.network_buffer_pool
1664    }
1665
1666    fn storage_buffer_pool(&self) -> &crate::BufferPool {
1667        &self.storage_buffer_pool
1668    }
1669}
1670
1671#[cfg(test)]
1672mod tests {
1673    use super::*;
1674    #[cfg(feature = "external")]
1675    use crate::FutureExt;
1676    #[cfg(feature = "external")]
1677    use crate::Spawner;
1678    use crate::{deterministic, reschedule, Blob, Metrics, Resolver, Runner as _, Storage};
1679    use commonware_macros::test_traced;
1680    #[cfg(feature = "external")]
1681    use commonware_utils::channel::mpsc;
1682    use commonware_utils::channel::oneshot;
1683    #[cfg(not(feature = "external"))]
1684    use futures::future::pending;
1685    #[cfg(not(feature = "external"))]
1686    use futures::stream::StreamExt as _;
1687    #[cfg(feature = "external")]
1688    use futures::StreamExt;
1689    use futures::{stream::FuturesUnordered, task::noop_waker};
1690
1691    async fn task(i: usize) -> usize {
1692        for _ in 0..5 {
1693            reschedule().await;
1694        }
1695        i
1696    }
1697
1698    fn run_tasks(tasks: usize, runner: deterministic::Runner) -> (String, Vec<usize>) {
1699        runner.start(|context| async move {
1700            let mut handles = FuturesUnordered::new();
1701            for i in 0..=tasks - 1 {
1702                handles.push(context.clone().spawn(move |_| task(i)));
1703            }
1704
1705            let mut outputs = Vec::new();
1706            while let Some(result) = handles.next().await {
1707                outputs.push(result.unwrap());
1708            }
1709            assert_eq!(outputs.len(), tasks);
1710            (context.auditor().state(), outputs)
1711        })
1712    }
1713
1714    fn run_with_seed(seed: u64) -> (String, Vec<usize>) {
1715        let executor = deterministic::Runner::seeded(seed);
1716        run_tasks(5, executor)
1717    }
1718
1719    #[test]
1720    fn test_same_seed_same_order() {
1721        // Generate initial outputs
1722        let mut outputs = Vec::new();
1723        for seed in 0..1000 {
1724            let output = run_with_seed(seed);
1725            outputs.push(output);
1726        }
1727
1728        // Ensure they match
1729        for seed in 0..1000 {
1730            let output = run_with_seed(seed);
1731            assert_eq!(output, outputs[seed as usize]);
1732        }
1733    }
1734
1735    #[test_traced("TRACE")]
1736    fn test_different_seeds_different_order() {
1737        let output1 = run_with_seed(12345);
1738        let output2 = run_with_seed(54321);
1739        assert_ne!(output1, output2);
1740    }
1741
1742    #[test]
1743    fn test_alarm_min_heap() {
1744        // Populate heap
1745        let now = SystemTime::now();
1746        let alarms = vec![
1747            Alarm {
1748                time: now + Duration::new(10, 0),
1749                waker: noop_waker(),
1750            },
1751            Alarm {
1752                time: now + Duration::new(5, 0),
1753                waker: noop_waker(),
1754            },
1755            Alarm {
1756                time: now + Duration::new(15, 0),
1757                waker: noop_waker(),
1758            },
1759            Alarm {
1760                time: now + Duration::new(5, 0),
1761                waker: noop_waker(),
1762            },
1763        ];
1764        let mut heap = BinaryHeap::new();
1765        for alarm in alarms {
1766            heap.push(alarm);
1767        }
1768
1769        // Verify min-heap
1770        let mut sorted_times = Vec::new();
1771        while let Some(alarm) = heap.pop() {
1772            sorted_times.push(alarm.time);
1773        }
1774        assert_eq!(
1775            sorted_times,
1776            vec![
1777                now + Duration::new(5, 0),
1778                now + Duration::new(5, 0),
1779                now + Duration::new(10, 0),
1780                now + Duration::new(15, 0),
1781            ]
1782        );
1783    }
1784
1785    #[test]
1786    #[should_panic(expected = "runtime timeout")]
1787    fn test_timeout() {
1788        let executor = deterministic::Runner::timed(Duration::from_secs(10));
1789        executor.start(|context| async move {
1790            loop {
1791                context.sleep(Duration::from_secs(1)).await;
1792            }
1793        });
1794    }
1795
1796    #[test]
1797    #[should_panic(expected = "cycle duration must be non-zero when timeout is set")]
1798    fn test_bad_timeout() {
1799        let cfg = Config {
1800            timeout: Some(Duration::default()),
1801            cycle: Duration::default(),
1802            ..Config::default()
1803        };
1804        deterministic::Runner::new(cfg);
1805    }
1806
1807    #[test]
1808    #[should_panic(
1809        expected = "cycle duration must be greater than or equal to system time precision"
1810    )]
1811    fn test_bad_cycle() {
1812        let cfg = Config {
1813            cycle: SYSTEM_TIME_PRECISION - Duration::from_nanos(1),
1814            ..Config::default()
1815        };
1816        deterministic::Runner::new(cfg);
1817    }
1818
1819    #[test]
1820    fn test_recover_synced_storage_persists() {
1821        // Initialize the first runtime
1822        let executor1 = deterministic::Runner::default();
1823        let partition = "test_partition";
1824        let name = b"test_blob";
1825        let data = b"Hello, world!";
1826
1827        // Run some tasks, sync storage, and recover the runtime
1828        let (state, checkpoint) = executor1.start_and_recover(|context| async move {
1829            let (blob, _) = context.open(partition, name).await.unwrap();
1830            blob.write_at(0, data).await.unwrap();
1831            blob.sync().await.unwrap();
1832            context.auditor().state()
1833        });
1834
1835        // Verify auditor state is the same
1836        assert_eq!(state, checkpoint.auditor.state());
1837
1838        // Check that synced storage persists after recovery
1839        let executor = Runner::from(checkpoint);
1840        executor.start(|context| async move {
1841            let (blob, len) = context.open(partition, name).await.unwrap();
1842            assert_eq!(len, data.len() as u64);
1843            let read = blob.read_at(0, data.len()).await.unwrap();
1844            assert_eq!(read.coalesce(), data);
1845        });
1846    }
1847
1848    #[test]
1849    #[should_panic(expected = "goodbye")]
1850    fn test_recover_panic_handling() {
1851        // Initialize the first runtime
1852        let executor1 = deterministic::Runner::default();
1853        let (_, checkpoint) = executor1.start_and_recover(|_| async move {
1854            reschedule().await;
1855        });
1856
1857        // Ensure that panic setting is preserved
1858        let executor = Runner::from(checkpoint);
1859        executor.start(|_| async move {
1860            panic!("goodbye");
1861        });
1862    }
1863
1864    #[test]
1865    fn test_recover_unsynced_storage_does_not_persist() {
1866        // Initialize the first runtime
1867        let executor = deterministic::Runner::default();
1868        let partition = "test_partition";
1869        let name = b"test_blob";
1870        let data = b"Hello, world!";
1871
1872        // Run some tasks without syncing storage
1873        let (_, checkpoint) = executor.start_and_recover(|context| async move {
1874            let context = context.clone();
1875            let (blob, _) = context.open(partition, name).await.unwrap();
1876            blob.write_at(0, data).await.unwrap();
1877        });
1878
1879        // Recover the runtime
1880        let executor = Runner::from(checkpoint);
1881
1882        // Check that unsynced storage does not persist after recovery
1883        executor.start(|context| async move {
1884            let (_, len) = context.open(partition, name).await.unwrap();
1885            assert_eq!(len, 0);
1886        });
1887    }
1888
1889    #[test]
1890    fn test_recover_dns_mappings_persist() {
1891        // Initialize the first runtime
1892        let executor = deterministic::Runner::default();
1893        let host = "example.com";
1894        let addrs = vec![
1895            IpAddr::V4(std::net::Ipv4Addr::new(192, 168, 1, 1)),
1896            IpAddr::V4(std::net::Ipv4Addr::new(192, 168, 1, 2)),
1897        ];
1898
1899        // Register DNS mapping and recover the runtime
1900        let (state, checkpoint) = executor.start_and_recover({
1901            let addrs = addrs.clone();
1902            |context| async move {
1903                context.resolver_register(host, Some(addrs));
1904                context.auditor().state()
1905            }
1906        });
1907
1908        // Verify auditor state is the same
1909        assert_eq!(state, checkpoint.auditor.state());
1910
1911        // Check that DNS mappings persist after recovery
1912        let executor = Runner::from(checkpoint);
1913        executor.start(move |context| async move {
1914            let resolved = context.resolve(host).await.unwrap();
1915            assert_eq!(resolved, addrs);
1916        });
1917    }
1918
1919    #[test]
1920    fn test_recover_time_persists() {
1921        // Initialize the first runtime
1922        let executor = deterministic::Runner::default();
1923        let duration_to_sleep = Duration::from_secs(10);
1924
1925        // Sleep for some time and recover the runtime
1926        let (time_before_recovery, checkpoint) = executor.start_and_recover(|context| async move {
1927            context.sleep(duration_to_sleep).await;
1928            context.current()
1929        });
1930
1931        // Check that the time advanced correctly before recovery
1932        assert_eq!(
1933            time_before_recovery.duration_since(UNIX_EPOCH).unwrap(),
1934            duration_to_sleep
1935        );
1936
1937        // Check that the time persists after recovery
1938        let executor2 = Runner::from(checkpoint);
1939        executor2.start(move |context| async move {
1940            assert_eq!(context.current(), time_before_recovery);
1941
1942            // Advance time further
1943            context.sleep(duration_to_sleep).await;
1944            assert_eq!(
1945                context.current().duration_since(UNIX_EPOCH).unwrap(),
1946                duration_to_sleep * 2
1947            );
1948        });
1949    }
1950
1951    #[test]
1952    #[should_panic(expected = "executor still has weak references")]
1953    fn test_context_return() {
1954        // Initialize runtime
1955        let executor = deterministic::Runner::default();
1956
1957        // Start runtime
1958        let context = executor.start(|context| async move {
1959            // Attempt to recover before the runtime has finished
1960            context
1961        });
1962
1963        // Should never get this far
1964        drop(context);
1965    }
1966
1967    #[test]
1968    fn test_default_time_zero() {
1969        // Initialize runtime
1970        let executor = deterministic::Runner::default();
1971
1972        executor.start(|context| async move {
1973            // Check that the time is zero
1974            assert_eq!(
1975                context.current().duration_since(UNIX_EPOCH).unwrap(),
1976                Duration::ZERO
1977            );
1978        });
1979    }
1980
1981    #[test]
1982    fn test_start_time() {
1983        // Initialize runtime with default config
1984        let executor_default = deterministic::Runner::default();
1985        executor_default.start(|context| async move {
1986            assert_eq!(context.current(), UNIX_EPOCH);
1987        });
1988
1989        // Initialize runtime with custom start time
1990        let start_time = UNIX_EPOCH + Duration::from_secs(100);
1991        let cfg = Config::default().with_start_time(start_time);
1992        let executor = deterministic::Runner::new(cfg);
1993
1994        executor.start(move |context| async move {
1995            // Check that the time matches the custom start time
1996            assert_eq!(context.current(), start_time);
1997        });
1998    }
1999
2000    #[test]
2001    #[should_panic(expected = "start time must be greater than or equal to unix epoch")]
2002    fn test_bad_start_time() {
2003        let cfg = Config::default().with_start_time(UNIX_EPOCH - Duration::from_secs(1));
2004        deterministic::Runner::new(cfg);
2005    }
2006
2007    #[cfg(not(feature = "external"))]
2008    #[test]
2009    #[should_panic(expected = "runtime stalled")]
2010    fn test_stall() {
2011        // Initialize runtime
2012        let executor = deterministic::Runner::default();
2013
2014        // Start runtime
2015        executor.start(|_| async move {
2016            pending::<()>().await;
2017        });
2018    }
2019
2020    #[cfg(not(feature = "external"))]
2021    #[test]
2022    #[should_panic(expected = "runtime stalled")]
2023    fn test_external_simulated() {
2024        // Initialize runtime
2025        let executor = deterministic::Runner::default();
2026
2027        // Create a thread that waits for 1 second
2028        let (tx, rx) = oneshot::channel();
2029        std::thread::spawn(move || {
2030            std::thread::sleep(Duration::from_secs(1));
2031            tx.send(()).unwrap();
2032        });
2033
2034        // Start runtime
2035        executor.start(|_| async move {
2036            rx.await.unwrap();
2037        });
2038    }
2039
2040    #[cfg(feature = "external")]
2041    #[test]
2042    fn test_external_realtime() {
2043        // Initialize runtime
2044        let executor = deterministic::Runner::default();
2045
2046        // Create a thread that waits for 1 second
2047        let (tx, rx) = oneshot::channel();
2048        std::thread::spawn(move || {
2049            std::thread::sleep(Duration::from_secs(1));
2050            tx.send(()).unwrap();
2051        });
2052
2053        // Start runtime
2054        executor.start(|_| async move {
2055            rx.await.unwrap();
2056        });
2057    }
2058
    /// Checks `pace` against two external (OS-thread) signals with different
    /// real-time arrival: one arriving after ~1s of wall-clock time and one arriving
    /// immediately but paced to a `first_wait` latency. Each task checks its own
    /// real vs. simulated elapsed time, and the first task must report before the second.
    #[cfg(feature = "external")]
    #[test]
    fn test_external_realtime_variable() {
        // Initialize runtime
        let executor = deterministic::Runner::default();

        // Start runtime
        executor.start(|context| async move {
            // Initialize test: record both wall-clock and simulated start times so
            // each task can compare real vs. simulated elapsed time.
            let start_real = SystemTime::now();
            let start_sim = context.current();
            let (first_tx, first_rx) = oneshot::channel();
            let (second_tx, second_rx) = oneshot::channel();
            // Each task sends its id (1 or 2) here after its assertions pass.
            let (results_tx, mut results_rx) = mpsc::channel(2);

            // Create a thread that waits for 1 second (real time) before signalling
            let first_wait = Duration::from_secs(1);
            std::thread::spawn(move || {
                std::thread::sleep(first_wait);
                first_tx.send(()).unwrap();
            });

            // Create a thread that signals (effectively) immediately
            std::thread::spawn(move || {
                std::thread::sleep(Duration::ZERO);
                second_tx.send(()).unwrap();
            });

            // Wait for a delay sampled before the external send occurs. Paced with a
            // zero latency, this task must see more than `first_wait` of real time
            // pass but less than `first_wait` of simulated time.
            let first = context.clone().spawn({
                let results_tx = results_tx.clone();
                move |context| async move {
                    first_rx.pace(&context, Duration::ZERO).await.unwrap();
                    let elapsed_real = SystemTime::now().duration_since(start_real).unwrap();
                    assert!(elapsed_real > first_wait);
                    let elapsed_sim = context.current().duration_since(start_sim).unwrap();
                    assert!(elapsed_sim < first_wait);
                    results_tx.send(1).await.unwrap();
                }
            });

            // Wait for a delay sampled after the external send occurs. Paced with a
            // `first_wait` latency, this task must see at least `first_wait` pass in
            // both real and simulated time.
            let second = context.clone().spawn(move |context| async move {
                second_rx.pace(&context, first_wait).await.unwrap();
                let elapsed_real = SystemTime::now().duration_since(start_real).unwrap();
                assert!(elapsed_real >= first_wait);
                let elapsed_sim = context.current().duration_since(start_sim).unwrap();
                assert!(elapsed_sim >= first_wait);
                results_tx.send(2).await.unwrap();
            });

            // Wait for both tasks to complete
            second.await.unwrap();
            first.await.unwrap();

            // Ensure order is correct: the first task must have reported before the
            // second, even though the second signal was sent earlier in real time.
            let mut results = Vec::new();
            for _ in 0..2 {
                results.push(results_rx.recv().await.unwrap());
            }
            assert_eq!(results, vec![1, 2]);
        });
    }
2122
2123    #[cfg(not(feature = "external"))]
2124    #[test]
2125    fn test_simulated_skip() {
2126        // Initialize runtime
2127        let executor = deterministic::Runner::default();
2128
2129        // Start runtime
2130        executor.start(|context| async move {
2131            context.sleep(Duration::from_secs(1)).await;
2132
2133            // Check if we skipped
2134            let metrics = context.encode();
2135            let iterations = metrics
2136                .lines()
2137                .find_map(|line| {
2138                    line.strip_prefix("runtime_iterations_total ")
2139                        .and_then(|value| value.trim().parse::<u64>().ok())
2140                })
2141                .expect("missing runtime_iterations_total metric");
2142            assert!(iterations < 10);
2143        });
2144    }
2145
2146    #[cfg(feature = "external")]
2147    #[test]
2148    fn test_realtime_no_skip() {
2149        // Initialize runtime
2150        let executor = deterministic::Runner::default();
2151
2152        // Start runtime
2153        executor.start(|context| async move {
2154            context.sleep(Duration::from_secs(1)).await;
2155
2156            // Check if we skipped
2157            let metrics = context.encode();
2158            let iterations = metrics
2159                .lines()
2160                .find_map(|line| {
2161                    line.strip_prefix("runtime_iterations_total ")
2162                        .and_then(|value| value.trim().parse::<u64>().ok())
2163                })
2164                .expect("missing runtime_iterations_total metric");
2165            assert!(iterations > 500);
2166        });
2167    }
2168
2169    #[test]
2170    #[should_panic(expected = "label must start with [a-zA-Z]")]
2171    fn test_metrics_label_empty() {
2172        let executor = deterministic::Runner::default();
2173        executor.start(|context| async move {
2174            context.with_label("");
2175        });
2176    }
2177
2178    #[test]
2179    #[should_panic(expected = "label must start with [a-zA-Z]")]
2180    fn test_metrics_label_invalid_first_char() {
2181        let executor = deterministic::Runner::default();
2182        executor.start(|context| async move {
2183            context.with_label("1invalid");
2184        });
2185    }
2186
2187    #[test]
2188    #[should_panic(expected = "label must only contain [a-zA-Z0-9_]")]
2189    fn test_metrics_label_invalid_char() {
2190        let executor = deterministic::Runner::default();
2191        executor.start(|context| async move {
2192            context.with_label("invalid-label");
2193        });
2194    }
2195
2196    #[test]
2197    #[should_panic(expected = "using runtime label is not allowed")]
2198    fn test_metrics_label_reserved_prefix() {
2199        let executor = deterministic::Runner::default();
2200        executor.start(|context| async move {
2201            context.with_label(METRICS_PREFIX);
2202        });
2203    }
2204
2205    #[test]
2206    #[should_panic(expected = "duplicate attribute key: epoch")]
2207    fn test_metrics_duplicate_attribute_panics() {
2208        let executor = deterministic::Runner::default();
2209        executor.start(|context| async move {
2210            let _ = context
2211                .with_label("test")
2212                .with_attribute("epoch", "old")
2213                .with_attribute("epoch", "new");
2214        });
2215    }
2216
2217    #[test]
2218    fn test_storage_fault_injection_and_recovery() {
2219        // Phase 1: Run with 100% sync failure rate
2220        let cfg = deterministic::Config::default().with_storage_fault_config(FaultConfig {
2221            sync_rate: Some(1.0),
2222            ..Default::default()
2223        });
2224
2225        let (result, checkpoint) =
2226            deterministic::Runner::new(cfg).start_and_recover(|ctx| async move {
2227                let (blob, _) = ctx.open("test_fault", b"blob").await.unwrap();
2228                blob.write_at(0, b"data".to_vec()).await.unwrap();
2229                blob.sync().await // This should fail due to fault injection
2230            });
2231
2232        // Verify sync failed
2233        assert!(result.is_err());
2234
2235        // Phase 2: Recover and disable faults explicitly
2236        deterministic::Runner::from(checkpoint).start(|ctx| async move {
2237            // Explicitly disable faults for recovery verification
2238            *ctx.storage_fault_config().write() = FaultConfig::default();
2239
2240            // Data was not synced, so blob should be empty (unsynced writes are lost)
2241            let (blob, len) = ctx.open("test_fault", b"blob").await.unwrap();
2242            assert_eq!(len, 0, "unsynced data should be lost after recovery");
2243
2244            // Now we can write and sync successfully
2245            blob.write_at(0, b"recovered".to_vec()).await.unwrap();
2246            blob.sync()
2247                .await
2248                .expect("sync should succeed with faults disabled");
2249
2250            // Verify data persisted
2251            let read_buf = blob.read_at(0, 9).await.unwrap();
2252            assert_eq!(read_buf.coalesce(), b"recovered");
2253        });
2254    }
2255
2256    #[test]
2257    fn test_storage_fault_dynamic_config() {
2258        let executor = deterministic::Runner::default();
2259        executor.start(|ctx| async move {
2260            let (blob, _) = ctx.open("test_dynamic", b"blob").await.unwrap();
2261
2262            // Initially no faults - sync should succeed
2263            blob.write_at(0, b"initial".to_vec()).await.unwrap();
2264            blob.sync().await.expect("initial sync should succeed");
2265
2266            // Enable sync faults dynamically
2267            let storage_fault_cfg = ctx.storage_fault_config();
2268            storage_fault_cfg.write().sync_rate = Some(1.0);
2269
2270            // Now sync should fail
2271            blob.write_at(0, b"updated".to_vec()).await.unwrap();
2272            let result = blob.sync().await;
2273            assert!(result.is_err(), "sync should fail with faults enabled");
2274
2275            // Disable faults
2276            storage_fault_cfg.write().sync_rate = Some(0.0);
2277
2278            // Sync should succeed again
2279            blob.sync()
2280                .await
2281                .expect("sync should succeed with faults disabled");
2282        });
2283    }
2284
2285    #[test]
2286    fn test_storage_fault_determinism() {
2287        // Run the same sequence twice with the same seed
2288        fn run_with_seed(seed: u64) -> Vec<bool> {
2289            let cfg = deterministic::Config::default()
2290                .with_seed(seed)
2291                .with_storage_fault_config(FaultConfig {
2292                    open_rate: Some(0.5),
2293                    ..Default::default()
2294                });
2295
2296            let runner = deterministic::Runner::new(cfg);
2297            runner.start(|ctx| async move {
2298                let mut results = Vec::new();
2299                for i in 0..20 {
2300                    let name = format!("blob{i}");
2301                    let result = ctx.open("test_determinism", name.as_bytes()).await;
2302                    results.push(result.is_ok());
2303                }
2304                results
2305            })
2306        }
2307
2308        let results1 = run_with_seed(12345);
2309        let results2 = run_with_seed(12345);
2310        assert_eq!(
2311            results1, results2,
2312            "same seed should produce same failure pattern"
2313        );
2314
2315        let results3 = run_with_seed(99999);
2316        assert_ne!(
2317            results1, results3,
2318            "different seeds should produce different patterns"
2319        );
2320    }
2321
2322    #[test]
2323    fn test_storage_fault_determinism_multi_task() {
2324        // Run the same multi-task sequence twice with the same seed.
2325        // This tests that task shuffling + fault decisions interleave deterministically.
2326        fn run_with_seed(seed: u64) -> Vec<u32> {
2327            let cfg = deterministic::Config::default()
2328                .with_seed(seed)
2329                .with_storage_fault_config(FaultConfig {
2330                    open_rate: Some(0.5),
2331                    write_rate: Some(0.3),
2332                    sync_rate: Some(0.2),
2333                    ..Default::default()
2334                });
2335
2336            let runner = deterministic::Runner::new(cfg);
2337            runner.start(|ctx| async move {
2338                // Spawn multiple tasks that do storage operations
2339                let mut handles = Vec::new();
2340                for i in 0..5 {
2341                    let ctx = ctx.clone();
2342                    handles.push(ctx.spawn(move |ctx| async move {
2343                        let mut successes = 0u32;
2344                        for j in 0..4 {
2345                            let name = format!("task{i}_blob{j}");
2346                            if let Ok((blob, _)) = ctx.open("partition", name.as_bytes()).await {
2347                                successes += 1;
2348                                if blob.write_at(0, b"data".to_vec()).await.is_ok() {
2349                                    successes += 1;
2350                                }
2351                                if blob.sync().await.is_ok() {
2352                                    successes += 1;
2353                                }
2354                            }
2355                        }
2356                        successes
2357                    }));
2358                }
2359
2360                // Collect results from all tasks
2361                let mut results = Vec::new();
2362                for handle in handles {
2363                    results.push(handle.await.unwrap());
2364                }
2365                results
2366            })
2367        }
2368
2369        let results1 = run_with_seed(42);
2370        let results2 = run_with_seed(42);
2371        assert_eq!(
2372            results1, results2,
2373            "same seed should produce same multi-task pattern"
2374        );
2375
2376        let results3 = run_with_seed(99999);
2377        assert_ne!(
2378            results1, results3,
2379            "different seeds should produce different patterns"
2380        );
2381    }
2382}