commonware_runtime/
deterministic.rs

1//! A deterministic runtime that randomly selects tasks to run based on a seed
2//!
3//! # Panics
4//!
//! If any task panics, the runtime will panic (and shut down).
6//!
7//! # Example
8//!
9//! ```rust
10//! use commonware_runtime::{Spawner, Runner, deterministic, Metrics};
11//!
12//! let executor =  deterministic::Runner::default();
13//! executor.start(|context| async move {
14//!     println!("Parent started");
15//!     let result = context.with_label("child").spawn(|_| async move {
16//!         println!("Child started");
17//!         "hello"
18//!     });
19//!     println!("Child result: {:?}", result.await);
20//!     println!("Parent exited");
21//!     println!("Auditor state: {}", context.auditor().state());
22//! });
23//! ```
24
25use crate::{
26    network::{
27        audited::Network as AuditedNetwork, deterministic::Network as DeterministicNetwork,
28        metered::Network as MeteredNetwork,
29    },
30    storage::{
31        audited::Storage as AuditedStorage, memory::Storage as MemStorage,
32        metered::Storage as MeteredStorage,
33    },
34    utils::Signaler,
35    Clock, Error, Handle, ListenerOf, Signal, METRICS_PREFIX,
36};
37use commonware_utils::{hex, SystemTimeExt};
38use futures::{
39    task::{waker_ref, ArcWake},
40    Future,
41};
42use governor::clock::{Clock as GClock, ReasonablyRealtime};
43use prometheus_client::{
44    encoding::{text::encode, EncodeLabelSet},
45    metrics::{counter::Counter, family::Family, gauge::Gauge},
46    registry::{Metric, Registry},
47};
48use rand::{prelude::SliceRandom, rngs::StdRng, CryptoRng, RngCore, SeedableRng};
49use sha2::{Digest, Sha256};
50use std::{
51    collections::{BinaryHeap, HashMap},
52    mem::replace,
53    net::SocketAddr,
54    pin::Pin,
55    sync::{Arc, Mutex},
56    task::{self, Poll, Waker},
57    time::{Duration, SystemTime, UNIX_EPOCH},
58};
59use tracing::trace;
60
/// Contents of a single partition: each blob name (as raw bytes) maps to the
/// bytes stored in that blob.
pub type Partition = HashMap<Vec<u8>, Vec<u8>>;
63
64#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
65struct Work {
66    label: String,
67}
68
69#[derive(Debug)]
70struct Metrics {
71    tasks_spawned: Family<Work, Counter>,
72    tasks_running: Family<Work, Gauge>,
73    blocking_tasks_spawned: Family<Work, Counter>,
74    blocking_tasks_running: Family<Work, Gauge>,
75    task_polls: Family<Work, Counter>,
76
77    network_bandwidth: Counter,
78}
79
80impl Metrics {
81    pub fn init(registry: &mut Registry) -> Self {
82        let metrics = Self {
83            task_polls: Family::default(),
84            tasks_spawned: Family::default(),
85            tasks_running: Family::default(),
86            blocking_tasks_spawned: Family::default(),
87            blocking_tasks_running: Family::default(),
88            network_bandwidth: Counter::default(),
89        };
90        registry.register(
91            "tasks_spawned",
92            "Total number of tasks spawned",
93            metrics.tasks_spawned.clone(),
94        );
95        registry.register(
96            "tasks_running",
97            "Number of tasks currently running",
98            metrics.tasks_running.clone(),
99        );
100        registry.register(
101            "blocking_tasks_spawned",
102            "Total number of blocking tasks spawned",
103            metrics.blocking_tasks_spawned.clone(),
104        );
105        registry.register(
106            "blocking_tasks_running",
107            "Number of blocking tasks currently running",
108            metrics.blocking_tasks_running.clone(),
109        );
110        registry.register(
111            "task_polls",
112            "Total number of task polls",
113            metrics.task_polls.clone(),
114        );
115        registry.register(
116            "bandwidth",
117            "Total amount of data sent over network",
118            metrics.network_bandwidth.clone(),
119        );
120        metrics
121    }
122}
123
/// Tracks a running digest of runtime events so that executions can be
/// audited for determinism.
pub struct Auditor {
    /// Most recent digest; empty until the first event is recorded.
    hash: Mutex<Vec<u8>>,
}

impl Default for Auditor {
    /// Create an auditor that has not yet recorded any event.
    fn default() -> Self {
        let hash = Mutex::new(Vec::new());
        Self { hash }
    }
}
136
137impl Auditor {
138    /// Record that an event happened.
139    /// This auditor's hash will be updated with the event's `label` and
140    /// whatever other data is passed in the `payload` closure.
141    pub(crate) fn event<F>(&self, label: &'static [u8], payload: F)
142    where
143        F: FnOnce(&mut Sha256),
144    {
145        let mut hash = self.hash.lock().unwrap();
146
147        let mut hasher = Sha256::new();
148        hasher.update(&*hash);
149        hasher.update(label);
150        payload(&mut hasher);
151
152        *hash = hasher.finalize().to_vec();
153    }
154
155    /// Generate a representation of the current state of the runtime.
156    ///
157    /// This can be used to ensure that logic running on top
158    /// of the runtime is interacting deterministically.
159    pub fn state(&self) -> String {
160        let hash = self.hash.lock().unwrap().clone();
161        hex(&hash)
162    }
163}
164
/// Configuration for the `deterministic` runtime.
#[derive(Clone, Debug)]
pub struct Config {
    /// Seed for the random number generator.
    seed: u64,

    /// The cycle duration determines how much time is advanced after each iteration of the event
    /// loop. This is useful to prevent starvation if some task never yields.
    cycle: Duration,

    /// If the runtime is still executing at this point (i.e. a test hasn't stopped), panic.
    timeout: Option<Duration>,
}

impl Config {
    /// Returns a new [Config] with default values: a seed of `42`, a cycle of
    /// one millisecond, and no timeout.
    pub fn new() -> Self {
        Self {
            seed: 42,
            cycle: Duration::from_millis(1),
            timeout: None,
        }
    }

    // Setters
    //
    // Each setter consumes the configuration and returns the updated value, so
    // discarding the result has no effect (hence `#[must_use]`).

    /// Set the seed for the random number generator. See [Config].
    #[must_use]
    pub fn with_seed(mut self, seed: u64) -> Self {
        self.seed = seed;
        self
    }
    /// Set how much time is advanced after each event-loop iteration. See [Config].
    #[must_use]
    pub fn with_cycle(mut self, cycle: Duration) -> Self {
        self.cycle = cycle;
        self
    }
    /// Set (or clear, with `None`) the timeout. See [Config].
    #[must_use]
    pub fn with_timeout(mut self, timeout: Option<Duration>) -> Self {
        self.timeout = timeout;
        self
    }

    // Getters

    /// Seed for the random number generator. See [Config].
    pub fn seed(&self) -> u64 {
        self.seed
    }
    /// Time advanced after each event-loop iteration. See [Config].
    pub fn cycle(&self) -> Duration {
        self.cycle
    }
    /// Configured timeout, if any. See [Config].
    pub fn timeout(&self) -> Option<Duration> {
        self.timeout
    }

    /// Assert that the configuration is valid.
    ///
    /// # Panics
    ///
    /// Panics if a timeout is set while the cycle duration is zero.
    pub fn assert(&self) {
        assert!(
            !self.cycle.is_zero() || self.timeout.is_none(),
            "cycle duration must be non-zero when timeout is set",
        );
    }
}

impl Default for Config {
    /// Same as [Config::new].
    fn default() -> Self {
        Self::new()
    }
}
234
235/// Deterministic runtime that randomly selects tasks to run based on a seed.
236pub struct Executor {
237    registry: Mutex<Registry>,
238    cycle: Duration,
239    deadline: Option<SystemTime>,
240    metrics: Arc<Metrics>,
241    auditor: Arc<Auditor>,
242    rng: Mutex<StdRng>,
243    time: Mutex<SystemTime>,
244    tasks: Arc<Tasks>,
245    sleeping: Mutex<BinaryHeap<Alarm>>,
246    partitions: Mutex<HashMap<String, Partition>>,
247    signaler: Mutex<Signaler>,
248    signal: Signal,
249    finished: Mutex<bool>,
250    recovered: Mutex<bool>,
251}
252
253enum State {
254    Config(Config),
255    Context(Context),
256}
257
258/// Implementation of [crate::Runner] for the `deterministic` runtime.
259pub struct Runner {
260    state: State,
261}
262
263impl From<Config> for Runner {
264    fn from(cfg: Config) -> Self {
265        Self::new(cfg)
266    }
267}
268
269impl From<Context> for Runner {
270    fn from(context: Context) -> Self {
271        Self {
272            state: State::Context(context),
273        }
274    }
275}
276
277impl Runner {
278    /// Initialize a new `deterministic` runtime with the given seed and cycle duration.
279    pub fn new(cfg: Config) -> Self {
280        // Ensure config is valid
281        cfg.assert();
282        Runner {
283            state: State::Config(cfg),
284        }
285    }
286
287    /// Initialize a new `deterministic` runtime with the default configuration
288    /// and the provided seed.
289    pub fn seeded(seed: u64) -> Self {
290        let cfg = Config {
291            seed,
292            ..Config::default()
293        };
294        Self::new(cfg)
295    }
296
297    /// Initialize a new `deterministic` runtime with the default configuration
298    /// but exit after the given timeout.
299    pub fn timed(timeout: Duration) -> Self {
300        let cfg = Config {
301            timeout: Some(timeout),
302            ..Config::default()
303        };
304        Self::new(cfg)
305    }
306}
307
308impl Default for Runner {
309    fn default() -> Self {
310        Self::new(Config::default())
311    }
312}
313
314impl crate::Runner for Runner {
315    type Context = Context;
316
317    fn start<F, Fut>(self, f: F) -> Fut::Output
318    where
319        F: FnOnce(Self::Context) -> Fut,
320        Fut: Future,
321    {
322        // Setup context (depending on how the runtime was initialized)
323        let context = match self.state {
324            State::Config(config) => Context::new(config),
325            State::Context(context) => context,
326        };
327
328        // Pin root task to the heap
329        let executor = context.executor.clone();
330        let mut root = Box::pin(f(context));
331
332        // Register the root task
333        Tasks::register_root(&executor.tasks);
334
335        // Process tasks until root task completes or progress stalls
336        let mut iter = 0;
337        loop {
338            // Ensure we have not exceeded our deadline
339            {
340                let current = executor.time.lock().unwrap();
341                if let Some(deadline) = executor.deadline {
342                    if *current >= deadline {
343                        panic!("runtime timeout");
344                    }
345                }
346            }
347
348            // Snapshot available tasks
349            let mut tasks = executor.tasks.drain();
350
351            // Shuffle tasks
352            {
353                let mut rng = executor.rng.lock().unwrap();
354                tasks.shuffle(&mut *rng);
355            }
356
357            // Run all snapshotted tasks
358            //
359            // This approach is more efficient than randomly selecting a task one-at-a-time
360            // because it ensures we don't pull the same pending task multiple times in a row (without
361            // processing a different task required for other tasks to make progress).
362            trace!(iter, tasks = tasks.len(), "starting loop");
363            for task in tasks {
364                // Record task for auditing
365                executor.auditor.event(b"process_task", |hasher| {
366                    hasher.update(task.id.to_be_bytes());
367                    hasher.update(task.label.as_bytes());
368                });
369                trace!(id = task.id, "processing task");
370
371                // Record task poll
372                executor
373                    .metrics
374                    .task_polls
375                    .get_or_create(&Work {
376                        label: task.label.clone(),
377                    })
378                    .inc();
379
380                // Prepare task for polling
381                let waker = waker_ref(&task);
382                let mut cx = task::Context::from_waker(&waker);
383                match &task.operation {
384                    Operation::Root => {
385                        // Poll the root task
386                        if let Poll::Ready(output) = root.as_mut().poll(&mut cx) {
387                            trace!(id = task.id, "task is complete");
388                            *executor.finished.lock().unwrap() = true;
389                            return output;
390                        }
391                    }
392                    Operation::Work { future, completed } => {
393                        // If task is completed, skip it
394                        if *completed.lock().unwrap() {
395                            trace!(id = task.id, "dropping already complete task");
396                            continue;
397                        }
398
399                        // Poll the task
400                        let mut fut = future.lock().unwrap();
401                        if fut.as_mut().poll(&mut cx).is_ready() {
402                            trace!(id = task.id, "task is complete");
403                            *completed.lock().unwrap() = true;
404                            continue;
405                        }
406                    }
407                }
408
409                // Try again later if task is still pending
410                trace!(id = task.id, "task is still pending");
411            }
412
413            // Advance time by cycle
414            //
415            // This approach prevents starvation if some task never yields (to approximate this,
416            // duration can be set to 1ns).
417            let mut current;
418            {
419                let mut time = executor.time.lock().unwrap();
420                *time = time
421                    .checked_add(executor.cycle)
422                    .expect("executor time overflowed");
423                current = *time;
424            }
425            trace!(now = current.epoch_millis(), "time advanced");
426
427            // Skip time if there is nothing to do
428            if executor.tasks.len() == 0 {
429                let mut skip = None;
430                {
431                    let sleeping = executor.sleeping.lock().unwrap();
432                    if let Some(next) = sleeping.peek() {
433                        if next.time > current {
434                            skip = Some(next.time);
435                        }
436                    }
437                }
438                if skip.is_some() {
439                    {
440                        let mut time = executor.time.lock().unwrap();
441                        *time = skip.unwrap();
442                        current = *time;
443                    }
444                    trace!(now = current.epoch_millis(), "time skipped");
445                }
446            }
447
448            // Wake all sleeping tasks that are ready
449            let mut to_wake = Vec::new();
450            let mut remaining;
451            {
452                let mut sleeping = executor.sleeping.lock().unwrap();
453                while let Some(next) = sleeping.peek() {
454                    if next.time <= current {
455                        let sleeper = sleeping.pop().unwrap();
456                        to_wake.push(sleeper.waker);
457                    } else {
458                        break;
459                    }
460                }
461                remaining = sleeping.len();
462            }
463            for waker in to_wake {
464                waker.wake();
465            }
466
467            // Account for remaining tasks
468            remaining += executor.tasks.len();
469
470            // If there are no tasks to run and no tasks sleeping, the executor is stalled
471            // and will never finish.
472            if remaining == 0 {
473                panic!("runtime stalled");
474            }
475            iter += 1;
476        }
477    }
478}
479
/// The operation that a task is performing.
enum Operation {
    /// The root task; its future is owned by the event loop rather than stored here.
    Root,
    /// A spawned task.
    Work {
        /// The task's future, polled by the event loop.
        future: Mutex<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
        /// Set once the future has resolved, so that later wake-ups are ignored.
        completed: Mutex<bool>,
    },
}
488
489/// A task that is being executed by the runtime.
490struct Task {
491    id: u128,
492    label: String,
493    tasks: Arc<Tasks>,
494
495    operation: Operation,
496}
497
498impl ArcWake for Task {
499    fn wake_by_ref(arc_self: &Arc<Self>) {
500        arc_self.tasks.enqueue(arc_self.clone());
501    }
502}
503
504/// A task queue that is used to manage the tasks that are being executed by the runtime.
505struct Tasks {
506    /// The current task counter.
507    counter: Mutex<u128>,
508    /// The queue of tasks that are waiting to be executed.
509    queue: Mutex<Vec<Arc<Task>>>,
510    /// Indicates whether the root task has been registered.
511    root_registered: Mutex<bool>,
512}
513
514impl Tasks {
515    /// Create a new task queue.
516    fn new() -> Self {
517        Self {
518            counter: Mutex::new(0),
519            queue: Mutex::new(Vec::new()),
520            root_registered: Mutex::new(false),
521        }
522    }
523
524    /// Increment the task counter and return the old value.
525    fn increment(&self) -> u128 {
526        let mut counter = self.counter.lock().unwrap();
527        let old = *counter;
528        *counter = counter.checked_add(1).expect("task counter overflow");
529        old
530    }
531
532    /// Register the root task.
533    ///
534    /// If the root task has already been registered, this function will panic.
535    fn register_root(arc_self: &Arc<Self>) {
536        {
537            let mut registered = arc_self.root_registered.lock().unwrap();
538            assert!(!*registered, "root already registered");
539            *registered = true;
540        }
541        let id = arc_self.increment();
542        let mut queue = arc_self.queue.lock().unwrap();
543        queue.push(Arc::new(Task {
544            id,
545            label: String::new(),
546            tasks: arc_self.clone(),
547            operation: Operation::Root,
548        }));
549    }
550
551    /// Register a new task to be executed.
552    fn register_work(
553        arc_self: &Arc<Self>,
554        label: &str,
555        future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
556    ) {
557        let id = arc_self.increment();
558        let mut queue = arc_self.queue.lock().unwrap();
559        queue.push(Arc::new(Task {
560            id,
561            label: label.to_string(),
562            tasks: arc_self.clone(),
563            operation: Operation::Work {
564                future: Mutex::new(future),
565                completed: Mutex::new(false),
566            },
567        }));
568    }
569
570    /// Enqueue an already registered task to be executed.
571    fn enqueue(&self, task: Arc<Task>) {
572        let mut queue = self.queue.lock().unwrap();
573        queue.push(task);
574    }
575
576    /// Dequeue all tasks that are ready to execute.
577    fn drain(&self) -> Vec<Arc<Task>> {
578        let mut queue = self.queue.lock().unwrap();
579        let len = queue.len();
580        replace(&mut *queue, Vec::with_capacity(len))
581    }
582
583    /// Get the number of tasks in the queue.
584    fn len(&self) -> usize {
585        self.queue.lock().unwrap().len()
586    }
587}
588
589type Network = MeteredNetwork<AuditedNetwork<DeterministicNetwork>>;
590
591/// Implementation of [crate::Spawner], [crate::Clock],
592/// [crate::Network], and [crate::Storage] for the `deterministic`
593/// runtime.
594pub struct Context {
595    label: String,
596    spawned: bool,
597    executor: Arc<Executor>,
598    network: Arc<Network>,
599    storage: MeteredStorage<AuditedStorage<MemStorage>>,
600}
601
602impl Default for Context {
603    fn default() -> Self {
604        Self::new(Config::default())
605    }
606}
607
608impl Context {
609    pub fn new(cfg: Config) -> Self {
610        // Create a new registry
611        let mut registry = Registry::default();
612        let runtime_registry = registry.sub_registry_with_prefix(METRICS_PREFIX);
613
614        // Initialize runtime
615        let metrics = Arc::new(Metrics::init(runtime_registry));
616        let start_time = UNIX_EPOCH;
617        let deadline = cfg
618            .timeout
619            .map(|timeout| start_time.checked_add(timeout).expect("timeout overflowed"));
620        let (signaler, signal) = Signaler::new();
621        let auditor = Arc::new(Auditor::default());
622        let storage = MeteredStorage::new(
623            AuditedStorage::new(MemStorage::default(), auditor.clone()),
624            runtime_registry,
625        );
626        let network = AuditedNetwork::new(DeterministicNetwork::default(), auditor.clone());
627        let network = MeteredNetwork::new(network, runtime_registry);
628
629        let executor = Arc::new(Executor {
630            registry: Mutex::new(registry),
631            cycle: cfg.cycle,
632            deadline,
633            metrics: metrics.clone(),
634            auditor: auditor.clone(),
635            rng: Mutex::new(StdRng::seed_from_u64(cfg.seed)),
636            time: Mutex::new(start_time),
637            tasks: Arc::new(Tasks::new()),
638            sleeping: Mutex::new(BinaryHeap::new()),
639            partitions: Mutex::new(HashMap::new()),
640            signaler: Mutex::new(signaler),
641            signal,
642            finished: Mutex::new(false),
643            recovered: Mutex::new(false),
644        });
645
646        Context {
647            label: String::new(),
648            spawned: false,
649            executor: executor.clone(),
650            network: Arc::new(network),
651            storage,
652        }
653    }
654
655    /// Recover the inner state (deadline, metrics, auditor, rng, synced storage, etc.) from the
656    /// current runtime and use it to initialize a new instance of the runtime. A recovered runtime
657    /// does not inherit the current runtime's pending tasks, unsynced storage, network connections, nor
658    /// its shutdown signaler.
659    ///
660    /// This is useful for performing a deterministic simulation that spans multiple runtime instantiations,
661    /// like simulating unclean shutdown (which involves repeatedly halting the runtime at unexpected intervals).
662    ///
663    /// It is only permitted to call this method after the runtime has finished (i.e. once `start` returns)
664    /// and only permitted to do once (otherwise multiple recovered runtimes will share the same inner state).
665    /// If either one of these conditions is violated, this method will panic.
666    pub fn recover(self) -> Self {
667        // Ensure we are finished
668        if !*self.executor.finished.lock().unwrap() {
669            panic!("execution is not finished");
670        }
671
672        // Ensure runtime has not already been recovered
673        {
674            let mut recovered = self.executor.recovered.lock().unwrap();
675            if *recovered {
676                panic!("runtime has already been recovered");
677            }
678            *recovered = true;
679        }
680
681        // Rebuild metrics
682        let mut registry = Registry::default();
683        let runtime_registry = registry.sub_registry_with_prefix(METRICS_PREFIX);
684        let metrics = Arc::new(Metrics::init(runtime_registry));
685
686        // Copy state
687        let auditor = self.executor.auditor.clone();
688        let (signaler, signal) = Signaler::new();
689        let network = AuditedNetwork::new(DeterministicNetwork::default(), auditor.clone());
690        let network = MeteredNetwork::new(network, runtime_registry);
691
692        let executor = Arc::new(Executor {
693            // Copied from the current runtime
694            cycle: self.executor.cycle,
695            deadline: self.executor.deadline,
696            auditor: auditor.clone(),
697            rng: Mutex::new(self.executor.rng.lock().unwrap().clone()),
698            time: Mutex::new(*self.executor.time.lock().unwrap()),
699            partitions: Mutex::new(self.executor.partitions.lock().unwrap().clone()),
700
701            // New state for the new runtime
702            registry: Mutex::new(registry),
703            metrics: metrics.clone(),
704            tasks: Arc::new(Tasks::new()),
705            sleeping: Mutex::new(BinaryHeap::new()),
706            signaler: Mutex::new(signaler),
707            signal,
708            finished: Mutex::new(false),
709            recovered: Mutex::new(false),
710        });
711        Self {
712            label: String::new(),
713            spawned: false,
714            executor,
715            network: Arc::new(network),
716            storage: self.storage,
717        }
718    }
719
720    pub fn auditor(&self) -> &Auditor {
721        &self.executor.auditor
722    }
723}
724
725impl Clone for Context {
726    fn clone(&self) -> Self {
727        Self {
728            label: self.label.clone(),
729            spawned: false,
730            executor: self.executor.clone(),
731            network: self.network.clone(),
732            storage: self.storage.clone(),
733        }
734    }
735}
736
737impl crate::Spawner for Context {
738    fn spawn<F, Fut, T>(self, f: F) -> Handle<T>
739    where
740        F: FnOnce(Self) -> Fut + Send + 'static,
741        Fut: Future<Output = T> + Send + 'static,
742        T: Send + 'static,
743    {
744        // Ensure a context only spawns one task
745        assert!(!self.spawned, "already spawned");
746
747        // Get metrics
748        let label = self.label.clone();
749        let work = Work {
750            label: label.clone(),
751        };
752        self.executor
753            .metrics
754            .tasks_spawned
755            .get_or_create(&work)
756            .inc();
757        let gauge = self
758            .executor
759            .metrics
760            .tasks_running
761            .get_or_create(&work)
762            .clone();
763
764        // Set up the task
765        let executor = self.executor.clone();
766        let future = f(self);
767        let (f, handle) = Handle::init(future, gauge, false);
768
769        // Spawn the task
770        Tasks::register_work(&executor.tasks, &label, Box::pin(f));
771        handle
772    }
773
774    fn spawn_ref<F, T>(&mut self) -> impl FnOnce(F) -> Handle<T> + 'static
775    where
776        F: Future<Output = T> + Send + 'static,
777        T: Send + 'static,
778    {
779        // Ensure a context only spawns one task
780        assert!(!self.spawned, "already spawned");
781        self.spawned = true;
782
783        // Get metrics
784        let work = Work {
785            label: self.label.clone(),
786        };
787        self.executor
788            .metrics
789            .tasks_spawned
790            .get_or_create(&work)
791            .inc();
792        let gauge = self
793            .executor
794            .metrics
795            .tasks_running
796            .get_or_create(&work)
797            .clone();
798
799        // Set up the task
800        let label = self.label.clone();
801        let executor = self.executor.clone();
802        move |f: F| {
803            let (f, handle) = Handle::init(f, gauge, false);
804
805            // Spawn the task
806            Tasks::register_work(&executor.tasks, &label, Box::pin(f));
807            handle
808        }
809    }
810
811    fn spawn_blocking<F, T>(self, f: F) -> Handle<T>
812    where
813        F: FnOnce() -> T + Send + 'static,
814        T: Send + 'static,
815    {
816        // Ensure a context only spawns one task
817        assert!(!self.spawned, "already spawned");
818
819        // Get metrics
820        let work = Work {
821            label: self.label.clone(),
822        };
823        self.executor
824            .metrics
825            .blocking_tasks_spawned
826            .get_or_create(&work)
827            .inc();
828        let gauge = self
829            .executor
830            .metrics
831            .blocking_tasks_running
832            .get_or_create(&work)
833            .clone();
834
835        // Initialize the blocking task
836        let (f, handle) = Handle::init_blocking(f, gauge, false);
837
838        // Spawn the task
839        let f = async move { f() };
840        Tasks::register_work(&self.executor.tasks, &self.label, Box::pin(f));
841        handle
842    }
843
844    fn stop(&self, value: i32) {
845        self.executor.auditor.event(b"stop", |hasher| {
846            hasher.update(value.to_be_bytes());
847        });
848        self.executor.signaler.lock().unwrap().signal(value);
849    }
850
851    fn stopped(&self) -> Signal {
852        self.executor.auditor.event(b"stopped", |_| {});
853        self.executor.signal.clone()
854    }
855}
856
857impl crate::Metrics for Context {
858    fn with_label(&self, label: &str) -> Self {
859        let label = {
860            let prefix = self.label.clone();
861            if prefix.is_empty() {
862                label.to_string()
863            } else {
864                format!("{}_{}", prefix, label)
865            }
866        };
867        assert!(
868            !label.starts_with(METRICS_PREFIX),
869            "using runtime label is not allowed"
870        );
871        Self {
872            label,
873            spawned: false,
874            executor: self.executor.clone(),
875            network: self.network.clone(),
876            storage: self.storage.clone(),
877        }
878    }
879
880    fn label(&self) -> String {
881        self.label.clone()
882    }
883
884    fn register<N: Into<String>, H: Into<String>>(&self, name: N, help: H, metric: impl Metric) {
885        // Prepare args
886        let name = name.into();
887        let help = help.into();
888
889        // Register metric
890        self.executor.auditor.event(b"register", |hasher| {
891            hasher.update(name.as_bytes());
892            hasher.update(help.as_bytes());
893        });
894        let prefixed_name = {
895            let prefix = &self.label;
896            if prefix.is_empty() {
897                name
898            } else {
899                format!("{}_{}", *prefix, name)
900            }
901        };
902        self.executor
903            .registry
904            .lock()
905            .unwrap()
906            .register(prefixed_name, help, metric)
907    }
908
909    fn encode(&self) -> String {
910        self.executor.auditor.event(b"encode", |_| {});
911        let mut buffer = String::new();
912        encode(&mut buffer, &self.executor.registry.lock().unwrap()).expect("encoding failed");
913        buffer
914    }
915}
916
917struct Sleeper {
918    executor: Arc<Executor>,
919    time: SystemTime,
920    registered: bool,
921}
922
/// A pending wake-up: the instant it is due and the waker to notify.
///
/// Alarms compare by `time` only, and the ordering is inverted so that a
/// max-heap (such as `BinaryHeap`) yields the earliest alarm first.
struct Alarm {
    time: SystemTime,
    waker: Waker,
}

impl PartialEq for Alarm {
    fn eq(&self, other: &Self) -> bool {
        self.time == other.time
    }
}

impl Eq for Alarm {}

impl PartialOrd for Alarm {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(Ord::cmp(self, other))
    }
}

impl Ord for Alarm {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        // An earlier time must rank as "greater" so it surfaces first
        self.time.cmp(&other.time).reverse()
    }
}
948
949impl Future for Sleeper {
950    type Output = ();
951
952    fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
953        {
954            let current_time = *self.executor.time.lock().unwrap();
955            if current_time >= self.time {
956                return Poll::Ready(());
957            }
958        }
959        if !self.registered {
960            self.registered = true;
961            self.executor.sleeping.lock().unwrap().push(Alarm {
962                time: self.time,
963                waker: cx.waker().clone(),
964            });
965        }
966        Poll::Pending
967    }
968}
969
970impl Clock for Context {
971    fn current(&self) -> SystemTime {
972        *self.executor.time.lock().unwrap()
973    }
974
975    fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static {
976        let deadline = self
977            .current()
978            .checked_add(duration)
979            .expect("overflow when setting wake time");
980        self.sleep_until(deadline)
981    }
982
983    fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static {
984        Sleeper {
985            executor: self.executor.clone(),
986
987            time: deadline,
988            registered: false,
989        }
990    }
991}
992
993impl GClock for Context {
994    type Instant = SystemTime;
995
996    fn now(&self) -> Self::Instant {
997        self.current()
998    }
999}
1000
// Marker impl for `governor`; the body is empty, so no trait items are
// overridden here.
impl ReasonablyRealtime for Context {}
1002
1003impl crate::Network for Context {
1004    type Listener = ListenerOf<Network>;
1005
1006    async fn bind(&self, socket: SocketAddr) -> Result<Self::Listener, Error> {
1007        self.network.bind(socket).await
1008    }
1009
1010    async fn dial(
1011        &self,
1012        socket: SocketAddr,
1013    ) -> Result<(crate::SinkOf<Self>, crate::StreamOf<Self>), Error> {
1014        self.network.dial(socket).await
1015    }
1016}
1017
1018impl RngCore for Context {
1019    fn next_u32(&mut self) -> u32 {
1020        self.executor.auditor.event(b"rand", |hasher| {
1021            hasher.update(b"next_u32");
1022        });
1023        self.executor.rng.lock().unwrap().next_u32()
1024    }
1025
1026    fn next_u64(&mut self) -> u64 {
1027        self.executor.auditor.event(b"rand", |hasher| {
1028            hasher.update(b"next_u64");
1029        });
1030        self.executor.rng.lock().unwrap().next_u64()
1031    }
1032
1033    fn fill_bytes(&mut self, dest: &mut [u8]) {
1034        self.executor.auditor.event(b"rand", |hasher| {
1035            hasher.update(b"fill_bytes");
1036        });
1037        self.executor.rng.lock().unwrap().fill_bytes(dest)
1038    }
1039
1040    fn try_fill_bytes(&mut self, dest: &mut [u8]) -> Result<(), rand::Error> {
1041        self.executor.auditor.event(b"rand", |hasher| {
1042            hasher.update(b"try_fill_bytes");
1043        });
1044        self.executor.rng.lock().unwrap().try_fill_bytes(dest)
1045    }
1046}
1047
// Marker impl so `Context` can be passed where a `CryptoRng` is required.
// NOTE(review): the backing RNG appears to be the seeded `StdRng` imported at
// the top of the file, so output would be reproducible for a given seed —
// confirm this marker is only relied on in simulation/testing.
impl CryptoRng for Context {}
1049
1050impl crate::Storage for Context {
1051    type Blob = <MeteredStorage<AuditedStorage<MemStorage>> as crate::Storage>::Blob;
1052
1053    async fn open(&self, partition: &str, name: &[u8]) -> Result<(Self::Blob, u64), Error> {
1054        self.storage.open(partition, name).await
1055    }
1056
1057    async fn remove(&self, partition: &str, name: Option<&[u8]>) -> Result<(), Error> {
1058        self.storage.remove(partition, name).await
1059    }
1060
1061    async fn scan(&self, partition: &str) -> Result<Vec<Vec<u8>>, Error> {
1062        self.storage.scan(partition).await
1063    }
1064}
1065
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{deterministic, utils::run_tasks, Blob, Runner as _, Storage};
    use commonware_macros::test_traced;
    use futures::task::noop_waker;

    /// Runs the shared five-task workload on a runner seeded with `seed`.
    fn run_with_seed(seed: u64) -> (String, Vec<usize>) {
        run_tasks(5, deterministic::Runner::seeded(seed))
    }

    /// Re-running any seed must reproduce exactly the output it produced before.
    #[test]
    fn test_same_seed_same_order() {
        // First pass: record the output of every seed
        let recorded: Vec<_> = (0..1000).map(run_with_seed).collect();

        // Second pass: every seed must reproduce its recorded output
        for (seed, expected) in (0..1000).zip(recorded.iter()) {
            assert_eq!(run_with_seed(seed), *expected);
        }
    }

    /// Two different seeds are expected to produce different outputs.
    #[test_traced("TRACE")]
    fn test_different_seeds_different_order() {
        let first = run_with_seed(12345);
        let second = run_with_seed(54321);
        assert_ne!(first, second);
    }

    /// A `BinaryHeap<Alarm>` must pop alarms from earliest to latest deadline.
    #[test]
    fn test_alarm_min_heap() {
        let now = SystemTime::now();
        let at = |secs: u64| now + Duration::new(secs, 0);

        // Populate heap (out of order, with one duplicate deadline)
        let mut heap = BinaryHeap::new();
        for secs in [10, 5, 15, 5] {
            heap.push(Alarm {
                time: at(secs),
                waker: noop_waker(),
            });
        }

        // Drain the heap and verify deadlines come out in ascending order
        let popped: Vec<SystemTime> = std::iter::from_fn(|| heap.pop())
            .map(|alarm| alarm.time)
            .collect();
        assert_eq!(popped, vec![at(5), at(5), at(10), at(15)]);
    }

    /// A task that sleeps forever must trip the configured runtime timeout.
    #[test]
    #[should_panic(expected = "runtime timeout")]
    fn test_timeout() {
        let executor = deterministic::Runner::timed(Duration::from_secs(10));
        executor.start(|context| async move {
            let tick = Duration::from_secs(1);
            loop {
                context.sleep(tick).await;
            }
        });
    }

    /// A timeout combined with a zero cycle duration is rejected at construction.
    #[test]
    #[should_panic(expected = "cycle duration must be non-zero when timeout is set")]
    fn test_bad_timeout() {
        let zero = Duration::default();
        let cfg = Config {
            timeout: Some(zero),
            cycle: zero,
            ..Config::default()
        };
        deterministic::Runner::new(cfg);
    }

    /// Data that was written and synced must still be readable after recovery.
    #[test]
    fn test_recover_synced_storage_persists() {
        let partition = "test_partition";
        let name = b"test_blob";
        let data = b"Hello, world!";

        // First runtime: write the blob, sync it, and capture the auditor state
        let first = deterministic::Runner::default();
        let (context, state) = first.start(|context| async move {
            let (blob, _) = context.open(partition, name).await.unwrap();
            blob.write_at(data, 0).await.unwrap();
            blob.sync().await.unwrap();
            let snapshot = context.auditor().state();
            (context, snapshot)
        });

        // Recovery must carry the auditor state over unchanged
        let recovered = context.recover();
        assert_eq!(state, recovered.auditor().state());

        // Second runtime: the synced bytes must still be there
        let second = Runner::from(recovered);
        second.start(|context| async move {
            let (blob, len) = context.open(partition, name).await.unwrap();
            assert_eq!(len, data.len() as u64);
            let mut buf = vec![0; data.len()];
            blob.read_at(&mut buf, 0).await.unwrap();
            assert_eq!(buf, data);
        });
    }

    /// Data that was written but never synced must be gone after recovery.
    #[test]
    fn test_recover_unsynced_storage_does_not_persist() {
        let partition = "test_partition";
        let name = b"test_blob";
        let data = b"Hello, world!".to_vec();

        // First runtime: write without syncing
        let first = deterministic::Runner::default();
        let finished = first.start(|context| async move {
            let context = context.clone();
            let (blob, _) = context.open(partition, name).await.unwrap();
            // Intentionally no sync() after this write
            blob.write_at(&data, 0).await.unwrap();
            context
        });

        // Recover the runtime
        let recovered = finished.recover();
        let second = Runner::from(recovered);

        // The blob must come back empty
        second.start(|context| async move {
            let (_, len) = context.open(partition, name).await.unwrap();
            assert_eq!(len, 0);
        });
    }

    /// Recovering from inside a still-running runtime must panic.
    #[test]
    #[should_panic(expected = "execution is not finished")]
    fn test_recover_before_finish_panics() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // The runtime is still executing this task, so recovery is not allowed
            context.recover();
        });
    }

    /// A finished runtime can be recovered once; a second attempt must panic.
    #[test]
    #[should_panic(expected = "runtime has already been recovered")]
    fn test_recover_twice_panics() {
        // Run the runtime to completion and keep its context
        let executor = deterministic::Runner::default();
        let original = executor.start(|context| async move { context });

        // Keep a second handle to the same runtime before consuming the first
        let duplicate = original.clone();

        // First recovery succeeds
        original.recover();

        // Second recovery of the same runtime must panic
        duplicate.recover();
    }

    /// A default runner starts its clock at the UNIX epoch.
    #[test]
    fn test_default_time_zero() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let elapsed = context.current().duration_since(UNIX_EPOCH).unwrap();
            assert_eq!(elapsed, Duration::ZERO);
        });
    }
}