// commonware_runtime/deterministic.rs

//! A deterministic runtime that randomly selects tasks to run based on a seed
//!
//! # Panics
//!
//! Unless configured otherwise, any task panic will lead to a runtime panic.
//!
//! # External Processes
//!
//! When testing an application that interacts with some external process, progress can appear
//! to stall (because no pending task can make progress) and/or futures may resolve with variable
//! latency (which in turn triggers non-deterministic execution).
//!
//! To support such applications, the runtime can be built with the `external` feature to both
//! sleep for each [Config::cycle] (opting to wait if all futures are pending) and constrain
//! the resolution latency of any future (with `pace()`).
//!
//! **Applications that do not interact with external processes (or are able to mock them) should never
//! need to enable this feature. It is commonly used when testing consensus with external execution environments
//! that use their own runtime (but are deterministic over some set of inputs).**
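//!
//! For example, a rough sketch of pacing (not compiled here; it requires the `external`
//! feature, and `external_call()` is a placeholder for a future driven by an external process):
//!
//! ```rust,ignore
//! use commonware_runtime::Pacer;
//!
//! // Constrain `external_call()` to resolve no sooner than 5ms of runtime time from now.
//! let value = context.pace(Duration::from_millis(5), external_call()).await;
//! ```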
//!
//! # Metrics
//!
//! This runtime enforces that metrics are unique and well-formed (see the sketch below):
//! - Labels must start with `[a-zA-Z]` and contain only `[a-zA-Z0-9_]`
//! - Metric names must be unique (panics on duplicate registration)
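//!
//! For example, a small sketch of registering and encoding a counter on a labeled context
//! (the `engine`/`requests` names are illustrative):
//!
//! ```rust
//! use commonware_runtime::{deterministic, Metrics, Runner};
//! use prometheus_client::metrics::counter::Counter;
//!
//! let executor = deterministic::Runner::default();
//! executor.start(|context| async move {
//!     let requests: Counter = Counter::default();
//!     context
//!         .with_label("engine")
//!         .register("requests", "Total number of requests", requests.clone());
//!     requests.inc();
//!     assert!(context.encode().contains("engine_requests"));
//! });
//! ```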
//!
//! # Example
//!
//! ```rust
//! use commonware_runtime::{Spawner, Runner, deterministic, Metrics};
//!
//! let executor = deterministic::Runner::default();
//! executor.start(|context| async move {
//!     println!("Parent started");
//!     let result = context.with_label("child").spawn(|_| async move {
//!         println!("Child started");
//!         "hello"
//!     });
//!     println!("Child result: {:?}", result.await);
//!     println!("Parent exited");
//!     println!("Auditor state: {}", context.auditor().state());
//! });
//! ```

pub use crate::storage::faulty::Config as FaultConfig;
use crate::{
    network::{
        audited::Network as AuditedNetwork, deterministic::Network as DeterministicNetwork,
        metered::Network as MeteredNetwork,
    },
    storage::{
        audited::Storage as AuditedStorage, faulty::Storage as FaultyStorage,
        memory::Storage as MemStorage, metered::Storage as MeteredStorage,
    },
    telemetry::metrics::task::Label,
    utils::{
        add_attribute,
        signal::{Signal, Stopper},
        supervision::Tree,
        MetricEncoder, Panicker,
    },
    validate_label, BufferPool, BufferPoolConfig, Clock, Error, Execution, Handle, ListenerOf,
    Metrics as _, Panicked, Spawner as _, METRICS_PREFIX,
};
#[cfg(feature = "external")]
use crate::{Blocker, Pacer};
use commonware_codec::Encode;
use commonware_macros::select;
use commonware_parallel::ThreadPool;
#[cfg(miri)]
use commonware_utils::NZUsize;
use commonware_utils::{hex, time::SYSTEM_TIME_PRECISION, SystemTimeExt};
#[cfg(feature = "external")]
use futures::task::noop_waker;
use futures::{
    future::BoxFuture,
    task::{waker, ArcWake},
    Future, FutureExt,
};
use governor::clock::{Clock as GClock, ReasonablyRealtime};
#[cfg(feature = "external")]
use pin_project::pin_project;
use prometheus_client::{
    encoding::text::encode,
    metrics::{counter::Counter, family::Family, gauge::Gauge},
    registry::{Metric, Registry},
};
use rand::{prelude::SliceRandom, rngs::StdRng, CryptoRng, RngCore, SeedableRng};
use rand_core::CryptoRngCore;
use rayon::{ThreadPoolBuildError, ThreadPoolBuilder};
use sha2::{Digest as _, Sha256};
use std::{
    borrow::Cow,
    collections::{BTreeMap, BinaryHeap, HashMap, HashSet},
    mem::{replace, take},
    net::{IpAddr, SocketAddr},
    num::NonZeroUsize,
    panic::{catch_unwind, resume_unwind, AssertUnwindSafe},
    pin::Pin,
    sync::{Arc, Mutex, RwLock, Weak},
    task::{self, Poll, Waker},
    time::{Duration, SystemTime, UNIX_EPOCH},
};
use tracing::{info_span, trace, Instrument};
use tracing_opentelemetry::OpenTelemetrySpanExt;

#[derive(Debug)]
struct Metrics {
    iterations: Counter,
    tasks_spawned: Family<Label, Counter>,
    tasks_running: Family<Label, Gauge>,
    task_polls: Family<Label, Counter>,

    network_bandwidth: Counter,
}

impl Metrics {
    pub fn init(registry: &mut Registry) -> Self {
        let metrics = Self {
            iterations: Counter::default(),
            task_polls: Family::default(),
            tasks_spawned: Family::default(),
            tasks_running: Family::default(),
            network_bandwidth: Counter::default(),
        };
        registry.register(
            "iterations",
            "Total number of iterations",
            metrics.iterations.clone(),
        );
        registry.register(
            "tasks_spawned",
            "Total number of tasks spawned",
            metrics.tasks_spawned.clone(),
        );
        registry.register(
            "tasks_running",
            "Number of tasks currently running",
            metrics.tasks_running.clone(),
        );
        registry.register(
            "task_polls",
            "Total number of task polls",
            metrics.task_polls.clone(),
        );
        registry.register(
            "bandwidth",
            "Total amount of data sent over network",
            metrics.network_bandwidth.clone(),
        );
        metrics
    }
}

/// A SHA-256 digest.
type Digest = [u8; 32];

/// Track the state of the runtime for determinism auditing.
pub struct Auditor {
    digest: Mutex<Digest>,
}

impl Default for Auditor {
    fn default() -> Self {
        Self {
            digest: Digest::default().into(),
        }
    }
}

impl Auditor {
    /// Record that an event happened.
    /// This auditor's hash will be updated with the event's `label` and
    /// whatever other data is passed in the `payload` closure.
    pub(crate) fn event<F>(&self, label: &'static [u8], payload: F)
    where
        F: FnOnce(&mut Sha256),
    {
        let mut digest = self.digest.lock().unwrap();

        let mut hasher = Sha256::new();
        hasher.update(digest.as_ref());
        hasher.update(label);
        payload(&mut hasher);

        *digest = hasher.finalize().into();
    }

    /// Generate a representation of the current state of the runtime.
    ///
    /// This can be used to ensure that logic running on top
    /// of the runtime is interacting deterministically.
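    ///
    /// A minimal sketch: runs with the same seed should produce the same state.
    ///
    /// ```rust
    /// use commonware_runtime::{deterministic, Runner};
    ///
    /// let run = |seed: u64| {
    ///     deterministic::Runner::seeded(seed)
    ///         .start(|context| async move { context.auditor().state() })
    /// };
    /// assert_eq!(run(42), run(42));
    /// ```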
    pub fn state(&self) -> String {
        let hash = self.digest.lock().unwrap();
        hex(hash.as_ref())
    }
}

/// A dynamic RNG that can safely be sent between threads.
pub type BoxDynRng = Box<dyn CryptoRngCore + Send + 'static>;

/// Configuration for the `deterministic` runtime.
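///
/// A minimal construction sketch (the specific values are illustrative):
///
/// ```rust
/// use commonware_runtime::{deterministic, Runner};
/// use std::time::Duration;
///
/// let cfg = deterministic::Config::new()
///     .with_seed(7)
///     .with_cycle(Duration::from_millis(1))
///     .with_timeout(Some(Duration::from_secs(30)))
///     .with_catch_panics(false);
/// deterministic::Runner::new(cfg).start(|_| async move {});
/// ```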
pub struct Config {
    /// Random number generator.
    rng: BoxDynRng,

    /// The cycle duration determines how much time is advanced after each iteration of the event
    /// loop. This is useful to prevent starvation if some task never yields.
    cycle: Duration,

    /// If the runtime is still executing at this point (i.e. a test hasn't stopped), panic.
    timeout: Option<Duration>,

    /// Whether spawned tasks should catch panics instead of propagating them.
    catch_panics: bool,

    /// Configuration for deterministic storage fault injection.
    /// Defaults to no faults being injected.
    storage_faults: FaultConfig,
}

impl Config {
    /// Returns a new [Config] with default values.
    pub fn new() -> Self {
        Self {
            rng: Box::new(StdRng::seed_from_u64(42)),
            cycle: Duration::from_millis(1),
            timeout: None,
            catch_panics: false,
            storage_faults: FaultConfig::default(),
        }
    }

    // Setters
    /// See [Config]
    pub fn with_seed(self, seed: u64) -> Self {
        self.with_rng(Box::new(StdRng::seed_from_u64(seed)))
    }

    /// Provide the config with a dynamic RNG directly.
    ///
    /// This can be useful for, e.g. fuzzing, where beyond just having randomness,
    /// you might want to control specific bytes of the RNG. By taking in a dynamic
    /// RNG object, any behavior is possible.
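    ///
    /// A brief sketch, with a seeded `StdRng` standing in for (e.g.) a fuzzer-provided RNG:
    ///
    /// ```rust
    /// use commonware_runtime::{deterministic, Runner};
    /// use rand::{rngs::StdRng, SeedableRng};
    ///
    /// let cfg = deterministic::Config::new()
    ///     .with_rng(Box::new(StdRng::from_seed([7u8; 32])));
    /// deterministic::Runner::new(cfg).start(|_| async move {});
    /// ```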
    pub fn with_rng(mut self, rng: BoxDynRng) -> Self {
        self.rng = rng;
        self
    }

    /// See [Config]
    pub const fn with_cycle(mut self, cycle: Duration) -> Self {
        self.cycle = cycle;
        self
    }
    /// See [Config]
    pub const fn with_timeout(mut self, timeout: Option<Duration>) -> Self {
        self.timeout = timeout;
        self
    }
    /// See [Config]
    pub const fn with_catch_panics(mut self, catch_panics: bool) -> Self {
        self.catch_panics = catch_panics;
        self
    }

    /// Configure storage fault injection.
    ///
    /// When set, the runtime will inject deterministic storage errors based on
    /// the provided configuration. Faults are drawn from the shared RNG, ensuring
    /// reproducible failure patterns for a given seed.
    pub const fn with_storage_faults(mut self, faults: FaultConfig) -> Self {
        self.storage_faults = faults;
        self
    }

    // Getters
    /// See [Config]
    pub const fn cycle(&self) -> Duration {
        self.cycle
    }
    /// See [Config]
    pub const fn timeout(&self) -> Option<Duration> {
        self.timeout
    }
    /// See [Config]
    pub const fn catch_panics(&self) -> bool {
        self.catch_panics
    }

    /// Assert that the configuration is valid.
    pub fn assert(&self) {
        assert!(
            self.cycle != Duration::default() || self.timeout.is_none(),
            "cycle duration must be non-zero when timeout is set",
        );
        assert!(
            self.cycle >= SYSTEM_TIME_PRECISION,
            "cycle duration must be greater than or equal to system time precision"
        );
    }
}

impl Default for Config {
    fn default() -> Self {
        Self::new()
    }
}

/// Key for detecting duplicate metric registrations: (metric_name, attributes).
type MetricKey = (String, Vec<(String, String)>);

/// Deterministic runtime that randomly selects tasks to run based on a seed.
pub struct Executor {
    registry: Mutex<Registry>,
    registered_metrics: Mutex<HashSet<MetricKey>>,
    cycle: Duration,
    deadline: Option<SystemTime>,
    metrics: Arc<Metrics>,
    auditor: Arc<Auditor>,
    rng: Arc<Mutex<BoxDynRng>>,
    time: Mutex<SystemTime>,
    tasks: Arc<Tasks>,
    sleeping: Mutex<BinaryHeap<Alarm>>,
    shutdown: Mutex<Stopper>,
    panicker: Panicker,
    dns: Mutex<HashMap<String, Vec<IpAddr>>>,
}

impl Executor {
    /// Advance simulated time by [Config::cycle].
    ///
    /// When built with the `external` feature, sleep for [Config::cycle] to let
    /// external processes make progress.
    fn advance_time(&self) -> SystemTime {
        #[cfg(feature = "external")]
        std::thread::sleep(self.cycle);

        let mut time = self.time.lock().unwrap();
        *time = time
            .checked_add(self.cycle)
            .expect("executor time overflowed");
        let now = *time;
        trace!(now = now.epoch_millis(), "time advanced");
        now
    }

    /// When idle, jump directly to the next actionable time.
    ///
    /// When built with the `external` feature, never skip ahead (to ensure we poll all pending tasks
    /// every [Config::cycle]).
    fn skip_idle_time(&self, current: SystemTime) -> SystemTime {
        if cfg!(feature = "external") || self.tasks.ready() != 0 {
            return current;
        }

        let mut skip_until = None;
        {
            let sleeping = self.sleeping.lock().unwrap();
            if let Some(next) = sleeping.peek() {
                if next.time > current {
                    skip_until = Some(next.time);
                }
            }
        }

        skip_until.map_or(current, |deadline| {
            let mut time = self.time.lock().unwrap();
            *time = deadline;
            let now = *time;
            trace!(now = now.epoch_millis(), "time skipped");
            now
        })
    }

    /// Wake any sleepers whose deadlines have elapsed.
    fn wake_ready_sleepers(&self, current: SystemTime) {
        let mut sleeping = self.sleeping.lock().unwrap();
        while let Some(next) = sleeping.peek() {
            if next.time <= current {
                let sleeper = sleeping.pop().unwrap();
                sleeper.waker.wake();
            } else {
                break;
            }
        }
    }

    /// Ensure the runtime is making progress.
    ///
    /// When built with the `external` feature, always poll pending tasks after the passage of time.
    fn assert_liveness(&self) {
        if cfg!(feature = "external") || self.tasks.ready() != 0 {
            return;
        }

        panic!("runtime stalled");
    }
}

/// An artifact that can be used to recover the state of the runtime.
///
/// This is useful when mocking unclean shutdown (while retaining deterministic behavior).
pub struct Checkpoint {
    cycle: Duration,
    deadline: Option<SystemTime>,
    auditor: Arc<Auditor>,
    rng: Arc<Mutex<BoxDynRng>>,
    time: Mutex<SystemTime>,
    storage: Arc<Storage>,
    dns: Mutex<HashMap<String, Vec<IpAddr>>>,
    catch_panics: bool,
}

impl Checkpoint {
    /// Get a reference to the [Auditor].
    pub fn auditor(&self) -> Arc<Auditor> {
        self.auditor.clone()
    }
}

#[allow(clippy::large_enum_variant)]
enum State {
    Config(Config),
    Checkpoint(Checkpoint),
}

/// Implementation of [crate::Runner] for the `deterministic` runtime.
pub struct Runner {
    state: State,
}

impl From<Config> for Runner {
    fn from(cfg: Config) -> Self {
        Self::new(cfg)
    }
}

impl From<Checkpoint> for Runner {
    fn from(checkpoint: Checkpoint) -> Self {
        Self {
            state: State::Checkpoint(checkpoint),
        }
    }
}

impl Runner {
    /// Initialize a new `deterministic` runtime with the given [Config].
    pub fn new(cfg: Config) -> Self {
        // Ensure config is valid
        cfg.assert();
        Self {
            state: State::Config(cfg),
        }
    }

    /// Initialize a new `deterministic` runtime with the default configuration
    /// and the provided seed.
    pub fn seeded(seed: u64) -> Self {
        Self::new(Config::default().with_seed(seed))
    }

    /// Initialize a new `deterministic` runtime with the default configuration
    /// but panic if execution exceeds the given timeout.
    pub fn timed(timeout: Duration) -> Self {
        let cfg = Config {
            timeout: Some(timeout),
            ..Config::default()
        };
        Self::new(cfg)
    }

    /// Like [crate::Runner::start], but also returns a [Checkpoint] that can be used
    /// to recover the state of the runtime in a subsequent run.
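    ///
    /// A brief sketch of resuming state across runs:
    ///
    /// ```rust
    /// use commonware_runtime::{deterministic, Runner};
    ///
    /// let (out, checkpoint) =
    ///     deterministic::Runner::default().start_and_recover(|_| async move { 1u32 });
    /// assert_eq!(out, 1);
    ///
    /// // The recovered runtime reuses the first run's rng, time, and synced storage.
    /// let out = deterministic::Runner::from(checkpoint).start(|_| async move { 2u32 });
    /// assert_eq!(out, 2);
    /// ```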
    pub fn start_and_recover<F, Fut>(self, f: F) -> (Fut::Output, Checkpoint)
    where
        F: FnOnce(Context) -> Fut,
        Fut: Future,
    {
        // Setup context and return strong reference to executor
        let (context, executor, panicked) = match self.state {
            State::Config(config) => Context::new(config),
            State::Checkpoint(checkpoint) => Context::recover(checkpoint),
        };

        // Pin root task to the heap
        let storage = context.storage.clone();
        let mut root = Box::pin(panicked.interrupt(f(context)));

        // Register the root task
        Tasks::register_root(&executor.tasks);

        // Process tasks until root task completes or progress stalls.
        // Wrap the loop in catch_unwind to ensure task cleanup runs even if the loop or a task panics.
        let result = catch_unwind(AssertUnwindSafe(|| loop {
            // Ensure we have not exceeded our deadline
            {
                let current = executor.time.lock().unwrap();
                if let Some(deadline) = executor.deadline {
                    if *current >= deadline {
                        // Drop the lock before panicking to avoid mutex poisoning.
                        drop(current);
                        panic!("runtime timeout");
                    }
                }
            }

            // Drain all ready tasks
            let mut queue = executor.tasks.drain();

            // Shuffle tasks (if more than one)
            if queue.len() > 1 {
                let mut rng = executor.rng.lock().unwrap();
                queue.shuffle(&mut *rng);
            }

            // Run all snapshotted tasks
            //
            // This approach is more efficient than randomly selecting a task one at a time
            // because it ensures we don't poll the same pending task multiple times in a row
            // (without first processing a different task it may need to make progress).
            trace!(
                iter = executor.metrics.iterations.get(),
                tasks = queue.len(),
                "starting loop"
            );
            let mut output = None;
            for id in queue {
                // Lookup the task (it may have completed already)
                let Some(task) = executor.tasks.get(id) else {
                    trace!(id, "skipping missing task");
                    continue;
                };

                // Record task for auditing
                executor.auditor.event(b"process_task", |hasher| {
                    hasher.update(task.id.to_be_bytes());
                    hasher.update(task.label.name().as_bytes());
                });
                executor.metrics.task_polls.get_or_create(&task.label).inc();
                trace!(id, "processing task");

                // Prepare task for polling
                let waker = waker(Arc::new(TaskWaker {
                    id,
                    tasks: Arc::downgrade(&executor.tasks),
                }));
                let mut cx = task::Context::from_waker(&waker);

                // Poll the task
                match &task.mode {
                    Mode::Root => {
                        // Poll the root task
                        if let Poll::Ready(result) = root.as_mut().poll(&mut cx) {
                            trace!(id, "root task is complete");
                            output = Some(result);
                            break;
                        }
                    }
                    Mode::Work(future) => {
                        // Get the future (if it still exists)
                        let mut fut_opt = future.lock().unwrap();
                        let Some(fut) = fut_opt.as_mut() else {
                            trace!(id, "skipping already complete task");

                            // Remove the future
                            executor.tasks.remove(id);
                            continue;
                        };

                        // Poll the task
                        if fut.as_mut().poll(&mut cx).is_ready() {
                            trace!(id, "task is complete");

                            // Remove the future
                            executor.tasks.remove(id);
                            *fut_opt = None;
                            continue;
                        }
                    }
                }

                // Try again later if task is still pending
                trace!(id, "task is still pending");
            }

            // If the root task has completed, exit as soon as possible
            if let Some(output) = output {
                break output;
            }

            // Advance time (skipping ahead if no tasks are ready yet)
            let mut current = executor.advance_time();
            current = executor.skip_idle_time(current);

            // Wake sleepers and ensure we continue to make progress
            executor.wake_ready_sleepers(current);
            executor.assert_liveness();

            // Record that we completed another iteration of the event loop.
            executor.metrics.iterations.inc();
        }));

        // Clear remaining tasks from the executor.
        //
        // It is critical that we wait to drop the strong
        // reference to executor until after we have dropped
        // all tasks (as they may attempt to upgrade their weak
        // reference to the executor during drop).
        executor.sleeping.lock().unwrap().clear(); // included in tasks
        let tasks = executor.tasks.clear();
        for task in tasks {
            let Mode::Work(future) = &task.mode else {
                continue;
            };
            *future.lock().unwrap() = None;
        }

        // Drop the root task to release any Context references it may still hold.
        // This is necessary when the loop exits early (e.g., timeout) while the
        // root future is still Pending and holds captured variables with Context references.
        drop(root);

        // Assert the context doesn't escape the start() function (behavior
        // is undefined in this case)
        assert!(
            Arc::weak_count(&executor) == 0,
            "executor still has weak references"
        );

        // Handle the result: resume the original panic after cleanup if one was caught.
        let output = match result {
            Ok(output) => output,
            Err(payload) => resume_unwind(payload),
        };

        // Extract the executor from the Arc
        let executor = Arc::into_inner(executor).expect("executor still has strong references");

        // Construct a checkpoint that can be used to restart the runtime
        let checkpoint = Checkpoint {
            cycle: executor.cycle,
            deadline: executor.deadline,
            auditor: executor.auditor,
            rng: executor.rng,
            time: executor.time,
            storage,
            dns: executor.dns,
            catch_panics: executor.panicker.catch(),
        };

        (output, checkpoint)
    }
}

impl Default for Runner {
    fn default() -> Self {
        Self::new(Config::default())
    }
}

impl crate::Runner for Runner {
    type Context = Context;

    fn start<F, Fut>(self, f: F) -> Fut::Output
    where
        F: FnOnce(Self::Context) -> Fut,
        Fut: Future,
    {
        let (output, _) = self.start_and_recover(f);
        output
    }
}

/// The mode of a [Task].
enum Mode {
    Root,
    Work(Mutex<Option<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>>),
}

/// A future being executed by the [Executor].
struct Task {
    id: u128,
    label: Label,

    mode: Mode,
}

/// A waker for a [Task].
struct TaskWaker {
    id: u128,

    tasks: Weak<Tasks>,
}

impl ArcWake for TaskWaker {
    fn wake_by_ref(arc_self: &Arc<Self>) {
        // Upgrade the weak reference to re-enqueue this task.
        // If upgrade fails, the task queue has been dropped and no action is required.
        //
        // This can happen if some data is passed into the runtime and it drops after the runtime exits.
        if let Some(tasks) = arc_self.tasks.upgrade() {
            tasks.queue(arc_self.id);
        }
    }
}

/// A collection of [Task]s that are being executed by the [Executor].
struct Tasks {
    /// The next task id.
    counter: Mutex<u128>,
    /// Tasks ready to be polled.
    ready: Mutex<Vec<u128>>,
    /// All running tasks.
    running: Mutex<BTreeMap<u128, Arc<Task>>>,
}

impl Tasks {
    /// Create a new task queue.
    const fn new() -> Self {
        Self {
            counter: Mutex::new(0),
            ready: Mutex::new(Vec::new()),
            running: Mutex::new(BTreeMap::new()),
        }
    }

    /// Increment the task counter and return the old value.
    fn increment(&self) -> u128 {
        let mut counter = self.counter.lock().unwrap();
        let old = *counter;
        *counter = counter.checked_add(1).expect("task counter overflow");
        old
    }

    /// Register the root task.
    ///
    /// If the root task has already been registered, this function will panic.
    fn register_root(arc_self: &Arc<Self>) {
        let id = arc_self.increment();
        let task = Arc::new(Task {
            id,
            label: Label::root(),
            mode: Mode::Root,
        });
        arc_self.register(id, task);
    }

    /// Register a non-root task to be executed.
    fn register_work(
        arc_self: &Arc<Self>,
        label: Label,
        future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
    ) {
        let id = arc_self.increment();
        let task = Arc::new(Task {
            id,
            label,
            mode: Mode::Work(Mutex::new(Some(future))),
        });
        arc_self.register(id, task);
    }

    /// Register a new task to be executed.
    fn register(&self, id: u128, task: Arc<Task>) {
        // Track as running until completion
        self.running.lock().unwrap().insert(id, task);

        // Add to ready
        self.queue(id);
    }

    /// Enqueue an already registered task to be executed.
    fn queue(&self, id: u128) {
        let mut ready = self.ready.lock().unwrap();
        ready.push(id);
    }

    /// Drain all ready tasks.
    fn drain(&self) -> Vec<u128> {
        let mut queue = self.ready.lock().unwrap();
        let len = queue.len();
        replace(&mut *queue, Vec::with_capacity(len))
    }

    /// The number of ready tasks.
    fn ready(&self) -> usize {
        self.ready.lock().unwrap().len()
    }

    /// Lookup a task.
    ///
    /// We must return a clone here because we cannot hold the running lock while polling a task (it
    /// would deadlock if [Self::register_work] is called).
    fn get(&self, id: u128) -> Option<Arc<Task>> {
        let running = self.running.lock().unwrap();
        running.get(&id).cloned()
    }

    /// Remove a task.
    fn remove(&self, id: u128) {
        self.running.lock().unwrap().remove(&id);
    }

    /// Clear all tasks.
    fn clear(&self) -> Vec<Arc<Task>> {
        // Clear ready
        self.ready.lock().unwrap().clear();

        // Clear running tasks
        let running: BTreeMap<u128, Arc<Task>> = {
            let mut running = self.running.lock().unwrap();
            take(&mut *running)
        };
        running.into_values().collect()
    }
}

type Network = MeteredNetwork<AuditedNetwork<DeterministicNetwork>>;
type Storage = MeteredStorage<AuditedStorage<FaultyStorage<MemStorage>>>;

/// Implementation of [crate::Spawner], [crate::Clock],
/// [crate::Network], and [crate::Storage] for the `deterministic`
/// runtime.
pub struct Context {
    name: String,
    attributes: Vec<(String, String)>,
    executor: Weak<Executor>,
    network: Arc<Network>,
    storage: Arc<Storage>,
    network_buffer_pool: BufferPool,
    storage_buffer_pool: BufferPool,
    tree: Arc<Tree>,
    execution: Execution,
    instrumented: bool,
}

impl Clone for Context {
    fn clone(&self) -> Self {
        let (child, _) = Tree::child(&self.tree);
        Self {
            name: self.name.clone(),
            attributes: self.attributes.clone(),
            executor: self.executor.clone(),
            network: self.network.clone(),
            storage: self.storage.clone(),
            network_buffer_pool: self.network_buffer_pool.clone(),
            storage_buffer_pool: self.storage_buffer_pool.clone(),

            tree: child,
            execution: Execution::default(),
            instrumented: false,
        }
    }
}

impl Context {
    fn new(cfg: Config) -> (Self, Arc<Executor>, Panicked) {
        // Create a new registry
        let mut registry = Registry::default();
        let runtime_registry = registry.sub_registry_with_prefix(METRICS_PREFIX);

        // Initialize runtime
        let metrics = Arc::new(Metrics::init(runtime_registry));
        let start_time = UNIX_EPOCH;
        let deadline = cfg
            .timeout
            .map(|timeout| start_time.checked_add(timeout).expect("timeout overflowed"));
        let auditor = Arc::new(Auditor::default());

        // Create shared RNG (used by both executor and storage)
        let rng = Arc::new(Mutex::new(cfg.rng));

        // Create storage fault config (defaults to no injected faults)
        let storage_fault_config = Arc::new(RwLock::new(cfg.storage_faults));
        let storage = MeteredStorage::new(
            AuditedStorage::new(
                FaultyStorage::new(MemStorage::default(), rng.clone(), storage_fault_config),
                auditor.clone(),
            ),
            runtime_registry,
        );

        // Create network
        let network = AuditedNetwork::new(DeterministicNetwork::default(), auditor.clone());
        let network = MeteredNetwork::new(network, runtime_registry);

        // Initialize buffer pools
        cfg_if::cfg_if! {
            if #[cfg(miri)] {
                // Reduce max_per_class to avoid slow atomics under Miri
                let network_config = BufferPoolConfig {
                    max_per_class: NZUsize!(32),
                    ..BufferPoolConfig::for_network()
                };
                let storage_config = BufferPoolConfig {
                    max_per_class: NZUsize!(32),
                    ..BufferPoolConfig::for_storage()
                };
            } else {
                let network_config = BufferPoolConfig::for_network();
                let storage_config = BufferPoolConfig::for_storage();
            }
        }
        let network_buffer_pool = BufferPool::new(
            network_config,
            runtime_registry.sub_registry_with_prefix("network_buffer_pool"),
        );
        let storage_buffer_pool = BufferPool::new(
            storage_config,
            runtime_registry.sub_registry_with_prefix("storage_buffer_pool"),
        );

        // Initialize panicker
        let (panicker, panicked) = Panicker::new(cfg.catch_panics);

        let executor = Arc::new(Executor {
            registry: Mutex::new(registry),
            registered_metrics: Mutex::new(HashSet::new()),
            cycle: cfg.cycle,
            deadline,
            metrics,
            auditor,
            rng,
            time: Mutex::new(start_time),
            tasks: Arc::new(Tasks::new()),
            sleeping: Mutex::new(BinaryHeap::new()),
            shutdown: Mutex::new(Stopper::default()),
            panicker,
            dns: Mutex::new(HashMap::new()),
        });

        (
            Self {
                name: String::new(),
                attributes: Vec::new(),
                executor: Arc::downgrade(&executor),
                network: Arc::new(network),
                storage: Arc::new(storage),
                network_buffer_pool,
                storage_buffer_pool,
                tree: Tree::root(),
                execution: Execution::default(),
                instrumented: false,
            },
            executor,
            panicked,
        )
    }

    /// Recover the inner state (deadline, metrics, auditor, rng, synced storage, etc.) from the
    /// current runtime and use it to initialize a new instance of the runtime. A recovered runtime
    /// does not inherit the current runtime's pending tasks, unsynced storage, network connections, or
    /// its shutdown signaler.
    ///
    /// This is useful for performing a deterministic simulation that spans multiple runtime instantiations,
    /// like simulating unclean shutdown (which involves repeatedly halting the runtime at unexpected intervals).
    ///
    /// It is only permitted to call this method after the runtime has finished (i.e. once `start` returns)
    /// and only permitted to do so once (otherwise multiple recovered runtimes will share the same inner state).
    /// If either of these conditions is violated, this method will panic.
    fn recover(checkpoint: Checkpoint) -> (Self, Arc<Executor>, Panicked) {
        // Rebuild metrics
        let mut registry = Registry::default();
        let runtime_registry = registry.sub_registry_with_prefix(METRICS_PREFIX);
        let metrics = Arc::new(Metrics::init(runtime_registry));

        // Copy state
        let network =
            AuditedNetwork::new(DeterministicNetwork::default(), checkpoint.auditor.clone());
        let network = MeteredNetwork::new(network, runtime_registry);

        // Initialize buffer pools
        cfg_if::cfg_if! {
            if #[cfg(miri)] {
                // Reduce max_per_class to avoid slow atomics under Miri
                let network_config = BufferPoolConfig {
                    max_per_class: NZUsize!(32),
                    ..BufferPoolConfig::for_network()
                };
                let storage_config = BufferPoolConfig {
                    max_per_class: NZUsize!(32),
                    ..BufferPoolConfig::for_storage()
                };
            } else {
                let network_config = BufferPoolConfig::for_network();
                let storage_config = BufferPoolConfig::for_storage();
            }
        }
        let network_buffer_pool = BufferPool::new(
            network_config,
            runtime_registry.sub_registry_with_prefix("network_buffer_pool"),
        );
        let storage_buffer_pool = BufferPool::new(
            storage_config,
            runtime_registry.sub_registry_with_prefix("storage_buffer_pool"),
        );

        // Initialize panicker
        let (panicker, panicked) = Panicker::new(checkpoint.catch_panics);

        let executor = Arc::new(Executor {
            // Copied from the checkpoint
            cycle: checkpoint.cycle,
            deadline: checkpoint.deadline,
            auditor: checkpoint.auditor,
            rng: checkpoint.rng,
            time: checkpoint.time,
            dns: checkpoint.dns,

            // New state for the new runtime
            registry: Mutex::new(registry),
            registered_metrics: Mutex::new(HashSet::new()),
            metrics,
            tasks: Arc::new(Tasks::new()),
            sleeping: Mutex::new(BinaryHeap::new()),
            shutdown: Mutex::new(Stopper::default()),
            panicker,
        });
        (
            Self {
                name: String::new(),
                attributes: Vec::new(),
                executor: Arc::downgrade(&executor),
                network: Arc::new(network),
                storage: checkpoint.storage,
                network_buffer_pool,
                storage_buffer_pool,
                tree: Tree::root(),
                execution: Execution::default(),
                instrumented: false,
            },
            executor,
            panicked,
        )
    }

    /// Upgrade Weak reference to [Executor].
    fn executor(&self) -> Arc<Executor> {
        self.executor.upgrade().expect("executor already dropped")
    }

    /// Get a reference to [Metrics].
    fn metrics(&self) -> Arc<Metrics> {
        self.executor().metrics.clone()
    }

    /// Get a reference to the [Auditor].
    pub fn auditor(&self) -> Arc<Auditor> {
        self.executor().auditor.clone()
    }

    /// Compute a [Sha256] digest of all storage contents.
    pub fn storage_audit(&self) -> Digest {
        self.storage.inner().inner().inner().audit()
    }

    /// Access the storage fault configuration.
    ///
    /// Changes to the returned [`FaultConfig`] take effect immediately for
    /// subsequent storage operations. This allows dynamically enabling or
    /// disabling fault injection during a test.
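    ///
    /// A minimal sketch (here simply restoring the default, fault-free configuration):
    ///
    /// ```rust
    /// use commonware_runtime::{deterministic, Runner};
    ///
    /// let executor = deterministic::Runner::default();
    /// executor.start(|context| async move {
    ///     *context.storage_faults().write().unwrap() = deterministic::FaultConfig::default();
    /// });
    /// ```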
    pub fn storage_faults(&self) -> Arc<RwLock<FaultConfig>> {
        self.storage.inner().inner().config()
    }

    /// Register a DNS mapping for a hostname.
    ///
    /// If `addrs` is `None`, the mapping is removed.
    /// If `addrs` is `Some`, the mapping is added or updated.
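    ///
    /// A short sketch (the hostname and address are illustrative):
    ///
    /// ```rust
    /// use commonware_runtime::{deterministic, Resolver, Runner};
    /// use std::net::{IpAddr, Ipv4Addr};
    ///
    /// let executor = deterministic::Runner::default();
    /// executor.start(|context| async move {
    ///     context.resolver_register("node.test", Some(vec![IpAddr::V4(Ipv4Addr::LOCALHOST)]));
    ///     let addrs = context.resolve("node.test").await.unwrap();
    ///     assert_eq!(addrs, vec![IpAddr::V4(Ipv4Addr::LOCALHOST)]);
    /// });
    /// ```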
1070    pub fn resolver_register(&self, host: impl Into<String>, addrs: Option<Vec<IpAddr>>) {
1071        // Update the auditor
1072        let executor = self.executor();
1073        let host = host.into();
1074        executor.auditor.event(b"resolver_register", |hasher| {
1075            hasher.update(host.as_bytes());
1076            hasher.update(addrs.encode());
1077        });
1078
1079        // Update the DNS mapping
1080        let mut dns = executor.dns.lock().unwrap();
1081        match addrs {
1082            Some(addrs) => {
1083                dns.insert(host, addrs);
1084            }
1085            None => {
1086                dns.remove(&host);
1087            }
1088        }
1089    }
1090}
1091
1092impl crate::Spawner for Context {
1093    fn dedicated(mut self) -> Self {
1094        self.execution = Execution::Dedicated;
1095        self
1096    }
1097
1098    fn shared(mut self, blocking: bool) -> Self {
1099        self.execution = Execution::Shared(blocking);
1100        self
1101    }
1102
1103    fn instrumented(mut self) -> Self {
1104        self.instrumented = true;
1105        self
1106    }
1107
1108    fn spawn<F, Fut, T>(mut self, f: F) -> Handle<T>
1109    where
1110        F: FnOnce(Self) -> Fut + Send + 'static,
1111        Fut: Future<Output = T> + Send + 'static,
1112        T: Send + 'static,
1113    {
1114        // Get metrics
1115        let (label, metric) = spawn_metrics!(self);
1116
1117        // Track supervision before resetting configuration
1118        let parent = Arc::clone(&self.tree);
1119        let is_instrumented = self.instrumented;
1120        self.execution = Execution::default();
1121        self.instrumented = false;
1122        let (child, aborted) = Tree::child(&parent);
1123        if aborted {
1124            return Handle::closed(metric);
1125        }
1126        self.tree = child;
1127
1128        // Spawn the task (we don't care about Model)
1129        let executor = self.executor();
1130        let future: BoxFuture<'_, T> = if is_instrumented {
1131            let span = info_span!(parent: None, "task", name = %label.name());
1132            for (key, value) in &self.attributes {
1133                span.set_attribute(key.clone(), value.clone());
1134            }
1135            f(self).instrument(span).boxed()
1136        } else {
1137            f(self).boxed()
1138        };
1139        let (f, handle) = Handle::init(
1140            future,
1141            metric,
1142            executor.panicker.clone(),
1143            Arc::clone(&parent),
1144        );
1145        Tasks::register_work(&executor.tasks, label, Box::pin(f));
1146
1147        // Register the task on the parent
1148        if let Some(aborter) = handle.aborter() {
1149            parent.register(aborter);
1150        }
1151
1152        handle
1153    }
1154
1155    async fn stop(self, value: i32, timeout: Option<Duration>) -> Result<(), Error> {
1156        let executor = self.executor();
1157        executor.auditor.event(b"stop", |hasher| {
1158            hasher.update(value.to_be_bytes());
1159        });
1160        let stop_resolved = {
1161            let mut shutdown = executor.shutdown.lock().unwrap();
1162            shutdown.stop(value)
1163        };
1164
1165        // Wait for all tasks to complete or the timeout to fire
1166        let timeout_future = timeout.map_or_else(
1167            || futures::future::Either::Right(futures::future::pending()),
1168            |duration| futures::future::Either::Left(self.sleep(duration)),
1169        );
1170        select! {
1171            result = stop_resolved => {
1172                result.map_err(|_| Error::Closed)?;
1173                Ok(())
1174            },
1175            _ = timeout_future => Err(Error::Timeout),
1176        }
1177    }
1178
1179    fn stopped(&self) -> Signal {
1180        let executor = self.executor();
1181        executor.auditor.event(b"stopped", |_| {});
1182        let stopped = executor.shutdown.lock().unwrap().stopped();
1183        stopped
1184    }
1185}
1186
1187impl crate::ThreadPooler for Context {
1188    fn create_thread_pool(
1189        &self,
1190        concurrency: NonZeroUsize,
1191    ) -> Result<ThreadPool, ThreadPoolBuildError> {
1192        let mut builder = ThreadPoolBuilder::new().num_threads(concurrency.get());
1193
1194        if rayon::current_thread_index().is_none() {
1195            builder = builder.use_current_thread()
1196        }
1197
1198        builder
1199            .spawn_handler(move |thread| {
1200                self.with_label("rayon_thread")
1201                    .dedicated()
1202                    .spawn(move |_| async move { thread.run() });
1203                Ok(())
1204            })
1205            .build()
1206            .map(Arc::new)
1207    }
1208}
1209
1210impl crate::Metrics for Context {
1211    fn with_label(&self, label: &str) -> Self {
1212        // Validate label format (must match [a-zA-Z][a-zA-Z0-9_]*)
1213        validate_label(label);
1214
1215        // Construct the full label name
1216        let name = {
1217            let prefix = self.name.clone();
1218            if prefix.is_empty() {
1219                label.to_string()
1220            } else {
1221                format!("{prefix}_{label}")
1222            }
1223        };
1224        assert!(
1225            !name.starts_with(METRICS_PREFIX),
1226            "using runtime label is not allowed"
1227        );
1228        Self {
1229            name,
1230            ..self.clone()
1231        }
1232    }
1233
1234    fn with_attribute(&self, key: &str, value: impl std::fmt::Display) -> Self {
1235        // Validate label format (must match [a-zA-Z][a-zA-Z0-9_]*)
1236        validate_label(key);
1237
1238        // Add the attribute to the list of attributes
1239        let mut attributes = self.attributes.clone();
1240        assert!(
1241            add_attribute(&mut attributes, key, value),
1242            "duplicate attribute key: {key}"
1243        );
1244        Self {
1245            attributes,
1246            ..self.clone()
1247        }
1248    }
1249
1250    fn label(&self) -> String {
1251        self.name.clone()
1252    }
1253
1254    fn register<N: Into<String>, H: Into<String>>(&self, name: N, help: H, metric: impl Metric) {
1255        // Prepare args
1256        let name = name.into();
1257        let help = help.into();
1258
1259        // Name metric
1260        let executor = self.executor();
1261        executor.auditor.event(b"register", |hasher| {
1262            hasher.update(name.as_bytes());
1263            hasher.update(help.as_bytes());
1264            for (k, v) in &self.attributes {
1265                hasher.update(k.as_bytes());
1266                hasher.update(v.as_bytes());
1267            }
1268        });
1269        let prefixed_name = {
1270            let prefix = &self.name;
1271            if prefix.is_empty() {
1272                name
1273            } else {
1274                format!("{}_{}", *prefix, name)
1275            }
1276        };
1277
1278        // Check for duplicate registration (O(1) lookup)
1279        let metric_key = (prefixed_name.clone(), self.attributes.clone());
1280        let is_new = executor
1281            .registered_metrics
1282            .lock()
1283            .unwrap()
1284            .insert(metric_key);
1285        assert!(
1286            is_new,
1287            "duplicate metric: {} with attributes {:?}",
1288            prefixed_name, self.attributes
1289        );
1290
1291        // Apply attributes to the registry (in sorted order)
1292        let mut registry = executor.registry.lock().unwrap();
1293        let sub_registry = self.attributes.iter().fold(&mut *registry, |reg, (k, v)| {
1294            reg.sub_registry_with_label((Cow::Owned(k.clone()), Cow::Owned(v.clone())))
1295        });
1296        sub_registry.register(prefixed_name, help, metric);
1297    }
1298
1299    fn encode(&self) -> String {
1300        let executor = self.executor();
1301        executor.auditor.event(b"encode", |_| {});
1302        let mut encoder = MetricEncoder::new();
1303        encode(&mut encoder, &executor.registry.lock().unwrap()).expect("encoding failed");
1304        encoder.into_string()
1305    }
1306}
1307
1308struct Sleeper {
1309    executor: Weak<Executor>,
1310    time: SystemTime,
1311    registered: bool,
1312}
1313
1314impl Sleeper {
1315    /// Upgrade Weak reference to [Executor].
1316    fn executor(&self) -> Arc<Executor> {
1317        self.executor.upgrade().expect("executor already dropped")
1318    }
1319}
1320
1321struct Alarm {
1322    time: SystemTime,
1323    waker: Waker,
1324}
1325
1326impl PartialEq for Alarm {
1327    fn eq(&self, other: &Self) -> bool {
1328        self.time.eq(&other.time)
1329    }
1330}
1331
1332impl Eq for Alarm {}
1333
1334impl PartialOrd for Alarm {
1335    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
1336        Some(self.cmp(other))
1337    }
1338}
1339
1340impl Ord for Alarm {
1341    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
1342        // Reverse the ordering for min-heap
1343        other.time.cmp(&self.time)
1344    }
1345}
1346
1347impl Future for Sleeper {
1348    type Output = ();
1349
1350    fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
1351        let executor = self.executor();
1352        {
1353            let current_time = *executor.time.lock().unwrap();
1354            if current_time >= self.time {
1355                return Poll::Ready(());
1356            }
1357        }
1358        if !self.registered {
1359            self.registered = true;
1360            executor.sleeping.lock().unwrap().push(Alarm {
1361                time: self.time,
1362                waker: cx.waker().clone(),
1363            });
1364        }
1365        Poll::Pending
1366    }
1367}
1368
1369impl Clock for Context {
1370    fn current(&self) -> SystemTime {
1371        *self.executor().time.lock().unwrap()
1372    }
1373
1374    fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static {
1375        let deadline = self
1376            .current()
1377            .checked_add(duration)
1378            .expect("overflow when setting wake time");
1379        self.sleep_until(deadline)
1380    }
1381
1382    fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static {
1383        Sleeper {
1384            executor: self.executor.clone(),
1385
1386            time: deadline,
1387            registered: false,
1388        }
1389    }
1390}
1391
1392/// A future that resolves when a given target time is reached.
1393///
1394/// If the future is not ready at the target time, the future is blocked until the target time is reached.
1395#[cfg(feature = "external")]
1396#[pin_project]
1397struct Waiter<F: Future> {
1398    executor: Weak<Executor>,
1399    target: SystemTime,
1400    #[pin]
1401    future: F,
1402    ready: Option<F::Output>,
1403    started: bool,
1404    registered: bool,
1405}
1406
1407#[cfg(feature = "external")]
1408impl<F> Future for Waiter<F>
1409where
1410    F: Future + Send,
1411{
1412    type Output = F::Output;
1413
1414    fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
1415        let mut this = self.project();
1416
1417        // Poll once with a noop waker so the future can register interest or start work
1418        // without being able to wake this task before the sampled delay expires. Any ready
1419        // value is cached and only released after the clock reaches `self.target`.
1420        if !*this.started {
1421            *this.started = true;
1422            let waker = noop_waker();
1423            let mut cx_noop = task::Context::from_waker(&waker);
1424            if let Poll::Ready(value) = this.future.as_mut().poll(&mut cx_noop) {
1425                *this.ready = Some(value);
1426            }
1427        }
1428
1429        // Only allow the task to progress once the sampled delay has elapsed.
1430        let executor = this.executor.upgrade().expect("executor already dropped");
1431        let current_time = *executor.time.lock().unwrap();
1432        if current_time < *this.target {
1433            // Register exactly once with the deterministic sleeper queue so the executor
1434            // wakes us once the clock reaches the scheduled target time.
1435            if !*this.registered {
1436                *this.registered = true;
1437                executor.sleeping.lock().unwrap().push(Alarm {
1438                    time: *this.target,
1439                    waker: cx.waker().clone(),
1440                });
1441            }
1442            return Poll::Pending;
1443        }
1444
1445        // If the underlying future completed during the noop pre-poll, surface the cached value.
1446        if let Some(value) = this.ready.take() {
1447            return Poll::Ready(value);
1448        }
1449
1450        // Block the current thread until the future reschedules itself, keeping polling
1451        // deterministic with respect to executor time.
1452        let blocker = Blocker::new();
1453        loop {
1454            let waker = waker(blocker.clone());
1455            let mut cx_block = task::Context::from_waker(&waker);
1456            match this.future.as_mut().poll(&mut cx_block) {
1457                Poll::Ready(value) => {
1458                    break Poll::Ready(value);
1459                }
1460                Poll::Pending => blocker.wait(),
1461            }
1462        }
1463    }
1464}
1465
1466#[cfg(feature = "external")]
1467impl Pacer for Context {
1468    fn pace<'a, F, T>(&'a self, latency: Duration, future: F) -> impl Future<Output = T> + Send + 'a
1469    where
1470        F: Future<Output = T> + Send + 'a,
1471        T: Send + 'a,
1472    {
1473        // Compute target time
1474        let target = self
1475            .executor()
1476            .time
1477            .lock()
1478            .unwrap()
1479            .checked_add(latency)
1480            .expect("overflow when setting wake time");
1481
1482        Waiter {
1483            executor: self.executor.clone(),
1484            target,
1485            future,
1486            ready: None,
1487            started: false,
1488            registered: false,
1489        }
1490    }
1491}
1492
1493impl GClock for Context {
1494    type Instant = SystemTime;
1495
1496    fn now(&self) -> Self::Instant {
1497        self.current()
1498    }
1499}
1500
1501impl ReasonablyRealtime for Context {}
1502
1503impl crate::Network for Context {
1504    type Listener = ListenerOf<Network>;
1505
1506    async fn bind(&self, socket: SocketAddr) -> Result<Self::Listener, Error> {
1507        self.network.bind(socket).await
1508    }
1509
1510    async fn dial(
1511        &self,
1512        socket: SocketAddr,
1513    ) -> Result<(crate::SinkOf<Self>, crate::StreamOf<Self>), Error> {
1514        self.network.dial(socket).await
1515    }
1516}
1517
1518impl crate::Resolver for Context {
1519    async fn resolve(&self, host: &str) -> Result<Vec<IpAddr>, Error> {
1520        // Get the record
1521        let executor = self.executor();
1522        let dns = executor.dns.lock().unwrap();
1523        let result = dns.get(host).cloned();
1524        drop(dns);
1525
1526        // Update the auditor
1527        executor.auditor.event(b"resolve", |hasher| {
1528            hasher.update(host.as_bytes());
1529            hasher.update(result.encode());
1530        });
1531        result.ok_or_else(|| Error::ResolveFailed(host.to_string()))
1532    }
1533}
1534
1535impl RngCore for Context {
1536    fn next_u32(&mut self) -> u32 {
1537        let executor = self.executor();
1538        executor.auditor.event(b"rand", |hasher| {
1539            hasher.update(b"next_u32");
1540        });
1541        let result = executor.rng.lock().unwrap().next_u32();
1542        result
1543    }
1544
1545    fn next_u64(&mut self) -> u64 {
1546        let executor = self.executor();
1547        executor.auditor.event(b"rand", |hasher| {
1548            hasher.update(b"next_u64");
1549        });
1550        let result = executor.rng.lock().unwrap().next_u64();
1551        result
1552    }
1553
1554    fn fill_bytes(&mut self, dest: &mut [u8]) {
1555        let executor = self.executor();
1556        executor.auditor.event(b"rand", |hasher| {
1557            hasher.update(b"fill_bytes");
1558        });
1559        executor.rng.lock().unwrap().fill_bytes(dest);
1560    }
1561
1562    fn try_fill_bytes(&mut self, dest: &mut [u8]) -> Result<(), rand::Error> {
1563        let executor = self.executor();
1564        executor.auditor.event(b"rand", |hasher| {
1565            hasher.update(b"try_fill_bytes");
1566        });
1567        let result = executor.rng.lock().unwrap().try_fill_bytes(dest);
1568        result
1569    }
1570}
1571
1572impl CryptoRng for Context {}
1573
1574impl crate::Storage for Context {
1575    type Blob = <Storage as crate::Storage>::Blob;
1576
1577    async fn open_versioned(
1578        &self,
1579        partition: &str,
1580        name: &[u8],
1581        versions: std::ops::RangeInclusive<u16>,
1582    ) -> Result<(Self::Blob, u64, u16), Error> {
1583        self.storage.open_versioned(partition, name, versions).await
1584    }
1585
1586    async fn remove(&self, partition: &str, name: Option<&[u8]>) -> Result<(), Error> {
1587        self.storage.remove(partition, name).await
1588    }
1589
1590    async fn scan(&self, partition: &str) -> Result<Vec<Vec<u8>>, Error> {
1591        self.storage.scan(partition).await
1592    }
1593}
1594
1595impl crate::BufferPooler for Context {
1596    fn network_buffer_pool(&self) -> &crate::BufferPool {
1597        &self.network_buffer_pool
1598    }
1599
1600    fn storage_buffer_pool(&self) -> &crate::BufferPool {
1601        &self.storage_buffer_pool
1602    }
1603}
1604
1605#[cfg(test)]
1606mod tests {
1607    use super::*;
1608    #[cfg(feature = "external")]
1609    use crate::FutureExt;
1610    #[cfg(feature = "external")]
1611    use crate::Spawner;
1612    use crate::{
1613        deterministic, reschedule, Blob, IoBufMut, Metrics, Resolver, Runner as _, Storage,
1614    };
1615    use commonware_macros::test_traced;
1616    #[cfg(feature = "external")]
1617    use commonware_utils::channel::mpsc;
1618    use commonware_utils::channel::oneshot;
1619    #[cfg(not(feature = "external"))]
1620    use futures::future::pending;
1621    #[cfg(not(feature = "external"))]
1622    use futures::stream::StreamExt as _;
1623    #[cfg(feature = "external")]
1624    use futures::StreamExt;
1625    use futures::{stream::FuturesUnordered, task::noop_waker};
1626
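    // Yields to the scheduler a few times before returning its index; used to
    // exercise deterministic task ordering.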
1627    async fn task(i: usize) -> usize {
1628        for _ in 0..5 {
1629            reschedule().await;
1630        }
1631        i
1632    }
1633
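    // Spawns `tasks` copies of `task`, awaits them in completion order, and returns
    // the auditor state together with the observed completion order.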
1634    fn run_tasks(tasks: usize, runner: deterministic::Runner) -> (String, Vec<usize>) {
1635        runner.start(|context| async move {
1636            let mut handles = FuturesUnordered::new();
1637            for i in 0..tasks {
1638                handles.push(context.clone().spawn(move |_| task(i)));
1639            }
1640
1641            let mut outputs = Vec::new();
1642            while let Some(result) = handles.next().await {
1643                outputs.push(result.unwrap());
1644            }
1645            assert_eq!(outputs.len(), tasks);
1646            (context.auditor().state(), outputs)
1647        })
1648    }
1649
1650    fn run_with_seed(seed: u64) -> (String, Vec<usize>) {
1651        let executor = deterministic::Runner::seeded(seed);
1652        run_tasks(5, executor)
1653    }
1654
1655    #[test]
1656    fn test_same_seed_same_order() {
1657        // Generate initial outputs
1658        let mut outputs = Vec::new();
1659        for seed in 0..1000 {
1660            let output = run_with_seed(seed);
1661            outputs.push(output);
1662        }
1663
1664        // Ensure they match
1665        for seed in 0..1000 {
1666            let output = run_with_seed(seed);
1667            assert_eq!(output, outputs[seed as usize]);
1668        }
1669    }
1670
1671    #[test_traced("TRACE")]
1672    fn test_different_seeds_different_order() {
1673        let output1 = run_with_seed(12345);
1674        let output2 = run_with_seed(54321);
1675        assert_ne!(output1, output2);
1676    }
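
    // A minimal sketch added for illustration (not part of the original suite):
    // because `RngCore` for `Context` delegates to the shared seeded rng, the same
    // seed should reproduce the same draws.
    #[test]
    fn test_seeded_rng_reproducible() {
        fn draw(seed: u64) -> Vec<u64> {
            let executor = deterministic::Runner::seeded(seed);
            executor.start(|mut context| async move {
                let mut out = Vec::new();
                for _ in 0..4 {
                    out.push(context.next_u64());
                }
                out
            })
        }
        assert_eq!(draw(7), draw(7), "same seed should reproduce the same draws");
    }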
1677
1678    #[test]
1679    fn test_alarm_min_heap() {
1680        // Populate heap
1681        let now = SystemTime::now();
1682        let alarms = vec![
1683            Alarm {
1684                time: now + Duration::new(10, 0),
1685                waker: noop_waker(),
1686            },
1687            Alarm {
1688                time: now + Duration::new(5, 0),
1689                waker: noop_waker(),
1690            },
1691            Alarm {
1692                time: now + Duration::new(15, 0),
1693                waker: noop_waker(),
1694            },
1695            Alarm {
1696                time: now + Duration::new(5, 0),
1697                waker: noop_waker(),
1698            },
1699        ];
1700        let mut heap = BinaryHeap::new();
1701        for alarm in alarms {
1702            heap.push(alarm);
1703        }
1704
1705        // Verify alarms pop in min-heap order (earliest first)
1706        let mut sorted_times = Vec::new();
1707        while let Some(alarm) = heap.pop() {
1708            sorted_times.push(alarm.time);
1709        }
1710        assert_eq!(
1711            sorted_times,
1712            vec![
1713                now + Duration::new(5, 0),
1714                now + Duration::new(5, 0),
1715                now + Duration::new(10, 0),
1716                now + Duration::new(15, 0),
1717            ]
1718        );
1719    }
1720
1721    #[test]
1722    #[should_panic(expected = "runtime timeout")]
1723    fn test_timeout() {
1724        let executor = deterministic::Runner::timed(Duration::from_secs(10));
1725        executor.start(|context| async move {
1726            loop {
1727                context.sleep(Duration::from_secs(1)).await;
1728            }
1729        });
1730    }
1731
1732    #[test]
1733    #[should_panic(expected = "cycle duration must be non-zero when timeout is set")]
1734    fn test_bad_timeout() {
1735        let cfg = Config {
1736            timeout: Some(Duration::default()),
1737            cycle: Duration::default(),
1738            ..Config::default()
1739        };
1740        deterministic::Runner::new(cfg);
1741    }
1742
1743    #[test]
1744    #[should_panic(
1745        expected = "cycle duration must be greater than or equal to system time precision"
1746    )]
1747    fn test_bad_cycle() {
1748        let cfg = Config {
1749            cycle: SYSTEM_TIME_PRECISION - Duration::from_nanos(1),
1750            ..Config::default()
1751        };
1752        deterministic::Runner::new(cfg);
1753    }
1754
1755    #[test]
1756    fn test_recover_synced_storage_persists() {
1757        // Initialize the first runtime
1758        let executor1 = deterministic::Runner::default();
1759        let partition = "test_partition";
1760        let name = b"test_blob";
1761        let data = b"Hello, world!";
1762
1763        // Run some tasks, sync storage, and recover the runtime
1764        let (state, checkpoint) = executor1.start_and_recover(|context| async move {
1765            let (blob, _) = context.open(partition, name).await.unwrap();
1766            blob.write_at(0, data).await.unwrap();
1767            blob.sync().await.unwrap();
1768            context.auditor().state()
1769        });
1770
1771        // Verify auditor state is the same
1772        assert_eq!(state, checkpoint.auditor.state());
1773
1774        // Check that synced storage persists after recovery
1775        let executor = Runner::from(checkpoint);
1776        executor.start(|context| async move {
1777            let (blob, len) = context.open(partition, name).await.unwrap();
1778            assert_eq!(len, data.len() as u64);
1779            let read = blob.read_at(0, IoBufMut::zeroed(data.len())).await.unwrap();
1780            assert_eq!(read.coalesce(), data);
1781        });
1782    }
1783
1784    #[test]
1785    #[should_panic(expected = "goodbye")]
1786    fn test_recover_panic_handling() {
1787        // Initialize the first runtime
1788        let executor1 = deterministic::Runner::default();
1789        let (_, checkpoint) = executor1.start_and_recover(|_| async move {
1790            reschedule().await;
1791        });
1792
1793        // Ensure that panic setting is preserved
1794        let executor = Runner::from(checkpoint);
1795        executor.start(|_| async move {
1796            panic!("goodbye");
1797        });
1798    }
1799
1800    #[test]
1801    fn test_recover_unsynced_storage_does_not_persist() {
1802        // Initialize the first runtime
1803        let executor = deterministic::Runner::default();
1804        let partition = "test_partition";
1805        let name = b"test_blob";
1806        let data = b"Hello, world!";
1807
1808        // Run some tasks without syncing storage
1809        let (_, checkpoint) = executor.start_and_recover(|context| async move {
1811            let (blob, _) = context.open(partition, name).await.unwrap();
1812            blob.write_at(0, data).await.unwrap();
1813        });
1814
1815        // Recover the runtime
1816        let executor = Runner::from(checkpoint);
1817
1818        // Check that unsynced storage does not persist after recovery
1819        executor.start(|context| async move {
1820            let (_, len) = context.open(partition, name).await.unwrap();
1821            assert_eq!(len, 0);
1822        });
1823    }
1824
1825    #[test]
1826    fn test_recover_dns_mappings_persist() {
1827        // Initialize the first runtime
1828        let executor = deterministic::Runner::default();
1829        let host = "example.com";
1830        let addrs = vec![
1831            IpAddr::V4(std::net::Ipv4Addr::new(192, 168, 1, 1)),
1832            IpAddr::V4(std::net::Ipv4Addr::new(192, 168, 1, 2)),
1833        ];
1834
1835        // Register DNS mapping and recover the runtime
1836        let (state, checkpoint) = executor.start_and_recover({
1837            let addrs = addrs.clone();
1838            |context| async move {
1839                context.resolver_register(host, Some(addrs));
1840                context.auditor().state()
1841            }
1842        });
1843
1844        // Verify auditor state is the same
1845        assert_eq!(state, checkpoint.auditor.state());
1846
1847        // Check that DNS mappings persist after recovery
1848        let executor = Runner::from(checkpoint);
1849        executor.start(move |context| async move {
1850            let resolved = context.resolve(host).await.unwrap();
1851            assert_eq!(resolved, addrs);
1852        });
1853    }
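
    // A minimal sketch added for illustration (not part of the original suite):
    // resolving a host that was never registered should fail rather than block.
    #[test]
    fn test_resolve_unregistered_host_fails() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            assert!(context.resolve("unregistered.example").await.is_err());
        });
    }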
1854
1855    #[test]
1856    #[should_panic(expected = "executor still has weak references")]
1857    fn test_context_return() {
1858        // Initialize runtime
1859        let executor = deterministic::Runner::default();
1860
1861        // Start runtime
1862        let context = executor.start(|context| async move {
1863            // Attempt to recover before the runtime has finished
1864            context
1865        });
1866
1867        // Should never get this far
1868        drop(context);
1869    }
1870
1871    #[test]
1872    fn test_default_time_zero() {
1873        // Initialize runtime
1874        let executor = deterministic::Runner::default();
1875
1876        executor.start(|context| async move {
1877            // Check that the time is zero
1878            assert_eq!(
1879                context.current().duration_since(UNIX_EPOCH).unwrap(),
1880                Duration::ZERO
1881            );
1882        });
1883    }
1884
1885    #[cfg(not(feature = "external"))]
1886    #[test]
1887    #[should_panic(expected = "runtime stalled")]
1888    fn test_stall() {
1889        // Initialize runtime
1890        let executor = deterministic::Runner::default();
1891
1892        // Start runtime
1893        executor.start(|_| async move {
1894            pending::<()>().await;
1895        });
1896    }
1897
1898    #[cfg(not(feature = "external"))]
1899    #[test]
1900    #[should_panic(expected = "runtime stalled")]
1901    fn test_external_simulated() {
1902        // Initialize runtime
1903        let executor = deterministic::Runner::default();
1904
1905        // Create a thread that waits for 1 second
1906        let (tx, rx) = oneshot::channel();
1907        std::thread::spawn(move || {
1908            std::thread::sleep(Duration::from_secs(1));
1909            tx.send(()).unwrap();
1910        });
1911
1912        // Start runtime
1913        executor.start(|_| async move {
1914            rx.await.unwrap();
1915        });
1916    }
1917
1918    #[cfg(feature = "external")]
1919    #[test]
1920    fn test_external_realtime() {
1921        // Initialize runtime
1922        let executor = deterministic::Runner::default();
1923
1924        // Create a thread that waits for 1 second
1925        let (tx, rx) = oneshot::channel();
1926        std::thread::spawn(move || {
1927            std::thread::sleep(Duration::from_secs(1));
1928            tx.send(()).unwrap();
1929        });
1930
1931        // Start runtime
1932        executor.start(|_| async move {
1933            rx.await.unwrap();
1934        });
1935    }
1936
1937    #[cfg(feature = "external")]
1938    #[test]
1939    fn test_external_realtime_variable() {
1940        // Initialize runtime
1941        let executor = deterministic::Runner::default();
1942
1943        // Start runtime
1944        executor.start(|context| async move {
1945            // Initialize test
1946            let start_real = SystemTime::now();
1947            let start_sim = context.current();
1948            let (first_tx, first_rx) = oneshot::channel();
1949            let (second_tx, second_rx) = oneshot::channel();
1950            let (results_tx, mut results_rx) = mpsc::channel(2);
1951
1952            // Create a thread that waits for 1 second
1953            let first_wait = Duration::from_secs(1);
1954            std::thread::spawn(move || {
1955                std::thread::sleep(first_wait);
1956                first_tx.send(()).unwrap();
1957            });
1958
1959            // Create a thread that sends immediately
1960            std::thread::spawn(move || {
1961                std::thread::sleep(Duration::ZERO);
1962                second_tx.send(()).unwrap();
1963            });
1964
1965            // Wait for a delay sampled before the external send occurs
1966            let first = context.clone().spawn({
1967                let results_tx = results_tx.clone();
1968                move |context| async move {
1969                    first_rx.pace(&context, Duration::ZERO).await.unwrap();
1970                    let elapsed_real = SystemTime::now().duration_since(start_real).unwrap();
1971                    assert!(elapsed_real > first_wait);
1972                    let elapsed_sim = context.current().duration_since(start_sim).unwrap();
1973                    assert!(elapsed_sim < first_wait);
1974                    results_tx.send(1).await.unwrap();
1975                }
1976            });
1977
1978            // Wait for a delay sampled after the external send occurs
1979            let second = context.clone().spawn(move |context| async move {
1980                second_rx.pace(&context, first_wait).await.unwrap();
1981                let elapsed_real = SystemTime::now().duration_since(start_real).unwrap();
1982                assert!(elapsed_real >= first_wait);
1983                let elapsed_sim = context.current().duration_since(start_sim).unwrap();
1984                assert!(elapsed_sim >= first_wait);
1985                results_tx.send(2).await.unwrap();
1986            });
1987
1988            // Wait for both tasks to complete
1989            second.await.unwrap();
1990            first.await.unwrap();
1991
1992            // Ensure order is correct
1993            let mut results = Vec::new();
1994            for _ in 0..2 {
1995                results.push(results_rx.recv().await.unwrap());
1996            }
1997            assert_eq!(results, vec![1, 2]);
1998        });
1999    }
2000
2001    #[cfg(not(feature = "external"))]
2002    #[test]
2003    fn test_simulated_skip() {
2004        // Initialize runtime
2005        let executor = deterministic::Runner::default();
2006
2007        // Start runtime
2008        executor.start(|context| async move {
2009            context.sleep(Duration::from_secs(1)).await;
2010
2011            // Check that the sleep completed in only a few iterations (time skipped forward)
2012            let metrics = context.encode();
2013            let iterations = metrics
2014                .lines()
2015                .find_map(|line| {
2016                    line.strip_prefix("runtime_iterations_total ")
2017                        .and_then(|value| value.trim().parse::<u64>().ok())
2018                })
2019                .expect("missing runtime_iterations_total metric");
2020            assert!(iterations < 10);
2021        });
2022    }
2023
2024    #[cfg(feature = "external")]
2025    #[test]
2026    fn test_realtime_no_skip() {
2027        // Initialize runtime
2028        let executor = deterministic::Runner::default();
2029
2030        // Start runtime
2031        executor.start(|context| async move {
2032            context.sleep(Duration::from_secs(1)).await;
2033
2034            // Check that the sleep required many iterations (time was not skipped forward)
2035            let metrics = context.encode();
2036            let iterations = metrics
2037                .lines()
2038                .find_map(|line| {
2039                    line.strip_prefix("runtime_iterations_total ")
2040                        .and_then(|value| value.trim().parse::<u64>().ok())
2041                })
2042                .expect("missing runtime_iterations_total metric");
2043            assert!(iterations > 500);
2044        });
2045    }
2046
2047    #[test]
2048    #[should_panic(expected = "label must start with [a-zA-Z]")]
2049    fn test_metrics_label_empty() {
2050        let executor = deterministic::Runner::default();
2051        executor.start(|context| async move {
2052            context.with_label("");
2053        });
2054    }
2055
2056    #[test]
2057    #[should_panic(expected = "label must start with [a-zA-Z]")]
2058    fn test_metrics_label_invalid_first_char() {
2059        let executor = deterministic::Runner::default();
2060        executor.start(|context| async move {
2061            context.with_label("1invalid");
2062        });
2063    }
2064
2065    #[test]
2066    #[should_panic(expected = "label must only contain [a-zA-Z0-9_]")]
2067    fn test_metrics_label_invalid_char() {
2068        let executor = deterministic::Runner::default();
2069        executor.start(|context| async move {
2070            context.with_label("invalid-label");
2071        });
2072    }
2073
2074    #[test]
2075    #[should_panic(expected = "using runtime label is not allowed")]
2076    fn test_metrics_label_reserved_prefix() {
2077        let executor = deterministic::Runner::default();
2078        executor.start(|context| async move {
2079            context.with_label(METRICS_PREFIX);
2080        });
2081    }
2082
2083    #[test]
2084    #[should_panic(expected = "duplicate attribute key: epoch")]
2085    fn test_metrics_duplicate_attribute_panics() {
2086        let executor = deterministic::Runner::default();
2087        executor.start(|context| async move {
2088            let _ = context
2089                .with_label("test")
2090                .with_attribute("epoch", "old")
2091                .with_attribute("epoch", "new");
2092        });
2093    }
2094
2095    #[test]
2096    fn test_storage_fault_injection_and_recovery() {
2097        // Phase 1: Run with 100% sync failure rate
2098        let cfg = deterministic::Config::default().with_storage_faults(FaultConfig {
2099            sync_rate: Some(1.0),
2100            ..Default::default()
2101        });
2102
2103        let (result, checkpoint) =
2104            deterministic::Runner::new(cfg).start_and_recover(|ctx| async move {
2105                let (blob, _) = ctx.open("test_fault", b"blob").await.unwrap();
2106                blob.write_at(0, b"data".to_vec()).await.unwrap();
2107                blob.sync().await // This should fail due to fault injection
2108            });
2109
2110        // Verify sync failed
2111        assert!(result.is_err());
2112
2113        // Phase 2: Recover and disable faults explicitly
2114        deterministic::Runner::from(checkpoint).start(|ctx| async move {
2115            // Explicitly disable faults for recovery verification
2116            *ctx.storage_faults().write().unwrap() = FaultConfig::default();
2117
2118            // Data was not synced, so blob should be empty (unsynced writes are lost)
2119            let (blob, len) = ctx.open("test_fault", b"blob").await.unwrap();
2120            assert_eq!(len, 0, "unsynced data should be lost after recovery");
2121
2122            // Now we can write and sync successfully
2123            blob.write_at(0, b"recovered".to_vec()).await.unwrap();
2124            blob.sync()
2125                .await
2126                .expect("sync should succeed with faults disabled");
2127
2128            // Verify data persisted
2129            let read_buf = blob.read_at(0, vec![0u8; 9]).await.unwrap();
2130            assert_eq!(read_buf.coalesce(), b"recovered");
2131        });
2132    }
2133
2134    #[test]
2135    fn test_storage_fault_dynamic_config() {
2136        let executor = deterministic::Runner::default();
2137        executor.start(|ctx| async move {
2138            let (blob, _) = ctx.open("test_dynamic", b"blob").await.unwrap();
2139
2140            // Initially no faults - sync should succeed
2141            blob.write_at(0, b"initial".to_vec()).await.unwrap();
2142            blob.sync().await.expect("initial sync should succeed");
2143
2144            // Enable sync faults dynamically
2145            let faults = ctx.storage_faults();
2146            faults.write().unwrap().sync_rate = Some(1.0);
2147
2148            // Now sync should fail
2149            blob.write_at(0, b"updated".to_vec()).await.unwrap();
2150            let result = blob.sync().await;
2151            assert!(result.is_err(), "sync should fail with faults enabled");
2152
2153            // Disable faults
2154            faults.write().unwrap().sync_rate = Some(0.0);
2155
2156            // Sync should succeed again
2157            blob.sync()
2158                .await
2159                .expect("sync should succeed with faults disabled");
2160        });
2161    }
2162
2163    #[test]
2164    fn test_storage_fault_determinism() {
2165        // Run the same sequence twice with the same seed
2166        fn run_with_seed(seed: u64) -> Vec<bool> {
2167            let cfg = deterministic::Config::default()
2168                .with_seed(seed)
2169                .with_storage_faults(FaultConfig {
2170                    open_rate: Some(0.5),
2171                    ..Default::default()
2172                });
2173
2174            let runner = deterministic::Runner::new(cfg);
2175            runner.start(|ctx| async move {
2176                let mut results = Vec::new();
2177                for i in 0..20 {
2178                    let name = format!("blob{i}");
2179                    let result = ctx.open("test_determinism", name.as_bytes()).await;
2180                    results.push(result.is_ok());
2181                }
2182                results
2183            })
2184        }
2185
2186        let results1 = run_with_seed(12345);
2187        let results2 = run_with_seed(12345);
2188        assert_eq!(
2189            results1, results2,
2190            "same seed should produce same failure pattern"
2191        );
2192
2193        let results3 = run_with_seed(99999);
2194        assert_ne!(
2195            results1, results3,
2196            "different seeds should produce different patterns"
2197        );
2198    }
2199
2200    #[test]
2201    fn test_storage_fault_determinism_multi_task() {
2202        // Run the same multi-task sequence twice with the same seed.
2203        // This tests that task shuffling + fault decisions interleave deterministically.
2204        fn run_with_seed(seed: u64) -> Vec<u32> {
2205            let cfg = deterministic::Config::default()
2206                .with_seed(seed)
2207                .with_storage_faults(FaultConfig {
2208                    open_rate: Some(0.5),
2209                    write_rate: Some(0.3),
2210                    sync_rate: Some(0.2),
2211                    ..Default::default()
2212                });
2213
2214            let runner = deterministic::Runner::new(cfg);
2215            runner.start(|ctx| async move {
2216                // Spawn multiple tasks that do storage operations
2217                let mut handles = Vec::new();
2218                for i in 0..5 {
2219                    let ctx = ctx.clone();
2220                    handles.push(ctx.spawn(move |ctx| async move {
2221                        let mut successes = 0u32;
2222                        for j in 0..4 {
2223                            let name = format!("task{i}_blob{j}");
2224                            if let Ok((blob, _)) = ctx.open("partition", name.as_bytes()).await {
2225                                successes += 1;
2226                                if blob.write_at(0, b"data".to_vec()).await.is_ok() {
2227                                    successes += 1;
2228                                }
2229                                if blob.sync().await.is_ok() {
2230                                    successes += 1;
2231                                }
2232                            }
2233                        }
2234                        successes
2235                    }));
2236                }
2237
2238                // Collect results from all tasks
2239                let mut results = Vec::new();
2240                for handle in handles {
2241                    results.push(handle.await.unwrap());
2242                }
2243                results
2244            })
2245        }
2246
2247        let results1 = run_with_seed(42);
2248        let results2 = run_with_seed(42);
2249        assert_eq!(
2250            results1, results2,
2251            "same seed should produce same multi-task pattern"
2252        );
2253
2254        let results3 = run_with_seed(99999);
2255        assert_ne!(
2256            results1, results3,
2257            "different seeds should produce different patterns"
2258        );
2259    }
2260}