Skip to main content

commonware_runtime/
deterministic.rs

1//! A deterministic runtime that randomly selects tasks to run based on a seed
2//!
3//! # Panics
4//!
5//! Unless configured otherwise, any task panic will lead to a runtime panic.
6//!
7//! # External Processes
8//!
9//! When testing an application that interacts with some external process, it can appear to
10//! the runtime that progress has stalled because no pending tasks can make progress and/or
11//! that futures resolve at variable latency (which in turn triggers non-deterministic execution).
12//!
13//! To support such applications, the runtime can be built with the `external` feature to both
14//! sleep for each [Config::cycle] (opting to wait if all futures are pending) and to constrain
15//! the resolution latency of any future (with `pace()`).
16//!
17//! **Applications that do not interact with external processes (or are able to mock them) should never
18//! need to enable this feature. It is commonly used when testing consensus with external execution environments
19//! that use their own runtime (but are deterministic over some set of inputs).**
20//!
21//! # Metrics
22//!
//! This runtime enforces that metrics are unique and well-formed:
24//! - Labels must start with `[a-zA-Z]` and contain only `[a-zA-Z0-9_]`
25//! - Re-registering the same metric key reuses the existing metric handle when the type matches
26//!
27//! # Example
28//!
29//! ```rust
30//! use commonware_runtime::{Spawner, Runner, deterministic, Metrics, Supervisor};
31//!
32//! let executor =  deterministic::Runner::default();
33//! executor.start(|context| async move {
34//!     println!("Parent started");
35//!     let result = context.child("child").spawn(|_| async move {
36//!         println!("Child started");
37//!         "hello"
38//!     });
39//!     println!("Child result: {:?}", result.await);
40//!     println!("Parent exited");
41//!     println!("Auditor state: {}", context.auditor().state());
42//! });
43//! ```
44
45pub use crate::storage::faulty::Config as FaultConfig;
46use crate::{
47    child_label,
48    network::{
49        audited::Network as AuditedNetwork, deterministic::Network as DeterministicNetwork,
50        metered::Network as MeteredNetwork,
51    },
52    prefixed_name,
53    storage::{
54        audited::Storage as AuditedStorage, faulty::Storage as FaultyStorage,
55        memory::Storage as MemStorage, metered::Storage as MeteredStorage,
56    },
57    telemetry::metrics::{
58        add_attribute, raw, task::Label, validate_label, Counter, CounterFamily, GaugeFamily,
59        Metric, Register, Registered, Registry,
60    },
61    utils::{
62        signal::{Signal, Stopper},
63        supervision::Tree,
64        Panicker,
65    },
66    BufferPool, BufferPoolConfig, Clock, Error, Execution, Handle, ListenerOf, Name, Panicked,
67    Spawner as _, Supervisor as _, METRICS_PREFIX,
68};
69#[cfg(feature = "external")]
70use crate::{Blocker, Pacer};
71use commonware_codec::Encode;
72use commonware_formatting::hex;
73use commonware_macros::select;
74use commonware_parallel::ThreadPool;
75use commonware_utils::{
76    sync::{Mutex, RwLock},
77    time::SYSTEM_TIME_PRECISION,
78    SystemTimeExt,
79};
80#[cfg(feature = "external")]
81use futures::task::noop_waker;
82use futures::{
83    future::Either,
84    task::{waker, ArcWake},
85    Future,
86};
87use governor::clock::{Clock as GClock, ReasonablyRealtime};
88#[cfg(feature = "external")]
89use pin_project::pin_project;
90use rand::{prelude::SliceRandom, rngs::StdRng, CryptoRng, RngCore, SeedableRng};
91use rand_core::CryptoRngCore;
92use rayon::{ThreadPoolBuildError, ThreadPoolBuilder};
93use sha2::{Digest as _, Sha256};
94use std::{
95    collections::{BTreeMap, BinaryHeap, HashMap},
96    mem::{replace, take},
97    net::{IpAddr, SocketAddr},
98    num::NonZeroUsize,
99    panic::{catch_unwind, resume_unwind, AssertUnwindSafe},
100    pin::Pin,
101    sync::{Arc, Weak},
102    task::{self, Poll, Waker},
103    time::{Duration, SystemTime, UNIX_EPOCH},
104};
105use tracing::{info_span, trace, Instrument};
106use tracing_opentelemetry::OpenTelemetrySpanExt;
107
/// Handles to the runtime-level metrics maintained by the executor.
///
/// The name and help text of each metric are supplied where it is registered
/// (see [Metrics::init]).
#[derive(Debug)]
struct Metrics {
    /// Total number of event-loop iterations.
    iterations: Counter,
    /// Total number of tasks spawned, keyed by task [Label].
    tasks_spawned: CounterFamily<Label>,
    /// Number of tasks currently running, keyed by task [Label].
    tasks_running: GaugeFamily<Label>,
    /// Total number of task polls, keyed by task [Label].
    task_polls: CounterFamily<Label>,
}
115
116impl Metrics {
117    pub fn init(registry: &mut impl Register) -> Self {
118        Self {
119            iterations: registry.register(
120                "iterations",
121                "Total number of iterations",
122                raw::Counter::default(),
123            ),
124            tasks_spawned: registry.register(
125                "tasks_spawned",
126                "Total number of tasks spawned",
127                raw::Family::default(),
128            ),
129            tasks_running: registry.register(
130                "tasks_running",
131                "Number of tasks currently running",
132                raw::Family::default(),
133            ),
134            task_polls: registry.register(
135                "task_polls",
136                "Total number of task polls",
137                raw::Family::default(),
138            ),
139        }
140    }
141}
142
/// A SHA-256 digest (32 raw bytes).
///
/// Used as the rolling state held by the [Auditor].
type Digest = [u8; 32];
145
/// Track the state of the runtime for determinism auditing.
///
/// The state is a single rolling digest: every recorded event is hashed together with the
/// previous digest, so the final value depends on both the events and their order.
pub struct Auditor {
    /// Current rolling digest (all zeroes until the first event is recorded).
    digest: Mutex<Digest>,
}
150
151impl Default for Auditor {
152    fn default() -> Self {
153        Self {
154            digest: Digest::default().into(),
155        }
156    }
157}
158
159impl Auditor {
160    /// Record that an event happened.
161    /// This auditor's hash will be updated with the event's `label` and
162    /// whatever other data is passed in the `payload` closure.
163    pub(crate) fn event<F>(&self, label: &'static [u8], payload: F)
164    where
165        F: FnOnce(&mut Sha256),
166    {
167        let mut digest = self.digest.lock();
168
169        let mut hasher = Sha256::new();
170        hasher.update(digest.as_ref());
171        hasher.update(label);
172        payload(&mut hasher);
173
174        *digest = hasher.finalize().into();
175    }
176
177    /// Generate a representation of the current state of the runtime.
178    ///
179    /// This can be used to ensure that logic running on top
180    /// of the runtime is interacting deterministically.
181    pub fn state(&self) -> String {
182        let hash = self.digest.lock();
183        hex(hash.as_ref())
184    }
185}
186
/// A dynamic RNG that can safely be sent between threads.
///
/// Any boxed [CryptoRngCore] implementation that is `Send + 'static` qualifies; see
/// [Config::with_rng] for how to supply one.
pub type BoxDynRng = Box<dyn CryptoRngCore + Send + 'static>;
189
/// Configuration for the `deterministic` runtime.
///
/// All fields are private: build a value with [Config::new] (or [Config::default]) and
/// adjust it with the `with_*` setters.
pub struct Config {
    /// Random number generator.
    ///
    /// Defaults to a [StdRng] seeded with `42`.
    rng: BoxDynRng,

    /// The cycle duration determines how much time is advanced after each iteration of the event
    /// loop. This is useful to prevent starvation if some task never yields.
    ///
    /// Defaults to 1ms.
    cycle: Duration,

    /// Time the runtime starts at.
    ///
    /// Defaults to [UNIX_EPOCH].
    start_time: SystemTime,

    /// If the runtime is still executing at this point (i.e. a test hasn't stopped), panic.
    ///
    /// Measured as an offset from `start_time`. Defaults to `None` (no timeout).
    timeout: Option<Duration>,

    /// Whether spawned tasks should catch panics instead of propagating them.
    ///
    /// Defaults to `false`.
    catch_panics: bool,

    /// Configuration for deterministic storage fault injection.
    /// Defaults to no faults being injected.
    storage_fault_cfg: FaultConfig,

    /// Buffer pool configuration for network I/O.
    ///
    /// Defaults to [BufferPoolConfig::for_network] with the thread cache disabled.
    network_buffer_pool_cfg: BufferPoolConfig,

    /// Buffer pool configuration for storage I/O.
    ///
    /// Defaults to [BufferPoolConfig::for_storage] with the thread cache disabled.
    storage_buffer_pool_cfg: BufferPoolConfig,
}
218
219impl Config {
220    /// Returns a new [Config] with default values.
221    pub fn new() -> Self {
222        cfg_if::cfg_if! {
223            if #[cfg(miri)] {
224                // Reduce max_per_class to avoid slow atomics under Miri
225                let network_buffer_pool_cfg = BufferPoolConfig::for_network()
226                    .with_max_per_class(commonware_utils::NZU32!(32))
227                    .with_thread_cache_disabled();
228                let storage_buffer_pool_cfg = BufferPoolConfig::for_storage()
229                    .with_max_per_class(commonware_utils::NZU32!(32))
230                    .with_thread_cache_disabled();
231            } else {
232                let network_buffer_pool_cfg =
233                    BufferPoolConfig::for_network().with_thread_cache_disabled();
234                let storage_buffer_pool_cfg =
235                    BufferPoolConfig::for_storage().with_thread_cache_disabled();
236            }
237        }
238
239        Self {
240            rng: Box::new(StdRng::seed_from_u64(42)),
241            cycle: Duration::from_millis(1),
242            start_time: UNIX_EPOCH,
243            timeout: None,
244            catch_panics: false,
245            storage_fault_cfg: FaultConfig::default(),
246            network_buffer_pool_cfg,
247            storage_buffer_pool_cfg,
248        }
249    }
250
251    // Setters
252    /// See [Config]
253    pub fn with_seed(self, seed: u64) -> Self {
254        self.with_rng(Box::new(StdRng::seed_from_u64(seed)))
255    }
256
257    /// Provide the config with a dynamic RNG directly.
258    ///
259    /// This can be useful for, e.g. fuzzing, where beyond just having randomness,
260    /// you might want to control specific bytes of the RNG. By taking in a dynamic
261    /// RNG object, any behavior is possible.
262    pub fn with_rng(mut self, rng: BoxDynRng) -> Self {
263        self.rng = rng;
264        self
265    }
266
267    /// See [Config]
268    pub const fn with_cycle(mut self, cycle: Duration) -> Self {
269        self.cycle = cycle;
270        self
271    }
272    /// See [Config]
273    pub const fn with_start_time(mut self, start_time: SystemTime) -> Self {
274        self.start_time = start_time;
275        self
276    }
277    /// See [Config]
278    pub const fn with_timeout(mut self, timeout: Option<Duration>) -> Self {
279        self.timeout = timeout;
280        self
281    }
282    /// See [Config]
283    pub const fn with_catch_panics(mut self, catch_panics: bool) -> Self {
284        self.catch_panics = catch_panics;
285        self
286    }
287    /// See [Config]
288    pub const fn with_network_buffer_pool_config(mut self, cfg: BufferPoolConfig) -> Self {
289        self.network_buffer_pool_cfg = cfg;
290        self
291    }
292    /// See [Config]
293    pub const fn with_storage_buffer_pool_config(mut self, cfg: BufferPoolConfig) -> Self {
294        self.storage_buffer_pool_cfg = cfg;
295        self
296    }
297
298    /// Configure storage fault injection.
299    ///
300    /// When set, the runtime will inject deterministic storage errors based on
301    /// the provided configuration. Faults are drawn from the shared RNG, ensuring
302    /// reproducible failure patterns for a given seed.
303    pub const fn with_storage_fault_config(mut self, faults: FaultConfig) -> Self {
304        self.storage_fault_cfg = faults;
305        self
306    }
307
308    // Getters
309    /// See [Config]
310    pub const fn cycle(&self) -> Duration {
311        self.cycle
312    }
313    /// See [Config]
314    pub const fn start_time(&self) -> SystemTime {
315        self.start_time
316    }
317    /// See [Config]
318    pub const fn timeout(&self) -> Option<Duration> {
319        self.timeout
320    }
321    /// See [Config]
322    pub const fn catch_panics(&self) -> bool {
323        self.catch_panics
324    }
325    /// See [Config]
326    pub const fn network_buffer_pool_config(&self) -> &BufferPoolConfig {
327        &self.network_buffer_pool_cfg
328    }
329    /// See [Config]
330    pub const fn storage_buffer_pool_config(&self) -> &BufferPoolConfig {
331        &self.storage_buffer_pool_cfg
332    }
333
334    /// Assert that the configuration is valid.
335    pub fn assert(&self) {
336        assert!(
337            self.cycle != Duration::default() || self.timeout.is_none(),
338            "cycle duration must be non-zero when timeout is set",
339        );
340        assert!(
341            self.cycle >= SYSTEM_TIME_PRECISION,
342            "cycle duration must be greater than or equal to system time precision"
343        );
344        assert!(
345            self.start_time >= UNIX_EPOCH,
346            "start time must be greater than or equal to unix epoch"
347        );
348    }
349}
350
351impl Default for Config {
352    fn default() -> Self {
353        Self::new()
354    }
355}
356
/// Deterministic runtime that randomly selects tasks to run based on a seed.
pub struct Executor {
    /// Root metrics registry for the runtime.
    registry: Registry,
    /// Amount of simulated time added after every iteration of the event loop.
    cycle: Duration,
    /// Simulated time at which the runtime panics with `runtime timeout`, if any.
    deadline: Option<SystemTime>,
    /// Handles to the runtime-level metrics.
    metrics: Arc<Metrics>,
    /// Rolling digest of audited events.
    auditor: Arc<Auditor>,
    /// Shared random number generator (also handed to the storage layer); used to shuffle
    /// the ready queue before each pass.
    rng: Arc<Mutex<BoxDynRng>>,
    /// Current simulated time.
    time: Mutex<SystemTime>,
    /// Registered tasks and the queue of tasks ready to be polled.
    tasks: Arc<Tasks>,
    /// Pending alarms; the top of the heap is treated as the next alarm to fire.
    sleeping: Mutex<BinaryHeap<Alarm>>,
    /// Shutdown coordination state.
    // NOTE(review): how the `Stopper` is driven is not visible in this part of the file.
    shutdown: Mutex<Stopper>,
    /// Task panic handling; also reports whether panics are being caught (`catch()`).
    panicker: Panicker,
    /// NOTE(review): presumably simulated DNS records (hostname -> addresses); confirm
    /// against the code that reads it.
    dns: Mutex<HashMap<String, Vec<IpAddr>>>,
}
372
373impl Executor {
374    /// Advance simulated time by [Config::cycle].
375    ///
376    /// When built with the `external` feature, sleep for [Config::cycle] to let
377    /// external processes make progress.
378    fn advance_time(&self) -> SystemTime {
379        #[cfg(feature = "external")]
380        std::thread::sleep(self.cycle);
381
382        let mut time = self.time.lock();
383        *time = time
384            .checked_add(self.cycle)
385            .expect("executor time overflowed");
386        let now = *time;
387        trace!(now = now.epoch_millis(), "time advanced");
388        now
389    }
390
391    /// When idle, jump directly to the next actionable time.
392    ///
393    /// When built with the `external` feature, never skip ahead (to ensure we poll all pending tasks
394    /// every [Config::cycle]).
395    fn skip_idle_time(&self, current: SystemTime) -> SystemTime {
396        if cfg!(feature = "external") || self.tasks.ready() != 0 {
397            return current;
398        }
399
400        let mut skip_until = None;
401        {
402            let sleeping = self.sleeping.lock();
403            if let Some(next) = sleeping.peek() {
404                if next.time > current {
405                    skip_until = Some(next.time);
406                }
407            }
408        }
409
410        skip_until.map_or(current, |deadline| {
411            let mut time = self.time.lock();
412            *time = deadline;
413            let now = *time;
414            trace!(now = now.epoch_millis(), "time skipped");
415            now
416        })
417    }
418
419    /// Wake any sleepers whose deadlines have elapsed.
420    fn wake_ready_sleepers(&self, current: SystemTime) {
421        let mut sleeping = self.sleeping.lock();
422        while let Some(next) = sleeping.peek() {
423            if next.time <= current {
424                let sleeper = sleeping.pop().unwrap();
425                sleeper.waker.wake();
426            } else {
427                break;
428            }
429        }
430    }
431
432    /// Ensure the runtime is making progress.
433    ///
434    /// When built with the `external` feature, always poll pending tasks after the passage of time.
435    fn assert_liveness(&self) {
436        if cfg!(feature = "external") || self.tasks.ready() != 0 {
437            return;
438        }
439
440        panic!("runtime stalled");
441    }
442}
443
/// An artifact that can be used to recover the state of the runtime.
///
/// This is useful when mocking unclean shutdown (while retaining deterministic behavior).
///
/// A checkpoint is produced by [Runner::start_and_recover] and can be turned back into a
/// [Runner] via `From<Checkpoint>`.
pub struct Checkpoint {
    /// Cycle duration of the run that produced this checkpoint.
    cycle: Duration,
    /// Deadline (if any) of the run that produced this checkpoint.
    deadline: Option<SystemTime>,
    /// Auditor carried over from the previous run, including its accumulated digest.
    auditor: Arc<Auditor>,
    /// Random number generator carried over from the previous run.
    rng: Arc<Mutex<BoxDynRng>>,
    /// Simulated time at which the previous run ended.
    time: Mutex<SystemTime>,
    /// Storage handle taken from the previous run's context.
    storage: Arc<Storage>,
    /// DNS table carried over from the previous run.
    dns: Mutex<HashMap<String, Vec<IpAddr>>>,
    /// Whether the previous run was configured to catch task panics.
    catch_panics: bool,
    /// Network buffer pool configuration of the previous run.
    network_buffer_pool_cfg: BufferPoolConfig,
    /// Storage buffer pool configuration of the previous run.
    storage_buffer_pool_cfg: BufferPoolConfig,
}
459
460impl Checkpoint {
461    /// Get a reference to the [Auditor].
462    pub fn auditor(&self) -> Arc<Auditor> {
463        self.auditor.clone()
464    }
465}
466
/// Where a [Runner] takes its initial state from.
// NOTE(review): the size lint is silenced instead of boxing a variant; presumably fine
// because a `State` only ever lives as the single `state` field of a `Runner` — confirm.
#[allow(clippy::large_enum_variant)]
enum State {
    /// Start a fresh runtime from a configuration.
    Config(Config),
    /// Resume from the state captured at the end of an earlier run.
    Checkpoint(Checkpoint),
}
472
/// Implementation of [crate::Runner] for the `deterministic` runtime.
///
/// A runner is created either from a [Config] (fresh start) or from a [Checkpoint]
/// (recovery of an earlier run).
pub struct Runner {
    /// Source of the initial runtime state, consumed when the runner is started.
    state: State,
}
477
478impl From<Config> for Runner {
479    fn from(cfg: Config) -> Self {
480        Self::new(cfg)
481    }
482}
483
484impl From<Checkpoint> for Runner {
485    fn from(checkpoint: Checkpoint) -> Self {
486        Self {
487            state: State::Checkpoint(checkpoint),
488        }
489    }
490}
491
impl Runner {
    /// Initialize a new `deterministic` runtime with the given [Config].
    ///
    /// # Panics
    ///
    /// Panics if the configuration is invalid (see [Config::assert]).
    pub fn new(cfg: Config) -> Self {
        // Ensure config is valid
        cfg.assert();
        Self {
            state: State::Config(cfg),
        }
    }

    /// Initialize a new `deterministic` runtime with the default configuration
    /// and the provided seed.
    pub fn seeded(seed: u64) -> Self {
        Self::new(Config::default().with_seed(seed))
    }

    /// Initialize a new `deterministic` runtime with the default configuration
    /// and the given timeout.
    ///
    /// Once simulated time reaches the start time plus `timeout` while the runtime is still
    /// executing, the runtime panics with `runtime timeout`.
    pub fn timed(timeout: Duration) -> Self {
        let cfg = Config {
            timeout: Some(timeout),
            ..Config::default()
        };
        Self::new(cfg)
    }

    /// Like [crate::Runner::start], but also returns a [Checkpoint] that can be used
    /// to recover the state of the runtime in a subsequent run.
    ///
    /// The closure `f` receives the root [Context] and produces the root future. The event
    /// loop repeatedly drains the ready queue, shuffles it with the shared RNG, polls each
    /// task once, and then advances simulated time, until the root future completes.
    ///
    /// # Panics
    ///
    /// - With `runtime timeout` if a deadline is configured and simulated time reaches it.
    /// - With `runtime stalled` if no task is ready after time has advanced (unless built
    ///   with the `external` feature).
    /// - If a [Context] (or anything else holding a weak reference to the executor)
    ///   outlives this call.
    /// - Any panic raised inside the event loop is caught, cleanup runs, and the panic is
    ///   then resumed.
    pub fn start_and_recover<F, Fut>(self, f: F) -> (Fut::Output, Checkpoint)
    where
        F: FnOnce(Context) -> Fut,
        Fut: Future,
    {
        // Setup context and return strong reference to executor
        let (context, executor, panicked) = match self.state {
            State::Config(config) => Context::new(config),
            State::Checkpoint(checkpoint) => Context::recover(checkpoint),
        };

        // Capture what the checkpoint needs from the context before `f` consumes it
        let storage = context.storage.clone();
        let network_buffer_pool_cfg = context.network_buffer_pool.config().clone();
        let storage_buffer_pool_cfg = context.storage_buffer_pool.config().clone();

        // Pin root task to the heap
        let mut root = Box::pin(panicked.interrupt(f(context)));

        // Register the root task
        Tasks::register_root(&executor.tasks);

        // Process tasks until root task completes or progress stalls.
        // Wrap the loop in catch_unwind to ensure task cleanup runs even if the loop or a task panics.
        let result = catch_unwind(AssertUnwindSafe(|| loop {
            // Ensure we have not exceeded our deadline
            {
                let current = executor.time.lock();
                if let Some(deadline) = executor.deadline {
                    if *current >= deadline {
                        // Release the time lock before unwinding
                        drop(current);
                        panic!("runtime timeout");
                    }
                }
            }

            // Drain all ready tasks
            let mut queue = executor.tasks.drain();

            // Shuffle tasks (if more than one)
            if queue.len() > 1 {
                let mut rng = executor.rng.lock();
                queue.shuffle(&mut *rng);
            }

            // Run all snapshotted tasks
            //
            // This approach is more efficient than randomly selecting a task one-at-a-time
            // because it ensures we don't pull the same pending task multiple times in a row (without
            // processing a different task required for other tasks to make progress).
            trace!(
                iter = executor.metrics.iterations.get(),
                tasks = queue.len(),
                "starting loop"
            );
            let mut output = None;
            for id in queue {
                // Lookup the task (it may have completed already)
                let Some(task) = executor.tasks.get(id) else {
                    trace!(id, "skipping missing task");
                    continue;
                };

                // Record task for auditing
                executor.auditor.event(b"process_task", |hasher| {
                    hasher.update(task.id.to_be_bytes());
                    hasher.update(task.label.name().as_bytes());
                });
                executor.metrics.task_polls.get_or_create(&task.label).inc();
                trace!(id, "processing task");

                // Prepare task for polling (waking re-enqueues the task by id)
                let waker = waker(Arc::new(TaskWaker {
                    id,
                    tasks: Arc::downgrade(&executor.tasks),
                }));
                let mut cx = task::Context::from_waker(&waker);

                // Poll the task
                match &task.mode {
                    Mode::Root => {
                        // Poll the root task
                        if let Poll::Ready(result) = root.as_mut().poll(&mut cx) {
                            trace!(id, "root task is complete");
                            output = Some(result);
                            break;
                        }
                    }
                    Mode::Work(future) => {
                        // Get the future (if it still exists)
                        let mut fut_opt = future.lock();
                        let Some(fut) = fut_opt.as_mut() else {
                            trace!(id, "skipping already complete task");

                            // Remove the future
                            executor.tasks.remove(id);
                            continue;
                        };

                        // Poll the task
                        if fut.as_mut().poll(&mut cx).is_ready() {
                            trace!(id, "task is complete");

                            // Remove the future
                            executor.tasks.remove(id);
                            *fut_opt = None;
                            continue;
                        }
                    }
                }

                // Try again later if task is still pending
                trace!(id, "task is still pending");
            }

            // If the root task has completed, exit as soon as possible
            if let Some(output) = output {
                break output;
            }

            // Advance time (skipping ahead if no tasks are ready yet)
            let mut current = executor.advance_time();
            current = executor.skip_idle_time(current);

            // Wake sleepers and ensure we continue to make progress
            executor.wake_ready_sleepers(current);
            executor.assert_liveness();

            // Record that we completed another iteration of the event loop.
            executor.metrics.iterations.inc();
        }));

        // Clear remaining tasks from the executor.
        //
        // It is critical that we wait to drop the strong
        // reference to executor until after we have dropped
        // all tasks (as they may attempt to upgrade their weak
        // reference to the executor during drop).
        executor.sleeping.lock().clear(); // included in tasks
        let tasks = executor.tasks.clear();
        for task in tasks {
            let Mode::Work(future) = &task.mode else {
                continue;
            };
            // Drop the task's future
            *future.lock() = None;
        }

        // Drop the root task to release any Context references it may still hold.
        // This is necessary when the loop exits early (e.g., timeout) while the
        // root future is still Pending and holds captured variables with Context references.
        drop(root);

        // Assert the context doesn't escape the start() function (behavior
        // is undefined in this case)
        assert!(
            Arc::weak_count(&executor) == 0,
            "executor still has weak references"
        );

        // Handle the result — resume the original panic after cleanup if one was caught.
        let output = match result {
            Ok(output) => output,
            Err(payload) => resume_unwind(payload),
        };

        // Extract the executor from the Arc
        let executor = Arc::into_inner(executor).expect("executor still has strong references");

        // Construct a checkpoint that can be used to restart the runtime
        let checkpoint = Checkpoint {
            cycle: executor.cycle,
            deadline: executor.deadline,
            auditor: executor.auditor,
            rng: executor.rng,
            time: executor.time,
            storage,
            dns: executor.dns,
            catch_panics: executor.panicker.catch(),
            network_buffer_pool_cfg,
            storage_buffer_pool_cfg,
        };

        (output, checkpoint)
    }
}
703
704impl Default for Runner {
705    fn default() -> Self {
706        Self::new(Config::default())
707    }
708}
709
710impl crate::Runner for Runner {
711    type Context = Context;
712
713    fn start<F, Fut>(self, f: F) -> Fut::Output
714    where
715        F: FnOnce(Self::Context) -> Fut,
716        Fut: Future,
717    {
718        let (output, _) = self.start_and_recover(f);
719        output
720    }
721}
722
/// The mode of a [Task].
enum Mode {
    /// The root task. Its future is not stored here; it is owned and polled directly by
    /// [Runner::start_and_recover].
    Root,
    /// A spawned task. The slot is set to `None` (dropping the future) once the task
    /// completes or when the runtime clears its remaining tasks.
    Work(Mutex<Option<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>>),
}
728
/// A future being executed by the [Executor].
struct Task {
    /// Unique id of the task, allocated by [Tasks::increment].
    id: u128,
    /// Label of the task; recorded in the audit digest and used as the metrics key
    /// whenever the task is polled.
    label: Label,

    /// Whether this is the root task or a spawned task (which carries its future).
    mode: Mode,
}
736
/// A waker for a [Task].
///
/// Waking re-enqueues the task by id on the owning [Tasks] collection.
struct TaskWaker {
    /// Id of the task to re-enqueue.
    id: u128,

    /// Weak handle to the task collection, so an outstanding waker does not keep it alive.
    tasks: Weak<Tasks>,
}
743
744impl ArcWake for TaskWaker {
745    fn wake_by_ref(arc_self: &Arc<Self>) {
746        // Upgrade the weak reference to re-enqueue this task.
747        // If upgrade fails, the task queue has been dropped and no action is required.
748        //
749        // This can happen if some data is passed into the runtime and it drops after the runtime exits.
750        if let Some(tasks) = arc_self.tasks.upgrade() {
751            tasks.queue(arc_self.id);
752        }
753    }
754}
755
/// A collection of [Task]s that are being executed by the [Executor].
///
/// Each field sits behind its own lock; the locks are taken one at a time by the methods
/// of this type.
struct Tasks {
    /// The next task id.
    counter: Mutex<u128>,
    /// Tasks ready to be polled.
    ///
    /// An id is pushed each time a task is registered or woken, so this list is not
    /// deduplicated.
    ready: Mutex<Vec<u128>>,
    /// All running tasks.
    running: Mutex<BTreeMap<u128, Arc<Task>>>,
}
765
766impl Tasks {
767    /// Create a new task queue.
768    const fn new() -> Self {
769        Self {
770            counter: Mutex::new(0),
771            ready: Mutex::new(Vec::new()),
772            running: Mutex::new(BTreeMap::new()),
773        }
774    }
775
776    /// Increment the task counter and return the old value.
777    fn increment(&self) -> u128 {
778        let mut counter = self.counter.lock();
779        let old = *counter;
780        *counter = counter.checked_add(1).expect("task counter overflow");
781        old
782    }
783
784    /// Register the root task.
785    ///
786    /// If the root task has already been registered, this function will panic.
787    fn register_root(arc_self: &Arc<Self>) {
788        let id = arc_self.increment();
789        let task = Arc::new(Task {
790            id,
791            label: Label::root(),
792            mode: Mode::Root,
793        });
794        arc_self.register(id, task);
795    }
796
797    /// Register a non-root task to be executed.
798    fn register_work(
799        arc_self: &Arc<Self>,
800        label: Label,
801        future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
802    ) {
803        let id = arc_self.increment();
804        let task = Arc::new(Task {
805            id,
806            label,
807            mode: Mode::Work(Mutex::new(Some(future))),
808        });
809        arc_self.register(id, task);
810    }
811
812    /// Register a new task to be executed.
813    fn register(&self, id: u128, task: Arc<Task>) {
814        // Track as running until completion
815        self.running.lock().insert(id, task);
816
817        // Add to ready
818        self.queue(id);
819    }
820
821    /// Enqueue an already registered task to be executed.
822    fn queue(&self, id: u128) {
823        let mut ready = self.ready.lock();
824        ready.push(id);
825    }
826
827    /// Drain all ready tasks.
828    fn drain(&self) -> Vec<u128> {
829        let mut queue = self.ready.lock();
830        let len = queue.len();
831        replace(&mut *queue, Vec::with_capacity(len))
832    }
833
834    /// The number of ready tasks.
835    fn ready(&self) -> usize {
836        self.ready.lock().len()
837    }
838
839    /// Lookup a task.
840    ///
841    /// We must return cloned here because we cannot hold the running lock while polling a task (will
842    /// deadlock if [Self::register_work] is called).
843    fn get(&self, id: u128) -> Option<Arc<Task>> {
844        let running = self.running.lock();
845        running.get(&id).cloned()
846    }
847
848    /// Remove a task.
849    fn remove(&self, id: u128) {
850        self.running.lock().remove(&id);
851    }
852
853    /// Clear all tasks.
854    fn clear(&self) -> Vec<Arc<Task>> {
855        // Clear ready
856        self.ready.lock().clear();
857
858        // Clear running tasks
859        let running: BTreeMap<u128, Arc<Task>> = {
860            let mut running = self.running.lock();
861            take(&mut *running)
862        };
863        running.into_values().collect()
864    }
865}
866
/// Network stack of the runtime: the deterministic network wrapped in an auditing layer,
/// wrapped in a metering layer.
type Network = MeteredNetwork<AuditedNetwork<DeterministicNetwork>>;
/// Storage stack of the runtime: in-memory storage wrapped (innermost first) in fault
/// injection, auditing, and metering layers.
type Storage = MeteredStorage<AuditedStorage<FaultyStorage<MemStorage>>>;
869
/// Implementation of [crate::Spawner], [crate::Clock],
/// [crate::Network], and [crate::Storage] for the `deterministic`
/// runtime.
pub struct Context {
    /// Name of this context (empty for the root context).
    // NOTE(review): presumably used as the label/metrics prefix for child contexts — confirm.
    name: String,
    /// Key/value attributes attached to this context (none for the root context).
    attributes: Vec<(String, String)>,
    /// Weak handle to the executor; the runner asserts that no weak handle outlives the run.
    executor: Weak<Executor>,
    /// Shared network stack.
    network: Arc<Network>,
    /// Shared storage stack.
    storage: Arc<Storage>,
    /// Buffer pool for network I/O.
    network_buffer_pool: BufferPool,
    /// Buffer pool for storage I/O.
    storage_buffer_pool: BufferPool,
    /// Position of this context in the supervision tree (the root context holds the root).
    tree: Arc<Tree>,
    /// Execution mode used for tasks spawned from this context (default for the root).
    // NOTE(review): the meaning of each `Execution` variant is not visible here.
    execution: Execution,
    /// Tracing flag (`false` for the root context).
    // NOTE(review): presumably controls whether spawned tasks are wrapped in a span — confirm.
    traced: bool,
}
885
886impl Context {
887    fn new(cfg: Config) -> (Self, Arc<Executor>, Panicked) {
888        // Create a new registry
889        let mut registry = Registry::new();
890        let mut runtime_registry = registry.sub_registry(METRICS_PREFIX);
891
892        // Initialize runtime
893        let metrics = Arc::new(Metrics::init(&mut runtime_registry));
894        let start_time = cfg.start_time;
895        let deadline = cfg
896            .timeout
897            .map(|timeout| start_time.checked_add(timeout).expect("timeout overflowed"));
898        let auditor = Arc::new(Auditor::default());
899
900        // Create shared RNG (used by both executor and storage)
901        let rng = Arc::new(Mutex::new(cfg.rng));
902
903        // Initialize buffer pools
904        let network_buffer_pool = BufferPool::new(
905            cfg.network_buffer_pool_cfg.clone(),
906            &mut runtime_registry.sub_registry("network_buffer_pool"),
907        );
908        let storage_buffer_pool = BufferPool::new(
909            cfg.storage_buffer_pool_cfg.clone(),
910            &mut runtime_registry.sub_registry("storage_buffer_pool"),
911        );
912
913        // Create storage fault config (default to disabled if None)
914        let storage_fault_config = Arc::new(RwLock::new(cfg.storage_fault_cfg));
915        let storage = MeteredStorage::new(
916            AuditedStorage::new(
917                FaultyStorage::new(
918                    MemStorage::new(storage_buffer_pool.clone()),
919                    rng.clone(),
920                    storage_fault_config,
921                ),
922                auditor.clone(),
923            ),
924            &mut runtime_registry,
925        );
926
927        // Create network
928        let network = AuditedNetwork::new(DeterministicNetwork::default(), auditor.clone());
929        let network = MeteredNetwork::new(network, &mut runtime_registry);
930
931        // Initialize panicker
932        let (panicker, panicked) = Panicker::new(cfg.catch_panics);
933
934        let executor = Arc::new(Executor {
935            registry,
936            cycle: cfg.cycle,
937            deadline,
938            metrics,
939            auditor,
940            rng,
941            time: Mutex::new(start_time),
942            tasks: Arc::new(Tasks::new()),
943            sleeping: Mutex::new(BinaryHeap::new()),
944            shutdown: Mutex::new(Stopper::default()),
945            panicker,
946            dns: Mutex::new(HashMap::new()),
947        });
948
949        (
950            Self {
951                name: String::new(),
952                attributes: Vec::new(),
953                executor: Arc::downgrade(&executor),
954                network: Arc::new(network),
955                storage: Arc::new(storage),
956                network_buffer_pool,
957                storage_buffer_pool,
958                tree: Tree::root(),
959                execution: Execution::default(),
960                traced: false,
961            },
962            executor,
963            panicked,
964        )
965    }
966
967    /// Recover the inner state (deadline, metrics, auditor, rng, synced storage, etc.) from the
968    /// current runtime and use it to initialize a new instance of the runtime. A recovered runtime
969    /// does not inherit the current runtime's pending tasks, unsynced storage, network connections, nor
970    /// its shutdown signaler.
971    ///
972    /// This is useful for performing a deterministic simulation that spans multiple runtime instantiations,
973    /// like simulating unclean shutdown (which involves repeatedly halting the runtime at unexpected intervals).
974    ///
975    /// It is only permitted to call this method after the runtime has finished (i.e. once `start` returns)
976    /// and only permitted to do once (otherwise multiple recovered runtimes will share the same inner state).
977    /// If either one of these conditions is violated, this method will panic.
978    fn recover(checkpoint: Checkpoint) -> (Self, Arc<Executor>, Panicked) {
979        // Rebuild metrics
980        let mut registry = Registry::new();
981        let mut runtime_registry = registry.sub_registry(METRICS_PREFIX);
982        let metrics = Arc::new(Metrics::init(&mut runtime_registry));
983
984        // Copy state
985        let network =
986            AuditedNetwork::new(DeterministicNetwork::default(), checkpoint.auditor.clone());
987        let network = MeteredNetwork::new(network, &mut runtime_registry);
988
989        // Initialize buffer pools
990        let network_buffer_pool = BufferPool::new(
991            checkpoint.network_buffer_pool_cfg.clone(),
992            &mut runtime_registry.sub_registry("network_buffer_pool"),
993        );
994        let storage_buffer_pool = BufferPool::new(
995            checkpoint.storage_buffer_pool_cfg.clone(),
996            &mut runtime_registry.sub_registry("storage_buffer_pool"),
997        );
998
999        // Initialize panicker
1000        let (panicker, panicked) = Panicker::new(checkpoint.catch_panics);
1001
1002        let executor = Arc::new(Executor {
1003            // Copied from the checkpoint
1004            cycle: checkpoint.cycle,
1005            deadline: checkpoint.deadline,
1006            auditor: checkpoint.auditor,
1007            rng: checkpoint.rng,
1008            time: checkpoint.time,
1009            dns: checkpoint.dns,
1010
1011            // New state for the new runtime
1012            registry,
1013            metrics,
1014            tasks: Arc::new(Tasks::new()),
1015            sleeping: Mutex::new(BinaryHeap::new()),
1016            shutdown: Mutex::new(Stopper::default()),
1017            panicker,
1018        });
1019        (
1020            Self {
1021                name: String::new(),
1022                attributes: Vec::new(),
1023                executor: Arc::downgrade(&executor),
1024                network: Arc::new(network),
1025                storage: checkpoint.storage,
1026                network_buffer_pool,
1027                storage_buffer_pool,
1028                tree: Tree::root(),
1029                execution: Execution::default(),
1030                traced: false,
1031            },
1032            executor,
1033            panicked,
1034        )
1035    }
1036
1037    /// Upgrade Weak reference to [Executor].
1038    fn executor(&self) -> Arc<Executor> {
1039        self.executor.upgrade().expect("executor already dropped")
1040    }
1041
1042    /// Get a reference to [Metrics].
1043    fn metrics(&self) -> Arc<Metrics> {
1044        self.executor().metrics.clone()
1045    }
1046
1047    /// Get a reference to the [Auditor].
1048    pub fn auditor(&self) -> Arc<Auditor> {
1049        self.executor().auditor.clone()
1050    }
1051
1052    /// Compute a [Sha256] digest of all storage contents.
1053    pub fn storage_audit(&self) -> Digest {
1054        self.storage.inner().inner().inner().audit()
1055    }
1056
1057    /// Access the storage fault configuration.
1058    ///
1059    /// Changes to the returned [`FaultConfig`] take effect immediately for
1060    /// subsequent storage operations. This allows dynamically enabling or
1061    /// disabling fault injection during a test.
1062    pub fn storage_fault_config(&self) -> Arc<RwLock<FaultConfig>> {
1063        self.storage.inner().inner().config()
1064    }
1065
1066    /// Register a DNS mapping for a hostname.
1067    ///
1068    /// If `addrs` is `None`, the mapping is removed.
1069    /// If `addrs` is `Some`, the mapping is added or updated.
1070    pub fn resolver_register(&self, host: impl Into<String>, addrs: Option<Vec<IpAddr>>) {
1071        // Update the auditor
1072        let executor = self.executor();
1073        let host = host.into();
1074        executor.auditor.event(b"resolver_register", |hasher| {
1075            hasher.update(host.as_bytes());
1076            hasher.update(addrs.encode());
1077        });
1078
1079        // Update the DNS mapping
1080        let mut dns = executor.dns.lock();
1081        match addrs {
1082            Some(addrs) => {
1083                dns.insert(host, addrs);
1084            }
1085            None => {
1086                dns.remove(&host);
1087            }
1088        }
1089    }
1090}
1091
1092impl crate::Spawner for Context {
1093    fn dedicated(mut self) -> Self {
1094        self.execution = Execution::Dedicated;
1095        self
1096    }
1097
1098    fn shared(mut self, blocking: bool) -> Self {
1099        self.execution = Execution::Shared(blocking);
1100        self
1101    }
1102
1103    fn spawn<F, Fut, T>(mut self, f: F) -> Handle<T>
1104    where
1105        F: FnOnce(Self) -> Fut + Send + 'static,
1106        Fut: Future<Output = T> + Send + 'static,
1107        T: Send + 'static,
1108    {
1109        // Get metrics
1110        let (label, metric) = spawn_metrics!(self);
1111
1112        // Track supervision before resetting configuration
1113        let parent = Arc::clone(&self.tree);
1114        let traced = self.traced;
1115        self.execution = Execution::default();
1116        self.traced = false;
1117        let (child, aborted) = Tree::child(&parent);
1118        if aborted {
1119            return Handle::closed(metric);
1120        }
1121        self.tree = child;
1122
1123        // Spawn the task (we don't care about Model)
1124        let executor = self.executor();
1125        let future = if traced {
1126            let span = info_span!(parent: None, "task", name = %label.name());
1127            for (key, value) in &self.attributes {
1128                span.set_attribute(key.clone(), value.clone());
1129            }
1130            Either::Left(f(self).instrument(span))
1131        } else {
1132            Either::Right(f(self))
1133        };
1134        let (f, handle) = Handle::init(
1135            future,
1136            metric,
1137            executor.panicker.clone(),
1138            Arc::clone(&parent),
1139        );
1140        Tasks::register_work(&executor.tasks, label, Box::pin(f));
1141
1142        // Register the task on the parent
1143        if let Some(aborter) = handle.aborter() {
1144            parent.register(aborter);
1145        }
1146
1147        handle
1148    }
1149
1150    async fn stop(self, value: i32, timeout: Option<Duration>) -> Result<(), Error> {
1151        let executor = self.executor();
1152        executor.auditor.event(b"stop", |hasher| {
1153            hasher.update(value.to_be_bytes());
1154        });
1155        let stop_resolved = {
1156            let mut shutdown = executor.shutdown.lock();
1157            shutdown.stop(value)
1158        };
1159
1160        // Wait for all tasks to complete or the timeout to fire
1161        let timeout_future = timeout.map_or_else(
1162            || futures::future::Either::Right(futures::future::pending()),
1163            |duration| futures::future::Either::Left(self.sleep(duration)),
1164        );
1165        select! {
1166            result = stop_resolved => {
1167                result.map_err(|_| Error::Closed)?;
1168                Ok(())
1169            },
1170            _ = timeout_future => Err(Error::Timeout),
1171        }
1172    }
1173
1174    fn stopped(&self) -> Signal {
1175        let executor = self.executor();
1176        executor.auditor.event(b"stopped", |_| {});
1177        let stopped = executor.shutdown.lock().stopped();
1178        stopped
1179    }
1180}
1181
1182impl crate::ThreadPooler for Context {
1183    fn create_thread_pool(
1184        &self,
1185        concurrency: NonZeroUsize,
1186    ) -> Result<ThreadPool, ThreadPoolBuildError> {
1187        let mut builder = ThreadPoolBuilder::new().num_threads(concurrency.get());
1188
1189        if rayon::current_thread_index().is_none() {
1190            builder = builder.use_current_thread()
1191        }
1192
1193        builder
1194            .spawn_handler(move |thread| {
1195                self.child("rayon_thread")
1196                    .dedicated()
1197                    .spawn(move |_| async move { thread.run() });
1198                Ok(())
1199            })
1200            .build()
1201            .map(Arc::new)
1202    }
1203}
1204
1205impl crate::Supervisor for Context {
1206    fn child(&self, label: &'static str) -> Self {
1207        let (tree, _) = Tree::child(&self.tree);
1208        Self {
1209            name: child_label(&self.name, label),
1210            attributes: self.attributes.clone(),
1211            executor: self.executor.clone(),
1212            network: self.network.clone(),
1213            storage: self.storage.clone(),
1214            network_buffer_pool: self.network_buffer_pool.clone(),
1215            storage_buffer_pool: self.storage_buffer_pool.clone(),
1216            tree,
1217            execution: Execution::default(),
1218            traced: false,
1219        }
1220    }
1221
1222    fn with_attribute(mut self, key: &'static str, value: impl std::fmt::Display) -> Self {
1223        // Validate label format (must match [a-zA-Z][a-zA-Z0-9_]*)
1224        validate_label(key);
1225
1226        // Add the attribute to the list of attributes
1227        add_attribute(&mut self.attributes, key, value);
1228        self
1229    }
1230
1231    fn name(&self) -> Name {
1232        Name {
1233            label: self.name.clone(),
1234            attributes: self.attributes.clone(),
1235        }
1236    }
1237}
1238
1239impl crate::Tracing for Context {
1240    fn with_span(mut self) -> Self {
1241        self.traced = true;
1242        self
1243    }
1244}
1245
1246impl crate::Metrics for Context {
1247    fn register<N: Into<String>, H: Into<String>, M: Metric>(
1248        &self,
1249        name: N,
1250        help: H,
1251        metric: M,
1252    ) -> Registered<M> {
1253        let name = name.into();
1254        let help = help.into();
1255        let executor = self.executor();
1256        executor.auditor.event(b"register", |hasher| {
1257            hasher.update(name.as_bytes());
1258            hasher.update(help.as_bytes());
1259            for (k, v) in &self.attributes {
1260                hasher.update(k.as_bytes());
1261                hasher.update(v.as_bytes());
1262            }
1263        });
1264        let metric = Arc::new(metric);
1265        executor.registry.register(
1266            prefixed_name(&self.name, &name),
1267            help,
1268            self.attributes.clone(),
1269            metric,
1270        )
1271    }
1272
1273    fn encode(&self) -> String {
1274        let executor = self.executor();
1275        executor.auditor.event(b"encode", |_| {});
1276        executor.registry.encode()
1277    }
1278}
1279
1280struct Sleeper {
1281    executor: Weak<Executor>,
1282    time: SystemTime,
1283    registered: bool,
1284}
1285
1286impl Sleeper {
1287    /// Upgrade Weak reference to [Executor].
1288    fn executor(&self) -> Arc<Executor> {
1289        self.executor.upgrade().expect("executor already dropped")
1290    }
1291}
1292
/// Entry in the executor's sleeper queue: wake `waker` once the clock reaches `time`.
struct Alarm {
    /// Time at which the alarm is due.
    time: SystemTime,
    /// Waker to invoke when the alarm is due.
    waker: Waker,
}

impl PartialEq for Alarm {
    /// Alarms are equal when they are due at the same time (the waker is ignored).
    fn eq(&self, other: &Self) -> bool {
        self.time == other.time
    }
}

impl Eq for Alarm {}

impl PartialOrd for Alarm {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        // Defer to the total order
        Some(Ord::cmp(self, other))
    }
}

impl Ord for Alarm {
    /// Order alarms so that the earliest one compares as the greatest.
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        // `BinaryHeap` is a max-heap, so the natural order is reversed to pop the
        // earliest alarm first
        self.time.cmp(&other.time).reverse()
    }
}
1318
1319impl Future for Sleeper {
1320    type Output = ();
1321
1322    fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
1323        let executor = self.executor();
1324        {
1325            let current_time = *executor.time.lock();
1326            if current_time >= self.time {
1327                return Poll::Ready(());
1328            }
1329        }
1330        if !self.registered {
1331            self.registered = true;
1332            executor.sleeping.lock().push(Alarm {
1333                time: self.time,
1334                waker: cx.waker().clone(),
1335            });
1336        }
1337        Poll::Pending
1338    }
1339}
1340
1341impl Clock for Context {
1342    fn current(&self) -> SystemTime {
1343        *self.executor().time.lock()
1344    }
1345
1346    fn sleep(&self, duration: Duration) -> impl Future<Output = ()> + Send + 'static {
1347        let deadline = self
1348            .current()
1349            .checked_add(duration)
1350            .expect("overflow when setting wake time");
1351        self.sleep_until(deadline)
1352    }
1353
1354    fn sleep_until(&self, deadline: SystemTime) -> impl Future<Output = ()> + Send + 'static {
1355        Sleeper {
1356            executor: self.executor.clone(),
1357
1358            time: deadline,
1359            registered: false,
1360        }
1361    }
1362}
1363
/// A future that resolves when a given target time is reached.
///
/// If the wrapped future is not ready once the target time is reached, polling blocks
/// until the wrapped future completes.
#[cfg(feature = "external")]
#[pin_project]
struct Waiter<F: Future> {
    /// Weak handle to the [Executor] whose clock and sleeper queue are used.
    executor: Weak<Executor>,
    /// Time before which no output is released.
    target: SystemTime,
    /// The wrapped future.
    #[pin]
    future: F,
    /// Output cached when the wrapped future completed during the initial poll.
    ready: Option<F::Output>,
    /// Whether the wrapped future has been polled for the first time.
    started: bool,
    /// Whether an [Alarm] has already been queued for `target`.
    registered: bool,
}
1378
#[cfg(feature = "external")]
impl<F> Future for Waiter<F>
where
    F: Future + Send,
{
    type Output = F::Output;

    /// Drive the wrapped future without ever releasing its output before `target`.
    fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
        let mut this = self.project();

        // The first poll uses a noop waker: the future gets a chance to register interest
        // or start work, but cannot wake this task before the sampled delay expires. An
        // output produced here is cached and only released once the clock reaches
        // `self.target`.
        if !*this.started {
            *this.started = true;
            let noop = noop_waker();
            let mut noop_cx = task::Context::from_waker(&noop);
            if let Poll::Ready(output) = this.future.as_mut().poll(&mut noop_cx) {
                *this.ready = Some(output);
            }
        }

        // Hold the task back until the sampled delay has elapsed.
        let executor = this.executor.upgrade().expect("executor already dropped");
        let now = *executor.time.lock();
        if now < *this.target {
            // Queue exactly one alarm with the deterministic sleeper queue so the
            // executor wakes this task once the clock reaches the target time.
            if !*this.registered {
                *this.registered = true;
                let alarm = Alarm {
                    time: *this.target,
                    waker: cx.waker().clone(),
                };
                executor.sleeping.lock().push(alarm);
            }
            return Poll::Pending;
        }

        // Release the output cached by the noop pre-poll (if there is one).
        if let Some(output) = this.ready.take() {
            return Poll::Ready(output);
        }

        // Otherwise, block the current thread until the future completes (re-polling each
        // time it reschedules itself), keeping polling deterministic with respect to
        // executor time.
        let blocker = Blocker::new();
        loop {
            let wake_handle = waker(blocker.clone());
            let mut blocking_cx = task::Context::from_waker(&wake_handle);
            if let Poll::Ready(output) = this.future.as_mut().poll(&mut blocking_cx) {
                return Poll::Ready(output);
            }
            blocker.wait();
        }
    }
}
1437
#[cfg(feature = "external")]
impl Pacer for Context {
    /// Wrap `future` so that its output is not released before `latency` has elapsed on
    /// the executor's clock (measured from the moment this method is called).
    ///
    /// Panics if the target time overflows.
    fn pace<'a, F, T>(&'a self, latency: Duration, future: F) -> impl Future<Output = T> + Send + 'a
    where
        F: Future<Output = T> + Send + 'a,
        T: Send + 'a,
    {
        // Sample the clock and derive the target time
        let now = *self.executor().time.lock();
        let target = now
            .checked_add(latency)
            .expect("overflow when setting wake time");

        let executor = self.executor.clone();
        Waiter {
            executor,
            target,
            future,
            ready: None,
            started: false,
            registered: false,
        }
    }
}
1463
1464impl GClock for Context {
1465    type Instant = SystemTime;
1466
1467    fn now(&self) -> Self::Instant {
1468        self.current()
1469    }
1470}
1471
1472impl ReasonablyRealtime for Context {}
1473
1474impl crate::Network for Context {
1475    type Listener = ListenerOf<Network>;
1476
1477    async fn bind(&self, socket: SocketAddr) -> Result<Self::Listener, Error> {
1478        self.network.bind(socket).await
1479    }
1480
1481    async fn dial(
1482        &self,
1483        socket: SocketAddr,
1484    ) -> Result<(crate::SinkOf<Self>, crate::StreamOf<Self>), Error> {
1485        self.network.dial(socket).await
1486    }
1487}
1488
1489impl crate::Resolver for Context {
1490    async fn resolve(&self, host: &str) -> Result<Vec<IpAddr>, Error> {
1491        // Get the record
1492        let executor = self.executor();
1493        let dns = executor.dns.lock();
1494        let result = dns.get(host).cloned();
1495        drop(dns);
1496
1497        // Update the auditor
1498        executor.auditor.event(b"resolve", |hasher| {
1499            hasher.update(host.as_bytes());
1500            hasher.update(result.encode());
1501        });
1502        result.ok_or_else(|| Error::ResolveFailed(host.to_string()))
1503    }
1504}
1505
1506impl RngCore for Context {
1507    fn next_u32(&mut self) -> u32 {
1508        let executor = self.executor();
1509        executor.auditor.event(b"rand", |hasher| {
1510            hasher.update(b"next_u32");
1511        });
1512        let result = executor.rng.lock().next_u32();
1513        result
1514    }
1515
1516    fn next_u64(&mut self) -> u64 {
1517        let executor = self.executor();
1518        executor.auditor.event(b"rand", |hasher| {
1519            hasher.update(b"next_u64");
1520        });
1521        let result = executor.rng.lock().next_u64();
1522        result
1523    }
1524
1525    fn fill_bytes(&mut self, dest: &mut [u8]) {
1526        let executor = self.executor();
1527        executor.auditor.event(b"rand", |hasher| {
1528            hasher.update(b"fill_bytes");
1529        });
1530        executor.rng.lock().fill_bytes(dest);
1531    }
1532
1533    fn try_fill_bytes(&mut self, dest: &mut [u8]) -> Result<(), rand::Error> {
1534        let executor = self.executor();
1535        executor.auditor.event(b"rand", |hasher| {
1536            hasher.update(b"try_fill_bytes");
1537        });
1538        let result = executor.rng.lock().try_fill_bytes(dest);
1539        result
1540    }
1541}
1542
1543impl CryptoRng for Context {}
1544
1545impl crate::Storage for Context {
1546    type Blob = <Storage as crate::Storage>::Blob;
1547
1548    async fn open_versioned(
1549        &self,
1550        partition: &str,
1551        name: &[u8],
1552        versions: std::ops::RangeInclusive<u16>,
1553    ) -> Result<(Self::Blob, u64, u16), Error> {
1554        self.storage.open_versioned(partition, name, versions).await
1555    }
1556
1557    async fn remove(&self, partition: &str, name: Option<&[u8]>) -> Result<(), Error> {
1558        self.storage.remove(partition, name).await
1559    }
1560
1561    async fn scan(&self, partition: &str) -> Result<Vec<Vec<u8>>, Error> {
1562        self.storage.scan(partition).await
1563    }
1564}
1565
1566impl crate::BufferPooler for Context {
1567    fn network_buffer_pool(&self) -> &crate::BufferPool {
1568        &self.network_buffer_pool
1569    }
1570
1571    fn storage_buffer_pool(&self) -> &crate::BufferPool {
1572        &self.storage_buffer_pool
1573    }
1574}
1575
1576#[cfg(test)]
1577mod tests {
1578    use super::*;
1579    #[cfg(feature = "external")]
1580    use crate::FutureExt;
1581    use crate::{deterministic, reschedule, Blob, Metrics as _, Resolver, Runner as _, Storage};
1582    use commonware_macros::test_traced;
1583    #[cfg(feature = "external")]
1584    use commonware_utils::channel::mpsc;
1585    use commonware_utils::channel::oneshot;
1586    #[cfg(not(feature = "external"))]
1587    use futures::future::pending;
1588    #[cfg(not(feature = "external"))]
1589    use futures::stream::StreamExt as _;
1590    #[cfg(feature = "external")]
1591    use futures::StreamExt;
1592    use futures::{stream::FuturesUnordered, task::noop_waker};
1593
1594    async fn task(i: usize) -> usize {
1595        for _ in 0..5 {
1596            reschedule().await;
1597        }
1598        i
1599    }
1600
1601    fn run_tasks(tasks: usize, runner: deterministic::Runner) -> (String, Vec<usize>) {
1602        runner.start(|context| async move {
1603            let mut handles = FuturesUnordered::new();
1604            for i in 0..=tasks - 1 {
1605                handles.push(context.child("task").spawn(move |_| task(i)));
1606            }
1607
1608            let mut outputs = Vec::new();
1609            while let Some(result) = handles.next().await {
1610                outputs.push(result.unwrap());
1611            }
1612            assert_eq!(outputs.len(), tasks);
1613            (context.auditor().state(), outputs)
1614        })
1615    }
1616
1617    fn run_with_seed(seed: u64) -> (String, Vec<usize>) {
1618        let executor = deterministic::Runner::seeded(seed);
1619        run_tasks(5, executor)
1620    }
1621
1622    #[test]
1623    fn test_same_seed_same_order() {
1624        // Generate initial outputs
1625        let mut outputs = Vec::new();
1626        for seed in 0..1000 {
1627            let output = run_with_seed(seed);
1628            outputs.push(output);
1629        }
1630
1631        // Ensure they match
1632        for seed in 0..1000 {
1633            let output = run_with_seed(seed);
1634            assert_eq!(output, outputs[seed as usize]);
1635        }
1636    }
1637
1638    #[test_traced("TRACE")]
1639    fn test_different_seeds_different_order() {
1640        let output1 = run_with_seed(12345);
1641        let output2 = run_with_seed(54321);
1642        assert_ne!(output1, output2);
1643    }
1644
1645    #[test]
1646    fn test_alarm_min_heap() {
1647        // Populate heap
1648        let now = SystemTime::now();
1649        let alarms = vec![
1650            Alarm {
1651                time: now + Duration::new(10, 0),
1652                waker: noop_waker(),
1653            },
1654            Alarm {
1655                time: now + Duration::new(5, 0),
1656                waker: noop_waker(),
1657            },
1658            Alarm {
1659                time: now + Duration::new(15, 0),
1660                waker: noop_waker(),
1661            },
1662            Alarm {
1663                time: now + Duration::new(5, 0),
1664                waker: noop_waker(),
1665            },
1666        ];
1667        let mut heap = BinaryHeap::new();
1668        for alarm in alarms {
1669            heap.push(alarm);
1670        }
1671
1672        // Verify min-heap
1673        let mut sorted_times = Vec::new();
1674        while let Some(alarm) = heap.pop() {
1675            sorted_times.push(alarm.time);
1676        }
1677        assert_eq!(
1678            sorted_times,
1679            vec![
1680                now + Duration::new(5, 0),
1681                now + Duration::new(5, 0),
1682                now + Duration::new(10, 0),
1683                now + Duration::new(15, 0),
1684            ]
1685        );
1686    }
1687
1688    #[test]
1689    #[should_panic(expected = "runtime timeout")]
1690    fn test_timeout() {
1691        let executor = deterministic::Runner::timed(Duration::from_secs(10));
1692        executor.start(|context| async move {
1693            loop {
1694                context.sleep(Duration::from_secs(1)).await;
1695            }
1696        });
1697    }
1698
1699    #[test]
1700    #[should_panic(expected = "cycle duration must be non-zero when timeout is set")]
1701    fn test_bad_timeout() {
1702        let cfg = Config {
1703            timeout: Some(Duration::default()),
1704            cycle: Duration::default(),
1705            ..Config::default()
1706        };
1707        deterministic::Runner::new(cfg);
1708    }
1709
1710    #[test]
1711    #[should_panic(
1712        expected = "cycle duration must be greater than or equal to system time precision"
1713    )]
1714    fn test_bad_cycle() {
1715        let cfg = Config {
1716            cycle: SYSTEM_TIME_PRECISION - Duration::from_nanos(1),
1717            ..Config::default()
1718        };
1719        deterministic::Runner::new(cfg);
1720    }
1721
1722    #[test]
1723    fn test_recover_synced_storage_persists() {
1724        // Initialize the first runtime
1725        let executor1 = deterministic::Runner::default();
1726        let partition = "test_partition";
1727        let name = b"test_blob";
1728        let data = b"Hello, world!";
1729
1730        // Run some tasks, sync storage, and recover the runtime
1731        let (state, checkpoint) = executor1.start_and_recover(|context| async move {
1732            let (blob, _) = context.open(partition, name).await.unwrap();
1733            blob.write_at(0, data).await.unwrap();
1734            blob.sync().await.unwrap();
1735            context.auditor().state()
1736        });
1737
1738        // Verify auditor state is the same
1739        assert_eq!(state, checkpoint.auditor.state());
1740
1741        // Check that synced storage persists after recovery
1742        let executor = Runner::from(checkpoint);
1743        executor.start(|context| async move {
1744            let (blob, len) = context.open(partition, name).await.unwrap();
1745            assert_eq!(len, data.len() as u64);
1746            let read = blob.read_at(0, data.len()).await.unwrap();
1747            assert_eq!(read.coalesce(), data);
1748        });
1749    }
1750
1751    #[test]
1752    #[should_panic(expected = "goodbye")]
1753    fn test_recover_panic_handling() {
1754        // Initialize the first runtime
1755        let executor1 = deterministic::Runner::default();
1756        let (_, checkpoint) = executor1.start_and_recover(|_| async move {
1757            reschedule().await;
1758        });
1759
1760        // Ensure that panic setting is preserved
1761        let executor = Runner::from(checkpoint);
1762        executor.start(|_| async move {
1763            panic!("goodbye");
1764        });
1765    }
1766
1767    #[test]
1768    fn test_recover_unsynced_storage_does_not_persist() {
1769        // Initialize the first runtime
1770        let executor = deterministic::Runner::default();
1771        let partition = "test_partition";
1772        let name = b"test_blob";
1773        let data = b"Hello, world!";
1774
1775        // Run some tasks without syncing storage
1776        let (_, checkpoint) = executor.start_and_recover(|context| async move {
1777            let (blob, _) = context.open(partition, name).await.unwrap();
1778            blob.write_at(0, data).await.unwrap();
1779        });
1780
1781        // Recover the runtime
1782        let executor = Runner::from(checkpoint);
1783
1784        // Check that unsynced storage does not persist after recovery
1785        executor.start(|context| async move {
1786            let (_, len) = context.open(partition, name).await.unwrap();
1787            assert_eq!(len, 0);
1788        });
1789    }
1790
1791    #[test]
1792    fn test_recover_dns_mappings_persist() {
1793        // Initialize the first runtime
1794        let executor = deterministic::Runner::default();
1795        let host = "example.com";
1796        let addrs = vec![
1797            IpAddr::V4(std::net::Ipv4Addr::new(192, 168, 1, 1)),
1798            IpAddr::V4(std::net::Ipv4Addr::new(192, 168, 1, 2)),
1799        ];
1800
1801        // Register DNS mapping and recover the runtime
1802        let (state, checkpoint) = executor.start_and_recover({
1803            let addrs = addrs.clone();
1804            |context| async move {
1805                context.resolver_register(host, Some(addrs));
1806                context.auditor().state()
1807            }
1808        });
1809
1810        // Verify auditor state is the same
1811        assert_eq!(state, checkpoint.auditor.state());
1812
1813        // Check that DNS mappings persist after recovery
1814        let executor = Runner::from(checkpoint);
1815        executor.start(move |context| async move {
1816            let resolved = context.resolve(host).await.unwrap();
1817            assert_eq!(resolved, addrs);
1818        });
1819    }
1820
1821    #[test]
1822    fn test_recover_time_persists() {
1823        // Initialize the first runtime
1824        let executor = deterministic::Runner::default();
1825        let duration_to_sleep = Duration::from_secs(10);
1826
1827        // Sleep for some time and recover the runtime
1828        let (time_before_recovery, checkpoint) = executor.start_and_recover(|context| async move {
1829            context.sleep(duration_to_sleep).await;
1830            context.current()
1831        });
1832
1833        // Check that the time advanced correctly before recovery
1834        assert_eq!(
1835            time_before_recovery.duration_since(UNIX_EPOCH).unwrap(),
1836            duration_to_sleep
1837        );
1838
1839        // Check that the time persists after recovery
1840        let executor2 = Runner::from(checkpoint);
1841        executor2.start(move |context| async move {
1842            assert_eq!(context.current(), time_before_recovery);
1843
1844            // Advance time further
1845            context.sleep(duration_to_sleep).await;
1846            assert_eq!(
1847                context.current().duration_since(UNIX_EPOCH).unwrap(),
1848                duration_to_sleep * 2
1849            );
1850        });
1851    }
1852
1853    #[test]
1854    #[should_panic(expected = "executor still has weak references")]
1855    fn test_context_return() {
1856        // Initialize runtime
1857        let executor = deterministic::Runner::default();
1858
1859        // Start runtime
1860        let context = executor.start(|context| async move {
1861            // Attempt to recover before the runtime has finished
1862            context
1863        });
1864
1865        // Should never get this far
1866        drop(context);
1867    }
1868
1869    #[test]
1870    fn test_default_time_zero() {
1871        // Initialize runtime
1872        let executor = deterministic::Runner::default();
1873
1874        executor.start(|context| async move {
1875            // Check that the time is zero
1876            assert_eq!(
1877                context.current().duration_since(UNIX_EPOCH).unwrap(),
1878                Duration::ZERO
1879            );
1880        });
1881    }
1882
1883    #[test]
1884    fn test_start_time() {
1885        // Initialize runtime with default config
1886        let executor_default = deterministic::Runner::default();
1887        executor_default.start(|context| async move {
1888            assert_eq!(context.current(), UNIX_EPOCH);
1889        });
1890
1891        // Initialize runtime with custom start time
1892        let start_time = UNIX_EPOCH + Duration::from_secs(100);
1893        let cfg = Config::default().with_start_time(start_time);
1894        let executor = deterministic::Runner::new(cfg);
1895
1896        executor.start(move |context| async move {
1897            // Check that the time matches the custom start time
1898            assert_eq!(context.current(), start_time);
1899        });
1900    }
1901
1902    #[test]
1903    #[should_panic(expected = "start time must be greater than or equal to unix epoch")]
1904    fn test_bad_start_time() {
1905        let cfg = Config::default().with_start_time(UNIX_EPOCH - Duration::from_secs(1));
1906        deterministic::Runner::new(cfg);
1907    }
1908
1909    #[cfg(not(feature = "external"))]
1910    #[test]
1911    #[should_panic(expected = "runtime stalled")]
1912    fn test_stall() {
1913        // Initialize runtime
1914        let executor = deterministic::Runner::default();
1915
1916        // Start runtime
1917        executor.start(|_| async move {
1918            pending::<()>().await;
1919        });
1920    }
1921
1922    #[cfg(not(feature = "external"))]
1923    #[test]
1924    #[should_panic(expected = "runtime stalled")]
1925    fn test_external_simulated() {
1926        // Initialize runtime
1927        let executor = deterministic::Runner::default();
1928
1929        // Create a thread that waits for 1 second
1930        let (tx, rx) = oneshot::channel();
1931        std::thread::spawn(move || {
1932            std::thread::sleep(Duration::from_secs(1));
1933            tx.send(()).unwrap();
1934        });
1935
1936        // Start runtime
1937        executor.start(|_| async move {
1938            rx.await.unwrap();
1939        });
1940    }
1941
1942    #[cfg(feature = "external")]
1943    #[test]
1944    fn test_external_realtime() {
1945        // Initialize runtime
1946        let executor = deterministic::Runner::default();
1947
1948        // Create a thread that waits for 1 second
1949        let (tx, rx) = oneshot::channel();
1950        std::thread::spawn(move || {
1951            std::thread::sleep(Duration::from_secs(1));
1952            tx.send(()).unwrap();
1953        });
1954
1955        // Start runtime
1956        executor.start(|_| async move {
1957            rx.await.unwrap();
1958        });
1959    }
1960
1961    #[cfg(feature = "external")]
1962    #[test]
1963    fn test_external_realtime_variable() {
1964        // Initialize runtime
1965        let executor = deterministic::Runner::default();
1966
1967        // Start runtime
1968        executor.start(|context| async move {
1969            // Initialize test
1970            let start_real = SystemTime::now();
1971            let start_sim = context.current();
1972            let (first_tx, first_rx) = oneshot::channel();
1973            let (second_tx, second_rx) = oneshot::channel();
1974            let (results_tx, mut results_rx) = mpsc::channel(2);
1975
1976            // Create a thread that waits for 1 second
1977            let first_wait = Duration::from_secs(1);
1978            std::thread::spawn(move || {
1979                std::thread::sleep(first_wait);
1980                first_tx.send(()).unwrap();
1981            });
1982
1983            // Create a thread
1984            std::thread::spawn(move || {
1985                std::thread::sleep(Duration::ZERO);
1986                second_tx.send(()).unwrap();
1987            });
1988
1989            // Wait for a delay sampled before the external send occurs
1990            let first = context.child("sample_before_send").spawn({
1991                let results_tx = results_tx.clone();
1992                move |context| async move {
1993                    first_rx.pace(&context, Duration::ZERO).await.unwrap();
1994                    let elapsed_real = SystemTime::now().duration_since(start_real).unwrap();
1995                    assert!(elapsed_real > first_wait);
1996                    let elapsed_sim = context.current().duration_since(start_sim).unwrap();
1997                    assert!(elapsed_sim < first_wait);
1998                    results_tx.send(1).await.unwrap();
1999                }
2000            });
2001
2002            // Wait for a delay sampled after the external send occurs
2003            let second = context
2004                .child("sample_after_send")
2005                .spawn(move |context| async move {
2006                    second_rx.pace(&context, first_wait).await.unwrap();
2007                    let elapsed_real = SystemTime::now().duration_since(start_real).unwrap();
2008                    assert!(elapsed_real >= first_wait);
2009                    let elapsed_sim = context.current().duration_since(start_sim).unwrap();
2010                    assert!(elapsed_sim >= first_wait);
2011                    results_tx.send(2).await.unwrap();
2012                });
2013
2014            // Wait for both tasks to complete
2015            second.await.unwrap();
2016            first.await.unwrap();
2017
2018            // Ensure order is correct
2019            let mut results = Vec::new();
2020            for _ in 0..2 {
2021                results.push(results_rx.recv().await.unwrap());
2022            }
2023            assert_eq!(results, vec![1, 2]);
2024        });
2025    }
2026
2027    #[cfg(not(feature = "external"))]
2028    #[test]
2029    fn test_simulated_skip() {
2030        // Initialize runtime
2031        let executor = deterministic::Runner::default();
2032
2033        // Start runtime
2034        executor.start(|context| async move {
2035            context.sleep(Duration::from_secs(1)).await;
2036
2037            // Check if we skipped
2038            let metrics = context.encode();
2039            let iterations = metrics
2040                .lines()
2041                .find_map(|line| {
2042                    line.strip_prefix("runtime_iterations_total ")
2043                        .and_then(|value| value.trim().parse::<u64>().ok())
2044                })
2045                .expect("missing runtime_iterations_total metric");
2046            assert!(iterations < 10);
2047        });
2048    }
2049
2050    #[cfg(feature = "external")]
2051    #[test]
2052    fn test_realtime_no_skip() {
2053        // Initialize runtime
2054        let executor = deterministic::Runner::default();
2055
2056        // Start runtime
2057        executor.start(|context| async move {
2058            context.sleep(Duration::from_secs(1)).await;
2059
2060            // Check if we skipped
2061            let metrics = context.encode();
2062            let iterations = metrics
2063                .lines()
2064                .find_map(|line| {
2065                    line.strip_prefix("runtime_iterations_total ")
2066                        .and_then(|value| value.trim().parse::<u64>().ok())
2067                })
2068                .expect("missing runtime_iterations_total metric");
2069            assert!(iterations > 500);
2070        });
2071    }
2072
2073    #[test]
2074    #[should_panic(expected = "label must start with [a-zA-Z]")]
2075    fn test_metrics_label_empty() {
2076        let executor = deterministic::Runner::default();
2077        executor.start(|context| async move {
2078            let _ = context.child("");
2079        });
2080    }
2081
2082    #[test]
2083    #[should_panic(expected = "label must start with [a-zA-Z]")]
2084    fn test_metrics_label_invalid_first_char() {
2085        let executor = deterministic::Runner::default();
2086        executor.start(|context| async move {
2087            let _ = context.child("1invalid");
2088        });
2089    }
2090
2091    #[test]
2092    #[should_panic(expected = "label must only contain [a-zA-Z0-9_]")]
2093    fn test_metrics_label_invalid_char() {
2094        let executor = deterministic::Runner::default();
2095        executor.start(|context| async move {
2096            let _ = context.child("invalid-label");
2097        });
2098    }
2099
2100    #[test]
2101    #[should_panic(expected = "using runtime label is not allowed")]
2102    fn test_metrics_label_reserved_prefix() {
2103        let executor = deterministic::Runner::default();
2104        executor.start(|context| async move {
2105            let _ = context.child(METRICS_PREFIX);
2106        });
2107    }
2108
2109    #[test]
2110    fn test_metrics_duplicate_attribute_overwrites() {
2111        let executor = deterministic::Runner::default();
2112        executor.start(|context| async move {
2113            let context = context
2114                .child("test")
2115                .with_attribute("epoch", "old")
2116                .with_attribute("epoch", "new");
2117            assert_eq!(
2118                context.name().attributes,
2119                vec![("epoch".to_string(), "new".to_string())]
2120            );
2121        });
2122    }
2123
2124    #[test]
2125    fn test_storage_fault_injection_and_recovery() {
2126        // Phase 1: Run with 100% sync failure rate
2127        let cfg = deterministic::Config::default().with_storage_fault_config(FaultConfig {
2128            sync_rate: Some(1.0),
2129            ..Default::default()
2130        });
2131
2132        let (result, checkpoint) =
2133            deterministic::Runner::new(cfg).start_and_recover(|ctx| async move {
2134                let (blob, _) = ctx.open("test_fault", b"blob").await.unwrap();
2135                blob.write_at(0, b"data".to_vec()).await.unwrap();
2136                blob.sync().await // This should fail due to fault injection
2137            });
2138
2139        // Verify sync failed
2140        assert!(result.is_err());
2141
2142        // Phase 2: Recover and disable faults explicitly
2143        deterministic::Runner::from(checkpoint).start(|ctx| async move {
2144            // Explicitly disable faults for recovery verification
2145            *ctx.storage_fault_config().write() = FaultConfig::default();
2146
2147            // Data was not synced, so blob should be empty (unsynced writes are lost)
2148            let (blob, len) = ctx.open("test_fault", b"blob").await.unwrap();
2149            assert_eq!(len, 0, "unsynced data should be lost after recovery");
2150
2151            // Now we can write and sync successfully
2152            blob.write_at(0, b"recovered".to_vec()).await.unwrap();
2153            blob.sync()
2154                .await
2155                .expect("sync should succeed with faults disabled");
2156
2157            // Verify data persisted
2158            let read_buf = blob.read_at(0, 9).await.unwrap();
2159            assert_eq!(read_buf.coalesce(), b"recovered");
2160        });
2161    }
2162
2163    #[test]
2164    fn test_storage_fault_dynamic_config() {
2165        let executor = deterministic::Runner::default();
2166        executor.start(|ctx| async move {
2167            let (blob, _) = ctx.open("test_dynamic", b"blob").await.unwrap();
2168
2169            // Initially no faults - sync should succeed
2170            blob.write_at(0, b"initial".to_vec()).await.unwrap();
2171            blob.sync().await.expect("initial sync should succeed");
2172
2173            // Enable sync faults dynamically
2174            let storage_fault_cfg = ctx.storage_fault_config();
2175            storage_fault_cfg.write().sync_rate = Some(1.0);
2176
2177            // Now sync should fail
2178            blob.write_at(0, b"updated".to_vec()).await.unwrap();
2179            let result = blob.sync().await;
2180            assert!(result.is_err(), "sync should fail with faults enabled");
2181
2182            // Disable faults
2183            storage_fault_cfg.write().sync_rate = Some(0.0);
2184
2185            // Sync should succeed again
2186            blob.sync()
2187                .await
2188                .expect("sync should succeed with faults disabled");
2189        });
2190    }
2191
2192    #[test]
2193    fn test_storage_fault_determinism() {
2194        // Run the same sequence twice with the same seed
2195        fn run_with_seed(seed: u64) -> Vec<bool> {
2196            let cfg = deterministic::Config::default()
2197                .with_seed(seed)
2198                .with_storage_fault_config(FaultConfig {
2199                    open_rate: Some(0.5),
2200                    ..Default::default()
2201                });
2202
2203            let runner = deterministic::Runner::new(cfg);
2204            runner.start(|ctx| async move {
2205                let mut results = Vec::new();
2206                for i in 0..20 {
2207                    let name = format!("blob{i}");
2208                    let result = ctx.open("test_determinism", name.as_bytes()).await;
2209                    results.push(result.is_ok());
2210                }
2211                results
2212            })
2213        }
2214
2215        let results1 = run_with_seed(12345);
2216        let results2 = run_with_seed(12345);
2217        assert_eq!(
2218            results1, results2,
2219            "same seed should produce same failure pattern"
2220        );
2221
2222        let results3 = run_with_seed(99999);
2223        assert_ne!(
2224            results1, results3,
2225            "different seeds should produce different patterns"
2226        );
2227    }
2228
2229    #[test]
2230    fn test_storage_fault_determinism_multi_task() {
2231        // Run the same multi-task sequence twice with the same seed.
2232        // This tests that task shuffling + fault decisions interleave deterministically.
2233        fn run_with_seed(seed: u64) -> Vec<u32> {
2234            let cfg = deterministic::Config::default()
2235                .with_seed(seed)
2236                .with_storage_fault_config(FaultConfig {
2237                    open_rate: Some(0.5),
2238                    write_rate: Some(0.3),
2239                    sync_rate: Some(0.2),
2240                    ..Default::default()
2241                });
2242
2243            let runner = deterministic::Runner::new(cfg);
2244            runner.start(|ctx| async move {
2245                // Spawn multiple tasks that do storage operations
2246                let mut handles = Vec::new();
2247                for i in 0..5 {
2248                    let ctx = ctx.child("task");
2249                    handles.push(ctx.spawn(move |ctx| async move {
2250                        let mut successes = 0u32;
2251                        for j in 0..4 {
2252                            let name = format!("task{i}_blob{j}");
2253                            if let Ok((blob, _)) = ctx.open("partition", name.as_bytes()).await {
2254                                successes += 1;
2255                                if blob.write_at(0, b"data".to_vec()).await.is_ok() {
2256                                    successes += 1;
2257                                }
2258                                if blob.sync().await.is_ok() {
2259                                    successes += 1;
2260                                }
2261                            }
2262                        }
2263                        successes
2264                    }));
2265                }
2266
2267                // Collect results from all tasks
2268                let mut results = Vec::new();
2269                for handle in handles {
2270                    results.push(handle.await.unwrap());
2271                }
2272                results
2273            })
2274        }
2275
2276        let results1 = run_with_seed(42);
2277        let results2 = run_with_seed(42);
2278        assert_eq!(
2279            results1, results2,
2280            "same seed should produce same multi-task pattern"
2281        );
2282
2283        let results3 = run_with_seed(99999);
2284        assert_ne!(
2285            results1, results3,
2286            "different seeds should produce different patterns"
2287        );
2288    }
2289}