commonware_runtime/
deterministic.rs

1//! A deterministic runtime that randomly selects tasks to run based on a seed
2//!
3//! # Panics
4//!
5//! Unless configured otherwise, any task panic will lead to a runtime panic.
6//!
7//! # External Processes
8//!
9//! When testing an application that interacts with some external process, it can appear to
10//! the runtime that progress has stalled because no pending tasks can make progress and/or
11//! that futures resolve at variable latency (which in turn triggers non-deterministic execution).
12//!
13//! To support such applications, the runtime can be built with the `external` feature to both
14//! sleep for each [Config::cycle] (opting to wait if all futures are pending) and to constrain
15//! the resolution latency of any future (with `pace()`).
16//!
17//! **Applications that do not interact with external processes (or are able to mock them) should never
18//! need to enable this feature. It is commonly used when testing consensus with external execution environments
19//! that use their own runtime (but are deterministic over some set of inputs).**
20//!
21//! # Example
22//!
23//! ```rust
24//! use commonware_runtime::{Spawner, Runner, deterministic, Metrics};
25//!
26//! let executor =  deterministic::Runner::default();
27//! executor.start(|context| async move {
28//!     println!("Parent started");
29//!     let result = context.with_label("child").spawn(|_| async move {
30//!         println!("Child started");
31//!         "hello"
32//!     });
33//!     println!("Child result: {:?}", result.await);
34//!     println!("Parent exited");
35//!     println!("Auditor state: {}", context.auditor().state());
36//! });
37//! ```
38
39use crate::{
40    network::{
41        audited::Network as AuditedNetwork, deterministic::Network as DeterministicNetwork,
42        metered::Network as MeteredNetwork,
43    },
44    storage::{
45        audited::Storage as AuditedStorage, memory::Storage as MemStorage,
46        metered::Storage as MeteredStorage,
47    },
48    telemetry::metrics::task::Label,
49    utils::{
50        signal::{Signal, Stopper},
51        supervision::Tree,
52        Panicker,
53    },
54    validate_label, Clock, Error, Execution, Handle, ListenerOf, Metrics as _, Panicked,
55    Spawner as _, METRICS_PREFIX,
56};
57#[cfg(feature = "external")]
58use crate::{Blocker, Pacer};
59use commonware_codec::Encode;
60use commonware_macros::select;
61use commonware_parallel::ThreadPool;
62use commonware_utils::{hex, time::SYSTEM_TIME_PRECISION, SystemTimeExt};
63#[cfg(feature = "external")]
64use futures::task::noop_waker;
65use futures::{
66    future::BoxFuture,
67    task::{waker, ArcWake},
68    Future, FutureExt,
69};
70use governor::clock::{Clock as GClock, ReasonablyRealtime};
71#[cfg(feature = "external")]
72use pin_project::pin_project;
73use prometheus_client::{
74    encoding::text::encode,
75    metrics::{counter::Counter, family::Family, gauge::Gauge},
76    registry::{Metric, Registry},
77};
78use rand::{prelude::SliceRandom, rngs::StdRng, CryptoRng, RngCore, SeedableRng};
79use rand_core::CryptoRngCore;
80use rayon::{ThreadPoolBuildError, ThreadPoolBuilder};
81use sha2::{Digest as _, Sha256};
82use std::{
83    collections::{BTreeMap, BinaryHeap, HashMap},
84    mem::{replace, take},
85    net::{IpAddr, SocketAddr},
86    num::NonZeroUsize,
87    panic::{catch_unwind, resume_unwind, AssertUnwindSafe},
88    pin::Pin,
89    sync::{Arc, Mutex, Weak},
90    task::{self, Poll, Waker},
91    time::{Duration, SystemTime, UNIX_EPOCH},
92};
93use tracing::{info_span, trace, Instrument};
94
95#[derive(Debug)]
96struct Metrics {
97    iterations: Counter,
98    tasks_spawned: Family<Label, Counter>,
99    tasks_running: Family<Label, Gauge>,
100    task_polls: Family<Label, Counter>,
101
102    network_bandwidth: Counter,
103}
104
105impl Metrics {
106    pub fn init(registry: &mut Registry) -> Self {
107        let metrics = Self {
108            iterations: Counter::default(),
109            task_polls: Family::default(),
110            tasks_spawned: Family::default(),
111            tasks_running: Family::default(),
112            network_bandwidth: Counter::default(),
113        };
114        registry.register(
115            "iterations",
116            "Total number of iterations",
117            metrics.iterations.clone(),
118        );
119        registry.register(
120            "tasks_spawned",
121            "Total number of tasks spawned",
122            metrics.tasks_spawned.clone(),
123        );
124        registry.register(
125            "tasks_running",
126            "Number of tasks currently running",
127            metrics.tasks_running.clone(),
128        );
129        registry.register(
130            "task_polls",
131            "Total number of task polls",
132            metrics.task_polls.clone(),
133        );
134        registry.register(
135            "bandwidth",
136            "Total amount of data sent over network",
137            metrics.network_bandwidth.clone(),
138        );
139        metrics
140    }
141}
142
143/// A SHA-256 digest.
144type Digest = [u8; 32];
145
146/// Track the state of the runtime for determinism auditing.
147pub struct Auditor {
148    digest: Mutex<Digest>,
149}
150
151impl Default for Auditor {
152    fn default() -> Self {
153        Self {
154            digest: Digest::default().into(),
155        }
156    }
157}
158
159impl Auditor {
160    /// Record that an event happened.
161    /// This auditor's hash will be updated with the event's `label` and
162    /// whatever other data is passed in the `payload` closure.
163    pub(crate) fn event<F>(&self, label: &'static [u8], payload: F)
164    where
165        F: FnOnce(&mut Sha256),
166    {
167        let mut digest = self.digest.lock().unwrap();
168
169        let mut hasher = Sha256::new();
170        hasher.update(digest.as_ref());
171        hasher.update(label);
172        payload(&mut hasher);
173
174        *digest = hasher.finalize().into();
175    }
176
177    /// Generate a representation of the current state of the runtime.
178    ///
179    /// This can be used to ensure that logic running on top
180    /// of the runtime is interacting deterministically.
181    pub fn state(&self) -> String {
182        let hash = self.digest.lock().unwrap();
183        hex(hash.as_ref())
184    }
185}
186
187/// A dynamic RNG that can safely be sent between threads.
188pub type BoxDynRng = Box<dyn CryptoRngCore + Send + 'static>;
189
190/// Configuration for the `deterministic` runtime.
191pub struct Config {
192    /// Random number generator.
193    rng: BoxDynRng,
194
195    /// The cycle duration determines how much time is advanced after each iteration of the event
196    /// loop. This is useful to prevent starvation if some task never yields.
197    cycle: Duration,
198
199    /// If the runtime is still executing at this point (i.e. a test hasn't stopped), panic.
200    timeout: Option<Duration>,
201
202    /// Whether spawned tasks should catch panics instead of propagating them.
203    catch_panics: bool,
204}
205
206impl Config {
207    /// Returns a new [Config] with default values.
208    pub fn new() -> Self {
209        Self {
210            rng: Box::new(StdRng::seed_from_u64(42)),
211            cycle: Duration::from_millis(1),
212            timeout: None,
213            catch_panics: false,
214        }
215    }
216
217    // Setters
218    /// See [Config]
219    pub fn with_seed(self, seed: u64) -> Self {
220        self.with_rng(Box::new(StdRng::seed_from_u64(seed)))
221    }
222
223    /// Provide the config with a dynamic RNG directly.
224    ///
225    /// This can be useful for, e.g. fuzzing, where beyond just having randomness,
226    /// you might want to control specific bytes of the RNG. By taking in a dynamic
227    /// RNG object, any behavior is possible.
228    pub fn with_rng(mut self, rng: BoxDynRng) -> Self {
229        self.rng = rng;
230        self
231    }
232
233    /// See [Config]
234    pub const fn with_cycle(mut self, cycle: Duration) -> Self {
235        self.cycle = cycle;
236        self
237    }
238    /// See [Config]
239    pub const fn with_timeout(mut self, timeout: Option<Duration>) -> Self {
240        self.timeout = timeout;
241        self
242    }
243    /// See [Config]
244    pub const fn with_catch_panics(mut self, catch_panics: bool) -> Self {
245        self.catch_panics = catch_panics;
246        self
247    }
248
249    // Getters
250    /// See [Config]
251    pub const fn cycle(&self) -> Duration {
252        self.cycle
253    }
254    /// See [Config]
255    pub const fn timeout(&self) -> Option<Duration> {
256        self.timeout
257    }
258    /// See [Config]
259    pub const fn catch_panics(&self) -> bool {
260        self.catch_panics
261    }
262
263    /// Assert that the configuration is valid.
264    pub fn assert(&self) {
265        assert!(
266            self.cycle != Duration::default() || self.timeout.is_none(),
267            "cycle duration must be non-zero when timeout is set",
268        );
269        assert!(
270            self.cycle >= SYSTEM_TIME_PRECISION,
271            "cycle duration must be greater than or equal to system time precision"
272        );
273    }
274}
275
276impl Default for Config {
277    fn default() -> Self {
278        Self::new()
279    }
280}
281
282/// Deterministic runtime that randomly selects tasks to run based on a seed.
283pub struct Executor {
284    registry: Mutex<Registry>,
285    cycle: Duration,
286    deadline: Option<SystemTime>,
287    metrics: Arc<Metrics>,
288    auditor: Arc<Auditor>,
289    rng: Mutex<BoxDynRng>,
290    time: Mutex<SystemTime>,
291    tasks: Arc<Tasks>,
292    sleeping: Mutex<BinaryHeap<Alarm>>,
293    shutdown: Mutex<Stopper>,
294    panicker: Panicker,
295    dns: Mutex<HashMap<String, Vec<IpAddr>>>,
296}
297
298impl Executor {
299    /// Advance simulated time by [Config::cycle].
300    ///
301    /// When built with the `external` feature, sleep for [Config::cycle] to let
302    /// external processes make progress.
303    fn advance_time(&self) -> SystemTime {
304        #[cfg(feature = "external")]
305        std::thread::sleep(self.cycle);
306
307        let mut time = self.time.lock().unwrap();
308        *time = time
309            .checked_add(self.cycle)
310            .expect("executor time overflowed");
311        let now = *time;
312        trace!(now = now.epoch_millis(), "time advanced");
313        now
314    }
315
316    /// When idle, jump directly to the next actionable time.
317    ///
318    /// When built with the `external` feature, never skip ahead (to ensure we poll all pending tasks
319    /// every [Config::cycle]).
320    fn skip_idle_time(&self, current: SystemTime) -> SystemTime {
321        if cfg!(feature = "external") || self.tasks.ready() != 0 {
322            return current;
323        }
324
325        let mut skip_until = None;
326        {
327            let sleeping = self.sleeping.lock().unwrap();
328            if let Some(next) = sleeping.peek() {
329                if next.time > current {
330                    skip_until = Some(next.time);
331                }
332            }
333        }
334
335        skip_until.map_or(current, |deadline| {
336            let mut time = self.time.lock().unwrap();
337            *time = deadline;
338            let now = *time;
339            trace!(now = now.epoch_millis(), "time skipped");
340            now
341        })
342    }
343
344    /// Wake any sleepers whose deadlines have elapsed.
345    fn wake_ready_sleepers(&self, current: SystemTime) {
346        let mut sleeping = self.sleeping.lock().unwrap();
347        while let Some(next) = sleeping.peek() {
348            if next.time <= current {
349                let sleeper = sleeping.pop().unwrap();
350                sleeper.waker.wake();
351            } else {
352                break;
353            }
354        }
355    }
356
357    /// Ensure the runtime is making progress.
358    ///
359    /// When built with the `external` feature, always poll pending tasks after the passage of time.
360    fn assert_liveness(&self) {
361        if cfg!(feature = "external") || self.tasks.ready() != 0 {
362            return;
363        }
364
365        panic!("runtime stalled");
366    }
367}
368
369/// An artifact that can be used to recover the state of the runtime.
370///
371/// This is useful when mocking unclean shutdown (while retaining deterministic behavior).
372pub struct Checkpoint {
373    cycle: Duration,
374    deadline: Option<SystemTime>,
375    auditor: Arc<Auditor>,
376    rng: Mutex<BoxDynRng>,
377    time: Mutex<SystemTime>,
378    storage: Arc<Storage>,
379    dns: Mutex<HashMap<String, Vec<IpAddr>>>,
380    catch_panics: bool,
381}
382
383impl Checkpoint {
384    /// Get a reference to the [Auditor].
385    pub fn auditor(&self) -> Arc<Auditor> {
386        self.auditor.clone()
387    }
388}
389
390#[allow(clippy::large_enum_variant)]
391enum State {
392    Config(Config),
393    Checkpoint(Checkpoint),
394}
395
396/// Implementation of [crate::Runner] for the `deterministic` runtime.
397pub struct Runner {
398    state: State,
399}
400
401impl From<Config> for Runner {
402    fn from(cfg: Config) -> Self {
403        Self::new(cfg)
404    }
405}
406
407impl From<Checkpoint> for Runner {
408    fn from(checkpoint: Checkpoint) -> Self {
409        Self {
410            state: State::Checkpoint(checkpoint),
411        }
412    }
413}
414
415impl Runner {
416    /// Initialize a new `deterministic` runtime with the given seed and cycle duration.
417    pub fn new(cfg: Config) -> Self {
418        // Ensure config is valid
419        cfg.assert();
420        Self {
421            state: State::Config(cfg),
422        }
423    }
424
425    /// Initialize a new `deterministic` runtime with the default configuration
426    /// and the provided seed.
427    pub fn seeded(seed: u64) -> Self {
428        Self::new(Config::default().with_seed(seed))
429    }
430
431    /// Initialize a new `deterministic` runtime with the default configuration
432    /// but exit after the given timeout.
433    pub fn timed(timeout: Duration) -> Self {
434        let cfg = Config {
435            timeout: Some(timeout),
436            ..Config::default()
437        };
438        Self::new(cfg)
439    }
440
441    /// Like [crate::Runner::start], but also returns a [Checkpoint] that can be used
442    /// to recover the state of the runtime in a subsequent run.
443    pub fn start_and_recover<F, Fut>(self, f: F) -> (Fut::Output, Checkpoint)
444    where
445        F: FnOnce(Context) -> Fut,
446        Fut: Future,
447    {
448        // Setup context and return strong reference to executor
449        let (context, executor, panicked) = match self.state {
450            State::Config(config) => Context::new(config),
451            State::Checkpoint(checkpoint) => Context::recover(checkpoint),
452        };
453
454        // Pin root task to the heap
455        let storage = context.storage.clone();
456        let mut root = Box::pin(panicked.interrupt(f(context)));
457
458        // Register the root task
459        Tasks::register_root(&executor.tasks);
460
461        // Process tasks until root task completes or progress stalls.
462        // Wrap the loop in catch_unwind to ensure task cleanup runs even if the loop or a task panics.
463        let result = catch_unwind(AssertUnwindSafe(|| loop {
464            // Ensure we have not exceeded our deadline
465            {
466                let current = executor.time.lock().unwrap();
467                if let Some(deadline) = executor.deadline {
468                    if *current >= deadline {
469                        // Drop the lock before panicking to avoid mutex poisoning.
470                        drop(current);
471                        panic!("runtime timeout");
472                    }
473                }
474            }
475
476            // Drain all ready tasks
477            let mut queue = executor.tasks.drain();
478
479            // Shuffle tasks (if more than one)
480            if queue.len() > 1 {
481                let mut rng = executor.rng.lock().unwrap();
482                queue.shuffle(&mut *rng);
483            }
484
485            // Run all snapshotted tasks
486            //
487            // This approach is more efficient than randomly selecting a task one-at-a-time
488            // because it ensures we don't pull the same pending task multiple times in a row (without
489            // processing a different task required for other tasks to make progress).
490            trace!(
491                iter = executor.metrics.iterations.get(),
492                tasks = queue.len(),
493                "starting loop"
494            );
495            let mut output = None;
496            for id in queue {
497                // Lookup the task (it may have completed already)
498                let Some(task) = executor.tasks.get(id) else {
499                    trace!(id, "skipping missing task");
500                    continue;
501                };
502
503                // Record task for auditing
504                executor.auditor.event(b"process_task", |hasher| {
505                    hasher.update(task.id.to_be_bytes());
506                    hasher.update(task.label.name().as_bytes());
507                });
508                executor.metrics.task_polls.get_or_create(&task.label).inc();
509                trace!(id, "processing task");
510
511                // Prepare task for polling
512                let waker = waker(Arc::new(TaskWaker {
513                    id,
514                    tasks: Arc::downgrade(&executor.tasks),
515                }));
516                let mut cx = task::Context::from_waker(&waker);
517
518                // Poll the task
519                match &task.mode {
520                    Mode::Root => {
521                        // Poll the root task
522                        if let Poll::Ready(result) = root.as_mut().poll(&mut cx) {
523                            trace!(id, "root task is complete");
524                            output = Some(result);
525                            break;
526                        }
527                    }
528                    Mode::Work(future) => {
529                        // Get the future (if it still exists)
530                        let mut fut_opt = future.lock().unwrap();
531                        let Some(fut) = fut_opt.as_mut() else {
532                            trace!(id, "skipping already complete task");
533
534                            // Remove the future
535                            executor.tasks.remove(id);
536                            continue;
537                        };
538
539                        // Poll the task
540                        if fut.as_mut().poll(&mut cx).is_ready() {
541                            trace!(id, "task is complete");
542
543                            // Remove the future
544                            executor.tasks.remove(id);
545                            *fut_opt = None;
546                            continue;
547                        }
548                    }
549                }
550
551                // Try again later if task is still pending
552                trace!(id, "task is still pending");
553            }
554
555            // If the root task has completed, exit as soon as possible
556            if let Some(output) = output {
557                break output;
558            }
559
560            // Advance time (skipping ahead if no tasks are ready yet)
561            let mut current = executor.advance_time();
562            current = executor.skip_idle_time(current);
563
564            // Wake sleepers and ensure we continue to make progress
565            executor.wake_ready_sleepers(current);
566            executor.assert_liveness();
567
568            // Record that we completed another iteration of the event loop.
569            executor.metrics.iterations.inc();
570        }));
571
572        // Clear remaining tasks from the executor.
573        //
574        // It is critical that we wait to drop the strong
575        // reference to executor until after we have dropped
576        // all tasks (as they may attempt to upgrade their weak
577        // reference to the executor during drop).
578        executor.sleeping.lock().unwrap().clear(); // included in tasks
579        let tasks = executor.tasks.clear();
580        for task in tasks {
581            let Mode::Work(future) = &task.mode else {
582                continue;
583            };
584            *future.lock().unwrap() = None;
585        }
586
587        // Drop the root task to release any Context references it may still hold.
588        // This is necessary when the loop exits early (e.g., timeout) while the
589        // root future is still Pending and holds captured variables with Context references.
590        drop(root);
591
592        // Assert the context doesn't escape the start() function (behavior
593        // is undefined in this case)
594        assert!(
595            Arc::weak_count(&executor) == 0,
596            "executor still has weak references"
597        );
598
599        // Handle the result — resume the original panic after cleanup if one was caught.
600        let output = match result {
601            Ok(output) => output,
602            Err(payload) => resume_unwind(payload),
603        };
604
605        // Extract the executor from the Arc
606        let executor = Arc::into_inner(executor).expect("executor still has strong references");
607
608        // Construct a checkpoint that can be used to restart the runtime
609        let checkpoint = Checkpoint {
610            cycle: executor.cycle,
611            deadline: executor.deadline,
612            auditor: executor.auditor,
613            rng: executor.rng,
614            time: executor.time,
615            storage,
616            dns: executor.dns,
617            catch_panics: executor.panicker.catch(),
618        };
619
620        (output, checkpoint)
621    }
622}
623
624impl Default for Runner {
625    fn default() -> Self {
626        Self::new(Config::default())
627    }
628}
629
630impl crate::Runner for Runner {
631    type Context = Context;
632
633    fn start<F, Fut>(self, f: F) -> Fut::Output
634    where
635        F: FnOnce(Self::Context) -> Fut,
636        Fut: Future,
637    {
638        let (output, _) = self.start_and_recover(f);
639        output
640    }
641}
642
643/// The mode of a [Task].
644enum Mode {
645    Root,
646    Work(Mutex<Option<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>>),
647}
648
649/// A future being executed by the [Executor].
650struct Task {
651    id: u128,
652    label: Label,
653
654    mode: Mode,
655}
656
657/// A waker for a [Task].
658struct TaskWaker {
659    id: u128,
660
661    tasks: Weak<Tasks>,
662}
663
664impl ArcWake for TaskWaker {
665    fn wake_by_ref(arc_self: &Arc<Self>) {
666        // Upgrade the weak reference to re-enqueue this task.
667        // If upgrade fails, the task queue has been dropped and no action is required.
668        //
669        // This can happen if some data is passed into the runtime and it drops after the runtime exits.
670        if let Some(tasks) = arc_self.tasks.upgrade() {
671            tasks.queue(arc_self.id);
672        }
673    }
674}
675
676/// A collection of [Task]s that are being executed by the [Executor].
677struct Tasks {
678    /// The next task id.
679    counter: Mutex<u128>,
680    /// Tasks ready to be polled.
681    ready: Mutex<Vec<u128>>,
682    /// All running tasks.
683    running: Mutex<BTreeMap<u128, Arc<Task>>>,
684}
685
686impl Tasks {
687    /// Create a new task queue.
688    const fn new() -> Self {
689        Self {
690            counter: Mutex::new(0),
691            ready: Mutex::new(Vec::new()),
692            running: Mutex::new(BTreeMap::new()),
693        }
694    }
695
696    /// Increment the task counter and return the old value.
697    fn increment(&self) -> u128 {
698        let mut counter = self.counter.lock().unwrap();
699        let old = *counter;
700        *counter = counter.checked_add(1).expect("task counter overflow");
701        old
702    }
703
704    /// Register the root task.
705    ///
706    /// If the root task has already been registered, this function will panic.
707    fn register_root(arc_self: &Arc<Self>) {
708        let id = arc_self.increment();
709        let task = Arc::new(Task {
710            id,
711            label: Label::root(),
712            mode: Mode::Root,
713        });
714        arc_self.register(id, task);
715    }
716
717    /// Register a non-root task to be executed.
718    fn register_work(
719        arc_self: &Arc<Self>,
720        label: Label,
721        future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
722    ) {
723        let id = arc_self.increment();
724        let task = Arc::new(Task {
725            id,
726            label,
727            mode: Mode::Work(Mutex::new(Some(future))),
728        });
729        arc_self.register(id, task);
730    }
731
732    /// Register a new task to be executed.
733    fn register(&self, id: u128, task: Arc<Task>) {
734        // Track as running until completion
735        self.running.lock().unwrap().insert(id, task);
736
737        // Add to ready
738        self.queue(id);
739    }
740
741    /// Enqueue an already registered task to be executed.
742    fn queue(&self, id: u128) {
743        let mut ready = self.ready.lock().unwrap();
744        ready.push(id);
745    }
746
747    /// Drain all ready tasks.
748    fn drain(&self) -> Vec<u128> {
749        let mut queue = self.ready.lock().unwrap();
750        let len = queue.len();
751        replace(&mut *queue, Vec::with_capacity(len))
752    }
753
754    /// The number of ready tasks.
755    fn ready(&self) -> usize {
756        self.ready.lock().unwrap().len()
757    }
758
759    /// Lookup a task.
760    ///
761    /// We must return cloned here because we cannot hold the running lock while polling a task (will
762    /// deadlock if [Self::register_work] is called).
763    fn get(&self, id: u128) -> Option<Arc<Task>> {
764        let running = self.running.lock().unwrap();
765        running.get(&id).cloned()
766    }
767
768    /// Remove a task.
769    fn remove(&self, id: u128) {
770        self.running.lock().unwrap().remove(&id);
771    }
772
773    /// Clear all tasks.
774    fn clear(&self) -> Vec<Arc<Task>> {
775        // Clear ready
776        self.ready.lock().unwrap().clear();
777
778        // Clear running tasks
779        let running: BTreeMap<u128, Arc<Task>> = {
780            let mut running = self.running.lock().unwrap();
781            take(&mut *running)
782        };
783        running.into_values().collect()
784    }
785}
786
787type Network = MeteredNetwork<AuditedNetwork<DeterministicNetwork>>;
788type Storage = MeteredStorage<AuditedStorage<MemStorage>>;
789
790/// Implementation of [crate::Spawner], [crate::Clock],
791/// [crate::Network], and [crate::Storage] for the `deterministic`
792/// runtime.
793pub struct Context {
794    name: String,
795    executor: Weak<Executor>,
796    network: Arc<Network>,
797    storage: Arc<Storage>,
798    tree: Arc<Tree>,
799    execution: Execution,
800    instrumented: bool,
801}
802
803impl Clone for Context {
804    fn clone(&self) -> Self {
805        let (child, _) = Tree::child(&self.tree);
806        Self {
807            name: self.name.clone(),
808            executor: self.executor.clone(),
809            network: self.network.clone(),
810            storage: self.storage.clone(),
811
812            tree: child,
813            execution: Execution::default(),
814            instrumented: false,
815        }
816    }
817}
818
819impl Context {
820    fn new(cfg: Config) -> (Self, Arc<Executor>, Panicked) {
821        // Create a new registry
822        let mut registry = Registry::default();
823        let runtime_registry = registry.sub_registry_with_prefix(METRICS_PREFIX);
824
825        // Initialize runtime
826        let metrics = Arc::new(Metrics::init(runtime_registry));
827        let start_time = UNIX_EPOCH;
828        let deadline = cfg
829            .timeout
830            .map(|timeout| start_time.checked_add(timeout).expect("timeout overflowed"));
831        let auditor = Arc::new(Auditor::default());
832        let storage = MeteredStorage::new(
833            AuditedStorage::new(MemStorage::default(), auditor.clone()),
834            runtime_registry,
835        );
836        let network = AuditedNetwork::new(DeterministicNetwork::default(), auditor.clone());
837        let network = MeteredNetwork::new(network, runtime_registry);
838
839        // Initialize panicker
840        let (panicker, panicked) = Panicker::new(cfg.catch_panics);
841
842        let executor = Arc::new(Executor {
843            registry: Mutex::new(registry),
844            cycle: cfg.cycle,
845            deadline,
846            metrics,
847            auditor,
848            rng: Mutex::new(cfg.rng),
849            time: Mutex::new(start_time),
850            tasks: Arc::new(Tasks::new()),
851            sleeping: Mutex::new(BinaryHeap::new()),
852            shutdown: Mutex::new(Stopper::default()),
853            panicker,
854            dns: Mutex::new(HashMap::new()),
855        });
856
857        (
858            Self {
859                name: String::new(),
860                executor: Arc::downgrade(&executor),
861                network: Arc::new(network),
862                storage: Arc::new(storage),
863                tree: Tree::root(),
864                execution: Execution::default(),
865                instrumented: false,
866            },
867            executor,
868            panicked,
869        )
870    }
871
872    /// Recover the inner state (deadline, metrics, auditor, rng, synced storage, etc.) from the
873    /// current runtime and use it to initialize a new instance of the runtime. A recovered runtime
874    /// does not inherit the current runtime's pending tasks, unsynced storage, network connections, nor
875    /// its shutdown signaler.
876    ///
877    /// This is useful for performing a deterministic simulation that spans multiple runtime instantiations,
878    /// like simulating unclean shutdown (which involves repeatedly halting the runtime at unexpected intervals).
879    ///
880    /// It is only permitted to call this method after the runtime has finished (i.e. once `start` returns)
881    /// and only permitted to do once (otherwise multiple recovered runtimes will share the same inner state).
882    /// If either one of these conditions is violated, this method will panic.
883    fn recover(checkpoint: Checkpoint) -> (Self, Arc<Executor>, Panicked) {
884        // Rebuild metrics
885        let mut registry = Registry::default();
886        let runtime_registry = registry.sub_registry_with_prefix(METRICS_PREFIX);
887        let metrics = Arc::new(Metrics::init(runtime_registry));
888
889        // Copy state
890        let network =
891            AuditedNetwork::new(DeterministicNetwork::default(), checkpoint.auditor.clone());
892        let network = MeteredNetwork::new(network, runtime_registry);
893
894        // Initialize panicker
895        let (panicker, panicked) = Panicker::new(checkpoint.catch_panics);
896
897        let executor = Arc::new(Executor {
898            // Copied from the checkpoint
899            cycle: checkpoint.cycle,
900            deadline: checkpoint.deadline,
901            auditor: checkpoint.auditor,
902            rng: checkpoint.rng,
903            time: checkpoint.time,
904            dns: checkpoint.dns,
905
906            // New state for the new runtime
907            registry: Mutex::new(registry),
908            metrics,
909            tasks: Arc::new(Tasks::new()),
910            sleeping: Mutex::new(BinaryHeap::new()),
911            shutdown: Mutex::new(Stopper::default()),
912            panicker,
913        });
914        (
915            Self {
916                name: String::new(),
917                executor: Arc::downgrade(&executor),
918                network: Arc::new(network),
919                storage: checkpoint.storage,
920                tree: Tree::root(),
921                execution: Execution::default(),
922                instrumented: false,
923            },
924            executor,
925            panicked,
926        )
927    }
928
929    /// Upgrade Weak reference to [Executor].
930    fn executor(&self) -> Arc<Executor> {
931        self.executor.upgrade().expect("executor already dropped")
932    }
933
934    /// Get a reference to [Metrics].
935    fn metrics(&self) -> Arc<Metrics> {
936        self.executor().metrics.clone()
937    }
938
939    /// Get a reference to the [Auditor].
940    pub fn auditor(&self) -> Arc<Auditor> {
941        self.executor().auditor.clone()
942    }
943
944    /// Compute a [Sha256] digest of all storage contents.
945    pub fn storage_audit(&self) -> Digest {
946        self.storage.inner().inner().audit()
947    }
948
949    /// Register a DNS mapping for a hostname.
950    ///
951    /// If `addrs` is `None`, the mapping is removed.
952    /// If `addrs` is `Some`, the mapping is added or updated.
953    pub fn resolver_register(&self, host: impl Into<String>, addrs: Option<Vec<IpAddr>>) {
954        // Update the auditor
955        let executor = self.executor();
956        let host = host.into();
957        executor.auditor.event(b"resolver_register", |hasher| {
958            hasher.update(host.as_bytes());
959            hasher.update(addrs.encode());
960        });
961
962        // Update the DNS mapping
963        let mut dns = executor.dns.lock().unwrap();
964        match addrs {
965            Some(addrs) => {
966                dns.insert(host, addrs);
967            }
968            None => {
969                dns.remove(&host);
970            }
971        }
972    }
973}
974
impl crate::Spawner for Context {
    /// Configure the next `spawn` on this context to use [Execution::Dedicated].
    fn dedicated(mut self) -> Self {
        self.execution = Execution::Dedicated;
        self
    }

    /// Configure the next `spawn` on this context to use [Execution::Shared] with `blocking`.
    fn shared(mut self, blocking: bool) -> Self {
        self.execution = Execution::Shared(blocking);
        self
    }

    /// Configure the next `spawn` on this context to wrap its future in a `task` tracing span.
    fn instrumented(mut self) -> Self {
        self.instrumented = true;
        self
    }

    /// Spawn the future produced by `f` as a new task supervised by this context's tree node.
    ///
    /// The execution mode and instrumentation flag set on `self` apply only to this spawn. They
    /// are reset to their defaults before `f` receives the child context. If `Tree::child`
    /// reports that the parent node was aborted, a closed [Handle] is returned and `f` is never
    /// called.
    fn spawn<F, Fut, T>(mut self, f: F) -> Handle<T>
    where
        F: FnOnce(Self) -> Fut + Send + 'static,
        Fut: Future<Output = T> + Send + 'static,
        T: Send + 'static,
    {
        // Get metrics
        let (label, metric) = spawn_metrics!(self);

        // Track supervision before resetting configuration
        let parent = Arc::clone(&self.tree);
        let is_instrumented = self.instrumented;
        self.execution = Execution::default();
        self.instrumented = false;
        let (child, aborted) = Tree::child(&parent);
        if aborted {
            return Handle::closed(metric);
        }
        self.tree = child;

        // Spawn the task (we don't care about Model)
        let executor = self.executor();
        let future: BoxFuture<'_, T> = if is_instrumented {
            // `parent: None` makes the span a root span rather than a child of whatever
            // span is current at spawn time.
            f(self)
                .instrument(info_span!(parent: None, "task", name = %label.name()))
                .boxed()
        } else {
            f(self).boxed()
        };
        let (f, handle) = Handle::init(
            future,
            metric,
            executor.panicker.clone(),
            Arc::clone(&parent),
        );
        Tasks::register_work(&executor.tasks, label, Box::pin(f));

        // Register the task on the parent
        if let Some(aborter) = handle.aborter() {
            parent.register(aborter);
        }

        handle
    }

    /// Request shutdown with `value` and wait for the stop future to resolve.
    ///
    /// The request is recorded with the auditor. If `timeout` is `Some`, the wait is raced
    /// against a runtime sleep of that duration and `Error::Timeout` is returned if the sleep
    /// wins. If the stop future resolves with an error, `Error::Closed` is returned.
    async fn stop(self, value: i32, timeout: Option<Duration>) -> Result<(), Error> {
        let executor = self.executor();
        executor.auditor.event(b"stop", |hasher| {
            hasher.update(value.to_be_bytes());
        });
        // Scope the shutdown lock so it is released before awaiting below.
        let stop_resolved = {
            let mut shutdown = executor.shutdown.lock().unwrap();
            shutdown.stop(value)
        };

        // Wait for all tasks to complete or the timeout to fire
        let timeout_future = timeout.map_or_else(
            || futures::future::Either::Right(futures::future::pending()),
            |duration| futures::future::Either::Left(self.sleep(duration)),
        );
        select! {
            result = stop_resolved => {
                result.map_err(|_| Error::Closed)?;
                Ok(())
            },
            _ = timeout_future => {
                Err(Error::Timeout)
            }
        }
    }

    /// Return the [Signal] from the executor's shutdown stopper (recording a `stopped` event).
    fn stopped(&self) -> Signal {
        let executor = self.executor();
        executor.auditor.event(b"stopped", |_| {});
        // Bind before returning so the lock-guard temporary is dropped before `executor`.
        let stopped = executor.shutdown.lock().unwrap().stopped();
        stopped
    }
}
1069
1070impl crate::RayonPoolSpawner for Context {
1071    fn create_pool(&self, concurrency: NonZeroUsize) -> Result<ThreadPool, ThreadPoolBuildError> {
1072        let mut builder = ThreadPoolBuilder::new().num_threads(concurrency.get());
1073
1074        if rayon::current_thread_index().is_none() {
1075            builder = builder.use_current_thread()
1076        }
1077
1078        builder
1079            .spawn_handler(move |thread| {
1080                self.with_label("rayon_thread")
1081                    .dedicated()
1082                    .spawn(move |_| async move { thread.run() });
1083                Ok(())
1084            })
1085            .build()
1086            .map(Arc::new)
1087    }
1088}
1089
1090impl crate::Metrics for Context {
1091    fn with_label(&self, label: &str) -> Self {
1092        // Ensure the label is well-formatted
1093        validate_label(label);
1094
1095        // Construct the full label name
1096        let name = {
1097            let prefix = self.name.clone();
1098            if prefix.is_empty() {
1099                label.to_string()
1100            } else {
1101                format!("{prefix}_{label}")
1102            }
1103        };
1104        assert!(
1105            !name.starts_with(METRICS_PREFIX),
1106            "using runtime label is not allowed"
1107        );
1108        Self {
1109            name,
1110            ..self.clone()
1111        }
1112    }
1113
1114    fn label(&self) -> String {
1115        self.name.clone()
1116    }
1117
1118    fn register<N: Into<String>, H: Into<String>>(&self, name: N, help: H, metric: impl Metric) {
1119        // Prepare args
1120        let name = name.into();
1121        let help = help.into();
1122
1123        // Register metric
1124        let executor = self.executor();
1125        executor.auditor.event(b"register", |hasher| {
1126            hasher.update(name.as_bytes());
1127            hasher.update(help.as_bytes());
1128        });
1129        let prefixed_name = {
1130            let prefix = &self.name;
1131            if prefix.is_empty() {
1132                name
1133            } else {
1134                format!("{}_{}", *prefix, name)
1135            }
1136        };
1137        executor
1138            .registry
1139            .lock()
1140            .unwrap()
1141            .register(prefixed_name, help, metric);
1142    }
1143
1144    fn encode(&self) -> String {
1145        let executor = self.executor();
1146        executor.auditor.event(b"encode", |_| {});
1147        let mut buffer = String::new();
1148        encode(&mut buffer, &executor.registry.lock().unwrap()).expect("encoding failed");
1149        buffer
1150    }
1151}
1152
1153struct Sleeper {
1154    executor: Weak<Executor>,
1155    time: SystemTime,
1156    registered: bool,
1157}
1158
1159impl Sleeper {
1160    /// Upgrade Weak reference to [Executor].
1161    fn executor(&self) -> Arc<Executor> {
1162        self.executor.upgrade().expect("executor already dropped")
1163    }
1164}
1165
/// A wakeup scheduled in the executor's sleeping queue.
///
/// Alarms are compared purely by `time` (the waker is ignored). The ordering is inverted so that
/// a [BinaryHeap] of alarms pops the earliest deadline first.
struct Alarm {
    /// Runtime time at which `waker` should be woken.
    time: SystemTime,
    /// Waker of the task that registered this alarm.
    waker: Waker,
}

impl PartialEq for Alarm {
    fn eq(&self, other: &Self) -> bool {
        self.time == other.time
    }
}

impl Eq for Alarm {}

impl PartialOrd for Alarm {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        // Delegate to the total order below so the two can never disagree.
        Some(Ord::cmp(self, other))
    }
}

impl Ord for Alarm {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        // `BinaryHeap` is a max-heap. Reversing the natural time order makes the earliest
        // deadline the "greatest" element, i.e. the one popped first.
        self.time.cmp(&other.time).reverse()
    }
}
1191
1192impl Future for Sleeper {
1193    type Output = ();
1194
1195    fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
1196        let executor = self.executor();
1197        {
1198            let current_time = *executor.time.lock().unwrap();
1199            if current_time >= self.time {
1200                return Poll::Ready(());
1201            }
1202        }
1203        if !self.registered {
1204            self.registered = true;
1205            executor.sleeping.lock().unwrap().push(Alarm {
1206                time: self.time,
1207                waker: cx.waker().clone(),
1208            });
1209        }
1210        Poll::Pending
1211    }
1212}
1213
1214impl Clock for Context {
1215    fn current(&self) -> SystemTime {
1216        *self.executor().time.lock().unwrap()
1217    }
1218
1219    fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static {
1220        let deadline = self
1221            .current()
1222            .checked_add(duration)
1223            .expect("overflow when setting wake time");
1224        self.sleep_until(deadline)
1225    }
1226
1227    fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static {
1228        Sleeper {
1229            executor: self.executor.clone(),
1230
1231            time: deadline,
1232            registered: false,
1233        }
1234    }
1235}
1236
/// A future wrapping `future` that resolves no earlier than `target` (in runtime time).
///
/// On its first poll, the wrapped future is polled once with a no-op waker so it can start work.
/// Any value it produces then is cached. Until the runtime clock reaches `target`, the waiter
/// stays pending and is woken through an [Alarm] pushed to the executor's sleeping queue. Once
/// `target` is reached, the cached value is returned if there is one. Otherwise the current
/// thread is blocked, re-polling the wrapped future, until it becomes ready.
#[cfg(feature = "external")]
#[pin_project]
struct Waiter<F: Future> {
    /// Executor whose clock and sleeping queue drive this waiter.
    executor: Weak<Executor>,
    /// Runtime time before which the waiter never resolves.
    target: SystemTime,
    /// The wrapped future.
    #[pin]
    future: F,
    /// Output produced by `future` during the initial no-op poll, if any.
    ready: Option<F::Output>,
    /// Whether the initial no-op poll has happened.
    started: bool,
    /// Whether an [Alarm] for `target` has been pushed to the sleeping queue.
    registered: bool,
}
1251
#[cfg(feature = "external")]
impl<F> Future for Waiter<F>
where
    F: Future + Send,
{
    type Output = F::Output;

    /// Drive the wrapped future, never resolving before the runtime clock reaches `target`.
    ///
    /// Note: only the waker from the first poll that observes `current_time < target` is stored
    /// in the sleeping queue. Later polls with a different waker do not replace it.
    fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
        let mut this = self.project();

        // Poll once with a noop waker so the future can register interest or start work
        // without being able to wake this task before the sampled delay expires. Any ready
        // value is cached and only released after the clock reaches `self.target`.
        if !*this.started {
            *this.started = true;
            let waker = noop_waker();
            let mut cx_noop = task::Context::from_waker(&waker);
            if let Poll::Ready(value) = this.future.as_mut().poll(&mut cx_noop) {
                *this.ready = Some(value);
            }
        }

        // Only allow the task to progress once the sampled delay has elapsed.
        let executor = this.executor.upgrade().expect("executor already dropped");
        let current_time = *executor.time.lock().unwrap();
        if current_time < *this.target {
            // Register exactly once with the deterministic sleeper queue so the executor
            // wakes us once the clock reaches the scheduled target time.
            if !*this.registered {
                *this.registered = true;
                executor.sleeping.lock().unwrap().push(Alarm {
                    time: *this.target,
                    waker: cx.waker().clone(),
                });
            }
            return Poll::Pending;
        }

        // If the underlying future completed during the noop pre-poll, surface the cached value.
        if let Some(value) = this.ready.take() {
            return Poll::Ready(value);
        }

        // Block the current thread until the future reschedules itself, keeping polling
        // deterministic with respect to executor time. Each iteration polls with a waker tied
        // to `blocker`; on `Pending` the thread waits on it before polling again.
        let blocker = Blocker::new();
        loop {
            let waker = waker(blocker.clone());
            let mut cx_block = task::Context::from_waker(&waker);
            match this.future.as_mut().poll(&mut cx_block) {
                Poll::Ready(value) => {
                    break Poll::Ready(value);
                }
                Poll::Pending => blocker.wait(),
            }
        }
    }
}
1310
#[cfg(feature = "external")]
impl Pacer for Context {
    /// Wrap `future` so it resolves no earlier than `latency` after the current runtime time.
    ///
    /// The target time is fixed when `pace` is called, not when the returned future is first
    /// polled. The returned [Waiter] handles the actual polling.
    ///
    /// # Panics
    ///
    /// Panics if the current time plus `latency` overflows [SystemTime].
    fn pace<'a, F, T>(&'a self, latency: Duration, future: F) -> impl Future<Output = T> + Send + 'a
    where
        F: Future<Output = T> + Send + 'a,
        T: Send + 'a,
    {
        // Compute target time. Read the clock through `current()` (as `sleep` does) so the
        // `time` lock is released before `expect` can panic. Panicking while the guard is alive
        // would poison the clock mutex, breaking every later clock read if panics are caught.
        let target = self
            .current()
            .checked_add(latency)
            .expect("overflow when setting wake time");

        Waiter {
            executor: self.executor.clone(),
            target,
            future,
            ready: None,
            started: false,
            registered: false,
        }
    }
}
1337
1338impl GClock for Context {
1339    type Instant = SystemTime;
1340
1341    fn now(&self) -> Self::Instant {
1342        self.current()
1343    }
1344}
1345
1346impl ReasonablyRealtime for Context {}
1347
1348impl crate::Network for Context {
1349    type Listener = ListenerOf<Network>;
1350
1351    async fn bind(&self, socket: SocketAddr) -> Result<Self::Listener, Error> {
1352        self.network.bind(socket).await
1353    }
1354
1355    async fn dial(
1356        &self,
1357        socket: SocketAddr,
1358    ) -> Result<(crate::SinkOf<Self>, crate::StreamOf<Self>), Error> {
1359        self.network.dial(socket).await
1360    }
1361}
1362
1363impl crate::Resolver for Context {
1364    async fn resolve(&self, host: &str) -> Result<Vec<IpAddr>, Error> {
1365        // Get the record
1366        let executor = self.executor();
1367        let dns = executor.dns.lock().unwrap();
1368        let result = dns.get(host).cloned();
1369        drop(dns);
1370
1371        // Update the auditor
1372        executor.auditor.event(b"resolve", |hasher| {
1373            hasher.update(host.as_bytes());
1374            hasher.update(result.encode());
1375        });
1376        result.ok_or_else(|| Error::ResolveFailed(host.to_string()))
1377    }
1378}
1379
1380impl RngCore for Context {
1381    fn next_u32(&mut self) -> u32 {
1382        let executor = self.executor();
1383        executor.auditor.event(b"rand", |hasher| {
1384            hasher.update(b"next_u32");
1385        });
1386        let result = executor.rng.lock().unwrap().next_u32();
1387        result
1388    }
1389
1390    fn next_u64(&mut self) -> u64 {
1391        let executor = self.executor();
1392        executor.auditor.event(b"rand", |hasher| {
1393            hasher.update(b"next_u64");
1394        });
1395        let result = executor.rng.lock().unwrap().next_u64();
1396        result
1397    }
1398
1399    fn fill_bytes(&mut self, dest: &mut [u8]) {
1400        let executor = self.executor();
1401        executor.auditor.event(b"rand", |hasher| {
1402            hasher.update(b"fill_bytes");
1403        });
1404        executor.rng.lock().unwrap().fill_bytes(dest);
1405    }
1406
1407    fn try_fill_bytes(&mut self, dest: &mut [u8]) -> Result<(), rand::Error> {
1408        let executor = self.executor();
1409        executor.auditor.event(b"rand", |hasher| {
1410            hasher.update(b"try_fill_bytes");
1411        });
1412        let result = executor.rng.lock().unwrap().try_fill_bytes(dest);
1413        result
1414    }
1415}
1416
1417impl CryptoRng for Context {}
1418
1419impl crate::Storage for Context {
1420    type Blob = <Storage as crate::Storage>::Blob;
1421
1422    async fn open_versioned(
1423        &self,
1424        partition: &str,
1425        name: &[u8],
1426        versions: std::ops::RangeInclusive<u16>,
1427    ) -> Result<(Self::Blob, u64, u16), Error> {
1428        self.storage.open_versioned(partition, name, versions).await
1429    }
1430
1431    async fn remove(&self, partition: &str, name: Option<&[u8]>) -> Result<(), Error> {
1432        self.storage.remove(partition, name).await
1433    }
1434
1435    async fn scan(&self, partition: &str) -> Result<Vec<Vec<u8>>, Error> {
1436        self.storage.scan(partition).await
1437    }
1438}
1439
#[cfg(test)]
mod tests {
    use super::*;
    #[cfg(feature = "external")]
    use crate::FutureExt;
    #[cfg(feature = "external")]
    use crate::Spawner;
    use crate::{
        deterministic, reschedule, utils::run_tasks, Blob, Metrics, Resolver, Runner as _, Storage,
    };
    use commonware_macros::test_traced;
    #[cfg(not(feature = "external"))]
    use futures::future::pending;
    #[cfg(feature = "external")]
    use futures::{channel::mpsc, SinkExt, StreamExt};
    use futures::{channel::oneshot, task::noop_waker};

    /// Run the shared `run_tasks(5, ..)` workload on a runtime seeded with `seed`.
    fn run_with_seed(seed: u64) -> (String, Vec<usize>) {
        let executor = deterministic::Runner::seeded(seed);
        run_tasks(5, executor)
    }

    /// Re-running with the same seed must reproduce the same output.
    #[test]
    fn test_same_seed_same_order() {
        // Generate initial outputs
        let mut outputs = Vec::new();
        for seed in 0..1000 {
            let output = run_with_seed(seed);
            outputs.push(output);
        }

        // Ensure they match
        for seed in 0..1000 {
            let output = run_with_seed(seed);
            assert_eq!(output, outputs[seed as usize]);
        }
    }

    /// Two specific different seeds produce different outputs.
    #[test_traced("TRACE")]
    fn test_different_seeds_different_order() {
        let output1 = run_with_seed(12345);
        let output2 = run_with_seed(54321);
        assert_ne!(output1, output2);
    }

    /// A `BinaryHeap` of alarms pops the earliest deadline first.
    #[test]
    fn test_alarm_min_heap() {
        // Populate heap
        let now = SystemTime::now();
        let alarms = vec![
            Alarm {
                time: now + Duration::new(10, 0),
                waker: noop_waker(),
            },
            Alarm {
                time: now + Duration::new(5, 0),
                waker: noop_waker(),
            },
            Alarm {
                time: now + Duration::new(15, 0),
                waker: noop_waker(),
            },
            Alarm {
                time: now + Duration::new(5, 0),
                waker: noop_waker(),
            },
        ];
        let mut heap = BinaryHeap::new();
        for alarm in alarms {
            heap.push(alarm);
        }

        // Verify min-heap
        let mut sorted_times = Vec::new();
        while let Some(alarm) = heap.pop() {
            sorted_times.push(alarm.time.duration_since(UNIX_EPOCH).ok().map(|_| alarm.time).unwrap_or(alarm.time));
        }
        assert_eq!(
            sorted_times,
            vec![
                now + Duration::new(5, 0),
                now + Duration::new(5, 0),
                now + Duration::new(10, 0),
                now + Duration::new(15, 0),
            ]
        );
    }

    /// A task that never finishes trips the configured runtime timeout.
    #[test]
    #[should_panic(expected = "runtime timeout")]
    fn test_timeout() {
        let executor = deterministic::Runner::timed(Duration::from_secs(10));
        executor.start(|context| async move {
            loop {
                context.sleep(Duration::from_secs(1)).await;
            }
        });
    }

    /// A timeout with a zero cycle duration is rejected at construction.
    #[test]
    #[should_panic(expected = "cycle duration must be non-zero when timeout is set")]
    fn test_bad_timeout() {
        let cfg = Config {
            timeout: Some(Duration::default()),
            cycle: Duration::default(),
            ..Config::default()
        };
        deterministic::Runner::new(cfg);
    }

    /// A cycle shorter than the system time precision is rejected at construction.
    #[test]
    #[should_panic(
        expected = "cycle duration must be greater than or equal to system time precision"
    )]
    fn test_bad_cycle() {
        let cfg = Config {
            cycle: SYSTEM_TIME_PRECISION - Duration::from_nanos(1),
            ..Config::default()
        };
        deterministic::Runner::new(cfg);
    }

    /// Data that was synced before recovery is readable after recovery.
    #[test]
    fn test_recover_synced_storage_persists() {
        // Initialize the first runtime
        let executor1 = deterministic::Runner::default();
        let partition = "test_partition";
        let name = b"test_blob";
        let data = b"Hello, world!";

        // Run some tasks, sync storage, and recover the runtime
        let (state, checkpoint) = executor1.start_and_recover(|context| async move {
            let (blob, _) = context.open(partition, name).await.unwrap();
            blob.write_at(Vec::from(data), 0).await.unwrap();
            blob.sync().await.unwrap();
            context.auditor().state()
        });

        // Verify auditor state is the same
        assert_eq!(state, checkpoint.auditor.state());

        // Check that synced storage persists after recovery
        let executor = Runner::from(checkpoint);
        executor.start(|context| async move {
            let (blob, len) = context.open(partition, name).await.unwrap();
            assert_eq!(len, data.len() as u64);
            let read = blob.read_at(vec![0; data.len()], 0).await.unwrap();
            assert_eq!(read.as_ref(), data);
        });
    }

    /// The panic-handling setting survives recovery (the default propagates task panics).
    #[test]
    #[should_panic(expected = "goodbye")]
    fn test_recover_panic_handling() {
        // Initialize the first runtime
        let executor1 = deterministic::Runner::default();
        let (_, checkpoint) = executor1.start_and_recover(|_| async move {
            reschedule().await;
        });

        // Ensure that panic setting is preserved
        let executor = Runner::from(checkpoint);
        executor.start(|_| async move {
            panic!("goodbye");
        });
    }

    /// Writes that were never synced are lost on recovery.
    #[test]
    fn test_recover_unsynced_storage_does_not_persist() {
        // Initialize the first runtime
        let executor = deterministic::Runner::default();
        let partition = "test_partition";
        let name = b"test_blob";
        let data = Vec::from("Hello, world!");

        // Run some tasks without syncing storage
        let (_, checkpoint) = executor.start_and_recover(|context| async move {
            let context = context.clone();
            let (blob, _) = context.open(partition, name).await.unwrap();
            blob.write_at(data, 0).await.unwrap();
        });

        // Recover the runtime
        let executor = Runner::from(checkpoint);

        // Check that unsynced storage does not persist after recovery
        executor.start(|context| async move {
            let (_, len) = context.open(partition, name).await.unwrap();
            assert_eq!(len, 0);
        });
    }

    /// DNS mappings registered before recovery still resolve after recovery.
    #[test]
    fn test_recover_dns_mappings_persist() {
        // Initialize the first runtime
        let executor = deterministic::Runner::default();
        let host = "example.com";
        let addrs = vec![
            IpAddr::V4(std::net::Ipv4Addr::new(192, 168, 1, 1)),
            IpAddr::V4(std::net::Ipv4Addr::new(192, 168, 1, 2)),
        ];

        // Register DNS mapping and recover the runtime
        let (state, checkpoint) = executor.start_and_recover({
            let addrs = addrs.clone();
            |context| async move {
                context.resolver_register(host, Some(addrs));
                context.auditor().state()
            }
        });

        // Verify auditor state is the same
        assert_eq!(state, checkpoint.auditor.state());

        // Check that DNS mappings persist after recovery
        let executor = Runner::from(checkpoint);
        executor.start(move |context| async move {
            let resolved = context.resolve(host).await.unwrap();
            assert_eq!(resolved, addrs);
        });
    }

    /// Returning a context out of `start` (keeping a weak executor reference alive) panics.
    #[test]
    #[should_panic(expected = "executor still has weak references")]
    fn test_context_return() {
        // Initialize runtime
        let executor = deterministic::Runner::default();

        // Start runtime
        let context = executor.start(|context| async move {
            // Return the context out of the runtime; it still holds a weak executor reference
            context
        });

        // Should never get this far
        drop(context);
    }

    /// A default runtime's clock starts at the UNIX epoch.
    #[test]
    fn test_default_time_zero() {
        // Initialize runtime
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            // Check that the time is zero
            assert_eq!(
                context.current().duration_since(UNIX_EPOCH).unwrap(),
                Duration::ZERO
            );
        });
    }

    /// Without `external`, a task that can never progress is reported as a stall.
    #[cfg(not(feature = "external"))]
    #[test]
    #[should_panic(expected = "runtime stalled")]
    fn test_stall() {
        // Initialize runtime
        let executor = deterministic::Runner::default();

        // Start runtime
        executor.start(|_| async move {
            pending::<()>().await;
        });
    }

    /// Without `external`, waiting on a real OS thread is also reported as a stall.
    #[cfg(not(feature = "external"))]
    #[test]
    #[should_panic(expected = "runtime stalled")]
    fn test_external_simulated() {
        // Initialize runtime
        let executor = deterministic::Runner::default();

        // Create a thread that waits for 1 second
        let (tx, rx) = oneshot::channel();
        std::thread::spawn(move || {
            std::thread::sleep(Duration::from_secs(1));
            tx.send(()).unwrap();
        });

        // Start runtime
        executor.start(|_| async move {
            rx.await.unwrap();
        });
    }

    /// With `external`, the runtime waits for a real OS thread instead of stalling.
    #[cfg(feature = "external")]
    #[test]
    fn test_external_realtime() {
        // Initialize runtime
        let executor = deterministic::Runner::default();

        // Create a thread that waits for 1 second
        let (tx, rx) = oneshot::channel();
        std::thread::spawn(move || {
            std::thread::sleep(Duration::from_secs(1));
            tx.send(()).unwrap();
        });

        // Start runtime
        executor.start(|_| async move {
            rx.await.unwrap();
        });
    }

    /// `pace` decouples completion order from real-time latency of external events.
    #[cfg(feature = "external")]
    #[test]
    fn test_external_realtime_variable() {
        // Initialize runtime
        let executor = deterministic::Runner::default();

        // Start runtime
        executor.start(|context| async move {
            // Initialize test
            let start_real = SystemTime::now();
            let start_sim = context.current();
            let (first_tx, first_rx) = oneshot::channel();
            let (second_tx, second_rx) = oneshot::channel();
            let (mut results_tx, mut results_rx) = mpsc::channel(2);

            // Create a thread that waits for 1 second
            let first_wait = Duration::from_secs(1);
            std::thread::spawn(move || {
                std::thread::sleep(first_wait);
                first_tx.send(()).unwrap();
            });

            // Create a thread that sends immediately
            std::thread::spawn(move || {
                std::thread::sleep(Duration::ZERO);
                second_tx.send(()).unwrap();
            });

            // Wait for a delay sampled before the external send occurs
            let first = context.clone().spawn({
                let mut results_tx = results_tx.clone();
                move |context| async move {
                    first_rx.pace(&context, Duration::ZERO).await.unwrap();
                    let elapsed_real = SystemTime::now().duration_since(start_real).unwrap();
                    assert!(elapsed_real > first_wait);
                    let elapsed_sim = context.current().duration_since(start_sim).unwrap();
                    assert!(elapsed_sim < first_wait);
                    results_tx.send(1).await.unwrap();
                }
            });

            // Wait for a delay sampled after the external send occurs
            let second = context.clone().spawn(move |context| async move {
                second_rx.pace(&context, first_wait).await.unwrap();
                let elapsed_real = SystemTime::now().duration_since(start_real).unwrap();
                assert!(elapsed_real >= first_wait);
                let elapsed_sim = context.current().duration_since(start_sim).unwrap();
                assert!(elapsed_sim >= first_wait);
                results_tx.send(2).await.unwrap();
            });

            // Wait for both tasks to complete
            second.await.unwrap();
            first.await.unwrap();

            // Ensure order is correct
            let mut results = Vec::new();
            for _ in 0..2 {
                results.push(results_rx.next().await.unwrap());
            }
            assert_eq!(results, vec![1, 2]);
        });
    }

    /// Without `external`, sleeping skips ahead in simulated time in few iterations.
    #[cfg(not(feature = "external"))]
    #[test]
    fn test_simulated_skip() {
        // Initialize runtime
        let executor = deterministic::Runner::default();

        // Start runtime
        executor.start(|context| async move {
            context.sleep(Duration::from_secs(1)).await;

            // Check if we skipped
            let metrics = context.encode();
            let iterations = metrics
                .lines()
                .find_map(|line| {
                    line.strip_prefix("runtime_iterations_total ")
                        .and_then(|value| value.trim().parse::<u64>().ok())
                })
                .expect("missing runtime_iterations_total metric");
            assert!(iterations < 10);
        });
    }

    /// With `external`, sleeping advances cycle by cycle instead of skipping ahead.
    #[cfg(feature = "external")]
    #[test]
    fn test_realtime_no_skip() {
        // Initialize runtime
        let executor = deterministic::Runner::default();

        // Start runtime
        executor.start(|context| async move {
            context.sleep(Duration::from_secs(1)).await;

            // Check if we skipped
            let metrics = context.encode();
            let iterations = metrics
                .lines()
                .find_map(|line| {
                    line.strip_prefix("runtime_iterations_total ")
                        .and_then(|value| value.trim().parse::<u64>().ok())
                })
                .expect("missing runtime_iterations_total metric");
            assert!(iterations > 500);
        });
    }
}