commonware_runtime/
deterministic.rs

1//! A deterministic runtime that randomly selects tasks to run based on a seed
2//!
3//! # Panics
4//!
//! If any task panics, the runtime will panic (and shut down).
6//!
7//! # Example
8//!
9//! ```rust
10//! use commonware_runtime::{Spawner, Runner, deterministic, Metrics};
11//!
12//! let executor =  deterministic::Runner::default();
13//! executor.start(|context| async move {
14//!     println!("Parent started");
15//!     let result = context.with_label("child").spawn(|_| async move {
16//!         println!("Child started");
17//!         "hello"
18//!     });
19//!     println!("Child result: {:?}", result.await);
20//!     println!("Parent exited");
21//!     println!("Auditor state: {}", context.auditor().state());
22//! });
23//! ```
24
25use crate::{
26    mocks,
27    storage::{
28        audited::Storage as AuditedStorage, memory::Storage as MemStorage,
29        metered::Storage as MeteredStorage,
30    },
31    utils::Signaler,
32    Clock, Error, Handle, Signal, METRICS_PREFIX,
33};
34use commonware_utils::{hex, SystemTimeExt};
35use futures::{
36    channel::mpsc,
37    task::{waker_ref, ArcWake},
38    Future, SinkExt, StreamExt,
39};
40use governor::clock::{Clock as GClock, ReasonablyRealtime};
41use prometheus_client::{
42    encoding::{text::encode, EncodeLabelSet},
43    metrics::{counter::Counter, family::Family, gauge::Gauge},
44    registry::{Metric, Registry},
45};
46use rand::{prelude::SliceRandom, rngs::StdRng, CryptoRng, RngCore, SeedableRng};
47use sha2::{Digest, Sha256};
48use std::{
49    collections::{BinaryHeap, HashMap},
50    mem::replace,
51    net::{IpAddr, Ipv4Addr, SocketAddr},
52    ops::Range,
53    pin::Pin,
54    sync::{Arc, Mutex},
55    task::{self, Poll, Waker},
56    time::{Duration, SystemTime, UNIX_EPOCH},
57};
58use tracing::trace;
59
/// Range of ephemeral ports assigned to dialers.
///
/// This is a half-open [Range]: `32768` is included, `61000` is not.
const EPHEMERAL_PORT_RANGE: Range<u16> = 32768..61000;
62
/// Map of names to blob contents.
///
/// Both the name (key) and the contents (value) are raw byte vectors.
pub type Partition = HashMap<Vec<u8>, Vec<u8>>;
65
/// Label set used to key the per-task metric families in [Metrics].
#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
struct Work {
    /// Label of the context/task the measurement belongs to.
    label: String,
}
70
71#[derive(Debug)]
72struct Metrics {
73    tasks_spawned: Family<Work, Counter>,
74    tasks_running: Family<Work, Gauge>,
75    blocking_tasks_spawned: Family<Work, Counter>,
76    blocking_tasks_running: Family<Work, Gauge>,
77    task_polls: Family<Work, Counter>,
78
79    network_bandwidth: Counter,
80}
81
82impl Metrics {
83    pub fn init(registry: &mut Registry) -> Self {
84        let metrics = Self {
85            task_polls: Family::default(),
86            tasks_spawned: Family::default(),
87            tasks_running: Family::default(),
88            blocking_tasks_spawned: Family::default(),
89            blocking_tasks_running: Family::default(),
90            network_bandwidth: Counter::default(),
91        };
92        registry.register(
93            "tasks_spawned",
94            "Total number of tasks spawned",
95            metrics.tasks_spawned.clone(),
96        );
97        registry.register(
98            "tasks_running",
99            "Number of tasks currently running",
100            metrics.tasks_running.clone(),
101        );
102        registry.register(
103            "blocking_tasks_spawned",
104            "Total number of blocking tasks spawned",
105            metrics.blocking_tasks_spawned.clone(),
106        );
107        registry.register(
108            "blocking_tasks_running",
109            "Number of blocking tasks currently running",
110            metrics.blocking_tasks_running.clone(),
111        );
112        registry.register(
113            "task_polls",
114            "Total number of task polls",
115            metrics.task_polls.clone(),
116        );
117        registry.register(
118            "bandwidth",
119            "Total amount of data sent over network",
120            metrics.network_bandwidth.clone(),
121        );
122        metrics
123    }
124}
125
126/// Track the state of the runtime for determinism auditing.
127pub struct Auditor {
128    hash: Mutex<Vec<u8>>,
129}
130
131impl Default for Auditor {
132    fn default() -> Self {
133        Self {
134            hash: Vec::new().into(),
135        }
136    }
137}
138
139impl Auditor {
140    /// Record that an event happened.
141    /// This auditor's hash will be updated with the event's `label` and
142    /// whatever other data is passed in the `payload` closure.
143    pub(crate) fn event<F>(&self, label: &'static [u8], payload: F)
144    where
145        F: FnOnce(&mut Sha256),
146    {
147        let mut hash = self.hash.lock().unwrap();
148
149        let mut hasher = Sha256::new();
150        hasher.update(&*hash);
151        hasher.update(label);
152        payload(&mut hasher);
153
154        *hash = hasher.finalize().to_vec();
155    }
156
157    /// Generate a representation of the current state of the runtime.
158    ///
159    /// This can be used to ensure that logic running on top
160    /// of the runtime is interacting deterministically.
161    pub fn state(&self) -> String {
162        let hash = self.hash.lock().unwrap().clone();
163        hex(&hash)
164    }
165}
166
/// Configuration for the `deterministic` runtime.
#[derive(Clone)]
pub struct Config {
    /// Seed for the random number generator.
    seed: u64,

    /// The cycle duration determines how much time is advanced after each iteration of the event
    /// loop. This is useful to prevent starvation if some task never yields.
    cycle: Duration,

    /// If the runtime is still executing at this point (i.e. a test hasn't stopped), panic.
    timeout: Option<Duration>,
}

impl Config {
    /// Returns a new [Config] with default values
    /// (seed `42`, a 1ms cycle, and no timeout).
    pub fn new() -> Self {
        let seed = 42;
        let cycle = Duration::from_millis(1);
        Self {
            seed,
            cycle,
            timeout: None,
        }
    }

    // Setters
    /// Replace the seed. See [Config]
    pub fn with_seed(self, seed: u64) -> Self {
        Self { seed, ..self }
    }
    /// Replace the cycle duration. See [Config]
    pub fn with_cycle(self, cycle: Duration) -> Self {
        Self { cycle, ..self }
    }
    /// Replace the timeout. See [Config]
    pub fn with_timeout(self, timeout: Option<Duration>) -> Self {
        Self { timeout, ..self }
    }

    // Getters
    /// The configured seed. See [Config]
    pub fn seed(&self) -> u64 {
        self.seed
    }
    /// The configured cycle duration. See [Config]
    pub fn cycle(&self) -> Duration {
        self.cycle
    }
    /// The configured timeout, if any. See [Config]
    pub fn timeout(&self) -> Option<Duration> {
        self.timeout
    }

    /// Assert that the configuration is valid.
    ///
    /// # Panics
    ///
    /// Panics if a timeout is set while the cycle duration is zero.
    pub fn assert(&self) {
        assert!(
            self.timeout.is_none() || !self.cycle.is_zero(),
            "cycle duration must be non-zero when timeout is set",
        );
    }
}

impl Default for Config {
    /// Same as [Config::new].
    fn default() -> Self {
        Self::new()
    }
}
236
/// Deterministic runtime that randomly selects tasks to run based on a seed.
pub struct Executor {
    /// Registry that metrics are registered with and encoded from.
    registry: Mutex<Registry>,
    /// Amount the runtime clock advances after each iteration of the event loop.
    cycle: Duration,
    /// Point in runtime time at which the event loop panics with "runtime timeout" (if set).
    deadline: Option<SystemTime>,
    /// Runtime-level metrics (task spawns, polls, ...).
    metrics: Arc<Metrics>,
    /// Auditor that records runtime events for determinism checks.
    auditor: Arc<Auditor>,
    /// Seeded random number generator (used to shuffle the tasks run in each iteration).
    rng: Mutex<StdRng>,
    /// Current runtime time.
    time: Mutex<SystemTime>,
    /// Queue of tasks that are waiting to be polled.
    tasks: Arc<Tasks>,
    /// Alarms for sleeping tasks, ordered so the earliest wake-up time is popped first.
    sleeping: Mutex<BinaryHeap<Alarm>>,
    /// Named partitions of blobs (carried over to a recovered runtime).
    partitions: Mutex<HashMap<String, Partition>>,
    /// Sending half of the shutdown signal.
    signaler: Mutex<Signaler>,
    /// Receiving half of the shutdown signal (cloned for each caller that asks for it).
    signal: Signal,
    /// Set once the root task has completed.
    finished: Mutex<bool>,
    /// Set once this runtime's state has been handed to a recovered runtime.
    recovered: Mutex<bool>,
}
254
255enum State {
256    Config(Config),
257    Context(Context),
258}
259
260/// Implementation of [crate::Runner] for the `deterministic` runtime.
261pub struct Runner {
262    state: State,
263}
264
265impl From<Config> for Runner {
266    fn from(cfg: Config) -> Self {
267        Self::new(cfg)
268    }
269}
270
271impl From<Context> for Runner {
272    fn from(context: Context) -> Self {
273        Self {
274            state: State::Context(context),
275        }
276    }
277}
278
279impl Runner {
280    /// Initialize a new `deterministic` runtime with the given seed and cycle duration.
281    pub fn new(cfg: Config) -> Self {
282        // Ensure config is valid
283        cfg.assert();
284        Runner {
285            state: State::Config(cfg),
286        }
287    }
288
289    /// Initialize a new `deterministic` runtime with the default configuration
290    /// and the provided seed.
291    pub fn seeded(seed: u64) -> Self {
292        let cfg = Config {
293            seed,
294            ..Config::default()
295        };
296        Self::new(cfg)
297    }
298
299    /// Initialize a new `deterministic` runtime with the default configuration
300    /// but exit after the given timeout.
301    pub fn timed(timeout: Duration) -> Self {
302        let cfg = Config {
303            timeout: Some(timeout),
304            ..Config::default()
305        };
306        Self::new(cfg)
307    }
308}
309
310impl Default for Runner {
311    fn default() -> Self {
312        Self::new(Config::default())
313    }
314}
315
316impl crate::Runner for Runner {
317    type Context = Context;
318
319    fn start<F, Fut>(self, f: F) -> Fut::Output
320    where
321        F: FnOnce(Self::Context) -> Fut,
322        Fut: Future,
323    {
324        // Setup context (depending on how the runtime was initialized)
325        let context = match self.state {
326            State::Config(config) => Context::new(config),
327            State::Context(context) => context,
328        };
329
330        // Pin root task to the heap
331        let executor = context.executor.clone();
332        let mut root = Box::pin(f(context));
333
334        // Register the root task
335        Tasks::register_root(&executor.tasks);
336
337        // Process tasks until root task completes or progress stalls
338        let mut iter = 0;
339        loop {
340            // Ensure we have not exceeded our deadline
341            {
342                let current = executor.time.lock().unwrap();
343                if let Some(deadline) = executor.deadline {
344                    if *current >= deadline {
345                        panic!("runtime timeout");
346                    }
347                }
348            }
349
350            // Snapshot available tasks
351            let mut tasks = executor.tasks.drain();
352
353            // Shuffle tasks
354            {
355                let mut rng = executor.rng.lock().unwrap();
356                tasks.shuffle(&mut *rng);
357            }
358
359            // Run all snapshotted tasks
360            //
361            // This approach is more efficient than randomly selecting a task one-at-a-time
362            // because it ensures we don't pull the same pending task multiple times in a row (without
363            // processing a different task required for other tasks to make progress).
364            trace!(iter, tasks = tasks.len(), "starting loop");
365            for task in tasks {
366                // Record task for auditing
367                executor.auditor.event(b"process_task", |hasher| {
368                    hasher.update(task.id.to_be_bytes());
369                    hasher.update(task.label.as_bytes());
370                });
371                trace!(id = task.id, "processing task");
372
373                // Record task poll
374                executor
375                    .metrics
376                    .task_polls
377                    .get_or_create(&Work {
378                        label: task.label.clone(),
379                    })
380                    .inc();
381
382                // Prepare task for polling
383                let waker = waker_ref(&task);
384                let mut cx = task::Context::from_waker(&waker);
385                match &task.operation {
386                    Operation::Root => {
387                        // Poll the root task
388                        if let Poll::Ready(output) = root.as_mut().poll(&mut cx) {
389                            trace!(id = task.id, "task is complete");
390                            *executor.finished.lock().unwrap() = true;
391                            return output;
392                        }
393                    }
394                    Operation::Work { future, completed } => {
395                        // If task is completed, skip it
396                        if *completed.lock().unwrap() {
397                            trace!(id = task.id, "dropping already complete task");
398                            continue;
399                        }
400
401                        // Poll the task
402                        let mut fut = future.lock().unwrap();
403                        if fut.as_mut().poll(&mut cx).is_ready() {
404                            trace!(id = task.id, "task is complete");
405                            *completed.lock().unwrap() = true;
406                            continue;
407                        }
408                    }
409                }
410
411                // Try again later if task is still pending
412                trace!(id = task.id, "task is still pending");
413            }
414
415            // Advance time by cycle
416            //
417            // This approach prevents starvation if some task never yields (to approximate this,
418            // duration can be set to 1ns).
419            let mut current;
420            {
421                let mut time = executor.time.lock().unwrap();
422                *time = time
423                    .checked_add(executor.cycle)
424                    .expect("executor time overflowed");
425                current = *time;
426            }
427            trace!(now = current.epoch_millis(), "time advanced");
428
429            // Skip time if there is nothing to do
430            if executor.tasks.len() == 0 {
431                let mut skip = None;
432                {
433                    let sleeping = executor.sleeping.lock().unwrap();
434                    if let Some(next) = sleeping.peek() {
435                        if next.time > current {
436                            skip = Some(next.time);
437                        }
438                    }
439                }
440                if skip.is_some() {
441                    {
442                        let mut time = executor.time.lock().unwrap();
443                        *time = skip.unwrap();
444                        current = *time;
445                    }
446                    trace!(now = current.epoch_millis(), "time skipped");
447                }
448            }
449
450            // Wake all sleeping tasks that are ready
451            let mut to_wake = Vec::new();
452            let mut remaining;
453            {
454                let mut sleeping = executor.sleeping.lock().unwrap();
455                while let Some(next) = sleeping.peek() {
456                    if next.time <= current {
457                        let sleeper = sleeping.pop().unwrap();
458                        to_wake.push(sleeper.waker);
459                    } else {
460                        break;
461                    }
462                }
463                remaining = sleeping.len();
464            }
465            for waker in to_wake {
466                waker.wake();
467            }
468
469            // Account for remaining tasks
470            remaining += executor.tasks.len();
471
472            // If there are no tasks to run and no tasks sleeping, the executor is stalled
473            // and will never finish.
474            if remaining == 0 {
475                panic!("runtime stalled");
476            }
477            iter += 1;
478        }
479    }
480}
481
/// The operation that a task is performing.
enum Operation {
    /// The root task. Its future is not stored here; the event loop owns and polls it directly.
    Root,
    /// A spawned task.
    Work {
        /// The task's future, boxed and pinned so it can be polled through a shared [Task].
        future: Mutex<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
        /// Set once the future has resolved, so later wake-ups skip polling it again.
        completed: Mutex<bool>,
    },
}
490
491/// A task that is being executed by the runtime.
492struct Task {
493    id: u128,
494    label: String,
495    tasks: Arc<Tasks>,
496
497    operation: Operation,
498}
499
500impl ArcWake for Task {
501    fn wake_by_ref(arc_self: &Arc<Self>) {
502        arc_self.tasks.enqueue(arc_self.clone());
503    }
504}
505
506/// A task queue that is used to manage the tasks that are being executed by the runtime.
507struct Tasks {
508    /// The current task counter.
509    counter: Mutex<u128>,
510    /// The queue of tasks that are waiting to be executed.
511    queue: Mutex<Vec<Arc<Task>>>,
512    /// Indicates whether the root task has been registered.
513    root_registered: Mutex<bool>,
514}
515
516impl Tasks {
517    /// Create a new task queue.
518    fn new() -> Self {
519        Self {
520            counter: Mutex::new(0),
521            queue: Mutex::new(Vec::new()),
522            root_registered: Mutex::new(false),
523        }
524    }
525
526    /// Increment the task counter and return the old value.
527    fn increment(&self) -> u128 {
528        let mut counter = self.counter.lock().unwrap();
529        let old = *counter;
530        *counter = counter.checked_add(1).expect("task counter overflow");
531        old
532    }
533
534    /// Register the root task.
535    ///
536    /// If the root task has already been registered, this function will panic.
537    fn register_root(arc_self: &Arc<Self>) {
538        {
539            let mut registered = arc_self.root_registered.lock().unwrap();
540            assert!(!*registered, "root already registered");
541            *registered = true;
542        }
543        let id = arc_self.increment();
544        let mut queue = arc_self.queue.lock().unwrap();
545        queue.push(Arc::new(Task {
546            id,
547            label: String::new(),
548            tasks: arc_self.clone(),
549            operation: Operation::Root,
550        }));
551    }
552
553    /// Register a new task to be executed.
554    fn register_work(
555        arc_self: &Arc<Self>,
556        label: &str,
557        future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
558    ) {
559        let id = arc_self.increment();
560        let mut queue = arc_self.queue.lock().unwrap();
561        queue.push(Arc::new(Task {
562            id,
563            label: label.to_string(),
564            tasks: arc_self.clone(),
565            operation: Operation::Work {
566                future: Mutex::new(future),
567                completed: Mutex::new(false),
568            },
569        }));
570    }
571
572    /// Enqueue an already registered task to be executed.
573    fn enqueue(&self, task: Arc<Task>) {
574        let mut queue = self.queue.lock().unwrap();
575        queue.push(task);
576    }
577
578    /// Dequeue all tasks that are ready to execute.
579    fn drain(&self) -> Vec<Arc<Task>> {
580        let mut queue = self.queue.lock().unwrap();
581        let len = queue.len();
582        replace(&mut *queue, Vec::with_capacity(len))
583    }
584
585    /// Get the number of tasks in the queue.
586    fn len(&self) -> usize {
587        self.queue.lock().unwrap().len()
588    }
589}
590
/// Implementation of [crate::Spawner], [crate::Clock],
/// [crate::Network], and [crate::Storage] for the `deterministic`
/// runtime.
pub struct Context {
    /// Label applied to tasks spawned and metrics registered through this context.
    label: String,
    /// Whether this context has already been used to spawn a task by reference.
    spawned: bool,
    /// Shared runtime state (clock, task queue, metrics, auditor, ...).
    executor: Arc<Executor>,
    /// Shared networking state.
    networking: Arc<Networking>,
    /// In-memory storage wrapped with auditing and metering.
    storage: MeteredStorage<AuditedStorage<MemStorage>>,
}

impl Default for Context {
    /// Create a context from the default [Config].
    fn default() -> Self {
        Self::new(Config::default())
    }
}
607
608impl Context {
609    pub fn new(cfg: Config) -> Self {
610        // Create a new registry
611        let mut registry = Registry::default();
612        let runtime_registry = registry.sub_registry_with_prefix(METRICS_PREFIX);
613
614        // Initialize runtime
615        let metrics = Arc::new(Metrics::init(runtime_registry));
616        let start_time = UNIX_EPOCH;
617        let deadline = cfg
618            .timeout
619            .map(|timeout| start_time.checked_add(timeout).expect("timeout overflowed"));
620        let (signaler, signal) = Signaler::new();
621        let auditor = Arc::new(Auditor::default());
622        let storage = MeteredStorage::new(
623            AuditedStorage::new(MemStorage::default(), auditor.clone()),
624            runtime_registry,
625        );
626        let executor = Arc::new(Executor {
627            registry: Mutex::new(registry),
628            cycle: cfg.cycle,
629            deadline,
630            metrics: metrics.clone(),
631            auditor: auditor.clone(),
632            rng: Mutex::new(StdRng::seed_from_u64(cfg.seed)),
633            time: Mutex::new(start_time),
634            tasks: Arc::new(Tasks::new()),
635            sleeping: Mutex::new(BinaryHeap::new()),
636            partitions: Mutex::new(HashMap::new()),
637            signaler: Mutex::new(signaler),
638            signal,
639            finished: Mutex::new(false),
640            recovered: Mutex::new(false),
641        });
642        Context {
643            label: String::new(),
644            spawned: false,
645            executor: executor.clone(),
646            networking: Arc::new(Networking::new(metrics, auditor)),
647            storage,
648        }
649    }
650
651    /// Recover the inner state (deadline, metrics, auditor, rng, synced storage, etc.) from the
652    /// current runtime and use it to initialize a new instance of the runtime. A recovered runtime
653    /// does not inherit the current runtime's pending tasks, unsynced storage, network connections, nor
654    /// its shutdown signaler.
655    ///
656    /// This is useful for performing a deterministic simulation that spans multiple runtime instantiations,
657    /// like simulating unclean shutdown (which involves repeatedly halting the runtime at unexpected intervals).
658    ///
659    /// It is only permitted to call this method after the runtime has finished (i.e. once `start` returns)
660    /// and only permitted to do once (otherwise multiple recovered runtimes will share the same inner state).
661    /// If either one of these conditions is violated, this method will panic.
662    pub fn recover(self) -> Self {
663        // Ensure we are finished
664        if !*self.executor.finished.lock().unwrap() {
665            panic!("execution is not finished");
666        }
667
668        // Ensure runtime has not already been recovered
669        {
670            let mut recovered = self.executor.recovered.lock().unwrap();
671            if *recovered {
672                panic!("runtime has already been recovered");
673            }
674            *recovered = true;
675        }
676
677        // Rebuild metrics
678        let mut registry = Registry::default();
679        let runtime_registry = registry.sub_registry_with_prefix(METRICS_PREFIX);
680        let metrics = Arc::new(Metrics::init(runtime_registry));
681
682        // Copy state
683        let auditor = self.executor.auditor.clone();
684        let (signaler, signal) = Signaler::new();
685        let executor = Arc::new(Executor {
686            // Copied from the current runtime
687            cycle: self.executor.cycle,
688            deadline: self.executor.deadline,
689            auditor: auditor.clone(),
690            rng: Mutex::new(self.executor.rng.lock().unwrap().clone()),
691            time: Mutex::new(*self.executor.time.lock().unwrap()),
692            partitions: Mutex::new(self.executor.partitions.lock().unwrap().clone()),
693
694            // New state for the new runtime
695            registry: Mutex::new(registry),
696            metrics: metrics.clone(),
697            tasks: Arc::new(Tasks::new()),
698            sleeping: Mutex::new(BinaryHeap::new()),
699            signaler: Mutex::new(signaler),
700            signal,
701            finished: Mutex::new(false),
702            recovered: Mutex::new(false),
703        });
704        Self {
705            label: String::new(),
706            spawned: false,
707            executor,
708            networking: Arc::new(Networking::new(metrics, auditor.clone())),
709            storage: self.storage,
710        }
711    }
712
713    pub fn auditor(&self) -> &Auditor {
714        &self.executor.auditor
715    }
716}
717
718impl Clone for Context {
719    fn clone(&self) -> Self {
720        Self {
721            label: self.label.clone(),
722            spawned: false,
723            executor: self.executor.clone(),
724            networking: self.networking.clone(),
725            storage: self.storage.clone(),
726        }
727    }
728}
729
730impl crate::Spawner for Context {
731    fn spawn<F, Fut, T>(self, f: F) -> Handle<T>
732    where
733        F: FnOnce(Self) -> Fut + Send + 'static,
734        Fut: Future<Output = T> + Send + 'static,
735        T: Send + 'static,
736    {
737        // Ensure a context only spawns one task
738        assert!(!self.spawned, "already spawned");
739
740        // Get metrics
741        let label = self.label.clone();
742        let work = Work {
743            label: label.clone(),
744        };
745        self.executor
746            .metrics
747            .tasks_spawned
748            .get_or_create(&work)
749            .inc();
750        let gauge = self
751            .executor
752            .metrics
753            .tasks_running
754            .get_or_create(&work)
755            .clone();
756
757        // Set up the task
758        let executor = self.executor.clone();
759        let future = f(self);
760        let (f, handle) = Handle::init(future, gauge, false);
761
762        // Spawn the task
763        Tasks::register_work(&executor.tasks, &label, Box::pin(f));
764        handle
765    }
766
767    fn spawn_ref<F, T>(&mut self) -> impl FnOnce(F) -> Handle<T> + 'static
768    where
769        F: Future<Output = T> + Send + 'static,
770        T: Send + 'static,
771    {
772        // Ensure a context only spawns one task
773        assert!(!self.spawned, "already spawned");
774        self.spawned = true;
775
776        // Get metrics
777        let work = Work {
778            label: self.label.clone(),
779        };
780        self.executor
781            .metrics
782            .tasks_spawned
783            .get_or_create(&work)
784            .inc();
785        let gauge = self
786            .executor
787            .metrics
788            .tasks_running
789            .get_or_create(&work)
790            .clone();
791
792        // Set up the task
793        let label = self.label.clone();
794        let executor = self.executor.clone();
795        move |f: F| {
796            let (f, handle) = Handle::init(f, gauge, false);
797
798            // Spawn the task
799            Tasks::register_work(&executor.tasks, &label, Box::pin(f));
800            handle
801        }
802    }
803
804    fn spawn_blocking<F, T>(self, f: F) -> Handle<T>
805    where
806        F: FnOnce() -> T + Send + 'static,
807        T: Send + 'static,
808    {
809        // Ensure a context only spawns one task
810        assert!(!self.spawned, "already spawned");
811
812        // Get metrics
813        let work = Work {
814            label: self.label.clone(),
815        };
816        self.executor
817            .metrics
818            .blocking_tasks_spawned
819            .get_or_create(&work)
820            .inc();
821        let gauge = self
822            .executor
823            .metrics
824            .blocking_tasks_running
825            .get_or_create(&work)
826            .clone();
827
828        // Initialize the blocking task
829        let (f, handle) = Handle::init_blocking(f, gauge, false);
830
831        // Spawn the task
832        let f = async move { f() };
833        Tasks::register_work(&self.executor.tasks, &self.label, Box::pin(f));
834        handle
835    }
836
837    fn stop(&self, value: i32) {
838        self.executor.auditor.event(b"stop", |hasher| {
839            hasher.update(value.to_be_bytes());
840        });
841        self.executor.signaler.lock().unwrap().signal(value);
842    }
843
844    fn stopped(&self) -> Signal {
845        self.executor.auditor.event(b"stopped", |_| {});
846        self.executor.signal.clone()
847    }
848}
849
850impl crate::Metrics for Context {
851    fn with_label(&self, label: &str) -> Self {
852        let label = {
853            let prefix = self.label.clone();
854            if prefix.is_empty() {
855                label.to_string()
856            } else {
857                format!("{}_{}", prefix, label)
858            }
859        };
860        assert!(
861            !label.starts_with(METRICS_PREFIX),
862            "using runtime label is not allowed"
863        );
864        Self {
865            label,
866            spawned: false,
867            executor: self.executor.clone(),
868            networking: self.networking.clone(),
869            storage: self.storage.clone(),
870        }
871    }
872
873    fn label(&self) -> String {
874        self.label.clone()
875    }
876
877    fn register<N: Into<String>, H: Into<String>>(&self, name: N, help: H, metric: impl Metric) {
878        // Prepare args
879        let name = name.into();
880        let help = help.into();
881
882        // Register metric
883        self.executor.auditor.event(b"register", |hasher| {
884            hasher.update(name.as_bytes());
885            hasher.update(help.as_bytes());
886        });
887        let prefixed_name = {
888            let prefix = &self.label;
889            if prefix.is_empty() {
890                name
891            } else {
892                format!("{}_{}", *prefix, name)
893            }
894        };
895        self.executor
896            .registry
897            .lock()
898            .unwrap()
899            .register(prefixed_name, help, metric)
900    }
901
902    fn encode(&self) -> String {
903        self.executor.auditor.event(b"encode", |_| {});
904        let mut buffer = String::new();
905        encode(&mut buffer, &self.executor.registry.lock().unwrap()).expect("encoding failed");
906        buffer
907    }
908}
909
/// Future that resolves once the runtime clock reaches `time`.
struct Sleeper {
    /// Executor whose clock and alarm heap this sleeper uses.
    executor: Arc<Executor>,
    /// Runtime time at which the sleeper becomes ready.
    time: SystemTime,
    /// Whether an [Alarm] has already been pushed for this sleeper.
    registered: bool,
}
915
/// A pending wake-up: the time it is due and the waker to invoke.
struct Alarm {
    /// Time at which the alarm is due.
    time: SystemTime,
    /// Waker to invoke once the alarm is due.
    waker: Waker,
}

impl PartialEq for Alarm {
    /// Two alarms are equal when they are due at the same time (the waker is ignored).
    fn eq(&self, other: &Self) -> bool {
        self.time == other.time
    }
}

impl Eq for Alarm {}

impl PartialOrd for Alarm {
    /// Delegates to the total order defined by [Ord].
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(Ord::cmp(self, other))
    }
}

impl Ord for Alarm {
    /// Earlier alarms compare as greater, so a max-heap pops the earliest alarm first.
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        // Reverse the ordering for min-heap
        self.time.cmp(&other.time).reverse()
    }
}
941
942impl Future for Sleeper {
943    type Output = ();
944
945    fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
946        {
947            let current_time = *self.executor.time.lock().unwrap();
948            if current_time >= self.time {
949                return Poll::Ready(());
950            }
951        }
952        if !self.registered {
953            self.registered = true;
954            self.executor.sleeping.lock().unwrap().push(Alarm {
955                time: self.time,
956                waker: cx.waker().clone(),
957            });
958        }
959        Poll::Pending
960    }
961}
962
963impl Clock for Context {
964    fn current(&self) -> SystemTime {
965        *self.executor.time.lock().unwrap()
966    }
967
968    fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static {
969        let deadline = self
970            .current()
971            .checked_add(duration)
972            .expect("overflow when setting wake time");
973        self.sleep_until(deadline)
974    }
975
976    fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static {
977        Sleeper {
978            executor: self.executor.clone(),
979
980            time: deadline,
981            registered: false,
982        }
983    }
984}
985
986impl GClock for Context {
987    type Instant = SystemTime;
988
989    fn now(&self) -> Self::Instant {
990        self.current()
991    }
992}
993
994impl ReasonablyRealtime for Context {}
995
/// Sender used to hand a new connection to a bound listener: the dialer's
/// address together with the two channel halves of the connection.
type Dialable = mpsc::UnboundedSender<(
    SocketAddr,
    mocks::Sink,   // Listener -> Dialer
    mocks::Stream, // Dialer -> Listener
)>;
1001
/// Implementation of [crate::Network] for the `deterministic` runtime.
///
/// When a dialer connects to a listener, the dialer is assigned a new ephemeral port
/// from the range `32768..61000`. To keep things simple, it is not possible to
/// bind to an ephemeral port. Likewise, ports are not reused and, once they are
/// exhausted, the runtime will panic.
struct Networking {
    /// Metrics handle cloned into every listener and sink created here.
    metrics: Arc<Metrics>,
    /// Auditor that records `bind` and `dial` events.
    auditor: Arc<Auditor>,
    /// Next port to assign to a dialer.
    ephemeral: Mutex<u16>,
    /// Bound addresses and the sender used to reach each listener.
    listeners: Mutex<HashMap<SocketAddr, Dialable>>,
}
1014
1015impl Networking {
1016    fn new(metrics: Arc<Metrics>, auditor: Arc<Auditor>) -> Self {
1017        Self {
1018            metrics,
1019            auditor,
1020            ephemeral: Mutex::new(EPHEMERAL_PORT_RANGE.start),
1021            listeners: Mutex::new(HashMap::new()),
1022        }
1023    }
1024
1025    fn bind(&self, socket: SocketAddr) -> Result<Listener, Error> {
1026        self.auditor.event(b"bind", |hasher| {
1027            hasher.update(socket.to_string().as_bytes());
1028        });
1029
1030        // If the IP is localhost, ensure the port is not in the ephemeral range
1031        // so that it can be used for binding in the dial method
1032        if socket.ip() == IpAddr::V4(Ipv4Addr::LOCALHOST)
1033            && EPHEMERAL_PORT_RANGE.contains(&socket.port())
1034        {
1035            return Err(Error::BindFailed);
1036        }
1037
1038        // Ensure the port is not already bound
1039        let mut listeners = self.listeners.lock().unwrap();
1040        if listeners.contains_key(&socket) {
1041            return Err(Error::BindFailed);
1042        }
1043
1044        // Bind the socket
1045        let (sender, receiver) = mpsc::unbounded();
1046        listeners.insert(socket, sender);
1047        Ok(Listener {
1048            auditor: self.auditor.clone(),
1049            address: socket,
1050            listener: receiver,
1051            metrics: self.metrics.clone(),
1052        })
1053    }
1054
1055    async fn dial(&self, socket: SocketAddr) -> Result<(Sink, Stream), Error> {
1056        // Assign dialer a port from the ephemeral range
1057        let dialer = {
1058            let mut ephemeral = self.ephemeral.lock().unwrap();
1059            let dialer = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), *ephemeral);
1060            *ephemeral = ephemeral
1061                .checked_add(1)
1062                .expect("ephemeral port range exhausted");
1063            dialer
1064        };
1065        self.auditor.event(b"dial", |hasher| {
1066            hasher.update(dialer.to_string().as_bytes());
1067            hasher.update(socket.to_string().as_bytes());
1068        });
1069
1070        // Get listener
1071        let mut sender = {
1072            let listeners = self.listeners.lock().unwrap();
1073            let sender = listeners.get(&socket).ok_or(Error::ConnectionFailed)?;
1074            sender.clone()
1075        };
1076
1077        // Construct connection
1078        let (dialer_sender, dialer_receiver) = mocks::Channel::init();
1079        let (listener_sender, listener_receiver) = mocks::Channel::init();
1080        sender
1081            .send((dialer, dialer_sender, listener_receiver))
1082            .await
1083            .map_err(|_| Error::ConnectionFailed)?;
1084        Ok((
1085            Sink {
1086                metrics: self.metrics.clone(),
1087                auditor: self.auditor.clone(),
1088                me: dialer,
1089                peer: socket,
1090                sender: listener_sender,
1091            },
1092            Stream {
1093                auditor: self.auditor.clone(),
1094                me: dialer,
1095                peer: socket,
1096                receiver: dialer_receiver,
1097            },
1098        ))
1099    }
1100}
1101
1102impl crate::Network for Context {
1103    type Listener = Listener;
1104
1105    async fn bind(&self, socket: SocketAddr) -> Result<Self::Listener, Error> {
1106        self.networking.bind(socket)
1107    }
1108
1109    async fn dial(&self, socket: SocketAddr) -> Result<(Sink, Stream), Error> {
1110        self.networking.dial(socket).await
1111    }
1112}
1113
1114/// Implementation of [crate::Listener] for the `deterministic` runtime.
1115pub struct Listener {
1116    metrics: Arc<Metrics>,
1117    auditor: Arc<Auditor>,
1118    address: SocketAddr,
1119    listener: mpsc::UnboundedReceiver<(SocketAddr, mocks::Sink, mocks::Stream)>,
1120}
1121
1122impl crate::Listener for Listener {
1123    type Sink = Sink;
1124    type Stream = Stream;
1125
1126    async fn accept(&mut self) -> Result<(SocketAddr, Self::Sink, Self::Stream), Error> {
1127        let (socket, sender, receiver) = self.listener.next().await.ok_or(Error::ReadFailed)?;
1128        self.auditor.event(b"accept", |hasher| {
1129            hasher.update(self.address.to_string().as_bytes());
1130            hasher.update(socket.to_string().as_bytes());
1131        });
1132        Ok((
1133            socket,
1134            Sink {
1135                metrics: self.metrics.clone(),
1136                auditor: self.auditor.clone(),
1137                me: self.address,
1138                peer: socket,
1139                sender,
1140            },
1141            Stream {
1142                auditor: self.auditor.clone(),
1143                me: self.address,
1144                peer: socket,
1145                receiver,
1146            },
1147        ))
1148    }
1149}
1150
1151/// Implementation of [crate::Sink] for the `deterministic` runtime.
1152pub struct Sink {
1153    metrics: Arc<Metrics>,
1154    auditor: Arc<Auditor>,
1155    me: SocketAddr,
1156    peer: SocketAddr,
1157    sender: mocks::Sink,
1158}
1159
1160impl crate::Sink for Sink {
1161    async fn send(&mut self, msg: &[u8]) -> Result<(), Error> {
1162        self.auditor.event(b"send", |hasher| {
1163            hasher.update(self.me.to_string().as_bytes());
1164            hasher.update(self.peer.to_string().as_bytes());
1165            hasher.update(msg);
1166        });
1167        self.sender.send(msg).await.map_err(|_| Error::SendFailed)?;
1168        self.metrics.network_bandwidth.inc_by(msg.len() as u64);
1169        Ok(())
1170    }
1171}
1172
1173/// Implementation of [crate::Stream] for the `deterministic` runtime.
1174pub struct Stream {
1175    auditor: Arc<Auditor>,
1176    me: SocketAddr,
1177    peer: SocketAddr,
1178    receiver: mocks::Stream,
1179}
1180
1181impl crate::Stream for Stream {
1182    async fn recv(&mut self, buf: &mut [u8]) -> Result<(), Error> {
1183        self.receiver
1184            .recv(buf)
1185            .await
1186            .map_err(|_| Error::RecvFailed)?;
1187        self.auditor.event(b"recv", |hasher| {
1188            hasher.update(self.me.to_string().as_bytes());
1189            hasher.update(self.peer.to_string().as_bytes());
1190            hasher.update(buf);
1191        });
1192        Ok(())
1193    }
1194}
1195
1196impl RngCore for Context {
1197    fn next_u32(&mut self) -> u32 {
1198        self.executor.auditor.event(b"rand", |hasher| {
1199            hasher.update(b"next_u32");
1200        });
1201        self.executor.rng.lock().unwrap().next_u32()
1202    }
1203
1204    fn next_u64(&mut self) -> u64 {
1205        self.executor.auditor.event(b"rand", |hasher| {
1206            hasher.update(b"next_u64");
1207        });
1208        self.executor.rng.lock().unwrap().next_u64()
1209    }
1210
1211    fn fill_bytes(&mut self, dest: &mut [u8]) {
1212        self.executor.auditor.event(b"rand", |hasher| {
1213            hasher.update(b"fill_bytes");
1214        });
1215        self.executor.rng.lock().unwrap().fill_bytes(dest)
1216    }
1217
1218    fn try_fill_bytes(&mut self, dest: &mut [u8]) -> Result<(), rand::Error> {
1219        self.executor.auditor.event(b"rand", |hasher| {
1220            hasher.update(b"try_fill_bytes");
1221        });
1222        self.executor.rng.lock().unwrap().try_fill_bytes(dest)
1223    }
1224}
1225
1226impl CryptoRng for Context {}
1227
1228impl crate::Storage for Context {
1229    type Blob = <MeteredStorage<AuditedStorage<MemStorage>> as crate::Storage>::Blob;
1230
1231    async fn open(&self, partition: &str, name: &[u8]) -> Result<(Self::Blob, u64), Error> {
1232        self.storage.open(partition, name).await
1233    }
1234
1235    async fn remove(&self, partition: &str, name: Option<&[u8]>) -> Result<(), Error> {
1236        self.storage.remove(partition, name).await
1237    }
1238
1239    async fn scan(&self, partition: &str) -> Result<Vec<Vec<u8>>, Error> {
1240        self.storage.scan(partition).await
1241    }
1242}
1243
1244#[cfg(test)]
1245mod tests {
1246    use super::*;
1247    use crate::{deterministic, utils::run_tasks, Blob, Runner as _, Storage};
1248    use commonware_macros::test_traced;
1249    use futures::task::noop_waker;
1250
1251    fn run_with_seed(seed: u64) -> (String, Vec<usize>) {
1252        let executor = deterministic::Runner::seeded(seed);
1253        run_tasks(5, executor)
1254    }
1255
1256    #[test]
1257    fn test_same_seed_same_order() {
1258        // Generate initial outputs
1259        let mut outputs = Vec::new();
1260        for seed in 0..1000 {
1261            let output = run_with_seed(seed);
1262            outputs.push(output);
1263        }
1264
1265        // Ensure they match
1266        for seed in 0..1000 {
1267            let output = run_with_seed(seed);
1268            assert_eq!(output, outputs[seed as usize]);
1269        }
1270    }
1271
1272    #[test_traced("TRACE")]
1273    fn test_different_seeds_different_order() {
1274        let output1 = run_with_seed(12345);
1275        let output2 = run_with_seed(54321);
1276        assert_ne!(output1, output2);
1277    }
1278
1279    #[test]
1280    fn test_alarm_min_heap() {
1281        // Populate heap
1282        let now = SystemTime::now();
1283        let alarms = vec![
1284            Alarm {
1285                time: now + Duration::new(10, 0),
1286                waker: noop_waker(),
1287            },
1288            Alarm {
1289                time: now + Duration::new(5, 0),
1290                waker: noop_waker(),
1291            },
1292            Alarm {
1293                time: now + Duration::new(15, 0),
1294                waker: noop_waker(),
1295            },
1296            Alarm {
1297                time: now + Duration::new(5, 0),
1298                waker: noop_waker(),
1299            },
1300        ];
1301        let mut heap = BinaryHeap::new();
1302        for alarm in alarms {
1303            heap.push(alarm);
1304        }
1305
1306        // Verify min-heap
1307        let mut sorted_times = Vec::new();
1308        while let Some(alarm) = heap.pop() {
1309            sorted_times.push(alarm.time);
1310        }
1311        assert_eq!(
1312            sorted_times,
1313            vec![
1314                now + Duration::new(5, 0),
1315                now + Duration::new(5, 0),
1316                now + Duration::new(10, 0),
1317                now + Duration::new(15, 0),
1318            ]
1319        );
1320    }
1321
1322    #[test]
1323    #[should_panic(expected = "runtime timeout")]
1324    fn test_timeout() {
1325        let executor = deterministic::Runner::timed(Duration::from_secs(10));
1326        executor.start(|context| async move {
1327            loop {
1328                context.sleep(Duration::from_secs(1)).await;
1329            }
1330        });
1331    }
1332
1333    #[test]
1334    #[should_panic(expected = "cycle duration must be non-zero when timeout is set")]
1335    fn test_bad_timeout() {
1336        let cfg = Config {
1337            timeout: Some(Duration::default()),
1338            cycle: Duration::default(),
1339            ..Config::default()
1340        };
1341        deterministic::Runner::new(cfg);
1342    }
1343
1344    #[test]
1345    fn test_recover_synced_storage_persists() {
1346        // Initialize the first runtime
1347        let executor1 = deterministic::Runner::default();
1348        let partition = "test_partition";
1349        let name = b"test_blob";
1350        let data = b"Hello, world!";
1351
1352        // Run some tasks, sync storage, and recover the runtime
1353        let (context, state) = executor1.start(|context| async move {
1354            let (blob, _) = context.open(partition, name).await.unwrap();
1355            blob.write_at(data, 0).await.unwrap();
1356            blob.sync().await.unwrap();
1357            let state = context.auditor().state();
1358            (context, state)
1359        });
1360        let recovered_context = context.recover();
1361
1362        // Verify auditor state is the same
1363        assert_eq!(state, recovered_context.auditor().state());
1364
1365        // Check that synced storage persists after recovery
1366        let executor = Runner::from(recovered_context);
1367        executor.start(|context| async move {
1368            let (blob, len) = context.open(partition, name).await.unwrap();
1369            assert_eq!(len, data.len() as u64);
1370            let mut buf = vec![0; data.len()];
1371            blob.read_at(&mut buf, 0).await.unwrap();
1372            assert_eq!(buf, data);
1373        });
1374    }
1375
1376    #[test]
1377    fn test_recover_unsynced_storage_does_not_persist() {
1378        // Initialize the first runtime
1379        let executor = deterministic::Runner::default();
1380        let partition = "test_partition";
1381        let name = b"test_blob";
1382        let data = b"Hello, world!".to_vec();
1383
1384        // Run some tasks without syncing storage
1385        let context = executor.start(|context| async move {
1386            let context = context.clone();
1387            let (blob, _) = context.open(partition, name).await.unwrap();
1388            blob.write_at(&data, 0).await.unwrap();
1389            // Intentionally do not call sync() here
1390            context
1391        });
1392
1393        // Recover the runtime
1394        let context = context.recover();
1395        let executor = Runner::from(context);
1396
1397        // Check that unsynced storage does not persist after recovery
1398        executor.start(|context| async move {
1399            let (_, len) = context.open(partition, name).await.unwrap();
1400            assert_eq!(len, 0);
1401        });
1402    }
1403
1404    #[test]
1405    #[should_panic(expected = "execution is not finished")]
1406    fn test_recover_before_finish_panics() {
1407        // Initialize runtime
1408        let executor = deterministic::Runner::default();
1409
1410        // Start runtime
1411        executor.start(|context| async move {
1412            // Attempt to recover before the runtime has finished
1413            context.recover();
1414        });
1415    }
1416
1417    #[test]
1418    #[should_panic(expected = "runtime has already been recovered")]
1419    fn test_recover_twice_panics() {
1420        // Initialize runtime
1421        let executor = deterministic::Runner::default();
1422
1423        // Finish runtime
1424        let context = executor.start(|context| async move { context });
1425
1426        // Recover for the first time
1427        let cloned_context = context.clone();
1428        context.recover();
1429
1430        // Attempt to recover again using the same context
1431        cloned_context.recover();
1432    }
1433
1434    #[test]
1435    fn test_default_time_zero() {
1436        // Initialize runtime
1437        let executor = deterministic::Runner::default();
1438
1439        executor.start(|context| async move {
1440            // Check that the time is zero
1441            assert_eq!(
1442                context.current().duration_since(UNIX_EPOCH).unwrap(),
1443                Duration::ZERO
1444            );
1445        });
1446    }
1447}