commonware_runtime/
deterministic.rs

//! A deterministic runtime that randomly selects tasks to run based on a seed.
//!
//! # Panics
//!
//! If any task panics, the runtime will panic (and shut down).
6//!
7//! # Example
8//!
9//! ```rust
10//! use commonware_runtime::{Spawner, Runner, deterministic, Metrics};
11//!
//! let executor = deterministic::Runner::default();
13//! executor.start(|context| async move {
14//!     println!("Parent started");
15//!     let result = context.with_label("child").spawn(|_| async move {
16//!         println!("Child started");
17//!         "hello"
18//!     });
19//!     println!("Child result: {:?}", result.await);
20//!     println!("Parent exited");
21//!     println!("Auditor state: {}", context.auditor().state());
22//! });
23//! ```
24
25use crate::{
26    network::{
27        audited::Network as AuditedNetwork, deterministic::Network as DeterministicNetwork,
28        metered::Network as MeteredNetwork,
29    },
30    storage::{
31        audited::Storage as AuditedStorage, memory::Storage as MemStorage,
32        metered::Storage as MeteredStorage,
33    },
34    telemetry::metrics::task::Label,
35    utils::signal::{Signal, Stopper},
36    Clock, Error, Handle, ListenerOf, METRICS_PREFIX,
37};
38use commonware_macros::select;
39use commonware_utils::{hex, SystemTimeExt};
40use futures::{
41    task::{waker_ref, ArcWake},
42    Future,
43};
44use governor::clock::{Clock as GClock, ReasonablyRealtime};
45use prometheus_client::{
46    encoding::text::encode,
47    metrics::{counter::Counter, family::Family, gauge::Gauge},
48    registry::{Metric, Registry},
49};
50use rand::{prelude::SliceRandom, rngs::StdRng, CryptoRng, RngCore, SeedableRng};
51use sha2::{Digest, Sha256};
52use std::{
53    collections::{BinaryHeap, HashMap},
54    mem::replace,
55    net::SocketAddr,
56    pin::Pin,
57    sync::{Arc, Mutex, Weak},
58    task::{self, Poll, Waker},
59    time::{Duration, SystemTime, UNIX_EPOCH},
60};
61use tracing::trace;
62
/// The blobs of a single partition: each key is a blob name and each value is that blob's
/// contents.
pub type Partition = HashMap<Vec<u8>, Vec<u8>>;
65
66#[derive(Debug)]
67struct Metrics {
68    tasks_spawned: Family<Label, Counter>,
69    tasks_running: Family<Label, Gauge>,
70    task_polls: Family<Label, Counter>,
71
72    network_bandwidth: Counter,
73}
74
75impl Metrics {
76    pub fn init(registry: &mut Registry) -> Self {
77        let metrics = Self {
78            task_polls: Family::default(),
79            tasks_spawned: Family::default(),
80            tasks_running: Family::default(),
81            network_bandwidth: Counter::default(),
82        };
83        registry.register(
84            "tasks_spawned",
85            "Total number of tasks spawned",
86            metrics.tasks_spawned.clone(),
87        );
88        registry.register(
89            "tasks_running",
90            "Number of tasks currently running",
91            metrics.tasks_running.clone(),
92        );
93        registry.register(
94            "task_polls",
95            "Total number of task polls",
96            metrics.task_polls.clone(),
97        );
98        registry.register(
99            "bandwidth",
100            "Total amount of data sent over network",
101            metrics.network_bandwidth.clone(),
102        );
103        metrics
104    }
105}
106
107/// Track the state of the runtime for determinism auditing.
108pub struct Auditor {
109    hash: Mutex<Vec<u8>>,
110}
111
112impl Default for Auditor {
113    fn default() -> Self {
114        Self {
115            hash: Vec::new().into(),
116        }
117    }
118}
119
120impl Auditor {
121    /// Record that an event happened.
122    /// This auditor's hash will be updated with the event's `label` and
123    /// whatever other data is passed in the `payload` closure.
124    pub(crate) fn event<F>(&self, label: &'static [u8], payload: F)
125    where
126        F: FnOnce(&mut Sha256),
127    {
128        let mut hash = self.hash.lock().unwrap();
129
130        let mut hasher = Sha256::new();
131        hasher.update(&*hash);
132        hasher.update(label);
133        payload(&mut hasher);
134
135        *hash = hasher.finalize().to_vec();
136    }
137
138    /// Generate a representation of the current state of the runtime.
139    ///
140    /// This can be used to ensure that logic running on top
141    /// of the runtime is interacting deterministically.
142    pub fn state(&self) -> String {
143        let hash = self.hash.lock().unwrap().clone();
144        hex(&hash)
145    }
146}
147
/// Configuration for the `deterministic` runtime.
#[derive(Clone, Debug)]
pub struct Config {
    /// Seed for the random number generator.
    seed: u64,

    /// The cycle duration determines how much time is advanced after each iteration of the event
    /// loop. This is useful to prevent starvation if some task never yields.
    cycle: Duration,

    /// If the runtime is still executing at this point (i.e. a test hasn't stopped), panic.
    timeout: Option<Duration>,
}

impl Config {
    /// Returns a new [Config] with default values: seed `42`, a 1ms cycle, and no timeout.
    pub fn new() -> Self {
        Self {
            seed: 42,
            cycle: Duration::from_millis(1),
            timeout: None,
        }
    }

    // Setters
    /// Set the seed of the random number generator. See [Config].
    pub fn with_seed(mut self, seed: u64) -> Self {
        self.seed = seed;
        self
    }
    /// Set how much simulated time passes per event-loop iteration. See [Config].
    pub fn with_cycle(mut self, cycle: Duration) -> Self {
        self.cycle = cycle;
        self
    }
    /// Set (or clear) the simulated-time timeout. See [Config].
    pub fn with_timeout(mut self, timeout: Option<Duration>) -> Self {
        self.timeout = timeout;
        self
    }

    // Getters
    /// Seed of the random number generator. See [Config].
    pub fn seed(&self) -> u64 {
        self.seed
    }
    /// Simulated time advanced per event-loop iteration. See [Config].
    pub fn cycle(&self) -> Duration {
        self.cycle
    }
    /// Simulated-time timeout, if any. See [Config].
    pub fn timeout(&self) -> Option<Duration> {
        self.timeout
    }

    /// Assert that the configuration is valid.
    ///
    /// # Panics
    ///
    /// Panics if a timeout is set while the cycle duration is zero. With a zero cycle, simulated
    /// time only moves when the runtime is idle, so a busy task could keep the deadline from ever
    /// being reached.
    pub fn assert(&self) {
        assert!(
            self.cycle != Duration::default() || self.timeout.is_none(),
            "cycle duration must be non-zero when timeout is set",
        );
    }
}

impl Default for Config {
    fn default() -> Self {
        Self::new()
    }
}
217
218/// Deterministic runtime that randomly selects tasks to run based on a seed.
219pub struct Executor {
220    registry: Mutex<Registry>,
221    cycle: Duration,
222    deadline: Option<SystemTime>,
223    metrics: Arc<Metrics>,
224    auditor: Arc<Auditor>,
225    rng: Mutex<StdRng>,
226    time: Mutex<SystemTime>,
227    tasks: Arc<Tasks>,
228    sleeping: Mutex<BinaryHeap<Alarm>>,
229    partitions: Mutex<HashMap<String, Partition>>,
230    shutdown: Mutex<Stopper>,
231    finished: Mutex<bool>,
232    recovered: Mutex<bool>,
233}
234
235enum State {
236    Config(Config),
237    Context(Context),
238}
239
240/// Implementation of [crate::Runner] for the `deterministic` runtime.
241pub struct Runner {
242    state: State,
243}
244
245impl From<Config> for Runner {
246    fn from(cfg: Config) -> Self {
247        Self::new(cfg)
248    }
249}
250
251impl From<Context> for Runner {
252    fn from(context: Context) -> Self {
253        Self {
254            state: State::Context(context),
255        }
256    }
257}
258
259impl Runner {
260    /// Initialize a new `deterministic` runtime with the given seed and cycle duration.
261    pub fn new(cfg: Config) -> Self {
262        // Ensure config is valid
263        cfg.assert();
264        Runner {
265            state: State::Config(cfg),
266        }
267    }
268
269    /// Initialize a new `deterministic` runtime with the default configuration
270    /// and the provided seed.
271    pub fn seeded(seed: u64) -> Self {
272        let cfg = Config {
273            seed,
274            ..Config::default()
275        };
276        Self::new(cfg)
277    }
278
279    /// Initialize a new `deterministic` runtime with the default configuration
280    /// but exit after the given timeout.
281    pub fn timed(timeout: Duration) -> Self {
282        let cfg = Config {
283            timeout: Some(timeout),
284            ..Config::default()
285        };
286        Self::new(cfg)
287    }
288}
289
290impl Default for Runner {
291    fn default() -> Self {
292        Self::new(Config::default())
293    }
294}
295
296impl crate::Runner for Runner {
297    type Context = Context;
298
299    fn start<F, Fut>(self, f: F) -> Fut::Output
300    where
301        F: FnOnce(Self::Context) -> Fut,
302        Fut: Future,
303    {
304        // Setup context (depending on how the runtime was initialized)
305        let context = match self.state {
306            State::Config(config) => Context::new(config),
307            State::Context(context) => context,
308        };
309
310        // Pin root task to the heap
311        let executor = context.executor.clone();
312        let mut root = Box::pin(f(context));
313
314        // Register the root task
315        Tasks::register_root(&executor.tasks);
316
317        // Process tasks until root task completes or progress stalls
318        let mut iter = 0;
319        loop {
320            // Ensure we have not exceeded our deadline
321            {
322                let current = executor.time.lock().unwrap();
323                if let Some(deadline) = executor.deadline {
324                    if *current >= deadline {
325                        panic!("runtime timeout");
326                    }
327                }
328            }
329
330            // Snapshot available tasks
331            let mut tasks = executor.tasks.drain();
332
333            // Shuffle tasks
334            {
335                let mut rng = executor.rng.lock().unwrap();
336                tasks.shuffle(&mut *rng);
337            }
338
339            // Run all snapshotted tasks
340            //
341            // This approach is more efficient than randomly selecting a task one-at-a-time
342            // because it ensures we don't pull the same pending task multiple times in a row (without
343            // processing a different task required for other tasks to make progress).
344            trace!(iter, tasks = tasks.len(), "starting loop");
345            for task in tasks {
346                // Record task for auditing
347                executor.auditor.event(b"process_task", |hasher| {
348                    hasher.update(task.id.to_be_bytes());
349                    hasher.update(task.label.name().as_bytes());
350                });
351                trace!(id = task.id, "processing task");
352
353                // Record task poll
354                executor.metrics.task_polls.get_or_create(&task.label).inc();
355
356                // Prepare task for polling
357                let waker = waker_ref(&task);
358                let mut cx = task::Context::from_waker(&waker);
359                match &task.operation {
360                    Operation::Root => {
361                        // Poll the root task
362                        if let Poll::Ready(output) = root.as_mut().poll(&mut cx) {
363                            trace!(id = task.id, "task is complete");
364                            *executor.finished.lock().unwrap() = true;
365                            return output;
366                        }
367                    }
368                    Operation::Work { future, completed } => {
369                        // If task is completed, skip it
370                        if *completed.lock().unwrap() {
371                            trace!(id = task.id, "dropping already complete task");
372                            continue;
373                        }
374
375                        // Poll the task
376                        let mut fut = future.lock().unwrap();
377                        if fut.as_mut().poll(&mut cx).is_ready() {
378                            trace!(id = task.id, "task is complete");
379                            *completed.lock().unwrap() = true;
380                            continue;
381                        }
382                    }
383                }
384
385                // Try again later if task is still pending
386                trace!(id = task.id, "task is still pending");
387            }
388
389            // Advance time by cycle
390            //
391            // This approach prevents starvation if some task never yields (to approximate this,
392            // duration can be set to 1ns).
393            let mut current;
394            {
395                let mut time = executor.time.lock().unwrap();
396                *time = time
397                    .checked_add(executor.cycle)
398                    .expect("executor time overflowed");
399                current = *time;
400            }
401            trace!(now = current.epoch_millis(), "time advanced");
402
403            // Skip time if there is nothing to do
404            if executor.tasks.len() == 0 {
405                let mut skip = None;
406                {
407                    let sleeping = executor.sleeping.lock().unwrap();
408                    if let Some(next) = sleeping.peek() {
409                        if next.time > current {
410                            skip = Some(next.time);
411                        }
412                    }
413                }
414                if let Some(skip_time) = skip {
415                    {
416                        let mut time = executor.time.lock().unwrap();
417                        *time = skip_time;
418                        current = *time;
419                    }
420                    trace!(now = current.epoch_millis(), "time skipped");
421                }
422            }
423
424            // Wake all sleeping tasks that are ready
425            let mut to_wake = Vec::new();
426            let mut remaining;
427            {
428                let mut sleeping = executor.sleeping.lock().unwrap();
429                while let Some(next) = sleeping.peek() {
430                    if next.time <= current {
431                        let sleeper = sleeping.pop().unwrap();
432                        to_wake.push(sleeper.waker);
433                    } else {
434                        break;
435                    }
436                }
437                remaining = sleeping.len();
438            }
439            for waker in to_wake {
440                waker.wake();
441            }
442
443            // Account for remaining tasks
444            remaining += executor.tasks.len();
445
446            // If there are no tasks to run and no tasks sleeping, the executor is stalled
447            // and will never finish.
448            if remaining == 0 {
449                panic!("runtime stalled");
450            }
451            iter += 1;
452        }
453    }
454}
455
/// What a scheduled task polls when the runtime runs it.
enum Operation {
    /// The root future passed to `start`, which the event loop owns and polls directly.
    Root,
    /// A spawned future, plus a flag recording whether it has already completed.
    Work {
        future: Mutex<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
        completed: Mutex<bool>,
    },
}
464
465/// A task that is being executed by the runtime.
466struct Task {
467    id: u128,
468    label: Label,
469    tasks: Weak<Tasks>,
470
471    operation: Operation,
472}
473
474impl ArcWake for Task {
475    fn wake_by_ref(arc_self: &Arc<Self>) {
476        // Upgrade the weak reference to re-enqueue this task.
477        // If upgrade fails, the task queue has been dropped and no action is required.
478        if let Some(tasks) = arc_self.tasks.upgrade() {
479            tasks.enqueue(arc_self.clone());
480        }
481    }
482}
483
484/// A task queue that is used to manage the tasks that are being executed by the runtime.
485struct Tasks {
486    /// The current task counter.
487    counter: Mutex<u128>,
488    /// The queue of tasks that are waiting to be executed.
489    queue: Mutex<Vec<Arc<Task>>>,
490    /// Indicates whether the root task has been registered.
491    root_registered: Mutex<bool>,
492}
493
494impl Tasks {
495    /// Create a new task queue.
496    fn new() -> Self {
497        Self {
498            counter: Mutex::new(0),
499            queue: Mutex::new(Vec::new()),
500            root_registered: Mutex::new(false),
501        }
502    }
503
504    /// Increment the task counter and return the old value.
505    fn increment(&self) -> u128 {
506        let mut counter = self.counter.lock().unwrap();
507        let old = *counter;
508        *counter = counter.checked_add(1).expect("task counter overflow");
509        old
510    }
511
512    /// Register the root task.
513    ///
514    /// If the root task has already been registered, this function will panic.
515    fn register_root(arc_self: &Arc<Self>) {
516        {
517            let mut registered = arc_self.root_registered.lock().unwrap();
518            assert!(!*registered, "root already registered");
519            *registered = true;
520        }
521        let id = arc_self.increment();
522        let mut queue = arc_self.queue.lock().unwrap();
523        queue.push(Arc::new(Task {
524            id,
525            label: Label::root(),
526            tasks: Arc::downgrade(arc_self),
527            operation: Operation::Root,
528        }));
529    }
530
531    /// Register a new task to be executed.
532    fn register_work(
533        arc_self: &Arc<Self>,
534        label: Label,
535        future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
536    ) {
537        let id = arc_self.increment();
538        let mut queue = arc_self.queue.lock().unwrap();
539        queue.push(Arc::new(Task {
540            id,
541            label,
542            tasks: Arc::downgrade(arc_self),
543            operation: Operation::Work {
544                future: Mutex::new(future),
545                completed: Mutex::new(false),
546            },
547        }));
548    }
549
550    /// Enqueue an already registered task to be executed.
551    fn enqueue(&self, task: Arc<Task>) {
552        let mut queue = self.queue.lock().unwrap();
553        queue.push(task);
554    }
555
556    /// Dequeue all tasks that are ready to execute.
557    fn drain(&self) -> Vec<Arc<Task>> {
558        let mut queue = self.queue.lock().unwrap();
559        let len = queue.len();
560        replace(&mut *queue, Vec::with_capacity(len))
561    }
562
563    /// Get the number of tasks in the queue.
564    fn len(&self) -> usize {
565        self.queue.lock().unwrap().len()
566    }
567}
568
569type Network = MeteredNetwork<AuditedNetwork<DeterministicNetwork>>;
570
571/// Implementation of [crate::Spawner], [crate::Clock],
572/// [crate::Network], and [crate::Storage] for the `deterministic`
573/// runtime.
574pub struct Context {
575    name: String,
576    spawned: bool,
577    executor: Arc<Executor>,
578    network: Arc<Network>,
579    storage: MeteredStorage<AuditedStorage<MemStorage>>,
580}
581
582impl Default for Context {
583    fn default() -> Self {
584        Self::new(Config::default())
585    }
586}
587
588impl Context {
589    pub fn new(cfg: Config) -> Self {
590        // Create a new registry
591        let mut registry = Registry::default();
592        let runtime_registry = registry.sub_registry_with_prefix(METRICS_PREFIX);
593
594        // Initialize runtime
595        let metrics = Arc::new(Metrics::init(runtime_registry));
596        let start_time = UNIX_EPOCH;
597        let deadline = cfg
598            .timeout
599            .map(|timeout| start_time.checked_add(timeout).expect("timeout overflowed"));
600        let auditor = Arc::new(Auditor::default());
601        let storage = MeteredStorage::new(
602            AuditedStorage::new(MemStorage::default(), auditor.clone()),
603            runtime_registry,
604        );
605        let network = AuditedNetwork::new(DeterministicNetwork::default(), auditor.clone());
606        let network = MeteredNetwork::new(network, runtime_registry);
607
608        let executor = Arc::new(Executor {
609            registry: Mutex::new(registry),
610            cycle: cfg.cycle,
611            deadline,
612            metrics: metrics.clone(),
613            auditor: auditor.clone(),
614            rng: Mutex::new(StdRng::seed_from_u64(cfg.seed)),
615            time: Mutex::new(start_time),
616            tasks: Arc::new(Tasks::new()),
617            sleeping: Mutex::new(BinaryHeap::new()),
618            partitions: Mutex::new(HashMap::new()),
619            shutdown: Mutex::new(Stopper::default()),
620            finished: Mutex::new(false),
621            recovered: Mutex::new(false),
622        });
623
624        Context {
625            name: String::new(),
626            spawned: false,
627            executor: executor.clone(),
628            network: Arc::new(network),
629            storage,
630        }
631    }
632
633    /// Recover the inner state (deadline, metrics, auditor, rng, synced storage, etc.) from the
634    /// current runtime and use it to initialize a new instance of the runtime. A recovered runtime
635    /// does not inherit the current runtime's pending tasks, unsynced storage, network connections, nor
636    /// its shutdown signaler.
637    ///
638    /// This is useful for performing a deterministic simulation that spans multiple runtime instantiations,
639    /// like simulating unclean shutdown (which involves repeatedly halting the runtime at unexpected intervals).
640    ///
641    /// It is only permitted to call this method after the runtime has finished (i.e. once `start` returns)
642    /// and only permitted to do once (otherwise multiple recovered runtimes will share the same inner state).
643    /// If either one of these conditions is violated, this method will panic.
644    pub fn recover(self) -> Self {
645        // Ensure we are finished
646        if !*self.executor.finished.lock().unwrap() {
647            panic!("execution is not finished");
648        }
649
650        // Ensure runtime has not already been recovered
651        {
652            let mut recovered = self.executor.recovered.lock().unwrap();
653            if *recovered {
654                panic!("runtime has already been recovered");
655            }
656            *recovered = true;
657        }
658
659        // Rebuild metrics
660        let mut registry = Registry::default();
661        let runtime_registry = registry.sub_registry_with_prefix(METRICS_PREFIX);
662        let metrics = Arc::new(Metrics::init(runtime_registry));
663
664        // Copy state
665        let auditor = self.executor.auditor.clone();
666        let network = AuditedNetwork::new(DeterministicNetwork::default(), auditor.clone());
667        let network = MeteredNetwork::new(network, runtime_registry);
668
669        let executor = Arc::new(Executor {
670            // Copied from the current runtime
671            cycle: self.executor.cycle,
672            deadline: self.executor.deadline,
673            auditor: auditor.clone(),
674            rng: Mutex::new(self.executor.rng.lock().unwrap().clone()),
675            time: Mutex::new(*self.executor.time.lock().unwrap()),
676            partitions: Mutex::new(self.executor.partitions.lock().unwrap().clone()),
677
678            // New state for the new runtime
679            registry: Mutex::new(registry),
680            metrics: metrics.clone(),
681            tasks: Arc::new(Tasks::new()),
682            sleeping: Mutex::new(BinaryHeap::new()),
683            shutdown: Mutex::new(Stopper::default()),
684            finished: Mutex::new(false),
685            recovered: Mutex::new(false),
686        });
687        Self {
688            name: String::new(),
689            spawned: false,
690            executor,
691            network: Arc::new(network),
692            storage: self.storage,
693        }
694    }
695
696    pub fn auditor(&self) -> &Auditor {
697        &self.executor.auditor
698    }
699}
700
701impl Clone for Context {
702    fn clone(&self) -> Self {
703        Self {
704            name: self.name.clone(),
705            spawned: false,
706            executor: self.executor.clone(),
707            network: self.network.clone(),
708            storage: self.storage.clone(),
709        }
710    }
711}
712
713impl crate::Spawner for Context {
714    fn spawn<F, Fut, T>(self, f: F) -> Handle<T>
715    where
716        F: FnOnce(Self) -> Fut + Send + 'static,
717        Fut: Future<Output = T> + Send + 'static,
718        T: Send + 'static,
719    {
720        // Ensure a context only spawns one task
721        assert!(!self.spawned, "already spawned");
722
723        // Get metrics
724        let (label, gauge) = spawn_metrics!(self, future);
725
726        // Set up the task
727        let executor = self.executor.clone();
728        let future = f(self);
729        let (f, handle) = Handle::init_future(future, gauge, false);
730
731        // Spawn the task
732        Tasks::register_work(&executor.tasks, label, Box::pin(f));
733        handle
734    }
735
736    fn spawn_ref<F, T>(&mut self) -> impl FnOnce(F) -> Handle<T> + 'static
737    where
738        F: Future<Output = T> + Send + 'static,
739        T: Send + 'static,
740    {
741        // Ensure a context only spawns one task
742        assert!(!self.spawned, "already spawned");
743        self.spawned = true;
744
745        // Get metrics
746        let (label, gauge) = spawn_metrics!(self, future);
747
748        // Set up the task
749        let executor = self.executor.clone();
750        move |f: F| {
751            let (f, handle) = Handle::init_future(f, gauge, false);
752
753            // Spawn the task
754            Tasks::register_work(&executor.tasks, label, Box::pin(f));
755            handle
756        }
757    }
758
759    fn spawn_blocking<F, T>(self, dedicated: bool, f: F) -> Handle<T>
760    where
761        F: FnOnce(Self) -> T + Send + 'static,
762        T: Send + 'static,
763    {
764        // Ensure a context only spawns one task
765        assert!(!self.spawned, "already spawned");
766
767        // Get metrics
768        let (label, gauge) = spawn_metrics!(self, blocking, dedicated);
769
770        // Initialize the blocking task
771        let executor = self.executor.clone();
772        let (f, handle) = Handle::init_blocking(|| f(self), gauge, false);
773
774        // Spawn the task
775        let f = async move { f() };
776        Tasks::register_work(&executor.tasks, label, Box::pin(f));
777        handle
778    }
779
780    fn spawn_blocking_ref<F, T>(&mut self, dedicated: bool) -> impl FnOnce(F) -> Handle<T> + 'static
781    where
782        F: FnOnce() -> T + Send + 'static,
783        T: Send + 'static,
784    {
785        // Ensure a context only spawns one task
786        assert!(!self.spawned, "already spawned");
787        self.spawned = true;
788
789        // Get metrics
790        let (label, gauge) = spawn_metrics!(self, blocking, dedicated);
791
792        // Set up the task
793        let executor = self.executor.clone();
794        move |f: F| {
795            let (f, handle) = Handle::init_blocking(f, gauge, false);
796
797            // Spawn the task
798            let f = async move { f() };
799            Tasks::register_work(&executor.tasks, label, Box::pin(f));
800            handle
801        }
802    }
803
804    async fn stop(self, value: i32, timeout: Option<Duration>) -> Result<(), Error> {
805        self.executor.auditor.event(b"stop", |hasher| {
806            hasher.update(value.to_be_bytes());
807        });
808        let stop_resolved = {
809            let mut shutdown = self.executor.shutdown.lock().unwrap();
810            shutdown.stop(value)
811        };
812
813        // Wait for all tasks to complete or the timeout to fire
814        let timeout_future = match timeout {
815            Some(duration) => futures::future::Either::Left(self.sleep(duration)),
816            None => futures::future::Either::Right(futures::future::pending()),
817        };
818        select! {
819            result = stop_resolved => {
820                result.map_err(|_| Error::Closed)?;
821                Ok(())
822            },
823            _ = timeout_future => {
824                Err(Error::Timeout)
825            }
826        }
827    }
828
829    fn stopped(&self) -> Signal {
830        self.executor.auditor.event(b"stopped", |_| {});
831        self.executor.shutdown.lock().unwrap().stopped()
832    }
833}
834
835impl crate::Metrics for Context {
836    fn with_label(&self, label: &str) -> Self {
837        let name = {
838            let prefix = self.name.clone();
839            if prefix.is_empty() {
840                label.to_string()
841            } else {
842                format!("{prefix}_{label}")
843            }
844        };
845        assert!(
846            !name.starts_with(METRICS_PREFIX),
847            "using runtime label is not allowed"
848        );
849        Self {
850            name,
851            spawned: false,
852            executor: self.executor.clone(),
853            network: self.network.clone(),
854            storage: self.storage.clone(),
855        }
856    }
857
858    fn label(&self) -> String {
859        self.name.clone()
860    }
861
862    fn register<N: Into<String>, H: Into<String>>(&self, name: N, help: H, metric: impl Metric) {
863        // Prepare args
864        let name = name.into();
865        let help = help.into();
866
867        // Register metric
868        self.executor.auditor.event(b"register", |hasher| {
869            hasher.update(name.as_bytes());
870            hasher.update(help.as_bytes());
871        });
872        let prefixed_name = {
873            let prefix = &self.name;
874            if prefix.is_empty() {
875                name
876            } else {
877                format!("{}_{}", *prefix, name)
878            }
879        };
880        self.executor
881            .registry
882            .lock()
883            .unwrap()
884            .register(prefixed_name, help, metric)
885    }
886
887    fn encode(&self) -> String {
888        self.executor.auditor.event(b"encode", |_| {});
889        let mut buffer = String::new();
890        encode(&mut buffer, &self.executor.registry.lock().unwrap()).expect("encoding failed");
891        buffer
892    }
893}
894
895struct Sleeper {
896    executor: Arc<Executor>,
897    time: SystemTime,
898    registered: bool,
899}
900
/// A pending wake-up: `waker` is woken once simulated time reaches `time`.
struct Alarm {
    time: SystemTime,
    waker: Waker,
}

impl PartialEq for Alarm {
    /// Alarms are equal when they fire at the same time, regardless of waker.
    fn eq(&self, other: &Self) -> bool {
        self.time == other.time
    }
}

impl Eq for Alarm {}

impl PartialOrd for Alarm {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for Alarm {
    /// Order by time, reversed, so that [BinaryHeap] (a max-heap) yields the earliest alarm first.
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.time.cmp(&other.time).reverse()
    }
}
926
927impl Future for Sleeper {
928    type Output = ();
929
930    fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
931        {
932            let current_time = *self.executor.time.lock().unwrap();
933            if current_time >= self.time {
934                return Poll::Ready(());
935            }
936        }
937        if !self.registered {
938            self.registered = true;
939            self.executor.sleeping.lock().unwrap().push(Alarm {
940                time: self.time,
941                waker: cx.waker().clone(),
942            });
943        }
944        Poll::Pending
945    }
946}
947
948impl Clock for Context {
949    fn current(&self) -> SystemTime {
950        *self.executor.time.lock().unwrap()
951    }
952
953    fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static {
954        let deadline = self
955            .current()
956            .checked_add(duration)
957            .expect("overflow when setting wake time");
958        self.sleep_until(deadline)
959    }
960
961    fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static {
962        Sleeper {
963            executor: self.executor.clone(),
964
965            time: deadline,
966            registered: false,
967        }
968    }
969}
970
971impl GClock for Context {
972    type Instant = SystemTime;
973
974    fn now(&self) -> Self::Instant {
975        self.current()
976    }
977}
978
979impl ReasonablyRealtime for Context {}
980
981impl crate::Network for Context {
982    type Listener = ListenerOf<Network>;
983
984    async fn bind(&self, socket: SocketAddr) -> Result<Self::Listener, Error> {
985        self.network.bind(socket).await
986    }
987
988    async fn dial(
989        &self,
990        socket: SocketAddr,
991    ) -> Result<(crate::SinkOf<Self>, crate::StreamOf<Self>), Error> {
992        self.network.dial(socket).await
993    }
994}
995
996impl RngCore for Context {
997    fn next_u32(&mut self) -> u32 {
998        self.executor.auditor.event(b"rand", |hasher| {
999            hasher.update(b"next_u32");
1000        });
1001        self.executor.rng.lock().unwrap().next_u32()
1002    }
1003
1004    fn next_u64(&mut self) -> u64 {
1005        self.executor.auditor.event(b"rand", |hasher| {
1006            hasher.update(b"next_u64");
1007        });
1008        self.executor.rng.lock().unwrap().next_u64()
1009    }
1010
1011    fn fill_bytes(&mut self, dest: &mut [u8]) {
1012        self.executor.auditor.event(b"rand", |hasher| {
1013            hasher.update(b"fill_bytes");
1014        });
1015        self.executor.rng.lock().unwrap().fill_bytes(dest)
1016    }
1017
1018    fn try_fill_bytes(&mut self, dest: &mut [u8]) -> Result<(), rand::Error> {
1019        self.executor.auditor.event(b"rand", |hasher| {
1020            hasher.update(b"try_fill_bytes");
1021        });
1022        self.executor.rng.lock().unwrap().try_fill_bytes(dest)
1023    }
1024}
1025
1026impl CryptoRng for Context {}
1027
1028impl crate::Storage for Context {
1029    type Blob = <MeteredStorage<AuditedStorage<MemStorage>> as crate::Storage>::Blob;
1030
1031    async fn open(&self, partition: &str, name: &[u8]) -> Result<(Self::Blob, u64), Error> {
1032        self.storage.open(partition, name).await
1033    }
1034
1035    async fn remove(&self, partition: &str, name: Option<&[u8]>) -> Result<(), Error> {
1036        self.storage.remove(partition, name).await
1037    }
1038
1039    async fn scan(&self, partition: &str) -> Result<Vec<Vec<u8>>, Error> {
1040        self.storage.scan(partition).await
1041    }
1042}
1043
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{deterministic, utils::run_tasks, Blob, Runner as _, Storage};
    use commonware_macros::test_traced;
    use futures::task::noop_waker;

    /// Runs the shared `run_tasks` workload (5 tasks) on a runtime seeded with
    /// `seed` and returns its observable output.
    fn run_with_seed(seed: u64) -> (String, Vec<usize>) {
        let executor = deterministic::Runner::seeded(seed);
        run_tasks(5, executor)
    }

    #[test]
    fn test_same_seed_same_order() {
        // Record the output of every seed once...
        let outputs: Vec<_> = (0..1000).map(run_with_seed).collect();

        // ...then re-run each seed and require an identical result.
        for (seed, expected) in (0..1000).zip(&outputs) {
            assert_eq!(run_with_seed(seed), *expected);
        }
    }

    #[test_traced("TRACE")]
    fn test_different_seeds_different_order() {
        let output1 = run_with_seed(12345);
        let output2 = run_with_seed(54321);
        assert_ne!(output1, output2);
    }

    #[test]
    fn test_alarm_min_heap() {
        // Populate the heap out of order, including a duplicate deadline. A fixed
        // base instant keeps the test independent of the host wall clock.
        let base = UNIX_EPOCH;
        let mut heap: BinaryHeap<Alarm> = [10, 5, 15, 5]
            .iter()
            .map(|&secs| Alarm {
                time: base + Duration::from_secs(secs),
                waker: noop_waker(),
            })
            .collect();

        // Popping must yield deadlines in ascending order (min-heap behavior).
        let mut sorted_times = Vec::new();
        while let Some(alarm) = heap.pop() {
            sorted_times.push(alarm.time);
        }
        let expected: Vec<SystemTime> = [5, 5, 10, 15]
            .iter()
            .map(|&secs| base + Duration::from_secs(secs))
            .collect();
        assert_eq!(sorted_times, expected);
    }

    #[test]
    #[should_panic(expected = "runtime timeout")]
    fn test_timeout() {
        // A task that sleeps forever must trip the configured runtime timeout.
        let executor = deterministic::Runner::timed(Duration::from_secs(10));
        executor.start(|context| async move {
            loop {
                context.sleep(Duration::from_secs(1)).await;
            }
        });
    }

    #[test]
    #[should_panic(expected = "cycle duration must be non-zero when timeout is set")]
    fn test_bad_timeout() {
        let cfg = Config {
            timeout: Some(Duration::default()),
            cycle: Duration::default(),
            ..Config::default()
        };
        deterministic::Runner::new(cfg);
    }

    #[test]
    fn test_recover_synced_storage_persists() {
        // Initialize the first runtime
        let executor = deterministic::Runner::default();
        let partition = "test_partition";
        let name = b"test_blob";
        let data = b"Hello, world!";

        // Write and sync a blob, then hand the context back for recovery
        let (context, state) = executor.start(|context| async move {
            let (blob, _) = context.open(partition, name).await.unwrap();
            blob.write_at(Vec::from(data), 0).await.unwrap();
            blob.sync().await.unwrap();
            let state = context.auditor().state();
            (context, state)
        });
        let recovered_context = context.recover();

        // Verify auditor state is the same
        assert_eq!(state, recovered_context.auditor().state());

        // Check that synced storage persists after recovery
        let executor = Runner::from(recovered_context);
        executor.start(|context| async move {
            let (blob, len) = context.open(partition, name).await.unwrap();
            assert_eq!(len, data.len() as u64);
            let read = blob.read_at(vec![0; data.len()], 0).await.unwrap();
            assert_eq!(read.as_ref(), data);
        });
    }

    #[test]
    fn test_recover_unsynced_storage_does_not_persist() {
        // Initialize the first runtime
        let executor = deterministic::Runner::default();
        let partition = "test_partition";
        let name = b"test_blob";
        let data = Vec::from("Hello, world!");

        // Write a blob without syncing it; the owned context is returned as-is
        // (no clone is needed to hand it back out of the task).
        let context = executor.start(|context| async move {
            let (blob, _) = context.open(partition, name).await.unwrap();
            blob.write_at(data, 0).await.unwrap();
            // Intentionally do not call sync() here
            context
        });

        // Recover the runtime
        let context = context.recover();
        let executor = Runner::from(context);

        // Check that unsynced storage does not persist after recovery
        executor.start(|context| async move {
            let (_, len) = context.open(partition, name).await.unwrap();
            assert_eq!(len, 0);
        });
    }

    #[test]
    #[should_panic(expected = "execution is not finished")]
    fn test_recover_before_finish_panics() {
        // Initialize runtime
        let executor = deterministic::Runner::default();

        // Start runtime
        executor.start(|context| async move {
            // Attempt to recover before the runtime has finished
            context.recover();
        });
    }

    #[test]
    #[should_panic(expected = "runtime has already been recovered")]
    fn test_recover_twice_panics() {
        // Initialize runtime
        let executor = deterministic::Runner::default();

        // Finish runtime
        let context = executor.start(|context| async move { context });

        // Recover for the first time
        let cloned_context = context.clone();
        context.recover();

        // Attempt to recover again using the same context
        cloned_context.recover();
    }

    #[test]
    fn test_default_time_zero() {
        // Initialize runtime
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            // The default runtime starts its simulated clock at the Unix epoch
            assert_eq!(
                context.current().duration_since(UNIX_EPOCH).unwrap(),
                Duration::ZERO
            );
        });
    }
}