commonware_runtime/
deterministic.rs

1//! A deterministic runtime that randomly selects tasks to run based on a seed
2//!
3//! # Panics
4//!
5//! If any task panics, the runtime will panic (and shutdown).
6//!
7//! # Example
8//!
9//! ```rust
10//! use commonware_runtime::{Spawner, Runner, deterministic::Executor, Metrics};
11//!
12//! let (executor, context, auditor) = Executor::default();
13//! executor.start(async move {
14//!     println!("Parent started");
15//!     let result = context.with_label("child").spawn(|_| async move {
16//!         println!("Child started");
17//!         "hello"
18//!     });
19//!     println!("Child result: {:?}", result.await);
20//!     println!("Parent exited");
21//! });
22//! println!("Auditor state: {}", auditor.state());
23//! ```
24
25use crate::{
26    mocks, storage::audited::Storage as AuditedStorage, storage::memory::Storage as MemStorage,
27    storage::metered::Storage as MeteredStorage, utils::Signaler, Clock, Error, Handle, Signal,
28    METRICS_PREFIX,
29};
30use commonware_utils::{hex, SystemTimeExt};
31use futures::{
32    channel::mpsc,
33    task::{waker_ref, ArcWake},
34    SinkExt, StreamExt,
35};
36use governor::clock::{Clock as GClock, ReasonablyRealtime};
37use prometheus_client::{
38    encoding::{text::encode, EncodeLabelSet},
39    metrics::{counter::Counter, family::Family, gauge::Gauge},
40    registry::{Metric, Registry},
41};
42use rand::{prelude::SliceRandom, rngs::StdRng, CryptoRng, RngCore, SeedableRng};
43use sha2::{Digest, Sha256};
44use std::{
45    collections::{BinaryHeap, HashMap},
46    future::Future,
47    mem::replace,
48    net::{IpAddr, Ipv4Addr, SocketAddr},
49    ops::Range,
50    pin::Pin,
51    sync::{Arc, Mutex},
52    task::{self, Poll, Waker},
53    time::{Duration, SystemTime, UNIX_EPOCH},
54};
55use tracing::trace;
56
/// Half-open range (`start..end`) of ephemeral ports handed out to dialers.
const EPHEMERAL_PORT_RANGE: Range<u16> = Range {
    start: 32768,
    end: 61000,
};

/// In-memory storage partition: maps each blob name to that blob's contents.
pub type Partition = HashMap<Vec<u8>, Vec<u8>>;
62
63#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
64struct Work {
65    label: String,
66}
67
68#[derive(Debug)]
69struct Metrics {
70    tasks_spawned: Family<Work, Counter>,
71    tasks_running: Family<Work, Gauge>,
72    blocking_tasks_spawned: Family<Work, Counter>,
73    blocking_tasks_running: Family<Work, Gauge>,
74    task_polls: Family<Work, Counter>,
75
76    network_bandwidth: Counter,
77}
78
79impl Metrics {
80    pub fn init(registry: &mut Registry) -> Self {
81        let metrics = Self {
82            task_polls: Family::default(),
83            tasks_spawned: Family::default(),
84            tasks_running: Family::default(),
85            blocking_tasks_spawned: Family::default(),
86            blocking_tasks_running: Family::default(),
87            network_bandwidth: Counter::default(),
88        };
89        registry.register(
90            "tasks_spawned",
91            "Total number of tasks spawned",
92            metrics.tasks_spawned.clone(),
93        );
94        registry.register(
95            "tasks_running",
96            "Number of tasks currently running",
97            metrics.tasks_running.clone(),
98        );
99        registry.register(
100            "blocking_tasks_spawned",
101            "Total number of blocking tasks spawned",
102            metrics.blocking_tasks_spawned.clone(),
103        );
104        registry.register(
105            "blocking_tasks_running",
106            "Number of blocking tasks currently running",
107            metrics.blocking_tasks_running.clone(),
108        );
109        registry.register(
110            "task_polls",
111            "Total number of task polls",
112            metrics.task_polls.clone(),
113        );
114        registry.register(
115            "bandwidth",
116            "Total amount of data sent over network",
117            metrics.network_bandwidth.clone(),
118        );
119        metrics
120    }
121}
122
123/// Track the state of the runtime for determinism auditing.
124pub struct Auditor {
125    hash: Mutex<Vec<u8>>,
126}
127
128impl Default for Auditor {
129    fn default() -> Self {
130        Self {
131            hash: Vec::new().into(),
132        }
133    }
134}
135
136impl Auditor {
137    fn process_task(&self, task: u128, label: &str) {
138        let mut hash = self.hash.lock().unwrap();
139        let mut hasher = Sha256::new();
140        hasher.update(&*hash);
141        hasher.update(b"process_task");
142        hasher.update(task.to_be_bytes());
143        hasher.update(label.as_bytes());
144        *hash = hasher.finalize().to_vec();
145    }
146
147    fn stop(&self, value: i32) {
148        let mut hash = self.hash.lock().unwrap();
149        let mut hasher = Sha256::new();
150        hasher.update(&*hash);
151        hasher.update(b"stop");
152        hasher.update(value.to_be_bytes());
153        *hash = hasher.finalize().to_vec();
154    }
155
156    fn stopped(&self) {
157        let mut hash = self.hash.lock().unwrap();
158        let mut hasher = Sha256::new();
159        hasher.update(&*hash);
160        hasher.update(b"stopped");
161        *hash = hasher.finalize().to_vec();
162    }
163
164    fn bind(&self, address: SocketAddr) {
165        let mut hash = self.hash.lock().unwrap();
166        let mut hasher = Sha256::new();
167        hasher.update(&*hash);
168        hasher.update(b"bind");
169        hasher.update(address.to_string().as_bytes());
170        *hash = hasher.finalize().to_vec();
171    }
172
173    fn dial(&self, dialer: SocketAddr, listener: SocketAddr) {
174        let mut hash = self.hash.lock().unwrap();
175        let mut hasher = Sha256::new();
176        hasher.update(&*hash);
177        hasher.update(b"dial");
178        hasher.update(dialer.to_string().as_bytes());
179        hasher.update(listener.to_string().as_bytes());
180        *hash = hasher.finalize().to_vec();
181    }
182
183    fn accept(&self, listener: SocketAddr, dialer: SocketAddr) {
184        let mut hash = self.hash.lock().unwrap();
185        let mut hasher = Sha256::new();
186        hasher.update(&*hash);
187        hasher.update(b"accept");
188        hasher.update(listener.to_string().as_bytes());
189        hasher.update(dialer.to_string().as_bytes());
190        *hash = hasher.finalize().to_vec();
191    }
192
193    fn send(&self, sender: SocketAddr, receiver: SocketAddr, message: &[u8]) {
194        let mut hash = self.hash.lock().unwrap();
195        let mut hasher = Sha256::new();
196        hasher.update(&*hash);
197        hasher.update(b"send");
198        hasher.update(sender.to_string().as_bytes());
199        hasher.update(receiver.to_string().as_bytes());
200        hasher.update(message);
201        *hash = hasher.finalize().to_vec();
202    }
203
204    fn recv(&self, receiver: SocketAddr, sender: SocketAddr, message: &[u8]) {
205        let mut hash = self.hash.lock().unwrap();
206        let mut hasher = Sha256::new();
207        hasher.update(&*hash);
208        hasher.update(b"recv");
209        hasher.update(receiver.to_string().as_bytes());
210        hasher.update(sender.to_string().as_bytes());
211        hasher.update(message);
212        *hash = hasher.finalize().to_vec();
213    }
214
215    fn rand(&self, method: String) {
216        let mut hash = self.hash.lock().unwrap();
217        let mut hasher = Sha256::new();
218        hasher.update(&*hash);
219        hasher.update(b"rand");
220        hasher.update(method.as_bytes());
221        *hash = hasher.finalize().to_vec();
222    }
223
224    pub(crate) fn open(&self, partition: &str, name: &[u8]) {
225        let mut hash = self.hash.lock().unwrap();
226        let mut hasher = Sha256::new();
227        hasher.update(&*hash);
228        hasher.update(b"open");
229        hasher.update(partition.as_bytes());
230        hasher.update(name);
231        *hash = hasher.finalize().to_vec();
232    }
233
234    pub(crate) fn remove(&self, partition: &str, name: Option<&[u8]>) {
235        let mut hash = self.hash.lock().unwrap();
236        let mut hasher = Sha256::new();
237        hasher.update(&*hash);
238        hasher.update(b"remove");
239        hasher.update(partition.as_bytes());
240        if let Some(name) = name {
241            hasher.update(name);
242        }
243        *hash = hasher.finalize().to_vec();
244    }
245
246    pub(crate) fn scan(&self, partition: &str) {
247        let mut hash = self.hash.lock().unwrap();
248        let mut hasher = Sha256::new();
249        hasher.update(&*hash);
250        hasher.update(b"scan");
251        hasher.update(partition.as_bytes());
252        *hash = hasher.finalize().to_vec();
253    }
254
255    pub(crate) fn len(&self, partition: &str, name: &[u8]) {
256        let mut hash = self.hash.lock().unwrap();
257        let mut hasher = Sha256::new();
258        hasher.update(&*hash);
259        hasher.update(b"len");
260        hasher.update(partition.as_bytes());
261        hasher.update(name);
262        *hash = hasher.finalize().to_vec();
263    }
264
265    pub(crate) fn read_at(&self, partition: &str, name: &[u8], buf: usize, offset: u64) {
266        let mut hash = self.hash.lock().unwrap();
267        let mut hasher = Sha256::new();
268        hasher.update(&*hash);
269        hasher.update(b"read_at");
270        hasher.update(partition.as_bytes());
271        hasher.update(name);
272        hasher.update(buf.to_be_bytes());
273        hasher.update(offset.to_be_bytes());
274        *hash = hasher.finalize().to_vec();
275    }
276
277    pub(crate) fn write_at(&self, partition: &str, name: &[u8], buf: &[u8], offset: u64) {
278        let mut hash = self.hash.lock().unwrap();
279        let mut hasher = Sha256::new();
280        hasher.update(&*hash);
281        hasher.update(b"write_at");
282        hasher.update(partition.as_bytes());
283        hasher.update(name);
284        hasher.update(buf);
285        hasher.update(offset.to_be_bytes());
286        *hash = hasher.finalize().to_vec();
287    }
288
289    pub(crate) fn truncate(&self, partition: &str, name: &[u8], size: u64) {
290        let mut hash = self.hash.lock().unwrap();
291        let mut hasher = Sha256::new();
292        hasher.update(&*hash);
293        hasher.update(b"truncate");
294        hasher.update(partition.as_bytes());
295        hasher.update(name);
296        hasher.update(size.to_be_bytes());
297        *hash = hasher.finalize().to_vec();
298    }
299
300    pub(crate) fn sync(&self, partition: &str, name: &[u8]) {
301        let mut hash = self.hash.lock().unwrap();
302        let mut hasher = Sha256::new();
303        hasher.update(&*hash);
304        hasher.update(b"sync");
305        hasher.update(partition.as_bytes());
306        hasher.update(name);
307        *hash = hasher.finalize().to_vec();
308    }
309
310    pub(crate) fn close(&self, partition: &str, name: &[u8]) {
311        let mut hash = self.hash.lock().unwrap();
312        let mut hasher = Sha256::new();
313        hasher.update(&*hash);
314        hasher.update(b"close");
315        hasher.update(partition.as_bytes());
316        hasher.update(name);
317        *hash = hasher.finalize().to_vec();
318    }
319
320    fn register(&self, name: &str, help: &str) {
321        let mut hash = self.hash.lock().unwrap();
322        let mut hasher = Sha256::new();
323        hasher.update(&*hash);
324        hasher.update(b"register");
325        hasher.update(name.as_bytes());
326        hasher.update(help.as_bytes());
327        *hash = hasher.finalize().to_vec();
328    }
329
330    fn encode(&self) {
331        let mut hash = self.hash.lock().unwrap();
332        let mut hasher = Sha256::new();
333        hasher.update(&*hash);
334        hasher.update(b"encode");
335        *hash = hasher.finalize().to_vec();
336    }
337
338    /// Generate a representation of the current state of the runtime.
339    ///
340    /// This can be used to ensure that logic running on top
341    /// of the runtime is interacting deterministically.
342    pub fn state(&self) -> String {
343        let hash = self.hash.lock().unwrap().clone();
344        hex(&hash)
345    }
346}
347
/// Configuration for the `deterministic` runtime.
#[derive(Clone)]
pub struct Config {
    /// Seed for the runtime's random number generator.
    ///
    /// Among other things, this RNG shuffles the order in which ready tasks are polled.
    pub seed: u64,

    /// Amount of simulated time advanced after each iteration of the event loop.
    ///
    /// Advancing time every iteration prevents starvation if some task never yields.
    pub cycle: Duration,

    /// If the runtime is still executing once simulated time reaches this point
    /// (i.e. a test hasn't stopped), panic.
    pub timeout: Option<Duration>,
}

impl Default for Config {
    /// Seed `42`, a 1ms cycle and no timeout.
    fn default() -> Self {
        let seed = 42;
        let cycle = Duration::from_millis(1);
        Self {
            seed,
            cycle,
            timeout: None,
        }
    }
}
371
372/// Deterministic runtime that randomly selects tasks to run based on a seed.
373pub struct Executor {
374    registry: Mutex<Registry>,
375    cycle: Duration,
376    deadline: Option<SystemTime>,
377    metrics: Arc<Metrics>,
378    auditor: Arc<Auditor>,
379    rng: Mutex<StdRng>,
380    time: Mutex<SystemTime>,
381    tasks: Arc<Tasks>,
382    sleeping: Mutex<BinaryHeap<Alarm>>,
383    partitions: Mutex<HashMap<String, Partition>>,
384    signaler: Mutex<Signaler>,
385    signal: Signal,
386    finished: Mutex<bool>,
387    recovered: Mutex<bool>,
388}
389
390impl Executor {
391    /// Initialize a new `deterministic` runtime with the given seed and cycle duration.
392    pub fn init(cfg: Config) -> (Runner, Context, Arc<Auditor>) {
393        // Ensure config is valid
394        if cfg.timeout.is_some() && cfg.cycle == Duration::default() {
395            panic!("cycle duration must be non-zero when timeout is set");
396        }
397
398        // Create a new registry
399        let mut registry = Registry::default();
400        let runtime_registry = registry.sub_registry_with_prefix(METRICS_PREFIX);
401
402        // Initialize runtime
403        let metrics = Arc::new(Metrics::init(runtime_registry));
404        let auditor = Arc::new(Auditor::default());
405        let start_time = UNIX_EPOCH;
406        let deadline = cfg
407            .timeout
408            .map(|timeout| start_time.checked_add(timeout).expect("timeout overflowed"));
409        let (signaler, signal) = Signaler::new();
410        let storage = MeteredStorage::new(
411            AuditedStorage::new(MemStorage::default(), auditor.clone()),
412            runtime_registry,
413        );
414        let executor = Arc::new(Self {
415            registry: Mutex::new(registry),
416            cycle: cfg.cycle,
417            deadline,
418            metrics: metrics.clone(),
419            auditor: auditor.clone(),
420            rng: Mutex::new(StdRng::seed_from_u64(cfg.seed)),
421            time: Mutex::new(start_time),
422            tasks: Arc::new(Tasks::new()),
423            sleeping: Mutex::new(BinaryHeap::new()),
424            partitions: Mutex::new(HashMap::new()),
425            signaler: Mutex::new(signaler),
426            signal,
427            finished: Mutex::new(false),
428            recovered: Mutex::new(false),
429        });
430        (
431            Runner {
432                executor: executor.clone(),
433            },
434            Context {
435                label: String::new(),
436                spawned: false,
437                executor,
438                networking: Arc::new(Networking::new(metrics, auditor.clone())),
439                storage,
440            },
441            auditor,
442        )
443    }
444
445    /// Initialize a new `deterministic` runtime with the default configuration
446    /// and the provided seed.
447    pub fn seeded(seed: u64) -> (Runner, Context, Arc<Auditor>) {
448        let cfg = Config {
449            seed,
450            ..Config::default()
451        };
452        Self::init(cfg)
453    }
454
455    /// Initialize a new `deterministic` runtime with the default configuration
456    /// but exit after the given timeout.
457    pub fn timed(timeout: Duration) -> (Runner, Context, Arc<Auditor>) {
458        let cfg = Config {
459            timeout: Some(timeout),
460            ..Config::default()
461        };
462        Self::init(cfg)
463    }
464
465    /// Initialize a new `deterministic` runtime with the default configuration.
466    // We'd love to implement the trait but we can't because of the return type.
467    #[allow(clippy::should_implement_trait)]
468    pub fn default() -> (Runner, Context, Arc<Auditor>) {
469        Self::init(Config::default())
470    }
471}
472
473/// The operation that a task is performing.
474enum Operation {
475    Root,
476    Work {
477        future: Mutex<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
478        completed: Mutex<bool>,
479    },
480}
481
482/// A task that is being executed by the runtime.
483struct Task {
484    id: u128,
485    label: String,
486    tasks: Arc<Tasks>,
487
488    operation: Operation,
489}
490
491impl ArcWake for Task {
492    fn wake_by_ref(arc_self: &Arc<Self>) {
493        arc_self.tasks.enqueue(arc_self.clone());
494    }
495}
496
497/// A task queue that is used to manage the tasks that are being executed by the runtime.
498struct Tasks {
499    /// The current task counter.
500    counter: Mutex<u128>,
501    /// The queue of tasks that are waiting to be executed.
502    queue: Mutex<Vec<Arc<Task>>>,
503    /// Indicates whether the root task has been registered.
504    root_registered: Mutex<bool>,
505}
506
507impl Tasks {
508    /// Create a new task queue.
509    fn new() -> Self {
510        Self {
511            counter: Mutex::new(0),
512            queue: Mutex::new(Vec::new()),
513            root_registered: Mutex::new(false),
514        }
515    }
516
517    /// Increment the task counter and return the old value.
518    fn increment(&self) -> u128 {
519        let mut counter = self.counter.lock().unwrap();
520        let old = *counter;
521        *counter = counter.checked_add(1).expect("task counter overflow");
522        old
523    }
524
525    /// Register the root task.
526    ///
527    /// If the root task has already been registered, this function will panic.
528    fn register_root(arc_self: &Arc<Self>) {
529        {
530            let mut registered = arc_self.root_registered.lock().unwrap();
531            assert!(!*registered, "root already registered");
532            *registered = true;
533        }
534        let id = arc_self.increment();
535        let mut queue = arc_self.queue.lock().unwrap();
536        queue.push(Arc::new(Task {
537            id,
538            label: String::new(),
539            tasks: arc_self.clone(),
540            operation: Operation::Root,
541        }));
542    }
543
544    /// Register a new task to be executed.
545    fn register_work(
546        arc_self: &Arc<Self>,
547        label: &str,
548        future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
549    ) {
550        let id = arc_self.increment();
551        let mut queue = arc_self.queue.lock().unwrap();
552        queue.push(Arc::new(Task {
553            id,
554            label: label.to_string(),
555            tasks: arc_self.clone(),
556            operation: Operation::Work {
557                future: Mutex::new(future),
558                completed: Mutex::new(false),
559            },
560        }));
561    }
562
563    /// Enqueue an already registered task to be executed.
564    fn enqueue(&self, task: Arc<Task>) {
565        let mut queue = self.queue.lock().unwrap();
566        queue.push(task);
567    }
568
569    /// Dequeue all tasks that are ready to execute.
570    fn drain(&self) -> Vec<Arc<Task>> {
571        let mut queue = self.queue.lock().unwrap();
572        let len = queue.len();
573        replace(&mut *queue, Vec::with_capacity(len))
574    }
575
576    /// Get the number of tasks in the queue.
577    fn len(&self) -> usize {
578        self.queue.lock().unwrap().len()
579    }
580}
581
582/// Implementation of [`crate::Runner`] for the `deterministic` runtime.
583pub struct Runner {
584    executor: Arc<Executor>,
585}
586
587impl crate::Runner for Runner {
588    fn start<F>(self, f: F) -> F::Output
589    where
590        F: Future,
591    {
592        // Pin root task to the heap
593        let mut root = Box::pin(f);
594
595        // Register the root task
596        Tasks::register_root(&self.executor.tasks);
597
598        // Process tasks until root task completes or progress stalls
599        let mut iter = 0;
600        loop {
601            // Ensure we have not exceeded our deadline
602            {
603                let current = self.executor.time.lock().unwrap();
604                if let Some(deadline) = self.executor.deadline {
605                    if *current >= deadline {
606                        panic!("runtime timeout");
607                    }
608                }
609            }
610
611            // Snapshot available tasks
612            let mut tasks = self.executor.tasks.drain();
613
614            // Shuffle tasks
615            {
616                let mut rng = self.executor.rng.lock().unwrap();
617                tasks.shuffle(&mut *rng);
618            }
619
620            // Run all snapshotted tasks
621            //
622            // This approach is more efficient than randomly selecting a task one-at-a-time
623            // because it ensures we don't pull the same pending task multiple times in a row (without
624            // processing a different task required for other tasks to make progress).
625            trace!(iter, tasks = tasks.len(), "starting loop");
626            for task in tasks {
627                // Record task for auditing
628                self.executor.auditor.process_task(task.id, &task.label);
629                trace!(id = task.id, "processing task");
630
631                // Record task poll
632                self.executor
633                    .metrics
634                    .task_polls
635                    .get_or_create(&Work {
636                        label: task.label.clone(),
637                    })
638                    .inc();
639
640                // Prepare task for polling
641                let waker = waker_ref(&task);
642                let mut cx = task::Context::from_waker(&waker);
643                match &task.operation {
644                    Operation::Root => {
645                        // Poll the root task
646                        if let Poll::Ready(output) = root.as_mut().poll(&mut cx) {
647                            trace!(id = task.id, "task is complete");
648                            *self.executor.finished.lock().unwrap() = true;
649                            return output;
650                        }
651                    }
652                    Operation::Work { future, completed } => {
653                        // If task is completed, skip it
654                        if *completed.lock().unwrap() {
655                            trace!(id = task.id, "dropping already complete task");
656                            continue;
657                        }
658
659                        // Poll the task
660                        let mut fut = future.lock().unwrap();
661                        if fut.as_mut().poll(&mut cx).is_ready() {
662                            trace!(id = task.id, "task is complete");
663                            *completed.lock().unwrap() = true;
664                            continue;
665                        }
666                    }
667                }
668
669                // Try again later if task is still pending
670                trace!(id = task.id, "task is still pending");
671            }
672
673            // Advance time by cycle
674            //
675            // This approach prevents starvation if some task never yields (to approximate this,
676            // duration can be set to 1ns).
677            let mut current;
678            {
679                let mut time = self.executor.time.lock().unwrap();
680                *time = time
681                    .checked_add(self.executor.cycle)
682                    .expect("executor time overflowed");
683                current = *time;
684            }
685            trace!(now = current.epoch_millis(), "time advanced");
686
687            // Skip time if there is nothing to do
688            if self.executor.tasks.len() == 0 {
689                let mut skip = None;
690                {
691                    let sleeping = self.executor.sleeping.lock().unwrap();
692                    if let Some(next) = sleeping.peek() {
693                        if next.time > current {
694                            skip = Some(next.time);
695                        }
696                    }
697                }
698                if skip.is_some() {
699                    {
700                        let mut time = self.executor.time.lock().unwrap();
701                        *time = skip.unwrap();
702                        current = *time;
703                    }
704                    trace!(now = current.epoch_millis(), "time skipped");
705                }
706            }
707
708            // Wake all sleeping tasks that are ready
709            let mut to_wake = Vec::new();
710            let mut remaining;
711            {
712                let mut sleeping = self.executor.sleeping.lock().unwrap();
713                while let Some(next) = sleeping.peek() {
714                    if next.time <= current {
715                        let sleeper = sleeping.pop().unwrap();
716                        to_wake.push(sleeper.waker);
717                    } else {
718                        break;
719                    }
720                }
721                remaining = sleeping.len();
722            }
723            for waker in to_wake {
724                waker.wake();
725            }
726
727            // Account for remaining tasks
728            remaining += self.executor.tasks.len();
729
730            // If there are no tasks to run and no tasks sleeping, the executor is stalled
731            // and will never finish.
732            if remaining == 0 {
733                panic!("runtime stalled");
734            }
735            iter += 1;
736        }
737    }
738}
739
740/// Implementation of [`crate::Spawner`], [`crate::Clock`],
741/// [`crate::Network`], and [`crate::Storage`] for the `deterministic`
742/// runtime.
743pub struct Context {
744    label: String,
745    spawned: bool,
746    executor: Arc<Executor>,
747    networking: Arc<Networking>,
748    storage: MeteredStorage<AuditedStorage<MemStorage>>,
749}
750
751impl Context {
752    /// Recover the inner state (deadline, metrics, auditor, rng, synced storage, etc.) from the
753    /// current runtime and use it to initialize a new instance of the runtime. A recovered runtime
754    /// does not inherit the current runtime's pending tasks, unsynced storage, network connections, nor
755    /// its shutdown signaler.
756    ///
757    /// This is useful for performing a deterministic simulation that spans multiple runtime instantiations,
758    /// like simulating unclean shutdown (which involves repeatedly halting the runtime at unexpected intervals).
759    ///
760    /// It is only permitted to call this method after the runtime has finished (i.e. once `start` returns)
761    /// and only permitted to do once (otherwise multiple recovered runtimes will share the same inner state).
762    /// If either one of these conditions is violated, this method will panic.
763    pub fn recover(self) -> (Runner, Self, Arc<Auditor>) {
764        // Ensure we are finished
765        if !*self.executor.finished.lock().unwrap() {
766            panic!("execution is not finished");
767        }
768
769        // Ensure runtime has not already been recovered
770        {
771            let mut recovered = self.executor.recovered.lock().unwrap();
772            if *recovered {
773                panic!("runtime has already been recovered");
774            }
775            *recovered = true;
776        }
777
778        // Rebuild metrics
779        let mut registry = Registry::default();
780        let runtime_registry = registry.sub_registry_with_prefix(METRICS_PREFIX);
781        let metrics = Arc::new(Metrics::init(runtime_registry));
782
783        // Copy state
784        let auditor = self.executor.auditor.clone();
785        let (signaler, signal) = Signaler::new();
786        let executor = Arc::new(Executor {
787            // Copied from the current runtime
788            cycle: self.executor.cycle,
789            deadline: self.executor.deadline,
790            auditor: auditor.clone(),
791            rng: Mutex::new(self.executor.rng.lock().unwrap().clone()),
792            time: Mutex::new(*self.executor.time.lock().unwrap()),
793            partitions: Mutex::new(self.executor.partitions.lock().unwrap().clone()),
794
795            // New state for the new runtime
796            registry: Mutex::new(registry),
797            metrics: metrics.clone(),
798            tasks: Arc::new(Tasks::new()),
799            sleeping: Mutex::new(BinaryHeap::new()),
800            signaler: Mutex::new(signaler),
801            signal,
802            finished: Mutex::new(false),
803            recovered: Mutex::new(false),
804        });
805        (
806            Runner {
807                executor: executor.clone(),
808            },
809            Self {
810                label: String::new(),
811                spawned: false,
812                executor,
813                networking: Arc::new(Networking::new(metrics, auditor.clone())),
814                storage: self.storage,
815            },
816            auditor,
817        )
818    }
819}
820
821impl Clone for Context {
822    fn clone(&self) -> Self {
823        Self {
824            label: self.label.clone(),
825            spawned: false,
826            executor: self.executor.clone(),
827            networking: self.networking.clone(),
828            storage: self.storage.clone(),
829        }
830    }
831}
832
833impl crate::Spawner for Context {
834    fn spawn<F, Fut, T>(self, f: F) -> Handle<T>
835    where
836        F: FnOnce(Self) -> Fut + Send + 'static,
837        Fut: Future<Output = T> + Send + 'static,
838        T: Send + 'static,
839    {
840        // Ensure a context only spawns one task
841        assert!(!self.spawned, "already spawned");
842
843        // Get metrics
844        let label = self.label.clone();
845        let work = Work {
846            label: label.clone(),
847        };
848        self.executor
849            .metrics
850            .tasks_spawned
851            .get_or_create(&work)
852            .inc();
853        let gauge = self
854            .executor
855            .metrics
856            .tasks_running
857            .get_or_create(&work)
858            .clone();
859
860        // Set up the task
861        let executor = self.executor.clone();
862        let future = f(self);
863        let (f, handle) = Handle::init(future, gauge, false);
864
865        // Spawn the task
866        Tasks::register_work(&executor.tasks, &label, Box::pin(f));
867        handle
868    }
869
870    fn spawn_ref<F, T>(&mut self) -> impl FnOnce(F) -> Handle<T> + 'static
871    where
872        F: Future<Output = T> + Send + 'static,
873        T: Send + 'static,
874    {
875        // Ensure a context only spawns one task
876        assert!(!self.spawned, "already spawned");
877        self.spawned = true;
878
879        // Get metrics
880        let work = Work {
881            label: self.label.clone(),
882        };
883        self.executor
884            .metrics
885            .tasks_spawned
886            .get_or_create(&work)
887            .inc();
888        let gauge = self
889            .executor
890            .metrics
891            .tasks_running
892            .get_or_create(&work)
893            .clone();
894
895        // Set up the task
896        let label = self.label.clone();
897        let executor = self.executor.clone();
898        move |f: F| {
899            let (f, handle) = Handle::init(f, gauge, false);
900
901            // Spawn the task
902            Tasks::register_work(&executor.tasks, &label, Box::pin(f));
903            handle
904        }
905    }
906
907    fn spawn_blocking<F, T>(self, f: F) -> Handle<T>
908    where
909        F: FnOnce() -> T + Send + 'static,
910        T: Send + 'static,
911    {
912        // Ensure a context only spawns one task
913        assert!(!self.spawned, "already spawned");
914
915        // Get metrics
916        let work = Work {
917            label: self.label.clone(),
918        };
919        self.executor
920            .metrics
921            .blocking_tasks_spawned
922            .get_or_create(&work)
923            .inc();
924        let gauge = self
925            .executor
926            .metrics
927            .blocking_tasks_running
928            .get_or_create(&work)
929            .clone();
930
931        // Initialize the blocking task
932        let (f, handle) = Handle::init_blocking(f, gauge, false);
933
934        // Spawn the task
935        let f = async move { f() };
936        Tasks::register_work(&self.executor.tasks, &self.label, Box::pin(f));
937        handle
938    }
939
940    fn stop(&self, value: i32) {
941        self.executor.auditor.stop(value);
942        self.executor.signaler.lock().unwrap().signal(value);
943    }
944
945    fn stopped(&self) -> Signal {
946        self.executor.auditor.stopped();
947        self.executor.signal.clone()
948    }
949}
950
951impl crate::Metrics for Context {
952    fn with_label(&self, label: &str) -> Self {
953        let label = {
954            let prefix = self.label.clone();
955            if prefix.is_empty() {
956                label.to_string()
957            } else {
958                format!("{}_{}", prefix, label)
959            }
960        };
961        assert!(
962            !label.starts_with(METRICS_PREFIX),
963            "using runtime label is not allowed"
964        );
965        Self {
966            label,
967            spawned: false,
968            executor: self.executor.clone(),
969            networking: self.networking.clone(),
970            storage: self.storage.clone(),
971        }
972    }
973
974    fn label(&self) -> String {
975        self.label.clone()
976    }
977
978    fn register<N: Into<String>, H: Into<String>>(&self, name: N, help: H, metric: impl Metric) {
979        // Prepare args
980        let name = name.into();
981        let help = help.into();
982
983        // Register metric
984        self.executor.auditor.register(&name, &help);
985        let prefixed_name = {
986            let prefix = &self.label;
987            if prefix.is_empty() {
988                name
989            } else {
990                format!("{}_{}", *prefix, name)
991            }
992        };
993        self.executor
994            .registry
995            .lock()
996            .unwrap()
997            .register(prefixed_name, help, metric)
998    }
999
1000    fn encode(&self) -> String {
1001        self.executor.auditor.encode();
1002        let mut buffer = String::new();
1003        encode(&mut buffer, &self.executor.registry.lock().unwrap()).expect("encoding failed");
1004        buffer
1005    }
1006}
1007
1008struct Sleeper {
1009    executor: Arc<Executor>,
1010    time: SystemTime,
1011    registered: bool,
1012}
1013
/// A wake-up request pushed by a [`Sleeper`] that is still waiting.
///
/// Alarms compare by `time` only, in reverse, so that a `BinaryHeap<Alarm>`
/// pops the earliest deadline first.
struct Alarm {
    /// Time at which the waiting task should be woken.
    time: SystemTime,
    /// Waker of the waiting task.
    waker: Waker,
}

impl PartialEq for Alarm {
    fn eq(&self, other: &Self) -> bool {
        self.time == other.time
    }
}

impl Eq for Alarm {}

impl PartialOrd for Alarm {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(Ord::cmp(self, other))
    }
}

impl Ord for Alarm {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        // Invert the natural order so the max-heap `BinaryHeap` acts as a min-heap
        self.time.cmp(&other.time).reverse()
    }
}
1039
1040impl Future for Sleeper {
1041    type Output = ();
1042
1043    fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
1044        {
1045            let current_time = *self.executor.time.lock().unwrap();
1046            if current_time >= self.time {
1047                return Poll::Ready(());
1048            }
1049        }
1050        if !self.registered {
1051            self.registered = true;
1052            self.executor.sleeping.lock().unwrap().push(Alarm {
1053                time: self.time,
1054                waker: cx.waker().clone(),
1055            });
1056        }
1057        Poll::Pending
1058    }
1059}
1060
1061impl Clock for Context {
1062    fn current(&self) -> SystemTime {
1063        *self.executor.time.lock().unwrap()
1064    }
1065
1066    fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static {
1067        let deadline = self
1068            .current()
1069            .checked_add(duration)
1070            .expect("overflow when setting wake time");
1071        self.sleep_until(deadline)
1072    }
1073
1074    fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static {
1075        Sleeper {
1076            executor: self.executor.clone(),
1077
1078            time: deadline,
1079            registered: false,
1080        }
1081    }
1082}
1083
1084impl GClock for Context {
1085    type Instant = SystemTime;
1086
1087    fn now(&self) -> Self::Instant {
1088        self.current()
1089    }
1090}
1091
1092impl ReasonablyRealtime for Context {}
1093
1094type Dialable = mpsc::UnboundedSender<(
1095    SocketAddr,
1096    mocks::Sink,   // Listener -> Dialer
1097    mocks::Stream, // Dialer -> Listener
1098)>;
1099
1100/// Implementation of [`crate::Network`] for the `deterministic` runtime.
1101///
1102/// When a dialer connects to a listener, the listener is given a new ephemeral port
1103/// from the range `32768..61000`. To keep things simple, it is not possible to
1104/// bind to an ephemeral port. Likewise, if ports are not reused and when exhausted,
1105/// the runtime will panic.
1106struct Networking {
1107    metrics: Arc<Metrics>,
1108    auditor: Arc<Auditor>,
1109    ephemeral: Mutex<u16>,
1110    listeners: Mutex<HashMap<SocketAddr, Dialable>>,
1111}
1112
1113impl Networking {
1114    fn new(metrics: Arc<Metrics>, auditor: Arc<Auditor>) -> Self {
1115        Self {
1116            metrics,
1117            auditor,
1118            ephemeral: Mutex::new(EPHEMERAL_PORT_RANGE.start),
1119            listeners: Mutex::new(HashMap::new()),
1120        }
1121    }
1122
1123    fn bind(&self, socket: SocketAddr) -> Result<Listener, Error> {
1124        self.auditor.bind(socket);
1125
1126        // If the IP is localhost, ensure the port is not in the ephemeral range
1127        // so that it can be used for binding in the dial method
1128        if socket.ip() == IpAddr::V4(Ipv4Addr::LOCALHOST)
1129            && EPHEMERAL_PORT_RANGE.contains(&socket.port())
1130        {
1131            return Err(Error::BindFailed);
1132        }
1133
1134        // Ensure the port is not already bound
1135        let mut listeners = self.listeners.lock().unwrap();
1136        if listeners.contains_key(&socket) {
1137            return Err(Error::BindFailed);
1138        }
1139
1140        // Bind the socket
1141        let (sender, receiver) = mpsc::unbounded();
1142        listeners.insert(socket, sender);
1143        Ok(Listener {
1144            auditor: self.auditor.clone(),
1145            address: socket,
1146            listener: receiver,
1147            metrics: self.metrics.clone(),
1148        })
1149    }
1150
1151    async fn dial(&self, socket: SocketAddr) -> Result<(Sink, Stream), Error> {
1152        // Assign dialer a port from the ephemeral range
1153        let dialer = {
1154            let mut ephemeral = self.ephemeral.lock().unwrap();
1155            let dialer = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), *ephemeral);
1156            *ephemeral = ephemeral
1157                .checked_add(1)
1158                .expect("ephemeral port range exhausted");
1159            dialer
1160        };
1161        self.auditor.dial(dialer, socket);
1162
1163        // Get listener
1164        let mut sender = {
1165            let listeners = self.listeners.lock().unwrap();
1166            let sender = listeners.get(&socket).ok_or(Error::ConnectionFailed)?;
1167            sender.clone()
1168        };
1169
1170        // Construct connection
1171        let (dialer_sender, dialer_receiver) = mocks::Channel::init();
1172        let (listener_sender, listener_receiver) = mocks::Channel::init();
1173        sender
1174            .send((dialer, dialer_sender, listener_receiver))
1175            .await
1176            .map_err(|_| Error::ConnectionFailed)?;
1177        Ok((
1178            Sink {
1179                metrics: self.metrics.clone(),
1180                auditor: self.auditor.clone(),
1181                me: dialer,
1182                peer: socket,
1183                sender: listener_sender,
1184            },
1185            Stream {
1186                auditor: self.auditor.clone(),
1187                me: dialer,
1188                peer: socket,
1189                receiver: dialer_receiver,
1190            },
1191        ))
1192    }
1193}
1194
1195impl crate::Network<Listener, Sink, Stream> for Context {
1196    async fn bind(&self, socket: SocketAddr) -> Result<Listener, Error> {
1197        self.networking.bind(socket)
1198    }
1199
1200    async fn dial(&self, socket: SocketAddr) -> Result<(Sink, Stream), Error> {
1201        self.networking.dial(socket).await
1202    }
1203}
1204
1205/// Implementation of [`crate::Listener`] for the `deterministic` runtime.
1206pub struct Listener {
1207    metrics: Arc<Metrics>,
1208    auditor: Arc<Auditor>,
1209    address: SocketAddr,
1210    listener: mpsc::UnboundedReceiver<(SocketAddr, mocks::Sink, mocks::Stream)>,
1211}
1212
1213impl crate::Listener<Sink, Stream> for Listener {
1214    async fn accept(&mut self) -> Result<(SocketAddr, Sink, Stream), Error> {
1215        let (socket, sender, receiver) = self.listener.next().await.ok_or(Error::ReadFailed)?;
1216        self.auditor.accept(self.address, socket);
1217        Ok((
1218            socket,
1219            Sink {
1220                metrics: self.metrics.clone(),
1221                auditor: self.auditor.clone(),
1222                me: self.address,
1223                peer: socket,
1224                sender,
1225            },
1226            Stream {
1227                auditor: self.auditor.clone(),
1228                me: self.address,
1229                peer: socket,
1230                receiver,
1231            },
1232        ))
1233    }
1234}
1235
1236/// Implementation of [`crate::Sink`] for the `deterministic` runtime.
1237pub struct Sink {
1238    metrics: Arc<Metrics>,
1239    auditor: Arc<Auditor>,
1240    me: SocketAddr,
1241    peer: SocketAddr,
1242    sender: mocks::Sink,
1243}
1244
1245impl crate::Sink for Sink {
1246    async fn send(&mut self, msg: &[u8]) -> Result<(), Error> {
1247        self.auditor.send(self.me, self.peer, msg);
1248        self.sender.send(msg).await.map_err(|_| Error::SendFailed)?;
1249        self.metrics.network_bandwidth.inc_by(msg.len() as u64);
1250        Ok(())
1251    }
1252}
1253
1254/// Implementation of [`crate::Stream`] for the `deterministic` runtime.
1255pub struct Stream {
1256    auditor: Arc<Auditor>,
1257    me: SocketAddr,
1258    peer: SocketAddr,
1259    receiver: mocks::Stream,
1260}
1261
1262impl crate::Stream for Stream {
1263    async fn recv(&mut self, buf: &mut [u8]) -> Result<(), Error> {
1264        self.receiver
1265            .recv(buf)
1266            .await
1267            .map_err(|_| Error::RecvFailed)?;
1268        self.auditor.recv(self.me, self.peer, buf);
1269        Ok(())
1270    }
1271}
1272
1273impl RngCore for Context {
1274    fn next_u32(&mut self) -> u32 {
1275        self.executor.auditor.rand("next_u32".to_string());
1276        self.executor.rng.lock().unwrap().next_u32()
1277    }
1278
1279    fn next_u64(&mut self) -> u64 {
1280        self.executor.auditor.rand("next_u64".to_string());
1281        self.executor.rng.lock().unwrap().next_u64()
1282    }
1283
1284    fn fill_bytes(&mut self, dest: &mut [u8]) {
1285        self.executor.auditor.rand("fill_bytes".to_string());
1286        self.executor.rng.lock().unwrap().fill_bytes(dest)
1287    }
1288
1289    fn try_fill_bytes(&mut self, dest: &mut [u8]) -> Result<(), rand::Error> {
1290        self.executor.auditor.rand("try_fill_bytes".to_string());
1291        self.executor.rng.lock().unwrap().try_fill_bytes(dest)
1292    }
1293}
1294
1295impl CryptoRng for Context {}
1296
1297impl crate::Storage for Context {
1298    type Blob = <MeteredStorage<AuditedStorage<MemStorage>> as crate::Storage>::Blob;
1299
1300    async fn open(&self, partition: &str, name: &[u8]) -> Result<Self::Blob, Error> {
1301        self.storage.open(partition, name).await
1302    }
1303
1304    async fn remove(&self, partition: &str, name: Option<&[u8]>) -> Result<(), Error> {
1305        self.storage.remove(partition, name).await
1306    }
1307
1308    async fn scan(&self, partition: &str) -> Result<Vec<Vec<u8>>, Error> {
1309        self.storage.scan(partition).await
1310    }
1311}
1312
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{utils::run_tasks, Blob, Runner, Storage};
    use commonware_macros::test_traced;
    use futures::task::noop_waker;

    /// Runs the shared `run_tasks` workload on a runtime seeded with `seed`,
    /// returning the final auditor state alongside the produced messages.
    fn run_with_seed(seed: u64) -> (String, Vec<usize>) {
        let (executor, context, auditor) = Executor::seeded(seed);
        let messages = run_tasks(5, executor, context);
        (auditor.state(), messages)
    }

    #[test]
    fn test_same_seed_same_order() {
        // First pass: record the outcome for every seed
        let outputs: Vec<_> = (0..1000).map(run_with_seed).collect();

        // Second pass: replaying a seed must reproduce its outcome exactly
        for (seed, expected) in (0..1000).zip(&outputs) {
            assert_eq!(&run_with_seed(seed), expected);
        }
    }

    #[test_traced("TRACE")]
    fn test_different_seeds_different_order() {
        let first = run_with_seed(12345);
        let second = run_with_seed(54321);
        assert_ne!(first, second);
    }

    #[test]
    fn test_alarm_min_heap() {
        // Push alarms out of order, including a duplicate deadline
        let now = SystemTime::now();
        let mut heap: BinaryHeap<Alarm> = [10, 5, 15, 5]
            .iter()
            .map(|&secs| Alarm {
                time: now + Duration::new(secs, 0),
                waker: noop_waker(),
            })
            .collect();

        // Popping must yield deadlines in ascending order
        let sorted_times: Vec<SystemTime> = std::iter::from_fn(|| heap.pop())
            .map(|alarm| alarm.time)
            .collect();
        let expected: Vec<SystemTime> = [5, 5, 10, 15]
            .iter()
            .map(|&secs| now + Duration::new(secs, 0))
            .collect();
        assert_eq!(sorted_times, expected);
    }

    #[test]
    #[should_panic(expected = "runtime timeout")]
    fn test_timeout() {
        // Sleeping forever is expected to hit the 10s runtime timeout
        let (executor, context, _) = Executor::timed(Duration::from_secs(10));
        executor.start(async move {
            let tick = Duration::from_secs(1);
            loop {
                context.sleep(tick).await;
            }
        });
    }

    #[test]
    #[should_panic(expected = "cycle duration must be non-zero when timeout is set")]
    fn test_bad_timeout() {
        // A timeout with a zero cycle duration is rejected at init
        let cfg = Config {
            timeout: Some(Duration::default()),
            cycle: Duration::default(),
            ..Config::default()
        };
        Executor::init(cfg);
    }

    #[test]
    fn test_recover_synced_storage_persists() {
        let partition = "test_partition";
        let name = b"test_blob";
        let data = b"Hello, world!".to_vec();

        // Write and sync a blob on the first runtime
        let (executor1, context1, auditor1) = Executor::default();
        let writer = context1.clone();
        let written = data.clone();
        executor1.start(async move {
            let blob = writer.open(partition, name).await.unwrap();
            blob.write_at(&written, 0).await.unwrap();
            blob.sync().await.unwrap();
        });
        let state1 = auditor1.state();

        // Recovery must preserve the auditor state...
        let (executor2, context2, auditor2) = context1.recover();
        let state2 = auditor2.state();
        assert_eq!(state1, state2);

        // ...and the synced blob contents
        executor2.start(async move {
            let blob = context2.open(partition, name).await.unwrap();
            let len = blob.len().await.unwrap();
            assert_eq!(len, data.len() as u64);
            let mut buf = vec![0; len as usize];
            blob.read_at(&mut buf, 0).await.unwrap();
            assert_eq!(buf, data);
        });
    }

    #[test]
    fn test_recover_unsynced_storage_does_not_persist() {
        let partition = "test_partition";
        let name = b"test_blob";
        let data = b"Hello, world!".to_vec();

        // Write a blob on the first runtime but never sync it
        let (executor1, context1, _) = Executor::default();
        let writer = context1.clone();
        executor1.start(async move {
            let blob = writer.open(partition, name).await.unwrap();
            blob.write_at(&data, 0).await.unwrap();
        });

        // After recovery the unsynced write must be gone
        let (executor2, context2, _) = context1.recover();
        executor2.start(async move {
            let blob = context2.open(partition, name).await.unwrap();
            assert_eq!(blob.len().await.unwrap(), 0);
        });
    }

    #[test]
    #[should_panic(expected = "execution is not finished")]
    fn test_recover_before_finish_panics() {
        // The runtime is never started, so recovery must be refused
        let (_, context, _) = Executor::default();
        context.recover();
    }

    #[test]
    #[should_panic(expected = "runtime has already been recovered")]
    fn test_recover_twice_panics() {
        let (executor, context, _) = Executor::default();
        executor.start(async move {});

        // Recover once through one handle, then again through its clone
        let twin = context.clone();
        context.recover();
        twin.recover();
    }
}