commonware_runtime/
deterministic.rs

//! A deterministic runtime that randomly selects tasks to run based on a seed.
//!
//! # Panics
//!
//! If any task panics, the runtime will panic (and shut down).
6//!
7//! # Example
8//!
9//! ```rust
10//! use commonware_runtime::{Spawner, Runner, deterministic::Executor, Metrics};
11//!
12//! let (executor, context, auditor) = Executor::default();
13//! executor.start(async move {
14//!     println!("Parent started");
15//!     let result = context.with_label("child").spawn(|_| async move {
16//!         println!("Child started");
17//!         "hello"
18//!     });
19//!     println!("Child result: {:?}", result.await);
20//!     println!("Parent exited");
21//! });
22//! println!("Auditor state: {}", auditor.state());
23//! ```
24
25use crate::{mocks, utils::Signaler, Clock, Error, Handle, Signal, METRICS_PREFIX};
26use commonware_utils::{hex, SystemTimeExt};
27use futures::{
28    channel::mpsc,
29    task::{waker_ref, ArcWake},
30    SinkExt, StreamExt,
31};
32use governor::clock::{Clock as GClock, ReasonablyRealtime};
33use prometheus_client::{
34    encoding::{text::encode, EncodeLabelSet},
35    metrics::{counter::Counter, family::Family, gauge::Gauge},
36    registry::{Metric, Registry},
37};
38use rand::{prelude::SliceRandom, rngs::StdRng, CryptoRng, RngCore, SeedableRng};
39use sha2::{Digest, Sha256};
40use std::{
41    collections::{BinaryHeap, HashMap},
42    future::Future,
43    mem::replace,
44    net::{IpAddr, Ipv4Addr, SocketAddr},
45    ops::Range,
46    pin::Pin,
47    sync::{Arc, Mutex, RwLock},
48    task::{self, Poll, Waker},
49    time::{Duration, SystemTime, UNIX_EPOCH},
50};
51use tracing::trace;
52
/// Ports handed out to dialers as their local (ephemeral) port.
///
/// Half-open: starts at `32768` and stops just before `61000`.
const EPHEMERAL_PORT_RANGE: Range<u16> = 32_768..61_000;
55
/// In-memory storage partition: each blob name (raw bytes) maps to that blob's contents.
pub type Partition = HashMap<Vec<u8>, Vec<u8>>;
58
59#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
60struct Work {
61    label: String,
62}
63
64#[derive(Debug)]
65struct Metrics {
66    tasks_spawned: Family<Work, Counter>,
67    tasks_running: Family<Work, Gauge>,
68    task_polls: Family<Work, Counter>,
69
70    network_bandwidth: Counter,
71
72    open_blobs: Gauge,
73    storage_reads: Counter,
74    storage_read_bandwidth: Counter,
75    storage_writes: Counter,
76    storage_write_bandwidth: Counter,
77}
78
79impl Metrics {
80    pub fn init(registry: &mut Registry) -> Self {
81        let metrics = Self {
82            task_polls: Family::default(),
83            tasks_running: Family::default(),
84            tasks_spawned: Family::default(),
85            network_bandwidth: Counter::default(),
86            open_blobs: Gauge::default(),
87            storage_reads: Counter::default(),
88            storage_read_bandwidth: Counter::default(),
89            storage_writes: Counter::default(),
90            storage_write_bandwidth: Counter::default(),
91        };
92        registry.register(
93            "tasks_spawned",
94            "Total number of tasks spawned",
95            metrics.tasks_spawned.clone(),
96        );
97        registry.register(
98            "tasks_running",
99            "Number of tasks currently running",
100            metrics.tasks_running.clone(),
101        );
102        registry.register(
103            "task_polls",
104            "Total number of task polls",
105            metrics.task_polls.clone(),
106        );
107        registry.register(
108            "bandwidth",
109            "Total amount of data sent over network",
110            metrics.network_bandwidth.clone(),
111        );
112        registry.register(
113            "open_blobs",
114            "Number of open blobs",
115            metrics.open_blobs.clone(),
116        );
117        registry.register(
118            "storage_reads",
119            "Total number of disk reads",
120            metrics.storage_reads.clone(),
121        );
122        registry.register(
123            "storage_read_bandwidth",
124            "Total amount of data read from disk",
125            metrics.storage_read_bandwidth.clone(),
126        );
127        registry.register(
128            "storage_writes",
129            "Total number of disk writes",
130            metrics.storage_writes.clone(),
131        );
132        registry.register(
133            "storage_write_bandwidth",
134            "Total amount of data written to disk",
135            metrics.storage_write_bandwidth.clone(),
136        );
137        metrics
138    }
139}
140
/// Track the state of the runtime for determinism auditing.
///
/// Every audited event is chained into a running SHA-256 digest. Two runs that observe the
/// same sequence of events end with the same digest (see [`Auditor::state`]).
pub struct Auditor {
    /// Current digest; empty until the first event is recorded.
    hash: Mutex<Vec<u8>>,
}
145
146impl Auditor {
147    fn new() -> Self {
148        Self {
149            hash: Mutex::new(Vec::new()),
150        }
151    }
152
153    fn process_task(&self, task: u128, label: &str) {
154        let mut hash = self.hash.lock().unwrap();
155        let mut hasher = Sha256::new();
156        hasher.update(&*hash);
157        hasher.update(b"process_task");
158        hasher.update(task.to_be_bytes());
159        hasher.update(label.as_bytes());
160        *hash = hasher.finalize().to_vec();
161    }
162
163    fn stop(&self, value: i32) {
164        let mut hash = self.hash.lock().unwrap();
165        let mut hasher = Sha256::new();
166        hasher.update(&*hash);
167        hasher.update(b"stop");
168        hasher.update(value.to_be_bytes());
169        *hash = hasher.finalize().to_vec();
170    }
171
172    fn stopped(&self) {
173        let mut hash = self.hash.lock().unwrap();
174        let mut hasher = Sha256::new();
175        hasher.update(&*hash);
176        hasher.update(b"stopped");
177        *hash = hasher.finalize().to_vec();
178    }
179
180    fn bind(&self, address: SocketAddr) {
181        let mut hash = self.hash.lock().unwrap();
182        let mut hasher = Sha256::new();
183        hasher.update(&*hash);
184        hasher.update(b"bind");
185        hasher.update(address.to_string().as_bytes());
186        *hash = hasher.finalize().to_vec();
187    }
188
189    fn dial(&self, dialer: SocketAddr, dialee: SocketAddr) {
190        let mut hash = self.hash.lock().unwrap();
191        let mut hasher = Sha256::new();
192        hasher.update(&*hash);
193        hasher.update(b"dial");
194        hasher.update(dialer.to_string().as_bytes());
195        hasher.update(dialee.to_string().as_bytes());
196        *hash = hasher.finalize().to_vec();
197    }
198
199    fn accept(&self, dialee: SocketAddr, dialer: SocketAddr) {
200        let mut hash = self.hash.lock().unwrap();
201        let mut hasher = Sha256::new();
202        hasher.update(&*hash);
203        hasher.update(b"accept");
204        hasher.update(dialee.to_string().as_bytes());
205        hasher.update(dialer.to_string().as_bytes());
206        *hash = hasher.finalize().to_vec();
207    }
208
209    fn send(&self, sender: SocketAddr, receiver: SocketAddr, message: &[u8]) {
210        let mut hash = self.hash.lock().unwrap();
211        let mut hasher = Sha256::new();
212        hasher.update(&*hash);
213        hasher.update(b"send");
214        hasher.update(sender.to_string().as_bytes());
215        hasher.update(receiver.to_string().as_bytes());
216        hasher.update(message);
217        *hash = hasher.finalize().to_vec();
218    }
219
220    fn recv(&self, receiver: SocketAddr, sender: SocketAddr, message: &[u8]) {
221        let mut hash = self.hash.lock().unwrap();
222        let mut hasher = Sha256::new();
223        hasher.update(&*hash);
224        hasher.update(b"recv");
225        hasher.update(receiver.to_string().as_bytes());
226        hasher.update(sender.to_string().as_bytes());
227        hasher.update(message);
228        *hash = hasher.finalize().to_vec();
229    }
230
231    fn rand(&self, method: String) {
232        let mut hash = self.hash.lock().unwrap();
233        let mut hasher = Sha256::new();
234        hasher.update(&*hash);
235        hasher.update(b"rand");
236        hasher.update(method.as_bytes());
237        *hash = hasher.finalize().to_vec();
238    }
239
240    fn open(&self, partition: &str, name: &[u8]) {
241        let mut hash = self.hash.lock().unwrap();
242        let mut hasher = Sha256::new();
243        hasher.update(&*hash);
244        hasher.update(b"open");
245        hasher.update(partition.as_bytes());
246        hasher.update(name);
247        *hash = hasher.finalize().to_vec();
248    }
249
250    fn remove(&self, partition: &str, name: Option<&[u8]>) {
251        let mut hash = self.hash.lock().unwrap();
252        let mut hasher = Sha256::new();
253        hasher.update(&*hash);
254        hasher.update(b"remove");
255        hasher.update(partition.as_bytes());
256        if let Some(name) = name {
257            hasher.update(name);
258        }
259        *hash = hasher.finalize().to_vec();
260    }
261
262    fn scan(&self, partition: &str) {
263        let mut hash = self.hash.lock().unwrap();
264        let mut hasher = Sha256::new();
265        hasher.update(&*hash);
266        hasher.update(b"scan");
267        hasher.update(partition.as_bytes());
268        *hash = hasher.finalize().to_vec();
269    }
270
271    fn len(&self, partition: &str, name: &[u8]) {
272        let mut hash = self.hash.lock().unwrap();
273        let mut hasher = Sha256::new();
274        hasher.update(&*hash);
275        hasher.update(b"len");
276        hasher.update(partition.as_bytes());
277        hasher.update(name);
278        *hash = hasher.finalize().to_vec();
279    }
280
281    fn read_at(&self, partition: &str, name: &[u8], buf: usize, offset: u64) {
282        let mut hash = self.hash.lock().unwrap();
283        let mut hasher = Sha256::new();
284        hasher.update(&*hash);
285        hasher.update(b"read_at");
286        hasher.update(partition.as_bytes());
287        hasher.update(name);
288        hasher.update(buf.to_be_bytes());
289        hasher.update(offset.to_be_bytes());
290        *hash = hasher.finalize().to_vec();
291    }
292
293    fn write_at(&self, partition: &str, name: &[u8], buf: &[u8], offset: u64) {
294        let mut hash = self.hash.lock().unwrap();
295        let mut hasher = Sha256::new();
296        hasher.update(&*hash);
297        hasher.update(b"write_at");
298        hasher.update(partition.as_bytes());
299        hasher.update(name);
300        hasher.update(buf);
301        hasher.update(offset.to_be_bytes());
302        *hash = hasher.finalize().to_vec();
303    }
304
305    fn truncate(&self, partition: &str, name: &[u8], size: u64) {
306        let mut hash = self.hash.lock().unwrap();
307        let mut hasher = Sha256::new();
308        hasher.update(&*hash);
309        hasher.update(b"truncate");
310        hasher.update(partition.as_bytes());
311        hasher.update(name);
312        hasher.update(size.to_be_bytes());
313        *hash = hasher.finalize().to_vec();
314    }
315
316    fn sync(&self, partition: &str, name: &[u8]) {
317        let mut hash = self.hash.lock().unwrap();
318        let mut hasher = Sha256::new();
319        hasher.update(&*hash);
320        hasher.update(b"sync");
321        hasher.update(partition.as_bytes());
322        hasher.update(name);
323        *hash = hasher.finalize().to_vec();
324    }
325
326    fn close(&self, partition: &str, name: &[u8]) {
327        let mut hash = self.hash.lock().unwrap();
328        let mut hasher = Sha256::new();
329        hasher.update(&*hash);
330        hasher.update(b"close");
331        hasher.update(partition.as_bytes());
332        hasher.update(name);
333        *hash = hasher.finalize().to_vec();
334    }
335
336    fn register(&self, name: &str, help: &str) {
337        let mut hash = self.hash.lock().unwrap();
338        let mut hasher = Sha256::new();
339        hasher.update(&*hash);
340        hasher.update(b"register");
341        hasher.update(name.as_bytes());
342        hasher.update(help.as_bytes());
343        *hash = hasher.finalize().to_vec();
344    }
345
346    fn encode(&self) {
347        let mut hash = self.hash.lock().unwrap();
348        let mut hasher = Sha256::new();
349        hasher.update(&*hash);
350        hasher.update(b"encode");
351        *hash = hasher.finalize().to_vec();
352    }
353
354    /// Generate a representation of the current state of the runtime.
355    ///
356    /// This can be used to ensure that logic running on top
357    /// of the runtime is interacting deterministically.
358    pub fn state(&self) -> String {
359        let hash = self.hash.lock().unwrap().clone();
360        hex(&hash)
361    }
362}
363
364struct Task {
365    id: u128,
366    label: String,
367
368    tasks: Arc<Tasks>,
369
370    root: bool,
371    future: Mutex<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
372
373    completed: Mutex<bool>,
374}
375
376impl ArcWake for Task {
377    fn wake_by_ref(arc_self: &Arc<Self>) {
378        arc_self.tasks.enqueue(arc_self.clone());
379    }
380}
381
382struct Tasks {
383    counter: Mutex<u128>,
384    queue: Mutex<Vec<Arc<Task>>>,
385}
386
387impl Tasks {
388    fn register(
389        arc_self: &Arc<Self>,
390        label: &str,
391        root: bool,
392        future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
393    ) {
394        let mut queue = arc_self.queue.lock().unwrap();
395        let id = {
396            let mut l = arc_self.counter.lock().unwrap();
397            let old = *l;
398            *l = l.checked_add(1).expect("task counter overflow");
399            old
400        };
401        queue.push(Arc::new(Task {
402            id,
403            label: label.to_string(),
404            root,
405            future: Mutex::new(future),
406            tasks: arc_self.clone(),
407            completed: Mutex::new(false),
408        }));
409    }
410
411    fn enqueue(&self, task: Arc<Task>) {
412        let mut queue = self.queue.lock().unwrap();
413        queue.push(task);
414    }
415
416    fn drain(&self) -> Vec<Arc<Task>> {
417        let mut queue = self.queue.lock().unwrap();
418        let len = queue.len();
419        replace(&mut *queue, Vec::with_capacity(len))
420    }
421
422    fn len(&self) -> usize {
423        self.queue.lock().unwrap().len()
424    }
425}
426
/// Configuration for the `deterministic` runtime.
#[derive(Clone, Debug)]
pub struct Config {
    /// Seed for the random number generator.
    pub seed: u64,

    /// The cycle duration determines how much time is advanced after each iteration of the event
    /// loop. This is useful to prevent starvation if some task never yields.
    pub cycle: Duration,

    /// If the runtime is still executing at this point (i.e. a test hasn't stopped), panic.
    ///
    /// Measured in simulated runtime time (starting from the runtime's start time), not
    /// wall-clock time. When set, `cycle` must be non-zero.
    pub timeout: Option<Duration>,
}

impl Default for Config {
    /// Seed `42`, a 1ms cycle, and no timeout.
    fn default() -> Self {
        Self {
            seed: 42,
            cycle: Duration::from_millis(1),
            timeout: None,
        }
    }
}
450
451/// Deterministic runtime that randomly selects tasks to run based on a seed.
452pub struct Executor {
453    registry: Mutex<Registry>,
454    cycle: Duration,
455    deadline: Option<SystemTime>,
456    metrics: Arc<Metrics>,
457    auditor: Arc<Auditor>,
458    rng: Mutex<StdRng>,
459    time: Mutex<SystemTime>,
460    tasks: Arc<Tasks>,
461    sleeping: Mutex<BinaryHeap<Alarm>>,
462    partitions: Mutex<HashMap<String, Partition>>,
463    signaler: Mutex<Signaler>,
464    signal: Signal,
465    finished: Mutex<bool>,
466    recovered: Mutex<bool>,
467}
468
469impl Executor {
470    /// Initialize a new `deterministic` runtime with the given seed and cycle duration.
471    pub fn init(cfg: Config) -> (Runner, Context, Arc<Auditor>) {
472        // Ensure config is valid
473        if cfg.timeout.is_some() && cfg.cycle == Duration::default() {
474            panic!("cycle duration must be non-zero when timeout is set");
475        }
476
477        // Create a new registry
478        let mut registry = Registry::default();
479        let runtime_registry = registry.sub_registry_with_prefix(METRICS_PREFIX);
480
481        // Initialize runtime
482        let metrics = Arc::new(Metrics::init(runtime_registry));
483        let auditor = Arc::new(Auditor::new());
484        let start_time = UNIX_EPOCH;
485        let deadline = cfg
486            .timeout
487            .map(|timeout| start_time.checked_add(timeout).expect("timeout overflowed"));
488        let (signaler, signal) = Signaler::new();
489        let executor = Arc::new(Self {
490            registry: Mutex::new(registry),
491            cycle: cfg.cycle,
492            deadline,
493            metrics: metrics.clone(),
494            auditor: auditor.clone(),
495            rng: Mutex::new(StdRng::seed_from_u64(cfg.seed)),
496            time: Mutex::new(start_time),
497            tasks: Arc::new(Tasks {
498                queue: Mutex::new(Vec::new()),
499                counter: Mutex::new(0),
500            }),
501            sleeping: Mutex::new(BinaryHeap::new()),
502            partitions: Mutex::new(HashMap::new()),
503            signaler: Mutex::new(signaler),
504            signal,
505            finished: Mutex::new(false),
506            recovered: Mutex::new(false),
507        });
508        (
509            Runner {
510                executor: executor.clone(),
511            },
512            Context {
513                label: String::new(),
514                spawned: false,
515                executor,
516                networking: Arc::new(Networking::new(metrics, auditor.clone())),
517            },
518            auditor,
519        )
520    }
521
522    /// Initialize a new `deterministic` runtime with the default configuration
523    /// and the provided seed.
524    pub fn seeded(seed: u64) -> (Runner, Context, Arc<Auditor>) {
525        let cfg = Config {
526            seed,
527            ..Config::default()
528        };
529        Self::init(cfg)
530    }
531
532    /// Initialize a new `deterministic` runtime with the default configuration
533    /// but exit after the given timeout.
534    pub fn timed(timeout: Duration) -> (Runner, Context, Arc<Auditor>) {
535        let cfg = Config {
536            timeout: Some(timeout),
537            ..Config::default()
538        };
539        Self::init(cfg)
540    }
541
542    /// Initialize a new `deterministic` runtime with the default configuration.
543    // We'd love to implement the trait but we can't because of the return type.
544    #[allow(clippy::should_implement_trait)]
545    pub fn default() -> (Runner, Context, Arc<Auditor>) {
546        Self::init(Config::default())
547    }
548}
549
550/// Implementation of [`crate::Runner`] for the `deterministic` runtime.
551pub struct Runner {
552    executor: Arc<Executor>,
553}
554
555impl crate::Runner for Runner {
556    fn start<F>(self, f: F) -> F::Output
557    where
558        F: Future + Send + 'static,
559        F::Output: Send + 'static,
560    {
561        // Add root task to the queue
562        let output = Arc::new(Mutex::new(None));
563        Tasks::register(
564            &self.executor.tasks,
565            "",
566            true,
567            Box::pin({
568                let output = output.clone();
569                async move {
570                    *output.lock().unwrap() = Some(f.await);
571                }
572            }),
573        );
574
575        // Process tasks until root task completes or progress stalls
576        let mut iter = 0;
577        loop {
578            // Ensure we have not exceeded our deadline
579            {
580                let current = self.executor.time.lock().unwrap();
581                if let Some(deadline) = self.executor.deadline {
582                    if *current >= deadline {
583                        panic!("runtime timeout");
584                    }
585                }
586            }
587
588            // Snapshot available tasks
589            let mut tasks = self.executor.tasks.drain();
590
591            // Shuffle tasks
592            {
593                let mut rng = self.executor.rng.lock().unwrap();
594                tasks.shuffle(&mut *rng);
595            }
596
597            // Run all snapshotted tasks
598            //
599            // This approach is more efficient than randomly selecting a task one-at-a-time
600            // because it ensures we don't pull the same pending task multiple times in a row (without
601            // processing a different task required for other tasks to make progress).
602            trace!(iter, tasks = tasks.len(), "starting loop");
603            for task in tasks {
604                // Record task for auditing
605                self.executor.auditor.process_task(task.id, &task.label);
606
607                // Check if task is already complete
608                if *task.completed.lock().unwrap() {
609                    trace!(id = task.id, "skipping already completed task");
610                    continue;
611                }
612                trace!(id = task.id, "processing task");
613
614                // Prepare task for polling
615                let waker = waker_ref(&task);
616                let mut context = task::Context::from_waker(&waker);
617                let mut future = task.future.lock().unwrap();
618
619                // Record task poll
620                self.executor
621                    .metrics
622                    .task_polls
623                    .get_or_create(&Work {
624                        label: task.label.clone(),
625                    })
626                    .inc();
627
628                // Task is re-queued in its `wake_by_ref` implementation as soon as we poll here (regardless
629                // of whether it is Pending/Ready).
630                let pending = future.as_mut().poll(&mut context).is_pending();
631                if pending {
632                    trace!(id = task.id, "task is still pending");
633                    continue;
634                }
635
636                // Mark task as completed
637                *task.completed.lock().unwrap() = true;
638                trace!(id = task.id, "task is complete");
639
640                // Root task completed
641                if task.root {
642                    *self.executor.finished.lock().unwrap() = true;
643                    return output.lock().unwrap().take().unwrap();
644                }
645            }
646
647            // Advance time by cycle
648            //
649            // This approach prevents starvation if some task never yields (to approximate this,
650            // duration can be set to 1ns).
651            let mut current;
652            {
653                let mut time = self.executor.time.lock().unwrap();
654                *time = time
655                    .checked_add(self.executor.cycle)
656                    .expect("executor time overflowed");
657                current = *time;
658            }
659            trace!(now = current.epoch_millis(), "time advanced",);
660
661            // Skip time if there is nothing to do
662            if self.executor.tasks.len() == 0 {
663                let mut skip = None;
664                {
665                    let sleeping = self.executor.sleeping.lock().unwrap();
666                    if let Some(next) = sleeping.peek() {
667                        if next.time > current {
668                            skip = Some(next.time);
669                        }
670                    }
671                }
672                if skip.is_some() {
673                    {
674                        let mut time = self.executor.time.lock().unwrap();
675                        *time = skip.unwrap();
676                        current = *time;
677                    }
678                    trace!(now = current.epoch_millis(), "time skipped",);
679                }
680            }
681
682            // Wake all sleeping tasks that are ready
683            let mut to_wake = Vec::new();
684            let mut remaining;
685            {
686                let mut sleeping = self.executor.sleeping.lock().unwrap();
687                while let Some(next) = sleeping.peek() {
688                    if next.time <= current {
689                        let sleeper = sleeping.pop().unwrap();
690                        to_wake.push(sleeper.waker);
691                    } else {
692                        break;
693                    }
694                }
695                remaining = sleeping.len();
696            }
697            for waker in to_wake {
698                waker.wake();
699            }
700
701            // Account for remaining tasks
702            remaining += self.executor.tasks.len();
703
704            // If there are no tasks to run and no tasks sleeping, the executor is stalled
705            // and will never finish.
706            if remaining == 0 {
707                panic!("runtime stalled");
708            }
709            iter += 1;
710        }
711    }
712}
713
714/// Implementation of [`crate::Spawner`], [`crate::Clock`],
715/// [`crate::Network`], and [`crate::Storage`] for the `deterministic`
716/// runtime.
717pub struct Context {
718    label: String,
719    spawned: bool,
720    executor: Arc<Executor>,
721    networking: Arc<Networking>,
722}
723
724impl Context {
725    /// Recover the inner state (deadline, metrics, auditor, rng, synced storage, etc.) from the
726    /// current runtime and use it to initialize a new instance of the runtime. A recovered runtime
727    /// does not inherit the current runtime's pending tasks, unsynced storage, network connections, nor
728    /// its shutdown signaler.
729    ///
730    /// This is useful for performing a deterministic simulation that spans multiple runtime instantiations,
731    /// like simulating unclean shutdown (which involves repeatedly halting the runtime at unexpected intervals).
732    ///
733    /// It is only permitted to call this method after the runtime has finished (i.e. once `start` returns)
734    /// and only permitted to do once (otherwise multiple recovered runtimes will share the same inner state).
735    /// If either one of these conditions is violated, this method will panic.
736    pub fn recover(self) -> (Runner, Self, Arc<Auditor>) {
737        // Ensure we are finished
738        if !*self.executor.finished.lock().unwrap() {
739            panic!("execution is not finished");
740        }
741
742        // Ensure runtime has not already been recovered
743        {
744            let mut recovered = self.executor.recovered.lock().unwrap();
745            if *recovered {
746                panic!("runtime has already been recovered");
747            }
748            *recovered = true;
749        }
750
751        // Rebuild metrics
752        let mut registry = Registry::default();
753        let runtime_registry = registry.sub_registry_with_prefix(METRICS_PREFIX);
754        let metrics = Arc::new(Metrics::init(runtime_registry));
755
756        // Copy state
757        let auditor = self.executor.auditor.clone();
758        let (signaler, signal) = Signaler::new();
759        let executor = Arc::new(Executor {
760            // Copied from the current runtime
761            cycle: self.executor.cycle,
762            deadline: self.executor.deadline,
763            auditor: auditor.clone(),
764            rng: Mutex::new(self.executor.rng.lock().unwrap().clone()),
765            time: Mutex::new(*self.executor.time.lock().unwrap()),
766            partitions: Mutex::new(self.executor.partitions.lock().unwrap().clone()),
767
768            // New state for the new runtime
769            registry: Mutex::new(registry),
770            metrics: metrics.clone(),
771            tasks: Arc::new(Tasks {
772                queue: Mutex::new(Vec::new()),
773                counter: Mutex::new(0),
774            }),
775            sleeping: Mutex::new(BinaryHeap::new()),
776            signaler: Mutex::new(signaler),
777            signal,
778            finished: Mutex::new(false),
779            recovered: Mutex::new(false),
780        });
781        (
782            Runner {
783                executor: executor.clone(),
784            },
785            Self {
786                label: String::new(),
787                spawned: false,
788                executor,
789                networking: Arc::new(Networking::new(metrics, auditor.clone())),
790            },
791            auditor,
792        )
793    }
794}
795
796impl Clone for Context {
797    fn clone(&self) -> Self {
798        Self {
799            label: self.label.clone(),
800            spawned: false,
801            executor: self.executor.clone(),
802            networking: self.networking.clone(),
803        }
804    }
805}
806
807impl crate::Spawner for Context {
808    fn spawn<F, Fut, T>(self, f: F) -> Handle<T>
809    where
810        F: FnOnce(Self) -> Fut + Send + 'static,
811        Fut: Future<Output = T> + Send + 'static,
812        T: Send + 'static,
813    {
814        // Ensure a context only spawns one task
815        assert!(!self.spawned, "already spawned");
816
817        // Get metrics
818        let label = self.label.clone();
819        let work = Work {
820            label: label.clone(),
821        };
822        self.executor
823            .metrics
824            .tasks_spawned
825            .get_or_create(&work)
826            .inc();
827        let gauge = self
828            .executor
829            .metrics
830            .tasks_running
831            .get_or_create(&work)
832            .clone();
833
834        // Set up the task
835        let executor = self.executor.clone();
836        let future = f(self);
837        let (f, handle) = Handle::init(future, gauge, false);
838
839        // Spawn the task
840        Tasks::register(&executor.tasks, &label, false, Box::pin(f));
841        handle
842    }
843
844    fn spawn_ref<F, T>(&mut self) -> impl FnOnce(F) -> Handle<T> + 'static
845    where
846        F: Future<Output = T> + Send + 'static,
847        T: Send + 'static,
848    {
849        // Ensure a context only spawns one task
850        assert!(!self.spawned, "already spawned");
851        self.spawned = true;
852
853        // Get metrics
854        let work = Work {
855            label: self.label.clone(),
856        };
857        self.executor
858            .metrics
859            .tasks_spawned
860            .get_or_create(&work)
861            .inc();
862        let gauge = self
863            .executor
864            .metrics
865            .tasks_running
866            .get_or_create(&work)
867            .clone();
868
869        // Set up the task
870        let label = self.label.clone();
871        let executor = self.executor.clone();
872        move |f: F| {
873            let (f, handle) = Handle::init(f, gauge, false);
874
875            // Spawn the task
876            Tasks::register(&executor.tasks, &label, false, Box::pin(f));
877            handle
878        }
879    }
880
881    fn stop(&self, value: i32) {
882        self.executor.auditor.stop(value);
883        self.executor.signaler.lock().unwrap().signal(value);
884    }
885
886    fn stopped(&self) -> Signal {
887        self.executor.auditor.stopped();
888        self.executor.signal.clone()
889    }
890}
891
892impl crate::Metrics for Context {
893    fn with_label(&self, label: &str) -> Self {
894        let label = {
895            let prefix = self.label.clone();
896            if prefix.is_empty() {
897                label.to_string()
898            } else {
899                format!("{}_{}", prefix, label)
900            }
901        };
902        assert!(
903            !label.starts_with(METRICS_PREFIX),
904            "using runtime label is not allowed"
905        );
906        Self {
907            label,
908            spawned: false,
909            executor: self.executor.clone(),
910            networking: self.networking.clone(),
911        }
912    }
913
914    fn label(&self) -> String {
915        self.label.clone()
916    }
917
918    fn register<N: Into<String>, H: Into<String>>(&self, name: N, help: H, metric: impl Metric) {
919        // Prepare args
920        let name = name.into();
921        let help = help.into();
922
923        // Register metric
924        self.executor.auditor.register(&name, &help);
925        let prefixed_name = {
926            let prefix = &self.label;
927            if prefix.is_empty() {
928                name
929            } else {
930                format!("{}_{}", *prefix, name)
931            }
932        };
933        self.executor
934            .registry
935            .lock()
936            .unwrap()
937            .register(prefixed_name, help, metric)
938    }
939
940    fn encode(&self) -> String {
941        self.executor.auditor.encode();
942        let mut buffer = String::new();
943        encode(&mut buffer, &self.executor.registry.lock().unwrap()).expect("encoding failed");
944        buffer
945    }
946}
947
948struct Sleeper {
949    executor: Arc<Executor>,
950    time: SystemTime,
951    registered: bool,
952}
953
/// A pending wake-up: `waker` should be woken once time reaches `time`.
///
/// Equality and ordering consider only `time`.
struct Alarm {
    time: SystemTime,
    waker: Waker,
}

impl PartialEq for Alarm {
    fn eq(&self, other: &Self) -> bool {
        self.time == other.time
    }
}

impl Eq for Alarm {}

impl PartialOrd for Alarm {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(Ord::cmp(self, other))
    }
}

impl Ord for Alarm {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        // `BinaryHeap` is a max-heap; inverting the time ordering makes the
        // earliest alarm the "greatest", so it is popped first.
        self.time.cmp(&other.time).reverse()
    }
}
979
980impl Future for Sleeper {
981    type Output = ();
982
983    fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
984        {
985            let current_time = *self.executor.time.lock().unwrap();
986            if current_time >= self.time {
987                return Poll::Ready(());
988            }
989        }
990        if !self.registered {
991            self.registered = true;
992            self.executor.sleeping.lock().unwrap().push(Alarm {
993                time: self.time,
994                waker: cx.waker().clone(),
995            });
996        }
997        Poll::Pending
998    }
999}
1000
1001impl Clock for Context {
1002    fn current(&self) -> SystemTime {
1003        *self.executor.time.lock().unwrap()
1004    }
1005
1006    fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static {
1007        let time = self
1008            .current()
1009            .checked_add(duration)
1010            .expect("overflow when setting wake time");
1011        Sleeper {
1012            executor: self.executor.clone(),
1013
1014            time,
1015            registered: false,
1016        }
1017    }
1018
1019    fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static {
1020        Sleeper {
1021            executor: self.executor.clone(),
1022
1023            time: deadline,
1024            registered: false,
1025        }
1026    }
1027}
1028
1029impl GClock for Context {
1030    type Instant = SystemTime;
1031
1032    fn now(&self) -> Self::Instant {
1033        self.current()
1034    }
1035}
1036
1037impl ReasonablyRealtime for Context {}
1038
1039type Dialable = mpsc::UnboundedSender<(
1040    SocketAddr,
1041    mocks::Sink,   // Dialee -> Dialer
1042    mocks::Stream, // Dialer -> Dialee
1043)>;
1044
1045/// Implementation of [`crate::Network`] for the `deterministic` runtime.
1046///
1047/// When a dialer connects to a dialee, the dialee is given a new ephemeral port
1048/// from the range `32768..61000`. To keep things simple, it is not possible to
1049/// bind to an ephemeral port. Likewise, if ports are not reused and when exhausted,
1050/// the runtime will panic.
1051struct Networking {
1052    metrics: Arc<Metrics>,
1053    auditor: Arc<Auditor>,
1054    ephemeral: Mutex<u16>,
1055    listeners: Mutex<HashMap<SocketAddr, Dialable>>,
1056}
1057
1058impl Networking {
1059    fn new(metrics: Arc<Metrics>, auditor: Arc<Auditor>) -> Self {
1060        Self {
1061            metrics,
1062            auditor,
1063            ephemeral: Mutex::new(EPHEMERAL_PORT_RANGE.start),
1064            listeners: Mutex::new(HashMap::new()),
1065        }
1066    }
1067
1068    fn bind(&self, socket: SocketAddr) -> Result<Listener, Error> {
1069        self.auditor.bind(socket);
1070
1071        // If the IP is localhost, ensure the port is not in the ephemeral range
1072        // so that it can be used for binding in the dial method
1073        if socket.ip() == IpAddr::V4(Ipv4Addr::LOCALHOST)
1074            && EPHEMERAL_PORT_RANGE.contains(&socket.port())
1075        {
1076            return Err(Error::BindFailed);
1077        }
1078
1079        // Ensure the port is not already bound
1080        let mut listeners = self.listeners.lock().unwrap();
1081        if listeners.contains_key(&socket) {
1082            return Err(Error::BindFailed);
1083        }
1084
1085        // Bind the socket
1086        let (sender, receiver) = mpsc::unbounded();
1087        listeners.insert(socket, sender);
1088        Ok(Listener {
1089            auditor: self.auditor.clone(),
1090            address: socket,
1091            listener: receiver,
1092            metrics: self.metrics.clone(),
1093        })
1094    }
1095
1096    async fn dial(&self, socket: SocketAddr) -> Result<(Sink, Stream), Error> {
1097        // Assign dialer a port from the ephemeral range
1098        let dialer = {
1099            let mut ephemeral = self.ephemeral.lock().unwrap();
1100            let dialer = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), *ephemeral);
1101            *ephemeral = ephemeral
1102                .checked_add(1)
1103                .expect("ephemeral port range exhausted");
1104            dialer
1105        };
1106        self.auditor.dial(dialer, socket);
1107
1108        // Get dialee
1109        let mut sender = {
1110            let listeners = self.listeners.lock().unwrap();
1111            let sender = listeners.get(&socket).ok_or(Error::ConnectionFailed)?;
1112            sender.clone()
1113        };
1114
1115        // Construct connection
1116        let (dialer_sender, dialer_receiver) = mocks::Channel::init();
1117        let (dialee_sender, dialee_receiver) = mocks::Channel::init();
1118        sender
1119            .send((dialer, dialer_sender, dialee_receiver))
1120            .await
1121            .map_err(|_| Error::ConnectionFailed)?;
1122        Ok((
1123            Sink {
1124                metrics: self.metrics.clone(),
1125                auditor: self.auditor.clone(),
1126                me: dialer,
1127                peer: socket,
1128                sender: dialee_sender,
1129            },
1130            Stream {
1131                auditor: self.auditor.clone(),
1132                me: dialer,
1133                peer: socket,
1134                receiver: dialer_receiver,
1135            },
1136        ))
1137    }
1138}
1139
1140impl crate::Network<Listener, Sink, Stream> for Context {
1141    async fn bind(&self, socket: SocketAddr) -> Result<Listener, Error> {
1142        self.networking.bind(socket)
1143    }
1144
1145    async fn dial(&self, socket: SocketAddr) -> Result<(Sink, Stream), Error> {
1146        self.networking.dial(socket).await
1147    }
1148}
1149
1150/// Implementation of [`crate::Listener`] for the `deterministic` runtime.
1151pub struct Listener {
1152    metrics: Arc<Metrics>,
1153    auditor: Arc<Auditor>,
1154    address: SocketAddr,
1155    listener: mpsc::UnboundedReceiver<(SocketAddr, mocks::Sink, mocks::Stream)>,
1156}
1157
1158impl crate::Listener<Sink, Stream> for Listener {
1159    async fn accept(&mut self) -> Result<(SocketAddr, Sink, Stream), Error> {
1160        let (socket, sender, receiver) = self.listener.next().await.ok_or(Error::ReadFailed)?;
1161        self.auditor.accept(self.address, socket);
1162        Ok((
1163            socket,
1164            Sink {
1165                metrics: self.metrics.clone(),
1166                auditor: self.auditor.clone(),
1167                me: self.address,
1168                peer: socket,
1169                sender,
1170            },
1171            Stream {
1172                auditor: self.auditor.clone(),
1173                me: self.address,
1174                peer: socket,
1175                receiver,
1176            },
1177        ))
1178    }
1179}
1180
1181/// Implementation of [`crate::Sink`] for the `deterministic` runtime.
1182pub struct Sink {
1183    metrics: Arc<Metrics>,
1184    auditor: Arc<Auditor>,
1185    me: SocketAddr,
1186    peer: SocketAddr,
1187    sender: mocks::Sink,
1188}
1189
1190impl crate::Sink for Sink {
1191    async fn send(&mut self, msg: &[u8]) -> Result<(), Error> {
1192        self.auditor.send(self.me, self.peer, msg);
1193        self.sender.send(msg).await.map_err(|_| Error::SendFailed)?;
1194        self.metrics.network_bandwidth.inc_by(msg.len() as u64);
1195        Ok(())
1196    }
1197}
1198
1199/// Implementation of [`crate::Stream`] for the `deterministic` runtime.
1200pub struct Stream {
1201    auditor: Arc<Auditor>,
1202    me: SocketAddr,
1203    peer: SocketAddr,
1204    receiver: mocks::Stream,
1205}
1206
1207impl crate::Stream for Stream {
1208    async fn recv(&mut self, buf: &mut [u8]) -> Result<(), Error> {
1209        self.receiver
1210            .recv(buf)
1211            .await
1212            .map_err(|_| Error::RecvFailed)?;
1213        self.auditor.recv(self.me, self.peer, buf);
1214        Ok(())
1215    }
1216}
1217
1218impl RngCore for Context {
1219    fn next_u32(&mut self) -> u32 {
1220        self.executor.auditor.rand("next_u32".to_string());
1221        self.executor.rng.lock().unwrap().next_u32()
1222    }
1223
1224    fn next_u64(&mut self) -> u64 {
1225        self.executor.auditor.rand("next_u64".to_string());
1226        self.executor.rng.lock().unwrap().next_u64()
1227    }
1228
1229    fn fill_bytes(&mut self, dest: &mut [u8]) {
1230        self.executor.auditor.rand("fill_bytes".to_string());
1231        self.executor.rng.lock().unwrap().fill_bytes(dest)
1232    }
1233
1234    fn try_fill_bytes(&mut self, dest: &mut [u8]) -> Result<(), rand::Error> {
1235        self.executor.auditor.rand("try_fill_bytes".to_string());
1236        self.executor.rng.lock().unwrap().try_fill_bytes(dest)
1237    }
1238}
1239
1240impl CryptoRng for Context {}
1241
1242/// Implementation of [`crate::Blob`] for the `deterministic` runtime.
1243pub struct Blob {
1244    executor: Arc<Executor>,
1245
1246    partition: String,
1247    name: Vec<u8>,
1248
1249    // For content to be updated for future opens,
1250    // it must be synced back to the partition (occurs on
1251    // `sync` and `close`).
1252    content: Arc<RwLock<Vec<u8>>>,
1253}
1254
1255impl Blob {
1256    fn new(executor: Arc<Executor>, partition: String, name: &[u8], content: Vec<u8>) -> Self {
1257        executor.metrics.open_blobs.inc();
1258        Self {
1259            executor,
1260            partition,
1261            name: name.into(),
1262            content: Arc::new(RwLock::new(content)),
1263        }
1264    }
1265}
1266
1267impl Clone for Blob {
1268    fn clone(&self) -> Self {
1269        // We implement `Clone` manually to ensure the `open_blobs` gauge is updated.
1270        self.executor.metrics.open_blobs.inc();
1271        Self {
1272            executor: self.executor.clone(),
1273            partition: self.partition.clone(),
1274            name: self.name.clone(),
1275            content: self.content.clone(),
1276        }
1277    }
1278}
1279
1280impl crate::Storage<Blob> for Context {
1281    async fn open(&self, partition: &str, name: &[u8]) -> Result<Blob, Error> {
1282        self.executor.auditor.open(partition, name);
1283        let mut partitions = self.executor.partitions.lock().unwrap();
1284        let partition_entry = partitions.entry(partition.into()).or_default();
1285        let content = partition_entry.entry(name.into()).or_default();
1286        Ok(Blob::new(
1287            self.executor.clone(),
1288            partition.into(),
1289            name,
1290            content.clone(),
1291        ))
1292    }
1293
1294    async fn remove(&self, partition: &str, name: Option<&[u8]>) -> Result<(), Error> {
1295        self.executor.auditor.remove(partition, name);
1296        let mut partitions = self.executor.partitions.lock().unwrap();
1297        match name {
1298            Some(name) => {
1299                partitions
1300                    .get_mut(partition)
1301                    .ok_or(Error::PartitionMissing(partition.into()))?
1302                    .remove(name)
1303                    .ok_or(Error::BlobMissing(partition.into(), hex(name)))?;
1304            }
1305            None => {
1306                partitions
1307                    .remove(partition)
1308                    .ok_or(Error::PartitionMissing(partition.into()))?;
1309            }
1310        }
1311        Ok(())
1312    }
1313
1314    async fn scan(&self, partition: &str) -> Result<Vec<Vec<u8>>, Error> {
1315        self.executor.auditor.scan(partition);
1316        let partitions = self.executor.partitions.lock().unwrap();
1317        let partition = partitions
1318            .get(partition)
1319            .ok_or(Error::PartitionMissing(partition.into()))?;
1320        let mut results = Vec::with_capacity(partition.len());
1321        for name in partition.keys() {
1322            results.push(name.clone());
1323        }
1324        results.sort(); // deterministic output
1325        Ok(results)
1326    }
1327}
1328
1329impl crate::Blob for Blob {
1330    async fn len(&self) -> Result<u64, Error> {
1331        self.executor.auditor.len(&self.partition, &self.name);
1332        let content = self.content.read().unwrap();
1333        Ok(content.len() as u64)
1334    }
1335
1336    async fn read_at(&self, buf: &mut [u8], offset: u64) -> Result<(), Error> {
1337        let buf_len = buf.len();
1338        self.executor
1339            .auditor
1340            .read_at(&self.partition, &self.name, buf_len, offset);
1341        let offset = offset.try_into().map_err(|_| Error::OffsetOverflow)?;
1342        let content = self.content.read().unwrap();
1343        let content_len = content.len();
1344        if offset + buf_len > content_len {
1345            return Err(Error::BlobInsufficientLength);
1346        }
1347        buf.copy_from_slice(&content[offset..offset + buf_len]);
1348        self.executor.metrics.storage_reads.inc();
1349        self.executor
1350            .metrics
1351            .storage_read_bandwidth
1352            .inc_by(buf_len as u64);
1353        Ok(())
1354    }
1355
1356    async fn write_at(&self, buf: &[u8], offset: u64) -> Result<(), Error> {
1357        self.executor
1358            .auditor
1359            .write_at(&self.partition, &self.name, buf, offset);
1360        let offset = offset.try_into().map_err(|_| Error::OffsetOverflow)?;
1361        let mut content = self.content.write().unwrap();
1362        let required = offset + buf.len();
1363        if required > content.len() {
1364            content.resize(required, 0);
1365        }
1366        content[offset..offset + buf.len()].copy_from_slice(buf);
1367        self.executor.metrics.storage_writes.inc();
1368        self.executor
1369            .metrics
1370            .storage_write_bandwidth
1371            .inc_by(buf.len() as u64);
1372        Ok(())
1373    }
1374
1375    async fn truncate(&self, len: u64) -> Result<(), Error> {
1376        self.executor
1377            .auditor
1378            .truncate(&self.partition, &self.name, len);
1379        let len = len.try_into().map_err(|_| Error::OffsetOverflow)?;
1380        let mut content = self.content.write().unwrap();
1381        content.truncate(len);
1382        Ok(())
1383    }
1384
1385    async fn sync(&self) -> Result<(), Error> {
1386        // Create new content for partition
1387        //
1388        // Doing this first means we don't need to hold both the content
1389        // lock and the partition lock at the same time.
1390        let new_content = self.content.read().unwrap().clone();
1391
1392        // Update partition content
1393        self.executor.auditor.sync(&self.partition, &self.name);
1394        let mut partitions = self.executor.partitions.lock().unwrap();
1395        let partition = partitions
1396            .get_mut(&self.partition)
1397            .ok_or(Error::PartitionMissing(self.partition.clone()))?;
1398        let content = partition
1399            .get_mut(&self.name)
1400            .ok_or(Error::BlobMissing(self.partition.clone(), hex(&self.name)))?;
1401        *content = new_content;
1402        Ok(())
1403    }
1404
1405    async fn close(self) -> Result<(), Error> {
1406        self.executor.auditor.close(&self.partition, &self.name);
1407        self.sync().await?;
1408        Ok(())
1409    }
1410}
1411
1412impl Drop for Blob {
1413    fn drop(&mut self) {
1414        self.executor.metrics.open_blobs.dec();
1415    }
1416}
1417
#[cfg(test)]
mod tests {
    //! Tests for seed determinism, alarm ordering, timeouts, and storage
    //! persistence across `recover`.

    use super::*;
    use crate::{utils::run_tasks, Blob, Runner, Storage};
    use commonware_macros::test_traced;
    use futures::task::noop_waker;

    // Runs the shared `run_tasks` workload (5 tasks) under `seed` and returns
    // the auditor's state together with the messages `run_tasks` returns.
    fn run_with_seed(seed: u64) -> (String, Vec<usize>) {
        let (executor, context, auditor) = Executor::seeded(seed);
        let messages = run_tasks(5, executor, context);
        (auditor.state(), messages)
    }

    // Two runs with the same seed must produce identical auditor state and messages.
    #[test]
    fn test_same_seed_same_order() {
        // Generate initial outputs
        let mut outputs = Vec::new();
        for seed in 0..1000 {
            let output = run_with_seed(seed);
            outputs.push(output);
        }

        // Ensure they match
        for seed in 0..1000 {
            let output = run_with_seed(seed);
            assert_eq!(output, outputs[seed as usize]);
        }
    }

    // Distinct seeds are expected to produce different outputs for these two seeds.
    #[test_traced("TRACE")]
    fn test_different_seeds_different_order() {
        let output1 = run_with_seed(12345);
        let output2 = run_with_seed(54321);
        assert_ne!(output1, output2);
    }

    // `Alarm`'s reversed `Ord` makes `BinaryHeap` pop the earliest time first.
    #[test]
    fn test_alarm_min_heap() {
        // Populate heap
        let now = SystemTime::now();
        let alarms = vec![
            Alarm {
                time: now + Duration::new(10, 0),
                waker: noop_waker(),
            },
            Alarm {
                time: now + Duration::new(5, 0),
                waker: noop_waker(),
            },
            Alarm {
                time: now + Duration::new(15, 0),
                waker: noop_waker(),
            },
            Alarm {
                time: now + Duration::new(5, 0),
                waker: noop_waker(),
            },
        ];
        let mut heap = BinaryHeap::new();
        for alarm in alarms {
            heap.push(alarm);
        }

        // Verify min-heap
        let mut sorted_times = Vec::new();
        while let Some(alarm) = heap.pop() {
            sorted_times.push(alarm.time);
        }
        assert_eq!(
            sorted_times,
            vec![
                now + Duration::new(5, 0),
                now + Duration::new(5, 0),
                now + Duration::new(10, 0),
                now + Duration::new(15, 0),
            ]
        );
    }

    // A task that never finishes must trip the configured timeout.
    #[test]
    #[should_panic(expected = "runtime timeout")]
    fn test_timeout() {
        let (executor, context, _) = Executor::timed(Duration::from_secs(10));
        executor.start(async move {
            loop {
                context.sleep(Duration::from_secs(1)).await;
            }
        });
    }

    // A timeout with a zero cycle duration is rejected at initialization.
    #[test]
    #[should_panic(expected = "cycle duration must be non-zero when timeout is set")]
    fn test_bad_timeout() {
        let cfg = Config {
            timeout: Some(Duration::default()),
            cycle: Duration::default(),
            ..Config::default()
        };
        Executor::init(cfg);
    }

    // Synced blob contents (and the auditor state) survive `recover`.
    #[test]
    fn test_recover_synced_storage_persists() {
        // Initialize the first runtime
        let (executor1, context1, auditor1) = Executor::default();
        let partition = "test_partition";
        let name = b"test_blob";
        let data = b"Hello, world!".to_vec();

        // Run some tasks and sync storage
        executor1.start({
            let context = context1.clone();
            let data = data.clone();
            async move {
                let blob = context.open(partition, name).await.unwrap();
                blob.write_at(&data, 0).await.unwrap();
                blob.sync().await.unwrap();
            }
        });
        let state1 = auditor1.state();

        // Recover the runtime
        let (executor2, context2, auditor2) = context1.recover();

        // Verify auditor state is the same
        let state2 = auditor2.state();
        assert_eq!(state1, state2);

        // Check that synced storage persists after recovery
        executor2.start(async move {
            let blob = context2.open(partition, name).await.unwrap();
            let len = blob.len().await.unwrap();
            assert_eq!(len, data.len() as u64);
            let mut buf = vec![0; len as usize];
            blob.read_at(&mut buf, 0).await.unwrap();
            assert_eq!(buf, data);
        });
    }

    // Unsynced writes are lost on `recover`; only the empty blob entry created by `open` remains.
    #[test]
    fn test_recover_unsynced_storage_does_not_persist() {
        // Initialize the first runtime
        let (executor1, context1, _) = Executor::default();
        let partition = "test_partition";
        let name = b"test_blob";
        let data = b"Hello, world!".to_vec();

        // Run some tasks without syncing storage
        executor1.start({
            let context = context1.clone();
            async move {
                let blob = context.open(partition, name).await.unwrap();
                blob.write_at(&data, 0).await.unwrap();
                // Intentionally do not call sync() here
            }
        });

        // Recover the runtime
        let (executor2, context2, _) = context1.recover();

        // Check that unsynced storage does not persist after recovery
        executor2.start(async move {
            let blob = context2.open(partition, name).await.unwrap();
            let len = blob.len().await.unwrap();
            assert_eq!(len, 0);
        });
    }

    // `recover` is only allowed after the runtime has finished.
    #[test]
    #[should_panic(expected = "execution is not finished")]
    fn test_recover_before_finish_panics() {
        // Initialize runtime
        let (_, context, _) = Executor::default();

        // Attempt to recover before the runtime has finished
        context.recover();
    }

    // A runtime can be recovered at most once, even through a cloned context.
    #[test]
    #[should_panic(expected = "runtime has already been recovered")]
    fn test_recover_twice_panics() {
        // Initialize runtime
        let (executor, context, _) = Executor::default();

        // Finish runtime
        executor.start(async move {});

        // Recover for the first time
        let cloned_context = context.clone();
        context.recover();

        // Attempt to recover again using the same context
        cloned_context.recover();
    }
}