commonware_runtime/
deterministic.rs

//! A deterministic runtime that randomly selects tasks to run based on a seed.
2//!
3//! # Panics
4//!
//! If any task panics, the runtime will panic (and shut down).
6//!
7//! # Example
8//!
9//! ```rust
10//! use commonware_runtime::{Spawner, Runner, deterministic, Metrics};
11//!
//! let executor = deterministic::Runner::default();
13//! executor.start(|context| async move {
14//!     println!("Parent started");
15//!     let result = context.with_label("child").spawn(|_| async move {
16//!         println!("Child started");
17//!         "hello"
18//!     });
19//!     println!("Child result: {:?}", result.await);
20//!     println!("Parent exited");
21//!     println!("Auditor state: {}", context.auditor().state());
22//! });
23//! ```
24
25use crate::{
26    network::{
27        audited::Network as AuditedNetwork, deterministic::Network as DeterministicNetwork,
28        metered::Network as MeteredNetwork,
29    },
30    storage::{
31        audited::Storage as AuditedStorage, memory::Storage as MemStorage,
32        metered::Storage as MeteredStorage,
33    },
34    telemetry::metrics::task::Label,
35    utils::signal::{Signal, Stopper},
36    Clock, Error, Handle, ListenerOf, METRICS_PREFIX,
37};
38use commonware_macros::select;
39use commonware_utils::{hex, SystemTimeExt};
40use futures::{
41    future::AbortHandle,
42    task::{waker_ref, ArcWake},
43    Future,
44};
45use governor::clock::{Clock as GClock, ReasonablyRealtime};
46use prometheus_client::{
47    encoding::text::encode,
48    metrics::{counter::Counter, family::Family, gauge::Gauge},
49    registry::{Metric, Registry},
50};
51use rand::{prelude::SliceRandom, rngs::StdRng, CryptoRng, RngCore, SeedableRng};
52use sha2::{Digest, Sha256};
53use std::{
54    collections::{BinaryHeap, HashMap},
55    mem::replace,
56    net::SocketAddr,
57    pin::Pin,
58    sync::{Arc, Mutex, Weak},
59    task::{self, Poll, Waker},
60    time::{Duration, SystemTime, UNIX_EPOCH},
61};
62use tracing::trace;
63
/// In-memory contents of a storage partition: blob name mapped to that blob's bytes.
pub type Partition = HashMap<Vec<u8>, Vec<u8>>;
66
67#[derive(Debug)]
68struct Metrics {
69    tasks_spawned: Family<Label, Counter>,
70    tasks_running: Family<Label, Gauge>,
71    task_polls: Family<Label, Counter>,
72
73    network_bandwidth: Counter,
74}
75
76impl Metrics {
77    pub fn init(registry: &mut Registry) -> Self {
78        let metrics = Self {
79            task_polls: Family::default(),
80            tasks_spawned: Family::default(),
81            tasks_running: Family::default(),
82            network_bandwidth: Counter::default(),
83        };
84        registry.register(
85            "tasks_spawned",
86            "Total number of tasks spawned",
87            metrics.tasks_spawned.clone(),
88        );
89        registry.register(
90            "tasks_running",
91            "Number of tasks currently running",
92            metrics.tasks_running.clone(),
93        );
94        registry.register(
95            "task_polls",
96            "Total number of task polls",
97            metrics.task_polls.clone(),
98        );
99        registry.register(
100            "bandwidth",
101            "Total amount of data sent over network",
102            metrics.network_bandwidth.clone(),
103        );
104        metrics
105    }
106}
107
108/// Track the state of the runtime for determinism auditing.
109pub struct Auditor {
110    hash: Mutex<Vec<u8>>,
111}
112
113impl Default for Auditor {
114    fn default() -> Self {
115        Self {
116            hash: Vec::new().into(),
117        }
118    }
119}
120
121impl Auditor {
122    /// Record that an event happened.
123    /// This auditor's hash will be updated with the event's `label` and
124    /// whatever other data is passed in the `payload` closure.
125    pub(crate) fn event<F>(&self, label: &'static [u8], payload: F)
126    where
127        F: FnOnce(&mut Sha256),
128    {
129        let mut hash = self.hash.lock().unwrap();
130
131        let mut hasher = Sha256::new();
132        hasher.update(&*hash);
133        hasher.update(label);
134        payload(&mut hasher);
135
136        *hash = hasher.finalize().to_vec();
137    }
138
139    /// Generate a representation of the current state of the runtime.
140    ///
141    /// This can be used to ensure that logic running on top
142    /// of the runtime is interacting deterministically.
143    pub fn state(&self) -> String {
144        let hash = self.hash.lock().unwrap().clone();
145        hex(&hash)
146    }
147}
148
/// Configuration for the `deterministic` runtime.
#[derive(Clone, Debug)]
pub struct Config {
    /// Seed for the random number generator.
    seed: u64,

    /// The cycle duration determines how much time is advanced after each iteration of the event
    /// loop. This is useful to prevent starvation if some task never yields.
    cycle: Duration,

    /// If the runtime is still executing at this point (i.e. a test hasn't stopped), panic.
    timeout: Option<Duration>,
}

impl Config {
    /// Returns a new [Config] with default values: seed `42`, a `1ms` cycle, and no timeout.
    pub fn new() -> Self {
        Self {
            seed: 42,
            cycle: Duration::from_millis(1),
            timeout: None,
        }
    }

    // Setters
    /// Set the RNG seed (see [Config]).
    pub fn with_seed(mut self, seed: u64) -> Self {
        self.seed = seed;
        self
    }
    /// Set the cycle duration (see [Config]).
    pub fn with_cycle(mut self, cycle: Duration) -> Self {
        self.cycle = cycle;
        self
    }
    /// Set (or clear, with `None`) the timeout (see [Config]).
    pub fn with_timeout(mut self, timeout: Option<Duration>) -> Self {
        self.timeout = timeout;
        self
    }

    // Getters
    /// The RNG seed (see [Config]).
    pub fn seed(&self) -> u64 {
        self.seed
    }
    /// The cycle duration (see [Config]).
    pub fn cycle(&self) -> Duration {
        self.cycle
    }
    /// The optional timeout (see [Config]).
    pub fn timeout(&self) -> Option<Duration> {
        self.timeout
    }

    /// Assert that the configuration is valid.
    ///
    /// # Panics
    ///
    /// Panics if a timeout is set while the cycle duration is zero.
    pub fn assert(&self) {
        assert!(
            self.cycle != Duration::default() || self.timeout.is_none(),
            "cycle duration must be non-zero when timeout is set",
        );
    }
}

impl Default for Config {
    /// Same as [Config::new].
    fn default() -> Self {
        Self::new()
    }
}
218
219/// Deterministic runtime that randomly selects tasks to run based on a seed.
220pub struct Executor {
221    registry: Mutex<Registry>,
222    cycle: Duration,
223    deadline: Option<SystemTime>,
224    metrics: Arc<Metrics>,
225    auditor: Arc<Auditor>,
226    rng: Mutex<StdRng>,
227    time: Mutex<SystemTime>,
228    tasks: Arc<Tasks>,
229    sleeping: Mutex<BinaryHeap<Alarm>>,
230    partitions: Mutex<HashMap<String, Partition>>,
231    shutdown: Mutex<Stopper>,
232    finished: Mutex<bool>,
233    recovered: Mutex<bool>,
234}
235
236enum State {
237    Config(Config),
238    Context(Context),
239}
240
241/// Implementation of [crate::Runner] for the `deterministic` runtime.
242pub struct Runner {
243    state: State,
244}
245
246impl From<Config> for Runner {
247    fn from(cfg: Config) -> Self {
248        Self::new(cfg)
249    }
250}
251
252impl From<Context> for Runner {
253    fn from(context: Context) -> Self {
254        Self {
255            state: State::Context(context),
256        }
257    }
258}
259
260impl Runner {
261    /// Initialize a new `deterministic` runtime with the given seed and cycle duration.
262    pub fn new(cfg: Config) -> Self {
263        // Ensure config is valid
264        cfg.assert();
265        Runner {
266            state: State::Config(cfg),
267        }
268    }
269
270    /// Initialize a new `deterministic` runtime with the default configuration
271    /// and the provided seed.
272    pub fn seeded(seed: u64) -> Self {
273        let cfg = Config {
274            seed,
275            ..Config::default()
276        };
277        Self::new(cfg)
278    }
279
280    /// Initialize a new `deterministic` runtime with the default configuration
281    /// but exit after the given timeout.
282    pub fn timed(timeout: Duration) -> Self {
283        let cfg = Config {
284            timeout: Some(timeout),
285            ..Config::default()
286        };
287        Self::new(cfg)
288    }
289}
290
291impl Default for Runner {
292    fn default() -> Self {
293        Self::new(Config::default())
294    }
295}
296
297impl crate::Runner for Runner {
298    type Context = Context;
299
300    fn start<F, Fut>(self, f: F) -> Fut::Output
301    where
302        F: FnOnce(Self::Context) -> Fut,
303        Fut: Future,
304    {
305        // Setup context (depending on how the runtime was initialized)
306        let context = match self.state {
307            State::Config(config) => Context::new(config),
308            State::Context(context) => context,
309        };
310
311        // Pin root task to the heap
312        let executor = context.executor.clone();
313        let mut root = Box::pin(f(context));
314
315        // Register the root task
316        Tasks::register_root(&executor.tasks);
317
318        // Process tasks until root task completes or progress stalls
319        let mut iter = 0;
320        loop {
321            // Ensure we have not exceeded our deadline
322            {
323                let current = executor.time.lock().unwrap();
324                if let Some(deadline) = executor.deadline {
325                    if *current >= deadline {
326                        panic!("runtime timeout");
327                    }
328                }
329            }
330
331            // Snapshot available tasks
332            let mut tasks = executor.tasks.drain();
333
334            // Shuffle tasks
335            {
336                let mut rng = executor.rng.lock().unwrap();
337                tasks.shuffle(&mut *rng);
338            }
339
340            // Run all snapshotted tasks
341            //
342            // This approach is more efficient than randomly selecting a task one-at-a-time
343            // because it ensures we don't pull the same pending task multiple times in a row (without
344            // processing a different task required for other tasks to make progress).
345            trace!(iter, tasks = tasks.len(), "starting loop");
346            for task in tasks {
347                // Record task for auditing
348                executor.auditor.event(b"process_task", |hasher| {
349                    hasher.update(task.id.to_be_bytes());
350                    hasher.update(task.label.name().as_bytes());
351                });
352                trace!(id = task.id, "processing task");
353
354                // Record task poll
355                executor.metrics.task_polls.get_or_create(&task.label).inc();
356
357                // Prepare task for polling
358                let waker = waker_ref(&task);
359                let mut cx = task::Context::from_waker(&waker);
360                match &task.operation {
361                    Operation::Root => {
362                        // Poll the root task
363                        if let Poll::Ready(output) = root.as_mut().poll(&mut cx) {
364                            trace!(id = task.id, "task is complete");
365                            *executor.finished.lock().unwrap() = true;
366                            return output;
367                        }
368                    }
369                    Operation::Work { future, completed } => {
370                        // If task is completed, skip it
371                        if *completed.lock().unwrap() {
372                            trace!(id = task.id, "dropping already complete task");
373                            continue;
374                        }
375
376                        // Poll the task
377                        let mut fut = future.lock().unwrap();
378                        if fut.as_mut().poll(&mut cx).is_ready() {
379                            trace!(id = task.id, "task is complete");
380                            *completed.lock().unwrap() = true;
381                            continue;
382                        }
383                    }
384                }
385
386                // Try again later if task is still pending
387                trace!(id = task.id, "task is still pending");
388            }
389
390            // Advance time by cycle
391            //
392            // This approach prevents starvation if some task never yields (to approximate this,
393            // duration can be set to 1ns).
394            let mut current;
395            {
396                let mut time = executor.time.lock().unwrap();
397                *time = time
398                    .checked_add(executor.cycle)
399                    .expect("executor time overflowed");
400                current = *time;
401            }
402            trace!(now = current.epoch_millis(), "time advanced");
403
404            // Skip time if there is nothing to do
405            if executor.tasks.len() == 0 {
406                let mut skip = None;
407                {
408                    let sleeping = executor.sleeping.lock().unwrap();
409                    if let Some(next) = sleeping.peek() {
410                        if next.time > current {
411                            skip = Some(next.time);
412                        }
413                    }
414                }
415                if let Some(skip_time) = skip {
416                    {
417                        let mut time = executor.time.lock().unwrap();
418                        *time = skip_time;
419                        current = *time;
420                    }
421                    trace!(now = current.epoch_millis(), "time skipped");
422                }
423            }
424
425            // Wake all sleeping tasks that are ready
426            let mut to_wake = Vec::new();
427            let mut remaining;
428            {
429                let mut sleeping = executor.sleeping.lock().unwrap();
430                while let Some(next) = sleeping.peek() {
431                    if next.time <= current {
432                        let sleeper = sleeping.pop().unwrap();
433                        to_wake.push(sleeper.waker);
434                    } else {
435                        break;
436                    }
437                }
438                remaining = sleeping.len();
439            }
440            for waker in to_wake {
441                waker.wake();
442            }
443
444            // Account for remaining tasks
445            remaining += executor.tasks.len();
446
447            // If there are no tasks to run and no tasks sleeping, the executor is stalled
448            // and will never finish.
449            if remaining == 0 {
450                panic!("runtime stalled");
451            }
452            iter += 1;
453        }
454    }
455}
456
457/// The operation that a task is performing.
458enum Operation {
459    Root,
460    Work {
461        future: Mutex<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
462        completed: Mutex<bool>,
463    },
464}
465
466/// A task that is being executed by the runtime.
467struct Task {
468    id: u128,
469    label: Label,
470    tasks: Weak<Tasks>,
471
472    operation: Operation,
473}
474
475impl ArcWake for Task {
476    fn wake_by_ref(arc_self: &Arc<Self>) {
477        // Upgrade the weak reference to re-enqueue this task.
478        // If upgrade fails, the task queue has been dropped and no action is required.
479        if let Some(tasks) = arc_self.tasks.upgrade() {
480            tasks.enqueue(arc_self.clone());
481        }
482    }
483}
484
485/// A task queue that is used to manage the tasks that are being executed by the runtime.
486struct Tasks {
487    /// The current task counter.
488    counter: Mutex<u128>,
489    /// The queue of tasks that are waiting to be executed.
490    queue: Mutex<Vec<Arc<Task>>>,
491    /// Indicates whether the root task has been registered.
492    root_registered: Mutex<bool>,
493}
494
495impl Tasks {
496    /// Create a new task queue.
497    fn new() -> Self {
498        Self {
499            counter: Mutex::new(0),
500            queue: Mutex::new(Vec::new()),
501            root_registered: Mutex::new(false),
502        }
503    }
504
505    /// Increment the task counter and return the old value.
506    fn increment(&self) -> u128 {
507        let mut counter = self.counter.lock().unwrap();
508        let old = *counter;
509        *counter = counter.checked_add(1).expect("task counter overflow");
510        old
511    }
512
513    /// Register the root task.
514    ///
515    /// If the root task has already been registered, this function will panic.
516    fn register_root(arc_self: &Arc<Self>) {
517        {
518            let mut registered = arc_self.root_registered.lock().unwrap();
519            assert!(!*registered, "root already registered");
520            *registered = true;
521        }
522        let id = arc_self.increment();
523        let mut queue = arc_self.queue.lock().unwrap();
524        queue.push(Arc::new(Task {
525            id,
526            label: Label::root(),
527            tasks: Arc::downgrade(arc_self),
528            operation: Operation::Root,
529        }));
530    }
531
532    /// Register a new task to be executed.
533    fn register_work(
534        arc_self: &Arc<Self>,
535        label: Label,
536        future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
537    ) {
538        let id = arc_self.increment();
539        let mut queue = arc_self.queue.lock().unwrap();
540        queue.push(Arc::new(Task {
541            id,
542            label,
543            tasks: Arc::downgrade(arc_self),
544            operation: Operation::Work {
545                future: Mutex::new(future),
546                completed: Mutex::new(false),
547            },
548        }));
549    }
550
551    /// Enqueue an already registered task to be executed.
552    fn enqueue(&self, task: Arc<Task>) {
553        let mut queue = self.queue.lock().unwrap();
554        queue.push(task);
555    }
556
557    /// Dequeue all tasks that are ready to execute.
558    fn drain(&self) -> Vec<Arc<Task>> {
559        let mut queue = self.queue.lock().unwrap();
560        let len = queue.len();
561        replace(&mut *queue, Vec::with_capacity(len))
562    }
563
564    /// Get the number of tasks in the queue.
565    fn len(&self) -> usize {
566        self.queue.lock().unwrap().len()
567    }
568}
569
570type Network = MeteredNetwork<AuditedNetwork<DeterministicNetwork>>;
571
572/// Implementation of [crate::Spawner], [crate::Clock],
573/// [crate::Network], and [crate::Storage] for the `deterministic`
574/// runtime.
575pub struct Context {
576    name: String,
577    spawned: bool,
578    executor: Arc<Executor>,
579    network: Arc<Network>,
580    storage: MeteredStorage<AuditedStorage<MemStorage>>,
581    children: Arc<Mutex<Vec<AbortHandle>>>,
582}
583
584impl Default for Context {
585    fn default() -> Self {
586        Self::new(Config::default())
587    }
588}
589
590impl Context {
591    pub fn new(cfg: Config) -> Self {
592        // Create a new registry
593        let mut registry = Registry::default();
594        let runtime_registry = registry.sub_registry_with_prefix(METRICS_PREFIX);
595
596        // Initialize runtime
597        let metrics = Arc::new(Metrics::init(runtime_registry));
598        let start_time = UNIX_EPOCH;
599        let deadline = cfg
600            .timeout
601            .map(|timeout| start_time.checked_add(timeout).expect("timeout overflowed"));
602        let auditor = Arc::new(Auditor::default());
603        let storage = MeteredStorage::new(
604            AuditedStorage::new(MemStorage::default(), auditor.clone()),
605            runtime_registry,
606        );
607        let network = AuditedNetwork::new(DeterministicNetwork::default(), auditor.clone());
608        let network = MeteredNetwork::new(network, runtime_registry);
609
610        let executor = Arc::new(Executor {
611            registry: Mutex::new(registry),
612            cycle: cfg.cycle,
613            deadline,
614            metrics: metrics.clone(),
615            auditor: auditor.clone(),
616            rng: Mutex::new(StdRng::seed_from_u64(cfg.seed)),
617            time: Mutex::new(start_time),
618            tasks: Arc::new(Tasks::new()),
619            sleeping: Mutex::new(BinaryHeap::new()),
620            partitions: Mutex::new(HashMap::new()),
621            shutdown: Mutex::new(Stopper::default()),
622            finished: Mutex::new(false),
623            recovered: Mutex::new(false),
624        });
625
626        Context {
627            name: String::new(),
628            spawned: false,
629            executor: executor.clone(),
630            network: Arc::new(network),
631            storage,
632            children: Arc::new(Mutex::new(Vec::new())),
633        }
634    }
635
636    /// Recover the inner state (deadline, metrics, auditor, rng, synced storage, etc.) from the
637    /// current runtime and use it to initialize a new instance of the runtime. A recovered runtime
638    /// does not inherit the current runtime's pending tasks, unsynced storage, network connections, nor
639    /// its shutdown signaler.
640    ///
641    /// This is useful for performing a deterministic simulation that spans multiple runtime instantiations,
642    /// like simulating unclean shutdown (which involves repeatedly halting the runtime at unexpected intervals).
643    ///
644    /// It is only permitted to call this method after the runtime has finished (i.e. once `start` returns)
645    /// and only permitted to do once (otherwise multiple recovered runtimes will share the same inner state).
646    /// If either one of these conditions is violated, this method will panic.
647    pub fn recover(self) -> Self {
648        // Ensure we are finished
649        if !*self.executor.finished.lock().unwrap() {
650            panic!("execution is not finished");
651        }
652
653        // Ensure runtime has not already been recovered
654        {
655            let mut recovered = self.executor.recovered.lock().unwrap();
656            if *recovered {
657                panic!("runtime has already been recovered");
658            }
659            *recovered = true;
660        }
661
662        // Rebuild metrics
663        let mut registry = Registry::default();
664        let runtime_registry = registry.sub_registry_with_prefix(METRICS_PREFIX);
665        let metrics = Arc::new(Metrics::init(runtime_registry));
666
667        // Copy state
668        let auditor = self.executor.auditor.clone();
669        let network = AuditedNetwork::new(DeterministicNetwork::default(), auditor.clone());
670        let network = MeteredNetwork::new(network, runtime_registry);
671
672        let executor = Arc::new(Executor {
673            // Copied from the current runtime
674            cycle: self.executor.cycle,
675            deadline: self.executor.deadline,
676            auditor: auditor.clone(),
677            rng: Mutex::new(self.executor.rng.lock().unwrap().clone()),
678            time: Mutex::new(*self.executor.time.lock().unwrap()),
679            partitions: Mutex::new(self.executor.partitions.lock().unwrap().clone()),
680
681            // New state for the new runtime
682            registry: Mutex::new(registry),
683            metrics: metrics.clone(),
684            tasks: Arc::new(Tasks::new()),
685            sleeping: Mutex::new(BinaryHeap::new()),
686            shutdown: Mutex::new(Stopper::default()),
687            finished: Mutex::new(false),
688            recovered: Mutex::new(false),
689        });
690        Self {
691            name: String::new(),
692            spawned: false,
693            executor,
694            network: Arc::new(network),
695            storage: self.storage,
696            children: Arc::new(Mutex::new(Vec::new())),
697        }
698    }
699
700    pub fn auditor(&self) -> &Auditor {
701        &self.executor.auditor
702    }
703}
704
705impl Clone for Context {
706    fn clone(&self) -> Self {
707        Self {
708            name: self.name.clone(),
709            spawned: false,
710            executor: self.executor.clone(),
711            network: self.network.clone(),
712            storage: self.storage.clone(),
713            children: self.children.clone(),
714        }
715    }
716}
717
718impl crate::Spawner for Context {
719    fn spawn<F, Fut, T>(mut self, f: F) -> Handle<T>
720    where
721        F: FnOnce(Self) -> Fut + Send + 'static,
722        Fut: Future<Output = T> + Send + 'static,
723        T: Send + 'static,
724    {
725        // Ensure a context only spawns one task
726        assert!(!self.spawned, "already spawned");
727
728        // Get metrics
729        let (label, gauge) = spawn_metrics!(self, future);
730
731        // Set up the task
732        let executor = self.executor.clone();
733
734        // Give spawned task its own empty children list
735        let children = Arc::new(Mutex::new(Vec::new()));
736        self.children = children.clone();
737
738        let future = f(self);
739        let (f, handle) = Handle::init_future(future, gauge, false, children);
740
741        // Spawn the task
742        Tasks::register_work(&executor.tasks, label, Box::pin(f));
743        handle
744    }
745
746    fn spawn_ref<F, T>(&mut self) -> impl FnOnce(F) -> Handle<T> + 'static
747    where
748        F: Future<Output = T> + Send + 'static,
749        T: Send + 'static,
750    {
751        // Ensure a context only spawns one task
752        assert!(!self.spawned, "already spawned");
753        self.spawned = true;
754
755        // Get metrics
756        let (label, gauge) = spawn_metrics!(self, future);
757
758        // Set up the task
759        let executor = self.executor.clone();
760
761        move |f: F| {
762            // Give spawned task its own empty children list
763            let (f, handle) =
764                Handle::init_future(f, gauge, false, Arc::new(Mutex::new(Vec::new())));
765
766            // Spawn the task
767            Tasks::register_work(&executor.tasks, label, Box::pin(f));
768            handle
769        }
770    }
771
772    fn spawn_child<F, Fut, T>(self, f: F) -> Handle<T>
773    where
774        F: FnOnce(Self) -> Fut + Send + 'static,
775        Fut: Future<Output = T> + Send + 'static,
776        T: Send + 'static,
777    {
778        // Store parent's children list
779        let parent_children = self.children.clone();
780
781        // Spawn the child
782        let child_handle = self.spawn(f);
783
784        // Register this child with the parent
785        if let Some(abort_handle) = child_handle.abort_handle() {
786            parent_children.lock().unwrap().push(abort_handle);
787        }
788
789        child_handle
790    }
791
792    fn spawn_blocking<F, T>(self, dedicated: bool, f: F) -> Handle<T>
793    where
794        F: FnOnce(Self) -> T + Send + 'static,
795        T: Send + 'static,
796    {
797        // Ensure a context only spawns one task
798        assert!(!self.spawned, "already spawned");
799
800        // Get metrics
801        let (label, gauge) = spawn_metrics!(self, blocking, dedicated);
802
803        // Initialize the blocking task
804        let executor = self.executor.clone();
805        let (f, handle) = Handle::init_blocking(|| f(self), gauge, false);
806
807        // Spawn the task
808        let f = async move { f() };
809        Tasks::register_work(&executor.tasks, label, Box::pin(f));
810        handle
811    }
812
813    fn spawn_blocking_ref<F, T>(&mut self, dedicated: bool) -> impl FnOnce(F) -> Handle<T> + 'static
814    where
815        F: FnOnce() -> T + Send + 'static,
816        T: Send + 'static,
817    {
818        // Ensure a context only spawns one task
819        assert!(!self.spawned, "already spawned");
820        self.spawned = true;
821
822        // Get metrics
823        let (label, gauge) = spawn_metrics!(self, blocking, dedicated);
824
825        // Set up the task
826        let executor = self.executor.clone();
827        move |f: F| {
828            let (f, handle) = Handle::init_blocking(f, gauge, false);
829
830            // Spawn the task
831            let f = async move { f() };
832            Tasks::register_work(&executor.tasks, label, Box::pin(f));
833            handle
834        }
835    }
836
837    async fn stop(self, value: i32, timeout: Option<Duration>) -> Result<(), Error> {
838        self.executor.auditor.event(b"stop", |hasher| {
839            hasher.update(value.to_be_bytes());
840        });
841        let stop_resolved = {
842            let mut shutdown = self.executor.shutdown.lock().unwrap();
843            shutdown.stop(value)
844        };
845
846        // Wait for all tasks to complete or the timeout to fire
847        let timeout_future = match timeout {
848            Some(duration) => futures::future::Either::Left(self.sleep(duration)),
849            None => futures::future::Either::Right(futures::future::pending()),
850        };
851        select! {
852            result = stop_resolved => {
853                result.map_err(|_| Error::Closed)?;
854                Ok(())
855            },
856            _ = timeout_future => {
857                Err(Error::Timeout)
858            }
859        }
860    }
861
862    fn stopped(&self) -> Signal {
863        self.executor.auditor.event(b"stopped", |_| {});
864        self.executor.shutdown.lock().unwrap().stopped()
865    }
866}
867
868impl crate::Metrics for Context {
869    fn with_label(&self, label: &str) -> Self {
870        let name = {
871            let prefix = self.name.clone();
872            if prefix.is_empty() {
873                label.to_string()
874            } else {
875                format!("{prefix}_{label}")
876            }
877        };
878        assert!(
879            !name.starts_with(METRICS_PREFIX),
880            "using runtime label is not allowed"
881        );
882        Self {
883            name,
884            spawned: false,
885            executor: self.executor.clone(),
886            network: self.network.clone(),
887            storage: self.storage.clone(),
888            children: self.children.clone(),
889        }
890    }
891
892    fn label(&self) -> String {
893        self.name.clone()
894    }
895
896    fn register<N: Into<String>, H: Into<String>>(&self, name: N, help: H, metric: impl Metric) {
897        // Prepare args
898        let name = name.into();
899        let help = help.into();
900
901        // Register metric
902        self.executor.auditor.event(b"register", |hasher| {
903            hasher.update(name.as_bytes());
904            hasher.update(help.as_bytes());
905        });
906        let prefixed_name = {
907            let prefix = &self.name;
908            if prefix.is_empty() {
909                name
910            } else {
911                format!("{}_{}", *prefix, name)
912            }
913        };
914        self.executor
915            .registry
916            .lock()
917            .unwrap()
918            .register(prefixed_name, help, metric)
919    }
920
921    fn encode(&self) -> String {
922        self.executor.auditor.event(b"encode", |_| {});
923        let mut buffer = String::new();
924        encode(&mut buffer, &self.executor.registry.lock().unwrap()).expect("encoding failed");
925        buffer
926    }
927}
928
929struct Sleeper {
930    executor: Arc<Executor>,
931    time: SystemTime,
932    registered: bool,
933}
934
/// A pending wake-up: `waker` is woken once simulated time reaches `time`.
///
/// Alarms compare by `time` only, and the ordering is reversed so that a `BinaryHeap`
/// (which is a max-heap) yields the earliest alarm first.
struct Alarm {
    time: SystemTime,
    waker: Waker,
}

impl PartialEq for Alarm {
    fn eq(&self, other: &Self) -> bool {
        self.time == other.time
    }
}

impl Eq for Alarm {}

impl PartialOrd for Alarm {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(Ord::cmp(self, other))
    }
}

impl Ord for Alarm {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        // Later alarms compare as "smaller", so the heap's maximum is the earliest alarm
        self.time.cmp(&other.time).reverse()
    }
}
960
961impl Future for Sleeper {
962    type Output = ();
963
964    fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
965        {
966            let current_time = *self.executor.time.lock().unwrap();
967            if current_time >= self.time {
968                return Poll::Ready(());
969            }
970        }
971        if !self.registered {
972            self.registered = true;
973            self.executor.sleeping.lock().unwrap().push(Alarm {
974                time: self.time,
975                waker: cx.waker().clone(),
976            });
977        }
978        Poll::Pending
979    }
980}
981
982impl Clock for Context {
983    fn current(&self) -> SystemTime {
984        *self.executor.time.lock().unwrap()
985    }
986
987    fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static {
988        let deadline = self
989            .current()
990            .checked_add(duration)
991            .expect("overflow when setting wake time");
992        self.sleep_until(deadline)
993    }
994
995    fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static {
996        Sleeper {
997            executor: self.executor.clone(),
998
999            time: deadline,
1000            registered: false,
1001        }
1002    }
1003}
1004
1005impl GClock for Context {
1006    type Instant = SystemTime;
1007
1008    fn now(&self) -> Self::Instant {
1009        self.current()
1010    }
1011}
1012
1013impl ReasonablyRealtime for Context {}
1014
1015impl crate::Network for Context {
1016    type Listener = ListenerOf<Network>;
1017
1018    async fn bind(&self, socket: SocketAddr) -> Result<Self::Listener, Error> {
1019        self.network.bind(socket).await
1020    }
1021
1022    async fn dial(
1023        &self,
1024        socket: SocketAddr,
1025    ) -> Result<(crate::SinkOf<Self>, crate::StreamOf<Self>), Error> {
1026        self.network.dial(socket).await
1027    }
1028}
1029
1030impl RngCore for Context {
1031    fn next_u32(&mut self) -> u32 {
1032        self.executor.auditor.event(b"rand", |hasher| {
1033            hasher.update(b"next_u32");
1034        });
1035        self.executor.rng.lock().unwrap().next_u32()
1036    }
1037
1038    fn next_u64(&mut self) -> u64 {
1039        self.executor.auditor.event(b"rand", |hasher| {
1040            hasher.update(b"next_u64");
1041        });
1042        self.executor.rng.lock().unwrap().next_u64()
1043    }
1044
1045    fn fill_bytes(&mut self, dest: &mut [u8]) {
1046        self.executor.auditor.event(b"rand", |hasher| {
1047            hasher.update(b"fill_bytes");
1048        });
1049        self.executor.rng.lock().unwrap().fill_bytes(dest)
1050    }
1051
1052    fn try_fill_bytes(&mut self, dest: &mut [u8]) -> Result<(), rand::Error> {
1053        self.executor.auditor.event(b"rand", |hasher| {
1054            hasher.update(b"try_fill_bytes");
1055        });
1056        self.executor.rng.lock().unwrap().try_fill_bytes(dest)
1057    }
1058}
1059
1060impl CryptoRng for Context {}
1061
1062impl crate::Storage for Context {
1063    type Blob = <MeteredStorage<AuditedStorage<MemStorage>> as crate::Storage>::Blob;
1064
1065    async fn open(&self, partition: &str, name: &[u8]) -> Result<(Self::Blob, u64), Error> {
1066        self.storage.open(partition, name).await
1067    }
1068
1069    async fn remove(&self, partition: &str, name: Option<&[u8]>) -> Result<(), Error> {
1070        self.storage.remove(partition, name).await
1071    }
1072
1073    async fn scan(&self, partition: &str) -> Result<Vec<Vec<u8>>, Error> {
1074        self.storage.scan(partition).await
1075    }
1076}
1077
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{deterministic, utils::run_tasks, Blob, Runner as _, Storage};
    use commonware_macros::test_traced;
    use futures::task::noop_waker;

    /// Runs the shared five-task `run_tasks` workload on a runtime seeded with `seed`.
    fn run_with_seed(seed: u64) -> (String, Vec<usize>) {
        let executor = deterministic::Runner::seeded(seed);
        run_tasks(5, executor)
    }

    #[test]
    fn test_same_seed_same_order() {
        // Generate initial outputs
        let outputs: Vec<_> = (0..1000).map(run_with_seed).collect();

        // Ensure re-running every seed reproduces its original output
        for (seed, expected) in (0..1000).zip(outputs) {
            let output = run_with_seed(seed);
            assert_eq!(output, expected);
        }
    }

    #[test_traced("TRACE")]
    fn test_different_seeds_different_order() {
        let output1 = run_with_seed(12345);
        let output2 = run_with_seed(54321);
        assert_ne!(output1, output2);
    }

    #[test]
    fn test_alarm_min_heap() {
        // Populate heap out of order, including a duplicate deadline
        let now = SystemTime::now();
        let mut heap: BinaryHeap<Alarm> = [10, 5, 15, 5]
            .iter()
            .map(|&secs| Alarm {
                time: now + Duration::from_secs(secs),
                waker: noop_waker(),
            })
            .collect();

        // Verify min-heap: alarms pop in ascending deadline order
        let mut sorted_times = Vec::new();
        while let Some(alarm) = heap.pop() {
            sorted_times.push(alarm.time);
        }
        let expected: Vec<_> = [5, 5, 10, 15]
            .iter()
            .map(|&secs| now + Duration::from_secs(secs))
            .collect();
        assert_eq!(sorted_times, expected);
    }

    #[test]
    #[should_panic(expected = "runtime timeout")]
    fn test_timeout() {
        let executor = deterministic::Runner::timed(Duration::from_secs(10));
        executor.start(|context| async move {
            loop {
                context.sleep(Duration::from_secs(1)).await;
            }
        });
    }

    #[test]
    #[should_panic(expected = "cycle duration must be non-zero when timeout is set")]
    fn test_bad_timeout() {
        let cfg = Config {
            timeout: Some(Duration::default()),
            cycle: Duration::default(),
            ..Config::default()
        };
        deterministic::Runner::new(cfg);
    }

    #[test]
    fn test_recover_synced_storage_persists() {
        // Initialize the first runtime
        let executor1 = deterministic::Runner::default();
        let partition = "test_partition";
        let name = b"test_blob";
        let data = b"Hello, world!";

        // Run some tasks, sync storage, and recover the runtime
        let (context, state) = executor1.start(|context| async move {
            let (blob, _) = context.open(partition, name).await.unwrap();
            blob.write_at(Vec::from(data), 0).await.unwrap();
            blob.sync().await.unwrap();
            let state = context.auditor().state();
            (context, state)
        });
        let recovered_context = context.recover();

        // Verify auditor state is the same
        assert_eq!(state, recovered_context.auditor().state());

        // Check that synced storage persists after recovery
        let executor = Runner::from(recovered_context);
        executor.start(|context| async move {
            let (blob, len) = context.open(partition, name).await.unwrap();
            assert_eq!(len, data.len() as u64);
            let read = blob.read_at(vec![0; data.len()], 0).await.unwrap();
            assert_eq!(read.as_ref(), data);
        });
    }

    #[test]
    fn test_recover_unsynced_storage_does_not_persist() {
        // Initialize the first runtime
        let executor = deterministic::Runner::default();
        let partition = "test_partition";
        let name = b"test_blob";
        let data = Vec::from("Hello, world!");

        // Run some tasks without syncing storage
        let context = executor.start(|context| async move {
            let (blob, _) = context.open(partition, name).await.unwrap();
            blob.write_at(data, 0).await.unwrap();
            // Intentionally do not call sync() here
            context
        });

        // Recover the runtime
        let context = context.recover();
        let executor = Runner::from(context);

        // Check that unsynced storage does not persist after recovery
        executor.start(|context| async move {
            let (_, len) = context.open(partition, name).await.unwrap();
            assert_eq!(len, 0);
        });
    }

    #[test]
    #[should_panic(expected = "execution is not finished")]
    fn test_recover_before_finish_panics() {
        // Initialize runtime
        let executor = deterministic::Runner::default();

        // Start runtime
        executor.start(|context| async move {
            // Attempt to recover before the runtime has finished
            context.recover();
        });
    }

    #[test]
    #[should_panic(expected = "runtime has already been recovered")]
    fn test_recover_twice_panics() {
        // Initialize runtime
        let executor = deterministic::Runner::default();

        // Finish runtime
        let context = executor.start(|context| async move { context });

        // Recover for the first time
        let cloned_context = context.clone();
        context.recover();

        // Attempt to recover again using the same context
        cloned_context.recover();
    }

    #[test]
    fn test_default_time_zero() {
        // Initialize runtime
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            // Check that the time is zero
            assert_eq!(
                context.current().duration_since(UNIX_EPOCH).unwrap(),
                Duration::ZERO
            );
        });
    }
}