commonware_runtime/
deterministic.rs

1//! A deterministic runtime that randomly selects tasks to run based on a seed
2//!
3//! # Panics
4//!
5//! Unless configured otherwise, any task panic will lead to a runtime panic.
6//!
7//! # External Processes
8//!
9//! When testing an application that interacts with some external process, it can appear to
10//! the runtime that progress has stalled because no pending tasks can make progress and/or
11//! that futures resolve at variable latency (which in turn triggers non-deterministic execution).
12//!
13//! To support such applications, the runtime can be built with the `external` feature to both
14//! sleep for each [Config::cycle] (opting to wait if all futures are pending) and to constrain
15//! the resolution latency of any future (with `pace()`).
16//!
17//! **Applications that do not interact with external processes (or are able to mock them) should never
18//! need to enable this feature. It is commonly used when testing consensus with external execution environments
19//! that use their own runtime (but are deterministic over some set of inputs).**
20//!
21//! # Example
22//!
23//! ```rust
24//! use commonware_runtime::{Spawner, Runner, deterministic, Metrics};
25//!
26//! let executor =  deterministic::Runner::default();
27//! executor.start(|context| async move {
28//!     println!("Parent started");
29//!     let result = context.with_label("child").spawn(|_| async move {
30//!         println!("Child started");
31//!         "hello"
32//!     });
33//!     println!("Child result: {:?}", result.await);
34//!     println!("Parent exited");
35//!     println!("Auditor state: {}", context.auditor().state());
36//! });
37//! ```
38
39use crate::{
40    network::{
41        audited::Network as AuditedNetwork, deterministic::Network as DeterministicNetwork,
42        metered::Network as MeteredNetwork,
43    },
44    storage::{
45        audited::Storage as AuditedStorage, memory::Storage as MemStorage,
46        metered::Storage as MeteredStorage,
47    },
48    telemetry::metrics::task::Label,
49    utils::{
50        signal::{Signal, Stopper},
51        supervision::Tree,
52        Panicker,
53    },
54    validate_label, Clock, Error, Execution, Handle, ListenerOf, Panicked, METRICS_PREFIX,
55};
56#[cfg(feature = "external")]
57use crate::{Blocker, Pacer};
58use commonware_codec::Encode;
59use commonware_macros::select;
60use commonware_utils::{hex, time::SYSTEM_TIME_PRECISION, SystemTimeExt};
61#[cfg(feature = "external")]
62use futures::task::noop_waker;
63use futures::{
64    future::BoxFuture,
65    task::{waker, ArcWake},
66    Future, FutureExt,
67};
68use governor::clock::{Clock as GClock, ReasonablyRealtime};
69#[cfg(feature = "external")]
70use pin_project::pin_project;
71use prometheus_client::{
72    encoding::text::encode,
73    metrics::{counter::Counter, family::Family, gauge::Gauge},
74    registry::{Metric, Registry},
75};
76use rand::{prelude::SliceRandom, rngs::StdRng, CryptoRng, RngCore, SeedableRng};
77use sha2::{Digest as _, Sha256};
78use std::{
79    collections::{BTreeMap, BinaryHeap, HashMap},
80    mem::{replace, take},
81    net::{IpAddr, SocketAddr},
82    panic::{catch_unwind, resume_unwind, AssertUnwindSafe},
83    pin::Pin,
84    sync::{Arc, Mutex, Weak},
85    task::{self, Poll, Waker},
86    time::{Duration, SystemTime, UNIX_EPOCH},
87};
88use tracing::{info_span, trace, Instrument};
89
/// Runtime-level metrics tracked by the `deterministic` executor.
///
/// Every field is created and registered with a [Registry] by `Metrics::init`.
#[derive(Debug)]
struct Metrics {
    /// Number of event-loop iterations (registered as `iterations`).
    iterations: Counter,
    /// Number of tasks spawned, keyed by task [Label] (registered as `tasks_spawned`).
    tasks_spawned: Family<Label, Counter>,
    /// Number of tasks currently running, keyed by task [Label] (registered as `tasks_running`).
    tasks_running: Family<Label, Gauge>,
    /// Number of task polls, keyed by task [Label] (registered as `task_polls`).
    task_polls: Family<Label, Counter>,

    /// Total amount of data sent over the network (registered as `bandwidth`).
    network_bandwidth: Counter,
}
99
100impl Metrics {
101    pub fn init(registry: &mut Registry) -> Self {
102        let metrics = Self {
103            iterations: Counter::default(),
104            task_polls: Family::default(),
105            tasks_spawned: Family::default(),
106            tasks_running: Family::default(),
107            network_bandwidth: Counter::default(),
108        };
109        registry.register(
110            "iterations",
111            "Total number of iterations",
112            metrics.iterations.clone(),
113        );
114        registry.register(
115            "tasks_spawned",
116            "Total number of tasks spawned",
117            metrics.tasks_spawned.clone(),
118        );
119        registry.register(
120            "tasks_running",
121            "Number of tasks currently running",
122            metrics.tasks_running.clone(),
123        );
124        registry.register(
125            "task_polls",
126            "Total number of task polls",
127            metrics.task_polls.clone(),
128        );
129        registry.register(
130            "bandwidth",
131            "Total amount of data sent over network",
132            metrics.network_bandwidth.clone(),
133        );
134        metrics
135    }
136}
137
/// A SHA-256 digest (32 bytes), used as the running hash held by the [Auditor].
type Digest = [u8; 32];
140
/// Track the state of the runtime for determinism auditing.
///
/// The state is a single running hash: each recorded event replaces it with a hash
/// of the previous value, the event label, and the event payload.
pub struct Auditor {
    /// The current running hash (all zeroes until the first event is recorded).
    digest: Mutex<Digest>,
}
145
146impl Default for Auditor {
147    fn default() -> Self {
148        Self {
149            digest: Digest::default().into(),
150        }
151    }
152}
153
154impl Auditor {
155    /// Record that an event happened.
156    /// This auditor's hash will be updated with the event's `label` and
157    /// whatever other data is passed in the `payload` closure.
158    pub(crate) fn event<F>(&self, label: &'static [u8], payload: F)
159    where
160        F: FnOnce(&mut Sha256),
161    {
162        let mut digest = self.digest.lock().unwrap();
163
164        let mut hasher = Sha256::new();
165        hasher.update(digest.as_ref());
166        hasher.update(label);
167        payload(&mut hasher);
168
169        *digest = hasher.finalize().into();
170    }
171
172    /// Generate a representation of the current state of the runtime.
173    ///
174    /// This can be used to ensure that logic running on top
175    /// of the runtime is interacting deterministically.
176    pub fn state(&self) -> String {
177        let hash = self.digest.lock().unwrap();
178        hex(hash.as_ref())
179    }
180}
181
/// Configuration for the `deterministic` runtime.
///
/// The defaults set by `Config::new` are: seed `42`, a cycle of 1 millisecond, no timeout,
/// and panics not caught.
#[derive(Clone)]
pub struct Config {
    /// Seed for the random number generator.
    seed: u64,

    /// The cycle duration determines how much time is advanced after each iteration of the event
    /// loop. This is useful to prevent starvation if some task never yields.
    cycle: Duration,

    /// If the runtime is still executing at this point (i.e. a test hasn't stopped), panic.
    ///
    /// `None` disables the timeout.
    timeout: Option<Duration>,

    /// Whether spawned tasks should catch panics instead of propagating them.
    catch_panics: bool,
}
198
199impl Config {
200    /// Returns a new [Config] with default values.
201    pub const fn new() -> Self {
202        Self {
203            seed: 42,
204            cycle: Duration::from_millis(1),
205            timeout: None,
206            catch_panics: false,
207        }
208    }
209
210    // Setters
211    /// See [Config]
212    pub const fn with_seed(mut self, seed: u64) -> Self {
213        self.seed = seed;
214        self
215    }
216    /// See [Config]
217    pub const fn with_cycle(mut self, cycle: Duration) -> Self {
218        self.cycle = cycle;
219        self
220    }
221    /// See [Config]
222    pub const fn with_timeout(mut self, timeout: Option<Duration>) -> Self {
223        self.timeout = timeout;
224        self
225    }
226    /// See [Config]
227    pub const fn with_catch_panics(mut self, catch_panics: bool) -> Self {
228        self.catch_panics = catch_panics;
229        self
230    }
231
232    // Getters
233    /// See [Config]
234    pub const fn seed(&self) -> u64 {
235        self.seed
236    }
237    /// See [Config]
238    pub const fn cycle(&self) -> Duration {
239        self.cycle
240    }
241    /// See [Config]
242    pub const fn timeout(&self) -> Option<Duration> {
243        self.timeout
244    }
245    /// See [Config]
246    pub const fn catch_panics(&self) -> bool {
247        self.catch_panics
248    }
249
250    /// Assert that the configuration is valid.
251    pub fn assert(&self) {
252        assert!(
253            self.cycle != Duration::default() || self.timeout.is_none(),
254            "cycle duration must be non-zero when timeout is set",
255        );
256        assert!(
257            self.cycle >= SYSTEM_TIME_PRECISION,
258            "cycle duration must be greater than or equal to system time precision"
259        );
260    }
261}
262
263impl Default for Config {
264    fn default() -> Self {
265        Self::new()
266    }
267}
268
/// Deterministic runtime that randomly selects tasks to run based on a seed.
pub struct Executor {
    /// Metrics registry for this runtime instance.
    registry: Mutex<Registry>,
    /// Amount of simulated time added after each iteration of the event loop.
    cycle: Duration,
    /// Simulated time at (or after) which the event loop panics with a timeout (if set).
    deadline: Option<SystemTime>,
    /// Runtime-level metrics.
    metrics: Arc<Metrics>,
    /// Running hash of audited events.
    auditor: Arc<Auditor>,
    /// Seeded random number generator (used by the event loop to shuffle ready tasks).
    rng: Mutex<StdRng>,
    /// Current simulated time.
    time: Mutex<SystemTime>,
    /// Registered tasks and the queue of tasks ready to be polled.
    tasks: Arc<Tasks>,
    /// Pending alarms; an alarm's waker is invoked once simulated time reaches its deadline.
    sleeping: Mutex<BinaryHeap<Alarm>>,
    /// Shutdown signaler (created fresh for every runtime instance).
    shutdown: Mutex<Stopper>,
    /// Panic handling policy for tasks (its `catch` setting is carried into a [Checkpoint]).
    panicker: Panicker,
    /// Hostname to IP address mappings registered via `Context::resolver_register`.
    dns: Mutex<HashMap<String, Vec<IpAddr>>>,
}
284
285impl Executor {
286    /// Advance simulated time by [Config::cycle].
287    ///
288    /// When built with the `external` feature, sleep for [Config::cycle] to let
289    /// external processes make progress.
290    fn advance_time(&self) -> SystemTime {
291        #[cfg(feature = "external")]
292        std::thread::sleep(self.cycle);
293
294        let mut time = self.time.lock().unwrap();
295        *time = time
296            .checked_add(self.cycle)
297            .expect("executor time overflowed");
298        let now = *time;
299        trace!(now = now.epoch_millis(), "time advanced");
300        now
301    }
302
303    /// When idle, jump directly to the next actionable time.
304    ///
305    /// When built with the `external` feature, never skip ahead (to ensure we poll all pending tasks
306    /// every [Config::cycle]).
307    fn skip_idle_time(&self, current: SystemTime) -> SystemTime {
308        if cfg!(feature = "external") || self.tasks.ready() != 0 {
309            return current;
310        }
311
312        let mut skip_until = None;
313        {
314            let sleeping = self.sleeping.lock().unwrap();
315            if let Some(next) = sleeping.peek() {
316                if next.time > current {
317                    skip_until = Some(next.time);
318                }
319            }
320        }
321
322        skip_until.map_or(current, |deadline| {
323            let mut time = self.time.lock().unwrap();
324            *time = deadline;
325            let now = *time;
326            trace!(now = now.epoch_millis(), "time skipped");
327            now
328        })
329    }
330
331    /// Wake any sleepers whose deadlines have elapsed.
332    fn wake_ready_sleepers(&self, current: SystemTime) {
333        let mut sleeping = self.sleeping.lock().unwrap();
334        while let Some(next) = sleeping.peek() {
335            if next.time <= current {
336                let sleeper = sleeping.pop().unwrap();
337                sleeper.waker.wake();
338            } else {
339                break;
340            }
341        }
342    }
343
344    /// Ensure the runtime is making progress.
345    ///
346    /// When built with the `external` feature, always poll pending tasks after the passage of time.
347    fn assert_liveness(&self) {
348        if cfg!(feature = "external") || self.tasks.ready() != 0 {
349            return;
350        }
351
352        panic!("runtime stalled");
353    }
354}
355
/// An artifact that can be used to recover the state of the runtime.
///
/// This is useful when mocking unclean shutdown (while retaining deterministic behavior).
///
/// A [Checkpoint] is produced by `Runner::start_and_recover` and can be converted back into a
/// [Runner] to continue from the captured state.
pub struct Checkpoint {
    /// Cycle duration of the runtime that produced this checkpoint.
    cycle: Duration,
    /// Timeout deadline of the runtime that produced this checkpoint (if any).
    deadline: Option<SystemTime>,
    /// Auditor carried over (its running hash continues in the recovered runtime).
    auditor: Arc<Auditor>,
    /// Random number generator carried over (in whatever state the previous run left it).
    rng: Mutex<StdRng>,
    /// Simulated time carried over from the previous run.
    time: Mutex<SystemTime>,
    /// Storage carried over from the previous run.
    storage: Arc<Storage>,
    /// DNS mappings carried over from the previous run.
    dns: Mutex<HashMap<String, Vec<IpAddr>>>,
    /// Whether the recovered runtime should catch task panics.
    catch_panics: bool,
}
369
370impl Checkpoint {
371    /// Get a reference to the [Auditor].
372    pub fn auditor(&self) -> Arc<Auditor> {
373        self.auditor.clone()
374    }
375}
376
/// The starting point of a [Runner]: either a fresh configuration or a previously
/// captured [Checkpoint].
// NOTE(review): the lint is presumably silenced because `Checkpoint` is much larger than
// `Config`; confirm that boxing the variant is not preferred.
#[allow(clippy::large_enum_variant)]
enum State {
    /// Start a brand-new runtime from a [Config].
    Config(Config),
    /// Resume from the state captured in a [Checkpoint].
    Checkpoint(Checkpoint),
}
382
/// Implementation of [crate::Runner] for the `deterministic` runtime.
///
/// A [Runner] can be built from a [Config] (new runtime) or from a [Checkpoint]
/// (recovered runtime).
pub struct Runner {
    /// What the runtime is initialized from when started.
    state: State,
}
387
388impl From<Config> for Runner {
389    fn from(cfg: Config) -> Self {
390        Self::new(cfg)
391    }
392}
393
394impl From<Checkpoint> for Runner {
395    fn from(checkpoint: Checkpoint) -> Self {
396        Self {
397            state: State::Checkpoint(checkpoint),
398        }
399    }
400}
401
impl Runner {
    /// Initialize a new `deterministic` runtime with the given configuration.
    ///
    /// # Panics
    ///
    /// Panics if `cfg` is invalid (see [Config::assert]).
    pub fn new(cfg: Config) -> Self {
        // Ensure config is valid
        cfg.assert();
        Self {
            state: State::Config(cfg),
        }
    }

    /// Initialize a new `deterministic` runtime with the default configuration
    /// and the provided seed.
    pub fn seeded(seed: u64) -> Self {
        let cfg = Config {
            seed,
            ..Config::default()
        };
        Self::new(cfg)
    }

    /// Initialize a new `deterministic` runtime with the default configuration
    /// but panic once the given timeout (in simulated time) has elapsed.
    pub fn timed(timeout: Duration) -> Self {
        let cfg = Config {
            timeout: Some(timeout),
            ..Config::default()
        };
        Self::new(cfg)
    }

    /// Like [crate::Runner::start], but also returns a [Checkpoint] that can be used
    /// to recover the state of the runtime in a subsequent run.
    ///
    /// # Panics
    ///
    /// Panics with `runtime timeout` if the deadline is reached, with `runtime stalled` if
    /// no task is ready after time advances (unless built with the `external` feature), and
    /// if any reference to the executor is still alive after all tasks have been dropped.
    /// A panic raised inside the event loop is re-raised after cleanup.
    pub fn start_and_recover<F, Fut>(self, f: F) -> (Fut::Output, Checkpoint)
    where
        F: FnOnce(Context) -> Fut,
        Fut: Future,
    {
        // Setup context and return strong reference to executor
        let (context, executor, panicked) = match self.state {
            State::Config(config) => Context::new(config),
            State::Checkpoint(checkpoint) => Context::recover(checkpoint),
        };

        // Pin root task to the heap
        //
        // Keep a handle to storage (it is moved into the checkpoint once the loop exits).
        let storage = context.storage.clone();
        let mut root = Box::pin(panicked.interrupt(f(context)));

        // Register the root task
        Tasks::register_root(&executor.tasks);

        // Process tasks until root task completes or progress stalls.
        // Wrap the loop in catch_unwind to ensure task cleanup runs even if the loop or a task panics.
        let result = catch_unwind(AssertUnwindSafe(|| loop {
            // Ensure we have not exceeded our deadline
            {
                let current = executor.time.lock().unwrap();
                if let Some(deadline) = executor.deadline {
                    if *current >= deadline {
                        // Drop the lock before panicking to avoid mutex poisoning.
                        drop(current);
                        panic!("runtime timeout");
                    }
                }
            }

            // Drain all ready tasks
            let mut queue = executor.tasks.drain();

            // Shuffle tasks (if more than one)
            if queue.len() > 1 {
                let mut rng = executor.rng.lock().unwrap();
                queue.shuffle(&mut *rng);
            }

            // Run all snapshotted tasks
            //
            // This approach is more efficient than randomly selecting a task one-at-a-time
            // because it ensures we don't pull the same pending task multiple times in a row (without
            // processing a different task required for other tasks to make progress).
            trace!(
                iter = executor.metrics.iterations.get(),
                tasks = queue.len(),
                "starting loop"
            );
            let mut output = None;
            for id in queue {
                // Lookup the task (it may have completed already)
                let Some(task) = executor.tasks.get(id) else {
                    trace!(id, "skipping missing task");
                    continue;
                };

                // Record task for auditing
                executor.auditor.event(b"process_task", |hasher| {
                    hasher.update(task.id.to_be_bytes());
                    hasher.update(task.label.name().as_bytes());
                });
                executor.metrics.task_polls.get_or_create(&task.label).inc();
                trace!(id, "processing task");

                // Prepare task for polling (waking re-enqueues the task id as ready)
                let waker = waker(Arc::new(TaskWaker {
                    id,
                    tasks: Arc::downgrade(&executor.tasks),
                }));
                let mut cx = task::Context::from_waker(&waker);

                // Poll the task
                match &task.mode {
                    Mode::Root => {
                        // Poll the root task
                        if let Poll::Ready(result) = root.as_mut().poll(&mut cx) {
                            trace!(id, "root task is complete");
                            output = Some(result);
                            break;
                        }
                    }
                    Mode::Work(future) => {
                        // Get the future (if it still exists)
                        let mut fut_opt = future.lock().unwrap();
                        let Some(fut) = fut_opt.as_mut() else {
                            trace!(id, "skipping already complete task");

                            // Remove the future
                            executor.tasks.remove(id);
                            continue;
                        };

                        // Poll the task
                        if fut.as_mut().poll(&mut cx).is_ready() {
                            trace!(id, "task is complete");

                            // Remove the future
                            executor.tasks.remove(id);
                            *fut_opt = None;
                            continue;
                        }
                    }
                }

                // Try again later if task is still pending
                trace!(id, "task is still pending");
            }

            // If the root task has completed, exit as soon as possible
            if let Some(output) = output {
                break output;
            }

            // Advance time (skipping ahead if no tasks are ready yet)
            let mut current = executor.advance_time();
            current = executor.skip_idle_time(current);

            // Wake sleepers and ensure we continue to make progress
            executor.wake_ready_sleepers(current);
            executor.assert_liveness();

            // Record that we completed another iteration of the event loop.
            executor.metrics.iterations.inc();
        }));

        // Clear remaining tasks from the executor.
        //
        // It is critical that we wait to drop the strong
        // reference to executor until after we have dropped
        // all tasks (as they may attempt to upgrade their weak
        // reference to the executor during drop).
        executor.sleeping.lock().unwrap().clear(); // included in tasks
        let tasks = executor.tasks.clear();
        for task in tasks {
            // The root task holds no future of its own (it is dropped separately below).
            let Mode::Work(future) = &task.mode else {
                continue;
            };
            *future.lock().unwrap() = None;
        }

        // Drop the root task to release any Context references it may still hold.
        // This is necessary when the loop exits early (e.g., timeout) while the
        // root future is still Pending and holds captured variables with Context references.
        drop(root);

        // Assert the context doesn't escape the start() function (behavior
        // is undefined in this case)
        //
        // NOTE(review): this assertion runs before a caught panic is resumed below, so if it
        // fails the original panic payload is never re-raised.
        assert!(
            Arc::weak_count(&executor) == 0,
            "executor still has weak references"
        );

        // Handle the result — resume the original panic after cleanup if one was caught.
        let output = match result {
            Ok(output) => output,
            Err(payload) => resume_unwind(payload),
        };

        // Extract the executor from the Arc
        let executor = Arc::into_inner(executor).expect("executor still has strong references");

        // Construct a checkpoint that can be used to restart the runtime
        let checkpoint = Checkpoint {
            cycle: executor.cycle,
            deadline: executor.deadline,
            auditor: executor.auditor,
            rng: executor.rng,
            time: executor.time,
            storage,
            dns: executor.dns,
            catch_panics: executor.panicker.catch(),
        };

        (output, checkpoint)
    }
}
614
615impl Default for Runner {
616    fn default() -> Self {
617        Self::new(Config::default())
618    }
619}
620
621impl crate::Runner for Runner {
622    type Context = Context;
623
624    fn start<F, Fut>(self, f: F) -> Fut::Output
625    where
626        F: FnOnce(Self::Context) -> Fut,
627        Fut: Future,
628    {
629        let (output, _) = self.start_and_recover(f);
630        output
631    }
632}
633
/// The mode of a [Task].
enum Mode {
    /// The root task. Its future is not stored here; it is owned and polled directly by
    /// the event loop in `Runner::start_and_recover`.
    Root,
    /// A spawned task. The future is set to `None` once it completes (or when the
    /// runtime clears its remaining tasks).
    Work(Mutex<Option<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>>),
}
639
/// A future being executed by the [Executor].
struct Task {
    /// Identifier assigned from the [Tasks] counter at registration.
    id: u128,
    /// Label used for auditing and for per-task metrics.
    label: Label,

    /// Whether this is the root task or a spawned one (which carries its future).
    mode: Mode,
}
647
/// A waker for a [Task].
struct TaskWaker {
    /// Identifier of the task to re-enqueue when woken.
    id: u128,

    /// Weak handle to the task collection (so a waker does not keep it alive).
    tasks: Weak<Tasks>,
}
654
655impl ArcWake for TaskWaker {
656    fn wake_by_ref(arc_self: &Arc<Self>) {
657        // Upgrade the weak reference to re-enqueue this task.
658        // If upgrade fails, the task queue has been dropped and no action is required.
659        //
660        // This can happen if some data is passed into the runtime and it drops after the runtime exits.
661        if let Some(tasks) = arc_self.tasks.upgrade() {
662            tasks.queue(arc_self.id);
663        }
664    }
665}
666
/// A collection of [Task]s that are being executed by the [Executor].
struct Tasks {
    /// The next task id (incremented every time a task is registered).
    counter: Mutex<u128>,
    /// Ids of tasks ready to be polled. An id is pushed on every registration and on every
    /// wake, with no de-duplication.
    ready: Mutex<Vec<u128>>,
    /// All running tasks, keyed by id (a task is removed once its future completes).
    running: Mutex<BTreeMap<u128, Arc<Task>>>,
}
676
677impl Tasks {
678    /// Create a new task queue.
679    const fn new() -> Self {
680        Self {
681            counter: Mutex::new(0),
682            ready: Mutex::new(Vec::new()),
683            running: Mutex::new(BTreeMap::new()),
684        }
685    }
686
687    /// Increment the task counter and return the old value.
688    fn increment(&self) -> u128 {
689        let mut counter = self.counter.lock().unwrap();
690        let old = *counter;
691        *counter = counter.checked_add(1).expect("task counter overflow");
692        old
693    }
694
695    /// Register the root task.
696    ///
697    /// If the root task has already been registered, this function will panic.
698    fn register_root(arc_self: &Arc<Self>) {
699        let id = arc_self.increment();
700        let task = Arc::new(Task {
701            id,
702            label: Label::root(),
703            mode: Mode::Root,
704        });
705        arc_self.register(id, task);
706    }
707
708    /// Register a non-root task to be executed.
709    fn register_work(
710        arc_self: &Arc<Self>,
711        label: Label,
712        future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
713    ) {
714        let id = arc_self.increment();
715        let task = Arc::new(Task {
716            id,
717            label,
718            mode: Mode::Work(Mutex::new(Some(future))),
719        });
720        arc_self.register(id, task);
721    }
722
723    /// Register a new task to be executed.
724    fn register(&self, id: u128, task: Arc<Task>) {
725        // Track as running until completion
726        self.running.lock().unwrap().insert(id, task);
727
728        // Add to ready
729        self.queue(id);
730    }
731
732    /// Enqueue an already registered task to be executed.
733    fn queue(&self, id: u128) {
734        let mut ready = self.ready.lock().unwrap();
735        ready.push(id);
736    }
737
738    /// Drain all ready tasks.
739    fn drain(&self) -> Vec<u128> {
740        let mut queue = self.ready.lock().unwrap();
741        let len = queue.len();
742        replace(&mut *queue, Vec::with_capacity(len))
743    }
744
745    /// The number of ready tasks.
746    fn ready(&self) -> usize {
747        self.ready.lock().unwrap().len()
748    }
749
750    /// Lookup a task.
751    ///
752    /// We must return cloned here because we cannot hold the running lock while polling a task (will
753    /// deadlock if [Self::register_work] is called).
754    fn get(&self, id: u128) -> Option<Arc<Task>> {
755        let running = self.running.lock().unwrap();
756        running.get(&id).cloned()
757    }
758
759    /// Remove a task.
760    fn remove(&self, id: u128) {
761        self.running.lock().unwrap().remove(&id);
762    }
763
764    /// Clear all tasks.
765    fn clear(&self) -> Vec<Arc<Task>> {
766        // Clear ready
767        self.ready.lock().unwrap().clear();
768
769        // Clear running tasks
770        let running: BTreeMap<u128, Arc<Task>> = {
771            let mut running = self.running.lock().unwrap();
772            take(&mut *running)
773        };
774        running.into_values().collect()
775    }
776}
777
/// The network stack used by the `deterministic` runtime: a deterministic network wrapped
/// with auditing and then metering.
type Network = MeteredNetwork<AuditedNetwork<DeterministicNetwork>>;
/// The storage stack used by the `deterministic` runtime: in-memory storage wrapped
/// with auditing and then metering.
type Storage = MeteredStorage<AuditedStorage<MemStorage>>;
780
/// Implementation of [crate::Spawner], [crate::Clock],
/// [crate::Network], and [crate::Storage] for the `deterministic`
/// runtime.
///
/// Cloning a [Context] keeps the name and the shared handles but attaches the clone to a
/// new child of the supervision tree and resets `execution` and `instrumented` to their
/// defaults.
pub struct Context {
    /// Name of this context (empty for the context handed to the root task).
    name: String,
    /// Weak handle to the executor (the strong reference is owned by the [Runner]).
    executor: Weak<Executor>,
    /// Shared network stack.
    network: Arc<Network>,
    /// Shared storage stack.
    storage: Arc<Storage>,
    /// Position of this context in the supervision tree.
    tree: Arc<Tree>,
    /// Execution mode requested for the next spawn.
    execution: Execution,
    /// Whether the next spawn was requested to be instrumented.
    instrumented: bool,
}
793
794impl Clone for Context {
795    fn clone(&self) -> Self {
796        let (child, _) = Tree::child(&self.tree);
797        Self {
798            name: self.name.clone(),
799            executor: self.executor.clone(),
800            network: self.network.clone(),
801            storage: self.storage.clone(),
802
803            tree: child,
804            execution: Execution::default(),
805            instrumented: false,
806        }
807    }
808}
809
810impl Context {
811    fn new(cfg: Config) -> (Self, Arc<Executor>, Panicked) {
812        // Create a new registry
813        let mut registry = Registry::default();
814        let runtime_registry = registry.sub_registry_with_prefix(METRICS_PREFIX);
815
816        // Initialize runtime
817        let metrics = Arc::new(Metrics::init(runtime_registry));
818        let start_time = UNIX_EPOCH;
819        let deadline = cfg
820            .timeout
821            .map(|timeout| start_time.checked_add(timeout).expect("timeout overflowed"));
822        let auditor = Arc::new(Auditor::default());
823        let storage = MeteredStorage::new(
824            AuditedStorage::new(MemStorage::default(), auditor.clone()),
825            runtime_registry,
826        );
827        let network = AuditedNetwork::new(DeterministicNetwork::default(), auditor.clone());
828        let network = MeteredNetwork::new(network, runtime_registry);
829
830        // Initialize panicker
831        let (panicker, panicked) = Panicker::new(cfg.catch_panics);
832
833        let executor = Arc::new(Executor {
834            registry: Mutex::new(registry),
835            cycle: cfg.cycle,
836            deadline,
837            metrics,
838            auditor,
839            rng: Mutex::new(StdRng::seed_from_u64(cfg.seed)),
840            time: Mutex::new(start_time),
841            tasks: Arc::new(Tasks::new()),
842            sleeping: Mutex::new(BinaryHeap::new()),
843            shutdown: Mutex::new(Stopper::default()),
844            panicker,
845            dns: Mutex::new(HashMap::new()),
846        });
847
848        (
849            Self {
850                name: String::new(),
851                executor: Arc::downgrade(&executor),
852                network: Arc::new(network),
853                storage: Arc::new(storage),
854                tree: Tree::root(),
855                execution: Execution::default(),
856                instrumented: false,
857            },
858            executor,
859            panicked,
860        )
861    }
862
863    /// Recover the inner state (deadline, metrics, auditor, rng, synced storage, etc.) from the
864    /// current runtime and use it to initialize a new instance of the runtime. A recovered runtime
865    /// does not inherit the current runtime's pending tasks, unsynced storage, network connections, nor
866    /// its shutdown signaler.
867    ///
868    /// This is useful for performing a deterministic simulation that spans multiple runtime instantiations,
869    /// like simulating unclean shutdown (which involves repeatedly halting the runtime at unexpected intervals).
870    ///
871    /// It is only permitted to call this method after the runtime has finished (i.e. once `start` returns)
872    /// and only permitted to do once (otherwise multiple recovered runtimes will share the same inner state).
873    /// If either one of these conditions is violated, this method will panic.
874    fn recover(checkpoint: Checkpoint) -> (Self, Arc<Executor>, Panicked) {
875        // Rebuild metrics
876        let mut registry = Registry::default();
877        let runtime_registry = registry.sub_registry_with_prefix(METRICS_PREFIX);
878        let metrics = Arc::new(Metrics::init(runtime_registry));
879
880        // Copy state
881        let network =
882            AuditedNetwork::new(DeterministicNetwork::default(), checkpoint.auditor.clone());
883        let network = MeteredNetwork::new(network, runtime_registry);
884
885        // Initialize panicker
886        let (panicker, panicked) = Panicker::new(checkpoint.catch_panics);
887
888        let executor = Arc::new(Executor {
889            // Copied from the checkpoint
890            cycle: checkpoint.cycle,
891            deadline: checkpoint.deadline,
892            auditor: checkpoint.auditor,
893            rng: checkpoint.rng,
894            time: checkpoint.time,
895            dns: checkpoint.dns,
896
897            // New state for the new runtime
898            registry: Mutex::new(registry),
899            metrics,
900            tasks: Arc::new(Tasks::new()),
901            sleeping: Mutex::new(BinaryHeap::new()),
902            shutdown: Mutex::new(Stopper::default()),
903            panicker,
904        });
905        (
906            Self {
907                name: String::new(),
908                executor: Arc::downgrade(&executor),
909                network: Arc::new(network),
910                storage: checkpoint.storage,
911                tree: Tree::root(),
912                execution: Execution::default(),
913                instrumented: false,
914            },
915            executor,
916            panicked,
917        )
918    }
919
920    /// Upgrade Weak reference to [Executor].
921    fn executor(&self) -> Arc<Executor> {
922        self.executor.upgrade().expect("executor already dropped")
923    }
924
925    /// Get a reference to [Metrics].
926    fn metrics(&self) -> Arc<Metrics> {
927        self.executor().metrics.clone()
928    }
929
930    /// Get a reference to the [Auditor].
931    pub fn auditor(&self) -> Arc<Auditor> {
932        self.executor().auditor.clone()
933    }
934
935    /// Register a DNS mapping for a hostname.
936    ///
937    /// If `addrs` is `None`, the mapping is removed.
938    /// If `addrs` is `Some`, the mapping is added or updated.
939    pub fn resolver_register(&self, host: impl Into<String>, addrs: Option<Vec<IpAddr>>) {
940        // Update the auditor
941        let executor = self.executor();
942        let host = host.into();
943        executor.auditor.event(b"resolver_register", |hasher| {
944            hasher.update(host.as_bytes());
945            hasher.update(addrs.encode());
946        });
947
948        // Update the DNS mapping
949        let mut dns = executor.dns.lock().unwrap();
950        match addrs {
951            Some(addrs) => {
952                dns.insert(host, addrs);
953            }
954            None => {
955                dns.remove(&host);
956            }
957        }
958    }
959}
960
961impl crate::Spawner for Context {
962    fn dedicated(mut self) -> Self {
963        self.execution = Execution::Dedicated;
964        self
965    }
966
967    fn shared(mut self, blocking: bool) -> Self {
968        self.execution = Execution::Shared(blocking);
969        self
970    }
971
972    fn instrumented(mut self) -> Self {
973        self.instrumented = true;
974        self
975    }
976
977    fn spawn<F, Fut, T>(mut self, f: F) -> Handle<T>
978    where
979        F: FnOnce(Self) -> Fut + Send + 'static,
980        Fut: Future<Output = T> + Send + 'static,
981        T: Send + 'static,
982    {
983        // Get metrics
984        let (label, metric) = spawn_metrics!(self);
985
986        // Track supervision before resetting configuration
987        let parent = Arc::clone(&self.tree);
988        let is_instrumented = self.instrumented;
989        self.execution = Execution::default();
990        self.instrumented = false;
991        let (child, aborted) = Tree::child(&parent);
992        if aborted {
993            return Handle::closed(metric);
994        }
995        self.tree = child;
996
997        // Spawn the task (we don't care about Model)
998        let executor = self.executor();
999        let future: BoxFuture<'_, T> = if is_instrumented {
1000            f(self)
1001                .instrument(info_span!(parent: None, "task", name = %label.name()))
1002                .boxed()
1003        } else {
1004            f(self).boxed()
1005        };
1006        let (f, handle) = Handle::init(
1007            future,
1008            metric,
1009            executor.panicker.clone(),
1010            Arc::clone(&parent),
1011        );
1012        Tasks::register_work(&executor.tasks, label, Box::pin(f));
1013
1014        // Register the task on the parent
1015        if let Some(aborter) = handle.aborter() {
1016            parent.register(aborter);
1017        }
1018
1019        handle
1020    }
1021
1022    async fn stop(self, value: i32, timeout: Option<Duration>) -> Result<(), Error> {
1023        let executor = self.executor();
1024        executor.auditor.event(b"stop", |hasher| {
1025            hasher.update(value.to_be_bytes());
1026        });
1027        let stop_resolved = {
1028            let mut shutdown = executor.shutdown.lock().unwrap();
1029            shutdown.stop(value)
1030        };
1031
1032        // Wait for all tasks to complete or the timeout to fire
1033        let timeout_future = timeout.map_or_else(
1034            || futures::future::Either::Right(futures::future::pending()),
1035            |duration| futures::future::Either::Left(self.sleep(duration)),
1036        );
1037        select! {
1038            result = stop_resolved => {
1039                result.map_err(|_| Error::Closed)?;
1040                Ok(())
1041            },
1042            _ = timeout_future => {
1043                Err(Error::Timeout)
1044            }
1045        }
1046    }
1047
1048    fn stopped(&self) -> Signal {
1049        let executor = self.executor();
1050        executor.auditor.event(b"stopped", |_| {});
1051        let stopped = executor.shutdown.lock().unwrap().stopped();
1052        stopped
1053    }
1054}
1055
1056impl crate::Metrics for Context {
1057    fn with_label(&self, label: &str) -> Self {
1058        // Ensure the label is well-formatted
1059        validate_label(label);
1060
1061        // Construct the full label name
1062        let name = {
1063            let prefix = self.name.clone();
1064            if prefix.is_empty() {
1065                label.to_string()
1066            } else {
1067                format!("{prefix}_{label}")
1068            }
1069        };
1070        assert!(
1071            !name.starts_with(METRICS_PREFIX),
1072            "using runtime label is not allowed"
1073        );
1074        Self {
1075            name,
1076            ..self.clone()
1077        }
1078    }
1079
1080    fn label(&self) -> String {
1081        self.name.clone()
1082    }
1083
1084    fn register<N: Into<String>, H: Into<String>>(&self, name: N, help: H, metric: impl Metric) {
1085        // Prepare args
1086        let name = name.into();
1087        let help = help.into();
1088
1089        // Register metric
1090        let executor = self.executor();
1091        executor.auditor.event(b"register", |hasher| {
1092            hasher.update(name.as_bytes());
1093            hasher.update(help.as_bytes());
1094        });
1095        let prefixed_name = {
1096            let prefix = &self.name;
1097            if prefix.is_empty() {
1098                name
1099            } else {
1100                format!("{}_{}", *prefix, name)
1101            }
1102        };
1103        executor
1104            .registry
1105            .lock()
1106            .unwrap()
1107            .register(prefixed_name, help, metric);
1108    }
1109
1110    fn encode(&self) -> String {
1111        let executor = self.executor();
1112        executor.auditor.event(b"encode", |_| {});
1113        let mut buffer = String::new();
1114        encode(&mut buffer, &executor.registry.lock().unwrap()).expect("encoding failed");
1115        buffer
1116    }
1117}
1118
1119struct Sleeper {
1120    executor: Weak<Executor>,
1121    time: SystemTime,
1122    registered: bool,
1123}
1124
1125impl Sleeper {
1126    /// Upgrade Weak reference to [Executor].
1127    fn executor(&self) -> Arc<Executor> {
1128        self.executor.upgrade().expect("executor already dropped")
1129    }
1130}
1131
/// A waker paired with the time it is waiting for.
///
/// Equality and ordering only consider `time`, and the ordering is reversed so that
/// a [std::collections::BinaryHeap] of alarms yields the earliest one first.
struct Alarm {
    /// Time the alarm is waiting for.
    time: SystemTime,
    /// Waker stored alongside that time.
    waker: Waker,
}

impl PartialEq for Alarm {
    fn eq(&self, other: &Self) -> bool {
        self.time == other.time
    }
}

impl Eq for Alarm {}

impl PartialOrd for Alarm {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for Alarm {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        // Earlier times compare as greater, turning the max-heap into a min-heap
        self.time.cmp(&other.time).reverse()
    }
}
1157
1158impl Future for Sleeper {
1159    type Output = ();
1160
1161    fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
1162        let executor = self.executor();
1163        {
1164            let current_time = *executor.time.lock().unwrap();
1165            if current_time >= self.time {
1166                return Poll::Ready(());
1167            }
1168        }
1169        if !self.registered {
1170            self.registered = true;
1171            executor.sleeping.lock().unwrap().push(Alarm {
1172                time: self.time,
1173                waker: cx.waker().clone(),
1174            });
1175        }
1176        Poll::Pending
1177    }
1178}
1179
1180impl Clock for Context {
1181    fn current(&self) -> SystemTime {
1182        *self.executor().time.lock().unwrap()
1183    }
1184
1185    fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static {
1186        let deadline = self
1187            .current()
1188            .checked_add(duration)
1189            .expect("overflow when setting wake time");
1190        self.sleep_until(deadline)
1191    }
1192
1193    fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static {
1194        Sleeper {
1195            executor: self.executor.clone(),
1196
1197            time: deadline,
1198            registered: false,
1199        }
1200    }
1201}
1202
/// A future that resolves no earlier than a given target time.
///
/// The wrapped future is polled once up front (with a waker that does nothing). If it is
/// still not ready when the target time is reached, the current thread is blocked until it is.
#[cfg(feature = "external")]
#[pin_project]
struct Waiter<F: Future> {
    /// Weak handle to the [Executor] whose clock is consulted.
    executor: Weak<Executor>,
    /// Clock value before which no output is released.
    target: SystemTime,
    /// The wrapped future.
    #[pin]
    future: F,
    /// Output produced by the initial poll (if any), held back until `target`.
    ready: Option<F::Output>,
    /// Whether the initial poll has been performed.
    started: bool,
    /// Whether an [Alarm] for `target` has already been queued.
    registered: bool,
}

#[cfg(feature = "external")]
impl<F> Future for Waiter<F>
where
    F: Future + Send,
{
    type Output = F::Output;

    fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
        let mut this = self.project();

        // On the very first poll, drive the inner future once with a noop waker: it gets a
        // chance to register interest or start work, but it cannot wake this task before the
        // target time. A value produced here is stashed until the clock reaches `target`.
        if !*this.started {
            *this.started = true;
            let noop = noop_waker();
            let mut noop_cx = task::Context::from_waker(&noop);
            match this.future.as_mut().poll(&mut noop_cx) {
                Poll::Ready(value) => *this.ready = Some(value),
                Poll::Pending => {}
            }
        }

        // Hold back until the executor's clock has reached the target time.
        let executor = this.executor.upgrade().expect("executor already dropped");
        let now = *executor.time.lock().unwrap();
        let target = *this.target;
        if now < target {
            // Queue a single alarm on the sleep queue so the executor wakes this task
            // once the clock reaches the target.
            if !*this.registered {
                *this.registered = true;
                executor.sleeping.lock().unwrap().push(Alarm {
                    time: target,
                    waker: cx.waker().clone(),
                });
            }
            return Poll::Pending;
        }

        // Release the value stashed by the initial poll, if there is one.
        if let Some(value) = this.ready.take() {
            return Poll::Ready(value);
        }

        // Otherwise, block this thread and re-poll the inner future each time it signals
        // the blocker, until it produces a value.
        let blocker = Blocker::new();
        loop {
            let block_waker = waker(blocker.clone());
            let mut block_cx = task::Context::from_waker(&block_waker);
            if let Poll::Ready(value) = this.future.as_mut().poll(&mut block_cx) {
                return Poll::Ready(value);
            }
            blocker.wait();
        }
    }
}
1276
#[cfg(feature = "external")]
impl Pacer for Context {
    /// Wrap `future` so that it resolves no earlier than `latency` after the executor's
    /// current time.
    ///
    /// # Panics
    ///
    /// Panics if adding `latency` to the current time overflows.
    fn pace<'a, F, T>(&'a self, latency: Duration, future: F) -> impl Future<Output = T> + Send + 'a
    where
        F: Future<Output = T> + Send + 'a,
        T: Send + 'a,
    {
        // The target is fixed now, relative to the executor's clock
        let target = self
            .current()
            .checked_add(latency)
            .expect("overflow when setting wake time");

        // Nothing has been polled or queued yet
        Waiter {
            executor: self.executor.clone(),
            target,
            future,
            ready: None,
            started: false,
            registered: false,
        }
    }
}
1303
1304impl GClock for Context {
1305    type Instant = SystemTime;
1306
1307    fn now(&self) -> Self::Instant {
1308        self.current()
1309    }
1310}
1311
1312impl ReasonablyRealtime for Context {}
1313
1314impl crate::Network for Context {
1315    type Listener = ListenerOf<Network>;
1316
1317    async fn bind(&self, socket: SocketAddr) -> Result<Self::Listener, Error> {
1318        self.network.bind(socket).await
1319    }
1320
1321    async fn dial(
1322        &self,
1323        socket: SocketAddr,
1324    ) -> Result<(crate::SinkOf<Self>, crate::StreamOf<Self>), Error> {
1325        self.network.dial(socket).await
1326    }
1327}
1328
1329impl crate::Resolver for Context {
1330    async fn resolve(&self, host: &str) -> Result<Vec<IpAddr>, Error> {
1331        // Get the record
1332        let executor = self.executor();
1333        let dns = executor.dns.lock().unwrap();
1334        let result = dns.get(host).cloned();
1335        drop(dns);
1336
1337        // Update the auditor
1338        executor.auditor.event(b"resolve", |hasher| {
1339            hasher.update(host.as_bytes());
1340            hasher.update(result.encode());
1341        });
1342        result.ok_or_else(|| Error::ResolveFailed(host.to_string()))
1343    }
1344}
1345
1346impl RngCore for Context {
1347    fn next_u32(&mut self) -> u32 {
1348        let executor = self.executor();
1349        executor.auditor.event(b"rand", |hasher| {
1350            hasher.update(b"next_u32");
1351        });
1352        let result = executor.rng.lock().unwrap().next_u32();
1353        result
1354    }
1355
1356    fn next_u64(&mut self) -> u64 {
1357        let executor = self.executor();
1358        executor.auditor.event(b"rand", |hasher| {
1359            hasher.update(b"next_u64");
1360        });
1361        let result = executor.rng.lock().unwrap().next_u64();
1362        result
1363    }
1364
1365    fn fill_bytes(&mut self, dest: &mut [u8]) {
1366        let executor = self.executor();
1367        executor.auditor.event(b"rand", |hasher| {
1368            hasher.update(b"fill_bytes");
1369        });
1370        executor.rng.lock().unwrap().fill_bytes(dest);
1371    }
1372
1373    fn try_fill_bytes(&mut self, dest: &mut [u8]) -> Result<(), rand::Error> {
1374        let executor = self.executor();
1375        executor.auditor.event(b"rand", |hasher| {
1376            hasher.update(b"try_fill_bytes");
1377        });
1378        let result = executor.rng.lock().unwrap().try_fill_bytes(dest);
1379        result
1380    }
1381}
1382
1383impl CryptoRng for Context {}
1384
1385impl crate::Storage for Context {
1386    type Blob = <Storage as crate::Storage>::Blob;
1387
1388    async fn open(&self, partition: &str, name: &[u8]) -> Result<(Self::Blob, u64), Error> {
1389        self.storage.open(partition, name).await
1390    }
1391
1392    async fn remove(&self, partition: &str, name: Option<&[u8]>) -> Result<(), Error> {
1393        self.storage.remove(partition, name).await
1394    }
1395
1396    async fn scan(&self, partition: &str) -> Result<Vec<Vec<u8>>, Error> {
1397        self.storage.scan(partition).await
1398    }
1399}
1400
1401#[cfg(test)]
1402mod tests {
1403    use super::*;
1404    #[cfg(feature = "external")]
1405    use crate::FutureExt;
1406    #[cfg(feature = "external")]
1407    use crate::Spawner;
1408    use crate::{
1409        deterministic, reschedule, utils::run_tasks, Blob, Metrics, Resolver, Runner as _, Storage,
1410    };
1411    use commonware_macros::test_traced;
1412    #[cfg(not(feature = "external"))]
1413    use futures::future::pending;
1414    #[cfg(feature = "external")]
1415    use futures::{channel::mpsc, SinkExt, StreamExt};
1416    use futures::{channel::oneshot, task::noop_waker};
1417
1418    fn run_with_seed(seed: u64) -> (String, Vec<usize>) {
1419        let executor = deterministic::Runner::seeded(seed);
1420        run_tasks(5, executor)
1421    }
1422
#[test]
fn test_same_seed_same_order() {
    // First pass: record the output produced by each seed
    let outputs: Vec<_> = (0..1000).map(run_with_seed).collect();

    // Second pass: every seed must reproduce its recorded output
    for (seed, expected) in (0..1000).zip(&outputs) {
        assert_eq!(run_with_seed(seed), *expected);
    }
}
1438
1439    #[test_traced("TRACE")]
1440    fn test_different_seeds_different_order() {
1441        let output1 = run_with_seed(12345);
1442        let output2 = run_with_seed(54321);
1443        assert_ne!(output1, output2);
1444    }
1445
#[test]
fn test_alarm_min_heap() {
    // Push alarms out of order (including a duplicate time)
    let now = SystemTime::now();
    let at = |secs: u64| now + Duration::new(secs, 0);
    let mut heap = BinaryHeap::new();
    for secs in vec![10, 5, 15, 5] {
        heap.push(Alarm {
            time: at(secs),
            waker: noop_waker(),
        });
    }

    // Popping must yield the earliest alarm first
    let sorted_times: Vec<SystemTime> = std::iter::from_fn(|| heap.pop())
        .map(|alarm| alarm.time)
        .collect();
    assert_eq!(sorted_times, vec![at(5), at(5), at(10), at(15)]);
}
1488
#[test]
#[should_panic(expected = "runtime timeout")]
fn test_timeout() {
    // A task that sleeps forever must eventually trip the 10 second runtime timeout
    let timeout = Duration::from_secs(10);
    let executor = deterministic::Runner::timed(timeout);
    executor.start(|context| async move {
        let tick = Duration::from_secs(1);
        loop {
            context.sleep(tick).await;
        }
    });
}
1499
#[test]
#[should_panic(expected = "cycle duration must be non-zero when timeout is set")]
fn test_bad_timeout() {
    // A timeout combined with a zero-length cycle must be rejected at construction
    let cfg = Config {
        timeout: Some(Duration::ZERO),
        cycle: Duration::ZERO,
        ..Config::default()
    };
    deterministic::Runner::new(cfg);
}
1510
#[test]
#[should_panic(
    expected = "cycle duration must be greater than or equal to system time precision"
)]
fn test_bad_cycle() {
    // One nanosecond below the system time precision must be rejected at construction
    let too_short = SYSTEM_TIME_PRECISION - Duration::from_nanos(1);
    let cfg = Config {
        cycle: too_short,
        ..Config::default()
    };
    deterministic::Runner::new(cfg);
}
1522
#[test]
fn test_recover_synced_storage_persists() {
    let partition = "test_partition";
    let name = b"test_blob";
    let data = b"Hello, world!";

    // First runtime: write a blob, sync it, and capture the auditor state
    let first = deterministic::Runner::default();
    let (state, checkpoint) = first.start_and_recover(|context| async move {
        let (blob, _) = context.open(partition, name).await.unwrap();
        blob.write_at(Vec::from(data), 0).await.unwrap();
        blob.sync().await.unwrap();
        context.auditor().state()
    });

    // The checkpoint must carry the same auditor state
    assert_eq!(state, checkpoint.auditor.state());

    // Second runtime: the synced bytes must still be readable
    let recovered = Runner::from(checkpoint);
    recovered.start(|context| async move {
        let (blob, len) = context.open(partition, name).await.unwrap();
        assert_eq!(len, data.len() as u64);
        let read = blob.read_at(vec![0; data.len()], 0).await.unwrap();
        assert_eq!(read.as_ref(), data);
    });
}
1551
#[test]
#[should_panic(expected = "goodbye")]
fn test_recover_panic_handling() {
    // First runtime: run a trivial task and take a checkpoint
    let first = deterministic::Runner::default();
    let (_, checkpoint) = first.start_and_recover(|_| async move {
        reschedule().await;
    });

    // Recovered runtime: a panicking task must still bring the runtime down
    // (i.e. the panic setting survives recovery)
    let recovered = Runner::from(checkpoint);
    recovered.start(|_| async move {
        panic!("goodbye");
    });
}
1567
#[test]
fn test_recover_unsynced_storage_does_not_persist() {
    let partition = "test_partition";
    let name = b"test_blob";
    let data = Vec::from("Hello, world!");

    // First runtime: write a blob but never sync it
    let first = deterministic::Runner::default();
    let (_, checkpoint) = first.start_and_recover(|context| async move {
        let context = context.clone();
        let (blob, _) = context.open(partition, name).await.unwrap();
        blob.write_at(data, 0).await.unwrap();
    });

    // Recovered runtime: the blob must come back empty
    let recovered = Runner::from(checkpoint);
    recovered.start(|context| async move {
        let (_, len) = context.open(partition, name).await.unwrap();
        assert_eq!(len, 0);
    });
}
1592
#[test]
fn test_recover_dns_mappings_persist() {
    let host = "example.com";
    let addrs = vec![
        IpAddr::V4(std::net::Ipv4Addr::new(192, 168, 1, 1)),
        IpAddr::V4(std::net::Ipv4Addr::new(192, 168, 1, 2)),
    ];

    // First runtime: register the mapping and capture the auditor state
    let first = deterministic::Runner::default();
    let (state, checkpoint) = first.start_and_recover({
        let addrs = addrs.clone();
        |context| async move {
            context.resolver_register(host, Some(addrs));
            context.auditor().state()
        }
    });

    // The checkpoint must carry the same auditor state
    assert_eq!(state, checkpoint.auditor.state());

    // Recovered runtime: the mapping must still resolve to the same addresses
    let recovered = Runner::from(checkpoint);
    recovered.start(move |context| async move {
        let resolved = context.resolve(host).await.unwrap();
        assert_eq!(resolved, addrs);
    });
}
1622
#[test]
#[should_panic(expected = "executor still has weak references")]
fn test_context_return() {
    let executor = deterministic::Runner::default();

    // Hand the context back out of the runtime; `start` is expected to panic because
    // the returned context still refers to the executor
    let leaked = executor.start(|context| async move { context });

    // Should never get this far
    drop(leaked);
}
1638
#[test]
fn test_default_time_zero() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        // A default runner starts its clock exactly at the Unix epoch
        let since_epoch = context.current().duration_since(UNIX_EPOCH).unwrap();
        assert_eq!(since_epoch, Duration::ZERO);
    });
}
1652
#[cfg(not(feature = "external"))]
#[test]
#[should_panic(expected = "runtime stalled")]
fn test_stall() {
    // A root task that can never make progress must be reported as a stall
    let executor = deterministic::Runner::default();
    executor.start(|_| async move {
        pending::<()>().await;
    });
}
1665
#[cfg(not(feature = "external"))]
#[test]
#[should_panic(expected = "runtime stalled")]
fn test_external_simulated() {
    let executor = deterministic::Runner::default();

    // An external thread answers after one second of real time
    let (tx, rx) = oneshot::channel();
    std::thread::spawn(move || {
        std::thread::sleep(Duration::from_secs(1));
        tx.send(()).unwrap();
    });

    // Without the `external` feature, waiting on that thread is expected to stall
    executor.start(|_| async move {
        rx.await.unwrap();
    });
}
1685
#[cfg(feature = "external")]
#[test]
fn test_external_realtime() {
    let executor = deterministic::Runner::default();

    // An external thread answers after one second of real time
    let (tx, rx) = oneshot::channel();
    std::thread::spawn(move || {
        std::thread::sleep(Duration::from_secs(1));
        tx.send(()).unwrap();
    });

    // With the `external` feature, the runtime is expected to wait for the answer
    executor.start(|_| async move {
        rx.await.unwrap();
    });
}
1704
#[cfg(feature = "external")]
#[test]
fn test_external_realtime_variable() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        // Reference points and channels
        let start_real = SystemTime::now();
        let start_sim = context.current();
        let (first_tx, first_rx) = oneshot::channel();
        let (second_tx, second_rx) = oneshot::channel();
        let (mut results_tx, mut results_rx) = mpsc::channel(2);

        // External thread that answers after one second of real time
        let first_wait = Duration::from_secs(1);
        std::thread::spawn(move || {
            std::thread::sleep(first_wait);
            first_tx.send(()).unwrap();
        });

        // External thread that answers without any real delay
        std::thread::spawn(move || {
            std::thread::sleep(Duration::ZERO);
            second_tx.send(()).unwrap();
        });

        // Paced with zero latency on the slow answer: more than `first_wait` of real time
        // passes, but less than `first_wait` of simulated time
        let first = context.clone().spawn({
            let mut results_tx = results_tx.clone();
            move |context| async move {
                first_rx.pace(&context, Duration::ZERO).await.unwrap();
                let elapsed_real = SystemTime::now().duration_since(start_real).unwrap();
                assert!(elapsed_real > first_wait);
                let elapsed_sim = context.current().duration_since(start_sim).unwrap();
                assert!(elapsed_sim < first_wait);
                results_tx.send(1).await.unwrap();
            }
        });

        // Paced with `first_wait` latency on the fast answer: at least `first_wait` passes
        // on both the real and the simulated clock
        let second = context.clone().spawn(move |context| async move {
            second_rx.pace(&context, first_wait).await.unwrap();
            let elapsed_real = SystemTime::now().duration_since(start_real).unwrap();
            assert!(elapsed_real >= first_wait);
            let elapsed_sim = context.current().duration_since(start_sim).unwrap();
            assert!(elapsed_sim >= first_wait);
            results_tx.send(2).await.unwrap();
        });

        // Both tasks must finish
        second.await.unwrap();
        first.await.unwrap();

        // The zero-latency task must have reported first
        let mut results = Vec::with_capacity(2);
        while results.len() < 2 {
            results.push(results_rx.next().await.unwrap());
        }
        assert_eq!(results, vec![1, 2]);
    });
}
1768
#[cfg(not(feature = "external"))]
#[test]
fn test_simulated_skip() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        context.sleep(Duration::from_secs(1)).await;

        // Read the iteration counter back out of the encoded metrics
        let metrics = context.encode();
        let iterations = metrics
            .lines()
            .find_map(|line| {
                let value = line.strip_prefix("runtime_iterations_total ")?;
                value.trim().parse::<u64>().ok()
            })
            .expect("missing runtime_iterations_total metric");

        // Only a handful of iterations are expected for a one second sleep
        assert!(iterations < 10);
    });
}
1791
#[cfg(feature = "external")]
#[test]
fn test_realtime_no_skip() {
    let executor = deterministic::Runner::default();
    executor.start(|context| async move {
        context.sleep(Duration::from_secs(1)).await;

        // Read the iteration counter back out of the encoded metrics
        let metrics = context.encode();
        let iterations = metrics
            .lines()
            .find_map(|line| {
                let value = line.strip_prefix("runtime_iterations_total ")?;
                value.trim().parse::<u64>().ok()
            })
            .expect("missing runtime_iterations_total metric");

        // Many iterations are expected for a one second sleep
        assert!(iterations > 500);
    });
}
1814}