commonware_runtime/
deterministic.rs

1//! A deterministic runtime that randomly selects tasks to run based on a seed
2//!
3//! # Panics
4//!
5//! If any task panics, the runtime will panic (and shut down).
6//!
7//! # Example
8//!
9//! ```rust
10//! use commonware_runtime::{Spawner, Runner, deterministic::Executor};
11//!
12//! let (executor, runtime, auditor) = Executor::default();
13//! executor.start(async move {
14//!     println!("Parent started");
15//!     let result = runtime.spawn("child", async move {
16//!         println!("Child started");
17//!         "hello"
18//!     });
19//!     println!("Child result: {:?}", result.await);
20//!     println!("Parent exited");
21//! });
22//! println!("Auditor state: {}", auditor.state());
23//! ```
24
25use crate::{mocks, utils::Signaler, Clock, Error, Handle, Signal};
26use commonware_utils::{hex, SystemTimeExt};
27use futures::{
28    channel::mpsc,
29    task::{waker_ref, ArcWake},
30    SinkExt, StreamExt,
31};
32use governor::clock::{Clock as GClock, ReasonablyRealtime};
33use prometheus_client::{
34    encoding::EncodeLabelSet,
35    metrics::{counter::Counter, family::Family, gauge::Gauge},
36    registry::Registry,
37};
38use rand::{prelude::SliceRandom, rngs::StdRng, CryptoRng, RngCore, SeedableRng};
39use sha2::{Digest, Sha256};
40use std::{
41    collections::{BinaryHeap, HashMap},
42    future::Future,
43    mem::replace,
44    net::{IpAddr, Ipv4Addr, SocketAddr},
45    ops::Range,
46    pin::Pin,
47    sync::{Arc, Mutex, RwLock},
48    task::{self, Poll, Waker},
49    time::{Duration, SystemTime, UNIX_EPOCH},
50};
51use tracing::trace;
52
53/// Range of ephemeral ports assigned to dialers.
54const EPHEMERAL_PORT_RANGE: Range<u16> = 32768..61000;
55
56/// Label for root task created during `Runner::start`.
57const ROOT_TASK: &str = "root";
58
59/// Map of names to blob contents.
60pub type Partition = HashMap<Vec<u8>, Vec<u8>>;
61
62#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
63struct Work {
64    label: String,
65}
66
67#[derive(Debug)]
68struct Metrics {
69    tasks_spawned: Family<Work, Counter>,
70    tasks_running: Family<Work, Gauge>,
71    task_polls: Family<Work, Counter>,
72
73    network_bandwidth: Counter,
74
75    open_blobs: Gauge,
76    storage_reads: Counter,
77    storage_read_bandwidth: Counter,
78    storage_writes: Counter,
79    storage_write_bandwidth: Counter,
80}
81
82impl Metrics {
83    pub fn init(registry: Arc<Mutex<Registry>>) -> Self {
84        let metrics = Self {
85            task_polls: Family::default(),
86            tasks_running: Family::default(),
87            tasks_spawned: Family::default(),
88            network_bandwidth: Counter::default(),
89            open_blobs: Gauge::default(),
90            storage_reads: Counter::default(),
91            storage_read_bandwidth: Counter::default(),
92            storage_writes: Counter::default(),
93            storage_write_bandwidth: Counter::default(),
94        };
95        {
96            let mut registry = registry.lock().unwrap();
97            registry.register(
98                "tasks_spawned",
99                "Total number of tasks spawned",
100                metrics.tasks_spawned.clone(),
101            );
102            registry.register(
103                "tasks_running",
104                "Number of tasks currently running",
105                metrics.tasks_running.clone(),
106            );
107            registry.register(
108                "task_polls",
109                "Total number of task polls",
110                metrics.task_polls.clone(),
111            );
112            registry.register(
113                "bandwidth",
114                "Total amount of data sent over network",
115                metrics.network_bandwidth.clone(),
116            );
117            registry.register(
118                "open_blobs",
119                "Number of open blobs",
120                metrics.open_blobs.clone(),
121            );
122            registry.register(
123                "storage_reads",
124                "Total number of disk reads",
125                metrics.storage_reads.clone(),
126            );
127            registry.register(
128                "storage_read_bandwidth",
129                "Total amount of data read from disk",
130                metrics.storage_read_bandwidth.clone(),
131            );
132            registry.register(
133                "storage_writes",
134                "Total number of disk writes",
135                metrics.storage_writes.clone(),
136            );
137            registry.register(
138                "storage_write_bandwidth",
139                "Total amount of data written to disk",
140                metrics.storage_write_bandwidth.clone(),
141            );
142        }
143        metrics
144    }
145}
146
147/// Track the state of the runtime for determinism auditing.
148pub struct Auditor {
149    hash: Mutex<Vec<u8>>,
150}
151
152impl Auditor {
153    fn new() -> Self {
154        Self {
155            hash: Mutex::new(Vec::new()),
156        }
157    }
158
159    fn process_task(&self, task: u128, label: &str) {
160        let mut hash = self.hash.lock().unwrap();
161        let mut hasher = Sha256::new();
162        hasher.update(&*hash);
163        hasher.update(b"process_task");
164        hasher.update(task.to_be_bytes());
165        hasher.update(label.as_bytes());
166        *hash = hasher.finalize().to_vec();
167    }
168
169    fn stop(&self, value: i32) {
170        let mut hash = self.hash.lock().unwrap();
171        let mut hasher = Sha256::new();
172        hasher.update(&*hash);
173        hasher.update(b"stop");
174        hasher.update(value.to_be_bytes());
175        *hash = hasher.finalize().to_vec();
176    }
177
178    fn stopped(&self) {
179        let mut hash = self.hash.lock().unwrap();
180        let mut hasher = Sha256::new();
181        hasher.update(&*hash);
182        hasher.update(b"stopped");
183        *hash = hasher.finalize().to_vec();
184    }
185
186    fn bind(&self, address: SocketAddr) {
187        let mut hash = self.hash.lock().unwrap();
188        let mut hasher = Sha256::new();
189        hasher.update(&*hash);
190        hasher.update(b"bind");
191        hasher.update(address.to_string().as_bytes());
192        *hash = hasher.finalize().to_vec();
193    }
194
195    fn dial(&self, dialer: SocketAddr, dialee: SocketAddr) {
196        let mut hash = self.hash.lock().unwrap();
197        let mut hasher = Sha256::new();
198        hasher.update(&*hash);
199        hasher.update(b"dial");
200        hasher.update(dialer.to_string().as_bytes());
201        hasher.update(dialee.to_string().as_bytes());
202        *hash = hasher.finalize().to_vec();
203    }
204
205    fn accept(&self, dialee: SocketAddr, dialer: SocketAddr) {
206        let mut hash = self.hash.lock().unwrap();
207        let mut hasher = Sha256::new();
208        hasher.update(&*hash);
209        hasher.update(b"accept");
210        hasher.update(dialee.to_string().as_bytes());
211        hasher.update(dialer.to_string().as_bytes());
212        *hash = hasher.finalize().to_vec();
213    }
214
215    fn send(&self, sender: SocketAddr, receiver: SocketAddr, message: &[u8]) {
216        let mut hash = self.hash.lock().unwrap();
217        let mut hasher = Sha256::new();
218        hasher.update(&*hash);
219        hasher.update(b"send");
220        hasher.update(sender.to_string().as_bytes());
221        hasher.update(receiver.to_string().as_bytes());
222        hasher.update(message);
223        *hash = hasher.finalize().to_vec();
224    }
225
226    fn recv(&self, receiver: SocketAddr, sender: SocketAddr, message: &[u8]) {
227        let mut hash = self.hash.lock().unwrap();
228        let mut hasher = Sha256::new();
229        hasher.update(&*hash);
230        hasher.update(b"recv");
231        hasher.update(receiver.to_string().as_bytes());
232        hasher.update(sender.to_string().as_bytes());
233        hasher.update(message);
234        *hash = hasher.finalize().to_vec();
235    }
236
237    fn rand(&self, method: String) {
238        let mut hash = self.hash.lock().unwrap();
239        let mut hasher = Sha256::new();
240        hasher.update(&*hash);
241        hasher.update(b"rand");
242        hasher.update(method.as_bytes());
243        *hash = hasher.finalize().to_vec();
244    }
245
246    fn open(&self, partition: &str, name: &[u8]) {
247        let mut hash = self.hash.lock().unwrap();
248        let mut hasher = Sha256::new();
249        hasher.update(&*hash);
250        hasher.update(b"open");
251        hasher.update(partition.as_bytes());
252        hasher.update(name);
253        *hash = hasher.finalize().to_vec();
254    }
255
256    fn remove(&self, partition: &str, name: Option<&[u8]>) {
257        let mut hash = self.hash.lock().unwrap();
258        let mut hasher = Sha256::new();
259        hasher.update(&*hash);
260        hasher.update(b"remove");
261        hasher.update(partition.as_bytes());
262        if let Some(name) = name {
263            hasher.update(name);
264        }
265        *hash = hasher.finalize().to_vec();
266    }
267
268    fn scan(&self, partition: &str) {
269        let mut hash = self.hash.lock().unwrap();
270        let mut hasher = Sha256::new();
271        hasher.update(&*hash);
272        hasher.update(b"scan");
273        hasher.update(partition.as_bytes());
274        *hash = hasher.finalize().to_vec();
275    }
276
277    fn len(&self, partition: &str, name: &[u8]) {
278        let mut hash = self.hash.lock().unwrap();
279        let mut hasher = Sha256::new();
280        hasher.update(&*hash);
281        hasher.update(b"len");
282        hasher.update(partition.as_bytes());
283        hasher.update(name);
284        *hash = hasher.finalize().to_vec();
285    }
286
287    fn read_at(&self, partition: &str, name: &[u8], buf: usize, offset: u64) {
288        let mut hash = self.hash.lock().unwrap();
289        let mut hasher = Sha256::new();
290        hasher.update(&*hash);
291        hasher.update(b"read_at");
292        hasher.update(partition.as_bytes());
293        hasher.update(name);
294        hasher.update(buf.to_be_bytes());
295        hasher.update(offset.to_be_bytes());
296        *hash = hasher.finalize().to_vec();
297    }
298
299    fn write_at(&self, partition: &str, name: &[u8], buf: &[u8], offset: u64) {
300        let mut hash = self.hash.lock().unwrap();
301        let mut hasher = Sha256::new();
302        hasher.update(&*hash);
303        hasher.update(b"write_at");
304        hasher.update(partition.as_bytes());
305        hasher.update(name);
306        hasher.update(buf);
307        hasher.update(offset.to_be_bytes());
308        *hash = hasher.finalize().to_vec();
309    }
310
311    fn truncate(&self, partition: &str, name: &[u8], size: u64) {
312        let mut hash = self.hash.lock().unwrap();
313        let mut hasher = Sha256::new();
314        hasher.update(&*hash);
315        hasher.update(b"truncate");
316        hasher.update(partition.as_bytes());
317        hasher.update(name);
318        hasher.update(size.to_be_bytes());
319        *hash = hasher.finalize().to_vec();
320    }
321
322    fn sync(&self, partition: &str, name: &[u8]) {
323        let mut hash = self.hash.lock().unwrap();
324        let mut hasher = Sha256::new();
325        hasher.update(&*hash);
326        hasher.update(b"sync");
327        hasher.update(partition.as_bytes());
328        hasher.update(name);
329        *hash = hasher.finalize().to_vec();
330    }
331
332    fn close(&self, partition: &str, name: &[u8]) {
333        let mut hash = self.hash.lock().unwrap();
334        let mut hasher = Sha256::new();
335        hasher.update(&*hash);
336        hasher.update(b"close");
337        hasher.update(partition.as_bytes());
338        hasher.update(name);
339        *hash = hasher.finalize().to_vec();
340    }
341
342    /// Generate a representation of the current state of the runtime.
343    ///
344    /// This can be used to ensure that logic running on top
345    /// of the runtime interacts with it deterministically across runs.
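    ///
    /// # Example
    ///
    /// A minimal sketch: runs with the same seed (and the same workload) should
    /// produce identical states.
    ///
    /// ```rust
    /// use commonware_runtime::{Runner, deterministic::Executor};
    ///
    /// // Run an (empty) workload to completion and capture the resulting state.
    /// let run = |seed: u64| {
    ///     let (executor, _context, auditor) = Executor::seeded(seed);
    ///     executor.start(async move {});
    ///     auditor.state()
    /// };
    /// assert_eq!(run(42), run(42));
    /// ```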
346    pub fn state(&self) -> String {
347        let hash = self.hash.lock().unwrap().clone();
348        hex(&hash)
349    }
350}
351
352struct Task {
353    id: u128,
354    label: String,
355
356    tasks: Arc<Tasks>,
357
358    root: bool,
359    future: Mutex<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
360
361    completed: Mutex<bool>,
362}
363
364impl ArcWake for Task {
365    fn wake_by_ref(arc_self: &Arc<Self>) {
366        arc_self.tasks.enqueue(arc_self.clone());
367    }
368}
369
370struct Tasks {
371    counter: Mutex<u128>,
372    queue: Mutex<Vec<Arc<Task>>>,
373}
374
375impl Tasks {
376    fn register(
377        arc_self: &Arc<Self>,
378        label: &str,
379        root: bool,
380        future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
381    ) {
382        let mut queue = arc_self.queue.lock().unwrap();
383        let id = {
384            let mut l = arc_self.counter.lock().unwrap();
385            let old = *l;
386            *l = l.checked_add(1).expect("task counter overflow");
387            old
388        };
389        queue.push(Arc::new(Task {
390            id,
391            label: label.to_string(),
392            root,
393            future: Mutex::new(future),
394            tasks: arc_self.clone(),
395            completed: Mutex::new(false),
396        }));
397    }
398
399    fn enqueue(&self, task: Arc<Task>) {
400        let mut queue = self.queue.lock().unwrap();
401        queue.push(task);
402    }
403
404    fn drain(&self) -> Vec<Arc<Task>> {
405        let mut queue = self.queue.lock().unwrap();
406        let len = queue.len();
407        replace(&mut *queue, Vec::with_capacity(len))
408    }
409
410    fn len(&self) -> usize {
411        self.queue.lock().unwrap().len()
412    }
413}
414
415/// Configuration for the `deterministic` runtime.
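///
/// # Example
///
/// A sketch of a custom configuration (the values are illustrative):
///
/// ```rust
/// use commonware_runtime::deterministic::{Config, Executor};
/// use std::time::Duration;
///
/// let cfg = Config {
///     seed: 7,
///     cycle: Duration::from_millis(1),
///     timeout: Some(Duration::from_secs(30)),
///     ..Config::default()
/// };
/// let (_runner, _context, _auditor) = Executor::init(cfg);
/// ```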
416#[derive(Clone)]
417pub struct Config {
418    /// Registry for metrics.
419    pub registry: Arc<Mutex<Registry>>,
420
421    /// Seed for the random number generator.
422    pub seed: u64,
423
424    /// The cycle duration determines how much time is advanced after each iteration of the event
425    /// loop. This is useful to prevent starvation if some task never yields.
426    pub cycle: Duration,
427
428    /// If the runtime is still executing after this duration (i.e. a test has not completed), panic.
429    pub timeout: Option<Duration>,
430}
431
432impl Default for Config {
433    fn default() -> Self {
434        Self {
435            registry: Arc::new(Mutex::new(Registry::default())),
436            seed: 42,
437            cycle: Duration::from_millis(1),
438            timeout: None,
439        }
440    }
441}
442
443/// Deterministic runtime that randomly selects tasks to run based on a seed.
444pub struct Executor {
445    cycle: Duration,
446    deadline: Option<SystemTime>,
447    metrics: Arc<Metrics>,
448    auditor: Arc<Auditor>,
449    rng: Mutex<StdRng>,
450    prefix: Mutex<String>,
451    time: Mutex<SystemTime>,
452    tasks: Arc<Tasks>,
453    sleeping: Mutex<BinaryHeap<Alarm>>,
454    partitions: Mutex<HashMap<String, Partition>>,
455    signaler: Mutex<Signaler>,
456    signal: Signal,
457    finished: Mutex<bool>,
458    recovered: Mutex<bool>,
459}
460
461impl Executor {
462    /// Initialize a new `deterministic` runtime with the given configuration.
463    pub fn init(cfg: Config) -> (Runner, Context, Arc<Auditor>) {
464        // Ensure config is valid
465        if cfg.timeout.is_some() && cfg.cycle == Duration::default() {
466            panic!("cycle duration must be non-zero when timeout is set");
467        }
468
469        // Initialize runtime
470        let metrics = Arc::new(Metrics::init(cfg.registry));
471        let auditor = Arc::new(Auditor::new());
472        let start_time = UNIX_EPOCH;
473        let deadline = cfg
474            .timeout
475            .map(|timeout| start_time.checked_add(timeout).expect("timeout overflowed"));
476        let (signaler, signal) = Signaler::new();
477        let executor = Arc::new(Self {
478            cycle: cfg.cycle,
479            deadline,
480            metrics: metrics.clone(),
481            auditor: auditor.clone(),
482            rng: Mutex::new(StdRng::seed_from_u64(cfg.seed)),
483            prefix: Mutex::new(String::new()),
484            time: Mutex::new(start_time),
485            tasks: Arc::new(Tasks {
486                queue: Mutex::new(Vec::new()),
487                counter: Mutex::new(0),
488            }),
489            sleeping: Mutex::new(BinaryHeap::new()),
490            partitions: Mutex::new(HashMap::new()),
491            signaler: Mutex::new(signaler),
492            signal,
493            finished: Mutex::new(false),
494            recovered: Mutex::new(false),
495        });
496        (
497            Runner {
498                executor: executor.clone(),
499            },
500            Context {
501                executor,
502                networking: Arc::new(Networking::new(metrics, auditor.clone())),
503            },
504            auditor,
505        )
506    }
507
508    /// Initialize a new `deterministic` runtime with the default configuration
509    /// and the provided seed.
510    pub fn seeded(seed: u64) -> (Runner, Context, Arc<Auditor>) {
511        let cfg = Config {
512            seed,
513            ..Config::default()
514        };
515        Self::init(cfg)
516    }
517
518    /// Initialize a new `deterministic` runtime with the default configuration
519    /// but panic if it is still running after the given timeout.
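    ///
    /// # Example
    ///
    /// A sketch mirroring the timeout test in this module: a workload that never
    /// completes panics once the configured timeout elapses (this assumes `Clock`
    /// is exported at the crate root alongside `Runner`).
    ///
    /// ```rust,should_panic
    /// use commonware_runtime::{Clock, Runner, deterministic::Executor};
    /// use std::time::Duration;
    ///
    /// let (executor, context, _) = Executor::timed(Duration::from_secs(10));
    /// executor.start(async move {
    ///     loop {
    ///         context.sleep(Duration::from_secs(1)).await;
    ///     }
    /// });
    /// ```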
520    pub fn timed(timeout: Duration) -> (Runner, Context, Arc<Auditor>) {
521        let cfg = Config {
522            timeout: Some(timeout),
523            ..Config::default()
524        };
525        Self::init(cfg)
526    }
527
528    /// Initialize a new `deterministic` runtime with the default configuration.
529    // We'd love to implement the trait but we can't because of the return type.
530    #[allow(clippy::should_implement_trait)]
531    pub fn default() -> (Runner, Context, Arc<Auditor>) {
532        Self::init(Config::default())
533    }
534}
535
536/// Implementation of [`crate::Runner`] for the `deterministic` runtime.
537pub struct Runner {
538    executor: Arc<Executor>,
539}
540
541impl crate::Runner for Runner {
542    fn start<F>(self, f: F) -> F::Output
543    where
544        F: Future + Send + 'static,
545        F::Output: Send + 'static,
546    {
547        // Add root task to the queue
548        let output = Arc::new(Mutex::new(None));
549        Tasks::register(
550            &self.executor.tasks,
551            ROOT_TASK,
552            true,
553            Box::pin({
554                let output = output.clone();
555                async move {
556                    *output.lock().unwrap() = Some(f.await);
557                }
558            }),
559        );
560
561        // Process tasks until root task completes or progress stalls
562        let mut iter = 0;
563        loop {
564            // Ensure we have not exceeded our deadline
565            {
566                let current = self.executor.time.lock().unwrap();
567                if let Some(deadline) = self.executor.deadline {
568                    if *current >= deadline {
569                        panic!("runtime timeout");
570                    }
571                }
572            }
573
574            // Snapshot available tasks
575            let mut tasks = self.executor.tasks.drain();
576
577            // Shuffle tasks
578            {
579                let mut rng = self.executor.rng.lock().unwrap();
580                tasks.shuffle(&mut *rng);
581            }
582
583            // Run all snapshotted tasks
584            //
585            // This approach is more efficient than randomly selecting a task one-at-a-time
586            // because it ensures we don't repeatedly poll the same pending task before processing
587            // a different task that it may require to make progress.
588            trace!(iter, tasks = tasks.len(), "starting loop");
589            for task in tasks {
590                // Set current task prefix
591                {
592                    let mut prefix = self.executor.prefix.lock().unwrap();
593                    prefix.clone_from(&task.label);
594                }
595
596                // Record task for auditing
597                self.executor.auditor.process_task(task.id, &task.label);
598
599                // Check if task is already complete
600                if *task.completed.lock().unwrap() {
601                    trace!(id = task.id, "skipping already completed task");
602                    continue;
603                }
604                trace!(id = task.id, "processing task");
605
606                // Prepare task for polling
607                let waker = waker_ref(&task);
608                let mut context = task::Context::from_waker(&waker);
609                let mut future = task.future.lock().unwrap();
610
611                // Record task poll
612                self.executor
613                    .metrics
614                    .task_polls
615                    .get_or_create(&Work {
616                        label: task.label.clone(),
617                    })
618                    .inc();
619
620                // Task is re-queued in its `wake_by_ref` implementation as soon as we poll here (regardless
621                // of whether it is Pending/Ready).
622                let pending = future.as_mut().poll(&mut context).is_pending();
623                if pending {
624                    trace!(id = task.id, "task is still pending");
625                    continue;
626                }
627
628                // Mark task as completed
629                *task.completed.lock().unwrap() = true;
630                trace!(id = task.id, "task is complete");
631
632                // Root task completed
633                if task.root {
634                    *self.executor.finished.lock().unwrap() = true;
635                    return output.lock().unwrap().take().unwrap();
636                }
637            }
638
639            // Advance time by cycle
640            //
641            // This approach prevents starvation if some task never yields (to keep the time
642            // advanced per iteration minimal, the cycle duration can be set to 1ns).
643            let mut current;
644            {
645                let mut time = self.executor.time.lock().unwrap();
646                *time = time
647                    .checked_add(self.executor.cycle)
648                    .expect("executor time overflowed");
649                current = *time;
650            }
651            trace!(now = current.epoch_millis(), "time advanced",);
652
653            // Skip time if there is nothing to do
654            if self.executor.tasks.len() == 0 {
655                let mut skip = None;
656                {
657                    let sleeping = self.executor.sleeping.lock().unwrap();
658                    if let Some(next) = sleeping.peek() {
659                        if next.time > current {
660                            skip = Some(next.time);
661                        }
662                    }
663                }
664                if skip.is_some() {
665                    {
666                        let mut time = self.executor.time.lock().unwrap();
667                        *time = skip.unwrap();
668                        current = *time;
669                    }
670                    trace!(now = current.epoch_millis(), "time skipped",);
671                }
672            }
673
674            // Wake all sleeping tasks that are ready
675            let mut to_wake = Vec::new();
676            let mut remaining;
677            {
678                let mut sleeping = self.executor.sleeping.lock().unwrap();
679                while let Some(next) = sleeping.peek() {
680                    if next.time <= current {
681                        let sleeper = sleeping.pop().unwrap();
682                        to_wake.push(sleeper.waker);
683                    } else {
684                        break;
685                    }
686                }
687                remaining = sleeping.len();
688            }
689            for waker in to_wake {
690                waker.wake();
691            }
692
693            // Account for remaining tasks
694            remaining += self.executor.tasks.len();
695
696            // If there are no tasks to run and no tasks sleeping, the executor is stalled
697            // and will never finish.
698            if remaining == 0 {
699                panic!("runtime stalled");
700            }
701            iter += 1;
702        }
703    }
704}
705
706/// Implementation of [`crate::Spawner`], [`crate::Clock`],
707/// [`crate::Network`], and [`crate::Storage`] for the `deterministic`
708/// runtime.
709#[derive(Clone)]
710pub struct Context {
711    executor: Arc<Executor>,
712    networking: Arc<Networking>,
713}
714
715impl Context {
716    /// Recover the inner state (deadline, metrics, auditor, rng, synced storage, etc.) from the
717    /// current runtime and use it to initialize a new instance of the runtime. A recovered runtime
718    /// does not inherit the current runtime's pending tasks, unsynced storage, network connections, or
719    /// its shutdown signaler.
720    ///
721    /// This is useful for performing a deterministic simulation that spans multiple runtime instantiations,
722    /// like simulating unclean shutdown (which involves repeatedly halting the runtime at unexpected intervals).
723    ///
724    /// It is only permitted to call this method after the runtime has finished (i.e. once `start` returns)
725    /// and it is only permitted to do so once (otherwise multiple recovered runtimes will share the same inner state).
726    /// If either one of these conditions is violated, this method will panic.
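    ///
    /// # Example
    ///
    /// A minimal sketch of spanning two runtime instantiations (the storage tests in
    /// this module exercise synced vs. unsynced state in more detail):
    ///
    /// ```rust
    /// use commonware_runtime::{Runner, deterministic::Executor};
    ///
    /// let (executor, context, _) = Executor::default();
    /// executor.start(async move {
    ///     // ... first "lifetime" of the simulation ...
    /// });
    ///
    /// // Once `start` has returned, the context can be recovered exactly once.
    /// let (executor, _context, _auditor) = context.recover();
    /// executor.start(async move {
    ///     // ... second "lifetime", sharing the rng, time, and synced storage ...
    /// });
    /// ```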
727    pub fn recover(self) -> (Runner, Self, Arc<Auditor>) {
728        // Ensure we are finished
729        if !*self.executor.finished.lock().unwrap() {
730            panic!("execution is not finished");
731        }
732
733        // Ensure runtime has not already been recovered
734        {
735            let mut recovered = self.executor.recovered.lock().unwrap();
736            if *recovered {
737                panic!("runtime has already been recovered");
738            }
739            *recovered = true;
740        }
741
742        // Prepare metrics
743        let metrics = self.executor.metrics.clone();
744        metrics.open_blobs.set(0);
745        metrics.tasks_running.clear();
746
747        // Copy state
748        let auditor = self.executor.auditor.clone();
749        let (signaler, signal) = Signaler::new();
750        let executor = Arc::new(Executor {
751            // Copied from the current runtime
752            cycle: self.executor.cycle,
753            deadline: self.executor.deadline,
754            metrics: metrics.clone(),
755            auditor: auditor.clone(),
756            rng: Mutex::new(self.executor.rng.lock().unwrap().clone()),
757            time: Mutex::new(*self.executor.time.lock().unwrap()),
758            partitions: Mutex::new(self.executor.partitions.lock().unwrap().clone()),
759
760            // New state for the new runtime
761            prefix: Mutex::new(String::new()),
762            tasks: Arc::new(Tasks {
763                queue: Mutex::new(Vec::new()),
764                counter: Mutex::new(0),
765            }),
766            sleeping: Mutex::new(BinaryHeap::new()),
767            signaler: Mutex::new(signaler),
768            signal,
769            finished: Mutex::new(false),
770            recovered: Mutex::new(false),
771        });
772        (
773            Runner {
774                executor: executor.clone(),
775            },
776            Self {
777                executor,
778                networking: Arc::new(Networking::new(metrics, auditor.clone())),
779            },
780            auditor,
781        )
782    }
783}
784
785impl crate::Spawner for Context {
786    fn spawn<F, T>(&self, label: &str, f: F) -> Handle<T>
787    where
788        F: Future<Output = T> + Send + 'static,
789        T: Send + 'static,
790    {
791        let label = {
792            let prefix = self.executor.prefix.lock().unwrap();
793            if prefix.is_empty() || *prefix == ROOT_TASK {
794                label.to_string()
795            } else {
796                format!("{}_{}", *prefix, label)
797            }
798        };
799        if label == ROOT_TASK {
800            panic!("root task cannot be spawned");
801        }
802        let work = Work {
803            label: label.clone(),
804        };
805        self.executor
806            .metrics
807            .tasks_spawned
808            .get_or_create(&work)
809            .inc();
810        let gauge = self
811            .executor
812            .metrics
813            .tasks_running
814            .get_or_create(&work)
815            .clone();
816        let (f, handle) = Handle::init(f, gauge, false);
817        Tasks::register(&self.executor.tasks, &label, false, Box::pin(f));
818        handle
819    }
820
821    fn stop(&self, value: i32) {
822        self.executor.auditor.stop(value);
823        self.executor.signaler.lock().unwrap().signal(value);
824    }
825
826    fn stopped(&self) -> Signal {
827        self.executor.auditor.stopped();
828        self.executor.signal.clone()
829    }
830}
831
832struct Sleeper {
833    executor: Arc<Executor>,
834    time: SystemTime,
835    registered: bool,
836}
837
838struct Alarm {
839    time: SystemTime,
840    waker: Waker,
841}
842
843impl PartialEq for Alarm {
844    fn eq(&self, other: &Self) -> bool {
845        self.time.eq(&other.time)
846    }
847}
848
849impl Eq for Alarm {}
850
851impl PartialOrd for Alarm {
852    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
853        Some(self.cmp(other))
854    }
855}
856
857impl Ord for Alarm {
858    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
859        // Reverse the ordering for min-heap
860        other.time.cmp(&self.time)
861    }
862}
863
864impl Future for Sleeper {
865    type Output = ();
866
867    fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
868        {
869            let current_time = *self.executor.time.lock().unwrap();
870            if current_time >= self.time {
871                return Poll::Ready(());
872            }
873        }
874        if !self.registered {
875            self.registered = true;
876            self.executor.sleeping.lock().unwrap().push(Alarm {
877                time: self.time,
878                waker: cx.waker().clone(),
879            });
880        }
881        Poll::Pending
882    }
883}
884
885impl Clock for Context {
886    fn current(&self) -> SystemTime {
887        *self.executor.time.lock().unwrap()
888    }
889
890    fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static {
891        let time = self
892            .current()
893            .checked_add(duration)
894            .expect("overflow when setting wake time");
895        Sleeper {
896            executor: self.executor.clone(),
897
898            time,
899            registered: false,
900        }
901    }
902
903    fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static {
904        Sleeper {
905            executor: self.executor.clone(),
906
907            time: deadline,
908            registered: false,
909        }
910    }
911}
912
913impl GClock for Context {
914    type Instant = SystemTime;
915
916    fn now(&self) -> Self::Instant {
917        self.current()
918    }
919}
920
921impl ReasonablyRealtime for Context {}
922
923type Dialable = mpsc::UnboundedSender<(
924    SocketAddr,
925    mocks::Sink,   // Dialee -> Dialer
926    mocks::Stream, // Dialer -> Dialee
927)>;
928
929/// Implementation of [`crate::Network`] for the `deterministic` runtime.
930///
931/// When a dialer connects to a dialee, the dialer is assigned a new ephemeral port
932/// from the range `32768..61000`. To keep things simple, it is not possible to
933/// bind to an ephemeral port. Ephemeral ports are not reused, so once the range is
934/// exhausted, the runtime will panic.
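///
/// # Example
///
/// A sketch of the dialer/dialee flow. This assumes the `Network`, `Listener`,
/// `Spawner`, and `Runner` traits are exported at the crate root, as `Runner` and
/// `Spawner` are in the module-level example:
///
/// ```rust
/// use commonware_runtime::{Listener, Network, Runner, Spawner, deterministic::Executor};
/// use std::net::{IpAddr, Ipv4Addr, SocketAddr};
///
/// let (executor, context, _) = Executor::default();
/// executor.start(async move {
///     // Bind the dialee outside the ephemeral range
///     let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 3000);
///     let mut listener = context.bind(addr).await.unwrap();
///
///     // Dial from a spawned task; the dialer is assigned an ephemeral port
///     let dialer = context.clone();
///     let handle = context.spawn("dialer", async move {
///         let _ = dialer.dial(addr).await.unwrap();
///     });
///
///     // The accepted address reports the dialer's ephemeral port
///     let (dialer_addr, _sink, _stream) = listener.accept().await.unwrap();
///     assert!((32768..61000).contains(&dialer_addr.port()));
///     let _ = handle.await;
/// });
/// ```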
935struct Networking {
936    metrics: Arc<Metrics>,
937    auditor: Arc<Auditor>,
938    ephemeral: Mutex<u16>,
939    listeners: Mutex<HashMap<SocketAddr, Dialable>>,
940}
941
942impl Networking {
943    fn new(metrics: Arc<Metrics>, auditor: Arc<Auditor>) -> Self {
944        Self {
945            metrics,
946            auditor,
947            ephemeral: Mutex::new(EPHEMERAL_PORT_RANGE.start),
948            listeners: Mutex::new(HashMap::new()),
949        }
950    }
951
952    fn bind(&self, socket: SocketAddr) -> Result<Listener, Error> {
953        self.auditor.bind(socket);
954
955        // If the IP is localhost, ensure the port is not in the ephemeral range
956        // so that ephemeral ports remain available for dialers in the dial method
957        if socket.ip() == IpAddr::V4(Ipv4Addr::LOCALHOST)
958            && EPHEMERAL_PORT_RANGE.contains(&socket.port())
959        {
960            return Err(Error::BindFailed);
961        }
962
963        // Ensure the port is not already bound
964        let mut listeners = self.listeners.lock().unwrap();
965        if listeners.contains_key(&socket) {
966            return Err(Error::BindFailed);
967        }
968
969        // Bind the socket
970        let (sender, receiver) = mpsc::unbounded();
971        listeners.insert(socket, sender);
972        Ok(Listener {
973            auditor: self.auditor.clone(),
974            address: socket,
975            listener: receiver,
976            metrics: self.metrics.clone(),
977        })
978    }
979
980    async fn dial(&self, socket: SocketAddr) -> Result<(Sink, Stream), Error> {
981        // Assign dialer a port from the ephemeral range
982        let dialer = {
983            let mut ephemeral = self.ephemeral.lock().unwrap();
984            let dialer = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), *ephemeral);
985            *ephemeral = ephemeral
986                .checked_add(1)
987                .expect("ephemeral port range exhausted");
988            dialer
989        };
990        self.auditor.dial(dialer, socket);
991
992        // Get dialee
993        let mut sender = {
994            let listeners = self.listeners.lock().unwrap();
995            let sender = listeners.get(&socket).ok_or(Error::ConnectionFailed)?;
996            sender.clone()
997        };
998
999        // Construct connection
1000        let (dialer_sender, dialer_receiver) = mocks::Channel::init();
1001        let (dialee_sender, dialee_receiver) = mocks::Channel::init();
1002        sender
1003            .send((dialer, dialer_sender, dialee_receiver))
1004            .await
1005            .map_err(|_| Error::ConnectionFailed)?;
1006        Ok((
1007            Sink {
1008                metrics: self.metrics.clone(),
1009                auditor: self.auditor.clone(),
1010                me: dialer,
1011                peer: socket,
1012                sender: dialee_sender,
1013            },
1014            Stream {
1015                auditor: self.auditor.clone(),
1016                me: dialer,
1017                peer: socket,
1018                receiver: dialer_receiver,
1019            },
1020        ))
1021    }
1022}
1023
1024impl crate::Network<Listener, Sink, Stream> for Context {
1025    async fn bind(&self, socket: SocketAddr) -> Result<Listener, Error> {
1026        self.networking.bind(socket)
1027    }
1028
1029    async fn dial(&self, socket: SocketAddr) -> Result<(Sink, Stream), Error> {
1030        self.networking.dial(socket).await
1031    }
1032}
1033
1034/// Implementation of [`crate::Listener`] for the `deterministic` runtime.
1035pub struct Listener {
1036    metrics: Arc<Metrics>,
1037    auditor: Arc<Auditor>,
1038    address: SocketAddr,
1039    listener: mpsc::UnboundedReceiver<(SocketAddr, mocks::Sink, mocks::Stream)>,
1040}
1041
1042impl crate::Listener<Sink, Stream> for Listener {
1043    async fn accept(&mut self) -> Result<(SocketAddr, Sink, Stream), Error> {
1044        let (socket, sender, receiver) = self.listener.next().await.ok_or(Error::ReadFailed)?;
1045        self.auditor.accept(self.address, socket);
1046        Ok((
1047            socket,
1048            Sink {
1049                metrics: self.metrics.clone(),
1050                auditor: self.auditor.clone(),
1051                me: self.address,
1052                peer: socket,
1053                sender,
1054            },
1055            Stream {
1056                auditor: self.auditor.clone(),
1057                me: self.address,
1058                peer: socket,
1059                receiver,
1060            },
1061        ))
1062    }
1063}
1064
1065/// Implementation of [`crate::Sink`] for the `deterministic` runtime.
1066pub struct Sink {
1067    metrics: Arc<Metrics>,
1068    auditor: Arc<Auditor>,
1069    me: SocketAddr,
1070    peer: SocketAddr,
1071    sender: mocks::Sink,
1072}
1073
1074impl crate::Sink for Sink {
1075    async fn send(&mut self, msg: &[u8]) -> Result<(), Error> {
1076        self.auditor.send(self.me, self.peer, msg);
1077        self.sender.send(msg).await.map_err(|_| Error::SendFailed)?;
1078        self.metrics.network_bandwidth.inc_by(msg.len() as u64);
1079        Ok(())
1080    }
1081}
1082
1083/// Implementation of [`crate::Stream`] for the `deterministic` runtime.
1084pub struct Stream {
1085    auditor: Arc<Auditor>,
1086    me: SocketAddr,
1087    peer: SocketAddr,
1088    receiver: mocks::Stream,
1089}
1090
1091impl crate::Stream for Stream {
1092    async fn recv(&mut self, buf: &mut [u8]) -> Result<(), Error> {
1093        self.receiver
1094            .recv(buf)
1095            .await
1096            .map_err(|_| Error::RecvFailed)?;
1097        self.auditor.recv(self.me, self.peer, buf);
1098        Ok(())
1099    }
1100}
1101
1102impl RngCore for Context {
1103    fn next_u32(&mut self) -> u32 {
1104        self.executor.auditor.rand("next_u32".to_string());
1105        self.executor.rng.lock().unwrap().next_u32()
1106    }
1107
1108    fn next_u64(&mut self) -> u64 {
1109        self.executor.auditor.rand("next_u64".to_string());
1110        self.executor.rng.lock().unwrap().next_u64()
1111    }
1112
1113    fn fill_bytes(&mut self, dest: &mut [u8]) {
1114        self.executor.auditor.rand("fill_bytes".to_string());
1115        self.executor.rng.lock().unwrap().fill_bytes(dest)
1116    }
1117
1118    fn try_fill_bytes(&mut self, dest: &mut [u8]) -> Result<(), rand::Error> {
1119        self.executor.auditor.rand("try_fill_bytes".to_string());
1120        self.executor.rng.lock().unwrap().try_fill_bytes(dest)
1121    }
1122}
1123
1124impl CryptoRng for Context {}
1125
1126/// Implementation of [`crate::Blob`] for the `deterministic` runtime.
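///
/// # Example
///
/// A sketch of the open/write/sync flow (contents become visible to future opens
/// only after `sync` or `close`). This assumes the `Storage` and `Blob` traits are
/// exported at the crate root like `Runner` and `Spawner`:
///
/// ```rust
/// use commonware_runtime::{Blob as _, Runner, Storage, deterministic::Executor};
///
/// let (executor, context, _) = Executor::default();
/// executor.start(async move {
///     // Write to a blob and sync it back to the partition
///     let blob = context.open("partition", b"blob").await.unwrap();
///     blob.write_at(b"data", 0).await.unwrap();
///     blob.sync().await.unwrap();
///
///     // A fresh open now observes the synced contents
///     let blob = context.open("partition", b"blob").await.unwrap();
///     let mut buf = [0u8; 4];
///     blob.read_at(&mut buf, 0).await.unwrap();
///     assert_eq!(&buf, b"data");
/// });
/// ```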
1127pub struct Blob {
1128    executor: Arc<Executor>,
1129
1130    partition: String,
1131    name: Vec<u8>,
1132
1133    // For content to be visible to future opens,
1134    // it must be synced back to the partition (which
1135    // occurs on `sync` and `close`).
1136    content: Arc<RwLock<Vec<u8>>>,
1137}
1138
1139impl Blob {
1140    fn new(executor: Arc<Executor>, partition: String, name: &[u8], content: Vec<u8>) -> Self {
1141        executor.metrics.open_blobs.inc();
1142        Self {
1143            executor,
1144            partition,
1145            name: name.into(),
1146            content: Arc::new(RwLock::new(content)),
1147        }
1148    }
1149}
1150
1151impl Clone for Blob {
1152    fn clone(&self) -> Self {
1153        // We implement `Clone` manually to ensure the `open_blobs` gauge is updated.
1154        self.executor.metrics.open_blobs.inc();
1155        Self {
1156            executor: self.executor.clone(),
1157            partition: self.partition.clone(),
1158            name: self.name.clone(),
1159            content: self.content.clone(),
1160        }
1161    }
1162}
1163
1164impl crate::Storage<Blob> for Context {
1165    async fn open(&self, partition: &str, name: &[u8]) -> Result<Blob, Error> {
1166        self.executor.auditor.open(partition, name);
1167        let mut partitions = self.executor.partitions.lock().unwrap();
1168        let partition_entry = partitions.entry(partition.into()).or_default();
1169        let content = partition_entry.entry(name.into()).or_default();
1170        Ok(Blob::new(
1171            self.executor.clone(),
1172            partition.into(),
1173            name,
1174            content.clone(),
1175        ))
1176    }
1177
1178    async fn remove(&self, partition: &str, name: Option<&[u8]>) -> Result<(), Error> {
1179        self.executor.auditor.remove(partition, name);
1180        let mut partitions = self.executor.partitions.lock().unwrap();
1181        match name {
1182            Some(name) => {
1183                partitions
1184                    .get_mut(partition)
1185                    .ok_or(Error::PartitionMissing(partition.into()))?
1186                    .remove(name)
1187                    .ok_or(Error::BlobMissing(partition.into(), hex(name)))?;
1188            }
1189            None => {
1190                partitions
1191                    .remove(partition)
1192                    .ok_or(Error::PartitionMissing(partition.into()))?;
1193            }
1194        }
1195        Ok(())
1196    }
1197
1198    async fn scan(&self, partition: &str) -> Result<Vec<Vec<u8>>, Error> {
1199        self.executor.auditor.scan(partition);
1200        let partitions = self.executor.partitions.lock().unwrap();
1201        let partition = partitions
1202            .get(partition)
1203            .ok_or(Error::PartitionMissing(partition.into()))?;
1204        let mut results = Vec::with_capacity(partition.len());
1205        for name in partition.keys() {
1206            results.push(name.clone());
1207        }
1208        results.sort(); // deterministic output
1209        Ok(results)
1210    }
1211}
1212
1213impl crate::Blob for Blob {
1214    async fn len(&self) -> Result<u64, Error> {
1215        self.executor.auditor.len(&self.partition, &self.name);
1216        let content = self.content.read().unwrap();
1217        Ok(content.len() as u64)
1218    }
1219
1220    async fn read_at(&self, buf: &mut [u8], offset: u64) -> Result<(), Error> {
1221        let buf_len = buf.len();
1222        self.executor
1223            .auditor
1224            .read_at(&self.partition, &self.name, buf_len, offset);
1225        let offset = offset.try_into().map_err(|_| Error::OffsetOverflow)?;
1226        let content = self.content.read().unwrap();
1227        let content_len = content.len();
1228        if offset + buf_len > content_len {
1229            return Err(Error::BlobInsufficientLength);
1230        }
1231        buf.copy_from_slice(&content[offset..offset + buf_len]);
1232        self.executor.metrics.storage_reads.inc();
1233        self.executor
1234            .metrics
1235            .storage_read_bandwidth
1236            .inc_by(buf_len as u64);
1237        Ok(())
1238    }
1239
1240    async fn write_at(&self, buf: &[u8], offset: u64) -> Result<(), Error> {
1241        self.executor
1242            .auditor
1243            .write_at(&self.partition, &self.name, buf, offset);
1244        let offset = offset.try_into().map_err(|_| Error::OffsetOverflow)?;
1245        let mut content = self.content.write().unwrap();
1246        let required = offset + buf.len();
1247        if required > content.len() {
1248            content.resize(required, 0);
1249        }
1250        content[offset..offset + buf.len()].copy_from_slice(buf);
1251        self.executor.metrics.storage_writes.inc();
1252        self.executor
1253            .metrics
1254            .storage_write_bandwidth
1255            .inc_by(buf.len() as u64);
1256        Ok(())
1257    }
1258
1259    async fn truncate(&self, len: u64) -> Result<(), Error> {
1260        self.executor
1261            .auditor
1262            .truncate(&self.partition, &self.name, len);
1263        let len = len.try_into().map_err(|_| Error::OffsetOverflow)?;
1264        let mut content = self.content.write().unwrap();
1265        content.truncate(len);
1266        Ok(())
1267    }
1268
1269    async fn sync(&self) -> Result<(), Error> {
1270        // Create new content for partition
1271        //
1272        // Doing this first means we don't need to hold both the content
1273        // lock and the partition lock at the same time.
1274        let new_content = self.content.read().unwrap().clone();
1275
1276        // Update partition content
1277        self.executor.auditor.sync(&self.partition, &self.name);
1278        let mut partitions = self.executor.partitions.lock().unwrap();
1279        let partition = partitions
1280            .get_mut(&self.partition)
1281            .ok_or(Error::PartitionMissing(self.partition.clone()))?;
1282        let content = partition
1283            .get_mut(&self.name)
1284            .ok_or(Error::BlobMissing(self.partition.clone(), hex(&self.name)))?;
1285        *content = new_content;
1286        Ok(())
1287    }
1288
1289    async fn close(self) -> Result<(), Error> {
1290        self.executor.auditor.close(&self.partition, &self.name);
1291        self.sync().await?;
1292        Ok(())
1293    }
1294}
1295
1296impl Drop for Blob {
1297    fn drop(&mut self) {
1298        self.executor.metrics.open_blobs.dec();
1299    }
1300}
1301
1302#[cfg(test)]
1303mod tests {
1304    use super::*;
1305    use crate::{utils::run_tasks, Blob, Runner, Spawner, Storage};
1306    use commonware_macros::test_traced;
1307    use futures::task::noop_waker;
1308
1309    fn run_with_seed(seed: u64) -> (String, Vec<usize>) {
1310        let (executor, runtime, auditor) = Executor::seeded(seed);
1311        let messages = run_tasks(5, executor, runtime);
1312        (auditor.state(), messages)
1313    }
1314
1315    #[test]
1316    fn test_same_seed_same_order() {
1317        // Generate initial outputs
1318        let mut outputs = Vec::new();
1319        for seed in 0..1000 {
1320            let output = run_with_seed(seed);
1321            outputs.push(output);
1322        }
1323
1324        // Ensure they match
1325        for seed in 0..1000 {
1326            let output = run_with_seed(seed);
1327            assert_eq!(output, outputs[seed as usize]);
1328        }
1329    }
1330
1331    #[test_traced("TRACE")]
1332    fn test_different_seeds_different_order() {
1333        let output1 = run_with_seed(12345);
1334        let output2 = run_with_seed(54321);
1335        assert_ne!(output1, output2);
1336    }
1337
1338    #[test]
1339    fn test_alarm_min_heap() {
1340        // Populate heap
1341        let now = SystemTime::now();
1342        let alarms = vec![
1343            Alarm {
1344                time: now + Duration::new(10, 0),
1345                waker: noop_waker(),
1346            },
1347            Alarm {
1348                time: now + Duration::new(5, 0),
1349                waker: noop_waker(),
1350            },
1351            Alarm {
1352                time: now + Duration::new(15, 0),
1353                waker: noop_waker(),
1354            },
1355            Alarm {
1356                time: now + Duration::new(5, 0),
1357                waker: noop_waker(),
1358            },
1359        ];
1360        let mut heap = BinaryHeap::new();
1361        for alarm in alarms {
1362            heap.push(alarm);
1363        }
1364
1365        // Verify min-heap
1366        let mut sorted_times = Vec::new();
1367        while let Some(alarm) = heap.pop() {
1368            sorted_times.push(alarm.time);
1369        }
1370        assert_eq!(
1371            sorted_times,
1372            vec![
1373                now + Duration::new(5, 0),
1374                now + Duration::new(5, 0),
1375                now + Duration::new(10, 0),
1376                now + Duration::new(15, 0),
1377            ]
1378        );
1379    }
1380
1381    #[test]
1382    #[should_panic(expected = "runtime timeout")]
1383    fn test_timeout() {
1384        let (executor, runtime, _) = Executor::timed(Duration::from_secs(10));
1385        executor.start(async move {
1386            loop {
1387                runtime.sleep(Duration::from_secs(1)).await;
1388            }
1389        });
1390    }
1391
1392    #[test]
1393    #[should_panic(expected = "cycle duration must be non-zero when timeout is set")]
1394    fn test_bad_timeout() {
1395        let cfg = Config {
1396            timeout: Some(Duration::default()),
1397            cycle: Duration::default(),
1398            ..Config::default()
1399        };
1400        Executor::init(cfg);
1401    }
1402
1403    #[test]
1404    #[should_panic(expected = "root task cannot be spawned")]
1405    fn test_spawn_root_task() {
1406        let (executor, context, _) = Executor::default();
1407        executor.start(async move {
1408            let _ = context
1409                .spawn(ROOT_TASK, async move {
1410                    // This task should never run
1411                    panic!("root task should not run");
1412                })
1413                .await;
1414            panic!("root task should not be spawned");
1415        });
1416    }
1417
1418    #[test]
1419    fn test_recover_synced_storage_persists() {
1420        // Initialize the first runtime
1421        let (executor1, context1, auditor1) = Executor::default();
1422        let partition = "test_partition";
1423        let name = b"test_blob";
1424        let data = b"Hello, world!".to_vec();
1425
1426        // Run some tasks and sync storage
1427        executor1.start({
1428            let context = context1.clone();
1429            let data = data.clone();
1430            async move {
1431                let blob = context.open(partition, name).await.unwrap();
1432                blob.write_at(&data, 0).await.unwrap();
1433                blob.sync().await.unwrap();
1434            }
1435        });
1436        let state1 = auditor1.state();
1437
1438        // Recover the runtime
1439        let (executor2, context2, auditor2) = context1.recover();
1440
1441        // Verify auditor state is the same
1442        let state2 = auditor2.state();
1443        assert_eq!(state1, state2);
1444
1445        // Check that synced storage persists after recovery
1446        executor2.start(async move {
1447            let blob = context2.open(partition, name).await.unwrap();
1448            let len = blob.len().await.unwrap();
1449            assert_eq!(len, data.len() as u64);
1450            let mut buf = vec![0; len as usize];
1451            blob.read_at(&mut buf, 0).await.unwrap();
1452            assert_eq!(buf, data);
1453        });
1454    }
1455
1456    #[test]
1457    fn test_recover_unsynced_storage_does_not_persist() {
1458        // Initialize the first runtime
1459        let (executor1, context1, _) = Executor::default();
1460        let partition = "test_partition";
1461        let name = b"test_blob";
1462        let data = b"Hello, world!".to_vec();
1463
1464        // Run some tasks without syncing storage
1465        executor1.start({
1466            let context = context1.clone();
1467            async move {
1468                let blob = context.open(partition, name).await.unwrap();
1469                blob.write_at(&data, 0).await.unwrap();
1470                // Intentionally do not call sync() here
1471            }
1472        });
1473
1474        // Recover the runtime
1475        let (executor2, context2, _) = context1.recover();
1476
1477        // Check that unsynced storage does not persist after recovery
1478        executor2.start(async move {
1479            let blob = context2.open(partition, name).await.unwrap();
1480            let len = blob.len().await.unwrap();
1481            assert_eq!(len, 0);
1482        });
1483    }
1484
1485    #[test]
1486    #[should_panic(expected = "execution is not finished")]
1487    fn test_recover_before_finish_panics() {
1488        // Initialize runtime
1489        let (_, context, _) = Executor::default();
1490
1491        // Attempt to recover before the runtime has finished
1492        context.recover();
1493    }
1494
1495    #[test]
1496    #[should_panic(expected = "runtime has already been recovered")]
1497    fn test_recover_twice_panics() {
1498        // Initialize runtime
1499        let (executor, context, _) = Executor::default();
1500
1501        // Finish runtime
1502        executor.start(async move {});
1503
1504        // Recover for the first time
1505        let cloned_context = context.clone();
1506        context.recover();
1507
1508        // Attempt to recover again using the same context
1509        cloned_context.recover();
1510    }
1511}