commonware_runtime/
deterministic.rs

1//! A deterministic runtime that randomly selects tasks to run based on a seed
2//!
3//! # Panics
4//!
//! If any task panics, the runtime will panic (and shut down).
6//!
7//! # Example
8//!
9//! ```rust
10//! use commonware_runtime::{Spawner, Runner, deterministic::Executor, Metrics};
11//!
12//! let (executor, context, auditor) = Executor::default();
13//! executor.start(async move {
14//!     println!("Parent started");
15//!     let result = context.with_label("child").spawn(|_| async move {
16//!         println!("Child started");
17//!         "hello"
18//!     });
19//!     println!("Child result: {:?}", result.await);
20//!     println!("Parent exited");
21//! });
22//! println!("Auditor state: {}", auditor.state());
23//! ```
24
25use crate::{mocks, utils::Signaler, Clock, Error, Handle, Signal, METRICS_PREFIX};
26use commonware_utils::{hex, SystemTimeExt};
27use futures::{
28    channel::mpsc,
29    task::{waker_ref, ArcWake},
30    SinkExt, StreamExt,
31};
32use governor::clock::{Clock as GClock, ReasonablyRealtime};
33use prometheus_client::{
34    encoding::{text::encode, EncodeLabelSet},
35    metrics::{counter::Counter, family::Family, gauge::Gauge},
36    registry::{Metric, Registry},
37};
38use rand::{prelude::SliceRandom, rngs::StdRng, CryptoRng, RngCore, SeedableRng};
39use sha2::{Digest, Sha256};
40use std::{
41    collections::{BinaryHeap, HashMap},
42    future::Future,
43    mem::replace,
44    net::{IpAddr, Ipv4Addr, SocketAddr},
45    ops::Range,
46    pin::Pin,
47    sync::{Arc, Mutex, RwLock},
48    task::{self, Poll, Waker},
49    time::{Duration, SystemTime, UNIX_EPOCH},
50};
51use tracing::trace;
52
/// Range of ephemeral ports assigned to dialers.
///
/// The range is half-open: `32768` is included and `61000` is excluded.
const EPHEMERAL_PORT_RANGE: Range<u16> = 32768..61000;
55
/// Map of names to blob contents.
///
/// Both the name (key) and the contents (value) are stored as raw bytes.
pub type Partition = HashMap<Vec<u8>, Vec<u8>>;
58
/// Label set used to key the per-task metric families in [`Metrics`].
///
/// Each distinct `label` value gets its own counter/gauge within a family.
#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
struct Work {
    /// Label of the context the task was spawned from.
    label: String,
}
63
/// Metrics collected by the runtime itself (tasks, network, and storage).
#[derive(Debug)]
struct Metrics {
    /// Total number of tasks spawned, per label.
    tasks_spawned: Family<Work, Counter>,
    /// Number of tasks currently running, per label.
    tasks_running: Family<Work, Gauge>,
    /// Total number of blocking tasks spawned, per label.
    blocking_tasks_spawned: Family<Work, Counter>,
    /// Number of blocking tasks currently running, per label.
    blocking_tasks_running: Family<Work, Gauge>,
    /// Total number of task polls, per label.
    task_polls: Family<Work, Counter>,

    /// Total amount of data sent over the network (registered under the name `bandwidth`).
    network_bandwidth: Counter,

    /// Number of open blobs.
    open_blobs: Gauge,
    /// Total number of disk reads.
    storage_reads: Counter,
    /// Total amount of data read from disk.
    storage_read_bandwidth: Counter,
    /// Total number of disk writes.
    storage_writes: Counter,
    /// Total amount of data written to disk.
    storage_write_bandwidth: Counter,
}
80
81impl Metrics {
82    pub fn init(registry: &mut Registry) -> Self {
83        let metrics = Self {
84            task_polls: Family::default(),
85            tasks_spawned: Family::default(),
86            tasks_running: Family::default(),
87            blocking_tasks_spawned: Family::default(),
88            blocking_tasks_running: Family::default(),
89            network_bandwidth: Counter::default(),
90            open_blobs: Gauge::default(),
91            storage_reads: Counter::default(),
92            storage_read_bandwidth: Counter::default(),
93            storage_writes: Counter::default(),
94            storage_write_bandwidth: Counter::default(),
95        };
96        registry.register(
97            "tasks_spawned",
98            "Total number of tasks spawned",
99            metrics.tasks_spawned.clone(),
100        );
101        registry.register(
102            "tasks_running",
103            "Number of tasks currently running",
104            metrics.tasks_running.clone(),
105        );
106        registry.register(
107            "blocking_tasks_spawned",
108            "Total number of blocking tasks spawned",
109            metrics.blocking_tasks_spawned.clone(),
110        );
111        registry.register(
112            "blocking_tasks_running",
113            "Number of blocking tasks currently running",
114            metrics.blocking_tasks_running.clone(),
115        );
116        registry.register(
117            "task_polls",
118            "Total number of task polls",
119            metrics.task_polls.clone(),
120        );
121        registry.register(
122            "bandwidth",
123            "Total amount of data sent over network",
124            metrics.network_bandwidth.clone(),
125        );
126        registry.register(
127            "open_blobs",
128            "Number of open blobs",
129            metrics.open_blobs.clone(),
130        );
131        registry.register(
132            "storage_reads",
133            "Total number of disk reads",
134            metrics.storage_reads.clone(),
135        );
136        registry.register(
137            "storage_read_bandwidth",
138            "Total amount of data read from disk",
139            metrics.storage_read_bandwidth.clone(),
140        );
141        registry.register(
142            "storage_writes",
143            "Total number of disk writes",
144            metrics.storage_writes.clone(),
145        );
146        registry.register(
147            "storage_write_bandwidth",
148            "Total amount of data written to disk",
149            metrics.storage_write_bandwidth.clone(),
150        );
151        metrics
152    }
153}
154
/// Track the state of the runtime for determinism auditing.
///
/// Every audited event replaces the stored digest with a SHA-256 hash of the previous
/// digest followed by the event's data, so the final value depends on the full sequence
/// of events.
pub struct Auditor {
    /// Running digest of all audited events (empty until the first event is recorded).
    hash: Mutex<Vec<u8>>,
}
159
160impl Auditor {
161    fn new() -> Self {
162        Self {
163            hash: Mutex::new(Vec::new()),
164        }
165    }
166
167    fn process_task(&self, task: u128, label: &str) {
168        let mut hash = self.hash.lock().unwrap();
169        let mut hasher = Sha256::new();
170        hasher.update(&*hash);
171        hasher.update(b"process_task");
172        hasher.update(task.to_be_bytes());
173        hasher.update(label.as_bytes());
174        *hash = hasher.finalize().to_vec();
175    }
176
177    fn stop(&self, value: i32) {
178        let mut hash = self.hash.lock().unwrap();
179        let mut hasher = Sha256::new();
180        hasher.update(&*hash);
181        hasher.update(b"stop");
182        hasher.update(value.to_be_bytes());
183        *hash = hasher.finalize().to_vec();
184    }
185
186    fn stopped(&self) {
187        let mut hash = self.hash.lock().unwrap();
188        let mut hasher = Sha256::new();
189        hasher.update(&*hash);
190        hasher.update(b"stopped");
191        *hash = hasher.finalize().to_vec();
192    }
193
194    fn bind(&self, address: SocketAddr) {
195        let mut hash = self.hash.lock().unwrap();
196        let mut hasher = Sha256::new();
197        hasher.update(&*hash);
198        hasher.update(b"bind");
199        hasher.update(address.to_string().as_bytes());
200        *hash = hasher.finalize().to_vec();
201    }
202
203    fn dial(&self, dialer: SocketAddr, listener: SocketAddr) {
204        let mut hash = self.hash.lock().unwrap();
205        let mut hasher = Sha256::new();
206        hasher.update(&*hash);
207        hasher.update(b"dial");
208        hasher.update(dialer.to_string().as_bytes());
209        hasher.update(listener.to_string().as_bytes());
210        *hash = hasher.finalize().to_vec();
211    }
212
213    fn accept(&self, listener: SocketAddr, dialer: SocketAddr) {
214        let mut hash = self.hash.lock().unwrap();
215        let mut hasher = Sha256::new();
216        hasher.update(&*hash);
217        hasher.update(b"accept");
218        hasher.update(listener.to_string().as_bytes());
219        hasher.update(dialer.to_string().as_bytes());
220        *hash = hasher.finalize().to_vec();
221    }
222
223    fn send(&self, sender: SocketAddr, receiver: SocketAddr, message: &[u8]) {
224        let mut hash = self.hash.lock().unwrap();
225        let mut hasher = Sha256::new();
226        hasher.update(&*hash);
227        hasher.update(b"send");
228        hasher.update(sender.to_string().as_bytes());
229        hasher.update(receiver.to_string().as_bytes());
230        hasher.update(message);
231        *hash = hasher.finalize().to_vec();
232    }
233
234    fn recv(&self, receiver: SocketAddr, sender: SocketAddr, message: &[u8]) {
235        let mut hash = self.hash.lock().unwrap();
236        let mut hasher = Sha256::new();
237        hasher.update(&*hash);
238        hasher.update(b"recv");
239        hasher.update(receiver.to_string().as_bytes());
240        hasher.update(sender.to_string().as_bytes());
241        hasher.update(message);
242        *hash = hasher.finalize().to_vec();
243    }
244
245    fn rand(&self, method: String) {
246        let mut hash = self.hash.lock().unwrap();
247        let mut hasher = Sha256::new();
248        hasher.update(&*hash);
249        hasher.update(b"rand");
250        hasher.update(method.as_bytes());
251        *hash = hasher.finalize().to_vec();
252    }
253
254    fn open(&self, partition: &str, name: &[u8]) {
255        let mut hash = self.hash.lock().unwrap();
256        let mut hasher = Sha256::new();
257        hasher.update(&*hash);
258        hasher.update(b"open");
259        hasher.update(partition.as_bytes());
260        hasher.update(name);
261        *hash = hasher.finalize().to_vec();
262    }
263
264    fn remove(&self, partition: &str, name: Option<&[u8]>) {
265        let mut hash = self.hash.lock().unwrap();
266        let mut hasher = Sha256::new();
267        hasher.update(&*hash);
268        hasher.update(b"remove");
269        hasher.update(partition.as_bytes());
270        if let Some(name) = name {
271            hasher.update(name);
272        }
273        *hash = hasher.finalize().to_vec();
274    }
275
276    fn scan(&self, partition: &str) {
277        let mut hash = self.hash.lock().unwrap();
278        let mut hasher = Sha256::new();
279        hasher.update(&*hash);
280        hasher.update(b"scan");
281        hasher.update(partition.as_bytes());
282        *hash = hasher.finalize().to_vec();
283    }
284
285    fn len(&self, partition: &str, name: &[u8]) {
286        let mut hash = self.hash.lock().unwrap();
287        let mut hasher = Sha256::new();
288        hasher.update(&*hash);
289        hasher.update(b"len");
290        hasher.update(partition.as_bytes());
291        hasher.update(name);
292        *hash = hasher.finalize().to_vec();
293    }
294
295    fn read_at(&self, partition: &str, name: &[u8], buf: usize, offset: u64) {
296        let mut hash = self.hash.lock().unwrap();
297        let mut hasher = Sha256::new();
298        hasher.update(&*hash);
299        hasher.update(b"read_at");
300        hasher.update(partition.as_bytes());
301        hasher.update(name);
302        hasher.update(buf.to_be_bytes());
303        hasher.update(offset.to_be_bytes());
304        *hash = hasher.finalize().to_vec();
305    }
306
307    fn write_at(&self, partition: &str, name: &[u8], buf: &[u8], offset: u64) {
308        let mut hash = self.hash.lock().unwrap();
309        let mut hasher = Sha256::new();
310        hasher.update(&*hash);
311        hasher.update(b"write_at");
312        hasher.update(partition.as_bytes());
313        hasher.update(name);
314        hasher.update(buf);
315        hasher.update(offset.to_be_bytes());
316        *hash = hasher.finalize().to_vec();
317    }
318
319    fn truncate(&self, partition: &str, name: &[u8], size: u64) {
320        let mut hash = self.hash.lock().unwrap();
321        let mut hasher = Sha256::new();
322        hasher.update(&*hash);
323        hasher.update(b"truncate");
324        hasher.update(partition.as_bytes());
325        hasher.update(name);
326        hasher.update(size.to_be_bytes());
327        *hash = hasher.finalize().to_vec();
328    }
329
330    fn sync(&self, partition: &str, name: &[u8]) {
331        let mut hash = self.hash.lock().unwrap();
332        let mut hasher = Sha256::new();
333        hasher.update(&*hash);
334        hasher.update(b"sync");
335        hasher.update(partition.as_bytes());
336        hasher.update(name);
337        *hash = hasher.finalize().to_vec();
338    }
339
340    fn close(&self, partition: &str, name: &[u8]) {
341        let mut hash = self.hash.lock().unwrap();
342        let mut hasher = Sha256::new();
343        hasher.update(&*hash);
344        hasher.update(b"close");
345        hasher.update(partition.as_bytes());
346        hasher.update(name);
347        *hash = hasher.finalize().to_vec();
348    }
349
350    fn register(&self, name: &str, help: &str) {
351        let mut hash = self.hash.lock().unwrap();
352        let mut hasher = Sha256::new();
353        hasher.update(&*hash);
354        hasher.update(b"register");
355        hasher.update(name.as_bytes());
356        hasher.update(help.as_bytes());
357        *hash = hasher.finalize().to_vec();
358    }
359
360    fn encode(&self) {
361        let mut hash = self.hash.lock().unwrap();
362        let mut hasher = Sha256::new();
363        hasher.update(&*hash);
364        hasher.update(b"encode");
365        *hash = hasher.finalize().to_vec();
366    }
367
368    /// Generate a representation of the current state of the runtime.
369    ///
370    /// This can be used to ensure that logic running on top
371    /// of the runtime is interacting deterministically.
372    pub fn state(&self) -> String {
373        let hash = self.hash.lock().unwrap().clone();
374        hex(&hash)
375    }
376}
377
/// A unit of work scheduled on the deterministic executor.
struct Task {
    /// Identifier assigned from the `Tasks` counter when the task is registered.
    id: u128,
    /// Label the task was registered with (the root task uses an empty label).
    label: String,

    /// Queue this task pushes itself onto when woken.
    tasks: Arc<Tasks>,

    /// Whether this is the root task; its completion ends `Runner::start`.
    root: bool,
    /// The future driven by the executor.
    future: Mutex<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,

    /// Set once the future has returned `Ready`, so later queue entries are skipped.
    completed: Mutex<bool>,
}
389
390impl ArcWake for Task {
391    fn wake_by_ref(arc_self: &Arc<Self>) {
392        arc_self.tasks.enqueue(arc_self.clone());
393    }
394}
395
/// Run queue shared between the executor and every task's waker.
struct Tasks {
    /// Next task id to hand out.
    counter: Mutex<u128>,
    /// Tasks waiting to be polled; the same task may appear more than once.
    queue: Mutex<Vec<Arc<Task>>>,
}
400
401impl Tasks {
402    fn register(
403        arc_self: &Arc<Self>,
404        label: &str,
405        root: bool,
406        future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
407    ) {
408        let mut queue = arc_self.queue.lock().unwrap();
409        let id = {
410            let mut l = arc_self.counter.lock().unwrap();
411            let old = *l;
412            *l = l.checked_add(1).expect("task counter overflow");
413            old
414        };
415        queue.push(Arc::new(Task {
416            id,
417            label: label.to_string(),
418            root,
419            future: Mutex::new(future),
420            tasks: arc_self.clone(),
421            completed: Mutex::new(false),
422        }));
423    }
424
425    fn enqueue(&self, task: Arc<Task>) {
426        let mut queue = self.queue.lock().unwrap();
427        queue.push(task);
428    }
429
430    fn drain(&self) -> Vec<Arc<Task>> {
431        let mut queue = self.queue.lock().unwrap();
432        let len = queue.len();
433        replace(&mut *queue, Vec::with_capacity(len))
434    }
435
436    fn len(&self) -> usize {
437        self.queue.lock().unwrap().len()
438    }
439}
440
/// Configuration for the `deterministic` runtime.
#[derive(Clone)]
pub struct Config {
    /// Seed for the random number generator.
    pub seed: u64,

    /// How far the runtime's clock is advanced after each iteration of the event loop.
    /// Advancing on every iteration prevents starvation if some task never yields.
    ///
    /// Must be non-zero whenever `timeout` is set.
    pub cycle: Duration,

    /// If set, the runtime panics once its clock has advanced this far while it is
    /// still executing (i.e. a test hasn't stopped).
    pub timeout: Option<Duration>,
}

impl Default for Config {
    /// Seed `42`, a one-millisecond cycle, and no timeout.
    fn default() -> Self {
        const DEFAULT_SEED: u64 = 42;
        const DEFAULT_CYCLE: Duration = Duration::from_millis(1);

        Self {
            seed: DEFAULT_SEED,
            cycle: DEFAULT_CYCLE,
            timeout: None,
        }
    }
}
464
/// Deterministic runtime that randomly selects tasks to run based on a seed.
pub struct Executor {
    /// Registry holding the runtime's metrics.
    registry: Mutex<Registry>,
    /// Amount the clock is advanced after each iteration of the event loop.
    cycle: Duration,
    /// Time at (or after) which the event loop panics with a timeout, if configured.
    deadline: Option<SystemTime>,
    /// Runtime metrics.
    metrics: Arc<Metrics>,
    /// Auditor that records runtime events.
    auditor: Arc<Auditor>,
    /// Seeded random number generator (used to shuffle runnable tasks).
    rng: Mutex<StdRng>,
    /// Current time as seen by the runtime; starts at `UNIX_EPOCH` for a fresh runtime.
    time: Mutex<SystemTime>,
    /// Queue of runnable tasks.
    tasks: Arc<Tasks>,
    /// Alarms for sleeping tasks, woken by the event loop once their time is reached.
    sleeping: Mutex<BinaryHeap<Alarm>>,
    /// Storage partitions, keyed by partition name.
    partitions: Mutex<HashMap<String, Partition>>,
    /// Used to broadcast a stop value.
    signaler: Mutex<Signaler>,
    /// Signal handed out to callers that want to observe a stop.
    signal: Signal,
    /// Set once the root task has completed.
    finished: Mutex<bool>,
    /// Set once this runtime has been used to build a recovered runtime.
    recovered: Mutex<bool>,
}
482
483impl Executor {
484    /// Initialize a new `deterministic` runtime with the given seed and cycle duration.
485    pub fn init(cfg: Config) -> (Runner, Context, Arc<Auditor>) {
486        // Ensure config is valid
487        if cfg.timeout.is_some() && cfg.cycle == Duration::default() {
488            panic!("cycle duration must be non-zero when timeout is set");
489        }
490
491        // Create a new registry
492        let mut registry = Registry::default();
493        let runtime_registry = registry.sub_registry_with_prefix(METRICS_PREFIX);
494
495        // Initialize runtime
496        let metrics = Arc::new(Metrics::init(runtime_registry));
497        let auditor = Arc::new(Auditor::new());
498        let start_time = UNIX_EPOCH;
499        let deadline = cfg
500            .timeout
501            .map(|timeout| start_time.checked_add(timeout).expect("timeout overflowed"));
502        let (signaler, signal) = Signaler::new();
503        let executor = Arc::new(Self {
504            registry: Mutex::new(registry),
505            cycle: cfg.cycle,
506            deadline,
507            metrics: metrics.clone(),
508            auditor: auditor.clone(),
509            rng: Mutex::new(StdRng::seed_from_u64(cfg.seed)),
510            time: Mutex::new(start_time),
511            tasks: Arc::new(Tasks {
512                queue: Mutex::new(Vec::new()),
513                counter: Mutex::new(0),
514            }),
515            sleeping: Mutex::new(BinaryHeap::new()),
516            partitions: Mutex::new(HashMap::new()),
517            signaler: Mutex::new(signaler),
518            signal,
519            finished: Mutex::new(false),
520            recovered: Mutex::new(false),
521        });
522        (
523            Runner {
524                executor: executor.clone(),
525            },
526            Context {
527                label: String::new(),
528                spawned: false,
529                executor,
530                networking: Arc::new(Networking::new(metrics, auditor.clone())),
531            },
532            auditor,
533        )
534    }
535
536    /// Initialize a new `deterministic` runtime with the default configuration
537    /// and the provided seed.
538    pub fn seeded(seed: u64) -> (Runner, Context, Arc<Auditor>) {
539        let cfg = Config {
540            seed,
541            ..Config::default()
542        };
543        Self::init(cfg)
544    }
545
546    /// Initialize a new `deterministic` runtime with the default configuration
547    /// but exit after the given timeout.
548    pub fn timed(timeout: Duration) -> (Runner, Context, Arc<Auditor>) {
549        let cfg = Config {
550            timeout: Some(timeout),
551            ..Config::default()
552        };
553        Self::init(cfg)
554    }
555
556    /// Initialize a new `deterministic` runtime with the default configuration.
557    // We'd love to implement the trait but we can't because of the return type.
558    #[allow(clippy::should_implement_trait)]
559    pub fn default() -> (Runner, Context, Arc<Auditor>) {
560        Self::init(Config::default())
561    }
562}
563
/// Implementation of [`crate::Runner`] for the `deterministic` runtime.
pub struct Runner {
    /// Shared executor state driven by `start`.
    executor: Arc<Executor>,
}
568
569impl crate::Runner for Runner {
570    fn start<F>(self, f: F) -> F::Output
571    where
572        F: Future + Send + 'static,
573        F::Output: Send + 'static,
574    {
575        // Add root task to the queue
576        let output = Arc::new(Mutex::new(None));
577        Tasks::register(
578            &self.executor.tasks,
579            "",
580            true,
581            Box::pin({
582                let output = output.clone();
583                async move {
584                    *output.lock().unwrap() = Some(f.await);
585                }
586            }),
587        );
588
589        // Process tasks until root task completes or progress stalls
590        let mut iter = 0;
591        loop {
592            // Ensure we have not exceeded our deadline
593            {
594                let current = self.executor.time.lock().unwrap();
595                if let Some(deadline) = self.executor.deadline {
596                    if *current >= deadline {
597                        panic!("runtime timeout");
598                    }
599                }
600            }
601
602            // Snapshot available tasks
603            let mut tasks = self.executor.tasks.drain();
604
605            // Shuffle tasks
606            {
607                let mut rng = self.executor.rng.lock().unwrap();
608                tasks.shuffle(&mut *rng);
609            }
610
611            // Run all snapshotted tasks
612            //
613            // This approach is more efficient than randomly selecting a task one-at-a-time
614            // because it ensures we don't pull the same pending task multiple times in a row (without
615            // processing a different task required for other tasks to make progress).
616            trace!(iter, tasks = tasks.len(), "starting loop");
617            for task in tasks {
618                // Record task for auditing
619                self.executor.auditor.process_task(task.id, &task.label);
620
621                // Check if task is already complete
622                if *task.completed.lock().unwrap() {
623                    trace!(id = task.id, "skipping already completed task");
624                    continue;
625                }
626                trace!(id = task.id, "processing task");
627
628                // Prepare task for polling
629                let waker = waker_ref(&task);
630                let mut context = task::Context::from_waker(&waker);
631                let mut future = task.future.lock().unwrap();
632
633                // Record task poll
634                self.executor
635                    .metrics
636                    .task_polls
637                    .get_or_create(&Work {
638                        label: task.label.clone(),
639                    })
640                    .inc();
641
642                // Task is re-queued in its `wake_by_ref` implementation as soon as we poll here (regardless
643                // of whether it is Pending/Ready).
644                let pending = future.as_mut().poll(&mut context).is_pending();
645                if pending {
646                    trace!(id = task.id, "task is still pending");
647                    continue;
648                }
649
650                // Mark task as completed
651                *task.completed.lock().unwrap() = true;
652                trace!(id = task.id, "task is complete");
653
654                // Root task completed
655                if task.root {
656                    *self.executor.finished.lock().unwrap() = true;
657                    return output.lock().unwrap().take().unwrap();
658                }
659            }
660
661            // Advance time by cycle
662            //
663            // This approach prevents starvation if some task never yields (to approximate this,
664            // duration can be set to 1ns).
665            let mut current;
666            {
667                let mut time = self.executor.time.lock().unwrap();
668                *time = time
669                    .checked_add(self.executor.cycle)
670                    .expect("executor time overflowed");
671                current = *time;
672            }
673            trace!(now = current.epoch_millis(), "time advanced",);
674
675            // Skip time if there is nothing to do
676            if self.executor.tasks.len() == 0 {
677                let mut skip = None;
678                {
679                    let sleeping = self.executor.sleeping.lock().unwrap();
680                    if let Some(next) = sleeping.peek() {
681                        if next.time > current {
682                            skip = Some(next.time);
683                        }
684                    }
685                }
686                if skip.is_some() {
687                    {
688                        let mut time = self.executor.time.lock().unwrap();
689                        *time = skip.unwrap();
690                        current = *time;
691                    }
692                    trace!(now = current.epoch_millis(), "time skipped",);
693                }
694            }
695
696            // Wake all sleeping tasks that are ready
697            let mut to_wake = Vec::new();
698            let mut remaining;
699            {
700                let mut sleeping = self.executor.sleeping.lock().unwrap();
701                while let Some(next) = sleeping.peek() {
702                    if next.time <= current {
703                        let sleeper = sleeping.pop().unwrap();
704                        to_wake.push(sleeper.waker);
705                    } else {
706                        break;
707                    }
708                }
709                remaining = sleeping.len();
710            }
711            for waker in to_wake {
712                waker.wake();
713            }
714
715            // Account for remaining tasks
716            remaining += self.executor.tasks.len();
717
718            // If there are no tasks to run and no tasks sleeping, the executor is stalled
719            // and will never finish.
720            if remaining == 0 {
721                panic!("runtime stalled");
722            }
723            iter += 1;
724        }
725    }
726}
727
/// Implementation of [`crate::Spawner`], [`crate::Clock`],
/// [`crate::Network`], and [`crate::Storage`] for the `deterministic`
/// runtime.
pub struct Context {
    /// Label applied to tasks and metrics created through this context.
    label: String,
    /// Whether this context has already handed out a spawner via `spawn_ref`.
    spawned: bool,
    /// Shared executor state.
    executor: Arc<Executor>,
    /// Shared networking state.
    networking: Arc<Networking>,
}
737
738impl Context {
739    /// Recover the inner state (deadline, metrics, auditor, rng, synced storage, etc.) from the
740    /// current runtime and use it to initialize a new instance of the runtime. A recovered runtime
741    /// does not inherit the current runtime's pending tasks, unsynced storage, network connections, nor
742    /// its shutdown signaler.
743    ///
744    /// This is useful for performing a deterministic simulation that spans multiple runtime instantiations,
745    /// like simulating unclean shutdown (which involves repeatedly halting the runtime at unexpected intervals).
746    ///
747    /// It is only permitted to call this method after the runtime has finished (i.e. once `start` returns)
748    /// and only permitted to do once (otherwise multiple recovered runtimes will share the same inner state).
749    /// If either one of these conditions is violated, this method will panic.
750    pub fn recover(self) -> (Runner, Self, Arc<Auditor>) {
751        // Ensure we are finished
752        if !*self.executor.finished.lock().unwrap() {
753            panic!("execution is not finished");
754        }
755
756        // Ensure runtime has not already been recovered
757        {
758            let mut recovered = self.executor.recovered.lock().unwrap();
759            if *recovered {
760                panic!("runtime has already been recovered");
761            }
762            *recovered = true;
763        }
764
765        // Rebuild metrics
766        let mut registry = Registry::default();
767        let runtime_registry = registry.sub_registry_with_prefix(METRICS_PREFIX);
768        let metrics = Arc::new(Metrics::init(runtime_registry));
769
770        // Copy state
771        let auditor = self.executor.auditor.clone();
772        let (signaler, signal) = Signaler::new();
773        let executor = Arc::new(Executor {
774            // Copied from the current runtime
775            cycle: self.executor.cycle,
776            deadline: self.executor.deadline,
777            auditor: auditor.clone(),
778            rng: Mutex::new(self.executor.rng.lock().unwrap().clone()),
779            time: Mutex::new(*self.executor.time.lock().unwrap()),
780            partitions: Mutex::new(self.executor.partitions.lock().unwrap().clone()),
781
782            // New state for the new runtime
783            registry: Mutex::new(registry),
784            metrics: metrics.clone(),
785            tasks: Arc::new(Tasks {
786                queue: Mutex::new(Vec::new()),
787                counter: Mutex::new(0),
788            }),
789            sleeping: Mutex::new(BinaryHeap::new()),
790            signaler: Mutex::new(signaler),
791            signal,
792            finished: Mutex::new(false),
793            recovered: Mutex::new(false),
794        });
795        (
796            Runner {
797                executor: executor.clone(),
798            },
799            Self {
800                label: String::new(),
801                spawned: false,
802                executor,
803                networking: Arc::new(Networking::new(metrics, auditor.clone())),
804            },
805            auditor,
806        )
807    }
808}
809
810impl Clone for Context {
811    fn clone(&self) -> Self {
812        Self {
813            label: self.label.clone(),
814            spawned: false,
815            executor: self.executor.clone(),
816            networking: self.networking.clone(),
817        }
818    }
819}
820
821impl crate::Spawner for Context {
822    fn spawn<F, Fut, T>(self, f: F) -> Handle<T>
823    where
824        F: FnOnce(Self) -> Fut + Send + 'static,
825        Fut: Future<Output = T> + Send + 'static,
826        T: Send + 'static,
827    {
828        // Ensure a context only spawns one task
829        assert!(!self.spawned, "already spawned");
830
831        // Get metrics
832        let label = self.label.clone();
833        let work = Work {
834            label: label.clone(),
835        };
836        self.executor
837            .metrics
838            .tasks_spawned
839            .get_or_create(&work)
840            .inc();
841        let gauge = self
842            .executor
843            .metrics
844            .tasks_running
845            .get_or_create(&work)
846            .clone();
847
848        // Set up the task
849        let executor = self.executor.clone();
850        let future = f(self);
851        let (f, handle) = Handle::init(future, gauge, false);
852
853        // Spawn the task
854        Tasks::register(&executor.tasks, &label, false, Box::pin(f));
855        handle
856    }
857
858    fn spawn_ref<F, T>(&mut self) -> impl FnOnce(F) -> Handle<T> + 'static
859    where
860        F: Future<Output = T> + Send + 'static,
861        T: Send + 'static,
862    {
863        // Ensure a context only spawns one task
864        assert!(!self.spawned, "already spawned");
865        self.spawned = true;
866
867        // Get metrics
868        let work = Work {
869            label: self.label.clone(),
870        };
871        self.executor
872            .metrics
873            .tasks_spawned
874            .get_or_create(&work)
875            .inc();
876        let gauge = self
877            .executor
878            .metrics
879            .tasks_running
880            .get_or_create(&work)
881            .clone();
882
883        // Set up the task
884        let label = self.label.clone();
885        let executor = self.executor.clone();
886        move |f: F| {
887            let (f, handle) = Handle::init(f, gauge, false);
888
889            // Spawn the task
890            Tasks::register(&executor.tasks, &label, false, Box::pin(f));
891            handle
892        }
893    }
894
895    fn spawn_blocking<F, T>(self, f: F) -> Handle<T>
896    where
897        F: FnOnce() -> T + Send + 'static,
898        T: Send + 'static,
899    {
900        // Ensure a context only spawns one task
901        assert!(!self.spawned, "already spawned");
902
903        // Get metrics
904        let work = Work {
905            label: self.label.clone(),
906        };
907        self.executor
908            .metrics
909            .blocking_tasks_spawned
910            .get_or_create(&work)
911            .inc();
912        let gauge = self
913            .executor
914            .metrics
915            .blocking_tasks_running
916            .get_or_create(&work)
917            .clone();
918
919        // Initialize the blocking task
920        let (f, handle) = Handle::init_blocking(f, gauge, false);
921
922        // Spawn the task
923        let f = async move { f() };
924        Tasks::register(&self.executor.tasks, &self.label, false, Box::pin(f));
925        handle
926    }
927
928    fn stop(&self, value: i32) {
929        self.executor.auditor.stop(value);
930        self.executor.signaler.lock().unwrap().signal(value);
931    }
932
933    fn stopped(&self) -> Signal {
934        self.executor.auditor.stopped();
935        self.executor.signal.clone()
936    }
937}
938
939impl crate::Metrics for Context {
940    fn with_label(&self, label: &str) -> Self {
941        let label = {
942            let prefix = self.label.clone();
943            if prefix.is_empty() {
944                label.to_string()
945            } else {
946                format!("{}_{}", prefix, label)
947            }
948        };
949        assert!(
950            !label.starts_with(METRICS_PREFIX),
951            "using runtime label is not allowed"
952        );
953        Self {
954            label,
955            spawned: false,
956            executor: self.executor.clone(),
957            networking: self.networking.clone(),
958        }
959    }
960
961    fn label(&self) -> String {
962        self.label.clone()
963    }
964
965    fn register<N: Into<String>, H: Into<String>>(&self, name: N, help: H, metric: impl Metric) {
966        // Prepare args
967        let name = name.into();
968        let help = help.into();
969
970        // Register metric
971        self.executor.auditor.register(&name, &help);
972        let prefixed_name = {
973            let prefix = &self.label;
974            if prefix.is_empty() {
975                name
976            } else {
977                format!("{}_{}", *prefix, name)
978            }
979        };
980        self.executor
981            .registry
982            .lock()
983            .unwrap()
984            .register(prefixed_name, help, metric)
985    }
986
987    fn encode(&self) -> String {
988        self.executor.auditor.encode();
989        let mut buffer = String::new();
990        encode(&mut buffer, &self.executor.registry.lock().unwrap()).expect("encoding failed");
991        buffer
992    }
993}
994
/// Future produced by `Context::sleep` and `Context::sleep_until`; it becomes
/// ready once the executor's clock has reached `time`.
struct Sleeper {
    // Executor whose `time` is compared against the deadline and whose
    // `sleeping` collection receives this sleeper's alarm.
    executor: Arc<Executor>,
    // Deadline at (or after) which polling returns `Poll::Ready`.
    time: SystemTime,
    // Set on the first pending poll so the alarm is pushed only once.
    registered: bool,
}
1000
/// A wake-up request: `waker` paired with the `time` it was registered for.
///
/// Equality and ordering consider `time` only, and the ordering is inverted so
/// that a max-heap such as `BinaryHeap` pops the earliest alarm first.
struct Alarm {
    time: SystemTime,
    waker: Waker,
}

impl PartialEq for Alarm {
    /// Two alarms are equal when they fire at the same time (wakers are ignored).
    fn eq(&self, other: &Self) -> bool {
        self.time == other.time
    }
}

impl Eq for Alarm {}

impl PartialOrd for Alarm {
    /// Delegates to [`Ord::cmp`] so both orderings always agree.
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(Ord::cmp(self, other))
    }
}

impl Ord for Alarm {
    /// Earlier alarms compare as greater, turning a max-heap into a min-heap.
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.time.cmp(&other.time).reverse()
    }
}
1026
1027impl Future for Sleeper {
1028    type Output = ();
1029
1030    fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
1031        {
1032            let current_time = *self.executor.time.lock().unwrap();
1033            if current_time >= self.time {
1034                return Poll::Ready(());
1035            }
1036        }
1037        if !self.registered {
1038            self.registered = true;
1039            self.executor.sleeping.lock().unwrap().push(Alarm {
1040                time: self.time,
1041                waker: cx.waker().clone(),
1042            });
1043        }
1044        Poll::Pending
1045    }
1046}
1047
1048impl Clock for Context {
1049    fn current(&self) -> SystemTime {
1050        *self.executor.time.lock().unwrap()
1051    }
1052
1053    fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static {
1054        let time = self
1055            .current()
1056            .checked_add(duration)
1057            .expect("overflow when setting wake time");
1058        Sleeper {
1059            executor: self.executor.clone(),
1060
1061            time,
1062            registered: false,
1063        }
1064    }
1065
1066    fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static {
1067        Sleeper {
1068            executor: self.executor.clone(),
1069
1070            time: deadline,
1071            registered: false,
1072        }
1073    }
1074}
1075
1076impl GClock for Context {
1077    type Instant = SystemTime;
1078
1079    fn now(&self) -> Self::Instant {
1080        self.current()
1081    }
1082}
1083
1084impl ReasonablyRealtime for Context {}
1085
/// Sending half of a listener's accept queue.
///
/// `Networking::dial` sends one tuple per connection: the dialer's address
/// followed by the listener's ends of the two channels.
type Dialable = mpsc::UnboundedSender<(
    SocketAddr,
    mocks::Sink,   // Listener -> Dialer
    mocks::Stream, // Dialer -> Listener
)>;
1091
1092/// Implementation of [`crate::Network`] for the `deterministic` runtime.
1093///
1094/// When a dialer connects to a listener, the listener is given a new ephemeral port
1095/// from the range `32768..61000`. To keep things simple, it is not possible to
1096/// bind to an ephemeral port. Likewise, if ports are not reused and when exhausted,
1097/// the runtime will panic.
1098struct Networking {
1099    metrics: Arc<Metrics>,
1100    auditor: Arc<Auditor>,
1101    ephemeral: Mutex<u16>,
1102    listeners: Mutex<HashMap<SocketAddr, Dialable>>,
1103}
1104
1105impl Networking {
1106    fn new(metrics: Arc<Metrics>, auditor: Arc<Auditor>) -> Self {
1107        Self {
1108            metrics,
1109            auditor,
1110            ephemeral: Mutex::new(EPHEMERAL_PORT_RANGE.start),
1111            listeners: Mutex::new(HashMap::new()),
1112        }
1113    }
1114
1115    fn bind(&self, socket: SocketAddr) -> Result<Listener, Error> {
1116        self.auditor.bind(socket);
1117
1118        // If the IP is localhost, ensure the port is not in the ephemeral range
1119        // so that it can be used for binding in the dial method
1120        if socket.ip() == IpAddr::V4(Ipv4Addr::LOCALHOST)
1121            && EPHEMERAL_PORT_RANGE.contains(&socket.port())
1122        {
1123            return Err(Error::BindFailed);
1124        }
1125
1126        // Ensure the port is not already bound
1127        let mut listeners = self.listeners.lock().unwrap();
1128        if listeners.contains_key(&socket) {
1129            return Err(Error::BindFailed);
1130        }
1131
1132        // Bind the socket
1133        let (sender, receiver) = mpsc::unbounded();
1134        listeners.insert(socket, sender);
1135        Ok(Listener {
1136            auditor: self.auditor.clone(),
1137            address: socket,
1138            listener: receiver,
1139            metrics: self.metrics.clone(),
1140        })
1141    }
1142
1143    async fn dial(&self, socket: SocketAddr) -> Result<(Sink, Stream), Error> {
1144        // Assign dialer a port from the ephemeral range
1145        let dialer = {
1146            let mut ephemeral = self.ephemeral.lock().unwrap();
1147            let dialer = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), *ephemeral);
1148            *ephemeral = ephemeral
1149                .checked_add(1)
1150                .expect("ephemeral port range exhausted");
1151            dialer
1152        };
1153        self.auditor.dial(dialer, socket);
1154
1155        // Get listener
1156        let mut sender = {
1157            let listeners = self.listeners.lock().unwrap();
1158            let sender = listeners.get(&socket).ok_or(Error::ConnectionFailed)?;
1159            sender.clone()
1160        };
1161
1162        // Construct connection
1163        let (dialer_sender, dialer_receiver) = mocks::Channel::init();
1164        let (listener_sender, listener_receiver) = mocks::Channel::init();
1165        sender
1166            .send((dialer, dialer_sender, listener_receiver))
1167            .await
1168            .map_err(|_| Error::ConnectionFailed)?;
1169        Ok((
1170            Sink {
1171                metrics: self.metrics.clone(),
1172                auditor: self.auditor.clone(),
1173                me: dialer,
1174                peer: socket,
1175                sender: listener_sender,
1176            },
1177            Stream {
1178                auditor: self.auditor.clone(),
1179                me: dialer,
1180                peer: socket,
1181                receiver: dialer_receiver,
1182            },
1183        ))
1184    }
1185}
1186
1187impl crate::Network<Listener, Sink, Stream> for Context {
1188    async fn bind(&self, socket: SocketAddr) -> Result<Listener, Error> {
1189        self.networking.bind(socket)
1190    }
1191
1192    async fn dial(&self, socket: SocketAddr) -> Result<(Sink, Stream), Error> {
1193        self.networking.dial(socket).await
1194    }
1195}
1196
1197/// Implementation of [`crate::Listener`] for the `deterministic` runtime.
1198pub struct Listener {
1199    metrics: Arc<Metrics>,
1200    auditor: Arc<Auditor>,
1201    address: SocketAddr,
1202    listener: mpsc::UnboundedReceiver<(SocketAddr, mocks::Sink, mocks::Stream)>,
1203}
1204
1205impl crate::Listener<Sink, Stream> for Listener {
1206    async fn accept(&mut self) -> Result<(SocketAddr, Sink, Stream), Error> {
1207        let (socket, sender, receiver) = self.listener.next().await.ok_or(Error::ReadFailed)?;
1208        self.auditor.accept(self.address, socket);
1209        Ok((
1210            socket,
1211            Sink {
1212                metrics: self.metrics.clone(),
1213                auditor: self.auditor.clone(),
1214                me: self.address,
1215                peer: socket,
1216                sender,
1217            },
1218            Stream {
1219                auditor: self.auditor.clone(),
1220                me: self.address,
1221                peer: socket,
1222                receiver,
1223            },
1224        ))
1225    }
1226}
1227
1228/// Implementation of [`crate::Sink`] for the `deterministic` runtime.
1229pub struct Sink {
1230    metrics: Arc<Metrics>,
1231    auditor: Arc<Auditor>,
1232    me: SocketAddr,
1233    peer: SocketAddr,
1234    sender: mocks::Sink,
1235}
1236
1237impl crate::Sink for Sink {
1238    async fn send(&mut self, msg: &[u8]) -> Result<(), Error> {
1239        self.auditor.send(self.me, self.peer, msg);
1240        self.sender.send(msg).await.map_err(|_| Error::SendFailed)?;
1241        self.metrics.network_bandwidth.inc_by(msg.len() as u64);
1242        Ok(())
1243    }
1244}
1245
1246/// Implementation of [`crate::Stream`] for the `deterministic` runtime.
1247pub struct Stream {
1248    auditor: Arc<Auditor>,
1249    me: SocketAddr,
1250    peer: SocketAddr,
1251    receiver: mocks::Stream,
1252}
1253
1254impl crate::Stream for Stream {
1255    async fn recv(&mut self, buf: &mut [u8]) -> Result<(), Error> {
1256        self.receiver
1257            .recv(buf)
1258            .await
1259            .map_err(|_| Error::RecvFailed)?;
1260        self.auditor.recv(self.me, self.peer, buf);
1261        Ok(())
1262    }
1263}
1264
1265impl RngCore for Context {
1266    fn next_u32(&mut self) -> u32 {
1267        self.executor.auditor.rand("next_u32".to_string());
1268        self.executor.rng.lock().unwrap().next_u32()
1269    }
1270
1271    fn next_u64(&mut self) -> u64 {
1272        self.executor.auditor.rand("next_u64".to_string());
1273        self.executor.rng.lock().unwrap().next_u64()
1274    }
1275
1276    fn fill_bytes(&mut self, dest: &mut [u8]) {
1277        self.executor.auditor.rand("fill_bytes".to_string());
1278        self.executor.rng.lock().unwrap().fill_bytes(dest)
1279    }
1280
1281    fn try_fill_bytes(&mut self, dest: &mut [u8]) -> Result<(), rand::Error> {
1282        self.executor.auditor.rand("try_fill_bytes".to_string());
1283        self.executor.rng.lock().unwrap().try_fill_bytes(dest)
1284    }
1285}
1286
1287impl CryptoRng for Context {}
1288
1289/// Implementation of [`crate::Blob`] for the `deterministic` runtime.
1290pub struct Blob {
1291    executor: Arc<Executor>,
1292
1293    partition: String,
1294    name: Vec<u8>,
1295
1296    // For content to be updated for future opens,
1297    // it must be synced back to the partition (occurs on
1298    // `sync` and `close`).
1299    content: Arc<RwLock<Vec<u8>>>,
1300}
1301
1302impl Blob {
1303    fn new(executor: Arc<Executor>, partition: String, name: &[u8], content: Vec<u8>) -> Self {
1304        executor.metrics.open_blobs.inc();
1305        Self {
1306            executor,
1307            partition,
1308            name: name.into(),
1309            content: Arc::new(RwLock::new(content)),
1310        }
1311    }
1312}
1313
1314impl Clone for Blob {
1315    fn clone(&self) -> Self {
1316        // We implement `Clone` manually to ensure the `open_blobs` gauge is updated.
1317        self.executor.metrics.open_blobs.inc();
1318        Self {
1319            executor: self.executor.clone(),
1320            partition: self.partition.clone(),
1321            name: self.name.clone(),
1322            content: self.content.clone(),
1323        }
1324    }
1325}
1326
1327impl crate::Storage<Blob> for Context {
1328    async fn open(&self, partition: &str, name: &[u8]) -> Result<Blob, Error> {
1329        self.executor.auditor.open(partition, name);
1330        let mut partitions = self.executor.partitions.lock().unwrap();
1331        let partition_entry = partitions.entry(partition.into()).or_default();
1332        let content = partition_entry.entry(name.into()).or_default();
1333        Ok(Blob::new(
1334            self.executor.clone(),
1335            partition.into(),
1336            name,
1337            content.clone(),
1338        ))
1339    }
1340
1341    async fn remove(&self, partition: &str, name: Option<&[u8]>) -> Result<(), Error> {
1342        self.executor.auditor.remove(partition, name);
1343        let mut partitions = self.executor.partitions.lock().unwrap();
1344        match name {
1345            Some(name) => {
1346                partitions
1347                    .get_mut(partition)
1348                    .ok_or(Error::PartitionMissing(partition.into()))?
1349                    .remove(name)
1350                    .ok_or(Error::BlobMissing(partition.into(), hex(name)))?;
1351            }
1352            None => {
1353                partitions
1354                    .remove(partition)
1355                    .ok_or(Error::PartitionMissing(partition.into()))?;
1356            }
1357        }
1358        Ok(())
1359    }
1360
1361    async fn scan(&self, partition: &str) -> Result<Vec<Vec<u8>>, Error> {
1362        self.executor.auditor.scan(partition);
1363        let partitions = self.executor.partitions.lock().unwrap();
1364        let partition = partitions
1365            .get(partition)
1366            .ok_or(Error::PartitionMissing(partition.into()))?;
1367        let mut results = Vec::with_capacity(partition.len());
1368        for name in partition.keys() {
1369            results.push(name.clone());
1370        }
1371        results.sort(); // deterministic output
1372        Ok(results)
1373    }
1374}
1375
1376impl crate::Blob for Blob {
1377    async fn len(&self) -> Result<u64, Error> {
1378        self.executor.auditor.len(&self.partition, &self.name);
1379        let content = self.content.read().unwrap();
1380        Ok(content.len() as u64)
1381    }
1382
1383    async fn read_at(&self, buf: &mut [u8], offset: u64) -> Result<(), Error> {
1384        let buf_len = buf.len();
1385        self.executor
1386            .auditor
1387            .read_at(&self.partition, &self.name, buf_len, offset);
1388        let offset = offset.try_into().map_err(|_| Error::OffsetOverflow)?;
1389        let content = self.content.read().unwrap();
1390        let content_len = content.len();
1391        if offset + buf_len > content_len {
1392            return Err(Error::BlobInsufficientLength);
1393        }
1394        buf.copy_from_slice(&content[offset..offset + buf_len]);
1395        self.executor.metrics.storage_reads.inc();
1396        self.executor
1397            .metrics
1398            .storage_read_bandwidth
1399            .inc_by(buf_len as u64);
1400        Ok(())
1401    }
1402
1403    async fn write_at(&self, buf: &[u8], offset: u64) -> Result<(), Error> {
1404        self.executor
1405            .auditor
1406            .write_at(&self.partition, &self.name, buf, offset);
1407        let offset = offset.try_into().map_err(|_| Error::OffsetOverflow)?;
1408        let mut content = self.content.write().unwrap();
1409        let required = offset + buf.len();
1410        if required > content.len() {
1411            content.resize(required, 0);
1412        }
1413        content[offset..offset + buf.len()].copy_from_slice(buf);
1414        self.executor.metrics.storage_writes.inc();
1415        self.executor
1416            .metrics
1417            .storage_write_bandwidth
1418            .inc_by(buf.len() as u64);
1419        Ok(())
1420    }
1421
1422    async fn truncate(&self, len: u64) -> Result<(), Error> {
1423        self.executor
1424            .auditor
1425            .truncate(&self.partition, &self.name, len);
1426        let len = len.try_into().map_err(|_| Error::OffsetOverflow)?;
1427        let mut content = self.content.write().unwrap();
1428        content.truncate(len);
1429        Ok(())
1430    }
1431
1432    async fn sync(&self) -> Result<(), Error> {
1433        // Create new content for partition
1434        //
1435        // Doing this first means we don't need to hold both the content
1436        // lock and the partition lock at the same time.
1437        let new_content = self.content.read().unwrap().clone();
1438
1439        // Update partition content
1440        self.executor.auditor.sync(&self.partition, &self.name);
1441        let mut partitions = self.executor.partitions.lock().unwrap();
1442        let partition = partitions
1443            .get_mut(&self.partition)
1444            .ok_or(Error::PartitionMissing(self.partition.clone()))?;
1445        let content = partition
1446            .get_mut(&self.name)
1447            .ok_or(Error::BlobMissing(self.partition.clone(), hex(&self.name)))?;
1448        *content = new_content;
1449        Ok(())
1450    }
1451
1452    async fn close(self) -> Result<(), Error> {
1453        self.executor.auditor.close(&self.partition, &self.name);
1454        self.sync().await?;
1455        Ok(())
1456    }
1457}
1458
1459impl Drop for Blob {
1460    fn drop(&mut self) {
1461        self.executor.metrics.open_blobs.dec();
1462    }
1463}
1464
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{utils::run_tasks, Blob, Runner, Storage};
    use commonware_macros::test_traced;
    use futures::task::noop_waker;

    /// Number of seeds exercised by the determinism check.
    const SEEDS: u64 = 1000;

    /// Runs the shared task workload on a runtime seeded with `seed` and
    /// returns the final auditor state together with the observed messages.
    fn run_with_seed(seed: u64) -> (String, Vec<usize>) {
        let (executor, context, auditor) = Executor::seeded(seed);
        let messages = run_tasks(5, executor, context);
        let state = auditor.state();
        (state, messages)
    }

    #[test]
    fn test_same_seed_same_order() {
        // Generate initial outputs
        let outputs: Vec<_> = (0..SEEDS).map(run_with_seed).collect();

        // Ensure a second pass over the same seeds matches
        for (seed, expected) in (0..SEEDS).zip(outputs.iter()) {
            assert_eq!(run_with_seed(seed), *expected);
        }
    }

    #[test_traced("TRACE")]
    fn test_different_seeds_different_order() {
        let first = run_with_seed(12345);
        let second = run_with_seed(54321);
        assert_ne!(first, second);
    }

    #[test]
    fn test_alarm_min_heap() {
        // Populate heap (note the duplicate deadline)
        let now = SystemTime::now();
        let at = |secs: u64| now + Duration::new(secs, 0);
        let mut heap: BinaryHeap<Alarm> = vec![10, 5, 15, 5]
            .into_iter()
            .map(|secs| Alarm {
                time: at(secs),
                waker: noop_waker(),
            })
            .collect();

        // Verify min-heap: alarms must come out earliest first
        let mut sorted_times = Vec::new();
        while let Some(alarm) = heap.pop() {
            sorted_times.push(alarm.time);
        }
        assert_eq!(sorted_times, vec![at(5), at(5), at(10), at(15)]);
    }

    #[test]
    #[should_panic(expected = "runtime timeout")]
    fn test_timeout() {
        let (executor, context, _) = Executor::timed(Duration::from_secs(10));
        let sleeper = async move {
            loop {
                context.sleep(Duration::from_secs(1)).await;
            }
        };
        executor.start(sleeper);
    }

    #[test]
    #[should_panic(expected = "cycle duration must be non-zero when timeout is set")]
    fn test_bad_timeout() {
        let cfg = Config {
            timeout: Some(Duration::default()),
            cycle: Duration::default(),
            ..Config::default()
        };
        Executor::init(cfg);
    }

    #[test]
    fn test_recover_synced_storage_persists() {
        // Initialize the first runtime
        let (executor1, context1, auditor1) = Executor::default();
        let partition = "test_partition";
        let name = b"test_blob";
        let data = b"Hello, world!".to_vec();

        // Write a blob and sync it to storage
        let writer = {
            let context = context1.clone();
            let data = data.clone();
            async move {
                let blob = context.open(partition, name).await.unwrap();
                blob.write_at(&data, 0).await.unwrap();
                blob.sync().await.unwrap();
            }
        };
        executor1.start(writer);
        let state1 = auditor1.state();

        // Recover the runtime
        let (executor2, context2, auditor2) = context1.recover();

        // Verify auditor state is the same
        let state2 = auditor2.state();
        assert_eq!(state1, state2);

        // Check that synced storage persists after recovery
        executor2.start(async move {
            let blob = context2.open(partition, name).await.unwrap();
            let len = blob.len().await.unwrap();
            assert_eq!(len, data.len() as u64);
            let mut buf = vec![0; len as usize];
            blob.read_at(&mut buf, 0).await.unwrap();
            assert_eq!(buf, data);
        });
    }

    #[test]
    fn test_recover_unsynced_storage_does_not_persist() {
        // Initialize the first runtime
        let (executor1, context1, _) = Executor::default();
        let partition = "test_partition";
        let name = b"test_blob";
        let data = b"Hello, world!".to_vec();

        // Write a blob but never sync it
        let writer = {
            let context = context1.clone();
            async move {
                let blob = context.open(partition, name).await.unwrap();
                blob.write_at(&data, 0).await.unwrap();
                // Intentionally do not call sync() here
            }
        };
        executor1.start(writer);

        // Recover the runtime
        let (executor2, context2, _) = context1.recover();

        // Check that unsynced storage does not persist after recovery
        executor2.start(async move {
            let blob = context2.open(partition, name).await.unwrap();
            let len = blob.len().await.unwrap();
            assert_eq!(len, 0);
        });
    }

    #[test]
    #[should_panic(expected = "execution is not finished")]
    fn test_recover_before_finish_panics() {
        // Initialize runtime (never started)
        let (_, context, _) = Executor::default();

        // Attempt to recover before the runtime has finished
        context.recover();
    }

    #[test]
    #[should_panic(expected = "runtime has already been recovered")]
    fn test_recover_twice_panics() {
        // Initialize runtime
        let (executor, context, _) = Executor::default();

        // Finish runtime
        executor.start(async move {});

        // Keep a second handle to the same runtime, then recover once
        let cloned_context = context.clone();
        context.recover();

        // Attempt to recover again using the same runtime
        cloned_context.recover();
    }
}