commonware_runtime/
deterministic.rs

1//! A deterministic runtime that randomly selects tasks to run based on a seed
2//!
3//! # Panics
4//!
5//! Unless configured otherwise, any task panic will lead to a runtime panic.
6//!
7//! # External Processes
8//!
9//! When testing an application that interacts with some external process, it can appear to
10//! the runtime that progress has stalled because no pending tasks can make progress and/or
11//! that futures resolve at variable latency (which in turn triggers non-deterministic execution).
12//!
13//! To support such applications, the runtime can be built with the `external` feature to both
14//! sleep for each [Config::cycle] (opting to wait if all futures are pending) and to constrain
15//! the resolution latency of any future (with `pace()`).
16//!
17//! **Applications that do not interact with external processes (or are able to mock them) should never
18//! need to enable this feature. It is commonly used when testing consensus with external execution environments
19//! that use their own runtime (but are deterministic over some set of inputs).**
20//!
21//! # Example
22//!
23//! ```rust
24//! use commonware_runtime::{Spawner, Runner, deterministic, Metrics};
25//!
26//! let executor =  deterministic::Runner::default();
27//! executor.start(|context| async move {
28//!     println!("Parent started");
29//!     let result = context.with_label("child").spawn(|_| async move {
30//!         println!("Child started");
31//!         "hello"
32//!     });
33//!     println!("Child result: {:?}", result.await);
34//!     println!("Parent exited");
35//!     println!("Auditor state: {}", context.auditor().state());
36//! });
37//! ```
38
39use crate::{
40    network::{
41        audited::Network as AuditedNetwork, deterministic::Network as DeterministicNetwork,
42        metered::Network as MeteredNetwork,
43    },
44    storage::{
45        audited::Storage as AuditedStorage, memory::Storage as MemStorage,
46        metered::Storage as MeteredStorage,
47    },
48    telemetry::metrics::task::Label,
49    utils::{
50        signal::{Signal, Stopper},
51        supervision::Tree,
52        Panicker,
53    },
54    Clock, Error, Execution, Handle, ListenerOf, Panicked, METRICS_PREFIX,
55};
56#[cfg(feature = "external")]
57use crate::{Blocker, Pacer};
58use commonware_macros::select;
59use commonware_utils::{hex, time::SYSTEM_TIME_PRECISION, SystemTimeExt};
60#[cfg(feature = "external")]
61use futures::task::noop_waker;
62use futures::{
63    future::BoxFuture,
64    task::{waker, ArcWake},
65    Future, FutureExt,
66};
67use governor::clock::{Clock as GClock, ReasonablyRealtime};
68#[cfg(feature = "external")]
69use pin_project::pin_project;
70use prometheus_client::{
71    encoding::text::encode,
72    metrics::{counter::Counter, family::Family, gauge::Gauge},
73    registry::{Metric, Registry},
74};
75use rand::{prelude::SliceRandom, rngs::StdRng, CryptoRng, RngCore, SeedableRng};
76use sha2::{Digest as _, Sha256};
77use std::{
78    collections::{BTreeMap, BinaryHeap},
79    mem::{replace, take},
80    net::SocketAddr,
81    pin::Pin,
82    sync::{Arc, Mutex, Weak},
83    task::{self, Poll, Waker},
84    time::{Duration, SystemTime, UNIX_EPOCH},
85};
86use tracing::{info_span, trace, Instrument};
87
88#[derive(Debug)]
89struct Metrics {
90    iterations: Counter,
91    tasks_spawned: Family<Label, Counter>,
92    tasks_running: Family<Label, Gauge>,
93    task_polls: Family<Label, Counter>,
94
95    network_bandwidth: Counter,
96}
97
98impl Metrics {
99    pub fn init(registry: &mut Registry) -> Self {
100        let metrics = Self {
101            iterations: Counter::default(),
102            task_polls: Family::default(),
103            tasks_spawned: Family::default(),
104            tasks_running: Family::default(),
105            network_bandwidth: Counter::default(),
106        };
107        registry.register(
108            "iterations",
109            "Total number of iterations",
110            metrics.iterations.clone(),
111        );
112        registry.register(
113            "tasks_spawned",
114            "Total number of tasks spawned",
115            metrics.tasks_spawned.clone(),
116        );
117        registry.register(
118            "tasks_running",
119            "Number of tasks currently running",
120            metrics.tasks_running.clone(),
121        );
122        registry.register(
123            "task_polls",
124            "Total number of task polls",
125            metrics.task_polls.clone(),
126        );
127        registry.register(
128            "bandwidth",
129            "Total amount of data sent over network",
130            metrics.network_bandwidth.clone(),
131        );
132        metrics
133    }
134}
135
/// A SHA-256 digest.
type Digest = [u8; 32];

/// Track the state of the runtime for determinism auditing.
///
/// The state is a single running digest guarded by a mutex.
pub struct Auditor {
    digest: Mutex<Digest>,
}

impl Default for Auditor {
    /// Start from an all-zero digest.
    fn default() -> Self {
        let zeroed: Digest = [0; 32];
        Self {
            digest: Mutex::new(zeroed),
        }
    }
}
151
152impl Auditor {
153    /// Record that an event happened.
154    /// This auditor's hash will be updated with the event's `label` and
155    /// whatever other data is passed in the `payload` closure.
156    pub(crate) fn event<F>(&self, label: &'static [u8], payload: F)
157    where
158        F: FnOnce(&mut Sha256),
159    {
160        let mut digest = self.digest.lock().unwrap();
161
162        let mut hasher = Sha256::new();
163        hasher.update(digest.as_ref());
164        hasher.update(label);
165        payload(&mut hasher);
166
167        *digest = hasher.finalize().into();
168    }
169
170    /// Generate a representation of the current state of the runtime.
171    ///
172    /// This can be used to ensure that logic running on top
173    /// of the runtime is interacting deterministically.
174    pub fn state(&self) -> String {
175        let hash = self.digest.lock().unwrap();
176        hex(hash.as_ref())
177    }
178}
179
180/// Configuration for the `deterministic` runtime.
181#[derive(Clone)]
182pub struct Config {
183    /// Seed for the random number generator.
184    seed: u64,
185
186    /// The cycle duration determines how much time is advanced after each iteration of the event
187    /// loop. This is useful to prevent starvation if some task never yields.
188    cycle: Duration,
189
190    /// If the runtime is still executing at this point (i.e. a test hasn't stopped), panic.
191    timeout: Option<Duration>,
192
193    /// Whether spawned tasks should catch panics instead of propagating them.
194    catch_panics: bool,
195}
196
197impl Config {
198    /// Returns a new [Config] with default values.
199    pub fn new() -> Self {
200        Self {
201            seed: 42,
202            cycle: Duration::from_millis(1),
203            timeout: None,
204            catch_panics: false,
205        }
206    }
207
208    // Setters
209    /// See [Config]
210    pub fn with_seed(mut self, seed: u64) -> Self {
211        self.seed = seed;
212        self
213    }
214    /// See [Config]
215    pub fn with_cycle(mut self, cycle: Duration) -> Self {
216        self.cycle = cycle;
217        self
218    }
219    /// See [Config]
220    pub fn with_timeout(mut self, timeout: Option<Duration>) -> Self {
221        self.timeout = timeout;
222        self
223    }
224    /// See [Config]
225    pub fn with_catch_panics(mut self, catch_panics: bool) -> Self {
226        self.catch_panics = catch_panics;
227        self
228    }
229
230    // Getters
231    /// See [Config]
232    pub fn seed(&self) -> u64 {
233        self.seed
234    }
235    /// See [Config]
236    pub fn cycle(&self) -> Duration {
237        self.cycle
238    }
239    /// See [Config]
240    pub fn timeout(&self) -> Option<Duration> {
241        self.timeout
242    }
243    /// See [Config]
244    pub fn catch_panics(&self) -> bool {
245        self.catch_panics
246    }
247
248    /// Assert that the configuration is valid.
249    pub fn assert(&self) {
250        assert!(
251            self.cycle != Duration::default() || self.timeout.is_none(),
252            "cycle duration must be non-zero when timeout is set",
253        );
254        assert!(
255            self.cycle >= SYSTEM_TIME_PRECISION,
256            "cycle duration must be greater than or equal to system time precision"
257        );
258    }
259}
260
261impl Default for Config {
262    fn default() -> Self {
263        Self::new()
264    }
265}
266
267/// Deterministic runtime that randomly selects tasks to run based on a seed.
268pub struct Executor {
269    registry: Mutex<Registry>,
270    cycle: Duration,
271    deadline: Option<SystemTime>,
272    metrics: Arc<Metrics>,
273    auditor: Arc<Auditor>,
274    rng: Mutex<StdRng>,
275    time: Mutex<SystemTime>,
276    tasks: Arc<Tasks>,
277    sleeping: Mutex<BinaryHeap<Alarm>>,
278    shutdown: Mutex<Stopper>,
279    panicker: Panicker,
280}
281
282impl Executor {
283    /// Advance simulated time by [Config::cycle].
284    ///
285    /// When built with the `external` feature, sleep for [Config::cycle] to let
286    /// external processes make progress.
287    fn advance_time(&self) -> SystemTime {
288        #[cfg(feature = "external")]
289        std::thread::sleep(self.cycle);
290
291        let mut time = self.time.lock().unwrap();
292        *time = time
293            .checked_add(self.cycle)
294            .expect("executor time overflowed");
295        let now = *time;
296        trace!(now = now.epoch_millis(), "time advanced");
297        now
298    }
299
300    /// When idle, jump directly to the next actionable time.
301    ///
302    /// When built with the `external` feature, never skip ahead (to ensure we poll all pending tasks
303    /// every [Config::cycle]).
304    fn skip_idle_time(&self, current: SystemTime) -> SystemTime {
305        if cfg!(feature = "external") || self.tasks.ready() != 0 {
306            return current;
307        }
308
309        let mut skip_until = None;
310        {
311            let sleeping = self.sleeping.lock().unwrap();
312            if let Some(next) = sleeping.peek() {
313                if next.time > current {
314                    skip_until = Some(next.time);
315                }
316            }
317        }
318
319        if let Some(deadline) = skip_until {
320            let mut time = self.time.lock().unwrap();
321            *time = deadline;
322            let now = *time;
323            trace!(now = now.epoch_millis(), "time skipped");
324            now
325        } else {
326            current
327        }
328    }
329
330    /// Wake any sleepers whose deadlines have elapsed.
331    fn wake_ready_sleepers(&self, current: SystemTime) {
332        let mut sleeping = self.sleeping.lock().unwrap();
333        while let Some(next) = sleeping.peek() {
334            if next.time <= current {
335                let sleeper = sleeping.pop().unwrap();
336                sleeper.waker.wake();
337            } else {
338                break;
339            }
340        }
341    }
342
343    /// Ensure the runtime is making progress.
344    ///
345    /// When built with the `external` feature, always poll pending tasks after the passage of time.
346    fn assert_liveness(&self) {
347        if cfg!(feature = "external") || self.tasks.ready() != 0 {
348            return;
349        }
350
351        panic!("runtime stalled");
352    }
353}
354
355/// An artifact that can be used to recover the state of the runtime.
356///
357/// This is useful when mocking unclean shutdown (while retaining deterministic behavior).
358pub struct Checkpoint {
359    cycle: Duration,
360    deadline: Option<SystemTime>,
361    auditor: Arc<Auditor>,
362    rng: Mutex<StdRng>,
363    time: Mutex<SystemTime>,
364    storage: Arc<Storage>,
365    catch_panics: bool,
366}
367
368impl Checkpoint {
369    /// Get a reference to the [Auditor].
370    pub fn auditor(&self) -> Arc<Auditor> {
371        self.auditor.clone()
372    }
373}
374
/// Where a [Runner] gets its initial state from.
// NOTE(review): the lint is presumably silenced because `Checkpoint` is much
// larger than `Config` and only a single `State` exists per `Runner` — confirm.
#[allow(clippy::large_enum_variant)]
enum State {
    /// Start a fresh runtime from a configuration.
    Config(Config),
    /// Resume from the state captured by a previous run.
    Checkpoint(Checkpoint),
}

/// Implementation of [crate::Runner] for the `deterministic` runtime.
pub struct Runner {
    /// Source of the runtime's initial state (consumed when the runner starts).
    state: State,
}
385
386impl From<Config> for Runner {
387    fn from(cfg: Config) -> Self {
388        Self::new(cfg)
389    }
390}
391
392impl From<Checkpoint> for Runner {
393    fn from(checkpoint: Checkpoint) -> Self {
394        Self {
395            state: State::Checkpoint(checkpoint),
396        }
397    }
398}
399
400impl Runner {
401    /// Initialize a new `deterministic` runtime with the given seed and cycle duration.
402    pub fn new(cfg: Config) -> Self {
403        // Ensure config is valid
404        cfg.assert();
405        Runner {
406            state: State::Config(cfg),
407        }
408    }
409
410    /// Initialize a new `deterministic` runtime with the default configuration
411    /// and the provided seed.
412    pub fn seeded(seed: u64) -> Self {
413        let cfg = Config {
414            seed,
415            ..Config::default()
416        };
417        Self::new(cfg)
418    }
419
420    /// Initialize a new `deterministic` runtime with the default configuration
421    /// but exit after the given timeout.
422    pub fn timed(timeout: Duration) -> Self {
423        let cfg = Config {
424            timeout: Some(timeout),
425            ..Config::default()
426        };
427        Self::new(cfg)
428    }
429
    /// Like [crate::Runner::start], but also returns a [Checkpoint] that can be used
    /// to recover the state of the runtime in a subsequent run.
    ///
    /// The root future returned by `f` is polled alongside spawned tasks until it
    /// completes. Each iteration of the loop drains the ready queue, shuffles it with
    /// the seeded rng, polls every drained task once, and then advances simulated time.
    ///
    /// # Panics
    ///
    /// - "runtime timeout" if simulated time reaches the configured deadline.
    /// - "runtime stalled" if no task is ready after time has advanced (never
    ///   when built with the `external` feature).
    /// - If any reference to the executor (weak or strong) is still alive after
    ///   all tasks have been cleared.
    pub fn start_and_recover<F, Fut>(self, f: F) -> (Fut::Output, Checkpoint)
    where
        F: FnOnce(Context) -> Fut,
        Fut: Future,
    {
        // Setup context and return strong reference to executor
        let (context, executor, panicked) = match self.state {
            State::Config(config) => Context::new(config),
            State::Checkpoint(checkpoint) => Context::recover(checkpoint),
        };

        // Pin root task to the heap
        //
        // The storage handle is cloned first because `context` is moved into `f`
        // and storage is still needed to build the checkpoint.
        let storage = context.storage.clone();
        let mut root = Box::pin(panicked.interrupt(f(context)));

        // Register the root task
        Tasks::register_root(&executor.tasks);

        // Process tasks until root task completes or progress stalls
        let output = loop {
            // Ensure we have not exceeded our deadline
            {
                let current = executor.time.lock().unwrap();
                if let Some(deadline) = executor.deadline {
                    if *current >= deadline {
                        panic!("runtime timeout");
                    }
                }
            }

            // Drain all ready tasks
            let mut queue = executor.tasks.drain();

            // Shuffle tasks (if more than one)
            if queue.len() > 1 {
                let mut rng = executor.rng.lock().unwrap();
                queue.shuffle(&mut *rng);
            }

            // Run all snapshotted tasks
            //
            // This approach is more efficient than randomly selecting a task one-at-a-time
            // because it ensures we don't pull the same pending task multiple times in a row (without
            // processing a different task required for other tasks to make progress).
            trace!(
                iter = executor.metrics.iterations.get(),
                tasks = queue.len(),
                "starting loop"
            );
            let mut output = None;
            for id in queue {
                // Lookup the task (it may have completed already)
                let Some(task) = executor.tasks.get(id) else {
                    trace!(id, "skipping missing task");
                    continue;
                };

                // Record task for auditing
                executor.auditor.event(b"process_task", |hasher| {
                    hasher.update(task.id.to_be_bytes());
                    hasher.update(task.label.name().as_bytes());
                });
                executor.metrics.task_polls.get_or_create(&task.label).inc();
                trace!(id, "processing task");

                // Prepare task for polling
                //
                // The waker only holds a weak reference to the task queue, so a waker
                // that outlives the runtime does not keep the queue alive.
                let waker = waker(Arc::new(TaskWaker {
                    id,
                    tasks: Arc::downgrade(&executor.tasks),
                }));
                let mut cx = task::Context::from_waker(&waker);

                // Poll the task
                match &task.mode {
                    Mode::Root => {
                        // Poll the root task
                        if let Poll::Ready(result) = root.as_mut().poll(&mut cx) {
                            trace!(id, "root task is complete");
                            output = Some(result);
                            // Stop polling the rest of this snapshot
                            break;
                        }
                    }
                    Mode::Work(future) => {
                        // Get the future (if it still exists)
                        let mut fut_opt = future.lock().unwrap();
                        let Some(fut) = fut_opt.as_mut() else {
                            trace!(id, "skipping already complete task");

                            // Remove the future
                            executor.tasks.remove(id);
                            continue;
                        };

                        // Poll the task
                        if fut.as_mut().poll(&mut cx).is_ready() {
                            trace!(id, "task is complete");

                            // Remove the future
                            executor.tasks.remove(id);
                            *fut_opt = None;
                            continue;
                        }
                    }
                }

                // Try again later if task is still pending
                trace!(id, "task is still pending");
            }

            // If the root task has completed, exit as soon as possible
            if let Some(output) = output {
                break output;
            }

            // Advance time (skipping ahead if no tasks are ready yet)
            let mut current = executor.advance_time();
            current = executor.skip_idle_time(current);

            // Wake sleepers and ensure we continue to make progress
            executor.wake_ready_sleepers(current);
            executor.assert_liveness();

            // Record that we completed another iteration of the event loop.
            executor.metrics.iterations.inc();
        };

        // Clear remaining tasks from the executor.
        //
        // It is critical that we wait to drop the strong
        // reference to executor until after we have dropped
        // all tasks (as they may attempt to upgrade their weak
        // reference to the executor during drop).
        executor.sleeping.lock().unwrap().clear(); // included in tasks
        let tasks = executor.tasks.clear();
        for task in tasks {
            // The root task has no stored future to drop
            let Mode::Work(future) = &task.mode else {
                continue;
            };
            *future.lock().unwrap() = None;
        }

        // Assert the context doesn't escape the start() function (behavior
        // is undefined in this case)
        assert!(
            Arc::weak_count(&executor) == 0,
            "executor still has weak references"
        );

        // Extract the executor from the Arc
        let executor = Arc::into_inner(executor).expect("executor still has strong references");

        // Construct a checkpoint that can be used to restart the runtime
        let checkpoint = Checkpoint {
            cycle: executor.cycle,
            deadline: executor.deadline,
            auditor: executor.auditor,
            rng: executor.rng,
            time: executor.time,
            storage,
            catch_panics: executor.panicker.catch(),
        };

        (output, checkpoint)
    }
596}
597
598impl Default for Runner {
599    fn default() -> Self {
600        Self::new(Config::default())
601    }
602}
603
604impl crate::Runner for Runner {
605    type Context = Context;
606
607    fn start<F, Fut>(self, f: F) -> Fut::Output
608    where
609        F: FnOnce(Self::Context) -> Fut,
610        Fut: Future,
611    {
612        let (output, _) = self.start_and_recover(f);
613        output
614    }
615}
616
/// The mode of a [Task].
enum Mode {
    /// The root task. Its future is not stored here; the run loop owns and polls it directly.
    Root,
    /// A spawned task and its boxed future. The slot is set to `None` once the
    /// future completes or when the runtime clears its remaining tasks.
    Work(Mutex<Option<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>>),
}

/// A future being executed by the [Executor].
struct Task {
    /// Identifier assigned from the [Tasks] counter at registration.
    id: u128,
    /// Label recorded in the audit digest and used for per-task metrics.
    label: Label,

    /// Whether this is the root task or spawned work.
    mode: Mode,
}
630
631/// A waker for a [Task].
632struct TaskWaker {
633    id: u128,
634
635    tasks: Weak<Tasks>,
636}
637
638impl ArcWake for TaskWaker {
639    fn wake_by_ref(arc_self: &Arc<Self>) {
640        // Upgrade the weak reference to re-enqueue this task.
641        // If upgrade fails, the task queue has been dropped and no action is required.
642        //
643        // This can happen if some data is passed into the runtime and it drops after the runtime exits.
644        if let Some(tasks) = arc_self.tasks.upgrade() {
645            tasks.queue(arc_self.id);
646        }
647    }
648}
649
650/// A collection of [Task]s that are being executed by the [Executor].
651struct Tasks {
652    /// The next task id.
653    counter: Mutex<u128>,
654    /// Tasks ready to be polled.
655    ready: Mutex<Vec<u128>>,
656    /// All running tasks.
657    running: Mutex<BTreeMap<u128, Arc<Task>>>,
658}
659
660impl Tasks {
661    /// Create a new task queue.
662    fn new() -> Self {
663        Self {
664            counter: Mutex::new(0),
665            ready: Mutex::new(Vec::new()),
666            running: Mutex::new(BTreeMap::new()),
667        }
668    }
669
670    /// Increment the task counter and return the old value.
671    fn increment(&self) -> u128 {
672        let mut counter = self.counter.lock().unwrap();
673        let old = *counter;
674        *counter = counter.checked_add(1).expect("task counter overflow");
675        old
676    }
677
678    /// Register the root task.
679    ///
680    /// If the root task has already been registered, this function will panic.
681    fn register_root(arc_self: &Arc<Self>) {
682        let id = arc_self.increment();
683        let task = Arc::new(Task {
684            id,
685            label: Label::root(),
686            mode: Mode::Root,
687        });
688        arc_self.register(id, task);
689    }
690
691    /// Register a non-root task to be executed.
692    fn register_work(
693        arc_self: &Arc<Self>,
694        label: Label,
695        future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
696    ) {
697        let id = arc_self.increment();
698        let task = Arc::new(Task {
699            id,
700            label,
701            mode: Mode::Work(Mutex::new(Some(future))),
702        });
703        arc_self.register(id, task);
704    }
705
706    /// Register a new task to be executed.
707    fn register(&self, id: u128, task: Arc<Task>) {
708        // Track as running until completion
709        self.running.lock().unwrap().insert(id, task);
710
711        // Add to ready
712        self.queue(id);
713    }
714
715    /// Enqueue an already registered task to be executed.
716    fn queue(&self, id: u128) {
717        let mut ready = self.ready.lock().unwrap();
718        ready.push(id);
719    }
720
721    /// Drain all ready tasks.
722    fn drain(&self) -> Vec<u128> {
723        let mut queue = self.ready.lock().unwrap();
724        let len = queue.len();
725        replace(&mut *queue, Vec::with_capacity(len))
726    }
727
728    /// The number of ready tasks.
729    fn ready(&self) -> usize {
730        self.ready.lock().unwrap().len()
731    }
732
733    /// Lookup a task.
734    ///
735    /// We must return cloned here because we cannot hold the running lock while polling a task (will
736    /// deadlock if [Self::register_work] is called).
737    fn get(&self, id: u128) -> Option<Arc<Task>> {
738        let running = self.running.lock().unwrap();
739        running.get(&id).cloned()
740    }
741
742    /// Remove a task.
743    fn remove(&self, id: u128) {
744        self.running.lock().unwrap().remove(&id);
745    }
746
747    /// Clear all tasks.
748    fn clear(&self) -> Vec<Arc<Task>> {
749        // Clear ready
750        self.ready.lock().unwrap().clear();
751
752        // Clear running tasks
753        let running: BTreeMap<u128, Arc<Task>> = {
754            let mut running = self.running.lock().unwrap();
755            take(&mut *running)
756        };
757        running.into_values().collect()
758    }
759}
760
/// Network stack used by the runtime: a deterministic network wrapped with
/// auditing and then metering.
type Network = MeteredNetwork<AuditedNetwork<DeterministicNetwork>>;
/// Storage stack used by the runtime: in-memory storage wrapped with
/// auditing and then metering.
type Storage = MeteredStorage<AuditedStorage<MemStorage>>;

/// Implementation of [crate::Spawner], [crate::Clock],
/// [crate::Network], and [crate::Storage] for the `deterministic`
/// runtime.
pub struct Context {
    // NOTE(review): presumably the label accumulated via `with_label` — confirm.
    name: String,
    // Weak so that contexts never keep the executor alive on their own.
    executor: Weak<Executor>,
    // Shared network handle.
    network: Arc<Network>,
    // Shared storage handle.
    storage: Arc<Storage>,
    // This context's node in the supervision tree.
    tree: Arc<Tree>,
    // Execution mode requested for the next spawn (reset to the default when spawning).
    execution: Execution,
    // Whether the next spawned task is wrapped in a tracing span (reset when spawning).
    instrumented: bool,
}
776
777impl Clone for Context {
778    fn clone(&self) -> Self {
779        let (child, _) = Tree::child(&self.tree);
780        Self {
781            name: self.name.clone(),
782            executor: self.executor.clone(),
783            network: self.network.clone(),
784            storage: self.storage.clone(),
785
786            tree: child,
787            execution: Execution::default(),
788            instrumented: false,
789        }
790    }
791}
792
793impl Context {
794    fn new(cfg: Config) -> (Self, Arc<Executor>, Panicked) {
795        // Create a new registry
796        let mut registry = Registry::default();
797        let runtime_registry = registry.sub_registry_with_prefix(METRICS_PREFIX);
798
799        // Initialize runtime
800        let metrics = Arc::new(Metrics::init(runtime_registry));
801        let start_time = UNIX_EPOCH;
802        let deadline = cfg
803            .timeout
804            .map(|timeout| start_time.checked_add(timeout).expect("timeout overflowed"));
805        let auditor = Arc::new(Auditor::default());
806        let storage = MeteredStorage::new(
807            AuditedStorage::new(MemStorage::default(), auditor.clone()),
808            runtime_registry,
809        );
810        let network = AuditedNetwork::new(DeterministicNetwork::default(), auditor.clone());
811        let network = MeteredNetwork::new(network, runtime_registry);
812
813        // Initialize panicker
814        let (panicker, panicked) = Panicker::new(cfg.catch_panics);
815
816        let executor = Arc::new(Executor {
817            registry: Mutex::new(registry),
818            cycle: cfg.cycle,
819            deadline,
820            metrics: metrics.clone(),
821            auditor: auditor.clone(),
822            rng: Mutex::new(StdRng::seed_from_u64(cfg.seed)),
823            time: Mutex::new(start_time),
824            tasks: Arc::new(Tasks::new()),
825            sleeping: Mutex::new(BinaryHeap::new()),
826            shutdown: Mutex::new(Stopper::default()),
827            panicker,
828        });
829
830        (
831            Self {
832                name: String::new(),
833                executor: Arc::downgrade(&executor),
834                network: Arc::new(network),
835                storage: Arc::new(storage),
836                tree: Tree::root(),
837                execution: Execution::default(),
838                instrumented: false,
839            },
840            executor,
841            panicked,
842        )
843    }
844
845    /// Recover the inner state (deadline, metrics, auditor, rng, synced storage, etc.) from the
846    /// current runtime and use it to initialize a new instance of the runtime. A recovered runtime
847    /// does not inherit the current runtime's pending tasks, unsynced storage, network connections, nor
848    /// its shutdown signaler.
849    ///
850    /// This is useful for performing a deterministic simulation that spans multiple runtime instantiations,
851    /// like simulating unclean shutdown (which involves repeatedly halting the runtime at unexpected intervals).
852    ///
853    /// It is only permitted to call this method after the runtime has finished (i.e. once `start` returns)
854    /// and only permitted to do once (otherwise multiple recovered runtimes will share the same inner state).
855    /// If either one of these conditions is violated, this method will panic.
856    fn recover(checkpoint: Checkpoint) -> (Self, Arc<Executor>, Panicked) {
857        // Rebuild metrics
858        let mut registry = Registry::default();
859        let runtime_registry = registry.sub_registry_with_prefix(METRICS_PREFIX);
860        let metrics = Arc::new(Metrics::init(runtime_registry));
861
862        // Copy state
863        let network =
864            AuditedNetwork::new(DeterministicNetwork::default(), checkpoint.auditor.clone());
865        let network = MeteredNetwork::new(network, runtime_registry);
866
867        // Initialize panicker
868        let (panicker, panicked) = Panicker::new(checkpoint.catch_panics);
869
870        let executor = Arc::new(Executor {
871            // Copied from the checkpoint
872            cycle: checkpoint.cycle,
873            deadline: checkpoint.deadline,
874            auditor: checkpoint.auditor,
875            rng: checkpoint.rng,
876            time: checkpoint.time,
877
878            // New state for the new runtime
879            registry: Mutex::new(registry),
880            metrics: metrics.clone(),
881            tasks: Arc::new(Tasks::new()),
882            sleeping: Mutex::new(BinaryHeap::new()),
883            shutdown: Mutex::new(Stopper::default()),
884            panicker,
885        });
886        (
887            Self {
888                name: String::new(),
889                executor: Arc::downgrade(&executor),
890                network: Arc::new(network),
891                storage: checkpoint.storage,
892                tree: Tree::root(),
893                execution: Execution::default(),
894                instrumented: false,
895            },
896            executor,
897            panicked,
898        )
899    }
900
901    /// Upgrade Weak reference to [Executor].
902    fn executor(&self) -> Arc<Executor> {
903        self.executor.upgrade().expect("executor already dropped")
904    }
905
906    /// Get a reference to [Metrics].
907    fn metrics(&self) -> Arc<Metrics> {
908        self.executor().metrics.clone()
909    }
910
911    /// Get a reference to the [Auditor].
912    pub fn auditor(&self) -> Arc<Auditor> {
913        self.executor().auditor.clone()
914    }
915}
916
917impl crate::Spawner for Context {
918    fn dedicated(mut self) -> Self {
919        self.execution = Execution::Dedicated;
920        self
921    }
922
923    fn shared(mut self, blocking: bool) -> Self {
924        self.execution = Execution::Shared(blocking);
925        self
926    }
927
928    fn instrumented(mut self) -> Self {
929        self.instrumented = true;
930        self
931    }
932
933    fn spawn<F, Fut, T>(mut self, f: F) -> Handle<T>
934    where
935        F: FnOnce(Self) -> Fut + Send + 'static,
936        Fut: Future<Output = T> + Send + 'static,
937        T: Send + 'static,
938    {
939        // Get metrics
940        let (label, metric) = spawn_metrics!(self);
941
942        // Track supervision before resetting configuration
943        let parent = Arc::clone(&self.tree);
944        let is_instrumented = self.instrumented;
945        self.execution = Execution::default();
946        self.instrumented = false;
947        let (child, aborted) = Tree::child(&parent);
948        if aborted {
949            return Handle::closed(metric);
950        }
951        self.tree = child;
952
953        // Spawn the task (we don't care about Model)
954        let executor = self.executor();
955        let future: BoxFuture<T> = if is_instrumented {
956            f(self)
957                .instrument(info_span!(parent: None, "task", name = %label.name()))
958                .boxed()
959        } else {
960            f(self).boxed()
961        };
962        let (f, handle) = Handle::init(
963            future,
964            metric,
965            executor.panicker.clone(),
966            Arc::clone(&parent),
967        );
968        Tasks::register_work(&executor.tasks, label, Box::pin(f));
969
970        // Register the task on the parent
971        if let Some(aborter) = handle.aborter() {
972            parent.register(aborter);
973        }
974
975        handle
976    }
977
978    async fn stop(self, value: i32, timeout: Option<Duration>) -> Result<(), Error> {
979        let executor = self.executor();
980        executor.auditor.event(b"stop", |hasher| {
981            hasher.update(value.to_be_bytes());
982        });
983        let stop_resolved = {
984            let mut shutdown = executor.shutdown.lock().unwrap();
985            shutdown.stop(value)
986        };
987
988        // Wait for all tasks to complete or the timeout to fire
989        let timeout_future = match timeout {
990            Some(duration) => futures::future::Either::Left(self.sleep(duration)),
991            None => futures::future::Either::Right(futures::future::pending()),
992        };
993        select! {
994            result = stop_resolved => {
995                result.map_err(|_| Error::Closed)?;
996                Ok(())
997            },
998            _ = timeout_future => {
999                Err(Error::Timeout)
1000            }
1001        }
1002    }
1003
1004    fn stopped(&self) -> Signal {
1005        let executor = self.executor();
1006        executor.auditor.event(b"stopped", |_| {});
1007        let stopped = executor.shutdown.lock().unwrap().stopped();
1008        stopped
1009    }
1010}
1011
1012impl crate::Metrics for Context {
1013    fn with_label(&self, label: &str) -> Self {
1014        let name = {
1015            let prefix = self.name.clone();
1016            if prefix.is_empty() {
1017                label.to_string()
1018            } else {
1019                format!("{prefix}_{label}")
1020            }
1021        };
1022        assert!(
1023            !name.starts_with(METRICS_PREFIX),
1024            "using runtime label is not allowed"
1025        );
1026        Self {
1027            name,
1028            ..self.clone()
1029        }
1030    }
1031
1032    fn label(&self) -> String {
1033        self.name.clone()
1034    }
1035
1036    fn register<N: Into<String>, H: Into<String>>(&self, name: N, help: H, metric: impl Metric) {
1037        // Prepare args
1038        let name = name.into();
1039        let help = help.into();
1040
1041        // Register metric
1042        let executor = self.executor();
1043        executor.auditor.event(b"register", |hasher| {
1044            hasher.update(name.as_bytes());
1045            hasher.update(help.as_bytes());
1046        });
1047        let prefixed_name = {
1048            let prefix = &self.name;
1049            if prefix.is_empty() {
1050                name
1051            } else {
1052                format!("{}_{}", *prefix, name)
1053            }
1054        };
1055        executor
1056            .registry
1057            .lock()
1058            .unwrap()
1059            .register(prefixed_name, help, metric);
1060    }
1061
1062    fn encode(&self) -> String {
1063        let executor = self.executor();
1064        executor.auditor.event(b"encode", |_| {});
1065        let mut buffer = String::new();
1066        encode(&mut buffer, &executor.registry.lock().unwrap()).expect("encoding failed");
1067        buffer
1068    }
1069}
1070
1071struct Sleeper {
1072    executor: Weak<Executor>,
1073    time: SystemTime,
1074    registered: bool,
1075}
1076
1077impl Sleeper {
1078    /// Upgrade Weak reference to [Executor].
1079    fn executor(&self) -> Arc<Executor> {
1080        self.executor.upgrade().expect("executor already dropped")
1081    }
1082}
1083
/// An entry in the executor's sleeper queue: a [Waker] paired with the time
/// it is scheduled for.
struct Alarm {
    /// Scheduled time (the only field considered when comparing alarms).
    time: SystemTime,
    /// Waker stored alongside the scheduled time.
    waker: Waker,
}
1088
1089impl PartialEq for Alarm {
1090    fn eq(&self, other: &Self) -> bool {
1091        self.time.eq(&other.time)
1092    }
1093}
1094
1095impl Eq for Alarm {}
1096
1097impl PartialOrd for Alarm {
1098    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
1099        Some(self.cmp(other))
1100    }
1101}
1102
1103impl Ord for Alarm {
1104    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
1105        // Reverse the ordering for min-heap
1106        other.time.cmp(&self.time)
1107    }
1108}
1109
1110impl Future for Sleeper {
1111    type Output = ();
1112
1113    fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
1114        let executor = self.executor();
1115        {
1116            let current_time = *executor.time.lock().unwrap();
1117            if current_time >= self.time {
1118                return Poll::Ready(());
1119            }
1120        }
1121        if !self.registered {
1122            self.registered = true;
1123            executor.sleeping.lock().unwrap().push(Alarm {
1124                time: self.time,
1125                waker: cx.waker().clone(),
1126            });
1127        }
1128        Poll::Pending
1129    }
1130}
1131
1132impl Clock for Context {
1133    fn current(&self) -> SystemTime {
1134        *self.executor().time.lock().unwrap()
1135    }
1136
1137    fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static {
1138        let deadline = self
1139            .current()
1140            .checked_add(duration)
1141            .expect("overflow when setting wake time");
1142        self.sleep_until(deadline)
1143    }
1144
1145    fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static {
1146        Sleeper {
1147            executor: self.executor.clone(),
1148
1149            time: deadline,
1150            registered: false,
1151        }
1152    }
1153}
1154
/// A future that wraps another future and only resolves once the executor's
/// clock has reached `target`.
///
/// If the wrapped future has not produced a value by the time `target` is
/// reached, polling blocks until it does.
#[cfg(feature = "external")]
#[pin_project]
struct Waiter<F: Future> {
    /// Executor whose clock and sleeper queue are consulted when polled.
    executor: Weak<Executor>,
    /// Earliest time at which the output may be released.
    target: SystemTime,
    /// The wrapped future.
    #[pin]
    future: F,
    /// Output cached from the initial poll (if the future was ready then).
    ready: Option<F::Output>,
    /// Whether the initial poll of `future` has been performed.
    started: bool,
    /// Whether an [Alarm] has already been pushed for `target`.
    registered: bool,
}
1169
#[cfg(feature = "external")]
impl<F> Future for Waiter<F>
where
    F: Future + Send,
{
    type Output = F::Output;

    /// Drive the wrapped future without releasing its output before the
    /// executor's clock reaches `target`.
    ///
    /// # Panics
    ///
    /// Panics with "executor already dropped" if the executor can no longer be
    /// upgraded.
    fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
        let mut this = self.project();

        // On the very first poll, give the future a chance to start work using
        // a noop waker (so it cannot wake this task before the delay expires).
        // A value produced here is cached until the clock reaches the target.
        if !*this.started {
            *this.started = true;
            let quiet = noop_waker();
            let mut quiet_cx = task::Context::from_waker(&quiet);
            if let Poll::Ready(value) = this.future.as_mut().poll(&mut quiet_cx) {
                *this.ready = Some(value);
            }
        }

        // Hold the task back until the target time has been reached
        let executor = this.executor.upgrade().expect("executor already dropped");
        let now = *executor.time.lock().unwrap();
        if now < *this.target {
            // Push a single alarm so the executor wakes this task at the
            // target time
            if !*this.registered {
                *this.registered = true;
                let alarm = Alarm {
                    time: *this.target,
                    waker: cx.waker().clone(),
                };
                executor.sleeping.lock().unwrap().push(alarm);
            }
            return Poll::Pending;
        }

        // Release the value cached by the initial poll (if any)
        if let Some(value) = this.ready.take() {
            return Poll::Ready(value);
        }

        // The target has been reached but no value is available yet: poll the
        // future with a waker tied to the blocker and wait on the blocker
        // between polls until it completes.
        let blocker = Blocker::new();
        loop {
            let blocking_waker = waker(blocker.clone());
            let mut blocking_cx = task::Context::from_waker(&blocking_waker);
            if let Poll::Ready(value) = this.future.as_mut().poll(&mut blocking_cx) {
                return Poll::Ready(value);
            }
            blocker.wait();
        }
    }
}
1228
#[cfg(feature = "external")]
impl Pacer for Context {
    /// Wrap `future` so that its output is not released before `latency` has
    /// elapsed on the executor's clock (measured from the time of this call).
    ///
    /// # Panics
    ///
    /// Panics if adding `latency` to the current time overflows.
    fn pace<'a, F, T>(&'a self, latency: Duration, future: F) -> impl Future<Output = T> + Send + 'a
    where
        F: Future<Output = T> + Send + 'a,
        T: Send + 'a,
    {
        // Fix the release time relative to the current executor time
        let now = self.current();
        let target = now
            .checked_add(latency)
            .expect("overflow when setting wake time");

        Waiter {
            executor: Weak::clone(&self.executor),
            target,
            future,
            ready: None,
            started: false,
            registered: false,
        }
    }
}
1255
1256impl GClock for Context {
1257    type Instant = SystemTime;
1258
1259    fn now(&self) -> Self::Instant {
1260        self.current()
1261    }
1262}
1263
1264impl ReasonablyRealtime for Context {}
1265
1266impl crate::Network for Context {
1267    type Listener = ListenerOf<Network>;
1268
1269    async fn bind(&self, socket: SocketAddr) -> Result<Self::Listener, Error> {
1270        self.network.bind(socket).await
1271    }
1272
1273    async fn dial(
1274        &self,
1275        socket: SocketAddr,
1276    ) -> Result<(crate::SinkOf<Self>, crate::StreamOf<Self>), Error> {
1277        self.network.dial(socket).await
1278    }
1279}
1280
1281impl RngCore for Context {
1282    fn next_u32(&mut self) -> u32 {
1283        let executor = self.executor();
1284        executor.auditor.event(b"rand", |hasher| {
1285            hasher.update(b"next_u32");
1286        });
1287        let result = executor.rng.lock().unwrap().next_u32();
1288        result
1289    }
1290
1291    fn next_u64(&mut self) -> u64 {
1292        let executor = self.executor();
1293        executor.auditor.event(b"rand", |hasher| {
1294            hasher.update(b"next_u64");
1295        });
1296        let result = executor.rng.lock().unwrap().next_u64();
1297        result
1298    }
1299
1300    fn fill_bytes(&mut self, dest: &mut [u8]) {
1301        let executor = self.executor();
1302        executor.auditor.event(b"rand", |hasher| {
1303            hasher.update(b"fill_bytes");
1304        });
1305        executor.rng.lock().unwrap().fill_bytes(dest);
1306    }
1307
1308    fn try_fill_bytes(&mut self, dest: &mut [u8]) -> Result<(), rand::Error> {
1309        let executor = self.executor();
1310        executor.auditor.event(b"rand", |hasher| {
1311            hasher.update(b"try_fill_bytes");
1312        });
1313        let result = executor.rng.lock().unwrap().try_fill_bytes(dest);
1314        result
1315    }
1316}
1317
1318impl CryptoRng for Context {}
1319
1320impl crate::Storage for Context {
1321    type Blob = <Storage as crate::Storage>::Blob;
1322
1323    async fn open(&self, partition: &str, name: &[u8]) -> Result<(Self::Blob, u64), Error> {
1324        self.storage.open(partition, name).await
1325    }
1326
1327    async fn remove(&self, partition: &str, name: Option<&[u8]>) -> Result<(), Error> {
1328        self.storage.remove(partition, name).await
1329    }
1330
1331    async fn scan(&self, partition: &str) -> Result<Vec<Vec<u8>>, Error> {
1332        self.storage.scan(partition).await
1333    }
1334}
1335
1336#[cfg(test)]
1337mod tests {
1338    use super::*;
1339    #[cfg(feature = "external")]
1340    use crate::FutureExt;
1341    #[cfg(feature = "external")]
1342    use crate::Spawner;
1343    use crate::{deterministic, reschedule, utils::run_tasks, Blob, Metrics, Runner as _, Storage};
1344    use commonware_macros::test_traced;
1345    #[cfg(not(feature = "external"))]
1346    use futures::future::pending;
1347    #[cfg(feature = "external")]
1348    use futures::{channel::mpsc, SinkExt, StreamExt};
1349    use futures::{channel::oneshot, task::noop_waker};
1350
1351    fn run_with_seed(seed: u64) -> (String, Vec<usize>) {
1352        let executor = deterministic::Runner::seeded(seed);
1353        run_tasks(5, executor)
1354    }
1355
1356    #[test]
1357    fn test_same_seed_same_order() {
1358        // Generate initial outputs
1359        let mut outputs = Vec::new();
1360        for seed in 0..1000 {
1361            let output = run_with_seed(seed);
1362            outputs.push(output);
1363        }
1364
1365        // Ensure they match
1366        for seed in 0..1000 {
1367            let output = run_with_seed(seed);
1368            assert_eq!(output, outputs[seed as usize]);
1369        }
1370    }
1371
1372    #[test_traced("TRACE")]
1373    fn test_different_seeds_different_order() {
1374        let output1 = run_with_seed(12345);
1375        let output2 = run_with_seed(54321);
1376        assert_ne!(output1, output2);
1377    }
1378
1379    #[test]
1380    fn test_alarm_min_heap() {
1381        // Populate heap
1382        let now = SystemTime::now();
1383        let alarms = vec![
1384            Alarm {
1385                time: now + Duration::new(10, 0),
1386                waker: noop_waker(),
1387            },
1388            Alarm {
1389                time: now + Duration::new(5, 0),
1390                waker: noop_waker(),
1391            },
1392            Alarm {
1393                time: now + Duration::new(15, 0),
1394                waker: noop_waker(),
1395            },
1396            Alarm {
1397                time: now + Duration::new(5, 0),
1398                waker: noop_waker(),
1399            },
1400        ];
1401        let mut heap = BinaryHeap::new();
1402        for alarm in alarms {
1403            heap.push(alarm);
1404        }
1405
1406        // Verify min-heap
1407        let mut sorted_times = Vec::new();
1408        while let Some(alarm) = heap.pop() {
1409            sorted_times.push(alarm.time);
1410        }
1411        assert_eq!(
1412            sorted_times,
1413            vec![
1414                now + Duration::new(5, 0),
1415                now + Duration::new(5, 0),
1416                now + Duration::new(10, 0),
1417                now + Duration::new(15, 0),
1418            ]
1419        );
1420    }
1421
1422    #[test]
1423    #[should_panic(expected = "runtime timeout")]
1424    fn test_timeout() {
1425        let executor = deterministic::Runner::timed(Duration::from_secs(10));
1426        executor.start(|context| async move {
1427            loop {
1428                context.sleep(Duration::from_secs(1)).await;
1429            }
1430        });
1431    }
1432
1433    #[test]
1434    #[should_panic(expected = "cycle duration must be non-zero when timeout is set")]
1435    fn test_bad_timeout() {
1436        let cfg = Config {
1437            timeout: Some(Duration::default()),
1438            cycle: Duration::default(),
1439            ..Config::default()
1440        };
1441        deterministic::Runner::new(cfg);
1442    }
1443
1444    #[test]
1445    #[should_panic(
1446        expected = "cycle duration must be greater than or equal to system time precision"
1447    )]
1448    fn test_bad_cycle() {
1449        let cfg = Config {
1450            cycle: SYSTEM_TIME_PRECISION - Duration::from_nanos(1),
1451            ..Config::default()
1452        };
1453        deterministic::Runner::new(cfg);
1454    }
1455
1456    #[test]
1457    fn test_recover_synced_storage_persists() {
1458        // Initialize the first runtime
1459        let executor1 = deterministic::Runner::default();
1460        let partition = "test_partition";
1461        let name = b"test_blob";
1462        let data = b"Hello, world!";
1463
1464        // Run some tasks, sync storage, and recover the runtime
1465        let (state, checkpoint) = executor1.start_and_recover(|context| async move {
1466            let (blob, _) = context.open(partition, name).await.unwrap();
1467            blob.write_at(Vec::from(data), 0).await.unwrap();
1468            blob.sync().await.unwrap();
1469            context.auditor().state()
1470        });
1471
1472        // Verify auditor state is the same
1473        assert_eq!(state, checkpoint.auditor.state());
1474
1475        // Check that synced storage persists after recovery
1476        let executor = Runner::from(checkpoint);
1477        executor.start(|context| async move {
1478            let (blob, len) = context.open(partition, name).await.unwrap();
1479            assert_eq!(len, data.len() as u64);
1480            let read = blob.read_at(vec![0; data.len()], 0).await.unwrap();
1481            assert_eq!(read.as_ref(), data);
1482        });
1483    }
1484
1485    #[test]
1486    #[should_panic(expected = "goodbye")]
1487    fn test_recover_panic_handling() {
1488        // Initialize the first runtime
1489        let executor1 = deterministic::Runner::default();
1490        let (_, checkpoint) = executor1.start_and_recover(|_| async move {
1491            reschedule().await;
1492        });
1493
1494        // Ensure that panic setting is preserved
1495        let executor = Runner::from(checkpoint);
1496        executor.start(|_| async move {
1497            panic!("goodbye");
1498        });
1499    }
1500
1501    #[test]
1502    fn test_recover_unsynced_storage_does_not_persist() {
1503        // Initialize the first runtime
1504        let executor = deterministic::Runner::default();
1505        let partition = "test_partition";
1506        let name = b"test_blob";
1507        let data = Vec::from("Hello, world!");
1508
1509        // Run some tasks without syncing storage
1510        let (_, checkpoint) = executor.start_and_recover(|context| async move {
1511            let context = context.clone();
1512            let (blob, _) = context.open(partition, name).await.unwrap();
1513            blob.write_at(data, 0).await.unwrap();
1514        });
1515
1516        // Recover the runtime
1517        let executor = Runner::from(checkpoint);
1518
1519        // Check that unsynced storage does not persist after recovery
1520        executor.start(|context| async move {
1521            let (_, len) = context.open(partition, name).await.unwrap();
1522            assert_eq!(len, 0);
1523        });
1524    }
1525
1526    #[test]
1527    #[should_panic(expected = "executor still has weak references")]
1528    fn test_context_return() {
1529        // Initialize runtime
1530        let executor = deterministic::Runner::default();
1531
1532        // Start runtime
1533        let context = executor.start(|context| async move {
1534            // Attempt to recover before the runtime has finished
1535            context
1536        });
1537
1538        // Should never get this far
1539        drop(context);
1540    }
1541
1542    #[test]
1543    fn test_default_time_zero() {
1544        // Initialize runtime
1545        let executor = deterministic::Runner::default();
1546
1547        executor.start(|context| async move {
1548            // Check that the time is zero
1549            assert_eq!(
1550                context.current().duration_since(UNIX_EPOCH).unwrap(),
1551                Duration::ZERO
1552            );
1553        });
1554    }
1555
1556    #[cfg(not(feature = "external"))]
1557    #[test]
1558    #[should_panic(expected = "runtime stalled")]
1559    fn test_stall() {
1560        // Initialize runtime
1561        let executor = deterministic::Runner::default();
1562
1563        // Start runtime
1564        executor.start(|_| async move {
1565            pending::<()>().await;
1566        });
1567    }
1568
1569    #[cfg(not(feature = "external"))]
1570    #[test]
1571    #[should_panic(expected = "runtime stalled")]
1572    fn test_external_simulated() {
1573        // Initialize runtime
1574        let executor = deterministic::Runner::default();
1575
1576        // Create a thread that waits for 1 second
1577        let (tx, rx) = oneshot::channel();
1578        std::thread::spawn(move || {
1579            std::thread::sleep(Duration::from_secs(1));
1580            tx.send(()).unwrap();
1581        });
1582
1583        // Start runtime
1584        executor.start(|_| async move {
1585            rx.await.unwrap();
1586        });
1587    }
1588
1589    #[cfg(feature = "external")]
1590    #[test]
1591    fn test_external_realtime() {
1592        // Initialize runtime
1593        let executor = deterministic::Runner::default();
1594
1595        // Create a thread that waits for 1 second
1596        let (tx, rx) = oneshot::channel();
1597        std::thread::spawn(move || {
1598            std::thread::sleep(Duration::from_secs(1));
1599            tx.send(()).unwrap();
1600        });
1601
1602        // Start runtime
1603        executor.start(|_| async move {
1604            rx.await.unwrap();
1605        });
1606    }
1607
1608    #[cfg(feature = "external")]
1609    #[test]
1610    fn test_external_realtime_variable() {
1611        // Initialize runtime
1612        let executor = deterministic::Runner::default();
1613
1614        // Start runtime
1615        executor.start(|context| async move {
1616            // Initialize test
1617            let start_real = SystemTime::now();
1618            let start_sim = context.current();
1619            let (first_tx, first_rx) = oneshot::channel();
1620            let (second_tx, second_rx) = oneshot::channel();
1621            let (mut results_tx, mut results_rx) = mpsc::channel(2);
1622
1623            // Create a thread that waits for 1 second
1624            let first_wait = Duration::from_secs(1);
1625            std::thread::spawn(move || {
1626                std::thread::sleep(first_wait);
1627                first_tx.send(()).unwrap();
1628            });
1629
1630            // Create a thread
1631            std::thread::spawn(move || {
1632                std::thread::sleep(Duration::ZERO);
1633                second_tx.send(()).unwrap();
1634            });
1635
1636            // Wait for a delay sampled before the external send occurs
1637            let first = context.clone().spawn({
1638                let mut results_tx = results_tx.clone();
1639                move |context| async move {
1640                    first_rx.pace(&context, Duration::ZERO).await.unwrap();
1641                    let elapsed_real = SystemTime::now().duration_since(start_real).unwrap();
1642                    assert!(elapsed_real > first_wait);
1643                    let elapsed_sim = context.current().duration_since(start_sim).unwrap();
1644                    assert!(elapsed_sim < first_wait);
1645                    results_tx.send(1).await.unwrap();
1646                }
1647            });
1648
1649            // Wait for a delay sampled after the external send occurs
1650            let second = context.clone().spawn(move |context| async move {
1651                second_rx.pace(&context, first_wait).await.unwrap();
1652                let elapsed_real = SystemTime::now().duration_since(start_real).unwrap();
1653                assert!(elapsed_real >= first_wait);
1654                let elapsed_sim = context.current().duration_since(start_sim).unwrap();
1655                assert!(elapsed_sim >= first_wait);
1656                results_tx.send(2).await.unwrap();
1657            });
1658
1659            // Wait for both tasks to complete
1660            second.await.unwrap();
1661            first.await.unwrap();
1662
1663            // Ensure order is correct
1664            let mut results = Vec::new();
1665            for _ in 0..2 {
1666                results.push(results_rx.next().await.unwrap());
1667            }
1668            assert_eq!(results, vec![1, 2]);
1669        });
1670    }
1671
1672    #[cfg(not(feature = "external"))]
1673    #[test]
1674    fn test_simulated_skip() {
1675        // Initialize runtime
1676        let executor = deterministic::Runner::default();
1677
1678        // Start runtime
1679        executor.start(|context| async move {
1680            context.sleep(Duration::from_secs(1)).await;
1681
1682            // Check if we skipped
1683            let metrics = context.encode();
1684            let iterations = metrics
1685                .lines()
1686                .find_map(|line| {
1687                    line.strip_prefix("runtime_iterations_total ")
1688                        .and_then(|value| value.trim().parse::<u64>().ok())
1689                })
1690                .expect("missing runtime_iterations_total metric");
1691            assert!(iterations < 10);
1692        });
1693    }
1694
1695    #[cfg(feature = "external")]
1696    #[test]
1697    fn test_realtime_no_skip() {
1698        // Initialize runtime
1699        let executor = deterministic::Runner::default();
1700
1701        // Start runtime
1702        executor.start(|context| async move {
1703            context.sleep(Duration::from_secs(1)).await;
1704
1705            // Check if we skipped
1706            let metrics = context.encode();
1707            let iterations = metrics
1708                .lines()
1709                .find_map(|line| {
1710                    line.strip_prefix("runtime_iterations_total ")
1711                        .and_then(|value| value.trim().parse::<u64>().ok())
1712                })
1713                .expect("missing runtime_iterations_total metric");
1714            assert!(iterations > 500);
1715        });
1716    }
1717}