commonware_runtime/
deterministic.rs

//! A deterministic runtime that randomly selects tasks to run based on a seed.
//!
//! # Panics
//!
//! If any task panics, the runtime will panic (and shut down).
//!
//! # Example
//!
//! ```rust
//! use commonware_runtime::{Spawner, Runner, deterministic, Metrics};
//!
//! let executor = deterministic::Runner::default();
//! executor.start(|context| async move {
//!     println!("Parent started");
//!     let result = context.with_label("child").spawn(|_| async move {
//!         println!("Child started");
//!         "hello"
//!     });
//!     println!("Child result: {:?}", result.await);
//!     println!("Parent exited");
//!     println!("Auditor state: {}", context.auditor().state());
//! });
//! ```
24
25use crate::{
26    mocks,
27    storage::{
28        audited::Storage as AuditedStorage, memory::Storage as MemStorage,
29        metered::Storage as MeteredStorage,
30    },
31    utils::Signaler,
32    Clock, Error, Handle, Signal, METRICS_PREFIX,
33};
34use commonware_utils::{hex, SystemTimeExt};
35use futures::{
36    channel::mpsc,
37    task::{waker_ref, ArcWake},
38    Future, SinkExt, StreamExt,
39};
40use governor::clock::{Clock as GClock, ReasonablyRealtime};
41use prometheus_client::{
42    encoding::{text::encode, EncodeLabelSet},
43    metrics::{counter::Counter, family::Family, gauge::Gauge},
44    registry::{Metric, Registry},
45};
46use rand::{prelude::SliceRandom, rngs::StdRng, CryptoRng, RngCore, SeedableRng};
47use sha2::{Digest, Sha256};
48use std::{
49    collections::{BinaryHeap, HashMap},
50    mem::replace,
51    net::{IpAddr, Ipv4Addr, SocketAddr},
52    ops::Range,
53    pin::Pin,
54    sync::{Arc, Mutex},
55    task::{self, Poll, Waker},
56    time::{Duration, SystemTime, UNIX_EPOCH},
57};
58use tracing::trace;
59
/// Half-open range of ephemeral ports handed out to dialers
/// (`32768` inclusive up to `61000` exclusive).
const EPHEMERAL_PORT_RANGE: Range<u16> = 32_768..61_000;

/// In-memory contents of a single storage partition: blob name -> blob bytes.
pub type Partition = HashMap<Vec<u8>, Vec<u8>>;
65
66#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
67struct Work {
68    label: String,
69}
70
71#[derive(Debug)]
72struct Metrics {
73    tasks_spawned: Family<Work, Counter>,
74    tasks_running: Family<Work, Gauge>,
75    blocking_tasks_spawned: Family<Work, Counter>,
76    blocking_tasks_running: Family<Work, Gauge>,
77    task_polls: Family<Work, Counter>,
78
79    network_bandwidth: Counter,
80}
81
82impl Metrics {
83    pub fn init(registry: &mut Registry) -> Self {
84        let metrics = Self {
85            task_polls: Family::default(),
86            tasks_spawned: Family::default(),
87            tasks_running: Family::default(),
88            blocking_tasks_spawned: Family::default(),
89            blocking_tasks_running: Family::default(),
90            network_bandwidth: Counter::default(),
91        };
92        registry.register(
93            "tasks_spawned",
94            "Total number of tasks spawned",
95            metrics.tasks_spawned.clone(),
96        );
97        registry.register(
98            "tasks_running",
99            "Number of tasks currently running",
100            metrics.tasks_running.clone(),
101        );
102        registry.register(
103            "blocking_tasks_spawned",
104            "Total number of blocking tasks spawned",
105            metrics.blocking_tasks_spawned.clone(),
106        );
107        registry.register(
108            "blocking_tasks_running",
109            "Number of blocking tasks currently running",
110            metrics.blocking_tasks_running.clone(),
111        );
112        registry.register(
113            "task_polls",
114            "Total number of task polls",
115            metrics.task_polls.clone(),
116        );
117        registry.register(
118            "bandwidth",
119            "Total amount of data sent over network",
120            metrics.network_bandwidth.clone(),
121        );
122        metrics
123    }
124}
125
/// Track the state of the runtime for determinism auditing.
///
/// The state is a single running digest. Every recorded event hashes the previous digest
/// together with a description of the event, so two runs that record the same events in the
/// same order end up with the same state. The digest starts out empty.
#[derive(Default)]
pub struct Auditor {
    /// Current digest (empty until the first event is recorded).
    hash: Mutex<Vec<u8>>,
}
138
139impl Auditor {
140    fn process_task(&self, task: u128, label: &str) {
141        let mut hash = self.hash.lock().unwrap();
142        let mut hasher = Sha256::new();
143        hasher.update(&*hash);
144        hasher.update(b"process_task");
145        hasher.update(task.to_be_bytes());
146        hasher.update(label.as_bytes());
147        *hash = hasher.finalize().to_vec();
148    }
149
150    fn stop(&self, value: i32) {
151        let mut hash = self.hash.lock().unwrap();
152        let mut hasher = Sha256::new();
153        hasher.update(&*hash);
154        hasher.update(b"stop");
155        hasher.update(value.to_be_bytes());
156        *hash = hasher.finalize().to_vec();
157    }
158
159    fn stopped(&self) {
160        let mut hash = self.hash.lock().unwrap();
161        let mut hasher = Sha256::new();
162        hasher.update(&*hash);
163        hasher.update(b"stopped");
164        *hash = hasher.finalize().to_vec();
165    }
166
167    fn bind(&self, address: SocketAddr) {
168        let mut hash = self.hash.lock().unwrap();
169        let mut hasher = Sha256::new();
170        hasher.update(&*hash);
171        hasher.update(b"bind");
172        hasher.update(address.to_string().as_bytes());
173        *hash = hasher.finalize().to_vec();
174    }
175
176    fn dial(&self, dialer: SocketAddr, listener: SocketAddr) {
177        let mut hash = self.hash.lock().unwrap();
178        let mut hasher = Sha256::new();
179        hasher.update(&*hash);
180        hasher.update(b"dial");
181        hasher.update(dialer.to_string().as_bytes());
182        hasher.update(listener.to_string().as_bytes());
183        *hash = hasher.finalize().to_vec();
184    }
185
186    fn accept(&self, listener: SocketAddr, dialer: SocketAddr) {
187        let mut hash = self.hash.lock().unwrap();
188        let mut hasher = Sha256::new();
189        hasher.update(&*hash);
190        hasher.update(b"accept");
191        hasher.update(listener.to_string().as_bytes());
192        hasher.update(dialer.to_string().as_bytes());
193        *hash = hasher.finalize().to_vec();
194    }
195
196    fn send(&self, sender: SocketAddr, receiver: SocketAddr, message: &[u8]) {
197        let mut hash = self.hash.lock().unwrap();
198        let mut hasher = Sha256::new();
199        hasher.update(&*hash);
200        hasher.update(b"send");
201        hasher.update(sender.to_string().as_bytes());
202        hasher.update(receiver.to_string().as_bytes());
203        hasher.update(message);
204        *hash = hasher.finalize().to_vec();
205    }
206
207    fn recv(&self, receiver: SocketAddr, sender: SocketAddr, message: &[u8]) {
208        let mut hash = self.hash.lock().unwrap();
209        let mut hasher = Sha256::new();
210        hasher.update(&*hash);
211        hasher.update(b"recv");
212        hasher.update(receiver.to_string().as_bytes());
213        hasher.update(sender.to_string().as_bytes());
214        hasher.update(message);
215        *hash = hasher.finalize().to_vec();
216    }
217
218    fn rand(&self, method: String) {
219        let mut hash = self.hash.lock().unwrap();
220        let mut hasher = Sha256::new();
221        hasher.update(&*hash);
222        hasher.update(b"rand");
223        hasher.update(method.as_bytes());
224        *hash = hasher.finalize().to_vec();
225    }
226
227    pub(crate) fn open(&self, partition: &str, name: &[u8]) {
228        let mut hash = self.hash.lock().unwrap();
229        let mut hasher = Sha256::new();
230        hasher.update(&*hash);
231        hasher.update(b"open");
232        hasher.update(partition.as_bytes());
233        hasher.update(name);
234        *hash = hasher.finalize().to_vec();
235    }
236
237    pub(crate) fn remove(&self, partition: &str, name: Option<&[u8]>) {
238        let mut hash = self.hash.lock().unwrap();
239        let mut hasher = Sha256::new();
240        hasher.update(&*hash);
241        hasher.update(b"remove");
242        hasher.update(partition.as_bytes());
243        if let Some(name) = name {
244            hasher.update(name);
245        }
246        *hash = hasher.finalize().to_vec();
247    }
248
249    pub(crate) fn scan(&self, partition: &str) {
250        let mut hash = self.hash.lock().unwrap();
251        let mut hasher = Sha256::new();
252        hasher.update(&*hash);
253        hasher.update(b"scan");
254        hasher.update(partition.as_bytes());
255        *hash = hasher.finalize().to_vec();
256    }
257
258    pub(crate) fn read_at(&self, partition: &str, name: &[u8], buf: usize, offset: u64) {
259        let mut hash = self.hash.lock().unwrap();
260        let mut hasher = Sha256::new();
261        hasher.update(&*hash);
262        hasher.update(b"read_at");
263        hasher.update(partition.as_bytes());
264        hasher.update(name);
265        hasher.update(buf.to_be_bytes());
266        hasher.update(offset.to_be_bytes());
267        *hash = hasher.finalize().to_vec();
268    }
269
270    pub(crate) fn write_at(&self, partition: &str, name: &[u8], buf: &[u8], offset: u64) {
271        let mut hash = self.hash.lock().unwrap();
272        let mut hasher = Sha256::new();
273        hasher.update(&*hash);
274        hasher.update(b"write_at");
275        hasher.update(partition.as_bytes());
276        hasher.update(name);
277        hasher.update(buf);
278        hasher.update(offset.to_be_bytes());
279        *hash = hasher.finalize().to_vec();
280    }
281
282    pub(crate) fn truncate(&self, partition: &str, name: &[u8], size: u64) {
283        let mut hash = self.hash.lock().unwrap();
284        let mut hasher = Sha256::new();
285        hasher.update(&*hash);
286        hasher.update(b"truncate");
287        hasher.update(partition.as_bytes());
288        hasher.update(name);
289        hasher.update(size.to_be_bytes());
290        *hash = hasher.finalize().to_vec();
291    }
292
293    pub(crate) fn sync(&self, partition: &str, name: &[u8]) {
294        let mut hash = self.hash.lock().unwrap();
295        let mut hasher = Sha256::new();
296        hasher.update(&*hash);
297        hasher.update(b"sync");
298        hasher.update(partition.as_bytes());
299        hasher.update(name);
300        *hash = hasher.finalize().to_vec();
301    }
302
303    pub(crate) fn close(&self, partition: &str, name: &[u8]) {
304        let mut hash = self.hash.lock().unwrap();
305        let mut hasher = Sha256::new();
306        hasher.update(&*hash);
307        hasher.update(b"close");
308        hasher.update(partition.as_bytes());
309        hasher.update(name);
310        *hash = hasher.finalize().to_vec();
311    }
312
313    fn register(&self, name: &str, help: &str) {
314        let mut hash = self.hash.lock().unwrap();
315        let mut hasher = Sha256::new();
316        hasher.update(&*hash);
317        hasher.update(b"register");
318        hasher.update(name.as_bytes());
319        hasher.update(help.as_bytes());
320        *hash = hasher.finalize().to_vec();
321    }
322
323    fn encode(&self) {
324        let mut hash = self.hash.lock().unwrap();
325        let mut hasher = Sha256::new();
326        hasher.update(&*hash);
327        hasher.update(b"encode");
328        *hash = hasher.finalize().to_vec();
329    }
330
331    /// Generate a representation of the current state of the runtime.
332    ///
333    /// This can be used to ensure that logic running on top
334    /// of the runtime is interacting deterministically.
335    pub fn state(&self) -> String {
336        let hash = self.hash.lock().unwrap().clone();
337        hex(&hash)
338    }
339}
340
/// Configuration for the `deterministic` runtime.
#[derive(Clone, Debug)]
pub struct Config {
    /// Seed for the random number generator.
    pub seed: u64,

    /// The cycle duration determines how much time is advanced after each iteration of the event
    /// loop. This is useful to prevent starvation if some task never yields.
    pub cycle: Duration,

    /// If the runtime is still executing at this point (i.e. a test hasn't stopped), panic.
    pub timeout: Option<Duration>,
}

impl Config {
    /// Validate the configuration.
    ///
    /// # Panics
    ///
    /// Panics if `timeout` is set while `cycle` is zero. With a zero cycle, simulated time only
    /// moves when the executor is idle and jumps to the next alarm. A task that never yields
    /// could then keep the runtime busy without the deadline ever being reached.
    pub fn assert(&self) {
        assert!(
            self.timeout.is_none() || !self.cycle.is_zero(),
            "cycle duration must be non-zero when timeout is set",
        );
    }
}

impl Default for Config {
    /// Seed `42`, a 1ms cycle, and no timeout.
    fn default() -> Self {
        Self {
            seed: 42,
            cycle: Duration::from_millis(1),
            timeout: None,
        }
    }
}
373
374/// Deterministic runtime that randomly selects tasks to run based on a seed.
375pub struct Executor {
376    registry: Mutex<Registry>,
377    cycle: Duration,
378    deadline: Option<SystemTime>,
379    metrics: Arc<Metrics>,
380    auditor: Arc<Auditor>,
381    rng: Mutex<StdRng>,
382    time: Mutex<SystemTime>,
383    tasks: Arc<Tasks>,
384    sleeping: Mutex<BinaryHeap<Alarm>>,
385    partitions: Mutex<HashMap<String, Partition>>,
386    signaler: Mutex<Signaler>,
387    signal: Signal,
388    finished: Mutex<bool>,
389    recovered: Mutex<bool>,
390}
391
392enum State {
393    Config(Config),
394    Context(Context),
395}
396
397/// Implementation of [`crate::Runner`] for the `deterministic` runtime.
398pub struct Runner {
399    state: State,
400}
401
402impl From<Config> for Runner {
403    fn from(cfg: Config) -> Self {
404        Self::new(cfg)
405    }
406}
407
408impl From<Context> for Runner {
409    fn from(context: Context) -> Self {
410        Self {
411            state: State::Context(context),
412        }
413    }
414}
415
416impl Runner {
417    /// Initialize a new `deterministic` runtime with the given seed and cycle duration.
418    pub fn new(cfg: Config) -> Self {
419        // Ensure config is valid
420        cfg.assert();
421        Runner {
422            state: State::Config(cfg),
423        }
424    }
425
426    /// Initialize a new `deterministic` runtime with the default configuration
427    /// and the provided seed.
428    pub fn seeded(seed: u64) -> Self {
429        let cfg = Config {
430            seed,
431            ..Config::default()
432        };
433        Self::new(cfg)
434    }
435
436    /// Initialize a new `deterministic` runtime with the default configuration
437    /// but exit after the given timeout.
438    pub fn timed(timeout: Duration) -> Self {
439        let cfg = Config {
440            timeout: Some(timeout),
441            ..Config::default()
442        };
443        Self::new(cfg)
444    }
445}
446
447impl Default for Runner {
448    fn default() -> Self {
449        Self::new(Config::default())
450    }
451}
452
453impl crate::Runner for Runner {
454    type Context = Context;
455
456    fn start<F, Fut>(self, f: F) -> Fut::Output
457    where
458        F: FnOnce(Self::Context) -> Fut,
459        Fut: Future,
460    {
461        // Setup context (depending on how the runtime was initialized)
462        let context = match self.state {
463            State::Config(config) => Context::new(config),
464            State::Context(context) => context,
465        };
466
467        // Pin root task to the heap
468        let executor = context.executor.clone();
469        let mut root = Box::pin(f(context));
470
471        // Register the root task
472        Tasks::register_root(&executor.tasks);
473
474        // Process tasks until root task completes or progress stalls
475        let mut iter = 0;
476        loop {
477            // Ensure we have not exceeded our deadline
478            {
479                let current = executor.time.lock().unwrap();
480                if let Some(deadline) = executor.deadline {
481                    if *current >= deadline {
482                        panic!("runtime timeout");
483                    }
484                }
485            }
486
487            // Snapshot available tasks
488            let mut tasks = executor.tasks.drain();
489
490            // Shuffle tasks
491            {
492                let mut rng = executor.rng.lock().unwrap();
493                tasks.shuffle(&mut *rng);
494            }
495
496            // Run all snapshotted tasks
497            //
498            // This approach is more efficient than randomly selecting a task one-at-a-time
499            // because it ensures we don't pull the same pending task multiple times in a row (without
500            // processing a different task required for other tasks to make progress).
501            trace!(iter, tasks = tasks.len(), "starting loop");
502            for task in tasks {
503                // Record task for auditing
504                executor.auditor.process_task(task.id, &task.label);
505                trace!(id = task.id, "processing task");
506
507                // Record task poll
508                executor
509                    .metrics
510                    .task_polls
511                    .get_or_create(&Work {
512                        label: task.label.clone(),
513                    })
514                    .inc();
515
516                // Prepare task for polling
517                let waker = waker_ref(&task);
518                let mut cx = task::Context::from_waker(&waker);
519                match &task.operation {
520                    Operation::Root => {
521                        // Poll the root task
522                        if let Poll::Ready(output) = root.as_mut().poll(&mut cx) {
523                            trace!(id = task.id, "task is complete");
524                            *executor.finished.lock().unwrap() = true;
525                            return output;
526                        }
527                    }
528                    Operation::Work { future, completed } => {
529                        // If task is completed, skip it
530                        if *completed.lock().unwrap() {
531                            trace!(id = task.id, "dropping already complete task");
532                            continue;
533                        }
534
535                        // Poll the task
536                        let mut fut = future.lock().unwrap();
537                        if fut.as_mut().poll(&mut cx).is_ready() {
538                            trace!(id = task.id, "task is complete");
539                            *completed.lock().unwrap() = true;
540                            continue;
541                        }
542                    }
543                }
544
545                // Try again later if task is still pending
546                trace!(id = task.id, "task is still pending");
547            }
548
549            // Advance time by cycle
550            //
551            // This approach prevents starvation if some task never yields (to approximate this,
552            // duration can be set to 1ns).
553            let mut current;
554            {
555                let mut time = executor.time.lock().unwrap();
556                *time = time
557                    .checked_add(executor.cycle)
558                    .expect("executor time overflowed");
559                current = *time;
560            }
561            trace!(now = current.epoch_millis(), "time advanced");
562
563            // Skip time if there is nothing to do
564            if executor.tasks.len() == 0 {
565                let mut skip = None;
566                {
567                    let sleeping = executor.sleeping.lock().unwrap();
568                    if let Some(next) = sleeping.peek() {
569                        if next.time > current {
570                            skip = Some(next.time);
571                        }
572                    }
573                }
574                if skip.is_some() {
575                    {
576                        let mut time = executor.time.lock().unwrap();
577                        *time = skip.unwrap();
578                        current = *time;
579                    }
580                    trace!(now = current.epoch_millis(), "time skipped");
581                }
582            }
583
584            // Wake all sleeping tasks that are ready
585            let mut to_wake = Vec::new();
586            let mut remaining;
587            {
588                let mut sleeping = executor.sleeping.lock().unwrap();
589                while let Some(next) = sleeping.peek() {
590                    if next.time <= current {
591                        let sleeper = sleeping.pop().unwrap();
592                        to_wake.push(sleeper.waker);
593                    } else {
594                        break;
595                    }
596                }
597                remaining = sleeping.len();
598            }
599            for waker in to_wake {
600                waker.wake();
601            }
602
603            // Account for remaining tasks
604            remaining += executor.tasks.len();
605
606            // If there are no tasks to run and no tasks sleeping, the executor is stalled
607            // and will never finish.
608            if remaining == 0 {
609                panic!("runtime stalled");
610            }
611            iter += 1;
612        }
613    }
614}
615
/// The operation that a task is performing.
enum Operation {
    /// The root future passed to `start`; the event loop owns and polls it directly.
    Root,
    /// A spawned future.
    Work {
        /// The future to poll, behind a mutex because the task itself is shared via `Arc`.
        future: Mutex<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
        /// Set once `future` has returned `Ready`, so later (stale) wake-ups are skipped.
        completed: Mutex<bool>,
    },
}
624
625/// A task that is being executed by the runtime.
626struct Task {
627    id: u128,
628    label: String,
629    tasks: Arc<Tasks>,
630
631    operation: Operation,
632}
633
634impl ArcWake for Task {
635    fn wake_by_ref(arc_self: &Arc<Self>) {
636        arc_self.tasks.enqueue(arc_self.clone());
637    }
638}
639
640/// A task queue that is used to manage the tasks that are being executed by the runtime.
641struct Tasks {
642    /// The current task counter.
643    counter: Mutex<u128>,
644    /// The queue of tasks that are waiting to be executed.
645    queue: Mutex<Vec<Arc<Task>>>,
646    /// Indicates whether the root task has been registered.
647    root_registered: Mutex<bool>,
648}
649
650impl Tasks {
651    /// Create a new task queue.
652    fn new() -> Self {
653        Self {
654            counter: Mutex::new(0),
655            queue: Mutex::new(Vec::new()),
656            root_registered: Mutex::new(false),
657        }
658    }
659
660    /// Increment the task counter and return the old value.
661    fn increment(&self) -> u128 {
662        let mut counter = self.counter.lock().unwrap();
663        let old = *counter;
664        *counter = counter.checked_add(1).expect("task counter overflow");
665        old
666    }
667
668    /// Register the root task.
669    ///
670    /// If the root task has already been registered, this function will panic.
671    fn register_root(arc_self: &Arc<Self>) {
672        {
673            let mut registered = arc_self.root_registered.lock().unwrap();
674            assert!(!*registered, "root already registered");
675            *registered = true;
676        }
677        let id = arc_self.increment();
678        let mut queue = arc_self.queue.lock().unwrap();
679        queue.push(Arc::new(Task {
680            id,
681            label: String::new(),
682            tasks: arc_self.clone(),
683            operation: Operation::Root,
684        }));
685    }
686
687    /// Register a new task to be executed.
688    fn register_work(
689        arc_self: &Arc<Self>,
690        label: &str,
691        future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
692    ) {
693        let id = arc_self.increment();
694        let mut queue = arc_self.queue.lock().unwrap();
695        queue.push(Arc::new(Task {
696            id,
697            label: label.to_string(),
698            tasks: arc_self.clone(),
699            operation: Operation::Work {
700                future: Mutex::new(future),
701                completed: Mutex::new(false),
702            },
703        }));
704    }
705
706    /// Enqueue an already registered task to be executed.
707    fn enqueue(&self, task: Arc<Task>) {
708        let mut queue = self.queue.lock().unwrap();
709        queue.push(task);
710    }
711
712    /// Dequeue all tasks that are ready to execute.
713    fn drain(&self) -> Vec<Arc<Task>> {
714        let mut queue = self.queue.lock().unwrap();
715        let len = queue.len();
716        replace(&mut *queue, Vec::with_capacity(len))
717    }
718
719    /// Get the number of tasks in the queue.
720    fn len(&self) -> usize {
721        self.queue.lock().unwrap().len()
722    }
723}
724
725/// Implementation of [`crate::Spawner`], [`crate::Clock`],
726/// [`crate::Network`], and [`crate::Storage`] for the `deterministic`
727/// runtime.
728pub struct Context {
729    label: String,
730    spawned: bool,
731    executor: Arc<Executor>,
732    networking: Arc<Networking>,
733    storage: MeteredStorage<AuditedStorage<MemStorage>>,
734}
735
736impl Default for Context {
737    fn default() -> Self {
738        Self::new(Config::default())
739    }
740}
741
742impl Context {
743    pub fn new(cfg: Config) -> Self {
744        // Create a new registry
745        let mut registry = Registry::default();
746        let runtime_registry = registry.sub_registry_with_prefix(METRICS_PREFIX);
747
748        // Initialize runtime
749        let metrics = Arc::new(Metrics::init(runtime_registry));
750        let start_time = UNIX_EPOCH;
751        let deadline = cfg
752            .timeout
753            .map(|timeout| start_time.checked_add(timeout).expect("timeout overflowed"));
754        let (signaler, signal) = Signaler::new();
755        let auditor = Arc::new(Auditor::default());
756        let storage = MeteredStorage::new(
757            AuditedStorage::new(MemStorage::default(), auditor.clone()),
758            runtime_registry,
759        );
760        let executor = Arc::new(Executor {
761            registry: Mutex::new(registry),
762            cycle: cfg.cycle,
763            deadline,
764            metrics: metrics.clone(),
765            auditor: auditor.clone(),
766            rng: Mutex::new(StdRng::seed_from_u64(cfg.seed)),
767            time: Mutex::new(start_time),
768            tasks: Arc::new(Tasks::new()),
769            sleeping: Mutex::new(BinaryHeap::new()),
770            partitions: Mutex::new(HashMap::new()),
771            signaler: Mutex::new(signaler),
772            signal,
773            finished: Mutex::new(false),
774            recovered: Mutex::new(false),
775        });
776        Context {
777            label: String::new(),
778            spawned: false,
779            executor: executor.clone(),
780            networking: Arc::new(Networking::new(metrics, auditor)),
781            storage,
782        }
783    }
784
785    /// Recover the inner state (deadline, metrics, auditor, rng, synced storage, etc.) from the
786    /// current runtime and use it to initialize a new instance of the runtime. A recovered runtime
787    /// does not inherit the current runtime's pending tasks, unsynced storage, network connections, nor
788    /// its shutdown signaler.
789    ///
790    /// This is useful for performing a deterministic simulation that spans multiple runtime instantiations,
791    /// like simulating unclean shutdown (which involves repeatedly halting the runtime at unexpected intervals).
792    ///
793    /// It is only permitted to call this method after the runtime has finished (i.e. once `start` returns)
794    /// and only permitted to do once (otherwise multiple recovered runtimes will share the same inner state).
795    /// If either one of these conditions is violated, this method will panic.
796    pub fn recover(self) -> Self {
797        // Ensure we are finished
798        if !*self.executor.finished.lock().unwrap() {
799            panic!("execution is not finished");
800        }
801
802        // Ensure runtime has not already been recovered
803        {
804            let mut recovered = self.executor.recovered.lock().unwrap();
805            if *recovered {
806                panic!("runtime has already been recovered");
807            }
808            *recovered = true;
809        }
810
811        // Rebuild metrics
812        let mut registry = Registry::default();
813        let runtime_registry = registry.sub_registry_with_prefix(METRICS_PREFIX);
814        let metrics = Arc::new(Metrics::init(runtime_registry));
815
816        // Copy state
817        let auditor = self.executor.auditor.clone();
818        let (signaler, signal) = Signaler::new();
819        let executor = Arc::new(Executor {
820            // Copied from the current runtime
821            cycle: self.executor.cycle,
822            deadline: self.executor.deadline,
823            auditor: auditor.clone(),
824            rng: Mutex::new(self.executor.rng.lock().unwrap().clone()),
825            time: Mutex::new(*self.executor.time.lock().unwrap()),
826            partitions: Mutex::new(self.executor.partitions.lock().unwrap().clone()),
827
828            // New state for the new runtime
829            registry: Mutex::new(registry),
830            metrics: metrics.clone(),
831            tasks: Arc::new(Tasks::new()),
832            sleeping: Mutex::new(BinaryHeap::new()),
833            signaler: Mutex::new(signaler),
834            signal,
835            finished: Mutex::new(false),
836            recovered: Mutex::new(false),
837        });
838        Self {
839            label: String::new(),
840            spawned: false,
841            executor,
842            networking: Arc::new(Networking::new(metrics, auditor.clone())),
843            storage: self.storage,
844        }
845    }
846
847    pub fn auditor(&self) -> &Auditor {
848        &self.executor.auditor
849    }
850}
851
852impl Clone for Context {
853    fn clone(&self) -> Self {
854        Self {
855            label: self.label.clone(),
856            spawned: false,
857            executor: self.executor.clone(),
858            networking: self.networking.clone(),
859            storage: self.storage.clone(),
860        }
861    }
862}
863
864impl crate::Spawner for Context {
865    fn spawn<F, Fut, T>(self, f: F) -> Handle<T>
866    where
867        F: FnOnce(Self) -> Fut + Send + 'static,
868        Fut: Future<Output = T> + Send + 'static,
869        T: Send + 'static,
870    {
871        // Ensure a context only spawns one task
872        assert!(!self.spawned, "already spawned");
873
874        // Get metrics
875        let label = self.label.clone();
876        let work = Work {
877            label: label.clone(),
878        };
879        self.executor
880            .metrics
881            .tasks_spawned
882            .get_or_create(&work)
883            .inc();
884        let gauge = self
885            .executor
886            .metrics
887            .tasks_running
888            .get_or_create(&work)
889            .clone();
890
891        // Set up the task
892        let executor = self.executor.clone();
893        let future = f(self);
894        let (f, handle) = Handle::init(future, gauge, false);
895
896        // Spawn the task
897        Tasks::register_work(&executor.tasks, &label, Box::pin(f));
898        handle
899    }
900
901    fn spawn_ref<F, T>(&mut self) -> impl FnOnce(F) -> Handle<T> + 'static
902    where
903        F: Future<Output = T> + Send + 'static,
904        T: Send + 'static,
905    {
906        // Ensure a context only spawns one task
907        assert!(!self.spawned, "already spawned");
908        self.spawned = true;
909
910        // Get metrics
911        let work = Work {
912            label: self.label.clone(),
913        };
914        self.executor
915            .metrics
916            .tasks_spawned
917            .get_or_create(&work)
918            .inc();
919        let gauge = self
920            .executor
921            .metrics
922            .tasks_running
923            .get_or_create(&work)
924            .clone();
925
926        // Set up the task
927        let label = self.label.clone();
928        let executor = self.executor.clone();
929        move |f: F| {
930            let (f, handle) = Handle::init(f, gauge, false);
931
932            // Spawn the task
933            Tasks::register_work(&executor.tasks, &label, Box::pin(f));
934            handle
935        }
936    }
937
938    fn spawn_blocking<F, T>(self, f: F) -> Handle<T>
939    where
940        F: FnOnce() -> T + Send + 'static,
941        T: Send + 'static,
942    {
943        // Ensure a context only spawns one task
944        assert!(!self.spawned, "already spawned");
945
946        // Get metrics
947        let work = Work {
948            label: self.label.clone(),
949        };
950        self.executor
951            .metrics
952            .blocking_tasks_spawned
953            .get_or_create(&work)
954            .inc();
955        let gauge = self
956            .executor
957            .metrics
958            .blocking_tasks_running
959            .get_or_create(&work)
960            .clone();
961
962        // Initialize the blocking task
963        let (f, handle) = Handle::init_blocking(f, gauge, false);
964
965        // Spawn the task
966        let f = async move { f() };
967        Tasks::register_work(&self.executor.tasks, &self.label, Box::pin(f));
968        handle
969    }
970
971    fn stop(&self, value: i32) {
972        self.executor.auditor.stop(value);
973        self.executor.signaler.lock().unwrap().signal(value);
974    }
975
976    fn stopped(&self) -> Signal {
977        self.executor.auditor.stopped();
978        self.executor.signal.clone()
979    }
980}
981
982impl crate::Metrics for Context {
983    fn with_label(&self, label: &str) -> Self {
984        let label = {
985            let prefix = self.label.clone();
986            if prefix.is_empty() {
987                label.to_string()
988            } else {
989                format!("{}_{}", prefix, label)
990            }
991        };
992        assert!(
993            !label.starts_with(METRICS_PREFIX),
994            "using runtime label is not allowed"
995        );
996        Self {
997            label,
998            spawned: false,
999            executor: self.executor.clone(),
1000            networking: self.networking.clone(),
1001            storage: self.storage.clone(),
1002        }
1003    }
1004
1005    fn label(&self) -> String {
1006        self.label.clone()
1007    }
1008
1009    fn register<N: Into<String>, H: Into<String>>(&self, name: N, help: H, metric: impl Metric) {
1010        // Prepare args
1011        let name = name.into();
1012        let help = help.into();
1013
1014        // Register metric
1015        self.executor.auditor.register(&name, &help);
1016        let prefixed_name = {
1017            let prefix = &self.label;
1018            if prefix.is_empty() {
1019                name
1020            } else {
1021                format!("{}_{}", *prefix, name)
1022            }
1023        };
1024        self.executor
1025            .registry
1026            .lock()
1027            .unwrap()
1028            .register(prefixed_name, help, metric)
1029    }
1030
1031    fn encode(&self) -> String {
1032        self.executor.auditor.encode();
1033        let mut buffer = String::new();
1034        encode(&mut buffer, &self.executor.registry.lock().unwrap()).expect("encoding failed");
1035        buffer
1036    }
1037}
1038
1039struct Sleeper {
1040    executor: Arc<Executor>,
1041    time: SystemTime,
1042    registered: bool,
1043}
1044
/// Pending wake-up pushed by a [`Sleeper`].
///
/// It pairs the sleeper's deadline with the waker it was polled with. Alarms compare by
/// `time` only, in reverse order, so a `BinaryHeap` of alarms pops the earliest deadline
/// first.
struct Alarm {
    time: SystemTime,
    waker: Waker,
}

impl PartialEq for Alarm {
    fn eq(&self, other: &Self) -> bool {
        self.time == other.time
    }
}

impl Eq for Alarm {}

impl PartialOrd for Alarm {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(Ord::cmp(self, other))
    }
}

impl Ord for Alarm {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        // `BinaryHeap` is a max-heap: invert the time order so the earliest alarm is "largest".
        self.time.cmp(&other.time).reverse()
    }
}
1070
1071impl Future for Sleeper {
1072    type Output = ();
1073
1074    fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
1075        {
1076            let current_time = *self.executor.time.lock().unwrap();
1077            if current_time >= self.time {
1078                return Poll::Ready(());
1079            }
1080        }
1081        if !self.registered {
1082            self.registered = true;
1083            self.executor.sleeping.lock().unwrap().push(Alarm {
1084                time: self.time,
1085                waker: cx.waker().clone(),
1086            });
1087        }
1088        Poll::Pending
1089    }
1090}
1091
1092impl Clock for Context {
1093    fn current(&self) -> SystemTime {
1094        *self.executor.time.lock().unwrap()
1095    }
1096
1097    fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static {
1098        let deadline = self
1099            .current()
1100            .checked_add(duration)
1101            .expect("overflow when setting wake time");
1102        self.sleep_until(deadline)
1103    }
1104
1105    fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static {
1106        Sleeper {
1107            executor: self.executor.clone(),
1108
1109            time: deadline,
1110            registered: false,
1111        }
1112    }
1113}
1114
1115impl GClock for Context {
1116    type Instant = SystemTime;
1117
1118    fn now(&self) -> Self::Instant {
1119        self.current()
1120    }
1121}
1122
1123impl ReasonablyRealtime for Context {}
1124
1125type Dialable = mpsc::UnboundedSender<(
1126    SocketAddr,
1127    mocks::Sink,   // Listener -> Dialer
1128    mocks::Stream, // Dialer -> Listener
1129)>;
1130
1131/// Implementation of [`crate::Network`] for the `deterministic` runtime.
1132///
1133/// When a dialer connects to a listener, the listener is given a new ephemeral port
1134/// from the range `32768..61000`. To keep things simple, it is not possible to
1135/// bind to an ephemeral port. Likewise, if ports are not reused and when exhausted,
1136/// the runtime will panic.
1137struct Networking {
1138    metrics: Arc<Metrics>,
1139    auditor: Arc<Auditor>,
1140    ephemeral: Mutex<u16>,
1141    listeners: Mutex<HashMap<SocketAddr, Dialable>>,
1142}
1143
1144impl Networking {
1145    fn new(metrics: Arc<Metrics>, auditor: Arc<Auditor>) -> Self {
1146        Self {
1147            metrics,
1148            auditor,
1149            ephemeral: Mutex::new(EPHEMERAL_PORT_RANGE.start),
1150            listeners: Mutex::new(HashMap::new()),
1151        }
1152    }
1153
1154    fn bind(&self, socket: SocketAddr) -> Result<Listener, Error> {
1155        self.auditor.bind(socket);
1156
1157        // If the IP is localhost, ensure the port is not in the ephemeral range
1158        // so that it can be used for binding in the dial method
1159        if socket.ip() == IpAddr::V4(Ipv4Addr::LOCALHOST)
1160            && EPHEMERAL_PORT_RANGE.contains(&socket.port())
1161        {
1162            return Err(Error::BindFailed);
1163        }
1164
1165        // Ensure the port is not already bound
1166        let mut listeners = self.listeners.lock().unwrap();
1167        if listeners.contains_key(&socket) {
1168            return Err(Error::BindFailed);
1169        }
1170
1171        // Bind the socket
1172        let (sender, receiver) = mpsc::unbounded();
1173        listeners.insert(socket, sender);
1174        Ok(Listener {
1175            auditor: self.auditor.clone(),
1176            address: socket,
1177            listener: receiver,
1178            metrics: self.metrics.clone(),
1179        })
1180    }
1181
1182    async fn dial(&self, socket: SocketAddr) -> Result<(Sink, Stream), Error> {
1183        // Assign dialer a port from the ephemeral range
1184        let dialer = {
1185            let mut ephemeral = self.ephemeral.lock().unwrap();
1186            let dialer = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), *ephemeral);
1187            *ephemeral = ephemeral
1188                .checked_add(1)
1189                .expect("ephemeral port range exhausted");
1190            dialer
1191        };
1192        self.auditor.dial(dialer, socket);
1193
1194        // Get listener
1195        let mut sender = {
1196            let listeners = self.listeners.lock().unwrap();
1197            let sender = listeners.get(&socket).ok_or(Error::ConnectionFailed)?;
1198            sender.clone()
1199        };
1200
1201        // Construct connection
1202        let (dialer_sender, dialer_receiver) = mocks::Channel::init();
1203        let (listener_sender, listener_receiver) = mocks::Channel::init();
1204        sender
1205            .send((dialer, dialer_sender, listener_receiver))
1206            .await
1207            .map_err(|_| Error::ConnectionFailed)?;
1208        Ok((
1209            Sink {
1210                metrics: self.metrics.clone(),
1211                auditor: self.auditor.clone(),
1212                me: dialer,
1213                peer: socket,
1214                sender: listener_sender,
1215            },
1216            Stream {
1217                auditor: self.auditor.clone(),
1218                me: dialer,
1219                peer: socket,
1220                receiver: dialer_receiver,
1221            },
1222        ))
1223    }
1224}
1225
1226impl crate::Network<Listener, Sink, Stream> for Context {
1227    async fn bind(&self, socket: SocketAddr) -> Result<Listener, Error> {
1228        self.networking.bind(socket)
1229    }
1230
1231    async fn dial(&self, socket: SocketAddr) -> Result<(Sink, Stream), Error> {
1232        self.networking.dial(socket).await
1233    }
1234}
1235
1236/// Implementation of [`crate::Listener`] for the `deterministic` runtime.
1237pub struct Listener {
1238    metrics: Arc<Metrics>,
1239    auditor: Arc<Auditor>,
1240    address: SocketAddr,
1241    listener: mpsc::UnboundedReceiver<(SocketAddr, mocks::Sink, mocks::Stream)>,
1242}
1243
1244impl crate::Listener<Sink, Stream> for Listener {
1245    async fn accept(&mut self) -> Result<(SocketAddr, Sink, Stream), Error> {
1246        let (socket, sender, receiver) = self.listener.next().await.ok_or(Error::ReadFailed)?;
1247        self.auditor.accept(self.address, socket);
1248        Ok((
1249            socket,
1250            Sink {
1251                metrics: self.metrics.clone(),
1252                auditor: self.auditor.clone(),
1253                me: self.address,
1254                peer: socket,
1255                sender,
1256            },
1257            Stream {
1258                auditor: self.auditor.clone(),
1259                me: self.address,
1260                peer: socket,
1261                receiver,
1262            },
1263        ))
1264    }
1265}
1266
1267/// Implementation of [`crate::Sink`] for the `deterministic` runtime.
1268pub struct Sink {
1269    metrics: Arc<Metrics>,
1270    auditor: Arc<Auditor>,
1271    me: SocketAddr,
1272    peer: SocketAddr,
1273    sender: mocks::Sink,
1274}
1275
1276impl crate::Sink for Sink {
1277    async fn send(&mut self, msg: &[u8]) -> Result<(), Error> {
1278        self.auditor.send(self.me, self.peer, msg);
1279        self.sender.send(msg).await.map_err(|_| Error::SendFailed)?;
1280        self.metrics.network_bandwidth.inc_by(msg.len() as u64);
1281        Ok(())
1282    }
1283}
1284
1285/// Implementation of [`crate::Stream`] for the `deterministic` runtime.
1286pub struct Stream {
1287    auditor: Arc<Auditor>,
1288    me: SocketAddr,
1289    peer: SocketAddr,
1290    receiver: mocks::Stream,
1291}
1292
1293impl crate::Stream for Stream {
1294    async fn recv(&mut self, buf: &mut [u8]) -> Result<(), Error> {
1295        self.receiver
1296            .recv(buf)
1297            .await
1298            .map_err(|_| Error::RecvFailed)?;
1299        self.auditor.recv(self.me, self.peer, buf);
1300        Ok(())
1301    }
1302}
1303
1304impl RngCore for Context {
1305    fn next_u32(&mut self) -> u32 {
1306        self.executor.auditor.rand("next_u32".to_string());
1307        self.executor.rng.lock().unwrap().next_u32()
1308    }
1309
1310    fn next_u64(&mut self) -> u64 {
1311        self.executor.auditor.rand("next_u64".to_string());
1312        self.executor.rng.lock().unwrap().next_u64()
1313    }
1314
1315    fn fill_bytes(&mut self, dest: &mut [u8]) {
1316        self.executor.auditor.rand("fill_bytes".to_string());
1317        self.executor.rng.lock().unwrap().fill_bytes(dest)
1318    }
1319
1320    fn try_fill_bytes(&mut self, dest: &mut [u8]) -> Result<(), rand::Error> {
1321        self.executor.auditor.rand("try_fill_bytes".to_string());
1322        self.executor.rng.lock().unwrap().try_fill_bytes(dest)
1323    }
1324}
1325
1326impl CryptoRng for Context {}
1327
1328impl crate::Storage for Context {
1329    type Blob = <MeteredStorage<AuditedStorage<MemStorage>> as crate::Storage>::Blob;
1330
1331    async fn open(&self, partition: &str, name: &[u8]) -> Result<(Self::Blob, u64), Error> {
1332        self.storage.open(partition, name).await
1333    }
1334
1335    async fn remove(&self, partition: &str, name: Option<&[u8]>) -> Result<(), Error> {
1336        self.storage.remove(partition, name).await
1337    }
1338
1339    async fn scan(&self, partition: &str) -> Result<Vec<Vec<u8>>, Error> {
1340        self.storage.scan(partition).await
1341    }
1342}
1343
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{deterministic, utils::run_tasks, Blob, Runner as _, Storage};
    use commonware_macros::test_traced;
    use futures::task::noop_waker;

    /// Runs [`run_tasks`] (with `5`) on a runtime seeded with `seed`.
    fn run_with_seed(seed: u64) -> (String, Vec<usize>) {
        let executor = deterministic::Runner::seeded(seed);
        run_tasks(5, executor)
    }

    #[test]
    fn test_same_seed_same_order() {
        // Generate initial outputs
        let outputs: Vec<_> = (0..1000).map(run_with_seed).collect();

        // Ensure re-running every seed reproduces its output
        for (seed, expected) in (0..1000).zip(&outputs) {
            assert_eq!(&run_with_seed(seed), expected);
        }
    }

    #[test_traced("TRACE")]
    fn test_different_seeds_different_order() {
        let output1 = run_with_seed(12345);
        let output2 = run_with_seed(54321);
        assert_ne!(output1, output2);
    }

    #[test]
    fn test_alarm_min_heap() {
        // Populate heap
        let now = SystemTime::now();
        let alarms = vec![
            Alarm {
                time: now + Duration::new(10, 0),
                waker: noop_waker(),
            },
            Alarm {
                time: now + Duration::new(5, 0),
                waker: noop_waker(),
            },
            Alarm {
                time: now + Duration::new(15, 0),
                waker: noop_waker(),
            },
            Alarm {
                time: now + Duration::new(5, 0),
                waker: noop_waker(),
            },
        ];
        let mut heap = BinaryHeap::from(alarms);

        // Verify min-heap: alarms pop in ascending time order
        let sorted_times: Vec<_> = std::iter::from_fn(|| heap.pop())
            .map(|alarm| alarm.time)
            .collect();
        assert_eq!(
            sorted_times,
            vec![
                now + Duration::new(5, 0),
                now + Duration::new(5, 0),
                now + Duration::new(10, 0),
                now + Duration::new(15, 0),
            ]
        );
    }

    #[test]
    #[should_panic(expected = "runtime timeout")]
    fn test_timeout() {
        let executor = deterministic::Runner::timed(Duration::from_secs(10));
        executor.start(|context| async move {
            loop {
                context.sleep(Duration::from_secs(1)).await;
            }
        });
    }

    #[test]
    #[should_panic(expected = "cycle duration must be non-zero when timeout is set")]
    fn test_bad_timeout() {
        let cfg = Config {
            timeout: Some(Duration::default()),
            cycle: Duration::default(),
            ..Config::default()
        };
        deterministic::Runner::new(cfg);
    }

    #[test]
    fn test_recover_synced_storage_persists() {
        // Initialize the first runtime
        let executor1 = deterministic::Runner::default();
        let partition = "test_partition";
        let name = b"test_blob";
        let data = b"Hello, world!";

        // Run some tasks, sync storage, and recover the runtime
        let (context, state) = executor1.start(|context| async move {
            let (blob, _) = context.open(partition, name).await.unwrap();
            blob.write_at(data, 0).await.unwrap();
            blob.sync().await.unwrap();
            let state = context.auditor().state();
            (context, state)
        });
        let recovered_context = context.recover();

        // Verify auditor state is the same
        assert_eq!(state, recovered_context.auditor().state());

        // Check that synced storage persists after recovery
        let executor = Runner::from(recovered_context);
        executor.start(|context| async move {
            let (blob, len) = context.open(partition, name).await.unwrap();
            assert_eq!(len, data.len() as u64);
            let mut buf = vec![0; data.len()];
            blob.read_at(&mut buf, 0).await.unwrap();
            assert_eq!(buf, data);
        });
    }

    #[test]
    fn test_recover_unsynced_storage_does_not_persist() {
        // Initialize the first runtime
        let executor = deterministic::Runner::default();
        let partition = "test_partition";
        let name = b"test_blob";
        let data = b"Hello, world!".to_vec();

        // Run some tasks without syncing storage
        let context = executor.start(|context| async move {
            let (blob, _) = context.open(partition, name).await.unwrap();
            blob.write_at(&data, 0).await.unwrap();
            // Intentionally do not call sync() here
            context
        });

        // Recover the runtime
        let context = context.recover();
        let executor = Runner::from(context);

        // Check that unsynced storage does not persist after recovery
        executor.start(|context| async move {
            let (_, len) = context.open(partition, name).await.unwrap();
            assert_eq!(len, 0);
        });
    }

    #[test]
    #[should_panic(expected = "execution is not finished")]
    fn test_recover_before_finish_panics() {
        // Initialize runtime
        let executor = deterministic::Runner::default();

        // Start runtime
        executor.start(|context| async move {
            // Attempt to recover before the runtime has finished
            context.recover();
        });
    }

    #[test]
    #[should_panic(expected = "runtime has already been recovered")]
    fn test_recover_twice_panics() {
        // Initialize runtime
        let executor = deterministic::Runner::default();

        // Finish runtime
        let context = executor.start(|context| async move { context });

        // Recover for the first time
        let cloned_context = context.clone();
        context.recover();

        // Attempt to recover again using the same context
        cloned_context.recover();
    }

    #[test]
    fn test_default_time_zero() {
        // Initialize runtime
        let executor = deterministic::Runner::default();

        executor.start(|context| async move {
            // Check that the time is zero
            assert_eq!(
                context.current().duration_since(UNIX_EPOCH).unwrap(),
                Duration::ZERO
            );
        });
    }
}